You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2010/11/04 17:01:51 UTC
svn commit: r1031051 - in /hadoop/zookeeper/trunk: ./
src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/
src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/
src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Author: breed
Date: Thu Nov 4 16:01:51 2010
New Revision: 1031051
URL: http://svn.apache.org/viewvc?rev=1031051&view=rev
Log:
ZOOKEEPER-907. Spurious "KeeperErrorCode = Session moved" messages
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Nov 4 16:01:51 2010
@@ -145,6 +145,8 @@ BUGFIXES:
ZOOKEEPER-898. C Client might not cleanup correctly during close
(jared cantwell via mahadev)
+ ZOOKEEPER-907. Spurious "KeeperErrorCode = Session moved" messages (vishal k via breed)
+
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Thu Nov 4 16:01:51 2010
@@ -56,6 +56,8 @@ public class ServerConfiguration extends
protected final static String RETENTION_SECS = "retention_secs";
protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
+ protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
+ protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
// these are the derived attributes
protected ByteString myRegionByteString = null;
@@ -244,6 +246,20 @@ public class ServerConfiguration extends
return conf.getInt(MESSAGES_CONSUMED_THREAD_RUN_INTERVAL, 60000);
}
+ // This parameter is used when Bookkeeper is the persistence store
+ // and indicates what the ensemble size is (i.e. how many bookie
+ // servers to stripe the ledger entries across). Default: 3.
+ public int getBkEnsembleSize() {
+ return conf.getInt(BK_ENSEMBLE_SIZE, 3);
+ }
+
+ // This parameter is used when Bookkeeper is the persistence store
+ // and indicates what the quorum size is (i.e. how many redundant
+ // copies of each ledger entry are written). Default: 2.
+ public int getBkQuorumSize() {
+ return conf.getInt(BK_QUORUM_SIZE, 2);
+ }
+
/*
* Is this a valid configuration that we can run with? This code might grow
* over time.
@@ -262,6 +278,11 @@ public class ServerConfiguration extends
throw new ConfigurationException("Region defined does not have required SSL port: " + hubString);
}
}
+ // Validate that the Bookkeeper ensemble size >= quorum size.
+ if (getBkEnsembleSize() < getBkQuorumSize()) {
+ throw new ConfigurationException("BK ensemble size (" + getBkEnsembleSize()
+ + ") is less than the quorum size (" + getBkQuorumSize() + ")");
+ }
// add other checks here
}
Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Thu Nov 4 16:01:51 2010
@@ -608,10 +608,7 @@ public class BookkeeperPersistenceManage
* the same when we try to write
*/
private void openNewTopicLedger(final int expectedVersionOfLedgersNode, final TopicInfo topicInfo) {
- final int ENSEMBLE_SIZE = 3;
- final int QUORUM_SIZE = 2;
-
- bk.asyncCreateLedger(ENSEMBLE_SIZE, QUORUM_SIZE, DigestType.CRC32, passwd,
+ bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(), DigestType.CRC32, passwd,
new SafeAsynBKCallback.CreateCallback() {
boolean processed = false;
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Thu Nov 4 16:01:51 2010
@@ -782,6 +782,9 @@ public class ZooKeeperMain {
}
} else if (cmd.equals("close")) {
zk.close();
+ } else if (cmd.equals("sync") && args.length >= 2) {
+ path = args[1];
+ zk.sync(path, new AsyncCallback.VoidCallback() { public void processResult(int rc, String path, Object ctx) { System.out.println("Sync returned " + rc); } }, null );
} else if (cmd.equals("addauth") && args.length >=2 ) {
byte[] b = null;
if (args.length >= 3)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Nov 4 16:01:51 2010
@@ -449,14 +449,14 @@ public class LearnerHandler extends Thre
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
+ Request si;
if(type == OpCode.sync){
- leader.zk.submitRequest(new LearnerSyncRequest(this, sessionId, cxid, type, bb,
- qp.getAuthinfo()));
+ si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
- Request si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
- si.setOwner(this);
- leader.zk.submitRequest(si);
+ si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
+ si.setOwner(this);
+ leader.zk.submitRequest(si);
break;
default:
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Thu Nov 4 16:01:51 2010
@@ -186,7 +186,6 @@ public class QuorumTest extends QuorumBa
* @throws KeeperException
*/
@Test
- @Ignore
public void testSessionMoved() throws Exception {
String hostPorts[] = qb.hostPort.split(",");
DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
@@ -206,6 +205,20 @@ public class QuorumTest extends QuorumBa
zk.getSessionId(),
zk.getSessionPasswd());
zknew.setData("/", new byte[1], -1);
+ final int result[] = new int[1];
+ result[0] = Integer.MAX_VALUE;
+ zknew.sync("/", new AsyncCallback.VoidCallback() {
+ public void processResult(int rc, String path, Object ctx) {
+ synchronized(result) { result[0] = rc; result.notify(); }
+ }
+ }, null);
+ synchronized(result) {
+ if(result[0] == Integer.MAX_VALUE) {
+ result.wait(5000);
+ }
+ }
+ LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]);
+ Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue());
try {
zk.setData("/", new byte[1], -1);
Assert.fail("Should have lost the connection");
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=1031051&r1=1031050&r2=1031051&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Thu Nov 4 16:01:51 2010
@@ -307,7 +307,6 @@ public class SessionTest extends ZKTestC
* @throws KeeperException
*/
@Test
- @Ignore
public void testSessionMove() throws Exception {
String hostPorts[] = HOSTPORT.split(",");
DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
@@ -324,6 +323,20 @@ public class SessionTest extends ZKTestC
new MyWatcher(Integer.toString(i+1)),
zk.getSessionId(),
zk.getSessionPasswd());
+ final int result[] = new int[1];
+ result[0] = Integer.MAX_VALUE;
+ zknew.sync("/", new AsyncCallback.VoidCallback() {
+ public void processResult(int rc, String path, Object ctx) {
+ synchronized(result) { result[0] = rc; result.notify(); }
+ }
+ }, null);
+ synchronized(result) {
+ if(result[0] == Integer.MAX_VALUE) {
+ result.wait(5000);
+ }
+ }
+ LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]);
+ Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue());
zknew.setData("/", new byte[1], -1);
try {
zk.setData("/", new byte[1], -1);