You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/05 19:39:03 UTC
svn commit: r1165369 [4/9] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/clie...
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.test;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.test;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.io.File;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.test;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.test;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.io.File;
@@ -48,7 +48,7 @@ import org.junit.Test;
* This test tests read and write, synchronous and asynchronous, strings and
* integers for a BookKeeper client. The test deployment uses a ZooKeeper server
* and three BookKeepers.
- *
+ *
*/
public class BookieFailureTest extends BaseTestCase implements AddCallback, ReadCallback {
@@ -70,7 +70,7 @@ public class BookieFailureTest extends B
ArrayList<byte[]> entries; // generated entries
ArrayList<Integer> entriesSize;
DigestType digestType;
-
+
// Synchronization
SyncObj sync;
Set<Object> syncObjs;
@@ -87,12 +87,12 @@ public class BookieFailureTest extends B
public BookieFailureTest(DigestType digestType) {
super(4);
- this.digestType = digestType;
+ this.digestType = digestType;
}
-
+
/**
* Tests writes and reads when a bookie fails.
- *
+ *
* @throws {@link IOException}
*/
@Test
@@ -100,7 +100,7 @@ public class BookieFailureTest extends B
LOG.info("#### BK1 ####");
auxTestReadWriteAsyncSingleClient(bs.get(0));
}
-
+
@Test
public void testAsyncBK2() throws IOException {
LOG.info("#### BK2 ####");
@@ -118,23 +118,23 @@ public class BookieFailureTest extends B
LOG.info("#### BK4 ####");
auxTestReadWriteAsyncSingleClient(bs.get(3));
}
-
+
@Test
- public void testBookieRecovery() throws Exception{
+ public void testBookieRecovery() throws Exception {
//Shutdown all but 1 bookie
bs.get(0).shutdown();
bs.get(1).shutdown();
bs.get(2).shutdown();
-
+
byte[] passwd = "blah".getBytes();
LedgerHandle lh = bkc.createLedger(1, 1,digestType, passwd);
-
+
int numEntries = 100;
- for (int i=0; i< numEntries; i++){
+ for (int i=0; i< numEntries; i++) {
byte[] data = (""+i).getBytes();
lh.addEntry(data);
}
-
+
bs.get(3).shutdown();
BookieServer server = new BookieServer(initialPort + 3, HOSTPORT, tmpDirs.get(3), new File[] { tmpDirs.get(3)});
server.start();
@@ -142,15 +142,15 @@ public class BookieFailureTest extends B
assertEquals(numEntries - 1 , lh.getLastAddConfirmed());
Enumeration<LedgerEntry> entries = lh.readEntries(0, lh.getLastAddConfirmed());
-
+
int numScanned = 0;
- while (entries.hasMoreElements()){
+ while (entries.hasMoreElements()) {
assertEquals((""+numScanned), new String(entries.nextElement().getEntry()));
numScanned++;
}
assertEquals(numEntries, numScanned);
-
-
+
+
}
void auxTestReadWriteAsyncSingleClient(BookieServer bs) throws IOException {
@@ -168,9 +168,9 @@ public class BookieFailureTest extends B
entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.asyncAddEntry(entry.array(), this, sync);
-
+
}
-
+
LOG.info("Wrote " + numEntriesToWrite + " and now going to fail bookie.");
// Bookie fail
bs.shutdown();
@@ -269,7 +269,7 @@ public class BookieFailureTest extends B
super.setUp();
rng = new Random(System.currentTimeMillis()); // Initialize the Random
- // Number Generator
+ // Number Generator
entries = new ArrayList<byte[]>(); // initialize the entries list
entriesSize = new ArrayList<Integer>();
sync = new SyncObj(); // initialize the synchronization data structure
@@ -300,4 +300,4 @@ public class BookieFailureTest extends B
}
}
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.test;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.test;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.io.File;
@@ -57,11 +57,11 @@ import org.junit.Test;
* This test tests read and write, synchronous and asynchronous, strings and
* integers for a BookKeeper client. The test deployment uses a ZooKeeper server
* and three BookKeepers.
- *
+ *
*/
-public class BookieReadWriteTest extends BaseTestCase
-implements AddCallback, ReadCallback, ReadLastConfirmedCallback {
+public class BookieReadWriteTest extends BaseTestCase
+ implements AddCallback, ReadCallback, ReadLastConfirmedCallback {
// Depending on the taste, select the amount of logging
// by decommenting one of the two lines below
@@ -79,10 +79,10 @@ implements AddCallback, ReadCallback, Re
Random rng; // Random Number Generator
ArrayList<byte[]> entries; // generated entries
ArrayList<Integer> entriesSize;
-
+
DigestType digestType;
-
- public BookieReadWriteTest(DigestType digestType){
+
+ public BookieReadWriteTest(DigestType digestType) {
super(3);
this.digestType = digestType;
}
@@ -114,7 +114,7 @@ implements AddCallback, ReadCallback, Re
/**
* test the streaming api for reading and writing
- *
+ *
* @throws {@link IOException}, {@link KeeperException}
*/
@Test
@@ -259,7 +259,7 @@ implements AddCallback, ReadCallback, Re
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());
byte bytes[] = {'a','b','c','d','e','f','g','h','i'};
-
+
lh.asyncAddEntry(bytes, 0, bytes.length, this, sync);
lh.asyncAddEntry(bytes, 0, 4, this, sync); // abcd
lh.asyncAddEntry(bytes, 3, 4, this, sync); // defg
@@ -275,37 +275,37 @@ implements AddCallback, ReadCallback, Re
}
try {
- lh.asyncAddEntry(bytes, -1, bytes.length, this, sync);
+ lh.asyncAddEntry(bytes, -1, bytes.length, this, sync);
fail("Shouldn't be able to use negative offset");
} catch (ArrayIndexOutOfBoundsException aiob) {
// expected
}
try {
- lh.asyncAddEntry(bytes, 0, bytes.length+1, this, sync);
+ lh.asyncAddEntry(bytes, 0, bytes.length+1, this, sync);
fail("Shouldn't be able to use that much length");
} catch (ArrayIndexOutOfBoundsException aiob) {
// expected
}
try {
- lh.asyncAddEntry(bytes, -1, bytes.length+2, this, sync);
+ lh.asyncAddEntry(bytes, -1, bytes.length+2, this, sync);
fail("Shouldn't be able to use negative offset "
+ "with that much length");
} catch (ArrayIndexOutOfBoundsException aiob) {
// expected
}
try {
- lh.asyncAddEntry(bytes, 4, -3, this, sync);
+ lh.asyncAddEntry(bytes, 4, -3, this, sync);
fail("Shouldn't be able to use negative length");
} catch (ArrayIndexOutOfBoundsException aiob) {
// expected
}
try {
- lh.asyncAddEntry(bytes, -4, -3, this, sync);
+ lh.asyncAddEntry(bytes, -4, -3, this, sync);
fail("Shouldn't be able to use negative offset & length");
} catch (ArrayIndexOutOfBoundsException aiob) {
// expected
}
-
+
LOG.debug("*** WRITE COMPLETE ***");
// close ledger
@@ -316,7 +316,7 @@ implements AddCallback, ReadCallback, Re
// open ledger
lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
- assertTrue("Verifying number of entries written",
+ assertTrue("Verifying number of entries written",
lh.getLastAddConfirmed() == (numEntries - 1));
// read entries
@@ -336,26 +336,26 @@ implements AddCallback, ReadCallback, Re
while (ls.hasMoreElements()) {
byte[] expected = null;
byte[] entry = ls.nextElement().getEntry();
-
+
switch (i) {
- case 0:
+ case 0:
expected = Arrays.copyOfRange(bytes, 0, bytes.length);
break;
- case 1:
+ case 1:
expected = Arrays.copyOfRange(bytes, 0, 4);
break;
- case 2:
+ case 2:
expected = Arrays.copyOfRange(bytes, 3, 3+4);
break;
- case 3:
+ case 3:
expected = Arrays.copyOfRange(bytes, 3, 3+(bytes.length-3));
break;
}
assertNotNull("There are more checks than writes", expected);
-
+
String message = "Checking entry " + i + " for equality ["
- + new String(entry, "UTF-8") + ","
- + new String(expected, "UTF-8") + "]";
+ + new String(entry, "UTF-8") + ","
+ + new String(expected, "UTF-8") + "]";
assertTrue(message, Arrays.equals(entry, expected));
i++;
@@ -377,47 +377,47 @@ implements AddCallback, ReadCallback, Re
class ThrottleTestCallback implements ReadCallback {
int throttle;
-
- ThrottleTestCallback(int threshold){
+
+ ThrottleTestCallback(int threshold) {
this.throttle = threshold;
}
-
- public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx){
- if(rc != BKException.Code.OK){
+
+ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+ if(rc != BKException.Code.OK) {
fail("Return code is not OK: " + rc);
}
-
+
ls = seq;
- synchronized(sync){
+ synchronized(sync) {
sync.counter += throttle;
sync.notify();
}
LOG.info("Current counter: " + sync.counter);
}
}
-
+
/**
* Method for obtaining the available permits of a ledger handle
* using reflection to avoid adding a new public method to the
* class.
- *
+ *
* @param lh
* @return
*/
@SuppressWarnings("unchecked")
int getAvailablePermits(LedgerHandle lh) throws
- NoSuchFieldException, IllegalAccessException
- {
- Field field = LedgerHandle.class.getDeclaredField("opCounterSem");
- field.setAccessible(true);
- return ((Semaphore)field.get(lh)).availablePermits();
+ NoSuchFieldException, IllegalAccessException
+ {
+ Field field = LedgerHandle.class.getDeclaredField("opCounterSem");
+ field.setAccessible(true);
+ return ((Semaphore)field.get(lh)).availablePermits();
}
-
+
@Test
- public void testReadWriteAsyncSingleClientThrottle() throws
- IOException, NoSuchFieldException, IllegalAccessException {
+ public void testReadWriteAsyncSingleClientThrottle() throws
+ IOException, NoSuchFieldException, IllegalAccessException {
try {
-
+
Integer throttle = 100;
ThrottleTestCallback tcb = new ThrottleTestCallback(throttle);
// Create a ledger
@@ -426,8 +426,8 @@ implements AddCallback, ReadCallback, Re
// bkc.initMessageDigest("SHA1");
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());
-
- numEntriesToWrite = 8000;
+
+ numEntriesToWrite = 8000;
for (int i = 0; i < (numEntriesToWrite - 2000); i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
@@ -442,7 +442,7 @@ implements AddCallback, ReadCallback, Re
int testValue = getAvailablePermits(lh);
assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
}
-
+
for (int i = 0; i < 2000; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
@@ -452,14 +452,14 @@ implements AddCallback, ReadCallback, Re
entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.asyncAddEntry(entry.array(), this, sync);
-
+
/*
* Check that the difference is no larger than the throttling threshold
*/
int testValue = getAvailablePermits(lh);
assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
}
-
+
// wait for all entries to be acknowledged
synchronized (sync) {
while (sync.counter < numEntriesToWrite) {
@@ -473,7 +473,7 @@ implements AddCallback, ReadCallback, Re
lh.close();
// *** WRITING PART COMPLETE // READ PART BEGINS ***
-
+
// open ledger
lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
@@ -486,7 +486,7 @@ implements AddCallback, ReadCallback, Re
int testValue = getAvailablePermits(lh);
assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
}
-
+
synchronized (sync) {
while (sync.counter < numEntriesToWrite) {
LOG.info("Entries counter = " + sync.counter);
@@ -508,7 +508,7 @@ implements AddCallback, ReadCallback, Re
fail("Test failed due to interruption");
}
}
-
+
@Test
public void testSyncReadAsyncWriteStringsSingleClient() throws IOException {
LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
@@ -703,9 +703,9 @@ implements AddCallback, ReadCallback, Re
LOG.debug("Number of entries written: " + lh.getLastAddConfirmed() + ", " + lh2.getLastAddConfirmed());
assertTrue("Verifying number of entries written lh (" + lh.getLastAddConfirmed() + ")", lh
- .getLastAddConfirmed() == (numEntriesToWrite - 1));
+ .getLastAddConfirmed() == (numEntriesToWrite - 1));
assertTrue("Verifying number of entries written lh2 (" + lh2.getLastAddConfirmed() + ")", lh2
- .getLastAddConfirmed() == (numEntriesToWrite - 1));
+ .getLastAddConfirmed() == (numEntriesToWrite - 1));
ls = lh.readEntries(0, numEntriesToWrite - 1);
int i = 0;
@@ -764,7 +764,7 @@ implements AddCallback, ReadCallback, Re
}
long length = numEntriesToWrite * 4;
assertTrue("Ledger length before closing: " + lh.getLength(), lh.getLength() == length);
-
+
LOG.debug("*** WRITE COMPLETE ***");
// close ledger
lh.close();
@@ -788,7 +788,7 @@ implements AddCallback, ReadCallback, Re
fail("Test failed due to interruption");
}
}
-
+
@Test
public void testShutdown() throws IOException {
try {
@@ -796,13 +796,13 @@ implements AddCallback, ReadCallback, Re
Long throttle = (((Double) Math.max(1.0, ((double) 10000/numLedgers))).longValue());
System.setProperty("throttle", throttle.toString());
LedgerHandle[] lhArray = new LedgerHandle[numLedgers];
- for(int i = 0; i < numLedgers; i++){
+ for(int i = 0; i < numLedgers; i++) {
lhArray[i] = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32, new byte[] {'a', 'b'});
LOG.debug("Ledger handle: " + lhArray[i].getId());
}
LOG.info("Done creating ledgers.");
Random r = new Random();
-
+
for (int i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
@@ -810,7 +810,7 @@ implements AddCallback, ReadCallback, Re
entries.add(entry.array());
entriesSize.add(entry.array().length);
-
+
int nextLh = r.nextInt(numLedgers);
lhArray[nextLh].asyncAddEntry(entry.array(), this, sync);
}
@@ -822,10 +822,10 @@ implements AddCallback, ReadCallback, Re
sync.wait();
}
}
-
+
LOG.debug("*** WRITE COMPLETE ***");
// close ledger
- for(int i = 0; i < lhArray.length; i++){
+ for(int i = 0; i < lhArray.length; i++) {
lhArray[i].close();
}
} catch (KeeperException e) {
@@ -839,7 +839,7 @@ implements AddCallback, ReadCallback, Re
fail("Test failed due to interruption");
}
}
-
+
public void testReadFromOpenLedger() throws IOException {
try {
// Create a ledger
@@ -855,7 +855,7 @@ implements AddCallback, ReadCallback, Re
entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.addEntry(entry.array());
- if(i == numEntriesToWrite/2){
+ if(i == numEntriesToWrite/2) {
LedgerHandle lhOpen = bkc.openLedgerNoRecovery(ledgerId, digestType, ledgerPassword);
Enumeration<LedgerEntry> readEntry = lh.readEntries(i, i);
assertTrue("Enumeration of ledger entries has no element", readEntry.hasMoreElements() == true);
@@ -864,7 +864,7 @@ implements AddCallback, ReadCallback, Re
long last = lh.readLastConfirmed();
assertTrue("Last confirmed add: " + last, last == (numEntriesToWrite - 2));
-
+
LOG.debug("*** WRITE COMPLETE ***");
// close ledger
lh.close();
@@ -885,10 +885,10 @@ implements AddCallback, ReadCallback, Re
lh.addEntry(entry.array());
}
-
+
SyncObj sync = new SyncObj();
lh.asyncReadLastConfirmed(this, sync);
-
+
// Wait for for last confirmed
synchronized (sync) {
while (sync.lastConfirmed == -1) {
@@ -896,13 +896,13 @@ implements AddCallback, ReadCallback, Re
sync.wait();
}
}
-
+
assertTrue("Last confirmed add: " + sync.lastConfirmed, sync.lastConfirmed == (numEntriesToWrite - 2));
-
+
LOG.debug("*** WRITE COMPLETE ***");
// close ledger
lh.close();
-
+
} catch (KeeperException e) {
LOG.error("Test failed", e);
@@ -915,8 +915,8 @@ implements AddCallback, ReadCallback, Re
fail("Test failed due to interruption");
}
}
-
-
+
+
@Test
public void testLastConfirmedAdd() throws IOException {
try {
@@ -937,7 +937,7 @@ implements AddCallback, ReadCallback, Re
long last = lh.readLastConfirmed();
assertTrue("Last confirmed add: " + last, last == (numEntriesToWrite - 2));
-
+
LOG.debug("*** WRITE COMPLETE ***");
// close ledger
lh.close();
@@ -958,10 +958,10 @@ implements AddCallback, ReadCallback, Re
lh.addEntry(entry.array());
}
-
+
SyncObj sync = new SyncObj();
lh.asyncReadLastConfirmed(this, sync);
-
+
// Wait for for last confirmed
synchronized (sync) {
while (sync.lastConfirmed == -1) {
@@ -969,13 +969,13 @@ implements AddCallback, ReadCallback, Re
sync.wait();
}
}
-
+
assertTrue("Last confirmed add: " + sync.lastConfirmed, sync.lastConfirmed == (numEntriesToWrite - 2));
-
+
LOG.debug("*** WRITE COMPLETE ***");
// close ledger
lh.close();
-
+
} catch (KeeperException e) {
LOG.error("Test failed", e);
@@ -988,13 +988,13 @@ implements AddCallback, ReadCallback, Re
fail("Test failed due to interruption");
}
}
-
-
+
+
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
-
+
SyncObj x = (SyncObj) ctx;
-
+
synchronized (x) {
x.counter++;
x.notify();
@@ -1003,7 +1003,7 @@ implements AddCallback, ReadCallback, Re
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
-
+
ls = seq;
synchronized (sync) {
@@ -1014,22 +1014,22 @@ implements AddCallback, ReadCallback, Re
public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
SyncObj sync = (SyncObj) ctx;
-
- synchronized(sync){
+
+ synchronized(sync) {
sync.lastConfirmed = lastConfirmed;
sync.notify();
}
}
-
+
@Before
- public void setUp() throws Exception{
+ public void setUp() throws Exception {
super.setUp();
rng = new Random(System.currentTimeMillis()); // Initialize the Random
- // Number Generator
+ // Number Generator
entries = new ArrayList<byte[]>(); // initialize the entries list
entriesSize = new ArrayList<Integer>();
sync = new SyncObj(); // initialize the synchronization data structure
-
+
}
/* Clean up a directory recursively */
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieRecoveryTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieRecoveryTest.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.test;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.test;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.io.File;
@@ -109,7 +109,7 @@ public class BookieRecoveryTest extends
/**
* Helper method to create a number of ledgers
- *
+ *
* @param numLedgers
* Number of ledgers to create
* @return List of LedgerHandles for each of the ledgers created
@@ -119,7 +119,7 @@ public class BookieRecoveryTest extends
* @throws InterruptedException
*/
private List<LedgerHandle> createLedgers(int numLedgers) throws BKException, KeeperException, IOException,
- InterruptedException {
+ InterruptedException {
List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
for (int i = 0; i < numLedgers; i++) {
lhs.add(bkc.createLedger(digestType, System.getProperty("passwd").getBytes()));
@@ -129,7 +129,7 @@ public class BookieRecoveryTest extends
/**
* Helper method to write dummy ledger entries to all of the ledgers passed.
- *
+ *
* @param numEntries
* Number of ledger entries to write for each ledger
* @param startEntryId
@@ -140,7 +140,7 @@ public class BookieRecoveryTest extends
* @throws InterruptedException
*/
private void writeEntriestoLedgers(int numEntries, long startEntryId, List<LedgerHandle> lhs) throws BKException,
- InterruptedException {
+ InterruptedException {
for (LedgerHandle lh : lhs) {
for (int i = 0; i < numEntries; i++) {
lh.addEntry(("LedgerId: " + lh.getId() + ", EntryId: " + (startEntryId + i)).getBytes());
@@ -151,33 +151,33 @@ public class BookieRecoveryTest extends
/**
* Helper method to startup a new bookie server with the indicated port
* number
- *
+ *
* @param port
* Port to start the new bookie server on
* @throws IOException
*/
private void startNewBookie(int port)
- throws IOException, InterruptedException, KeeperException {
+ throws IOException, InterruptedException, KeeperException {
File f = File.createTempFile("bookie", "test");
tmpDirs.add(f);
f.delete();
f.mkdir();
-
+
BookieServer server = new BookieServer(port, HOSTPORT, f, new File[] { f });
server.start();
bs.add(server);
-
- while(bkc.getZkHandle().exists("/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null){
+
+ while(bkc.getZkHandle().exists("/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) {
Thread.sleep(500);
}
-
+
bkc.readBookiesBlocking();
LOG.info("New bookie on port " + port + " has been created.");
}
-
+
/**
* Helper method to verify that we can read the recovered ledger entries.
- *
+ *
* @param numLedgers
* Number of ledgers to verify
* @param startEntryId
@@ -188,7 +188,7 @@ public class BookieRecoveryTest extends
* @throws InterruptedException
*/
private void verifyRecoveredLedgers(int numLedgers, long startEntryId, long endEntryId) throws BKException,
- InterruptedException {
+ InterruptedException {
// Get a set of LedgerHandles for all of the ledgers to verify
List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
for (int i = 0; i < numLedgers; i++) {
@@ -201,7 +201,7 @@ public class BookieRecoveryTest extends
while (entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
assertTrue(new String(entry.getEntry()).equals("LedgerId: " + entry.getLedgerId() + ", EntryId: "
- + entry.getEntryId()));
+ + entry.getEntryId()));
}
}
@@ -213,7 +213,7 @@ public class BookieRecoveryTest extends
* replace it, and then recovering the ledger entries from the killed bookie
* onto the new one. We'll verify that the entries stored on the killed
* bookie are properly copied over and restored onto the new one.
- *
+ *
* @throws Exception
*/
@Test
@@ -243,7 +243,7 @@ public class BookieRecoveryTest extends
InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
- + bookieDest + ")");
+ + bookieDest + ")");
// Initiate the sync object
sync.value = false;
bkTools.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
@@ -266,7 +266,7 @@ public class BookieRecoveryTest extends
* onto random available bookie servers. We'll verify that the entries
* stored on the killed bookie are properly copied over and restored onto
* the other bookies.
- *
+ *
* @throws Exception
*/
@Test
@@ -283,7 +283,7 @@ public class BookieRecoveryTest extends
LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
bs.get(0).shutdown();
bs.remove(0);
-
+
// Startup three new bookie servers
for (int i = 0; i < 3; i++) {
int newBookiePort = initialPort + numBookies + i;
@@ -298,7 +298,7 @@ public class BookieRecoveryTest extends
InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
InetSocketAddress bookieDest = null;
LOG.info("Now recover the data on the killed bookie (" + bookieSrc
- + ") and replicate it to a random available one");
+ + ") and replicate it to a random available one");
// Initiate the sync object
sync.value = false;
bkTools.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
@@ -320,7 +320,7 @@ public class BookieRecoveryTest extends
* replace it, and then recovering the ledger entries from the killed bookie
* onto the new one. We'll verify that the entries stored on the killed
* bookie are properly copied over and restored onto the new one.
- *
+ *
* @throws Exception
*/
@Test
@@ -337,7 +337,7 @@ public class BookieRecoveryTest extends
LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
bs.get(0).shutdown();
bs.remove(0);
-
+
// Startup a new bookie server
int newBookiePort = initialPort + numBookies;
startNewBookie(newBookiePort);
@@ -350,7 +350,7 @@ public class BookieRecoveryTest extends
InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
- + bookieDest + ")");
+ + bookieDest + ")");
bkTools.recoverBookieData(bookieSrc, bookieDest);
// Verify the recovered ledger entries are okay.
@@ -364,7 +364,7 @@ public class BookieRecoveryTest extends
* onto random available bookie servers. We'll verify that the entries
* stored on the killed bookie are properly copied over and restored onto
* the other bookies.
- *
+ *
* @throws Exception
*/
@Test
@@ -396,7 +396,7 @@ public class BookieRecoveryTest extends
InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
InetSocketAddress bookieDest = null;
LOG.info("Now recover the data on the killed bookie (" + bookieSrc
- + ") and replicate it to a random available one");
+ + ") and replicate it to a random available one");
bkTools.recoverBookieData(bookieSrc, bookieDest);
// Verify the recovered ledger entries are okay.
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java Mon Sep 5 17:38:57 2011
@@ -46,7 +46,7 @@ public class BookieZKExpireTest extends
File f = File.createTempFile("bookieserver", "test");
f.delete();
f.mkdir();
-
+
HashSet<Thread> threadset = new HashSet<Thread>();
int threadCount = Thread.activeCount();
Thread threads[] = new Thread[threadCount*2];
@@ -56,10 +56,10 @@ public class BookieZKExpireTest extends
threadset.add(threads[i]);
}
}
-
+
server = new BookieServer(initialPort + 1, HOSTPORT, f, new File[] { f });
server.start();
-
+
Thread.sleep(10);
Thread sendthread = null;
threadCount = Thread.activeCount();
@@ -67,17 +67,17 @@ public class BookieZKExpireTest extends
threadCount = Thread.enumerate(threads);
for(int i = 0; i < threadCount; i++) {
if (threads[i].getName().indexOf("SendThread") != -1
- && !threadset.contains(threads[i])) {
+ && !threadset.contains(threads[i])) {
sendthread = threads[i];
break;
}
}
assertNotNull("Send thread not found", sendthread);
-
+
sendthread.suspend();
Thread.sleep(2*10000);
sendthread.resume();
-
+
// allow watcher thread to run
Thread.sleep(3000);
assertFalse("Bookie should have shutdown on losing zk session", server.isBookieRunning());
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.test;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.test;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import org.junit.*;
@@ -29,10 +29,10 @@ import org.apache.log4j.Logger;
/**
* This unit test tests closing ledgers sequentially. It creates 4 ledgers, then
* write 1000 entries to each ledger and close it.
- *
+ *
*/
-public class CloseTest extends BaseTestCase{
+public class CloseTest extends BaseTestCase {
static Logger LOG = Logger.getLogger(CloseTest.class);
DigestType digestType;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java Mon Sep 5 17:38:57 2011
@@ -48,7 +48,7 @@ public class ConcurrentLedgerTest extend
File txnDir, ledgerDir;
int recvTimeout = 10000;
Semaphore throttle;
-
+
@Override
@Before
public void setUp() throws IOException {
@@ -67,10 +67,10 @@ public class ConcurrentLedgerTest extend
tmpFile = File.createTempFile("book", ".ledger", ledgerDir);
ledgerDir = new File(tmpFile.getParent(), tmpFile.getName()+".dir");
ledgerDir.mkdirs();
-
+
bookie = new Bookie(5000, null, txnDir, new File[] {ledgerDir});
}
-
+
static void recursiveDelete(File f) {
if (f.isFile()) {
f.delete();
@@ -81,7 +81,7 @@ public class ConcurrentLedgerTest extend
f.delete();
}
}
-
+
@Override
@After
public void tearDown() {
@@ -153,7 +153,7 @@ public class ConcurrentLedgerTest extend
WriteCallback cb = new WriteCallback() {
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
- InetSocketAddress addr, Object ctx) {
+ InetSocketAddress addr, Object ctx) {
AtomicInteger counter = (AtomicInteger)ctx;
counter.getAndIncrement();
throttle.release();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java Mon Sep 5 17:38:57 2011
@@ -47,17 +47,17 @@ import junit.framework.TestCase;
*/
public class LedgerCacheTest extends TestCase {
static Logger LOG = Logger.getLogger(LedgerCacheTest.class);
-
+
Bookie bookie;
File txnDir, ledgerDir;
-
+
class TestWriteCallback implements WriteCallback {
- public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx){
+ public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
LOG.info("Added entry: " + rc + ", " + ledgerId + ", " + entryId + ", " + addr);
}
}
-
-
+
+
@Override
@Before
public void setUp() throws IOException {
@@ -76,12 +76,12 @@ public class LedgerCacheTest extends Tes
tmpFile = File.createTempFile("book", ".ledger", ledgerDir);
ledgerDir = new File(tmpFile.getParent(), tmpFile.getName()+".dir");
ledgerDir.mkdirs();
-
-
- bookie = new Bookie(5000, null, txnDir, new File[] {ledgerDir});
+
+
+ bookie = new Bookie(5000, null, txnDir, new File[] {ledgerDir});
}
-
-
+
+
@Override
@After
public void tearDown() {
@@ -93,10 +93,10 @@ public class LedgerCacheTest extends Tes
LOG.error("Error tearing down", e);
}
}
-
+
/**
* Recursively deletes a directory. This is a duplication of BookieClientTest.
- *
+ *
* @param dir
*/
private static void recursiveDelete(File dir) {
@@ -108,16 +108,16 @@ public class LedgerCacheTest extends Tes
}
dir.delete();
}
-
+
@Test
- public void testAddEntryException()
- throws GeneralSecurityException, BookieException {
+ public void testAddEntryException()
+ throws GeneralSecurityException, BookieException {
/*
* Populate ledger cache
*/
- try{
+ try {
byte[] masterKey = "blah".getBytes();
- for( int i = 0; i < 30000; i++){
+ for( int i = 0; i < 30000; i++) {
MacDigestManager dm = new MacDigestManager(i, masterKey);
byte[] data = "0123456789".getBytes();
ByteBuffer entry = dm.computeDigestAndPackageForSending(0, 0, 10, data, 0, data.length).toByteBuffer();
@@ -128,5 +128,5 @@ public class LedgerCacheTest extends Tes
fail("Failed to add entry.");
}
}
-
-}
\ No newline at end of file
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.test;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.test;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.io.File;
@@ -85,7 +85,7 @@ public class LedgerDeleteTest extends Ba
* It will then delete all of the ledgers from the client and let the
* server's EntryLogger garbage collector thread delete the initial entry
* log file.
- *
+ *
* @throws Exception
*/
@Test
@@ -104,7 +104,7 @@ public class LedgerDeleteTest extends Ba
for (File ledgerDirectory : tmpDirs) {
for (File f : ledgerDirectory.listFiles()) {
assertFalse("Found the entry log file (0.log) that should have been deleted in ledgerDirectory: "
- + ledgerDirectory, f.isFile() && f.getName().equals("0.log"));
+ + ledgerDirectory, f.isFile() && f.getName().equals("0.log"));
}
}
}
@@ -114,7 +114,7 @@ public class LedgerDeleteTest extends Ba
* restart the Bookie Servers after it has written out the ledger entries.
* On restart, there will be existing entry logs and ledger index files for
* the EntryLogger and LedgerCache to read and store into memory.
- *
+ *
* @throws Exception
*/
@Test
@@ -155,7 +155,7 @@ public class LedgerDeleteTest extends Ba
for (File ledgerDirectory : tmpDirs) {
for (File f : ledgerDirectory.listFiles()) {
assertFalse("Found the entry log file ([0,1].log) that should have been deleted in ledgerDirectory: "
- + ledgerDirectory, f.isFile() && (f.getName().equals("0.log") || f.getName().equals("1.log")));
+ + ledgerDirectory, f.isFile() && (f.getName().equals("0.log") || f.getName().equals("1.log")));
}
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerRecoveryTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerRecoveryTest.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.test;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.test;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import org.junit.*;
@@ -28,7 +28,7 @@ import org.apache.log4j.Logger;
/**
* This unit test tests ledger recovery.
- *
+ *
*/
public class LedgerRecoveryTest extends BaseTestCase {
@@ -54,7 +54,7 @@ public class LedgerRecoveryTest extends
}
long length = (long) (numEntries * tmp.length());
-
+
/*
* Try to open ledger.
*/
@@ -64,24 +64,24 @@ public class LedgerRecoveryTest extends
* Check if has recovered properly.
*/
assertTrue("Has not recovered correctly: " + afterlh.getLastAddConfirmed(),
- afterlh.getLastAddConfirmed() == numEntries - 1);
- assertTrue("Has not set the length correctly: " + afterlh.getLength() + ", " + length,
- afterlh.getLength() == length);
+ afterlh.getLastAddConfirmed() == numEntries - 1);
+ assertTrue("Has not set the length correctly: " + afterlh.getLength() + ", " + length,
+ afterlh.getLength() == length);
}
-
+
@Test
public void testLedgerRecovery() throws Exception {
testInternal(100);
-
+
}
@Test
- public void testEmptyLedgerRecoveryOne() throws Exception{
+ public void testEmptyLedgerRecoveryOne() throws Exception {
testInternal(1);
}
@Test
- public void testEmptyLedgerRecovery() throws Exception{
+ public void testEmptyLedgerRecovery() throws Exception {
testInternal(0);
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.test;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.test;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.net.InetSocketAddress;
@@ -37,9 +37,9 @@ import org.jboss.netty.channel.socket.ni
/**
* This class tests BookieClient. It just sends the a new entry to itself.
- *
- *
- *
+ *
+ *
+ *
*/
class LoopbackClient implements WriteCallback {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/NIOServerFactoryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/NIOServerFactoryTest.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/NIOServerFactoryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/NIOServerFactoryTest.java Mon Sep 5 17:38:57 2011
@@ -1,7 +1,7 @@
package org.apache.bookkeeper.test;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.test;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.net.Socket;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java Mon Sep 5 17:38:57 2011
@@ -24,13 +24,13 @@ import org.apache.hedwig.util.Callback;
/**
* Interface to define the client handler logic to consume messages it is
* subscribed to.
- *
+ *
*/
public interface MessageHandler {
/**
* Consumes a message it is subscribed to and has been delivered to it.
- *
+ *
* @param topic
* The topic name where the message came from.
* @param subscriberId
@@ -45,4 +45,4 @@ public interface MessageHandler {
*/
public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback, Object context);
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java Mon Sep 5 17:38:57 2011
@@ -25,13 +25,13 @@ import org.apache.hedwig.util.Callback;
/**
* Interface to define the client Publisher API.
- *
+ *
*/
public interface Publisher {
/**
* Publishes a message on the given topic.
- *
+ *
* @param topic
* Topic name to publish on
* @param msg
@@ -45,7 +45,7 @@ public interface Publisher {
/**
* Publishes a message asynchronously on the given topic.
- *
+ *
* @param topic
* Topic name to publish on
* @param msg
@@ -60,4 +60,4 @@ public interface Publisher {
*/
public void asyncPublish(ByteString topic, Message msg, Callback<Void> callback, Object context);
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java Mon Sep 5 17:38:57 2011
@@ -31,13 +31,13 @@ import org.apache.hedwig.util.Callback;
/**
* Interface to define the client Subscriber API.
- *
+ *
*/
public interface Subscriber {
/**
* Subscribe to the given topic for the inputted subscriberId.
- *
+ *
* @param topic
* Topic name of the subscription
* @param subscriberId
@@ -58,12 +58,12 @@ public interface Subscriber {
*/
public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
- InvalidSubscriberIdException;
+ InvalidSubscriberIdException;
/**
* Subscribe to the given topic asynchronously for the inputted subscriberId
* disregarding if the topic has been created yet or not.
- *
+ *
* @param topic
* Topic name of the subscription
* @param subscriberId
@@ -80,12 +80,12 @@ public interface Subscriber {
* asynchronously.
*/
public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
- Object context);
+ Object context);
/**
* Unsubscribe from a topic that the subscriberId user has previously
* subscribed to.
- *
+ *
* @param topic
* Topic name of the subscription
* @param subscriberId
@@ -102,12 +102,12 @@ public interface Subscriber {
* e.g. local vs. hub subscriber
*/
public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
- ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException;
+ ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException;
/**
* Unsubscribe from a topic asynchronously that the subscriberId user has
* previously subscribed to.
- *
+ *
* @param topic
* Topic name of the subscription
* @param subscriberId
@@ -124,7 +124,7 @@ public interface Subscriber {
/**
* Manually send a consume message to the server for the given inputs.
- *
+ *
* @param topic
* Topic name of the subscription
* @param subscriberId
@@ -132,7 +132,7 @@ public interface Subscriber {
* @param messageSeqId
* Message Sequence ID for the latest message that the client app
* has successfully consumed. All messages up to that point will
- * also be considered as consumed.
+ * also be considered as consumed.
* @throws ClientNotSubscribedException
* If the client is not currently subscribed to the topic based
* on the client's local state.
@@ -143,7 +143,7 @@ public interface Subscriber {
/**
* Checks if the subscriberId client is currently subscribed to the given
* topic.
- *
+ *
* @param topic
* Topic name of the subscription.
* @param subscriberId
@@ -156,12 +156,12 @@ public interface Subscriber {
* @return Boolean indicating if the client has a subscription or not.
*/
public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
- ServiceDownException;
+ ServiceDownException;
/**
* Fills the input List with the subscriptions this subscriberId client is
* subscribed to.
- *
+ *
* @param subscriberId
* ID of the subscriber
* @return List filled with subscription name (topic) strings.
@@ -171,12 +171,12 @@ public interface Subscriber {
* If there is an error retrieving the list of topics
*/
public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
- ServiceDownException;
+ ServiceDownException;
/**
* Begin delivery of messages from the server to us for this topic and
* subscriberId.
- *
+ *
* @param topic
* Topic name of the subscription
* @param subscriberId
@@ -191,7 +191,7 @@ public interface Subscriber {
/**
* Stop delivery of messages for this topic and subscriberId.
- *
+ *
* @param topic
* Topic name of the subscription
* @param subscriberId
@@ -205,7 +205,7 @@ public interface Subscriber {
* Closes all of the client side cached data for this subscription without
* actually sending an unsubscribe request to the server. This will close
* the subscribe channel synchronously (if it exists) for the topic.
- *
+ *
* @param topic
* Topic name of the subscription
* @param subscriberId
@@ -220,7 +220,7 @@ public interface Subscriber {
* Closes all of the client side cached data for this subscription without
* actually sending an unsubscribe request to the server. This will close
* the subscribe channel asynchronously (if it exists) for the topic.
- *
+ *
* @param topic
* Topic name of the subscription
* @param subscriberId
@@ -232,6 +232,6 @@ public interface Subscriber {
* asynchronously.
*/
public void asyncCloseSubscription(ByteString topic, ByteString subscriberId, Callback<Void> callback,
- Object context);
+ Object context);
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java Mon Sep 5 17:38:57 2011
@@ -35,7 +35,7 @@ public class BenchmarkPublisher extends
double rate;
public BenchmarkPublisher(int numTopics, int numMessages, int numRegions, int startTopicLabel, int partitionIndex,
- int numPartitions, Publisher publisher, Subscriber subscriber, int msgSize, int nParallel, int rate) {
+ int numPartitions, Publisher publisher, Subscriber subscriber, int msgSize, int nParallel, int rate) {
super(numTopics, numMessages, numRegions, startTopicLabel, partitionIndex, numPartitions);
this.publisher = publisher;
this.msgSize = msgSize;
@@ -53,7 +53,7 @@ public class BenchmarkPublisher extends
subscriber.startDelivery(topic, subId, new MessageHandler() {
@Override
public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
- Object context) {
+ Object context) {
// noop
callback.operationFinished(context, null);
}
@@ -88,7 +88,7 @@ public class BenchmarkPublisher extends
// Single warmup for every topic
int myPublishCount = 0;
for (int i = 0; i < numTopics; i++) {
- if (!HedwigBenchmark.amIResponsibleForTopic(startTopicLabel + i, partitionIndex, numPartitions)){
+ if (!HedwigBenchmark.amIResponsibleForTopic(startTopicLabel + i, partitionIndex, numPartitions)) {
continue;
}
ByteString topic = ByteString.copyFromUtf8(HedwigBenchmark.TOPIC_PREFIX + (startTopicLabel + i));
@@ -123,8 +123,8 @@ public class BenchmarkPublisher extends
}
System.out.println("Finished unacked pubs: tput = " + BenchmarkUtils.calcTp(myPublishLimit, startTime)
- + " ops/s");
- // Wait till the benchmark test has completed
+ + " ops/s");
+ // Wait till the benchmark test has completed
agg.tpAgg.queue.take();
System.out.println(agg.summarize(startTime));
return null;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java Mon Sep 5 17:38:57 2011
@@ -34,17 +34,17 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.util.Callback;
-public class BenchmarkSubscriber extends BenchmarkWorker implements Callable<Void>{
+public class BenchmarkSubscriber extends BenchmarkWorker implements Callable<Void> {
static final Logger logger = Logger.getLogger(BenchmarkSubscriber.class);
Subscriber subscriber;
ByteString subId;
-
+
public BenchmarkSubscriber(int numTopics, int numMessages, int numRegions,
- int startTopicLabel, int partitionIndex, int numPartitions, Subscriber subscriber, ByteString subId) {
+ int startTopicLabel, int partitionIndex, int numPartitions, Subscriber subscriber, ByteString subId) {
super(numTopics, numMessages, numRegions, startTopicLabel, partitionIndex, numPartitions);
this.subscriber = subscriber;
- this.subId = subId;
+ this.subId = subId;
}
public void warmup(int numWarmup) throws InterruptedException {
@@ -73,10 +73,10 @@ public class BenchmarkSubscriber extends
@Override
public void consume(ByteString thisTopic, ByteString subscriberId, Message msg,
- Callback<Void> callback, Object context) {
+ Callback<Void> callback, Object context) {
if (logger.isDebugEnabled())
logger.debug("Got message from src-region: " + msg.getSrcRegion() + " with seq-id: "
- + msg.getMsgId());
+ + msg.getMsgId());
String mapKey = topic + msg.getSrcRegion().toStringUtf8();
Long lastSeqIdSeen = lastSeqIdSeenMap.get(mapKey);
@@ -86,7 +86,7 @@ public class BenchmarkSubscriber extends
if (getSrcSeqId(msg) <= lastSeqIdSeen) {
logger.info("Redelivery of message, src-region: " + msg.getSrcRegion() + "seq-id: "
- + msg.getMsgId());
+ + msg.getMsgId());
} else {
agg.ding(false);
}
@@ -97,7 +97,7 @@ public class BenchmarkSubscriber extends
}
System.out.println("Finished subscribing to topics and now waiting for messages to come in...");
// Wait till the benchmark test has completed
- agg.queue.take();
+ agg.queue.take();
System.out.println(agg.summarize(agg.earliest.get()));
return null;
}
@@ -121,11 +121,11 @@ public class BenchmarkSubscriber extends
ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator(label, count / numPartitions, npar);
int end = start + count;
for (int i = start; i < end; ++i) {
- if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)){
+ if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)) {
continue;
}
subscriber.asyncSubscribe(ByteString.copyFromUtf8(topicPrefix + i), subId, CreateOrAttach.CREATE_OR_ATTACH,
- new BenchmarkCallback(agg), null);
+ new BenchmarkCallback(agg), null);
}
// Wait till the benchmark test has completed
agg.tpAgg.queue.take();
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java Mon Sep 5 17:38:57 2011
@@ -86,7 +86,7 @@ public class BenchmarkUtils {
public String summarize(long startTime) {
double percentile = Double.parseDouble(System.getProperty("percentile", "99.9"));
return tpAgg.summarize(startTime) + ", avg latency = " + sum.get() / tpAgg.count + ", " + percentile
- + "%ile latency = " + getPercentile(percentile);
+ + "%ile latency = " + getPercentile(percentile);
}
}
@@ -140,7 +140,7 @@ public class BenchmarkUtils {
public String summarize(long startTime) {
return "Finished " + label + ": count = " + done.get() + ", tput = " + calcTp(count, startTime)
- + " ops/s, numFailed = " + numFailed;
+ + " ops/s, numFailed = " + numFailed;
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java Mon Sep 5 17:38:57 2011
@@ -26,7 +26,7 @@ public class BenchmarkWorker {
int numPartitions;
public BenchmarkWorker(int numTopics, int numMessages, int numRegions,
- int startTopicLabel, int partitionIndex, int numPartitions) {
+ int startTopicLabel, int partitionIndex, int numPartitions) {
this.numTopics = numTopics;
this.numMessages = numMessages;
this.numRegions = numRegions;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java Mon Sep 5 17:38:57 2011
@@ -98,7 +98,7 @@ public class HedwigBenchmark implements
startTopicLabel, partitionIndex, numPartitions, publisher, subscriber, msgSize, nParallel, rate);
benchmarkPub.warmup(nWarmups);
benchmarkPub.call();
-
+
} else {
throw new Exception("unknown mode: " + mode);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java Mon Sep 5 17:38:57 2011
@@ -79,7 +79,7 @@ public class ClientConfiguration extends
// automatically send the consume message to the server based on the
// configured amount of messages consumed by the client app. The client app
// could choose to override this behavior and instead, manually send the
- // consume message to the server via the client library using its own
+ // consume message to the server via the client library using its own
// logic and policy.
public boolean isAutoSendConsumeMessageEnabled() {
return conf.getBoolean(AUTO_SEND_CONSUME_MESSAGE_ENABLED, true);
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java Mon Sep 5 17:38:57 2011
@@ -26,7 +26,7 @@ import org.apache.hedwig.protocol.PubSub
* server for a given TopicSubscriber. This will be used as the Context in the
* VoidCallback for the MessageHandlers once they've completed consuming the
* message.
- *
+ *
*/
public class MessageConsumeData {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java Mon Sep 5 17:38:57 2011
@@ -32,7 +32,7 @@ import org.apache.hedwig.util.Callback;
* request in case of a server redirect. This will be used for all sync/async
* calls, and for all the known types of request messages to send to the server
* hubs: Publish, Subscribe, Unsubscribe, and Consume.
- *
+ *
*/
public class PubSubData {
// Static string constants
@@ -84,8 +84,8 @@ public class PubSubData {
// Constructor for all types of PubSub request data to send to the server
public PubSubData(final ByteString topic, final Message msg, final ByteString subscriberId,
- final OperationType operationType, final CreateOrAttach createOrAttach, final Callback<Void> callback,
- final Object context) {
+ final OperationType operationType, final CreateOrAttach createOrAttach, final Callback<Void> callback,
+ final Object context) {
this.topic = topic;
this.msg = msg;
this.subscriberId = subscriberId;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java Mon Sep 5 17:38:57 2011
@@ -25,7 +25,7 @@ import com.google.protobuf.ByteString;
* Wrapper class object for the Topic + SubscriberId combination. Since the
* Subscribe flows always use the Topic + SubscriberId as the logical entity,
* we'll create a simple class to encapsulate that.
- *
+ *
*/
public class TopicSubscriber {
private final ByteString topic;
@@ -62,13 +62,13 @@ public class TopicSubscriber {
sb.append(PubSubData.COMMA).append("SubscriberId: " + subscriberId.toStringUtf8());
return sb.toString();
}
-
+
public ByteString getTopic() {
return topic;
}
-
+
public ByteString getSubscriberId() {
return subscriberId;
}
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java Mon Sep 5 17:38:57 2011
@@ -19,9 +19,9 @@ package org.apache.hedwig.client.excepti
/**
* This is a Hedwig client side exception when the PubSubRequest is being
- * redirected to a server where the request has already been sent to previously.
+ * redirected to a server where the request has already been sent to previously.
* To avoid having a cyclical redirect loop, this condition is checked for
- * and this exception will be thrown to the client caller.
+ * and this exception will be thrown to the client caller.
*/
public class ServerRedirectLoopException extends Exception {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java Mon Sep 5 17:38:57 2011
@@ -22,7 +22,7 @@ package org.apache.hedwig.client.excepti
* redirects during a publish/subscribe call. We only allow a certain number of
* server redirects to find the topic master. If we have exceeded this
* configured amount, the publish/subscribe will fail with this exception.
- *
+ *
*/
public class TooManyServerRedirectsException extends Exception {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java Mon Sep 5 17:38:57 2011
@@ -35,7 +35,7 @@ import org.apache.hedwig.util.Callback;
* can use a singleton for the class. The object context used should be the
* MessageConsumeData type. That will contain all of the information needed to
* call the message consume logic in the client lib ResponseHandler.
- *
+ *
*/
public class MessageConsumeCallback implements Callback<Void> {
@@ -61,7 +61,7 @@ public class MessageConsumeCallback impl
// Try to consume the message again
Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
- .asyncMessageConsume(messageConsumeData.msg);
+ .asyncMessageConsume(messageConsumeData.msg);
}
}
@@ -72,7 +72,7 @@ public class MessageConsumeCallback impl
// to the ResponseHandler indicating that the message is consumed.
Channel topicSubscriberChannel = client.getSubscriber().getChannelForTopic(topicSubscriber);
HedwigClient.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
- .messageConsumed(messageConsumeData.msg);
+ .messageConsumed(messageConsumeData.msg);
}
public void operationFailed(Object ctx, PubSubException exception) {
@@ -89,7 +89,7 @@ public class MessageConsumeCallback impl
// perhaps what the last amount of time we slept was. We could stick
// some of this meta-data into the MessageConsumeData when we retry.
client.getClientTimer().schedule(new MessageConsumeRetryTask(messageConsumeData, topicSubscriber),
- client.getConfiguration().getMessageConsumeRetryWaitTime());
+ client.getConfiguration().getMessageConsumeRetryWaitTime());
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java Mon Sep 5 17:38:57 2011
@@ -27,7 +27,7 @@ import org.apache.hedwig.util.Callback;
* This class is used when we are doing synchronous type of operations. All
* underlying client ops in Hedwig are async so this is just a way to make the
* async calls synchronous.
- *
+ *
*/
public class PubSubCallback implements Callback<Void> {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java Mon Sep 5 17:38:57 2011
@@ -40,7 +40,7 @@ public class PublishResponseHandler {
public void handlePublishResponse(PubSubResponse response, PubSubData pubSubData, Channel channel) throws Exception {
if (logger.isDebugEnabled())
logger.debug("Handling a Publish response: " + response + ", pubSubData: " + pubSubData + ", host: "
- + HedwigClient.getHostFromChannel(channel));
+ + HedwigClient.getHostFromChannel(channel));
switch (response.getStatusCode()) {
case SUCCESS:
// Response was success so invoke the callback's operationFinished
@@ -51,7 +51,7 @@ public class PublishResponseHandler {
// Response was service down failure so just invoke the callback's
// operationFailed method.
pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a SERVICE_DOWN status"));
+ "Server responded with a SERVICE_DOWN status"));
break;
case NOT_RESPONSIBLE_FOR_TOPIC:
// Redirect response so we'll need to repost the original Publish
@@ -63,7 +63,7 @@ public class PublishResponseHandler {
// cases.
logger.error("Unexpected error response from server for PubSubResponse: " + response);
pubSubData.callback.operationFailed(pubSubData.context, new ServiceDownException(
- "Server responded with a status code of: " + response.getStatusCode()));
+ "Server responded with a status code of: " + response.getStatusCode()));
break;
}
}