You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2008/10/01 08:26:48 UTC

svn commit: r700690 [1/2] - in /hadoop/zookeeper/trunk: ./ src/ src/java/ src/java/jmx/org/apache/zookeeper/jmx/server/ src/java/jmx/org/apache/zookeeper/server/ src/java/jmx/org/apache/zookeeper/server/quorum/ src/java/main/org/apache/zookeeper/server...

Author: breed
Date: Tue Sep 30 23:26:47 2008
New Revision: 700690

URL: http://svn.apache.org/viewvc?rev=700690&view=rev
Log:
ZOOKEEPER-38. headers (version+) in log/snap files

Added:
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/SnapShot.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/Util.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/DataTreeUnitTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/OldChangeLog
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
    hadoop/zookeeper/trunk/src/zookeeper.jute

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Sep 30 23:26:47 2008
@@ -96,3 +96,6 @@
 
  ZOOKEEPER-117. threading issues in Leader election (Flavio Junqueira and Patrick
  Hunt via breed)
+
+ ZOOKEEPER-38. headers (version+) in log/snap files (Andrew Kornev and Mahadev
+ Konar via breed)

Modified: hadoop/zookeeper/trunk/src/java/OldChangeLog
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/OldChangeLog?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/OldChangeLog (original)
+++ hadoop/zookeeper/trunk/src/java/OldChangeLog Tue Sep 30 23:26:47 2008
@@ -1,82 +0,0 @@
-Release 2.2.0
-2008-05-08 Andrew Kornev <ak...@users.sourceforge.net>
-
-    * phunt: [1956480] Renamed ZooLog to ZooTrace. Major cleanup of tracing.
-    
-    * fpj: [1958274] got rid of unused vars in leader election
-    
-    * fpj: [1958361] Patch to fix NPE upon access to watcher
-
-    * tedunning: [1951806] Added a sample startup script.
-    
-    * akornev: [1956499] Added the "dist" target to the ant buildfile.
-    
-    * breed: [1947090] Fixed improper timeout tracking at clients.
-    
-    * phunt: [1953737] Millisecond timing in the trace file.
-    
-    * phunt: [1949253] Move to log4j for logging.
-    
-    * phunt: [1942451] build optimization: uptodate check on jute
-    
-    * phunt: [1943392] Test environment changes: unit/func/perf/coverage test
-    
-    * mahadevkonar: [1934859] Performance enhancement for serialization of records.
-    
-    * phunt: [1931630] Fixed ZooKeeperServer loadData() method to optimally 
-      scan for the most recent valid snapshot.
-    
-    * akornev: [1917295] Root node watch not triggered
-
-    * breed: [1912209] Session End Game handling
-      
-    * akornev: [1913967] code refactoring for JMX enablement. Added ServerStats and 
-      QuorumStats classes. Bug fixes: OutOfMemory under heavy load, disk I/O now 
-      uses buffered streams, NIOServerCnxn.Factory shuffles the selector keys 
-      to avoid starvation. Lots of formatting: replaced tabs with whitespaces, 
-      DOS eol style converted to UNIX.
-      
-    * breed: [1882928] Log the uncaught exceptions from the SendThread and EventThread
-    
-    * fpj: [1881204] New leader election algorithm over TCP.
-    
-    * akornev: [1898314] Added support for server version info at runtime; added 
-      the "release" target to ant build file
-    
-    * breed: [1892108] Configurable packet sanity check
-    
-    * akornev: [1889354] JAR manifest file now includes additional metadata: Built-By, 
-      Built-At, Built-On, Implementation-Title, Implementation-Version and 
-      Implementation-Vendor. Use SvnAnt ant task to extract SVN version number.
-      
-    * mahadevkonar: [1881545] fixed logging to output session id in hex
-
-Release 1.1.0
-2008-01-28 Andrew Kornev <ak...@users.sourceforge.net>
-
-    * breed: [1875540] Make sure java client aborts the outgoing packets when 
-      a connection closes.
-      
-    * Jute compiler: emit the #ifdef extern "C" guards in the generated .jute.h
-    
-    * mahadevkonar: [1844561] fast sync between the leader and the follower.
-    
-    * mahadevkonar: [1849444] fixed session id generation routine to generate 
-      unique session ids.
-      
-    * breed: [1845696] fixed a race condition in the quorum server where it 
-      is possible that a create session request can be committed and applied 
-      at a follower before it is applied at the leader.
-      
-    * breed: [1841938] implemented the sync operation to flush updates pedning 
-      on the Leader.
-      
-    * vlarsen: [1835834] fixed a few compiler warnings, removed some @Override 
-      annotations used with interfaces.
-    
-Release 1.0.0
-2007-11-27 Andrew Kornev <ak...@users.sourceforge.net>
-
-    * Updated the jute compiler to emit int32_t vs int in the generated C code
-    
-    * Changed release numbering scheme to match that of the C client
\ No newline at end of file

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java Tue Sep 30 23:26:47 2008
@@ -55,6 +55,7 @@
         }
         return stream.size();
       */
+        LOG.warn("Not Implemented");            
         return -1;
     }
 

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,8 +18,11 @@
 
 package org.apache.zookeeper.server;
 
+import static org.apache.zookeeper.server.ServerConfig.getClientPort;
+
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
@@ -30,6 +33,9 @@
 import org.apache.zookeeper.jmx.server.DataTreeMXBean;
 import org.apache.zookeeper.jmx.server.ZooKeeperServerBean;
 import org.apache.zookeeper.jmx.server.ZooKeeperServerMXBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+
 import org.apache.zookeeper.server.util.ConnectionObserver;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.ServerObserver;
@@ -103,22 +109,17 @@
         }
     }
 
-    public ManagedZooKeeperServer() {
-        super();
+    public ManagedZooKeeperServer(FileTxnSnapLog logFactory, 
+            int tickTime,DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory, tickTime,treeBuilder);
         ObserverManager.getInstance().add(new ManagedServerObserver());
         ObserverManager.getInstance().add(new ManagedConnectionObserver());
     }
 
-    public ManagedZooKeeperServer(File dataDir, File dataLogDir, int tickTime, DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, tickTime, treeBuilder);
+    public ManagedZooKeeperServer(FileTxnSnapLog logFactory,
+            DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory,treeBuilder);
         ObserverManager.getInstance().add(new ManagedServerObserver());
         ObserverManager.getInstance().add(new ManagedConnectionObserver());
     }
