You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2012/02/02 19:57:49 UTC

svn commit: r1239760 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/server/namenode/

Author: shv
Date: Thu Feb  2 18:57:49 2012
New Revision: 1239760

URL: http://svn.apache.org/viewvc?rev=1239760&view=rev
Log:
HDFS-2718. Optimize OP_ADD in edits loading. Contributed by Konstantin Shvachko.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1239760&r1=1239759&r2=1239760&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Feb  2 18:57:49 2012
@@ -1665,6 +1665,8 @@ Release 0.22.1 - Unreleased
 
   OPTIMIZATIONS
 
+    HDFS-2718. Optimize OP_ADD in edits loading. (shv)
+
   BUG FIXES
 
 Release 0.22.0 - 2011-11-29

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1239760&r1=1239759&r2=1239760&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Thu Feb  2 18:57:49 2012
@@ -157,9 +157,6 @@ public class BlockInfoUnderConstruction 
   BlockInfo convertToCompleteBlock() throws IOException {
     assert getBlockUCState() != BlockUCState.COMPLETE :
       "Trying to convert a COMPLETE block";
-    if(getBlockUCState() != BlockUCState.COMMITTED)
-      throw new IOException(
-          "Cannot complete block: block has not been COMMITTED by the client");
     return new BlockInfo(this);
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1239760&r1=1239759&r2=1239760&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Feb  2 18:57:49 2012
@@ -438,15 +438,23 @@ public class BlockManager {
    */
   private BlockInfo completeBlock(final INodeFile fileINode,
       final int blkIndex) throws IOException {
+    return completeBlock(fileINode, blkIndex, false);
+  }
+
+  public BlockInfo completeBlock(final INodeFile fileINode, 
+      final int blkIndex, final boolean force) throws IOException {
     if(blkIndex < 0)
       return null;
     BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
     if(curBlock.isComplete())
       return curBlock;
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
-    if(ucBlock.numNodes() < minReplication)
+    if(!force && ucBlock.numNodes() < minReplication)
       throw new IOException("Cannot complete block: " +
           "block does not satisfy minimal replication requirement.");
+    if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
+      throw new IOException(
+          "Cannot complete block: block has not been COMMITTED by the client");
     BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
     // replace penultimate block in file
     fileINode.setBlock(blkIndex, completeBlock);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1239760&r1=1239759&r2=1239760&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Feb  2 18:57:49 2012
@@ -267,22 +267,28 @@ public class FSDirectory implements Clos
                             short replication,
                             long modificationTime,
                             long atime,
-                            long preferredBlockSize) 
+                            long preferredBlockSize,
+                            String clientName,
+                            String clientMachine)
       throws UnresolvedLinkException {
     INode newNode;
-    long diskspace = UNKNOWN_DISK_SPACE;
     assert hasWriteLock();
     if (blocks == null)
       newNode = new INodeDirectory(permissions, modificationTime);
-    else {
+    else if(blocks.length == 0 || blocks[blocks.length-1].getBlockUCState()
+        == BlockUCState.UNDER_CONSTRUCTION) {
+      newNode = new INodeFileUnderConstruction(
+          permissions, blocks.length, replication,
+          preferredBlockSize, modificationTime, clientName, 
+          clientMachine, null);
+    } else {
       newNode = new INodeFile(permissions, blocks.length, replication,
                               modificationTime, atime, preferredBlockSize);
-      diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
     }
     writeLock();
     try {
       try {
-        newNode = addNode(path, newNode, diskspace);
+        newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
         if(newNode != null && blocks != null) {
           int nrBlocks = blocks.length;
           // Add file->block mapping
@@ -301,6 +307,74 @@ public class FSDirectory implements Clos
 
   }
 
+  /**
+   * Update files in-memory data structures with new block information.
+   * @throws IOException 
+   */
+  void updateFile(INodeFile file,
+                  String path,
+                  PermissionStatus permissions,
+                  BlockInfo[] blocks, 
+                  short replication,
+                  long mtime,
+                  long atime,
+                  long preferredBlockSize) throws IOException {
+
+    // Update the salient file attributes.
+    file.setAccessTime(atime);
+    file.setModificationTimeForce(mtime);
+
+    // Update its block list
+    BlockInfo[] oldBlocks = file.getBlocks();
+
+    // Are we only updating the last block's gen stamp.
+    boolean isGenStampUpdate = oldBlocks.length == blocks.length;
+
+    // First, update blocks in common
+    BlockInfo oldBlock = null;
+    for (int i = 0; i < oldBlocks.length && i < blocks.length; i++) {
+      oldBlock = oldBlocks[i];
+      Block newBlock = blocks[i];
+
+      boolean isLastBlock = i == oldBlocks.length - 1;
+      if (oldBlock.getBlockId() != newBlock.getBlockId() ||
+          (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() && 
+              !(isGenStampUpdate && isLastBlock))) {
+        throw new IOException("Mismatched block IDs or generation stamps, " + 
+            "attempting to replace block " + oldBlock + " with " + newBlock +
+            " as block # " + i + "/" + blocks.length + " of " + path);
+      }
+
+      oldBlock.setNumBytes(newBlock.getNumBytes());
+      oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
+    }
+
+    if (blocks.length < oldBlocks.length) {
+      // We're removing a block from the file, e.g. abandonBlock(...)
+      if (!file.isUnderConstruction()) {
+        throw new IOException("Trying to remove a block from file " +
+            path + " which is not under construction.");
+      }
+      if (blocks.length != oldBlocks.length - 1) {
+        throw new IOException("Trying to remove more than one block from file "
+            + path);
+      }
+      unprotectedRemoveBlock(path,
+          (INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
+    } else if (blocks.length > oldBlocks.length) {
+      // We're adding blocks
+      // First complete last old Block
+      getBlockManager().completeBlock(file, oldBlocks.length-1, true);
+      // Add the new blocks
+      for (int i = oldBlocks.length; i < blocks.length; i++) {
+        // addBlock();
+        BlockInfo newBI = blocks[i];
+        getBlockManager().addINode(newBI, file);
+        file.addBlock(newBI);
+      }
+    }
+  }
+
   INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
       INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
     // NOTE: This does not update space counts for parents
@@ -422,28 +496,33 @@ public class FSDirectory implements Clos
 
     writeLock();
     try {
-      // modify file-> block and blocksMap
-      fileNode.removeLastBlock(block);
-      getBlockManager().removeBlockFromMap(block);
-
+      unprotectedRemoveBlock(path, fileNode, block);
       // write modified block locations to log
       fsImage.getEditLog().logOpenFile(path, fileNode);
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
-            +path+" with "+block
-            +" block is removed from the file system");
-      }
-
-      // update space consumed
-      INode[] pathINodes = getExistingPathINodes(path);
-      updateCount(pathINodes, pathINodes.length-1, 0,
-          -fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
     } finally {
       writeUnlock();
     }
     return true;
   }
 
+  void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode, 
+      Block block) throws IOException {
+    // modify file-> block and blocksMap
+    fileNode.removeLastBlock(block);
+    getBlockManager().removeBlockFromMap(block);
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+          +path+" with "+block
+          +" block is removed from the file system");
+    }
+
+    // update space consumed
+    INode[] pathINodes = getExistingPathINodes(path);
+    updateCount(pathINodes, pathINodes.length-1, 0,
+        -fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
+  }
+
   /**
    * @see #unprotectedRenameTo(String, String, long)
    * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1239760&r1=1239759&r2=1239760&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Feb  2 18:57:49 2012
@@ -187,31 +187,53 @@ public class FSEditLogLoader {
                   " clientMachine " + addCloseOp.clientMachine);
             }
 
-            fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
-
-            // add to the file tree
-            INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
-                addCloseOp.path, permissions,
-                blocks, replication,
-                addCloseOp.mtime, addCloseOp.atime, blockSize);
-            if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
-              //
-              // Replace current node with a INodeUnderConstruction.
-              // Recreate in-memory lease record.
-              //
-              INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                        node.getLocalNameBytes(),
-                                        node.getReplication(),
-                                        node.getModificationTime(),
-                                        node.getPreferredBlockSize(),
-                                        node.getBlocks(),
-                                        node.getPermissionStatus(),
-                                        addCloseOp.clientName,
-                                        addCloseOp.clientMachine,
-                                        null);
-              fsDir.replaceNode(addCloseOp.path, node, cons);
-              fsNamesys.leaseManager.addLease(cons.getClientName(),
-                                              addCloseOp.path);
+            // There are four cases here:
+            // 1. OP_ADD to create a new file
+            // 2. OP_ADD to update file blocks
+            // 3. OP_ADD to open file for append
+            // 4. OP_CLOSE to close the file
+
+            // See if the file already exists
+            INodeFile oldFile = fsDir.getFileINode(addCloseOp.path);
+            if (oldFile == null) { // OP_ADD for a new file
+              assert addCloseOp.opCode == FSEditLogOpCodes.OP_ADD : 
+                "Expected opcode OP_ADD, but got " + addCloseOp.opCode;
+              fsDir.unprotectedAddFile(
+                  addCloseOp.path, permissions, blocks, replication,
+                  addCloseOp.mtime, addCloseOp.atime, blockSize,
+                  addCloseOp.clientName, addCloseOp.clientMachine);
+            } else {
+              fsDir.updateFile(oldFile,
+                  addCloseOp.path, permissions, blocks, replication,
+                  addCloseOp.mtime, addCloseOp.atime, blockSize);
+              if(addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE) {  // OP_CLOSE
+                assert oldFile.isUnderConstruction() : 
+                  "File is not under construction: " + addCloseOp.path;
+                fsNamesys.getBlockManager().completeBlock(
+                    oldFile, blocks.length-1, true);
+                INodeFile newFile =
+                  ((INodeFileUnderConstruction)oldFile).convertToInodeFile();
+                fsDir.replaceNode(addCloseOp.path, oldFile, newFile);
+              } else if(! oldFile.isUnderConstruction()) {  // OP_ADD for append
+                INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                    oldFile.getLocalNameBytes(),
+                    oldFile.getReplication(), 
+                    oldFile.getModificationTime(),
+                    oldFile.getPreferredBlockSize(),
+                    oldFile.getBlocks(),
+                    oldFile.getPermissionStatus(),
+                    addCloseOp.clientName,
+                    addCloseOp.clientMachine,
+                    null);
+                fsDir.replaceNode(addCloseOp.path, oldFile, cons);
+              }
+            }
+            // Update file lease
+            if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
+              fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
+            } else {  // Ops.OP_CLOSE
+              fsNamesys.leaseManager.removeLease(
+                  ((INodeFileUnderConstruction)oldFile).getClientName(), addCloseOp.path);
             }
             break;
           }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1239760&r1=1239759&r2=1239760&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Thu Feb  2 18:57:49 2012
@@ -41,8 +41,20 @@ public class INodeFileUnderConstruction 
                              String clientName,
                              String clientMachine,
                              DatanodeDescriptor clientNode) {
-    super(permissions.applyUMask(UMASK), 0, replication, modTime, modTime,
-        preferredBlockSize);
+    this(permissions, 0, replication, preferredBlockSize, modTime,
+        clientName, clientMachine, clientNode);
+  }
+
+  INodeFileUnderConstruction(PermissionStatus permissions,
+                             int nrBlocks,
+                             short replication,
+                             long preferredBlockSize,
+                             long modTime,
+                             String clientName,
+                             String clientMachine,
+                             DatanodeDescriptor clientNode) {
+    super(permissions.applyUMask(UMASK), nrBlocks, replication,
+        modTime, modTime, preferredBlockSize);
     this.clientName = clientName;
     this.clientMachine = clientMachine;
     this.clientNode = clientNode;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java?rev=1239760&r1=1239759&r2=1239760&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java Thu Feb  2 18:57:49 2012
@@ -72,12 +72,20 @@ public class TestAbandonBlock {
 
     // Now abandon the last block
     DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
-    LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(src, 0, 1);
+    LocatedBlocks blocks =
+      dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
+    int orginalNumBlocks = blocks.locatedBlockCount();
     LocatedBlock b = blocks.getLastLocatedBlock();
     dfsclient.getNamenode().abandonBlock(b.getBlock(), src, dfsclient.clientName);
 
     // And close the file
     fout.close();
+
+    // Close cluster and check the block has been abandoned after restart
+    cluster.restartNameNode();
+    blocks = dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
+    assert orginalNumBlocks == blocks.locatedBlockCount() + 1 :
+      "Blocks " + b + " has not been abandoned.";
   }
 
   @Test

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1239760&r1=1239759&r2=1239760&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Thu Feb  2 18:57:49 2012
@@ -116,10 +116,12 @@ public class TestEditLog extends TestCas
     int numTransactions;
     short replication = 3;
     long blockSize = 64;
+    int startIndex;
 
-    Transactions(FSNamesystem ns, int num) {
+    Transactions(FSNamesystem ns, int numTx, int startIdx) {
       namesystem = ns;
-      numTransactions = num;
+      numTransactions = numTx;
+      startIndex = startIdx;
     }
 
     // add a bunch of transactions.
@@ -131,8 +133,8 @@ public class TestEditLog extends TestCas
       for (int i = 0; i < numTransactions; i++) {
         INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
                             p, replication, blockSize, 0, "", "", null);
-        editLog.logOpenFile("/filename" + i, inode);
-        editLog.logCloseFile("/filename" + i, inode);
+        editLog.logOpenFile("/filename" + startIndex + i, inode);
+        editLog.logCloseFile("/filename" + startIndex + i, inode);
         editLog.logSync();
       }
     }
@@ -280,7 +282,8 @@ public class TestEditLog extends TestCas
       // Create threads and make them run transactions concurrently.
       Thread threadId[] = new Thread[NUM_THREADS];
       for (int i = 0; i < NUM_THREADS; i++) {
-        Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS);
+        Transactions trans =
+          new Transactions(namesystem, NUM_TRANSACTIONS, i*NUM_TRANSACTIONS);
         threadId[i] = new Thread(trans, "TransactionThread-" + i);
         threadId[i].start();
       }
@@ -293,11 +296,16 @@ public class TestEditLog extends TestCas
           i--;      // retry 
         }
       } 
-      
+
+      // Reopen some files as for append
+      Transactions trans = 
+        new Transactions(namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
+      trans.run();
+
       // Roll another time to finalize edits_inprogress_3
       fsimage.rollEditLog();
       
-      long expectedTxns = (NUM_THREADS * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns
+      long expectedTxns = ((NUM_THREADS+1) * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns
    
       // Verify that we can read in all the transactions that we have written.
       // If there were any corruptions, it is likely that the reading in