You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2017/08/03 18:41:35 UTC

zookeeper git commit: ZOOKEEPER-2853: Update lastZxidSeen in FileTxnLog

Repository: zookeeper
Updated Branches:
  refs/heads/master 69c8cbea1 -> 5c4e44332


ZOOKEEPER-2853: Update lastZxidSeen in FileTxnLog

Author: Fangmin Lyu <al...@fb.com>

Reviewers: Michael Han <ha...@apache.org>, maoling <ma...@sina.com>

Closes #322 from lvfangmin/ZOOKEEPER-2853


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/5c4e4433
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/5c4e4433
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/5c4e4433

Branch: refs/heads/master
Commit: 5c4e44332e55bbf21ca59583f3e8ca97fc4bb266
Parents: 69c8cbe
Author: Fangmin Lyu <al...@fb.com>
Authored: Thu Aug 3 11:41:30 2017 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Thu Aug 3 11:41:30 2017 -0700

----------------------------------------------------------------------
 .../server/persistence/FileTxnLog.java          | 124 ++++++++++---------
 1 file changed, 63 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5c4e4433/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
index 9f55ab4..72bb583 100644
--- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
+++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
@@ -53,25 +53,25 @@ import org.slf4j.LoggerFactory;
  * <blockquote><pre>
  * LogFile:
  *     FileHeader TxnList ZeroPad
- * 
+ *
  * FileHeader: {
  *     magic 4bytes (ZKLG)
  *     version 4bytes
  *     dbid 8bytes
  *   }
- * 
+ *
  * TxnList:
  *     Txn || Txn TxnList
- *     
+ *
  * Txn:
  *     checksum Txnlen TxnHeader Record 0x42
- * 
+ *
  * checksum: 8bytes Adler32 is currently used
  *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
- * 
+ *
  * Txnlen:
  *     len 4bytes
- * 
+ *
  * TxnHeader: {
  *     sessionid 8bytes
  *     cxid 4bytes
@@ -79,13 +79,13 @@ import org.slf4j.LoggerFactory;
  *     time 8bytes
  *     type 4bytes
  *   }
- *     
+ *
  * Record:
  *     See Jute definition file for details on the various record types
- *      
+ *
  * ZeroPad:
  *     0 padded to EOF (filled during preallocation stage)
