You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2009/02/02 23:26:04 UTC
svn commit: r740131 - in /hadoop/zookeeper/trunk: ./
src/contrib/fatjar/conf/ src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/quorum/ src/java/systest/
src/java/systest/org/apache/zookeeper/test/system/ src/java/tes...
Author: mahadev
Date: Mon Feb 2 22:26:03 2009
New Revision: 740131
URL: http://svn.apache.org/viewvc?rev=740131&view=rev
Log:
ZOOKEEPER-286. Make GenerateLoad use InstanceContainers. (breed via mahadev)
Added:
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java
- copied, changed from r740129, hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java
Removed:
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/systest/README.txt
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Mon Feb 2 22:26:03 2009
@@ -145,6 +145,8 @@
cleanup datadir (snaps/logs) (mahadev via phunt)
ZOOKEEPER-69. ZooKeeper logo
+
+ ZOOKEEPER-286. Make GenerateLoad use InstanceContainers. (breed via mahadev)
NEW FEATURES:
Modified: hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses (original)
+++ hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses Mon Feb 2 22:26:03 2009
@@ -3,7 +3,7 @@
::Server Commands
server:org.apache.zookeeper.server.quorum.QuorumPeerMain:Start ZooKeeper server
::Test Commands
-generateLoad:org.apache.zookeeper.test.GenerateLoad:A distributed load generator for testing
+generateLoad:org.apache.zookeeper.test.system.GenerateLoad:A distributed load generator for testing
quorumBench:org.apache.zookeeper.server.QuorumBenchmark:A benchmark of just the quorum protocol
abBench:org.apache.zookeeper.server.quorum.AtomicBroadcastBenchmark:A benchmark of just the atomic broadcast
ic:org.apache.zookeeper.test.system.InstanceContainer:A container that will instantiate classes as directed by an instance manager
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Mon Feb 2 22:26:03 2009
@@ -376,7 +376,9 @@
stats.packetsSent++;
/* We've sent the whole buffer, so drop the buffer */
sent -= bb.remaining();
- zk.serverStats().incrementPacketsSent();
+ if (zk != null) {
+ zk.serverStats().incrementPacketsSent();
+ }
outgoingBuffers.remove();
}
// ZooLog.logTraceMessage(LOG,
@@ -589,6 +591,7 @@
sb.append(zk.dataTree.dumpEphemerals()).append("\n");
sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
}
+ sendBuffer(NIOServerCnxn.closeConn);
k.interestOps(SelectionKey.OP_WRITE);
return;
} else if (len == reqsCmd) {
@@ -603,6 +606,7 @@
}
}
sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+ sendBuffer(NIOServerCnxn.closeConn);
k.interestOps(SelectionKey.OP_WRITE);
return;
} else if (len == statCmd) {
@@ -626,6 +630,7 @@
sb.append("ZooKeeperServer not running\n");
sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+ sendBuffer(NIOServerCnxn.closeConn);
k.interestOps(SelectionKey.OP_WRITE);
return;
} else if (len == enviCmd) {
@@ -642,6 +647,7 @@
}
sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+ sendBuffer(NIOServerCnxn.closeConn);
k.interestOps(SelectionKey.OP_WRITE);
return;
} else if (len == srstCmd) {
@@ -650,6 +656,7 @@
zk.serverStats().reset();
sendBuffer(ByteBuffer.wrap("Stats reset.\n".getBytes()));
+ sendBuffer(NIOServerCnxn.closeConn);
k.interestOps(SelectionKey.OP_WRITE);
return;
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java Mon Feb 2 22:26:03 2009
@@ -35,6 +35,7 @@
public interface Provider {
public long getOutstandingRequests();
public long getLastProcessedZxid();
+ public String getState();
}
public ServerStats(Provider provider) {
@@ -74,7 +75,7 @@
}
public String getServerState() {
- return "standalone";
+ return provider.getState();
}
@Override
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Mon Feb 2 22:26:03 2009
@@ -56,7 +56,7 @@
public SyncRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
- super("SyncThread:" + zks.getClientPort());
+ super("SyncThread:" + zks.getServerId());
this.zks = zks;
this.nextProcessor = nextProcessor;
start();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Mon Feb 2 22:26:03 2009
@@ -119,7 +119,6 @@
int requestsInProcess;
List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
private NIOServerCnxn.Factory serverCnxnFactory;
- private int clientPort;
private final ServerStats serverStats;
@@ -637,13 +636,9 @@
}
public int getClientPort() {
- return clientPort;
+ return serverCnxnFactory != null ? serverCnxnFactory.ss.socket().getLocalPort() : -1;
}
- public void setClientPort(int clientPort) {
- this.clientPort = clientPort;
- }
-
public void setTxnLogFactory(FileTxnSnapLog txnLog) {
this.txnLogFactory = txnLog;
}
@@ -651,4 +646,8 @@
public FileTxnSnapLog getTxnLogFactory() {
return this.txnLogFactory;
}
+
+ public String getState() {
+ return "standalone";
+ }
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java Mon Feb 2 22:26:03 2009
@@ -61,7 +61,6 @@
public ZooKeeperServer createServer() throws IOException {
// create a file logger url from the command line args
ZooKeeperServer zks = new ZooKeeperServer();
- zks.setClientPort(ServerConfig.getClientPort());
FileTxnSnapLog ftxn = new FileTxnSnapLog(new
File(ServerConfig.getDataLogDir()),
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Mon Feb 2 22:26:03 2009
@@ -45,7 +45,7 @@
public FollowerRequestProcessor(FollowerZooKeeperServer zks,
RequestProcessor nextProcessor) {
- super("FollowerRequestProcessor:" + zks.getClientPort());
+ super("FollowerRequestProcessor:" + zks.getServerId());
this.zks = zks;
this.nextProcessor = nextProcessor;
start();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Mon Feb 2 22:26:03 2009
@@ -83,7 +83,7 @@
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
- Integer.toString(getClientPort()), true);
+ Long.toString(getServerId()), true);
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor(getFollower()));
@@ -229,4 +229,9 @@
}
jmxServerBean = null;
}
+
+ @Override
+ public String getState() {
+ return "follower";
+ }
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Mon Feb 2 22:26:03 2009
@@ -62,7 +62,7 @@
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
- Integer.toString(getClientPort()), false);
+ Long.toString(getServerId()), false);
RequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
@@ -138,4 +138,9 @@
}
jmxServerBean = null;
}
+
+ @Override
+ public String getState() {
+ return "leader";
+ }
}
Modified: hadoop/zookeeper/trunk/src/java/systest/README.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/README.txt?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/README.txt (original)
+++ hadoop/zookeeper/trunk/src/java/systest/README.txt Mon Feb 2 22:26:03 2009
@@ -40,3 +40,23 @@
java -DsysTest.zkHostPort=hostA:2181 -jar build/contrib/fatjar/zookeeper-dev-fatjar.jar systest org.apache.zookeeper.test.system.SimpleSysTest
where hostA is running the zk server started in step 2) above
+
+InstanceContainers can also be used to run a the saturation benchmark. The
+first two steps are the same as the system test. Step 3 is almost the same:
+
+3) start the InstanceContainer on each host:
+
+e.g. java -jar zookeeper-<version>-fatjar.jar ic <name> <zkHostPort> <prefix>
+
+note prefix can be /sysTest or any other path. If you do use /sysTest, make
+sure the system test isn't running when you run the benchmark.
+
+4) run GenerateLoad using the following
+
+java -jar build/contrib/fatjar/zookeeper-dev-fatjar.jar generateLoad <zkHostPort> <prefix> #servers #clients
+
+Once GenerateLoad is started, it will read commands from stdin. Usually
+the only command you need to know is "percentage" which sets the percentage
+of writes to use in the requests. Once a percentage is set, the benchmark
+will start. "percentage 0" will cause only reads to be issued and
+"percentage 100" will cause only writes to be issued.
Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java Mon Feb 2 22:26:03 2009
@@ -23,17 +23,14 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.security.MessageDigest;
import java.util.HashMap;
import junit.framework.TestCase;
-import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumStats;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.junit.runner.JUnitCore;
@@ -121,14 +118,13 @@
StringBuilder sbServer = new StringBuilder();
try {
for(int i = 0; i < count; i++) {
- String r = QuorumPeerInstance.createServer(im, i);
+ String r[] = QuorumPeerInstance.createServer(im, i);
if (i > 0) {
sbClient.append(',');
sbServer.append(',');
}
- String parts[] = r.split(",");
- sbClient.append(parts[0]);
- sbServer.append(parts[1]);
+ sbClient.append(r[0]);
+ sbServer.append(r[1]);
}
serverHostPort = sbClient.toString();
quorumHostPort = sbServer.toString();
Copied: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java (from r740129, hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java)
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java?p2=hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java&p1=hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java&r1=740129&r2=740131&rev=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java Mon Feb 2 22:26:03 2009
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.zookeeper.test;
+package org.apache.zookeeper.test.system;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
@@ -25,8 +25,11 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
+import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
@@ -79,7 +82,8 @@
synchronized static void add(long time, int count, Socket s) {
long interval = time / INTERVAL;
if (currentInterval == 0 || currentInterval > interval) {
- System.out.println("Dropping " + count + " for " + new Date(time) + " " + currentInterval + ">" + interval);
+ System.out.println("Dropping " + count + " for " + new Date(time)
+ + " " + currentInterval + ">" + interval);
return;
}
// We track totals by seconds
@@ -109,20 +113,22 @@
public void run() {
try {
System.out.println("Connected to " + s);
- BufferedReader is = new BufferedReader(new InputStreamReader(s.getInputStream()));
+ BufferedReader is = new BufferedReader(new InputStreamReader(s
+ .getInputStream()));
String result;
while ((result = is.readLine()) != null) {
String timePercentCount[] = result.split(" ");
if (timePercentCount.length != 5) {
- System.err.println("Got " + result + " from " + s + " exitng.");
+ System.err.println("Got " + result + " from " + s
+ + " exitng.");
throw new IOException(result);
}
long time = Long.parseLong(timePercentCount[0]);
- //int percent = Integer.parseInt(timePercentCount[1]);
+ // int percent = Integer.parseInt(timePercentCount[1]);
int count = Integer.parseInt(timePercentCount[2]);
int errs = Integer.parseInt(timePercentCount[3]);
if (errs > 0) {
- System.out.println(s+" Got an error! " + errs);
+ System.out.println(s + " Got an error! " + errs);
}
add(time, count, s);
}
@@ -162,6 +168,7 @@
try {
while (true) {
Socket s = ss.accept();
+ System.err.println("Accepted connection from " + s);
slaves.add(new SlaveThread(s));
}
} catch (IOException e) {
@@ -177,6 +184,8 @@
}
static class ReporterThread extends Thread {
+ static int percentage;
+
ReporterThread() {
setDaemon(true);
start();
@@ -186,7 +195,7 @@
try {
currentInterval = System.currentTimeMillis() / INTERVAL;
// Give things time to report;
- Thread.sleep(INTERVAL*2);
+ Thread.sleep(INTERVAL * 2);
long min = 99999;
long max = 0;
long total = 0;
@@ -196,8 +205,10 @@
long lastInterval = currentInterval;
currentInterval += 1;
long count = remove(lastInterval);
- count=count*1000/INTERVAL; // Multiply by 1000 to get reqs/sec
- if (lastChange != 0 && (lastChange + INTERVAL*4 + 5000)< now) {
+ count = count * 1000 / INTERVAL; // Multiply by 1000 to get
+ // reqs/sec
+ if (lastChange != 0
+ && (lastChange + INTERVAL * 4 + 5000) < now) {
// We only want to print anything if things have had a
// chance to change
@@ -211,17 +222,13 @@
number++;
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(lastInterval * INTERVAL);
- String report = lastInterval + " " + calendar.get(Calendar.HOUR_OF_DAY)
- + ":" + calendar.get(Calendar.MINUTE)
- + ":" + calendar.get(Calendar.SECOND)
- + " "
- + percentage
- + "% "
- + count
- + " "
- + min
- + " "
- + ((double)total / (double)number) + " " + max;
+ String report = lastInterval + " "
+ + calendar.get(Calendar.HOUR_OF_DAY) + ":"
+ + calendar.get(Calendar.MINUTE) + ":"
+ + calendar.get(Calendar.SECOND) + " "
+ + percentage + "% " + count + " " + min + " "
+ + ((double) total / (double) number) + " "
+ + max;
System.err.println(report);
if (sf != null) {
sf.println(report);
@@ -243,7 +250,7 @@
synchronized static void sendChange(int percentage) {
long now = System.currentTimeMillis();
long start = now;
- GenerateLoad.percentage = percentage;
+ ReporterThread.percentage = percentage;
for (SlaveThread st : slaves.toArray(new SlaveThread[0])) {
st.send(percentage);
}
@@ -255,214 +262,408 @@
lastChange = now;
}
- static int percentage = -1;
+ static public class GeneratorInstance implements Instance {
- static String host;
+ int percentage = -1;
- static Socket s;
+ int errors;
- static int errors;
+ final Object statSync = new Object();
- static final Object statSync = new Object();
+ int finished;
- static int finished;
+ int reads;
- static int reads;
+ int writes;
- static int writes;
+ int rlatency;
- static int rlatency;
+ int wlatency;
- static int wlatency;
+ int outstanding;
+
+ volatile boolean alive;
- static int outstanding;
+ class ZooKeeperThread extends Thread implements Watcher, DataCallback,
+ StatCallback {
+ String host;
- static class ZooKeeperThread extends Thread implements Watcher, DataCallback,
- StatCallback {
- ZooKeeperThread() {
- setDaemon(true);
- start();
- alive = true;
- }
+ ZooKeeperThread(String host) {
+ setDaemon(true);
+ alive = true;
+ this.host = host;
+ start();
+ }
- static final int outstandingLimit = 100;
+ static final int outstandingLimit = 100;
- synchronized void incOutstanding() throws InterruptedException {
- outstanding++;
- while (outstanding > outstandingLimit) {
- wait();
+ synchronized void incOutstanding() throws InterruptedException {
+ outstanding++;
+ while (outstanding > outstandingLimit) {
+ wait();
+ }
}
- }
- synchronized void decOutstanding() {
- outstanding--;
- notifyAll();
- }
+ synchronized void decOutstanding() {
+ outstanding--;
+ notifyAll();
+ }
- boolean alive;
+ Random r = new Random();
- Random r = new Random();
+ String path;
- String path;
+ ZooKeeper zk;
- ZooKeeper zk;
+ boolean connected;
- public void run() {
- try {
- byte bytes[] = new byte[1024];
- zk = new ZooKeeper(host, 60000, this);
- for(int i = 0; i < 300; i++) {
+ public void run() {
+ try {
+ byte bytes[] = new byte[1024];
+ zk = new ZooKeeper(host, 60000, this);
+ synchronized (this) {
+ if (!connected) {
+ wait(20000);
+ }
+ }
+ for (int i = 0; i < 300; i++) {
+ try {
+ Thread.sleep(100);
+ path = zk.create("/client", new byte[16],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL_SEQUENTIAL);
+ break;
+ } catch (KeeperException e) {
+ LOG.error("keeper exception thrown", e);
+ }
+ }
+ if (path == null) {
+ System.err.println("Couldn't create a node in /!");
+ return;
+ }
+ while (alive) {
+ if (r.nextInt(100) < percentage) {
+ zk.setData(path, bytes, -1, this, System
+ .currentTimeMillis());
+ } else {
+ zk.getData(path, false, this, System
+ .currentTimeMillis());
+ }
+ incOutstanding();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ alive = false;
try {
- Thread.sleep(100);
- path = zk.create("/client", new byte[16], Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL);
- break;
- } catch(KeeperException e) {
- LOG.error("keeper exception thrown", e);
- }
- }
- if (path == null) {
- System.err.println("Couldn't create a node in /!");
- System.exit(44);
- }
- System.err.println("Created: " + s);
- while (alive) {
- if (r.nextInt(100) < percentage) {
- zk.setData(path, bytes, -1, this, System.currentTimeMillis());
+ zk.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void process(WatchedEvent event) {
+ System.err.println(event);
+ synchronized (this) {
+ if (event.getType() == EventType.None) {
+ connected = (event.getState() == KeeperState.SyncConnected);
+ notifyAll();
+ }
+ }
+ }
+
+ public void processResult(int rc, String path, Object ctx, byte[] data,
+ Stat stat) {
+ decOutstanding();
+ synchronized (statSync) {
+ if (!alive) {
+ return;
+ }
+ if (rc != 0) {
+ System.err.println("Got rc = " + rc);
+ errors++;
} else {
- zk.getData(path, false, this, System.currentTimeMillis());
+ finished++;
+ rlatency += System.currentTimeMillis() - (Long) ctx;
+ reads++;
}
- incOutstanding();
}
- } catch (Exception e) {
- e.printStackTrace();
- System.exit(3);
- } finally {
- alive = false;
}
- }
- public void process(WatchedEvent event) {
- System.err.println(event);
- synchronized(this) {
- try {
- wait(200);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ decOutstanding();
+ synchronized (statSync) {
+ if (rc != 0) {
+ System.err.println("Got rc = " + rc);
+ errors++;
+ } else {
+ finished++;
+ wlatency += System.currentTimeMillis() - (Long) ctx;
+ writes++;
+ }
}
}
- if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
+ }
+
+ class SenderThread extends Thread {
+ Socket s;
+
+ SenderThread(Socket s) {
+ this.s = s;
+ setDaemon(true);
+ start();
+ }
+
+ public void run() {
try {
- zk = new ZooKeeper(host, 10000, this);
- } catch (IOException e) {
+ OutputStream os = s.getOutputStream();
+ finished = 0;
+ errors = 0;
+ while (alive) {
+ Thread.sleep(300);
+ if (percentage == -1 || (finished == 0 && errors == 0)) {
+ continue;
+ }
+ String report = System.currentTimeMillis() + " "
+ + percentage + " " + finished + " " + errors + " "
+ + outstanding + "\n";
+ /* String subreport = reads + " "
+ + (((double) rlatency) / reads) + " " + writes
+ + " " + (((double) wlatency / writes)); */
+ synchronized (statSync) {
+ finished = 0;
+ errors = 0;
+ reads = 0;
+ writes = 0;
+ rlatency = 0;
+ wlatency = 0;
+ }
+ os.write(report.getBytes());
+ //System.out.println("Reporting " + report + "+" + subreport);
+ }
+ } catch (Exception e) {
e.printStackTrace();
}
+
}
}
- public void processResult(int rc, String path, Object ctx, byte[] data,
- Stat stat) {
- decOutstanding();
- synchronized(statSync) {
- if (rc != 0) {
- System.err.println("Got rc = " + rc);
- errors++;
- } else {
- finished++;
- rlatency += System.currentTimeMillis() - (Long)ctx;
- reads++;
- }
- }
+ Socket s;
+ ZooKeeperThread zkThread;
+ SenderThread sendThread;
+ Reporter r;
+
+ public void configure(final String params) {
+ System.err.println("Got " + params);
+ new Thread() {
+ public void run() {
+ try {
+ String parts[] = params.split(" ");
+ String hostPort[] = parts[1].split(":");
+ s = new Socket(hostPort[0], Integer.parseInt(hostPort[1]));
+ zkThread = new ZooKeeperThread(parts[0]);
+ sendThread = new SenderThread(s);
+ BufferedReader is = new BufferedReader(new InputStreamReader(s
+ .getInputStream()));
+ String line;
+ while ((line = is.readLine()) != null) {
+ percentage = Integer.parseInt(line);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
}
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- decOutstanding();
- synchronized(statSync) {
- if (rc != 0) {
- System.err.println("Got rc = " + rc);
- errors++;
- } else {
- finished++;
- wlatency += System.currentTimeMillis() - (Long)ctx;
- writes++;
- }
- }
+ public void setReporter(Reporter r) {
+ this.r = r;
}
- }
- static class SenderThread extends Thread {
- SenderThread() {
- setDaemon(true);
- start();
+ public void start() {
+ try {
+ r.report("started");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
- public void run() {
+
+ public void stop() {
+ alive = false;
+ zkThread.interrupt();
+ sendThread.interrupt();
try {
- OutputStream os = s.getOutputStream();
- finished = 0;
- errors = 0;
- while(true) {
- Thread.sleep(300);
- if (percentage == -1 || (finished == 0 && errors == 0)) {
- continue;
- }
- String report = System.currentTimeMillis() + " " + percentage + " " + finished + " " + errors + " " + outstanding + "\n";
- String subreport = reads + " " + (((double)rlatency)/reads) + " " + writes + " " + (((double)wlatency/writes));
- synchronized(statSync) {
- finished = 0;
- errors = 0;
- reads = 0;
- writes = 0;
- rlatency = 0;
- wlatency = 0;
- }
- os.write(report.getBytes());
- System.out.println("Reporting " + report + "+" + subreport);
- }
+ zkThread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ try {
+ sendThread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ try {
+ r.report("stopped");
} catch (Exception e) {
e.printStackTrace();
}
+ try {
+ s.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ private static class StatusWatcher implements Watcher {
+ volatile boolean connected;
+ public void process(WatchedEvent event) {
+ if (event.getType() == Watcher.Event.EventType.None) {
+ synchronized (this) {
+ connected = event.getState() == Watcher.Event.KeeperState.SyncConnected;
+ notifyAll();
+ }
+ }
+ }
+
+ public boolean isConnected() {
+ return connected;
+ }
+
+ synchronized public boolean waitConnected(long timeout)
+ throws InterruptedException {
+ long endTime = System.currentTimeMillis() + timeout;
+ while (!connected && System.currentTimeMillis() < endTime) {
+ wait(endTime - System.currentTimeMillis());
+ }
+ return connected;
}
}
+ private static boolean leaderOnly;
+
+ private static String []processOptions(String args[]) {
+ ArrayList<String> newArgs = new ArrayList<String>();
+ for(String a: args) {
+ if (a.equals("--leaderOnly")) {
+ leaderOnly = true;
+ } else {
+ newArgs.add(a);
+ }
+ }
+ return newArgs.toArray(new String[0]);
+ }
+
/**
* @param args
* @throws InterruptedException
+ * @throws KeeperException
+ * @throws DuplicateNameException
+ * @throws NoAvailableContainers
+ * @throws NoAssignmentException
*/
- public static void main(String[] args) throws InterruptedException {
- if (args.length == 1) {
+ public static void main(String[] args) throws InterruptedException,
+ KeeperException, NoAvailableContainers, DuplicateNameException,
+ NoAssignmentException {
+
+ args = processOptions(args);
+ if (args.length == 4) {
try {
- int port = Integer.parseInt(args[0]);
- ss = new ServerSocket(port);
+ StatusWatcher statusWatcher = new StatusWatcher();
+ ZooKeeper zk = new ZooKeeper(args[0], 15000, statusWatcher);
+ if (!statusWatcher.waitConnected(5000)) {
+ System.err.println("Could not connect to " + args[0]);
+ return;
+ }
+ InstanceManager im = new InstanceManager(zk, args[1]);
+ ss = new ServerSocket(0);
+ int port = ss.getLocalPort();
+ int serverCount = Integer.parseInt(args[2]);
+ int clientCount = Integer.parseInt(args[3]);
+ StringBuilder quorumHostPort = new StringBuilder();
+ StringBuilder zkHostPort = new StringBuilder();
+ for (int i = 0; i < serverCount; i++) {
+ String r[] = QuorumPeerInstance.createServer(im, i);
+ if (i > 0) {
+ quorumHostPort.append(',');
+ zkHostPort.append(',');
+ }
+ zkHostPort.append(r[0]);
+ quorumHostPort.append(r[1]);
+ }
+ for (int i = 0; i < serverCount; i++) {
+ QuorumPeerInstance.startInstance(im, quorumHostPort
+ .toString(), i);
+ }
+ if (leaderOnly) {
+ int tries = 0;
+ outer:
+ while(true) {
+ Thread.sleep(1000);
+ IOException lastException = null;
+ String parts[] = zkHostPort.toString().split(",");
+ for(int i = 0; i < parts.length; i++) {
+ try {
+ String mode = getMode(parts[i]);
+ if (mode.equals("leader")) {
+ zkHostPort = new StringBuilder(parts[i]);
+ System.out.println("Connecting exclusively to " + zkHostPort.toString());
+ break outer;
+ }
+ } catch(IOException e) {
+ lastException = e;
+ }
+ }
+ if (tries++ > 3) {
+ throw lastException;
+ }
+ }
+ }
+ for (int i = 0; i < clientCount; i++) {
+ im.assignInstance("client" + i, GeneratorInstance.class,
+ zkHostPort.toString()
+ + ' '
+ + InetAddress.getLocalHost()
+ .getCanonicalHostName() + ':'
+ + port, 1);
+ }
new AcceptorThread();
new ReporterThread();
- BufferedReader is = new BufferedReader(new InputStreamReader(System.in));
+ BufferedReader is = new BufferedReader(new InputStreamReader(
+ System.in));
String line;
while ((line = is.readLine()) != null) {
try {
String cmdNumber[] = line.split(" ");
- if (cmdNumber[0].equals("percentage") && cmdNumber.length > 1) {
+ if (cmdNumber[0].equals("percentage")
+ && cmdNumber.length > 1) {
int number = Integer.parseInt(cmdNumber[1]);
if (number < 0 || number > 100) {
- throw new NumberFormatException("must be between 0 and 100");
+ throw new NumberFormatException(
+ "must be between 0 and 100");
}
sendChange(number);
- } else if (cmdNumber[0].equals("sleep") && cmdNumber.length > 1) {
+ } else if (cmdNumber[0].equals("sleep")
+ && cmdNumber.length > 1) {
int number = Integer.parseInt(cmdNumber[1]);
- Thread.sleep(number*1000);
- } else if (cmdNumber[0].equals("save") && cmdNumber.length > 1) {
+ Thread.sleep(number * 1000);
+ } else if (cmdNumber[0].equals("save")
+ && cmdNumber.length > 1) {
sf = new PrintStream(cmdNumber[1]);
} else {
System.err.println("Commands must be:");
- System.err.println("\tpercentage new_write_percentage");
+ System.err
+ .println("\tpercentage new_write_percentage");
System.err.println("\tsleep seconds_to_sleep");
System.err.println("\tsave file_to_save_output");
}
} catch (NumberFormatException e) {
- System.out
- .println("Not a valid number: " + e.getMessage());
+ System.out.println("Not a valid number: "
+ + e.getMessage());
}
}
} catch (NumberFormatException e) {
@@ -471,35 +672,29 @@
e.printStackTrace();
System.exit(2);
}
- } else if (args.length == 2) {
- host = args[1];
- String hostPort[] = args[0].split(":");
- try {
- s = new Socket(hostPort[0], Integer
- .parseInt(hostPort[1]));
- new ZooKeeperThread();
- new SenderThread();
- BufferedReader is = new BufferedReader(new InputStreamReader(s.getInputStream()));
- String line;
- while((line = is.readLine()) != null) {
- percentage = Integer.parseInt(line);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
} else {
doUsage();
}
}
+ private static String getMode(String hostPort) throws NumberFormatException, UnknownHostException, IOException {
+ String parts[] = hostPort.split(":");
+ Socket s = new Socket(parts[0], Integer.parseInt(parts[1]));
+ s.getOutputStream().write("stat".getBytes());
+ BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
+ String line;
+ while((line = br.readLine()) != null) {
+ if (line.startsWith("Mode: ")) {
+ return line.substring(6);
+ }
+ }
+ return "unknown";
+ }
+
private static void doUsage() {
- System.err
- .println("USAGE: "
- + GenerateLoad.class.getName()
- + " controller_host:port zookeeper_host:port-> connects to a controller");
System.err.println("USAGE: " + GenerateLoad.class.getName()
- + " controller_port -> starts a controller");
+ + " [--leaderOnly] zookeeper_host:port containerPrefix #ofServers #ofClients");
System.exit(2);
}
}
Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java Mon Feb 2 22:26:03 2009
@@ -53,7 +53,6 @@
this.myNode = myNode;
this.dc = dc;
}
- @Override
public void process(WatchedEvent event) {
if (event.getPath() != null && event.getPath().equals(myNode)) {
zk.getData(myNode, this, dc, this);
@@ -70,7 +69,6 @@
this.myInstance = myInstance;
lastVer = ver;
}
- @Override
public void processResult(int rc, String path,
Object ctx, byte[] data, Stat stat) {
if (rc == KeeperException.Code.NONODE.intValue()) {
@@ -95,7 +93,6 @@
myReportNode = reportsNode + '/' + child;
}
- @Override
public void report(String report) throws KeeperException, InterruptedException {
for(int j = 0; j < maxTries; j++) {
try {
@@ -217,7 +214,6 @@
}
}
- @Override
public void process(WatchedEvent event) {
if (KeeperState.Expired == event.getState()) {
// It's all over
@@ -231,7 +227,6 @@
}
HashMap<String, Instance> instances = new HashMap<String, Instance>();
- @Override
public void processResult(int rc, String path, Object ctx,
List<String> children) {
if (rc != KeeperException.Code.OK.intValue()) {
@@ -288,6 +283,9 @@
zk.getData(myNode, watcher, dc, watcher);
} catch (Exception e) {
LOG.warn("Skipping " + child, e);
+ if (e.getCause() != null) {
+ LOG.warn("Caused by", e.getCause());
+ }
}
}
Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java Mon Feb 2 22:26:03 2009
@@ -115,7 +115,7 @@
zk.create(readyNode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch(NodeExistsException e) { /* this is ok */ }
}
- @Override
+
synchronized public void processResult(int rc, String path, Object ctx,
List<String> children) {
if (rc != KeeperException.Code.OK.intValue()) {
@@ -156,7 +156,7 @@
zk.delete(deadNode, -1);
} catch(NoNodeException e) { /* this is ok */ }
}
- @Override
+
public void process(WatchedEvent event) {
if (event.getPath().equals(statusNode)) {
zk.getChildren(statusNode, this, this, null);
@@ -297,7 +297,6 @@
synchronized(eventObj) {
// wait for the node to appear
Stat eStat = zk.exists(reportsNode + '/' + name, new Watcher() {
- @Override
public void process(WatchedEvent event) {
synchronized(eventObj) {
eventObj.notifyAll();
Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java Mon Feb 2 22:26:03 2009
@@ -19,12 +19,14 @@
package org.apache.zookeeper.test.system;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
+import java.util.Properties;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
@@ -41,7 +43,6 @@
Reporter r;
QuorumPeer peer;
- @Override
public void setReporter(Reporter r) {
this.r = r;
}
@@ -49,19 +50,33 @@
InetSocketAddress clientAddr;
InetSocketAddress quorumAddr;
HashMap<Long, QuorumServer> peers;
- File dir;
+ File snapDir, logDir;
public QuorumPeerInstance() {
try {
- dir = File.createTempFile("test", ".dir");
+ File tmpFile = File.createTempFile("test", ".dir");
+ File tmpDir = tmpFile.getParentFile();
+ tmpFile.delete();
+ File zkDirs = new File(tmpDir, "zktmp.cfg");
+ logDir = tmpDir;
+ snapDir = tmpDir;
+ if (zkDirs.exists()) {
+ Properties p = new Properties();
+ p.load(new FileInputStream(zkDirs));
+ logDir = new File(p.getProperty("logDir", tmpDir.getAbsolutePath()));
+ snapDir = new File(p.getProperty("snapDir", tmpDir.getAbsolutePath()));
+ }
+ logDir = File.createTempFile("zktst", ".dir", logDir);
+ logDir.delete();
+ logDir.mkdir();
+ snapDir = File.createTempFile("zktst", ".dir", snapDir);
+ snapDir.delete();
+ snapDir.mkdir();
} catch (IOException e) {
e.printStackTrace();
}
- dir.delete();
- dir.mkdir();
}
- @Override
public void configure(String params) {
if (clientAddr == null) {
// The first time we are configured, it is just to tell
@@ -140,7 +155,7 @@
LOG.warn("Peer " + serverId + " already started");
return;
}
- peer = new QuorumPeer(peers, dir, dir, clientAddr.getPort(), 0, serverId, tickTime, initLimit, syncLimit);
+ peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 0, serverId, tickTime, initLimit, syncLimit);
peer.start();
for(int i = 0; i < 5; i++) {
Thread.sleep(500);
@@ -157,7 +172,6 @@
}
}
- @Override
public void start() {
}
@@ -171,7 +185,7 @@
}
dir.delete();
}
- @Override
+
public void stop() {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping peer " + serverId);
@@ -179,7 +193,12 @@
if (peer != null) {
peer.shutdown();
}
- recursiveDelete(dir);
+ if (logDir != null) {
+ recursiveDelete(logDir);
+ }
+ if (snapDir != null) {
+ recursiveDelete(snapDir);
+ }
}
/**
@@ -192,9 +211,9 @@
* @throws InterruptedException
* @throws KeeperException
*/
- public static String createServer(InstanceManager im, int i) throws NoAvailableContainers, DuplicateNameException, InterruptedException, KeeperException {
+ public static String[] createServer(InstanceManager im, int i) throws NoAvailableContainers, DuplicateNameException, InterruptedException, KeeperException {
im.assignInstance("server"+i, QuorumPeerInstance.class, Integer.toString(i), 50);
- return im.getStatus("server"+i, 3000);
+ return im.getStatus("server"+i, 3000).split(",");
}
Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java Mon Feb 2 22:26:03 2009
@@ -31,7 +31,7 @@
import org.apache.zookeeper.data.Stat;
/**
- * The client that gets spawned for the SimpleSysTest
+ * The client that gets spawned for the SimpleSysTest
*
*/
public class SimpleClient implements Instance, Watcher, AsyncCallback.DataCallback, StringCallback, StatCallback {
@@ -42,25 +42,23 @@
transient String myPath;
byte data[];
boolean createdEphemeral;
- @Override
public void configure(String params) {
String parts[] = params.split(" ");
hostPort = parts[1];
this.index = Integer.parseInt(parts[0]);
myPath = "/simpleCase/" + index;
}
- @Override
+
public void start() {
try {
zk = new ZooKeeper(hostPort, 15000, this);
zk.getData("/simpleCase", true, this, null);
- r.report("Client " + index + " connecting to " + hostPort);
+ r.report("Client " + index + " connecting to " + hostPort);
} catch (Exception e) {
e.printStackTrace();
}
}
-
- @Override
+
public void stop() {
try {
if (zk != null) {
@@ -70,13 +68,12 @@
e.printStackTrace();
}
}
- @Override
public void process(WatchedEvent event) {
if (event.getPath() != null && event.getPath().equals("/simpleCase")) {
zk.getData("/simpleCase", true, this, null);
}
}
- @Override
+
public void processResult(int rc, String path, Object ctx, byte[] data,
Stat stat) {
if (rc != 0) {
@@ -94,15 +91,14 @@
} else {
zk.setData(myPath, data, -1, this, null);
}
- }
+ }
}
- @Override
+
public void processResult(int rc, String path, Object ctx, String name) {
if (rc != 0) {
zk.create(myPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, this, null);
}
}
- @Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (rc != 0) {
zk.setData(myPath, data, -1, this, null);
@@ -112,9 +108,8 @@
public String toString() {
return SimpleClient.class.getName() + "[" + index + "] using " + hostPort;
}
-
+
Reporter r;
- @Override
public void setReporter(Reporter r) {
this.r = r;
}
Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java Mon Feb 2 22:26:03 2009
@@ -154,7 +154,6 @@
stopServers();
}
- @Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
synchronized(this) {
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java Mon Feb 2 22:26:03 2009
@@ -82,9 +82,8 @@
assertTrue("exactly 3 snapshots ", (numSnaps == 3));
}
- @Override
public void process(WatchedEvent event) {
// do nothing
}
-}
\ No newline at end of file
+}