You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2017/08/03 18:41:35 UTC
zookeeper git commit: ZOOKEEPER-2853: Update lastZxidSeen in FileTxnLog
Repository: zookeeper
Updated Branches:
refs/heads/master 69c8cbea1 -> 5c4e44332
ZOOKEEPER-2853: Update lastZxidSeen in FileTxnLog
Author: Fangmin Lyu <al...@fb.com>
Reviewers: Michael Han <ha...@apache.org>, maoling <ma...@sina.com>
Closes #322 from lvfangmin/ZOOKEEPER-2853
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/5c4e4433
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/5c4e4433
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/5c4e4433
Branch: refs/heads/master
Commit: 5c4e44332e55bbf21ca59583f3e8ca97fc4bb266
Parents: 69c8cbe
Author: Fangmin Lyu <al...@fb.com>
Authored: Thu Aug 3 11:41:30 2017 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Thu Aug 3 11:41:30 2017 -0700
----------------------------------------------------------------------
.../server/persistence/FileTxnLog.java | 124 ++++++++++---------
1 file changed, 63 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5c4e4433/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
index 9f55ab4..72bb583 100644
--- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
+++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
@@ -53,25 +53,25 @@ import org.slf4j.LoggerFactory;
* <blockquote><pre>
* LogFile:
* FileHeader TxnList ZeroPad
- *
+ *
* FileHeader: {
* magic 4bytes (ZKLG)
* version 4bytes
* dbid 8bytes
* }
- *
+ *
* TxnList:
* Txn || Txn TxnList
- *
+ *
* Txn:
* checksum Txnlen TxnHeader Record 0x42
- *
+ *
* checksum: 8bytes Adler32 is currently used
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
- *
+ *
* Txnlen:
* len 4bytes
- *
+ *
* TxnHeader: {
* sessionid 8bytes
* cxid 4bytes
@@ -79,13 +79,13 @@ import org.slf4j.LoggerFactory;
* time 8bytes
* type 4bytes
* }
- *
+ *
* Record:
* See Jute definition file for details on the various record types
- *
+ *
* ZeroPad:
* 0 padded to EOF (filled during preallocation stage)
- * </pre></blockquote>
+ * </pre></blockquote>
*/
public class FileTxnLog implements TxnLog {
private static final Logger LOG;
@@ -175,7 +175,7 @@ public class FileTxnLog implements TxnLog {
/**
* close all the open file handles
* @throws IOException
- */
+ */
public synchronized void close() throws IOException {
if (logStream != null) {
logStream.close();
@@ -184,54 +184,56 @@ public class FileTxnLog implements TxnLog {
log.close();
}
}
-
+
/**
* append an entry to the transaction log
* @param hdr the header of the transaction
* @param txn the transaction part of the entry
- * returns true iff something appended, otw false
+ * returns true iff something appended, otw false
*/
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
- if (hdr != null) {
- if (hdr.getZxid() <= lastZxidSeen) {
- LOG.warn("Current zxid " + hdr.getZxid()
- + " is <= " + lastZxidSeen + " for "
- + hdr.getType());
- }
- if (logStream==null) {
- if(LOG.isInfoEnabled()){
- LOG.info("Creating new log file: log." +
- Long.toHexString(hdr.getZxid()));
- }
-
- logFileWrite = new File(logDir, ("log." +
- Long.toHexString(hdr.getZxid())));
- fos = new FileOutputStream(logFileWrite);
- logStream=new BufferedOutputStream(fos);
- oa = BinaryOutputArchive.getArchive(logStream);
- FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
- fhdr.serialize(oa, "fileheader");
- // Make sure that the magic number is written before padding.
- logStream.flush();
- currentSize = fos.getChannel().position();
- streamsToFlush.add(fos);
- }
- padFile(fos);
- byte[] buf = Util.marshallTxnEntry(hdr, txn);
- if (buf == null || buf.length == 0) {
- throw new IOException("Faulty serialization for header " +
- "and txn");
- }
- Checksum crc = makeChecksumAlgorithm();
- crc.update(buf, 0, buf.length);
- oa.writeLong(crc.getValue(), "txnEntryCRC");
- Util.writeTxnBytes(oa, buf);
-
- return true;
+ if (hdr == null) {
+ return false;
}
- return false;
+ if (hdr.getZxid() <= lastZxidSeen) {
+ LOG.warn("Current zxid " + hdr.getZxid()
+ + " is <= " + lastZxidSeen + " for "
+ + hdr.getType());
+ } else {
+ lastZxidSeen = hdr.getZxid();
+ }
+ if (logStream==null) {
+ if(LOG.isInfoEnabled()){
+ LOG.info("Creating new log file: log." +
+ Long.toHexString(hdr.getZxid()));
+ }
+
+ logFileWrite = new File(logDir, ("log." +
+ Long.toHexString(hdr.getZxid())));
+ fos = new FileOutputStream(logFileWrite);
+ logStream=new BufferedOutputStream(fos);
+ oa = BinaryOutputArchive.getArchive(logStream);
+ FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
+ fhdr.serialize(oa, "fileheader");
+ // Make sure that the magic number is written before padding.
+ logStream.flush();
+ currentSize = fos.getChannel().position();
+ streamsToFlush.add(fos);
+ }
+ padFile(fos);
+ byte[] buf = Util.marshallTxnEntry(hdr, txn);
+ if (buf == null || buf.length == 0) {
+ throw new IOException("Faulty serialization for header " +
+ "and txn");
+ }
+ Checksum crc = makeChecksumAlgorithm();
+ crc.update(buf, 0, buf.length);
+ oa.writeLong(crc.getValue(), "txnEntryCRC");
+ Util.writeTxnBytes(oa, buf);
+
+ return true;
}
/**
@@ -456,10 +458,10 @@ public class FileTxnLog implements TxnLog {
}
/**
- * a class that keeps track of the position
+ * a class that keeps track of the position
* in the input stream. The position points to offset
- * that has been consumed by the applications. It can
- * wrap buffered input streams to provide the right offset
+ * that has been consumed by the applications. It can
+ * wrap buffered input streams to provide the right offset
* for the application.
*/
static class PositionInputStream extends FilterInputStream {
@@ -468,7 +470,7 @@ public class FileTxnLog implements TxnLog {
super(in);
position = 0;
}
-
+
@Override
public int read() throws IOException {
int rc = super.read();
@@ -483,9 +485,9 @@ public class FileTxnLog implements TxnLog {
if (rc > 0) {
position += rc;
}
- return rc;
+ return rc;
}
-
+
@Override
public int read(byte[] b, int off, int len) throws IOException {
int rc = super.read(b, off, len);
@@ -494,7 +496,7 @@ public class FileTxnLog implements TxnLog {
}
return rc;
}
-
+
@Override
public long skip(long n) throws IOException {
long rc = super.skip(n);
@@ -522,7 +524,7 @@ public class FileTxnLog implements TxnLog {
throw new UnsupportedOperationException("reset");
}
}
-
+
/**
* this class implements the txnlog iterator interface
* which is used for reading the transaction logs
@@ -535,7 +537,7 @@ public class FileTxnLog implements TxnLog {
File logFile;
InputArchive ia;
static final String CRC_ERROR="CRC check failed";
-
+
PositionInputStream inputStream=null;
//stored files is the list of files greater than
//the zxid we are looking for.
@@ -564,7 +566,7 @@ public class FileTxnLog implements TxnLog {
}
}
}
-
+
/**
* create an iterator over a transaction database directory
* @param logDir the transaction database directory
@@ -596,7 +598,7 @@ public class FileTxnLog implements TxnLog {
goToNextLog();
next();
}
-
+
/**
* Return total storage size of txnlog that will return by this iterator.
*/
@@ -634,7 +636,7 @@ public class FileTxnLog implements TxnLog {
FileHeader header= new FileHeader();
header.deserialize(ia, "fileheader");
if (header.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
- throw new IOException("Transaction log: " + this.logFile + " has invalid magic number "
+ throw new IOException("Transaction log: " + this.logFile + " has invalid magic number "
+ header.getMagic()
+ " != " + FileTxnLog.TXNLOG_MAGIC);
}