You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2013/11/27 00:43:29 UTC

svn commit: r1545883 - in /zookeeper/branches/branch-3.4: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/

Author: fpj
Date: Tue Nov 26 23:43:29 2013
New Revision: 1545883

URL: http://svn.apache.org/r1545883
Log:
ZOOKEEPER-1653. zookeeper fails to start because of inconsistent epoch (michim via fpj)


Modified:
    zookeeper/branches/branch-3.4/CHANGES.txt
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java

Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1545883&r1=1545882&r2=1545883&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Tue Nov 26 23:43:29 2013
@@ -164,6 +164,9 @@ BUGFIXES:
 
   ZOOKEEPER-1817. Fix don't care for b3.4 (fpj)
 
+  ZOOKEEPER-1653. zookeeper fails to start because of inconsistent
+  epoch (michim via fpj)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1564. Allow JUnit test build with IBM Java

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1545883&r1=1545882&r2=1545883&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Tue Nov 26 23:43:29 2013
@@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -423,8 +424,22 @@ public class Learner {       
                     self.cnxnFactory.setZooKeeperServer(zk);                
                     break outerLoop;
                 case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
+                    // Create updatingEpoch file and remove it after current
+                    // epoch is set. QuorumPeer.loadDataBase() uses this file to
+                    // detect the case where the server was terminated after
+                    // taking a snapshot but before setting the current epoch.
+                    File updating = new File(self.getTxnFactory().getSnapDir(),
+                                        QuorumPeer.UPDATING_EPOCH_FILENAME);
+                    if (!updating.exists() && !updating.createNewFile()) {
+                        throw new IOException("Failed to create " +
+                                              updating.toString());
+                    }
                     zk.takeSnapshot();
                     self.setCurrentEpoch(newEpoch);
+                    if (!updating.delete()) {
+                        throw new IOException("Failed to delete " +
+                                              updating.toString());
+                    }
                     snapshotTaken = true;
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                     break;

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1545883&r1=1545882&r2=1545883&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Nov 26 23:43:29 2013
@@ -443,7 +443,9 @@ public class QuorumPeer extends Thread i
         super.start();
     }
 