- * </pre></blockquote> 
+ * </pre></blockquote>
  */
 public class FileTxnLog implements TxnLog {
     private static final Logger LOG;
@@ -175,7 +175,7 @@ public class FileTxnLog implements TxnLog {
     /**
      * close all the open file handles
      * @throws IOException
-     */
+      */
     public synchronized void close() throws IOException {
         if (logStream != null) {
             logStream.close();
@@ -184,54 +184,56 @@ public class FileTxnLog implements TxnLog {
             log.close();
         }
     }
-    
+
     /**
      * append an entry to the transaction log
      * @param hdr the header of the transaction
      * @param txn the transaction part of the entry
-     * returns true iff something appended, otw false 
+     * returns true iff something appended, otw false
      */
     public synchronized boolean append(TxnHeader hdr, Record txn)
         throws IOException
     {
-        if (hdr != null) {
-            if (hdr.getZxid() <= lastZxidSeen) {
-                LOG.warn("Current zxid " + hdr.getZxid()
-                        + " is <= " + lastZxidSeen + " for "
-                        + hdr.getType());
-            }
-            if (logStream==null) {
-               if(LOG.isInfoEnabled()){
-                    LOG.info("Creating new log file: log." +  
-                            Long.toHexString(hdr.getZxid()));
-               }
-               
-               logFileWrite = new File(logDir, ("log." + 
-                       Long.toHexString(hdr.getZxid())));
-               fos = new FileOutputStream(logFileWrite);
-               logStream=new BufferedOutputStream(fos);
-               oa = BinaryOutputArchive.getArchive(logStream);
-               FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
-               fhdr.serialize(oa, "fileheader");
-               // Make sure that the magic number is written before padding.
-               logStream.flush();
-               currentSize = fos.getChannel().position();
-               streamsToFlush.add(fos);
-            }
-            padFile(fos);
-            byte[] buf = Util.marshallTxnEntry(hdr, txn);
-            if (buf == null || buf.length == 0) {
-                throw new IOException("Faulty serialization for header " +
-                        "and txn");
-            }
-            Checksum crc = makeChecksumAlgorithm();
-            crc.update(buf, 0, buf.length);
-            oa.writeLong(crc.getValue(), "txnEntryCRC");
-            Util.writeTxnBytes(oa, buf);
-            
-            return true;
+        if (hdr == null) {
+            return false;
         }
-        return false;
+        if (hdr.getZxid() <= lastZxidSeen) {
+            LOG.warn("Current zxid " + hdr.getZxid()
+                    + " is <= " + lastZxidSeen + " for "
+                    + hdr.getType());
+        } else {
+            lastZxidSeen = hdr.getZxid();
+        }
+        if (logStream==null) {
+           if(LOG.isInfoEnabled()){
+                LOG.info("Creating new log file: log." +
+                        Long.toHexString(hdr.getZxid()));
+           }
+
+           logFileWrite = new File(logDir, ("log." +
+                   Long.toHexString(hdr.getZxid())));
+           fos = new FileOutputStream(logFileWrite);
+           logStream=new BufferedOutputStream(fos);
+           oa = BinaryOutputArchive.getArchive(logStream);
+           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
+           fhdr.serialize(oa, "fileheader");
+           // Make sure that the magic number is written before padding.
+           logStream.flush();
+           currentSize = fos.getChannel().position();
+           streamsToFlush.add(fos);
+        }
+        padFile(fos);
+        byte[] buf = Util.marshallTxnEntry(hdr, txn);
+        if (buf == null || buf.length == 0) {
+            throw new IOException("Faulty serialization for header " +
+                    "and txn");
+        }
+        Checksum crc = makeChecksumAlgorithm();
+        crc.update(buf, 0, buf.length);
+        oa.writeLong(crc.getValue(), "txnEntryCRC");
+        Util.writeTxnBytes(oa, buf);
+
+        return true;
     }
 
     /**
@@ -456,10 +458,10 @@ public class FileTxnLog implements TxnLog {
     }
 
     /**
-     * a class that keeps track of the position 
+     * a class that keeps track of the position
      * in the input stream. The position points to offset
-     * that has been consumed by the applications. It can 
-     * wrap buffered input streams to provide the right offset 
+     * that has been consumed by the applications. It can
+     * wrap buffered input streams to provide the right offset
      * for the application.
      */
     static class PositionInputStream extends FilterInputStream {
@@ -468,7 +470,7 @@ public class FileTxnLog implements TxnLog {
             super(in);
             position = 0;
         }
-        
+
         @Override
         public int read() throws IOException {
             int rc = super.read();
@@ -483,9 +485,9 @@ public class FileTxnLog implements TxnLog {
             if (rc > 0) {
                 position += rc;
             }
-            return rc;            
+            return rc;
         }
-        
+
         @Override
         public int read(byte[] b, int off, int len) throws IOException {
             int rc = super.read(b, off, len);
@@ -494,7 +496,7 @@ public class FileTxnLog implements TxnLog {
             }
             return rc;
         }
-        
+
         @Override
         public long skip(long n) throws IOException {
             long rc = super.skip(n);
@@ -522,7 +524,7 @@ public class FileTxnLog implements TxnLog {
             throw new UnsupportedOperationException("reset");
         }
     }
-    
+
     /**
      * this class implements the txnlog iterator interface
      * which is used for reading the transaction logs
@@ -535,7 +537,7 @@ public class FileTxnLog implements TxnLog {
         File logFile;
         InputArchive ia;
         static final String CRC_ERROR="CRC check failed";
-       
+
         PositionInputStream inputStream=null;
         //stored files is the list of files greater than
         //the zxid we are looking for.
@@ -564,7 +566,7 @@ public class FileTxnLog implements TxnLog {
                 }
             }
         }
-        
+
         /**
          * create an iterator over a transaction database directory
          * @param logDir the transaction database directory
@@ -596,7 +598,7 @@ public class FileTxnLog implements TxnLog {
             goToNextLog();
             next();
         }
-        
+
         /**
          * Return total storage size of txnlog that will return by this iterator.
          */
@@ -634,7 +636,7 @@ public class FileTxnLog implements TxnLog {
             FileHeader header= new FileHeader();
             header.deserialize(ia, "fileheader");
             if (header.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
-                throw new IOException("Transaction log: " + this.logFile + " has invalid magic number " 
+                throw new IOException("Transaction log: " + this.logFile + " has invalid magic number "
                         + header.getMagic()
                         + " != " + FileTxnLog.TXNLOG_MAGIC);
             }