You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2008/10/01 08:26:48 UTC
svn commit: r700690 [1/2] - in /hadoop/zookeeper/trunk: ./ src/ src/java/
src/java/jmx/org/apache/zookeeper/jmx/server/
src/java/jmx/org/apache/zookeeper/server/
src/java/jmx/org/apache/zookeeper/server/quorum/
src/java/main/org/apache/zookeeper/server...
Author: breed
Date: Tue Sep 30 23:26:47 2008
New Revision: 700690
URL: http://svn.apache.org/viewvc?rev=700690&view=rev
Log:
ZOOKEEPER-38. headers (version+) in log/snap files
Added:
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/SnapShot.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/Util.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/DataTreeUnitTest.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/OldChangeLog
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/ZooKeeperServerTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
hadoop/zookeeper/trunk/src/zookeeper.jute
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Sep 30 23:26:47 2008
@@ -96,3 +96,6 @@
ZOOKEEPER-117. threading issues in Leader election (Flavio Junqueira and Patrick
Hunt via breed)
+
+ ZOOKEEPER-38. headers (version+) in log/snap files (Andrew Kornev and Mahadev
+ Konar via breed)
Modified: hadoop/zookeeper/trunk/src/java/OldChangeLog
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/OldChangeLog?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/OldChangeLog (original)
+++ hadoop/zookeeper/trunk/src/java/OldChangeLog Tue Sep 30 23:26:47 2008
@@ -1,82 +0,0 @@
-Release 2.2.0
-2008-05-08 Andrew Kornev <ak...@users.sourceforge.net>
-
- * phunt: [1956480] Renamed ZooLog to ZooTrace. Major cleanup of tracing.
-
- * fpj: [1958274] got rid of unused vars in leader election
-
- * fpj: [1958361] Patch to fix NPE upon access to watcher
-
- * tedunning: [1951806] Added a sample startup script.
-
- * akornev: [1956499] Added the "dist" target to the ant buildfile.
-
- * breed: [1947090] Fixed improper timeout tracking at clients.
-
- * phunt: [1953737] Millisecond timing in the trace file.
-
- * phunt: [1949253] Move to log4j for logging.
-
- * phunt: [1942451] build optimization: uptodate check on jute
-
- * phunt: [1943392] Test environment changes: unit/func/perf/coverage test
-
- * mahadevkonar: [1934859] Performance enhancement for serialization of records.
-
- * phunt: [1931630] Fixed ZooKeeperServer loadData() method to optimally
- scan for the most recent valid snapshot.
-
- * akornev: [1917295] Root node watch not triggered
-
- * breed: [1912209] Session End Game handling
-
- * akornev: [1913967] code refactoring for JMX enablement. Added ServerStats and
- QuorumStats classes. Bug fixes: OutOfMemory under heavy load, disk I/O now
- uses buffered streams, NIOServerCnxn.Factory shuffles the selector keys
- to avoid starvation. Lots of formatting: replaced tabs with whitespaces,
- DOS eol style converted to UNIX.
-
- * breed: [1882928] Log the uncaught exceptions from the SendThread and EventThread
-
- * fpj: [1881204] New leader election algorithm over TCP.
-
- * akornev: [1898314] Added support for server version info at runtime; added
- the "release" target to ant build file
-
- * breed: [1892108] Configurable packet sanity check
-
- * akornev: [1889354] JAR manifest file now includes additional metadata: Built-By,
- Built-At, Built-On, Implementation-Title, Implementation-Version and
- Implementation-Vendor. Use SvnAnt ant task to extract SVN version number.
-
- * mahadevkonar: [1881545] fixed logging to output session id in hex
-
-Release 1.1.0
-2008-01-28 Andrew Kornev <ak...@users.sourceforge.net>
-
- * breed: [1875540] Make sure java client aborts the outgoing packets when
- a connection closes.
-
- * Jute compiler: emit the #ifdef extern "C" guards in the generated .jute.h
-
- * mahadevkonar: [1844561] fast sync between the leader and the follower.
-
- * mahadevkonar: [1849444] fixed session id generation routine to generate
- unique session ids.
-
- * breed: [1845696] fixed a race condition in the quorum server where it
- is possible that a create session request can be committed and applied
- at a follower before it is applied at the leader.
-
- * breed: [1841938] implemented the sync operation to flush updates pedning
- on the Leader.
-
- * vlarsen: [1835834] fixed a few compiler warnings, removed some @Override
- annotations used with interfaces.
-
-Release 1.0.0
-2007-11-27 Andrew Kornev <ak...@users.sourceforge.net>
-
- * Updated the jute compiler to emit int32_t vs int in the generated C code
-
- * Changed release numbering scheme to match that of the C client
\ No newline at end of file
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/jmx/server/DataTreeBean.java Tue Sep 30 23:26:47 2008
@@ -55,6 +55,7 @@
}
return stream.size();
*/
+ LOG.warn("Not Implemented");
return -1;
}
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,8 +18,11 @@
package org.apache.zookeeper.server;
+import static org.apache.zookeeper.server.ServerConfig.getClientPort;
+
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
@@ -30,6 +33,9 @@
import org.apache.zookeeper.jmx.server.DataTreeMXBean;
import org.apache.zookeeper.jmx.server.ZooKeeperServerBean;
import org.apache.zookeeper.jmx.server.ZooKeeperServerMXBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+
import org.apache.zookeeper.server.util.ConnectionObserver;
import org.apache.zookeeper.server.util.ObserverManager;
import org.apache.zookeeper.server.util.ServerObserver;
@@ -103,22 +109,17 @@
}
}
- public ManagedZooKeeperServer() {
- super();
+ public ManagedZooKeeperServer(FileTxnSnapLog logFactory,
+ int tickTime,DataTreeBuilder treeBuilder) throws IOException {
+ super(logFactory, tickTime,treeBuilder);
ObserverManager.getInstance().add(new ManagedServerObserver());
ObserverManager.getInstance().add(new ManagedConnectionObserver());
}
- public ManagedZooKeeperServer(File dataDir, File dataLogDir, int tickTime, DataTreeBuilder treeBuilder) throws IOException {
- super(dataDir, dataLogDir, tickTime, treeBuilder);
+ public ManagedZooKeeperServer(FileTxnSnapLog logFactory,
+ DataTreeBuilder treeBuilder) throws IOException {
+ super(logFactory,treeBuilder);
ObserverManager.getInstance().add(new ManagedServerObserver());
ObserverManager.getInstance().add(new ManagedConnectionObserver());
}
-
- public ManagedZooKeeperServer(File dataDir, File dataLogDir, int tickTime) throws IOException {
- super(dataDir, dataLogDir, tickTime);
- ObserverManager.getInstance().add(new ManagedServerObserver());
- ObserverManager.getInstance().add(new ManagedConnectionObserver());
- }
-
}
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ManagedZooKeeperServerMain.java Tue Sep 30 23:26:47 2008
@@ -26,6 +26,8 @@
import org.apache.zookeeper.jmx.server.ConnectionMXBean;
import org.apache.zookeeper.jmx.server.DataTreeMXBean;
import org.apache.zookeeper.jmx.server.ZooKeeperServerMXBean;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.util.ZooKeeperObserverManager;
/**
@@ -65,12 +67,10 @@
return new ObservableNIOServerCnxn.Factory(getClientPort());
}
public ZooKeeperServer createServer() throws IOException {
- ManagedZooKeeperServer zks = new ManagedZooKeeperServer();
- zks.setDataDir(new File(ServerConfig.getDataDir()));
- zks.setDataLogDir(new File(ServerConfig.getDataLogDir()));
- zks.setClientPort(ServerConfig.getClientPort());
- // TODO: we may want to build an observable/managed data tree here instead
- zks.setTreeBuilder(new ZooKeeperServer.BasicDataTreeBuilder());
+ ManagedZooKeeperServer zks = new ManagedZooKeeperServer(
+ new FileTxnSnapLog(new File(ServerConfig.getDataDir()),
+ new File(ServerConfig.getDataLogDir())),
+ new ZooKeeperServer.BasicDataTreeBuilder());
return zks;
}
});
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,11 +18,13 @@
package org.apache.zookeeper.server;
-import java.io.File;
import java.io.IOException;
+
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.util.ObserverManager;
import org.apache.zookeeper.server.util.ServerObserver;
+
/**
* The observable server broadcast notifications when its state changes.
*
@@ -33,17 +35,13 @@
public class ObservableZooKeeperServer extends ZooKeeperServer{
private ZooKeeperObserverNotifier notifier=new ZooKeeperObserverNotifier(this);
-
- public ObservableZooKeeperServer() {
- super();
+ public ObservableZooKeeperServer(FileTxnSnapLog logFactory,
+ int tickTime,DataTreeBuilder treeBuilder) throws IOException {
+ super(logFactory, tickTime,treeBuilder);
}
-
- public ObservableZooKeeperServer(File dataDir, File dataLogDir, int tickTime, DataTreeBuilder treeBuilder) throws IOException {
- super(dataDir, dataLogDir, tickTime, treeBuilder);
- }
-
- public ObservableZooKeeperServer(File dataDir, File dataLogDir, int tickTime) throws IOException {
- super(dataDir, dataLogDir, tickTime);
+ public ObservableZooKeeperServer(FileTxnSnapLog logFactory,
+ DataTreeBuilder treeBuilder) throws IOException {
+ super(logFactory,treeBuilder);
}
public void shutdown() {
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java Tue Sep 30 23:26:47 2008
@@ -46,6 +46,8 @@
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
import org.apache.zookeeper.server.util.ConnectionObserver;
import org.apache.zookeeper.server.util.ObserverManager;
import org.apache.zookeeper.server.util.QuorumPeerObserver;
@@ -207,15 +209,21 @@
setupObservers();
}
- public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
- int syncLimit) throws IOException {
- super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+ public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers,
+ File dataDir, File dataLogDir, int clientPort,
+ int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
+ int syncLimit) throws IOException {
+ super(quorumPeers, dataDir, dataLogDir, clientPort,
+ electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
setupObservers();
}
- public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, int electionPort, long myid, int tickTime, int initLimit, int syncLimit,
- NIOServerCnxn.Factory cnxnFactory) throws IOException {
- super(quorumPeers, dataDir, dataLogDir, electionType, electionPort, myid, tickTime, initLimit, syncLimit, cnxnFactory);
+ public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers,
+ File dataDir, File dataLogDir, int electionType, int electionPort,
+ long myid, int tickTime, int initLimit, int syncLimit,
+ NIOServerCnxn.Factory cnxnFactory) throws IOException {
+ super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
+ myid, tickTime, initLimit, syncLimit, cnxnFactory);
setupObservers();
}
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java Tue Sep 30 23:26:47 2008
@@ -35,6 +35,14 @@
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ObservableNIOServerCnxn;
import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.util.ConnectionObserver;
+import org.apache.zookeeper.server.util.ObserverManager;
+import org.apache.zookeeper.server.util.QuorumPeerObserver;
+import org.apache.zookeeper.server.util.ServerObserver;
import org.apache.zookeeper.server.util.ZooKeeperObserverManager;
/**
@@ -82,12 +90,13 @@
ZooKeeperObserverManager.setAsConcrete();
runPeer(new QuorumPeer.Factory() {
public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory)
- throws IOException {
-
+ throws IOException {
ManagedQuorumPeer peer = new ManagedQuorumPeer();
peer.setClientPort(ServerConfig.getClientPort());
- peer.setDataDir(new File(ServerConfig.getDataDir()));
- peer.setDataLogDir(new File(ServerConfig.getDataLogDir()));
+ FileTxnSnapLog factory = new FileTxnSnapLog(new
+ File(ServerConfig.getDataLogDir()), new
+ File(ServerConfig.getDataDir()));
+ peer.setTxnFactory(factory);
peer.setQuorumPeers(QuorumPeerConfig.getServers());
peer.setElectionPort(QuorumPeerConfig.getElectionPort());
peer.setElectionType(QuorumPeerConfig.getElectionAlg());
@@ -97,7 +106,7 @@
peer.setSyncLimit(QuorumPeerConfig.getSyncLimit());
peer.setCnxnFactory(cnxnFactory);
return peer;
-
+
}
public NIOServerCnxn.Factory createConnectionFactory() throws IOException {
return new ObservableNIOServerCnxn.Factory(getClientPort());
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableFollowerZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,10 +18,10 @@
package org.apache.zookeeper.server.quorum;
-import java.io.File;
import java.io.IOException;
import org.apache.zookeeper.server.ZooKeeperObserverNotifier;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.util.ObserverManager;
import org.apache.zookeeper.server.util.ServerObserver;
@@ -37,9 +37,9 @@
private ZooKeeperObserverNotifier notifier;
- public ObservableFollowerZooKeeperServer(File dataDir, File dataLogDir,
+ public ObservableFollowerZooKeeperServer(FileTxnSnapLog logFactory,
QuorumPeer self, DataTreeBuilder treeBuilder) throws IOException {
- super(dataDir, dataLogDir, self, treeBuilder);
+ super(logFactory, self, treeBuilder);
notifier=new ZooKeeperObserverNotifier(this);
}
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableLeaderZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,10 +18,10 @@
package org.apache.zookeeper.server.quorum;
-import java.io.File;
import java.io.IOException;
import org.apache.zookeeper.server.ZooKeeperObserverNotifier;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.util.ObserverManager;
import org.apache.zookeeper.server.util.ServerObserver;
@@ -37,9 +37,9 @@
private ZooKeeperObserverNotifier notifier;
- public ObservableLeaderZooKeeperServer(File dataDir, File dataLogDir,
+ public ObservableLeaderZooKeeperServer(FileTxnSnapLog logFactory,
QuorumPeer self, DataTreeBuilder treeBuilder) throws IOException {
- super(dataDir, dataLogDir, self, treeBuilder);
+ super(logFactory, self, treeBuilder);
notifier=new ZooKeeperObserverNotifier(this);
}
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java Tue Sep 30 23:26:47 2008
@@ -24,6 +24,7 @@
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.util.EventInfo;
import org.apache.zookeeper.server.util.ObservableComponent;
import org.apache.zookeeper.server.util.ObserverManager;
@@ -58,32 +59,40 @@
};
public abstract void dispatch(ObservableQuorumPeer peer,QuorumPeerObserver ob);
}
-
-
public ObservableQuorumPeer() {
super();
}
- public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
- int syncLimit) throws IOException {
- super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+ public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
+ File dataLogDir, int clientPort, int electionAlg,
+ int electionPort, long myid, int tickTime, int initLimit,
+ int syncLimit) throws IOException {
+ super(quorumPeers, dataDir, dataLogDir, clientPort,
+ electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
}
- public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, int electionPort, long myid, int tickTime, int initLimit, int syncLimit,
- NIOServerCnxn.Factory cnxnFactory) throws IOException {
- super(quorumPeers, dataDir, dataLogDir, electionType, electionPort, myid, tickTime, initLimit, syncLimit, cnxnFactory);
+ public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers,
+ File dataDir, File dataLogDir, int electionType,
+ int electionPort, long myid, int tickTime,
+ int initLimit, int syncLimit,
+ NIOServerCnxn.Factory cnxnFactory) throws IOException {
+ super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
+ myid, tickTime, initLimit, syncLimit, cnxnFactory);
}
+
// instantiate an observable follower
- protected Follower makeFollower(File dataDir,File dataLogDir) throws IOException {
- return new ObservableFollower(this, new ObservableFollowerZooKeeperServer(dataDir,
- dataLogDir, this,new ZooKeeperServer.BasicDataTreeBuilder()));
+ protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+ return new ObservableFollower(this,
+ new ObservableFollowerZooKeeperServer(logFactory, this,
+ new ZooKeeperServer.BasicDataTreeBuilder()));
}
// instantiate an observable leader
- protected Leader makeLeader(File dataDir,File dataLogDir) throws IOException {
- return new ObservableLeader(this, new ObservableLeaderZooKeeperServer(dataDir,
- dataLogDir,this,new ZooKeeperServer.BasicDataTreeBuilder()));
+ protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
+ return new ObservableLeader(this,
+ new ObservableLeaderZooKeeperServer(logFactory,
+ this,new ZooKeeperServer.BasicDataTreeBuilder()));
}
public void run() {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Tue Sep 30 23:26:47 2008
@@ -285,7 +285,8 @@
}
}
- public ArrayList<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
+ public List<String> getChildren(String path, Stat stat, Watcher watcher)
+ throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
@@ -462,8 +463,7 @@
* @throws IOException
* @throws InterruptedException
*/
- void serializeNode(OutputArchive oa, StringBuilder path)
- throws IOException, InterruptedException {
+ void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
String pathString = path.toString();
DataNode node = getNode(pathString);
if (node == null) {
@@ -494,8 +494,7 @@
public boolean initialized = false;
- public void serialize(OutputArchive oa, String tag) throws IOException,
- InterruptedException {
+ public void serialize(OutputArchive oa, String tag) throws IOException {
scount = 0;
serializeNode(oa, new StringBuilder(""));
// / marks end of stream
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Tue Sep 30 23:26:47 2008
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
@@ -212,7 +211,7 @@
}
PrepRequestProcessor.checkACL(zks, n.acl, ZooDefs.Perms.READ,
request.authInfo);
- ArrayList<String> children = zks.dataTree.getChildren(
+ List<String> children = zks.dataTree.getChildren(
getChildrenRequest.getPath(), stat, getChildrenRequest
.getWatch() ? request.cnxn : null);
rsp = new GetChildrenResponse(children);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PurgeTxnLog.java Tue Sep 30 23:26:47 2008
@@ -22,16 +22,21 @@
import java.io.FileFilter;
import java.io.IOException;
import java.text.DateFormat;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+
public class PurgeTxnLog {
static void printUsage(){
- System.out.println("PurgeTxnLog dataLogDir ");
+ System.out.println("PurgeTxnLog dataLogDir [snapDir]");
System.out.println("\tdataLogDir -- path to the txn log directory");
+ System.out.println("\tsnapDir -- path to the snapshot directory");
System.exit(1);
}
/**
@@ -39,38 +44,42 @@
* dataLogDir -- txn log directory
*/
public static void main(String[] args) throws IOException {
- if(args.length!=1)
+ if(args.length<1 || args.length>2)
printUsage();
File dataDir=new File(args[0]);
-
- // find the most recent valid snapshot
- long highestZxid = -1;
- for (File f : dataDir.listFiles()) {
- long zxid = ZooKeeperServer.isValidSnapshot(f);
- if (zxid > highestZxid) {
- highestZxid = zxid;
+ File snapDir=dataDir;
+ if(args.length==2){
+ snapDir=new File(args[1]);
}
- }
- // found any valid snapshots?
- if(highestZxid==-1)
- return; // no snapshots
-
+ FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
+
+ // found any valid recent snapshots?
+
// files to exclude from deletion
Set<File> exc=new HashSet<File>();
- exc.add(new File(dataDir, "snapshot."+Long.toHexString(highestZxid)));
- exc.addAll(Arrays.asList(ZooKeeperServer.getLogFiles(dataDir.listFiles(),highestZxid)));
+ File snapShot = txnLog.findMostRecentSnapshot();
+ exc.add(txnLog.findMostRecentSnapshot());
+ long zxid = Util.getZxidFromName(snapShot.getName(),"snapshot");
+ exc.addAll(Arrays.asList(txnLog.getSnapshotLogs(zxid)));
final Set<File> exclude=exc;
- List<File> files=Arrays.asList(dataDir.listFiles(new FileFilter(){
+ class MyFileFilter implements FileFilter{
+ private final String prefix;
+ MyFileFilter(String prefix){
+ this.prefix=prefix;
+ }
public boolean accept(File f){
- if(!f.getName().startsWith("log.") &&
- !f.getName().startsWith("snapshot."))
- return false;
- if(exclude.contains(f))
+ if(!f.getName().startsWith(prefix) || exclude.contains(f))
return false;
return true;
- }}));
+ }
+ }
+ // add all non-excluded log files
+ List<File> files=new ArrayList<File>(
+ Arrays.asList(dataDir.listFiles(new MyFileFilter("log."))));
+ // add all non-excluded snapshot files to the deletion list
+ files.addAll(Arrays.asList(snapDir.listFiles(new MyFileFilter("snapshot."))));
// remove the old files
for(File f: files)
{
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Tue Sep 30 23:26:47 2008
@@ -18,22 +18,13 @@
package org.apache.zookeeper.server;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.server.util.Profiler;
-import org.apache.zookeeper.txn.TxnHeader;
/**
* This RequestProcessor logs requests to disk. It batches the requests to do
@@ -42,57 +33,24 @@
*/
public class SyncRequestProcessor extends Thread implements RequestProcessor {
private static final Logger LOG = Logger.getLogger(SyncRequestProcessor.class);
-
- static final int PADDING_TIMEOUT=1000;
- ZooKeeperServer zks;
-
- LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
-
- static boolean forceSync;
- static {
- forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals(
- "no");
- }
-
- private static long preAllocSize = 65536 * 1024;
- static {
- String size = System.getProperty("zookeeper.preAllocSize");
- if (size != null) {
- try {
- preAllocSize = Long.parseLong(size) * 1024;
- } catch (NumberFormatException e) {
- LOG.warn(size
- + " is not a valid value for zookeeper.preAllocSize");
- }
- }
- }
-
+ private ZooKeeperServer zks;
+ private LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
+ private RequestProcessor nextProcessor;
+ boolean timeToDie = false;
/**
- * Change the data log pre-allocation size on the fly.
- *
- * You might want to do this on systems (Windows esp) where preallocation
- * is slow, WARN messages are output the log if preAllocation is taking
- * too long -- will stall the request pipeline.
- *
- * This value can also be set through the "zookeeper.preAllocSize" (also
- * in K bytes) environment variable when starting the jvm.
- *
- * @param size size in K bytes to change the log prealloc to
+ * Transactions that have been written and are waiting to be flushed to
+ * disk. Basically this is the list of SyncItems whose callbacks will be
+ * invoked after flush returns successfully.
*/
- public static void setPreAllocSize(long size) {
- preAllocSize = size * 1024;
- }
-
+ private LinkedList<Request> toFlush = new LinkedList<Request>();
+ private Random r = new Random(System.nanoTime());
+ private int logCount = 0;
/**
* The number of log entries to log before starting a snapshot
*/
- static public int snapCount = ZooKeeperServer.getSnapCount();
-
- Thread snapInProcess;
-
- RequestProcessor nextProcessor;
+ public static int snapCount = ZooKeeperServer.getSnapCount();
- boolean timeToDie = false;
+ private Request requestOfDeath = Request.requestOfDeath;
public SyncRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
@@ -102,45 +60,13 @@
start();
}
- /**
- * Transactions that have been written and are waiting to be flushed to
- * disk. Basically this is the list of SyncItems whose callbacks will be
- * invoked after flush returns successfully.
- */
- LinkedList<Request> toFlush = new LinkedList<Request>();
-
- FileOutputStream logStream;
-
- BinaryOutputArchive logArchive;
-
- Random r = new Random(System.nanoTime());
-
- int logCount = 0;
-
- Request requestOfDeath = Request.requestOfDeath;
-
- private static ByteBuffer fill = ByteBuffer.allocateDirect(1024);
-
- LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();
-
- private long padLogFile(FileChannel fc,long fileSize) throws IOException{
- long position = fc.position();
- // We pad the file in 1M chunks to avoid syncing to
- // write the new filesize.
- if (position + 4096 >= fileSize) {
- fileSize = fileSize + preAllocSize;
- fill.position(0);
- fc.write(fill, fileSize);
- }
- return fileSize;
+ private void startSnapshot() throws IOException {
+ zks.takeSnapshot();
}
@Override
public void run() {
try {
- long fileSize = 0;
- long lastZxidSeen = -1;
- FileChannel fc = null;
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
@@ -156,75 +82,16 @@
break;
}
if (si != null) {
- // LOG.warn("Sync>>> cxid = " + si.cxid + " type = " +
- // si.type + " id = " + si.sessionId + " zxid = " +
- // Long.toHexString(si.zxid));
- ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
- 'S', si, "");
- TxnHeader hdr = si.hdr;
- if (hdr != null) {
- if (hdr.getZxid() <= lastZxidSeen) {
- LOG.warn("Current zxid " + hdr.getZxid()
- + " is <= " + lastZxidSeen + " for "
- + hdr.getType());
- }
- Record txn = si.txn;
- if (logStream == null) {
- fileSize = 0;
- logStream = new FileOutputStream(new File(
- zks.dataLogDir, ZooKeeperServer
- .getLogName(hdr.getZxid())));
- synchronized (streamsToFlush) {
- streamsToFlush.add(logStream);
- }
- fc = logStream.getChannel();
- logArchive = BinaryOutputArchive
- .getArchive(logStream);
- }
- final long fsize=fileSize;
- final FileChannel ffc=fc;
- fileSize = Profiler.profile(
- new Profiler.Operation<Long>() {
- public Long execute() throws Exception {
- return SyncRequestProcessor.this
- .padLogFile(ffc, fsize);
- }
- }, PADDING_TIMEOUT,
- "Logfile padding exceeded time threshold"
- );
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive
- .getArchive(baos);
- hdr.serialize(boa, "hdr");
- if (txn != null) {
- txn.serialize(boa, "txn");
- }
- logArchive.writeBuffer(baos.toByteArray(), "txnEntry");
- logArchive.writeByte((byte) 0x42, "EOR");
+ zks.getLogWriter().append(si);
logCount++;
if (logCount > snapCount / 2
&& r.nextInt(snapCount / 2) == 0) {
- // We just want one snapshot going at a time
- if (snapInProcess != null
- && snapInProcess.isAlive()) {
- LOG.warn("Too busy to snap, skipping");
- } else {
- logStream = null;
- logArchive = null;
- snapInProcess = new Thread() {
- public void run() {
- try {
- zks.snapshot();
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e);
- }
- }
- };
- snapInProcess.start();
- }
+ // roll the log
+ zks.getLogWriter().rollLog();
+ // take a snapshot
+ startSnapshot();
logCount = 0;
}
- }
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
@@ -236,33 +103,15 @@
System.exit(11);
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
- "SyncRequestProcessor exiyed!");
+ "SyncRequestProcessor exited!");
}
@SuppressWarnings("unchecked")
private void flush(LinkedList<Request> toFlush) throws IOException {
- if (toFlush.size() == 0) {
+ if (toFlush.size() == 0)
return;
- }
- LinkedList<FileOutputStream> streamsToFlushNow;
- synchronized (streamsToFlush) {
- streamsToFlushNow = (LinkedList<FileOutputStream>) streamsToFlush
- .clone();
- }
- for (FileOutputStream fos : streamsToFlushNow) {
- fos.flush();
- if (forceSync) {
- fos.getChannel().force(false);
- }
- }
- while (streamsToFlushNow.size() > 1) {
- FileOutputStream fos = streamsToFlushNow.removeFirst();
- fos.close();
- synchronized (streamsToFlush) {
- streamsToFlush.remove(fos);
- }
- }
+ zks.getLogWriter().commit();
while (toFlush.size() > 0) {
Request i = toFlush.remove();
nextProcessor.processRequest(i);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Tue Sep 30 23:26:47 2008
@@ -18,53 +18,36 @@
package org.apache.zookeeper.server;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.log4j.Logger;
-
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
+import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.txn.CreateSessionTxn;
-import org.apache.zookeeper.txn.CreateTxn;
-import org.apache.zookeeper.txn.DeleteTxn;
-import org.apache.zookeeper.txn.ErrorTxn;
-import org.apache.zookeeper.txn.SetACLTxn;
-import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnHeader;
/**
@@ -108,14 +91,7 @@
private DataTreeBuilder treeBuilder;
public DataTree dataTree;
protected SessionTracker sessionTracker;
- /**
- * directory for storing the snapshot
- */
- File dataDir;
- /**
- * directoy for storing the log tnxns
- */
- File dataLogDir;
+ private FileTxnSnapLog txnLogFactory = null;
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
protected long hzxid = 0;
final public static Exception ok = new Exception("No prob");
@@ -131,12 +107,12 @@
List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
private NIOServerCnxn.Factory serverCnxnFactory;
private int clientPort;
-
-
+
void removeCnxn(ServerCnxn cnxn) {
dataTree.removeCnxn(cnxn);
}
+
/**
* Creates a ZooKeeperServer instance. Nothing is setup, use the setX
* methods to prepare the instance (eg datadir, datalogdir, ticktime,
@@ -148,6 +124,8 @@
ServerStats.getInstance().setStatsProvider(this);
treeBuilder = new BasicDataTreeBuilder();
}
+
+
/**
* Creates a ZooKeeperServer instance. It sets everything up, but doesn't
* actually start listening for clients until run() is invoked.
@@ -155,27 +133,25 @@
* @param dataDir the directory to put the data
* @throws IOException
*/
- public ZooKeeperServer(File dataDir, File dataLogDir, int tickTime,
+ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
DataTreeBuilder treeBuilder) throws IOException {
- this.dataDir = dataDir;
- this.dataLogDir = dataLogDir;
+ this.txnLogFactory=txnLogFactory;
this.tickTime = tickTime;
this.treeBuilder = treeBuilder;
ServerStats.getInstance().setStatsProvider(this);
- LOG.info("Created server with dataDir:" + dataDir
- + " dataLogDir:" + dataLogDir
- + " tickTime:" + tickTime);
+ LOG.info("Created server");
}
/**
* This constructor is for backward compatibility with the existing unit
* test code.
+ * It defaults to FileLogProvider persistence provider.
*/
- public ZooKeeperServer(File dataDir, File dataLogDir, int tickTime)
- throws IOException
- {
- this(dataDir, dataLogDir, tickTime, new BasicDataTreeBuilder());
+ public ZooKeeperServer(File snapDir, File logDir, int tickTime)
+ throws IOException {
+ this(new FileTxnSnapLog(snapDir,logDir),
+ tickTime,new BasicDataTreeBuilder());
}
/**
@@ -183,180 +159,28 @@
*
* @throws IOException
*/
- public ZooKeeperServer(DataTreeBuilder treeBuilder) throws IOException {
- this(new File(ServerConfig.getDataDir()), new File(ServerConfig
- .getDataLogDir()), DEFAULT_TICK_TIME, treeBuilder);
- }
-
- public static long getZxidFromName(String name, String prefix) {
- long zxid = -1;
- String nameParts[] = name.split("\\.");
- if (nameParts.length == 2 && nameParts[0].equals(prefix)) {
- try {
- zxid = Long.parseLong(nameParts[1], 16);
- } catch (NumberFormatException e) {
- LOG.warn("unable to parse zxid string into long: "
- + nameParts[1]);
- }
- }
- return zxid;
- }
-
- static public long isValidSnapshot(File f) throws IOException {
- long zxid = getZxidFromName(f.getName(), "snapshot");
- if (zxid == -1)
- return -1;
-
- // Check for a valid snapshot
- RandomAccessFile raf = new RandomAccessFile(f, "r");
- try {
- raf.seek(raf.length() - 5);
- byte bytes[] = new byte[5];
- raf.read(bytes);
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- int len = bb.getInt();
- byte b = bb.get();
- if (len != 1 || b != '/') {
- LOG.warn("Invalid snapshot " + f + " len = " + len
- + " byte = " + (b & 0xff));
- return -1;
- }
- } finally {
- raf.close();
- }
-
- return zxid;
- }
-
- /**
- * Compare file file names of form "prefix.version". Sort order result
- * returned in order of version.
- */
- private static class DataDirFileComparator implements Comparator<File> {
- private String prefix;
- private boolean ascending;
- public DataDirFileComparator(String prefix, boolean ascending) {
- this.prefix = prefix;
- this.ascending = ascending;
- }
-
- public int compare(File o1, File o2) {
- long z1 = getZxidFromName(o1.getName(), prefix);
- long z2 = getZxidFromName(o2.getName(), prefix);
- int result = z1 < z2 ? -1 : (z1 > z2 ? 1 : 0);
- return ascending ? result : -result;
- }
- }
-
- /**
- * Sort the list of files. Recency as determined by the version component
- * of the file name.
- *
- * @param files array of files
- * @param prefix files not matching this prefix are assumed to have a
- * version = -1)
- * @param ascending true sorted in ascending order, false results in
- * descending order
- * @return sorted input files
- */
- static List<File>
- sortDataDir(File[] files, String prefix, boolean ascending)
- {
- List<File> filelist = Arrays.asList(files);
- Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
- return filelist;
- }
-
- /**
- * Find the log file that starts at, or just before, the snapshot. Return
- * this and all subsequent logs. Results are ordered by zxid of file,
- * ascending order.
- *
- * @param logDirList array of files
- * @param snapshotZxid return files at, or before this zxid
- * @return
- */
- static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
- List<File> files = sortDataDir(logDirList, "log", true);
- long logZxid = 0;
- // Find the log file that starts before or at the same time as the
- // zxid of the snapshot
- for (File f : files) {
- long fzxid = getZxidFromName(f.getName(), "log");
- if (fzxid > snapshotZxid) {
- continue;
- }
- if (fzxid > logZxid) {
- logZxid = fzxid;
- }
- }
- List<File> v=new ArrayList<File>(5);
- // Apply the logs
- for (File f : files) {
- long fzxid = getZxidFromName(f.getName(), "log");
- if (fzxid < logZxid) {
- continue;
- }
- v.add(f);
- }
- return v.toArray(new File[0]);
+ public ZooKeeperServer(FileTxnSnapLog txnLogFactory,DataTreeBuilder treeBuilder) throws IOException {
+ this(txnLogFactory, DEFAULT_TICK_TIME, treeBuilder);
}
/**
* Restore sessions and data
*/
- private void loadSnapshotAndLogs() throws IOException {
- long zxid = -1;
-
- // Find the most recent snapshot
- List<File> files = sortDataDir(dataDir.listFiles(), "snapshot", false);
- for (File f : files) {
- zxid = isValidSnapshot(f);
- if (zxid == -1) {
- LOG.warn("Skipping " + f);
- continue;
- }
-
- LOG.warn("Processing snapshot: " + f);
-
- FileInputStream snapFIS = new FileInputStream(f);
- try {
- InputStream snapIS = new BufferedInputStream(snapFIS);
- try {
- loadData(BinaryInputArchive.getArchive(snapIS));
- } finally {
- snapIS.close();
- }
- } finally {
- snapFIS.close();
- }
-
- dataTree.lastProcessedZxid = zxid;
-
- // Apply the logs on/after the selected snapshot
- File[] logfiles = getLogFiles(dataLogDir.listFiles(), zxid);
- for (File logfile : logfiles) {
- LOG.warn("Processing log file: " + logfile);
-
- InputStream logIS =
- new BufferedInputStream(new FileInputStream(logfile));
- zxid = playLog(BinaryInputArchive.getArchive(logIS));
- logIS.close();
- }
- hzxid = zxid;
-
- break;
- }
-
- if (zxid == -1) {
- sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
- dataTree = treeBuilder.build();
- }
- }
-
public void loadData() throws IOException, InterruptedException {
- loadSnapshotAndLogs();
-
+ PlayBackListener listener=new PlayBackListener(){
+ public void onTxnLoaded(TxnHeader hdr,Record txn){
+ Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
+ null, null);
+ r.txn = txn;
+ r.hdr = hdr;
+ r.zxid = hdr.getZxid();
+ addCommittedProposal(r);
+ }
+ };
+ sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
+ dataTree = treeBuilder.build();
+ long zxid = txnLogFactory.restore(dataTree,sessionsWithTimeouts,listener);
+ this.hzxid = zxid;
// Clean up dead sessions
LinkedList<Long> deadSessions = new LinkedList<Long>();
for (long session : dataTree.getSessions()) {
@@ -369,94 +193,14 @@
killSession(session);
}
// Make a clean snapshot
- snapshot();
- }
-
- public void loadData(InputArchive ia) throws IOException {
- sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
- dataTree = treeBuilder.build();
-
- int count = ia.readInt("count");
- while (count > 0) {
- long id = ia.readLong("id");
- int to = ia.readInt("timeout");
- sessionsWithTimeouts.put(id, to);
- ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
- "loadData --- session in archive: " + id
- + " with timeout: " + to);
- count--;
- }
- dataTree.deserialize(ia, "tree");
- }
-
- public long playLog(InputArchive logStream) throws IOException {
- long highestZxid = 0;
- try {
- while (true) {
- byte[] bytes = logStream.readBuffer("txnEntry");
- if (bytes.length == 0) {
- // Since we preallocate, we define EOF to be an
- // empty transaction
- throw new EOFException();
- }
- InputArchive ia = BinaryInputArchive
- .getArchive(new ByteArrayInputStream(bytes));
- TxnHeader hdr = new TxnHeader();
- Record txn = deserializeTxn(ia, hdr);
- if (logStream.readByte("EOR") != 'B') {
- LOG.warn("Last transaction was partial.");
- throw new EOFException();
- }
- if (hdr.getZxid() <= highestZxid && highestZxid != 0) {
- LOG.error(highestZxid + "(higestZxid) >= "
- + hdr.getZxid() + "(next log) for type "
- + hdr.getType());
- } else {
- highestZxid = hdr.getZxid();
- }
- switch (hdr.getType()) {
- case OpCode.createSession:
- sessionsWithTimeouts.put(hdr.getClientId(),
- ((CreateSessionTxn) txn).getTimeOut());
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "playLog --- create session in log: 0x"
- + Long.toHexString(hdr.getClientId())
- + " with timeout: "
- + ((CreateSessionTxn) txn).getTimeOut());
- // give dataTree a chance to sync its lastProcessedZxid
- dataTree.processTxn(hdr, txn);
- break;
- case OpCode.closeSession:
- sessionsWithTimeouts.remove(hdr.getClientId());
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "playLog --- close session in log: 0x"
- + Long.toHexString(hdr.getClientId()));
- dataTree.processTxn(hdr, txn);
- break;
- default:
- dataTree.processTxn(hdr, txn);
- }
- Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(),
- null, null);
- r.txn = txn;
- r.hdr = hdr;
- r.zxid = hdr.getZxid();
- addCommittedProposal(r);
- }
- } catch (EOFException e) {
- // expected in some cases - see comments in try block
- }
- return highestZxid;
+ takeSnapshot();
}
/**
* maintains a list of last 500 or so committed requests. This is used for
* fast follower synchronization.
*
- * @param request
- * committed request
+ * @param request committed request
*/
public void addCommittedProposal(Request request) {
@@ -492,135 +236,9 @@
}
}
- static public Record deserializeTxn(InputArchive ia, TxnHeader hdr)
- throws IOException {
- hdr.deserialize(ia, "hdr");
- Record txn = null;
- switch (hdr.getType()) {
- case OpCode.createSession:
- // This isn't really an error txn; it just has the same
- // format. The error represents the timeout
- txn = new CreateSessionTxn();
- break;
- case OpCode.closeSession:
- return null;
- case OpCode.create:
- txn = new CreateTxn();
- break;
- case OpCode.delete:
- txn = new DeleteTxn();
- break;
- case OpCode.setData:
- txn = new SetDataTxn();
- break;
- case OpCode.setACL:
- txn = new SetACLTxn();
- break;
- case OpCode.error:
- txn = new ErrorTxn();
- break;
- }
- if (txn != null) {
- txn.deserialize(ia, "txn");
- }
- return txn;
- }
-
- public void truncateLog(long finalZxid) throws IOException {
- long highestZxid = 0;
- for (File f : dataDir.listFiles()) {
- long zxid = isValidSnapshot(f);
- if (zxid == -1) {
- LOG.warn("Skipping " + f);
- continue;
- }
- if (zxid > highestZxid) {
- highestZxid = zxid;
- }
- }
- File[] files = getLogFiles(dataLogDir.listFiles(), highestZxid);
- boolean truncated = false;
- for (File f : files) {
- FileInputStream fin = new FileInputStream(f);
- InputArchive ia = BinaryInputArchive.getArchive(fin);
- FileChannel fchan = fin.getChannel();
- try {
- while (true) {
- byte[] bytes = ia.readBuffer("txtEntry");
- if (bytes.length == 0) {
- // Since we preallocate, we define EOF to be an
- // empty transaction
- throw new EOFException();
- }
- InputArchive iab = BinaryInputArchive
- .getArchive(new ByteArrayInputStream(bytes));
- TxnHeader hdr = new TxnHeader();
- deserializeTxn(iab, hdr);
- if (ia.readByte("EOF") != 'B') {
- LOG.warn("Last transaction was partial.");
- throw new EOFException();
- }
- if (hdr.getZxid() == finalZxid) {
- // this is where we need to truncate
-
- long pos = fchan.position();
- fin.close();
- FileOutputStream fout = new FileOutputStream(f);
- FileChannel fchanOut = fout.getChannel();
- fchanOut.truncate(pos);
- fchanOut.close();
- fout.close();
- truncated = true;
- break;
- }
- }
- } catch (EOFException eof) {
- // expected in some cases - see comments in try block
- } finally {
- fchan.close();
- fin.close();
- }
- if (truncated == true) {
- break;
- }
- }
- if (truncated == false) {
- // not able to truncate the log
- LOG.error("Not able to truncate the log zxid 0x"
- + Long.toHexString(finalZxid));
- System.exit(13);
- }
-
- }
-
- public void snapshot(BinaryOutputArchive oa) throws IOException,
- InterruptedException {
- HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(
- sessionsWithTimeouts);
- oa.writeInt(sessSnap.size(), "count");
- for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
- oa.writeLong(entry.getKey().longValue(), "id");
- oa.writeInt(entry.getValue().intValue(), "timeout");
- }
- dataTree.serialize(oa, "tree");
- }
-
- public void snapshot() throws InterruptedException {
- long lastZxid = dataTree.lastProcessedZxid;
- ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
- "Snapshotting: zxid 0x" + Long.toHexString(lastZxid));
+ public void takeSnapshot(){
try {
- File f = new File(dataDir, "snapshot." + Long.toHexString(lastZxid));
- OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(f));
- try {
- BinaryOutputArchive oa = BinaryOutputArchive.getArchive(sessOS);
- snapshot(oa);
- sessOS.flush();
- } finally {
- sessOS.close();
- }
- ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
- "Snapshotting finished: zxid 0x" + Long.toHexString(lastZxid));
+ txnLogFactory.save(dataTree, sessionsWithTimeouts);
} catch (IOException e) {
LOG.error("Severe error, exiting",e);
// This is a severe error that we cannot recover from,
@@ -629,6 +247,18 @@
}
}
+ public void serializeSnapshot(OutputArchive oa) throws IOException,
+ InterruptedException {
+ SerializeUtils.serializeSnapshot(dataTree, oa, sessionsWithTimeouts);
+ }
+
+ public void deserializeSnapshot(InputArchive ia) throws IOException {
+ sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
+ dataTree = treeBuilder.build();
+
+ SerializeUtils.deserializeSnapshot(dataTree,ia,sessionsWithTimeouts);
+ }
+
/**
* This should be called from a synchronized block on this!
*/
@@ -644,10 +274,6 @@
return System.currentTimeMillis();
}
- static String getLogName(long zxid) {
- return "log." + Long.toHexString(zxid);
- }
-
public void closeSession(long sessionId) throws InterruptedException {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"ZooKeeperServer --- Session to be closed: 0x"
@@ -691,14 +317,6 @@
}
public void startup() throws IOException, InterruptedException {
- if (dataDir == null || !dataDir.isDirectory()) {
- throw new IOException("data directory does not exist: " + dataDir);
- }
- if (dataLogDir == null || !dataLogDir.isDirectory()) {
- throw new IOException("data log directory does not exist: "
- + dataLogDir);
- }
-
if (dataTree == null) {
loadData();
}
@@ -916,14 +534,42 @@
return serverCnxnFactory;
}
+ /**
+ * return the last processed id from the
+ * datatree
+ */
public long getLastProcessedZxid() {
return dataTree.lastProcessedZxid;
}
+ /**
+ * return the outstanding requests
+ * in the queue, which haven't been
+ * processed yet
+ */
public long getOutstandingRequests() {
return getInProcess();
}
+ /**
+ * truncate the log to get in sync with others
+ * if in a quorum
+ * @param zxid the zxid that it needs to get in sync
+ * with others
+ * @throws IOException
+ */
+ public void truncateLog(long zxid) throws IOException {
+ this.txnLogFactory.truncateLog(zxid);
+ }
+
+ /**
+ * the snapshot and logwriter for this instance
+ * @return
+ */
+ public FileTxnSnapLog getLogWriter() {
+ return this.txnLogFactory;
+ }
+
public int getTickTime() {
return tickTime;
}
@@ -939,41 +585,7 @@
public void setTreeBuilder(DataTreeBuilder treeBuilder) {
this.treeBuilder = treeBuilder;
}
-
- /**
- * Gets directory for storing the snapshot
- */
- public File getDataDir() {
- return dataDir;
- }
-
- /**
- * Sets directory for storing the snapshot
- */
- public void setDataDir(File dataDir) throws IOException {
- this.dataDir = dataDir;
- if (!dataDir.isDirectory()) {
- throw new IOException("data directory does not exist");
- }
- }
-
- /**
- * Gets directoy for storing the log tnxns
- */
- public File getDataLogDir() {
- return dataLogDir;
- }
-
- /**
- * Sets directoy for storing the log tnxns
- */
- public void setDataLogDir(File dataLogDir) throws IOException {
- this.dataLogDir = dataLogDir;
- if (!dataLogDir.isDirectory()) {
- throw new IOException("data log directory does not exist");
- }
- }
-
+
public int getClientPort() {
return clientPort;
}
@@ -981,4 +593,12 @@
public void setClientPort(int clientPort) {
this.clientPort = clientPort;
}
+
+ public void setTxnLogFactory(FileTxnSnapLog txnLog) {
+ this.txnLogFactory = txnLog;
+ }
+
+ public FileTxnSnapLog getTxnLogFactory() {
+ return this.txnLogFactory;
+ }
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java?rev=700690&r1=700689&r2=700690&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java Tue Sep 30 23:26:47 2008
@@ -22,12 +22,14 @@
import java.io.IOException;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
/**
* This class starts and runs a standalone ZooKeeperServer.
*/
public class ZooKeeperServerMain {
-
+
private static final Logger LOG = Logger.getLogger(ZooKeeperServerMain.class);
/*
@@ -43,11 +45,15 @@
}
public ZooKeeperServer createServer() throws IOException {
+ // create a file logger url from the command line args
ZooKeeperServer zks = new ZooKeeperServer();
- zks.setDataDir(new File(ServerConfig.getDataDir()));
- zks.setDataLogDir(new File(ServerConfig.getDataLogDir()));
zks.setClientPort(ServerConfig.getClientPort());
- return zks;
+
+ FileTxnSnapLog ftxn = new FileTxnSnapLog(new
+ File(ServerConfig.getDataLogDir()),
+ new File(ServerConfig.getDataDir()));
+ zks.setTxnLogFactory(ftxn);
+ return zks;
}
});
}
Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java?rev=700690&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java Tue Sep 30 23:26:47 2008
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.util.SerializeUtils;
+
+/**
+ * This class implements the snapshot interface.
+ * It is responsible for storing, serializing
+ * and deserializing the correct snapshot,
+ * and provides access to the snapshots.
+ */
+public class FileSnap implements SnapShot {
+ // directory in which snapshot files are stored and looked up
+ File snapDir;
+ // on-disk format version stamped into each snapshot's FileHeader
+ private static final int VERSION=2;
+ // database id written into the header; -1 presumably means "unused" here -- TODO confirm
+ private static final long dbId=-1;
+ // magic number identifying snapshot files: the bytes "AK47" read as a big-endian int
+ public final static int MAGIC = ByteBuffer.wrap("AK47".getBytes()).getInt();
+ public FileSnap(File snapDir) {
+ this.snapDir = snapDir;
+ }
+
+ /**
+ * deserialize a data tree from the most recent snapshot
+ * @param dt the datatree to restore into
+ * @param sessions the session-timeout map to populate
+ * @return the zxid of the snapshot, or -1L when no valid snapshot exists
+ * @throws IOException if the snapshot file cannot be read
+ */
+ public long deserialize(DataTree dt, Map<Long, Integer> sessions)
+ throws IOException {
+ File snap = findMostRecentSnapshot();
+ if (snap == null) {
+ // no snapshot on disk; caller must rebuild state from the txn log alone
+ return -1L;
+ }
+ // NOTE(review): snapIS is not closed if deserialize() below throws -- consider try/finally
+ InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
+ InputArchive ia=BinaryInputArchive.getArchive(snapIS);
+ deserialize(dt,sessions,ia);
+ snapIS.close();
+ // the snapshot's zxid is encoded in the file name (prefix "snapshot") -- see Util.getZxidFromName
+ dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
+ return dt.lastProcessedZxid;
+ }
+
+ /**
+ * deserialize the datatree from an inputarchive
+ * @param dt the datatree to be serialized into
+ * @param sessions the sessions to be filled up
+ * @param ia the input archive to restore from
+ * @throws IOException
+ */
+ protected void deserialize(DataTree dt, Map<Long, Integer> sessions,
+ InputArchive ia) throws IOException {
+ // NOTE(review): the header is consumed but its magic/version are never
+ // validated against MAGIC/VERSION -- a corrupt or foreign file would be
+ // deserialized anyway; consider checking and throwing IOException
+ FileHeader header = new FileHeader();
+ header.deserialize(ia, "fileheader");
+ SerializeUtils.deserializeSnapshot(dt,ia,sessions);
+ }
+
+ /**
+ * find the most recent snapshot in the database.
+ * @return the file containing the most recent valid snapshot,
+ * or null if none is found
+ */
+ public File findMostRecentSnapshot() throws IOException {
+ // sortDataDir with ascending=false presumably yields newest-first -- TODO confirm in Util
+ List<File> files = Util.sortDataDir(snapDir.listFiles(), "snapshot", false);
+ for (File f : files) {
+ // skip invalid (e.g. truncated) snapshots; the check is delegated to Util
+ if(Util.isValidSnapshot(f))
+ return f;
+ }
+ return null;
+ }
+
+ /**
+ * serialize the datatree and sessions
+ * @param dt the datatree to be serialized
+ * @param sessions the sessions to be serialized
+ * @param oa the output archive to serialize into
+ * @param header the header of this snapshot
+ * @throws IOException
+ */
+ protected void serialize(DataTree dt,Map<Long, Integer> sessions,
+ OutputArchive oa, FileHeader header) throws IOException {
+ // this is really a programmatic error and not something that can
+ // happen at runtime
+ if(header==null)
+ throw new IllegalStateException(
+ "Snapshot's not open for writing: uninitialized header");
+ // the header is written first so readers can identify/version the file
+ header.serialize(oa, "fileheader");
+ SerializeUtils.serializeSnapshot(dt,oa,sessions);
+ }
+
+ /**
+ * serialize the datatree and session into the file snapshot
+ * @param dt the datatree to be serialized
+ * @param sessions the sessions to be serialized
+ * @param snapShot the file to store snapshot into
+ * @throws IOException if the snapshot file cannot be written
+ */
+ public void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
+ throws IOException {
+ // NOTE(review): sessOS is not closed if serialization throws -- consider try/finally
+ OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
+ OutputArchive oa = BinaryOutputArchive.getArchive(sessOS);
+ // stamp every snapshot with the magic/version/dbid header (ZOOKEEPER-38)
+ FileHeader header = new FileHeader(MAGIC, VERSION, dbId);
+ serialize(dt,sessions,oa, header);
+ sessOS.flush();
+ sessOS.close();
+ }
+
+ }
\ No newline at end of file