-
-    public ManagedZooKeeperServer(File dataDir, File dataLogDir, int tickTime) throws IOException {
-        super(dataDir, dataLogDir, tickTime);
-        ObserverManager.getInstance().add(new ManagedServerObserver());
-        ObserverManager.getInstance().add(new ManagedConnectionObserver());
-    }
-
 }

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java Tue Sep 30 23:26:47 2008
@@ -26,6 +26,8 @@
 import org.apache.zookeeper.jmx.server.ConnectionMXBean;
 import org.apache.zookeeper.jmx.server.DataTreeMXBean;
 import org.apache.zookeeper.jmx.server.ZooKeeperServerMXBean;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.ZooKeeperObserverManager;
 
 /**
@@ -65,12 +67,12 @@
                 return new ObservableNIOServerCnxn.Factory(getClientPort());
             }
             public ZooKeeperServer createServer() throws IOException {
-                ManagedZooKeeperServer zks = new ManagedZooKeeperServer();
-                zks.setDataDir(new File(ServerConfig.getDataDir()));
-                zks.setDataLogDir(new File(ServerConfig.getDataLogDir()));
-                zks.setClientPort(ServerConfig.getClientPort());
-                // TODO: we may want to build an observable/managed data tree here instead
-                zks.setTreeBuilder(new ZooKeeperServer.BasicDataTreeBuilder());
+                // FileTxnSnapLog takes the txn log directory first and the snapshot
+                // directory second (same order as ManagedQuorumPeerMain and PurgeTxnLog).
+                ManagedZooKeeperServer zks = new ManagedZooKeeperServer(
+                        new FileTxnSnapLog(new File(ServerConfig.getDataLogDir()),
+                        new File(ServerConfig.getDataDir())),
+                new ZooKeeperServer.BasicDataTreeBuilder());
                 return zks;
             }
         });

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,11 +18,13 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.File;
 import java.io.IOException;
 
+
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.ServerObserver;
+
 /**
  * The observable server broadcast notifications when its state changes. 
  * 
@@ -33,17 +35,13 @@
 public class ObservableZooKeeperServer extends ZooKeeperServer{
 
     private ZooKeeperObserverNotifier notifier=new ZooKeeperObserverNotifier(this);
-    
-    public ObservableZooKeeperServer() {
-        super();
+    public ObservableZooKeeperServer(FileTxnSnapLog logFactory, 
+            int tickTime,DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory, tickTime,treeBuilder);
     }
-
-    public ObservableZooKeeperServer(File dataDir, File dataLogDir, int tickTime, DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, tickTime, treeBuilder);
-    }
-
-    public ObservableZooKeeperServer(File dataDir, File dataLogDir, int tickTime) throws IOException {
-        super(dataDir, dataLogDir, tickTime);
+    public ObservableZooKeeperServer(FileTxnSnapLog logFactory,
+            DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory,treeBuilder);
     }
 
     public void shutdown() {

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java Tue Sep 30 23:26:47 2008
@@ -46,6 +46,8 @@
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.server.util.ConnectionObserver;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.QuorumPeerObserver;
@@ -207,15 +209,21 @@
         setupObservers();
     }
 
-    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
-                                int syncLimit) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, 
+            File dataDir, File dataLogDir, int clientPort, 
+            int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
+            int syncLimit) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, clientPort, 
+                electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
         setupObservers();
     }
 
-    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, int electionPort, long myid, int tickTime, int initLimit, int syncLimit,
-                                NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort, myid, tickTime, initLimit, syncLimit, cnxnFactory);
+    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, 
+            File dataDir, File dataLogDir, int electionType, int electionPort,
+            long myid, int tickTime, int initLimit, int syncLimit,
+            NIOServerCnxn.Factory cnxnFactory) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
+                myid, tickTime, initLimit, syncLimit, cnxnFactory);
         setupObservers();
     }
 

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java Tue Sep 30 23:26:47 2008
@@ -35,6 +35,14 @@
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ObservableNIOServerCnxn;
 import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.util.ConnectionObserver;
+import org.apache.zookeeper.server.util.ObserverManager;
+import org.apache.zookeeper.server.util.QuorumPeerObserver;
+import org.apache.zookeeper.server.util.ServerObserver;
 import org.apache.zookeeper.server.util.ZooKeeperObserverManager;
 
 /**
@@ -82,12 +90,13 @@
             ZooKeeperObserverManager.setAsConcrete();
             runPeer(new QuorumPeer.Factory() {
                 public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory)
-                        throws IOException {
-                    
+                throws IOException {
                     ManagedQuorumPeer peer = new ManagedQuorumPeer();
                     peer.setClientPort(ServerConfig.getClientPort());
-                    peer.setDataDir(new File(ServerConfig.getDataDir()));
-                    peer.setDataLogDir(new File(ServerConfig.getDataLogDir()));
+                    FileTxnSnapLog factory = new FileTxnSnapLog(new 
+                            File(ServerConfig.getDataLogDir()), new  
+                                    File(ServerConfig.getDataDir()));
+                    peer.setTxnFactory(factory);
                     peer.setQuorumPeers(QuorumPeerConfig.getServers());
                     peer.setElectionPort(QuorumPeerConfig.getElectionPort());
                     peer.setElectionType(QuorumPeerConfig.getElectionAlg());
@@ -97,7 +106,7 @@
                     peer.setSyncLimit(QuorumPeerConfig.getSyncLimit());
                     peer.setCnxnFactory(cnxnFactory);
                     return peer;
-                    
+
                 }
                 public NIOServerCnxn.Factory createConnectionFactory() throws IOException {
                     return new ObservableNIOServerCnxn.Factory(getClientPort());

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,10 +18,10 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.zookeeper.server.ZooKeeperObserverNotifier;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.ServerObserver;
 
@@ -37,9 +37,9 @@
 
     private ZooKeeperObserverNotifier notifier;
 
-    public ObservableFollowerZooKeeperServer(File dataDir, File dataLogDir,
+    public ObservableFollowerZooKeeperServer(FileTxnSnapLog logFactory,
             QuorumPeer self, DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, self, treeBuilder);
+        super(logFactory, self, treeBuilder);
         notifier=new ZooKeeperObserverNotifier(this);
     }
     

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,10 +18,10 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.zookeeper.server.ZooKeeperObserverNotifier;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.ObserverManager;
 import org.apache.zookeeper.server.util.ServerObserver;
 
@@ -37,9 +37,9 @@
 
     private ZooKeeperObserverNotifier notifier;
 
-    public ObservableLeaderZooKeeperServer(File dataDir, File dataLogDir,
+    public ObservableLeaderZooKeeperServer(FileTxnSnapLog logFactory,
             QuorumPeer self, DataTreeBuilder treeBuilder) throws IOException {
-        super(dataDir, dataLogDir, self, treeBuilder);
+        super(logFactory, self, treeBuilder);
         notifier=new ZooKeeperObserverNotifier(this);
     }
 

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java Tue Sep 30 23:26:47 2008
@@ -24,6 +24,7 @@
 
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.util.EventInfo;
 import org.apache.zookeeper.server.util.ObservableComponent;
 import org.apache.zookeeper.server.util.ObserverManager;
@@ -58,32 +59,40 @@
         };
         public abstract void dispatch(ObservableQuorumPeer peer,QuorumPeerObserver ob);
     }
-
-
     public ObservableQuorumPeer() {
         super();
     }
 
-    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
-                                int syncLimit) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
+            File dataLogDir, int clientPort, int electionAlg,
+            int electionPort, long myid, int tickTime, int initLimit,
+            int syncLimit) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, clientPort, 
+                electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
     }
 
-    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, int electionPort, long myid, int tickTime, int initLimit, int syncLimit,
-                                NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort, myid, tickTime, initLimit, syncLimit, cnxnFactory);
+    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers,
+            File dataDir, File dataLogDir, int electionType, 
+            int electionPort, long myid, int tickTime, 
+            int initLimit, int syncLimit,
+            NIOServerCnxn.Factory cnxnFactory) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
+                myid, tickTime, initLimit, syncLimit, cnxnFactory);
     }
 
+
     // instantiate an observable follower
-    protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException {
-        return new ObservableFollower(this, new ObservableFollowerZooKeeperServer(dataDir,
-                dataLogDir, this,new ZooKeeperServer.BasicDataTreeBuilder()));
+    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+        return new ObservableFollower(this, 
+                new ObservableFollowerZooKeeperServer(logFactory, this,
+                        new ZooKeeperServer.BasicDataTreeBuilder()));
     }
 
     // instantiate an observable leader
-    protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException {
-        return new ObservableLeader(this, new ObservableLeaderZooKeeperServer(dataDir, 
-                dataLogDir,this,new ZooKeeperServer.BasicDataTreeBuilder()));
+    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
+        return new ObservableLeader(this, 
+                new ObservableLeaderZooKeeperServer(logFactory, 
+                        this,new ZooKeeperServer.BasicDataTreeBuilder()));
     }
 
     public void run() {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Tue Sep 30 23:26:47 2008
@@ -285,7 +285,8 @@
         }
     }
 
-    public ArrayList<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
+    public List<String> getChildren(String path, Stat stat, Watcher watcher) 
+            throws KeeperException.NoNodeException {
         DataNode n = nodes.get(path);
         if (n == null) {
             throw new KeeperException.NoNodeException();
@@ -462,8 +463,7 @@
      * @throws IOException
      * @throws InterruptedException
      */
