You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2010/11/04 17:01:51 UTC

svn commit: r1031051 - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ src/java/main/org/apache/zookeeper/ src/java/ma...

Author: breed
Date: Thu Nov  4 16:01:51 2010
New Revision: 1031051

URL: http://svn.apache.org/viewvc?rev=1031051&view=rev
Log:
ZOOKEEPER-907. Spurious "KeeperErrorCode = Session moved" messages

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Nov  4 16:01:51 2010
@@ -145,6 +145,8 @@ BUGFIXES: 
   ZOOKEEPER-898. C Client might not cleanup correctly during close 
   (jared cantwell via mahadev)
 
+  ZOOKEEPER-907. Spurious "KeeperErrorCode = Session moved" messages (vishal k via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Thu Nov  4 16:01:51 2010
@@ -56,6 +56,8 @@ public class ServerConfiguration extends
     protected final static String RETENTION_SECS = "retention_secs";
     protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
     protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
+    protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
+    protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
 
     // these are the derived attributes
     protected ByteString myRegionByteString = null;
@@ -244,6 +246,20 @@ public class ServerConfiguration extends
         return conf.getInt(MESSAGES_CONSUMED_THREAD_RUN_INTERVAL, 60000);
     }
 
+    // This parameter is used when Bookkeeper is the persistence store
+    // and indicates what the ensemble size is (i.e. how many bookie
+    // servers to stripe the ledger entries across).
+    public int getBkEnsembleSize() {
+        return conf.getInt(BK_ENSEMBLE_SIZE, 3);
+    }
+
+    // This parameter is used when Bookkeeper is the persistence store
+    // and indicates what the quorum size is (i.e. how many redundant
+    // copies of each ledger entry are written).
+    public int getBkQuorumSize() {
+        return conf.getInt(BK_QUORUM_SIZE, 2);
+    }
+
     /*
      * Is this a valid configuration that we can run with? This code might grow
      * over time.
@@ -262,6 +278,11 @@ public class ServerConfiguration extends
                     throw new ConfigurationException("Region defined does not have required SSL port: " + hubString);
             }
         }
+        // Validate that the Bookkeeper ensemble size >= quorum size.
+        if (getBkEnsembleSize() < getBkQuorumSize()) {
+            throw new ConfigurationException("BK ensemble size (" + getBkEnsembleSize()
+                    + ") is less than the quorum size (" + getBkQuorumSize() + ")");
+        }
 
         // add other checks here
     }

Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Thu Nov  4 16:01:51 2010
@@ -608,10 +608,7 @@ public class BookkeeperPersistenceManage
          *            the same when we try to write
          */
         private void openNewTopicLedger(final int expectedVersionOfLedgersNode, final TopicInfo topicInfo) {
-            final int ENSEMBLE_SIZE = 3;
-            final int QUORUM_SIZE = 2;
-
-            bk.asyncCreateLedger(ENSEMBLE_SIZE, QUORUM_SIZE, DigestType.CRC32, passwd,
+            bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(), DigestType.CRC32, passwd,
                     new SafeAsynBKCallback.CreateCallback() {
                         boolean processed = false;
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Thu Nov  4 16:01:51 2010
@@ -782,6 +782,9 @@ public class ZooKeeperMain {
             }
         } else if (cmd.equals("close")) {
                 zk.close();            
+        } else if (cmd.equals("sync") && args.length >= 2) {
+            path = args[1];
+            zk.sync(path, new AsyncCallback.VoidCallback() { public void processResult(int rc, String path, Object ctx) { System.out.println("Sync returned " + rc); } }, null );
         } else if (cmd.equals("addauth") && args.length >=2 ) {
             byte[] b = null;
             if (args.length >= 3)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Nov  4 16:01:51 2010
@@ -449,14 +449,14 @@ public class LearnerHandler extends Thre
                     cxid = bb.getInt();
                     type = bb.getInt();
                     bb = bb.slice();
+                    Request si;
                     if(type == OpCode.sync){
-                     	leader.zk.submitRequest(new LearnerSyncRequest(this, sessionId, cxid, type, bb,
-                                qp.getAuthinfo()));
+                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                     } else {
-                        Request si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
-                        si.setOwner(this);
-                        leader.zk.submitRequest(si);
+                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                     }
+                    si.setOwner(this);
+                    leader.zk.submitRequest(si);
                     break;
                 default:
                 }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Thu Nov  4 16:01:51 2010
@@ -186,7 +186,6 @@ public class QuorumTest extends QuorumBa
      * @throws KeeperException
      */
     @Test
-    @Ignore
     public void testSessionMoved() throws Exception {
         String hostPorts[] = qb.hostPort.split(",");
         DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
@@ -206,6 +205,20 @@ public class QuorumTest extends QuorumBa
                     zk.getSessionId(),
                     zk.getSessionPasswd());
             zknew.setData("/", new byte[1], -1);
+            final int result[] = new int[1];
+            result[0] = Integer.MAX_VALUE;
+            zknew.sync("/", new AsyncCallback.VoidCallback() {
+                    public void processResult(int rc, String path, Object ctx) {
+                        synchronized(result) { result[0] = rc; result.notify(); }
+                    }
+                }, null);
+            synchronized(result) {
+                if(result[0] == Integer.MAX_VALUE) {
+                    result.wait(5000);
+                }
+            }
+            LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]);
+            Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue());
             try {
                 zk.setData("/", new byte[1], -1);
                 Assert.fail("Should have lost the connection");

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Thu Nov  4 16:01:51 2010
@@ -307,7 +307,6 @@ public class SessionTest extends ZKTestC
      * @throws KeeperException
      */
     @Test
-    @Ignore
     public void testSessionMove() throws Exception {
         String hostPorts[] = HOSTPORT.split(",");
         DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
@@ -324,6 +323,20 @@ public class SessionTest extends ZKTestC
                     new MyWatcher(Integer.toString(i+1)),
                     zk.getSessionId(),
                     zk.getSessionPasswd());
+            final int result[] = new int[1];
+            result[0] = Integer.MAX_VALUE;
+            zknew.sync("/", new AsyncCallback.VoidCallback() {
+                    public void processResult(int rc, String path, Object ctx) {
+                        synchronized(result) { result[0] = rc; result.notify(); }
+                    }
+                }, null);
+            synchronized(result) {
+                if(result[0] == Integer.MAX_VALUE) {
+                    result.wait(5000);
+                }
+            }
+            LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]);
+            Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue());
             zknew.setData("/", new byte[1], -1);
             try {
                 zk.setData("/", new byte[1], -1);