-	private void loadDataBase() {
+    private void loadDataBase() {
+        File updating = new File(getTxnFactory().getSnapDir(),
+                                 UPDATING_EPOCH_FILENAME);
 		try {
             zkDb.loadDataBase();
 
@@ -452,6 +454,17 @@ public class QuorumPeer extends Thread i
     		long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
             try {
             	currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
+                if (epochOfZxid > currentEpoch && updating.exists()) {
+                    LOG.info("{} found. The server was terminated after " +
+                             "taking a snapshot but before updating current " +
+                             "epoch. Setting current epoch to {}.",
+                             UPDATING_EPOCH_FILENAME, epochOfZxid);
+                    setCurrentEpoch(epochOfZxid);
+                    if (!updating.delete()) {
+                        throw new IOException("Failed to delete " +
+                                              updating.toString());
+                    }
+                }
             } catch(FileNotFoundException e) {
             	// pick a reasonable epoch number
             	// this should only happen once when moving to a
@@ -1156,6 +1169,8 @@ public class QuorumPeer extends Thread i
 
 	public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
 
+    public static final String UPDATING_EPOCH_FILENAME = "updatingEpoch"; // Marker in the snap dir: a learner creates it before its NEWLEADER snapshot and deletes it after setCurrentEpoch; loadDataBase() uses it to repair a stale current epoch.
+
 	/**
 	 * Write a long value to disk atomically. Either succeeds or an exception
 	 * is thrown.

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1545883&r1=1545882&r2=1545883&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Tue Nov 26 23:43:29 2013
@@ -20,8 +20,13 @@ package org.apache.zookeeper.server.quor
 
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileReader;
 import java.io.LineNumberReader;
+import java.io.OutputStreamWriter;
 import java.io.StringReader;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -29,6 +34,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import org.apache.log4j.Layout;
@@ -44,8 +50,10 @@ import org.apache.zookeeper.ZooDefs.OpCo
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
+import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.Assert;
 import org.junit.Test;
@@ -56,6 +64,9 @@ import org.junit.Test;
  *
  */
 public class QuorumPeerMainTest extends QuorumPeerTestBase {
+    protected static final Logger LOG =
+        Logger.getLogger(QuorumPeerMainTest.class); // Logger for this test class (used e.g. by writeLongToFile and testUpdatingEpoch).
+
 	/**
      * Verify the ability to start a cluster.
      */
@@ -669,4 +680,106 @@ public class QuorumPeerMainTest extends 
                     " to shutdown, expected " + maxwait);
         }
     }
+
+    static long readLongFromFile(File file) throws IOException { // Test helper: parses the first line of file as a long.
+        BufferedReader br = new BufferedReader(new FileReader(file));
+        String line = ""; // Declared outside try so the catch block can report what was read.
+        try {
+            line = br.readLine();
+            return Long.parseLong(line);
+        } catch(NumberFormatException e) { // Also reached for an empty file: readLine() returns null and parseLong(null) throws NFE.
+            throw new IOException("Found " + line + " in " + file); // NOTE(review): the cause e is dropped; IOException(String, Throwable) would keep it.
+        } finally {
+            br.close(); // Runs on success and on parse failure alike.
+        }
+    }
+
+    static void writeLongToFile(File file, long value) throws IOException { // Test helper: writes value's decimal text to file through an AtomicFileOutputStream.
+        AtomicFileOutputStream out = new AtomicFileOutputStream(file);
+        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
+        try {
+            bw.write(Long.toString(value));
+            bw.flush(); // Push the buffered characters down into out before out is closed.
+            out.flush();
+            out.close(); // The underlying stream is closed directly; bw is not closed separately.
+        } catch (IOException e) {
+            LOG.error("Failed to write new file " + file, e);
+            out.abort(); // Presumably discards the partial write instead of committing it (AtomicFileOutputStream is not shown here).
+            throw e;
+        }
+    }
+
+    /**
+     * ZOOKEEPER-1653: a server whose current epoch is below the epoch of its last logged zxid
+     * must fail to start, unless the updatingEpoch file exists; then it starts and the file is deleted.
+     */
+    @Test
+    public void testUpdatingEpoch() throws Exception {
+        // Launch a 3-server cluster, then fully restart it 10 times so the epoch gets bumped.
+        int numServers = 3;
+        Servers servers = LaunchServers(numServers);
+        File currentEpochFile;
+        for (int i = 0; i < 10; i++) {
+            for (int j = 0; j < numServers; j++) {
+                servers.mt[j].shutdown();
+            }
+            waitForAll(servers.zk, States.CONNECTING);
+            for (int j = 0; j < numServers; j++) {
+                servers.mt[j].start();
+            }
+            waitForAll(servers.zk, States.CONNECTED);
+        }
+
+        // After the 10 restarts every server's persisted currentEpoch file should contain 11.
+        for (int i = 0; i < numServers; i++) {
+            currentEpochFile = new File(
+                new File(servers.mt[i].dataDir, "version-2"),
+                QuorumPeer.CURRENT_EPOCH_FILENAME);
+            LOG.info("Validating current epoch: " + servers.mt[i].dataDir);
+            Assert.assertEquals("Current epoch should be 11.", 11,
+                                readLongFromFile(currentEpochFile));
+        }
+
+        // Pick a follower (a peer with no Leader instance) and take the epoch of its last logged zxid.
+        int followerIndex = -1;
+        for (int i = 0; i < numServers; i++) {
+            if (servers.mt[i].main.quorumPeer.leader == null) { // No Leader object: treat this peer as a follower.
+                followerIndex = i;
+                break;
+            }
+        }
+        Assert.assertTrue("Found a valid follower",
+                          followerIndex >= 0 && followerIndex < numServers);
+        MainThread follower = servers.mt[followerIndex];
+        long zxid = follower.main.quorumPeer.getLastLoggedZxid();
+        long epochFromZxid = ZxidUtils.getEpochFromZxid(zxid);
+
+        // Shut down the whole cluster before tampering with the follower's epoch files.
+        for (int i = 0; i < numServers; i++) {
+          servers.mt[i].shutdown();
+        }
+        waitForAll(servers.zk, States.CONNECTING);
+
+        // Rewind the follower's currentEpoch below its last logged zxid's epoch, with no
+        // updatingEpoch file present: the follower must fail to start (mainFailed fires).
+        File followerDataDir = new File(follower.dataDir, "version-2");
+        currentEpochFile = new File(followerDataDir,
+                QuorumPeer.CURRENT_EPOCH_FILENAME);
+        writeLongToFile(currentEpochFile, epochFromZxid - 1);
+        follower.start();
+        Assert.assertTrue(follower.mainFailed.await(10, TimeUnit.SECONDS)); // Bounded wait for run() to report the startup failure.
+
+        // Create the updatingEpoch marker file; now the follower should start and delete it.
+        File updatingEpochFile = new File(followerDataDir,
+                QuorumPeer.UPDATING_EPOCH_FILENAME);
+        updatingEpochFile.createNewFile(); // NOTE(review): the boolean result of createNewFile() is not checked.
+        for (int i = 0; i < numServers; i++) {
+          servers.mt[i].start();
+        }
+        waitForAll(servers.zk, States.CONNECTED);
+        Assert.assertNotNull("Make sure the server started with acceptEpoch",
+                             follower.main.quorumPeer.getActiveServer()); // Presumably non-null once the follower is serving again.
+        Assert.assertFalse("updatingEpoch file should get deleted",
+                           updatingEpochFile.exists());
+    }
 }

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1545883&r1=1545882&r2=1545883&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Tue Nov 26 23:43:29 2013
@@ -24,6 +24,7 @@ package org.apache.zookeeper.server.quor
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -57,6 +58,8 @@ public class QuorumPeerTestBase extends 
     public static class MainThread implements Runnable {
         final File confFile;
         volatile TestQPMain main;
+        final File dataDir; // This peer's data directory, created under tmpDir by the constructor.
+        CountDownLatch mainFailed; // Counted down by run() when main throws. NOTE(review): start() creates it only after currentThread.start(), so a fast failure may hit a stale or null latch.
 
         public MainThread(int myid, int clientPort, String quorumCfgSection)
                 throws IOException {
@@ -70,7 +73,7 @@ public class QuorumPeerTestBase extends 
             fwriter.write("initLimit=10\n");
             fwriter.write("syncLimit=5\n");
 
-            File dataDir = new File(tmpDir, "data");
+            dataDir = new File(tmpDir, "data");
             if (!dataDir.mkdir()) {
                 throw new IOException("Unable to mkdir " + dataDir);
             }
@@ -101,6 +104,7 @@ public class QuorumPeerTestBase extends 
             main = new TestQPMain();
             currentThread = new Thread(this);
             currentThread.start();
+            mainFailed = new CountDownLatch(1);
         }
 
         public void run() {
@@ -111,6 +115,8 @@ public class QuorumPeerTestBase extends 
             } catch (Exception e) {
                 // test will still fail even though we just log/ignore
                 LOG.error("unexpected exception in run", e);
+                main.shutdown();
+                mainFailed.countDown();
             } finally {
                 currentThread = null;
             }