-    void serializeNode(OutputArchive oa, StringBuilder path)
-            throws IOException, InterruptedException {
+    void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
         String pathString = path.toString();
         DataNode node = getNode(pathString);
         if (node == null) {
@@ -494,8 +494,7 @@
 
     public boolean initialized = false;
 
-    public void serialize(OutputArchive oa, String tag) throws IOException,
-            InterruptedException {
+    public void serialize(OutputArchive oa, String tag) throws IOException {
         scount = 0;
         serializeNode(oa, new StringBuilder(""));
         // / marks end of stream

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Tue Sep 30 23:26:47 2008
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.log4j.Logger;
@@ -212,7 +211,7 @@
                 }
                 PrepRequestProcessor.checkACL(zks, n.acl, ZooDefs.Perms.READ,
                         request.authInfo);
-                ArrayList<String> children = zks.dataTree.getChildren(
+                List<String> children = zks.dataTree.getChildren(
                         getChildrenRequest.getPath(), stat, getChildrenRequest
                                 .getWatch() ? request.cnxn : null);
                 rsp = new GetChildrenResponse(children);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java Tue Sep 30 23:26:47 2008
@@ -22,16 +22,21 @@
 import java.io.FileFilter;
 import java.io.IOException;
 import java.text.DateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+
 public class PurgeTxnLog {
 
     static void printUsage(){
-        System.out.println("PurgeTxnLog dataLogDir ");
+        System.out.println("PurgeTxnLog dataLogDir [snapDir]");
         System.out.println("\tdataLogDir -- path to the txn log directory");
+        System.out.println("\tsnapDir -- path to the snapshot directory");
         System.exit(1);
     }
     /**
@@ -39,38 +44,42 @@
      *     dataLogDir -- txn log directory
      */
     public static void main(String[] args) throws IOException {
-        if(args.length!=1)
+        if(args.length<1 || args.length>2)
             printUsage();
 
         File dataDir=new File(args[0]);
-
-        // find the most recent valid snapshot
-        long highestZxid = -1;
-        for (File f : dataDir.listFiles()) {
-            long zxid = ZooKeeperServer.isValidSnapshot(f);
-            if (zxid > highestZxid) {
-                highestZxid = zxid;
+        File snapDir=dataDir;
+        if(args.length==2){
+            snapDir=new File(args[1]);
             }
-        }
-        // found any valid snapshots?
-        if(highestZxid==-1)
-            return;  // no snapshots
-
+        FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
+        
+        // found any valid recent snapshots?
+        
         // files to exclude from deletion
         Set<File> exc=new HashSet<File>();
-        exc.add(new File(dataDir, "snapshot."+Long.toHexString(highestZxid)));
-        exc.addAll(Arrays.asList(ZooKeeperServer.getLogFiles(dataDir.listFiles(),highestZxid)));
+        File snapShot = txnLog.findMostRecentSnapshot();
+        exc.add(txnLog.findMostRecentSnapshot());
+        long zxid = Util.getZxidFromName(snapShot.getName(),"snapshot");
+        exc.addAll(Arrays.asList(txnLog.getSnapshotLogs(zxid)));
 
         final Set<File> exclude=exc;
-        List<File> files=Arrays.asList(dataDir.listFiles(new FileFilter(){
+        class MyFileFilter implements FileFilter{
+            private final String prefix;
+            MyFileFilter(String prefix){
+                this.prefix=prefix;
+            }
             public boolean accept(File f){
-                if(!f.getName().startsWith("log.") &&
-                        !f.getName().startsWith("snapshot."))
-                    return false;
-                if(exclude.contains(f))
+                if(!f.getName().startsWith(prefix) || exclude.contains(f))
                     return false;
                 return true;
-            }}));
+            }
+        }
+        // add all non-excluded log files
+        List<File> files=new ArrayList<File>(
+                Arrays.asList(dataDir.listFiles(new MyFileFilter("log."))));
+        // add all non-excluded snapshot files to the deletion list
+        files.addAll(Arrays.asList(snapDir.listFiles(new MyFileFilter("snapshot."))));
         // remove the old files
         for(File f: files)
         {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Tue Sep 30 23:26:47 2008
@@ -18,22 +18,13 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.LinkedList;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.log4j.Logger;
 
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.server.util.Profiler;
-import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * This RequestProcessor logs requests to disk. It batches the requests to do
@@ -42,57 +33,24 @@
  */
 public class SyncRequestProcessor extends Thread implements RequestProcessor {
     private static final Logger LOG = Logger.getLogger(SyncRequestProcessor.class);
-
-    static final int PADDING_TIMEOUT=1000;
-    ZooKeeperServer zks;
-
-    LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
-
-    static boolean forceSync;
-    static {
-        forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals(
-                "no");
-    }
-
-    private static long preAllocSize = 65536 * 1024;
-    static {
-        String size = System.getProperty("zookeeper.preAllocSize");
-        if (size != null) {
-            try {
-                preAllocSize = Long.parseLong(size) * 1024;
-            } catch (NumberFormatException e) {
-                LOG.warn(size 
-                        + " is not a valid value for zookeeper.preAllocSize");
-            }
-        }
-    }
-    
+    private ZooKeeperServer zks;
+    private LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
+    private RequestProcessor nextProcessor;
+    boolean timeToDie = false;
     /**
-     * Change the data log pre-allocation size on the fly.
-     * 
-     * You might want to do this on systems (Windows esp) where preallocation
-     * is slow, WARN messages are output the log if preAllocation is taking
-     * too long -- will stall the request pipeline.
-     * 
-     * This value can also be set through the "zookeeper.preAllocSize" (also
-     * in K bytes) environment variable when starting the jvm.
-     * 
-     * @param size size in K bytes to change the log prealloc to
+     * Requests that have been appended to the log and are waiting to be
+     * flushed to disk. After a successful flush each one is handed to the
+     * next processor.
      */
-    public static void setPreAllocSize(long size) {
-        preAllocSize = size * 1024; 
-    }
-
+    private LinkedList<Request> toFlush = new LinkedList<Request>();
+    private Random r = new Random(System.nanoTime());
+    private int logCount = 0;
     /**
      * The number of log entries to log before starting a snapshot
      */
-    static public int snapCount = ZooKeeperServer.getSnapCount();
-
-    Thread snapInProcess;
-
-    RequestProcessor nextProcessor;
+    public static int snapCount = ZooKeeperServer.getSnapCount();
 
-    boolean timeToDie = false;
+    private Request requestOfDeath = Request.requestOfDeath;
 
     public SyncRequestProcessor(ZooKeeperServer zks,
             RequestProcessor nextProcessor) {
@@ -102,45 +60,13 @@
         start();
     }
 
-    /**
-     * Transactions that have been written and are waiting to be flushed to
-     * disk. Basically this is the list of SyncItems whose callbacks will be
-     * invoked after flush returns successfully.
-     */
-    LinkedList<Request> toFlush = new LinkedList<Request>();
-
-    FileOutputStream logStream;
-
-    BinaryOutputArchive logArchive;
-
-    Random r = new Random(System.nanoTime());
-
-    int logCount = 0;
-
-    Request requestOfDeath = Request.requestOfDeath;
-
-    private static ByteBuffer fill = ByteBuffer.allocateDirect(1024);
-
-    LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();
-
-    private long padLogFile(FileChannel fc,long fileSize) throws IOException{
-        long position = fc.position();
-        // We pad the file in 1M chunks to avoid syncing to
-        // write the new filesize.
-        if (position + 4096 >= fileSize) {
-            fileSize = fileSize + preAllocSize;
-            fill.position(0);
-            fc.write(fill, fileSize);
-        }
-        return fileSize;
+    private void startSnapshot() throws IOException {
+        zks.takeSnapshot();
     }
 
     @Override
     public void run() {
         try {
-            long fileSize = 0;
-            long lastZxidSeen = -1;
-            FileChannel fc = null;
             while (true) {
                 Request si = null;
                 if (toFlush.isEmpty()) {
@@ -156,75 +82,16 @@
                     break;
                 }
                 if (si != null) {
-                    // LOG.warn("Sync>>> cxid = " + si.cxid + " type = " +
-                    // si.type + " id = " + si.sessionId + " zxid = " +
-                    // Long.toHexString(si.zxid));
-                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
-                            'S', si, "");
-                    TxnHeader hdr = si.hdr;
-                    if (hdr != null) {
-                        if (hdr.getZxid() <= lastZxidSeen) {
-                            LOG.warn("Current zxid " + hdr.getZxid()
-                                    + " is <= " + lastZxidSeen + " for "
-                                    + hdr.getType());
-                        }
-                        Record txn = si.txn;
-                        if (logStream == null) {
-                            fileSize = 0;
-                            logStream = new FileOutputStream(new File(
-                                    zks.dataLogDir, ZooKeeperServer
-                                            .getLogName(hdr.getZxid())));
-                            synchronized (streamsToFlush) {
-                                streamsToFlush.add(logStream);
-                            }
-                            fc = logStream.getChannel();
-                            logArchive = BinaryOutputArchive
-                                    .getArchive(logStream);
-                        }
-                        final long fsize=fileSize;
-                        final FileChannel ffc=fc;
-                        fileSize = Profiler.profile(
-                            new Profiler.Operation<Long>() {
-                                public Long execute() throws Exception {
-                                    return SyncRequestProcessor.this
-                                            .padLogFile(ffc, fsize);
-                                }
-                            }, PADDING_TIMEOUT,
-                            "Logfile padding exceeded time threshold"
-                        );
-                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                        BinaryOutputArchive boa = BinaryOutputArchive
-                                .getArchive(baos);
-                        hdr.serialize(boa, "hdr");
-                        if (txn != null) {
-                            txn.serialize(boa, "txn");
-                        }
-                        logArchive.writeBuffer(baos.toByteArray(), "txnEntry");
-                        logArchive.writeByte((byte) 0x42, "EOR");
+                    zks.getLogWriter().append(si);
                         logCount++;
                         if (logCount > snapCount / 2
                                 && r.nextInt(snapCount / 2) == 0) {
-                            // We just want one snapshot going at a time
-                            if (snapInProcess != null
-                                    && snapInProcess.isAlive()) {
-                                LOG.warn("Too busy to snap, skipping");
-                            } else {
-                                logStream = null;
-                                logArchive = null;
-                                snapInProcess = new Thread() {
-                                    public void run() {
-                                        try {
-                                            zks.snapshot();
-                                        } catch (Exception e) {
-                                            LOG.warn("Unexpected exception",e);
-                                        }
-                                    }
-                                };
-                                snapInProcess.start();
-                            }
+                            // roll the log
+                            zks.getLogWriter().rollLog();
+                            // take a snapshot
+                            startSnapshot();
                             logCount = 0;
                         }
-                    }
                     toFlush.add(si);
                     if (toFlush.size() > 1000) {
                         flush(toFlush);
@@ -236,33 +103,15 @@
             System.exit(11);
         }
         ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
-                                     "SyncRequestProcessor exiyed!");
+                                     "SyncRequestProcessor exited!");
     }
 
     @SuppressWarnings("unchecked")
     private void flush(LinkedList<Request> toFlush) throws IOException {
-        if (toFlush.size() == 0) {
+        if (toFlush.size() == 0)
             return;
-        }
 
-        LinkedList<FileOutputStream> streamsToFlushNow;
-        synchronized (streamsToFlush) {
-            streamsToFlushNow = (LinkedList<FileOutputStream>) streamsToFlush
-                    .clone();
-        }
-        for (FileOutputStream fos : streamsToFlushNow) {
-            fos.flush();
-            if (forceSync) {
-                fos.getChannel().force(false);
-            }
-        }
-        while (streamsToFlushNow.size() > 1) {
-            FileOutputStream fos = streamsToFlushNow.removeFirst();
-            fos.close();
-            synchronized (streamsToFlush) {
-                streamsToFlush.remove(fos);
-            }
-        }
+        zks.getLogWriter().commit();
         while (toFlush.size() > 0) {
             Request i = toFlush.remove();
             nextProcessor.processRequest(i);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,53 +18,36 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.log4j.Logger;
-
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.QuorumPacket;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.txn.CreateSessionTxn;
-import org.apache.zookeeper.txn.CreateTxn;
-import org.apache.zookeeper.txn.DeleteTxn;
-import org.apache.zookeeper.txn.ErrorTxn;
-import org.apache.zookeeper.txn.SetACLTxn;
-import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -108,14 +91,7 @@
     private DataTreeBuilder treeBuilder;
     public DataTree dataTree;
     protected SessionTracker sessionTracker;
-    /**
-     * directory for storing the snapshot
-     */
-    File dataDir;
-    /**
-     * directoy for storing the log tnxns
-     */
-    File dataLogDir;
+    private FileTxnSnapLog txnLogFactory = null;
     protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
     protected long hzxid = 0;
     final public static Exception ok = new Exception("No prob");
@@ -131,12 +107,12 @@
     List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
     private NIOServerCnxn.Factory serverCnxnFactory;
     private int clientPort;
-
- 
+    
     void removeCnxn(ServerCnxn cnxn) {
         dataTree.removeCnxn(cnxn);
     }
 
+ 
     /**
      * Creates a ZooKeeperServer instance. Nothing is setup, use the setX
      * methods to prepare the instance (eg datadir, datalogdir, ticktime, 
@@ -148,6 +124,8 @@
         ServerStats.getInstance().setStatsProvider(this);
         treeBuilder = new BasicDataTreeBuilder();
     }
+
+    
     /**
      * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
      * actually start listening for clients until run() is invoked.
@@ -155,27 +133,25 @@
      * @param dataDir the directory to put the data
      * @throws IOException
      */
-    public ZooKeeperServer(File dataDir, File dataLogDir, int tickTime,
+    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
             DataTreeBuilder treeBuilder) throws IOException {
-        this.dataDir = dataDir;
-        this.dataLogDir = dataLogDir;
+        this.txnLogFactory=txnLogFactory;
         this.tickTime = tickTime;
         this.treeBuilder = treeBuilder;
         ServerStats.getInstance().setStatsProvider(this);
         
-        LOG.info("Created server with dataDir:" + dataDir 
-                + " dataLogDir:" + dataLogDir
-                + " tickTime:" + tickTime);
+        LOG.info("Created server");
     }
 
     /**
      * This constructor is for backward compatibility with the existing unit
      * test code.
+     * It defaults to the FileTxnSnapLog persistence provider.
      */
-    public ZooKeeperServer(File dataDir, File dataLogDir, int tickTime)
-        throws IOException 
-    {
-        this(dataDir, dataLogDir, tickTime, new BasicDataTreeBuilder());
+    public ZooKeeperServer(File snapDir, File logDir, int tickTime)
+            throws IOException {
+        this(new FileTxnSnapLog(snapDir,logDir),
+                tickTime,new BasicDataTreeBuilder());
     }
 
     /**
@@ -183,180 +159,28 @@
      *
      * @throws IOException
      */
-    public ZooKeeperServer(DataTreeBuilder treeBuilder) throws IOException {
-        this(new File(ServerConfig.getDataDir()), new File(ServerConfig
-                .getDataLogDir()), DEFAULT_TICK_TIME, treeBuilder);
-    }
-
-    public static long getZxidFromName(String name, String prefix) {
-        long zxid = -1;
-        String nameParts[] = name.split("\\.");
-        if (nameParts.length == 2 && nameParts[0].equals(prefix)) {
-            try {
-                zxid = Long.parseLong(nameParts[1], 16);
-            } catch (NumberFormatException e) {
-                LOG.warn("unable to parse zxid string into long: "
-                        + nameParts[1]);
-            }
-        }
-        return zxid;
-    }
-
-    static public long isValidSnapshot(File f) throws IOException {
-        long zxid = getZxidFromName(f.getName(), "snapshot");
-        if (zxid == -1)
-            return -1;
-
-        // Check for a valid snapshot
-        RandomAccessFile raf = new RandomAccessFile(f, "r");
-        try {
-            raf.seek(raf.length() - 5);
-            byte bytes[] = new byte[5];
-            raf.read(bytes);
-            ByteBuffer bb = ByteBuffer.wrap(bytes);
-            int len = bb.getInt();
-            byte b = bb.get();
-            if (len != 1 || b != '/') {
-                LOG.warn("Invalid snapshot " + f + " len = " + len
-                        + " byte = " + (b & 0xff));
-                return -1;
-            }
-        } finally {
-            raf.close();
-        }
-
-        return zxid;
-    }
-
-    /**
-     * Compare file file names of form "prefix.version". Sort order result
-     * returned in order of version.
-     */
-    private static class DataDirFileComparator implements Comparator<File> {
-        private String prefix;
-        private boolean ascending;
-        public DataDirFileComparator(String prefix, boolean ascending) {
-            this.prefix = prefix;
-            this.ascending = ascending;
-        }
-
-        public int compare(File o1, File o2) {
-            long z1 = getZxidFromName(o1.getName(), prefix);
-            long z2 = getZxidFromName(o2.getName(), prefix);
-            int result = z1 < z2 ? -1 : (z1 > z2 ? 1 : 0);
-            return ascending ? result : -result;
-        }
-    }
-
-    /**
-     * Sort the list of files. Recency as determined by the version component
-     * of the file name.
-     *
-     * @param files array of files
-     * @param prefix files not matching this prefix are assumed to have a
-     * version = -1)
-     * @param ascending true sorted in ascending order, false results in
-     * descending order
-     * @return sorted input files
-     */
-    static List<File>
-        sortDataDir(File[] files, String prefix, boolean ascending)
-    {
-        List<File> filelist = Arrays.asList(files);
-        Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
-        return filelist;
-    }
-
-    /**
-     * Find the log file that starts at, or just before, the snapshot. Return
-     * this and all subsequent logs. Results are ordered by zxid of file,
-     * ascending order.
-     *
-     * @param logDirList array of files
-     * @param snapshotZxid return files at, or before this zxid
-     * @return
-     */
-    static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
-        List<File> files = sortDataDir(logDirList, "log", true);
-        long logZxid = 0;
-        // Find the log file that starts before or at the same time as the
-        // zxid of the snapshot
-        for (File f : files) {
-            long fzxid = getZxidFromName(f.getName(), "log");
-            if (fzxid > snapshotZxid) {
-                continue;
-            }
-            if (fzxid > logZxid) {
-                logZxid = fzxid;
-            }
-        }
-        List<File> v=new ArrayList<File>(5);
-        // Apply the logs
-        for (File f : files) {
-            long fzxid = getZxidFromName(f.getName(), "log");
-            if (fzxid < logZxid) {
-                continue;
-            }
-            v.add(f);
-        }
-        return v.toArray(new File[0]);
+    public ZooKeeperServer(FileTxnSnapLog txnLogFactory,DataTreeBuilder treeBuilder) throws IOException {
+        this(txnLogFactory, DEFAULT_TICK_TIME, treeBuilder);
     }
 
     /**
      *  Restore sessions and data
      */
-    private void loadSnapshotAndLogs() throws IOException {
-        long zxid = -1;
-
-        // Find the most recent snapshot
-        List<File> files = sortDataDir(dataDir.listFiles(), "snapshot", false);
-        for (File f : files) {
-            zxid = isValidSnapshot(f);
-            if (zxid == -1) {
-                LOG.warn("Skipping " + f);
-                continue;
-            }
-
-            LOG.warn("Processing snapshot: " + f);
-
-            FileInputStream snapFIS = new FileInputStream(f);
-            try {
-                InputStream snapIS = new BufferedInputStream(snapFIS);
-                try {
-                    loadData(BinaryInputArchive.getArchive(snapIS));
-                } finally {
-                    snapIS.close();
-                }
-            } finally {
-                snapFIS.close();
-            }
-
-            dataTree.lastProcessedZxid = zxid;
-
-            // Apply the logs on/after the selected snapshot
-            File[] logfiles = getLogFiles(dataLogDir.listFiles(), zxid);
-            for (File logfile : logfiles) {
-                LOG.warn("Processing log file: " + logfile);
-
-                InputStream logIS =
-                    new BufferedInputStream(new FileInputStream(logfile));
-                zxid = playLog(BinaryInputArchive.getArchive(logIS));
-                logIS.close();
-            }
-            hzxid = zxid;
-
-            break;
-        }
-
-        if (zxid == -1) {
-            sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
-            dataTree = treeBuilder.build();
-        }
-    }
-
     public void loadData() throws IOException, InterruptedException {
-        loadSnapshotAndLogs();
-
+        PlayBackListener listener=new PlayBackListener(){
+            public void onTxnLoaded(TxnHeader hdr,Record txn){
+                Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
+                        null, null);
+                r.txn = txn;
+                r.hdr = hdr;
+                r.zxid = hdr.getZxid();
+                addCommittedProposal(r);
+            }
+        };
+        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
+        dataTree = treeBuilder.build();
+        long zxid = txnLogFactory.restore(dataTree,sessionsWithTimeouts,listener);
+        this.hzxid = zxid;
         // Clean up dead sessions
         LinkedList<Long> deadSessions = new LinkedList<Long>();
         for (long session : dataTree.getSessions()) {
@@ -369,94 +193,14 @@
             killSession(session);
         }
         // Make a clean snapshot
-        snapshot();
-    }
-
-    public void loadData(InputArchive ia) throws IOException {
-        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
-        dataTree = treeBuilder.build();
-
-        int count = ia.readInt("count");
-        while (count > 0) {
-            long id = ia.readLong("id");
-            int to = ia.readInt("timeout");
-            sessionsWithTimeouts.put(id, to);
-            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
-                                     "loadData --- session in archive: " + id
-                                     + " with timeout: " + to);
-            count--;
-        }
-        dataTree.deserialize(ia, "tree");
-    }
-
-    public long playLog(InputArchive logStream) throws IOException {
-        long highestZxid = 0;
-        try {
-            while (true) {
-                byte[] bytes = logStream.readBuffer("txnEntry");
-                if (bytes.length == 0) {
-                    // Since we preallocate, we define EOF to be an
-                    // empty transaction
-                    throw new EOFException();
-                }
-                InputArchive ia = BinaryInputArchive
-                        .getArchive(new ByteArrayInputStream(bytes));
-                TxnHeader hdr = new TxnHeader();
-                Record txn = deserializeTxn(ia, hdr);
-                if (logStream.readByte("EOR") != 'B') {
-                    LOG.warn("Last transaction was partial.");
-                    throw new EOFException();
-                }
-                if (hdr.getZxid() <= highestZxid && highestZxid != 0) {
-                    LOG.error(highestZxid + "(higestZxid) >= "
-                            + hdr.getZxid() + "(next log) for type "
-                            + hdr.getType());
-                } else {
-                    highestZxid = hdr.getZxid();
-                }
-                switch (hdr.getType()) {
-                case OpCode.createSession:
-                    sessionsWithTimeouts.put(hdr.getClientId(),
-                            ((CreateSessionTxn) txn).getTimeOut());
-                    ZooTrace.logTraceMessage(LOG,
-                                             ZooTrace.SESSION_TRACE_MASK,
-                            "playLog --- create session in log: 0x"
-                                    + Long.toHexString(hdr.getClientId())
-                                    + " with timeout: "
-                                    + ((CreateSessionTxn) txn).getTimeOut());
-                    // give dataTree a chance to sync its lastProcessedZxid
-                    dataTree.processTxn(hdr, txn);
-                    break;
-                case OpCode.closeSession:
-                    sessionsWithTimeouts.remove(hdr.getClientId());
-                    ZooTrace.logTraceMessage(LOG,
-                            ZooTrace.SESSION_TRACE_MASK,
-                            "playLog --- close session in log: 0x"
-                                    + Long.toHexString(hdr.getClientId()));
-                    dataTree.processTxn(hdr, txn);
-                    break;
-                default:
-                    dataTree.processTxn(hdr, txn);
-                }
-                Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(),
-                        null, null);
-                r.txn = txn;
-                r.hdr = hdr;
-                r.zxid = hdr.getZxid();
-                addCommittedProposal(r);
-            }
-        } catch (EOFException e) {
-            // expected in some cases - see comments in try block
-        }
-        return highestZxid;
+        takeSnapshot();
     }
 
     /**
      * maintains a list of last 500 or so committed requests. This is used for
      * fast follower synchronization.
      *
-     * @param request
-     *            committed request
+     * @param request committed request
      */
 
     public void addCommittedProposal(Request request) {
@@ -492,135 +236,9 @@
         }
     }
 
-    static public Record deserializeTxn(InputArchive ia, TxnHeader hdr)
-            throws IOException {
-        hdr.deserialize(ia, "hdr");
-        Record txn = null;
-        switch (hdr.getType()) {
-        case OpCode.createSession:
-            // This isn't really an error txn; it just has the same
-            // format. The error represents the timeout
-            txn = new CreateSessionTxn();
-            break;
-        case OpCode.closeSession:
-            return null;
-        case OpCode.create:
-            txn = new CreateTxn();
-            break;
-        case OpCode.delete:
-            txn = new DeleteTxn();
-            break;
-        case OpCode.setData:
-            txn = new SetDataTxn();
-            break;
-        case OpCode.setACL:
-            txn = new SetACLTxn();
-            break;
-        case OpCode.error:
-            txn = new ErrorTxn();
-            break;
-        }
-        if (txn != null) {
-            txn.deserialize(ia, "txn");
-        }
-        return txn;
-    }
-
-    public void truncateLog(long finalZxid) throws IOException {
-        long highestZxid = 0;
-        for (File f : dataDir.listFiles()) {
-            long zxid = isValidSnapshot(f);
-            if (zxid == -1) {
-                LOG.warn("Skipping " + f);
-                continue;
-            }
-            if (zxid > highestZxid) {
-                highestZxid = zxid;
-            }
-        }
-        File[] files = getLogFiles(dataLogDir.listFiles(), highestZxid);
-        boolean truncated = false;
-        for (File f : files) {
-            FileInputStream fin = new FileInputStream(f);
-            InputArchive ia = BinaryInputArchive.getArchive(fin);
-            FileChannel fchan = fin.getChannel();
-            try {
-                while (true) {
-                    byte[] bytes = ia.readBuffer("txtEntry");
-                    if (bytes.length == 0) {
-                        // Since we preallocate, we define EOF to be an
-                        // empty transaction
-                        throw new EOFException();
-                    }
-                    InputArchive iab = BinaryInputArchive
-                            .getArchive(new ByteArrayInputStream(bytes));
-                    TxnHeader hdr = new TxnHeader();
-                    deserializeTxn(iab, hdr);
-                    if (ia.readByte("EOF") != 'B') {
-                        LOG.warn("Last transaction was partial.");
-                        throw new EOFException();
-                    }
-                    if (hdr.getZxid() == finalZxid) {
-                        // this is where we need to truncate
-
-                        long pos = fchan.position();
-                        fin.close();
-                        FileOutputStream fout = new FileOutputStream(f);
-                        FileChannel fchanOut = fout.getChannel();
-                        fchanOut.truncate(pos);
-                        fchanOut.close();
-                        fout.close();
-                        truncated = true;
-                        break;
-                    }
-                }
-            } catch (EOFException eof) {
-                // expected in some cases - see comments in try block
-            } finally {
-                fchan.close();
-                fin.close();
-            }
-            if (truncated == true) {
-                break;
-            }
-        }
-        if (truncated == false) {
-            // not able to truncate the log
-            LOG.error("Not able to truncate the log zxid 0x"
-                    + Long.toHexString(finalZxid));
-            System.exit(13);
-        }
-
-    }
-
-    public void snapshot(BinaryOutputArchive oa) throws IOException,
-            InterruptedException {
-        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(
-                sessionsWithTimeouts);
-        oa.writeInt(sessSnap.size(), "count");
-        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
-            oa.writeLong(entry.getKey().longValue(), "id");
-            oa.writeInt(entry.getValue().intValue(), "timeout");
-        }
-        dataTree.serialize(oa, "tree");
-    }
-
-    public void snapshot() throws InterruptedException {
-        long lastZxid = dataTree.lastProcessedZxid;
-        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
-                "Snapshotting: zxid 0x" + Long.toHexString(lastZxid));
+    public void takeSnapshot(){
         try {
-            File f = new File(dataDir, "snapshot." + Long.toHexString(lastZxid));
-            OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(f));
-            try {
-                BinaryOutputArchive oa = BinaryOutputArchive.getArchive(sessOS);
-                snapshot(oa);
-                sessOS.flush();
-            } finally {
-                sessOS.close();
-            }
-            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
-                    "Snapshotting finished: zxid 0x" + Long.toHexString(lastZxid));
+            txnLogFactory.save(dataTree, sessionsWithTimeouts);
         } catch (IOException e) {
             LOG.error("Severe error, exiting",e);
             // This is a severe error that we cannot recover from,
@@ -629,6 +247,18 @@
         }
     }
 
+    public void serializeSnapshot(OutputArchive oa) throws IOException,
+            InterruptedException {
+        SerializeUtils.serializeSnapshot(dataTree, oa, sessionsWithTimeouts);
+    }
+
+    public void deserializeSnapshot(InputArchive ia) throws IOException {
+        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
+        dataTree = treeBuilder.build();
+
+        SerializeUtils.deserializeSnapshot(dataTree,ia,sessionsWithTimeouts);
+    }
+
     /**
      * This should be called from a synchronized block on this!
      */
@@ -644,10 +274,6 @@
         return System.currentTimeMillis();
     }
 
-    static String getLogName(long zxid) {
-        return "log." + Long.toHexString(zxid);
-    }
-
     public void closeSession(long sessionId) throws InterruptedException {
         ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                                  "ZooKeeperServer --- Session to be closed: 0x"
@@ -691,14 +317,6 @@
     }
 
     public void startup() throws IOException, InterruptedException {
-        if (dataDir == null || !dataDir.isDirectory()) {
-            throw new IOException("data directory does not exist: " + dataDir);
-        }
-        if (dataLogDir == null || !dataLogDir.isDirectory()) {
-            throw new IOException("data log directory does not exist: "
-                    + dataLogDir);
-        }
-
         if (dataTree == null) {
             loadData();
         }
@@ -916,14 +534,42 @@
         return serverCnxnFactory;
     }
 
+    /**
+     * return the last processed id from the
+     * datatree
+     */
     public long getLastProcessedZxid() {
         return dataTree.lastProcessedZxid;
     }
 
+    /**
+     * return the outstanding requests
+     * in the queue, which haven't been
+     * processed yet
+     */
     public long getOutstandingRequests() {
         return getInProcess();
     }
 
+    /**
+     * truncate the log to get in sync with others
+     * if in a quorum
+     * @param zxid the zxid that it needs to get in sync
+     * with others
+     * @throws IOException
+     */
+    public void truncateLog(long zxid) throws IOException {
+        this.txnLogFactory.truncateLog(zxid);
+    }
+    
+    /**
+     * the snapshot and logwriter for this instance
+     * @return the FileTxnSnapLog held by this server
+     */
+    public FileTxnSnapLog getLogWriter() {
+        return this.txnLogFactory;
+    }
+    
     public int getTickTime() {
         return tickTime;
     }
@@ -939,41 +585,7 @@
     public void setTreeBuilder(DataTreeBuilder treeBuilder) {
         this.treeBuilder = treeBuilder;
     }
-
-    /**
-     * Gets directory for storing the snapshot
-     */
-    public File getDataDir() {
-        return dataDir;
-    }
-
-    /**
-     * Sets directory for storing the snapshot
-     */
-    public void setDataDir(File dataDir) throws IOException {
-        this.dataDir = dataDir;
-        if (!dataDir.isDirectory()) {
-            throw new IOException("data directory does not exist");
-        }
-    }
-
-    /**
-     * Gets directoy for storing the log tnxns
-     */
-    public File getDataLogDir() {
-        return dataLogDir;
-    }
-
-    /**
-     * Sets directoy for storing the log tnxns
-     */
-    public void setDataLogDir(File dataLogDir) throws IOException {
-        this.dataLogDir = dataLogDir;
-        if (!dataLogDir.isDirectory()) {
-            throw new IOException("data log directory does not exist");
-        }
-    }
-
+    
     public int getClientPort() {
         return clientPort;
     }
@@ -981,4 +593,12 @@
     public void setClientPort(int clientPort) {
         this.clientPort = clientPort;
     }
+    
+    public void setTxnLogFactory(FileTxnSnapLog txnLog) {
+        this.txnLogFactory = txnLog;
+    }
+    
+    public FileTxnSnapLog getTxnLogFactory() {
+        return this.txnLogFactory;
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java Tue Sep 30 23:26:47 2008
@@ -22,12 +22,14 @@
 import java.io.IOException;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
 
 /**
  * This class starts and runs a standalone ZooKeeperServer.
  */
 public class ZooKeeperServerMain {
-    
+
     private static final Logger LOG = Logger.getLogger(ZooKeeperServerMain.class);
 
     /*
@@ -43,11 +45,15 @@
             }
 
             public ZooKeeperServer createServer() throws IOException {
+                // create a file logger url from the command line args
                 ZooKeeperServer zks = new ZooKeeperServer();
-                zks.setDataDir(new File(ServerConfig.getDataDir()));
-                zks.setDataLogDir(new File(ServerConfig.getDataLogDir()));
                 zks.setClientPort(ServerConfig.getClientPort());
-                return zks;
+
+               FileTxnSnapLog ftxn = new FileTxnSnapLog(new 
+                       File(ServerConfig.getDataLogDir()),
+                        new File(ServerConfig.getDataDir()));
+               zks.setTxnLogFactory(ftxn);
+               return zks;
             }
         });
     }

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java?rev=700690&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java Tue Sep 30 23:26:47 2008
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.util.SerializeUtils;
+
+/**
+ * This class implements the snapshot interface.
+ * It is responsible for storing, serializing
+ * and deserializing the right snapshot,
+ * and provides access to the snapshots.
+ */
+public class FileSnap implements SnapShot {
+    File snapDir;
+    private static final int VERSION=2;
+    private static final long dbId=-1;
+    public final static int MAGIC = ByteBuffer.wrap("AK47".getBytes()).getInt();
+    public FileSnap(File snapDir) {
+        this.snapDir = snapDir;
+    }
+    
+    /**
+     * deserialize a data tree from the most recent snapshot
+     * @return the zxid of the snapshot
+     */
+    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+            throws IOException {
+        File snap = findMostRecentSnapshot();
+        if (snap == null) {
+            return -1L;
+        }
+        InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
+        InputArchive ia=BinaryInputArchive.getArchive(snapIS);
+        deserialize(dt,sessions,ia);
+        snapIS.close();
+        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
+        return dt.lastProcessedZxid;
+    }
+    
+    /**
+     * deserialize the datatree from an inputarchive
+     * @param dt the datatree to be deserialized into
+     * @param sessions the sessions to be filled up
+     * @param ia the input archive to restore from
+     * @throws IOException
+     */
+    protected void deserialize(DataTree dt, Map<Long, Integer> sessions,
+            InputArchive ia) throws IOException {
+        FileHeader header = new FileHeader();
+        header.deserialize(ia, "fileheader");
+        SerializeUtils.deserializeSnapshot(dt,ia,sessions);
+    }
+    
+    /**
+     * find the most recent snapshot in the database.
+     * @return the file containing the most recent snapshot
+     */
+    public File findMostRecentSnapshot() throws IOException {
+        List<File> files = Util.sortDataDir(snapDir.listFiles(), "snapshot", false);
+        for (File f : files) {
+            if(Util.isValidSnapshot(f))
+                return f;
+        }
+        return null;
+    }
+
+    /**
+     * serialize the datatree and sessions
+     * @param dt the datatree to be serialized
+     * @param sessions the sessions to be serialized
+     * @param oa the output archive to serialize into
+     * @param header the header of this snapshot
+     * @throws IOException
+     */
+    protected void serialize(DataTree dt,Map<Long, Integer> sessions,
+            OutputArchive oa, FileHeader header) throws IOException {
+        // this is really a programmatic error and not something that can 
+        // happen at runtime
+        if(header==null)
+            throw new IllegalStateException(
+                    "Snapshot's not open for writing: uninitialized header");
+        header.serialize(oa, "fileheader");
+        SerializeUtils.serializeSnapshot(dt,oa,sessions);
+    }
+    
+    /**
+     * serialize the datatree and session into the file snapshot
+     * @param dt the datatree to be serialized
+     * @param sessions the sessions to be serialized
+     * @param snapShot the file to store snapshot into
+     */
+    public void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
+            throws IOException {
+        OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
+        OutputArchive oa = BinaryOutputArchive.getArchive(sessOS);
+        FileHeader header = new FileHeader(MAGIC, VERSION, dbId);
+        serialize(dt,sessions,oa, header);
+        sessOS.flush();
+        sessOS.close();
+    }
+   
+ }
\ No newline at end of file