You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/05/17 18:32:06 UTC
svn commit: r1339691 - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/util/ hedwig-protocol...
Author: ivank
Date: Thu May 17 16:32:05 2012
New Revision: 1339691
URL: http://svn.apache.org/viewvc?rev=1339691&view=rev
Log:
BOOKKEEPER-72: Fix warnings issued by FindBugs (ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu May 17 16:32:05 2012
@@ -14,6 +14,8 @@ Trunk (unreleased changes)
BOOKKEEPER-254: Bump zookeeper version in poms (ivank)
+ BOOKKEEPER-72: Fix warnings issued by FindBugs (ivank)
+
bookkeeper-server/
BOOKKEEPER-142: Parsing last log id is wrong, which may make entry log files overwritten (Sijie Gou via ivank)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java Thu May 17 16:32:05 2012
@@ -61,6 +61,8 @@ public class BenchmarkPublisher extends
// picking constants arbitarily for warmup phase
ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", nWarmup, 100);
+ agg.startProgress();
+
Message msg = getMsg(1024);
for (int i = 0; i < nWarmup; i++) {
publisher.asyncPublish(topic, msg, new BenchmarkCallback(agg), null);
@@ -100,6 +102,7 @@ public class BenchmarkPublisher extends
int myPublishLimit = numMessages / numRegions / numPartitions - myPublishCount;
myPublishCount = 0;
ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", myPublishLimit, nParallel);
+ agg.startProgress();
int topicLabel = 0;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java Thu May 17 16:32:05 2012
@@ -59,6 +59,8 @@ public class BenchmarkSubscriber extends
public Void call() throws Exception {
final ThroughputAggregator agg = new ThroughputAggregator("recvs", numMessages);
+ agg.startProgress();
+
final Map<String, Long> lastSeqIdSeenMap = new HashMap<String, Long>();
for (int i = startTopicLabel; i < startTopicLabel + numTopics; i++) {
@@ -120,6 +122,8 @@ public class BenchmarkSubscriber extends
throws InterruptedException {
long startTime = System.currentTimeMillis();
ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator(label, count / numPartitions, npar);
+ agg.startProgress();
+
int end = start + count;
for (int i = start; i < end; ++i) {
if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)) {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java Thu May 17 16:32:05 2012
@@ -60,6 +60,10 @@ public class BenchmarkUtils {
}
}
+ public void startProgress() {
+ tpAgg.startProgress();
+ }
+
public void reportLatency(long latency) {
sum.addAndGet(latency);
@@ -101,6 +105,7 @@ public class BenchmarkUtils {
final AtomicInteger done = new AtomicInteger();
final AtomicLong earliest = new AtomicLong();
final AtomicInteger numFailed = new AtomicInteger();
+ final Thread progressThread;
final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
public ThroughputAggregator(final String label, final int count) {
@@ -109,7 +114,7 @@ public class BenchmarkUtils {
if (count == 0)
queue.add(0);
if (Boolean.getBoolean("progress")) {
- new Thread(new Runnable() {
+ progressThread = new Thread(new Runnable() {
@Override
public void run() {
try {
@@ -123,7 +128,15 @@ public class BenchmarkUtils {
throw new RuntimeException(ex);
}
}
- }).start();
+ });
+ } else {
+ progressThread = null;
+ }
+ }
+
+ public void startProgress() {
+ if (progressThread != null) {
+ progressThread.start();
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Thu May 17 16:32:05 2012
@@ -78,7 +78,7 @@ public class SubscribeResponseHandler {
// Public getter to retrieve the original PubSubData used for the Subscribe
// request.
- public PubSubData getOrigSubData() {
+ synchronized public PubSubData getOrigSubData() {
return origSubData;
}
@@ -98,35 +98,37 @@ public class SubscribeResponseHandler {
+ HedwigClientImpl.getHostFromChannel(channel));
switch (response.getStatusCode()) {
case SUCCESS:
- // For successful Subscribe requests, store this Channel locally
- // and set it to not be readable initially.
- // This way we won't be delivering messages for this topic
- // subscription until the client explicitly says so.
- subscribeChannel = channel;
- subscribeChannel.setReadable(false);
- // Store the original PubSubData used to create this successful
- // Subscribe request.
- origSubData = pubSubData;
- // Store the mapping for the TopicSubscriber to the Channel.
- // This is so we can control the starting and stopping of
- // message deliveries from the server on that Channel. Store
- // this only on a successful ack response from the server.
- TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
- responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
- // Lazily create the Set (from a concurrent hashmap) to keep track
- // of outstanding Messages to be consumed by the client app. At this
- // stage, delivery for that topic hasn't started yet so creation of
- // this Set should be thread safe. We'll create the Set with an initial
- // capacity equal to the configured parameter for the maximum number of
- // outstanding messages to allow. The load factor will be set to
- // 1.0f which means we'll only rehash and allocate more space if
- // we ever exceed the initial capacity. That should be okay
- // because when that happens, things are slow already and piling
- // up on the client app side to consume messages.
-
- outstandingMsgSet = Collections.newSetFromMap(new ConcurrentHashMap<Message,Boolean>(
- responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
-
+ synchronized(this) {
+ // For successful Subscribe requests, store this Channel locally
+ // and set it to not be readable initially.
+ // This way we won't be delivering messages for this topic
+ // subscription until the client explicitly says so.
+ subscribeChannel = channel;
+ subscribeChannel.setReadable(false);
+ // Store the original PubSubData used to create this successful
+ // Subscribe request.
+ origSubData = pubSubData;
+
+ // Store the mapping for the TopicSubscriber to the Channel.
+ // This is so we can control the starting and stopping of
+ // message deliveries from the server on that Channel. Store
+ // this only on a successful ack response from the server.
+ TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
+ responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
+ // Lazily create the Set (from a concurrent hashmap) to keep track
+ // of outstanding Messages to be consumed by the client app. At this
+ // stage, delivery for that topic hasn't started yet so creation of
+ // this Set should be thread safe. We'll create the Set with an initial
+ // capacity equal to the configured parameter for the maximum number of
+ // outstanding messages to allow. The load factor will be set to
+ // 1.0f which means we'll only rehash and allocate more space if
+ // we ever exceed the initial capacity. That should be okay
+ // because when that happens, things are slow already and piling
+ // up on the client app side to consume messages.
+ outstandingMsgSet = Collections.newSetFromMap(
+ new ConcurrentHashMap<Message,Boolean>(
+ responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
+ }
// Response was success so invoke the callback's operationFinished
// method.
pubSubData.callback.operationFinished(pubSubData.context, null);
@@ -162,9 +164,11 @@ public class SubscribeResponseHandler {
// Main method to handle consuming a message for a topic that the client is
// subscribed to.
public void handleSubscribeMessage(PubSubResponse response) {
- if (logger.isDebugEnabled())
- logger.debug("Handling a Subscribe message in response: " + response + ", topic: "
- + origSubData.topic.toStringUtf8() + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handling a Subscribe message in response: {}, topic: {}, subscriberId: {}",
+ new Object[] { response, getOrigSubData().topic.toStringUtf8(),
+ getOrigSubData().subscriberId.toStringUtf8() });
+ }
Message message = response.getMessage();
synchronized (this) {
@@ -300,9 +304,11 @@ public class SubscribeResponseHandler {
* MessageHandler to register for this ResponseHandler instance.
*/
public void setMessageHandler(MessageHandler messageHandler) {
- if (logger.isDebugEnabled())
- logger.debug("Setting the messageHandler for topic: " + origSubData.topic.toStringUtf8()
- + ", subscriberId: " + origSubData.subscriberId.toStringUtf8());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Setting the messageHandler for topic: {}, subscriberId: {}",
+ getOrigSubData().topic.toStringUtf8(),
+ getOrigSubData().subscriberId.toStringUtf8());
+ }
synchronized (this) {
this.messageHandler = messageHandler;
// Once the MessageHandler is registered, see if we have any queued up
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java Thu May 17 16:32:05 2012
@@ -124,6 +124,11 @@ public class HedwigSocketAddress {
return (this.hostname.equals(that.hostname) && (this.port == that.port) && (this.sslPort == that.sslPort));
}
+ @Override
+ public int hashCode() {
+ return (this.hostname + this.port + this.sslPort).hashCode();
+ }
+
// Static helper method to return the string representation for an
// InetSocketAddress. The HedwigClient can only operate in SSL or non-SSL
// mode. So the server hosts it connects to will just be an
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java Thu May 17 16:32:05 2012
@@ -26,12 +26,12 @@ public class PathUtils {
/** Generate all prefixes for a path. "/a/b/c" -> ["/a","/a/b","/a/b/c"] */
public static List<String> prefixes(String path) {
List<String> prefixes = new ArrayList<String>();
- String prefix = "";
+ StringBuilder prefix = new StringBuilder();
for (String comp : path.split("/+")) {
// Skip the first (empty) path component.
if (!comp.equals("")) {
- prefix += "/" + comp;
- prefixes.add(prefix);
+ prefix.append("/").append(comp);
+ prefixes.add(prefix.toString());
}
}
return prefixes;
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java Thu May 17 16:32:05 2012
@@ -27,7 +27,7 @@ public class PubSubResponseUtils {
/**
* Change here if bumping up the version number that the server sends back
*/
- protected static ProtocolVersion serverVersion = ProtocolVersion.VERSION_ONE;
+ protected final static ProtocolVersion serverVersion = ProtocolVersion.VERSION_ONE;
static PubSubResponse.Builder getBasicBuilder(StatusCode status) {
return PubSubResponse.newBuilder().setProtocolVersion(serverVersion).setStatusCode(status);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/pom.xml?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/pom.xml Thu May 17 16:32:05 2012
@@ -137,31 +137,13 @@
</descriptors>
</configuration>
</plugin>
-<!--
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>1.2.1</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <excludes>
- <exclude>classworlds:classworlds</exclude>
- <exclude>junit:junit</exclude>
- <exclude>jmock:jmock</exclude>
- <exclude>xml-apis:xml-apis</exclude>
- </excludes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <configuration>
+ <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
+ </configuration>
</plugin>
- -->
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java Thu May 17 16:32:05 2012
@@ -18,6 +18,7 @@
package org.apache.hedwig.admin;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -65,7 +66,7 @@ public class HedwigAdmin {
protected ClientConfiguration bkClientConf;
// Empty watcher
- private class MyWatcher implements Watcher {
+ private static class MyWatcher implements Watcher {
public void process(WatchedEvent event) {
}
}
@@ -140,7 +141,7 @@ public class HedwigAdmin {
* @return bookeeper passwd
*/
public byte[] getBkPasswd() {
- return passwd;
+ return Arrays.copyOf(passwd, passwd.length);
}
/**
@@ -187,8 +188,10 @@ public class HedwigAdmin {
if (data != null) {
load = Integer.parseInt(new String(data));
}
- } catch (Exception e) {
- // igore
+ } catch (KeeperException ke) {
+ LOG.warn("Couldn't read hub data from ZooKeeper", ke);
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted during read", ie);
}
hubs.put(new HedwigSocketAddress(host), load);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java Thu May 17 16:32:05 2012
@@ -336,8 +336,6 @@ public final class HedwigCommands {
public String getDescription() { return desc; }
- public String[] getUsage() { return usage; }
-
public Map<String, COMMAND> getSubCommands() { return subCmds; }
public void addSubCommand(COMMAND c) {
@@ -364,7 +362,10 @@ public final class HedwigCommands {
commands.put(c.getName(), c);
}
- static {
+ static synchronized void init() {
+ if (commands != null) {
+ return;
+ }
commands = new LinkedHashMap<String, COMMAND>();
addCommand(COMMAND.CMD_PUB);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Thu May 17 16:32:05 2012
@@ -34,7 +34,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.hedwig.admin.HedwigAdmin;
@@ -100,7 +101,7 @@ public class HedwigConsole {
boolean runCmd(String[] args) throws Exception;
}
- class HelpCmd implements MyCommand {
+ static class HelpCmd implements MyCommand {
@Override
public boolean runCmd(String[] args) throws Exception {
@@ -127,7 +128,7 @@ public class HedwigConsole {
printMessage("Quitting ...");
hubClient.close();
admin.close();
- System.exit(0);
+ Runtime.getRuntime().exit(0);
return true;
}
}
@@ -223,7 +224,7 @@ public class HedwigConsole {
}
- class ConsoleMessageHandler implements MessageHandler {
+ static class ConsoleMessageHandler implements MessageHandler {
@Override
public void deliver(ByteString topic, ByteString subscriberId,
@@ -444,7 +445,7 @@ public class HedwigConsole {
boolean subscribed = false;
boolean success = false;
- final AtomicBoolean isDone = new AtomicBoolean(false);
+ final CountDownLatch isDone = new CountDownLatch(1);
long elapsedTime = 0L;
System.out.println("Starting PUBSUB test ...");
@@ -471,10 +472,7 @@ public class HedwigConsole {
if (thisTopic.equals(topic) && subscriberId.equals(subId) &&
msg.getBody().equals(message.getBody())) {
System.out.println("Received message : " + message.getBody().toStringUtf8());
- synchronized(isDone) {
- isDone.set(true);
- isDone.notify();
- }
+ isDone.countDown();
}
callback.operationFinished(context, null);
}
@@ -482,10 +480,7 @@ public class HedwigConsole {
});
// wait for the message
- synchronized (isDone) {
- isDone.wait(timeoutSecs * 1000L);
- }
- success = isDone.get();
+ success = isDone.await(timeoutSecs, TimeUnit.SECONDS);
elapsedTime = System.currentTimeMillis() - startTime;
} finally {
try {
@@ -783,6 +778,7 @@ public class HedwigConsole {
* @throws InterruptedException
*/
public HedwigConsole(String[] args) throws IOException, InterruptedException {
+ HedwigCommands.init();
cl.parseOptions(args);
if (cl.getCommand() == null) {
@@ -970,10 +966,26 @@ public class HedwigConsole {
historyEnabled = true;
System.out.println("JLine history support is enabled");
- } catch (Exception e) {
+ } catch (ClassNotFoundException e) {
+ System.out.println("JLine history support is disabled");
+ LOG.debug("JLine history disabled with exception", e);
historyEnabled = false;
- e.printStackTrace();
+ } catch (NoSuchMethodException e) {
+ System.out.println("JLine history support is disabled");
+ LOG.debug("JLine history disabled with exception", e);
+ historyEnabled = false;
+ } catch (InvocationTargetException e) {
System.out.println("JLine history support is disabled");
+ LOG.debug("JLine history disabled with exception", e);
+ historyEnabled = false;
+ } catch (IllegalAccessException e) {
+ System.out.println("JLine history support is disabled");
+ LOG.debug("JLine history disabled with exception", e);
+ historyEnabled = false;
+ } catch (InstantiationException e) {
+ System.out.println("JLine history support is disabled");
+ LOG.debug("JLine history disabled with exception", e);
+ historyEnabled = false;
}
String line;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java Thu May 17 16:32:05 2012
@@ -72,7 +72,6 @@ public class ReadTopic {
static final int NUM_MESSAGES_TO_PRINT = 15;
SortedMap<Long, InMemoryLedgerRange> ledgers = new TreeMap<Long, InMemoryLedgerRange>();
- SubscriptionState leastSubscriber = null;
static class InMemoryLedgerRange {
LedgerRange range;
@@ -152,7 +151,6 @@ public class ReadTopic {
long localMsgId = state.getMsgId().getLocalComponent();
if (localMsgId < leastConsumedSeqId) {
leastConsumedSeqId = localMsgId;
- this.leastSubscriber = state;
}
}
if (leastConsumedSeqId == Long.MAX_VALUE) {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/TerminateJVMExceptionHandler.java Thu May 17 16:32:05 2012
@@ -26,7 +26,7 @@ public class TerminateJVMExceptionHandle
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaught exception in thread " + t.getName(), e);
- System.exit(1);
+ Runtime.getRuntime().exit(1);
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ChannelEndPoint.java Thu May 17 16:32:05 2012
@@ -78,4 +78,8 @@ public class ChannelEndPoint implements
}
}
+ @Override
+ public int hashCode() {
+ return channel.hashCode();
+ }
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Thu May 17 16:32:05 2012
@@ -22,6 +22,8 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.server.subscriptions.MessageFilter;
public interface DeliveryManager {
+ public void start();
+
public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Thu May 17 16:32:05 2012
@@ -89,15 +89,20 @@ public class FIFODeliveryManager impleme
// Boolean indicating if this thread should continue running. This is used
// when we want to stop the thread during a PubSubServer shutdown.
protected boolean keepRunning = true;
+ private final Thread workerThread;
public FIFODeliveryManager(PersistenceManager persistenceMgr, ServerConfiguration cfg) {
this.persistenceMgr = persistenceMgr;
perTopicDeliveryPtrs = new HashMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>>();
subscriberStates = new HashMap<TopicSubscriber, ActiveSubscriberState>();
- new Thread(this, "DeliveryManagerThread").start();
+ workerThread = new Thread(this, "DeliveryManagerThread");
this.cfg = cfg;
}
+ public void start() {
+ workerThread.start();
+ }
+
/**
* ===================================================================== Our
* usual enqueue function, stop if error because of unbounded queue, should
@@ -296,7 +301,7 @@ public class FIFODeliveryManager impleme
long localSeqIdDeliveringNow;
long lastSeqIdCommunicatedExternally;
// TODO make use of these variables
- MessageFilter filter;
+
boolean isHubSubscriber;
final static int SEQ_ID_SLACK = 10;
@@ -306,7 +311,7 @@ public class FIFODeliveryManager impleme
this.subscriberId = subscriberId;
this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
this.deliveryEndPoint = deliveryEndPoint;
- this.filter = filter;
+
this.isHubSubscriber = isHubSubscriber;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java Thu May 17 16:32:05 2012
@@ -30,7 +30,7 @@ public class NettyHandlerBean implements
public NettyHandlerBean(Map<OperationType, Handler> handlers) {
this.handlers = handlers;
- subHandler = (SubscribeHandler) handlers.get(OperationType.SUBSCRIBE);
+ subHandler = (SubscribeHandler) this.handlers.get(OperationType.SUBSCRIBE);
}
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Thu May 17 16:32:05 2012
@@ -107,6 +107,7 @@ public class PubSubServer {
// JMX Beans
NettyHandlerBean jmxNettyBean;
PubSubServerBean jmxServerBean;
+ final ThreadGroup tg;
protected PersistenceManager instantiatePersistenceManager(TopicManager topicMgr) throws IOException,
InterruptedException {
@@ -311,8 +312,9 @@ public class PubSubServer {
* @throws InterruptedException
* @throws ConfigurationException
*/
- public PubSubServer(final ServerConfiguration conf, final Thread.UncaughtExceptionHandler exceptionHandler)
- throws Exception {
+ public PubSubServer(final ServerConfiguration conf,
+ final Thread.UncaughtExceptionHandler exceptionHandler)
+ throws ConfigurationException {
// First validate the conf
this.conf = conf;
@@ -320,7 +322,7 @@ public class PubSubServer {
// We need a custom thread group, so that we can override the uncaught
// exception method
- ThreadGroup tg = new ThreadGroup("hedwig") {
+ tg = new ThreadGroup("hedwig") {
@Override
public void uncaughtException(Thread t, Throwable e) {
exceptionHandler.uncaughtException(t, e);
@@ -330,7 +332,9 @@ public class PubSubServer {
// we do in ZK threads throws an exception, we want our handler to be
// called, not theirs.
SafeAsyncCallback.setUncaughtExceptionHandler(exceptionHandler);
+ }
+ public void start() throws Exception {
final SynchronousQueue<Either<Object, Exception>> queue = new SynchronousQueue<Either<Object, Exception>>();
new Thread(tg, new Runnable() {
@@ -349,6 +353,8 @@ public class PubSubServer {
tm = instantiateTopicManager();
pm = instantiatePersistenceManager(tm);
dm = new FIFODeliveryManager(pm, conf);
+ dm.start();
+
sm = instantiateSubscriptionManager(tm, pm);
rm = instantiateRegionManager(pm, scheduler);
sm.addListener(rm);
@@ -422,7 +428,7 @@ public class PubSubServer {
logger.info("Using configuration file " + confFile);
}
try {
- new PubSubServer(conf);
+ new PubSubServer(conf).start();
} catch (Throwable t) {
errorMsgAndExit("Error during startup", t, RC_OTHER);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java Thu May 17 16:32:05 2012
@@ -121,7 +121,7 @@ public class ServerStats {
++latencyBuckets[bucket];
}
- public OpStatData toOpStatData() {
+ synchronized public OpStatData toOpStatData() {
double avgLatency = numSuccessOps > 0 ? totalLatency / numSuccessOps : 0.0f;
StringBuilder sb = new StringBuilder();
for (int i=0; i<NUM_BUCKETS; i++) {
@@ -131,7 +131,8 @@ public class ServerStats {
}
}
- return new OpStatData(maxLatency, minLatency, avgLatency, numSuccessOps, numFailedOps, sb.toString());
+ return new OpStatData(maxLatency, minLatency, avgLatency,
+ numSuccessOps, numFailedOps, sb.toString());
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/LocalDBPersistenceManager.java Thu May 17 16:32:05 2012
@@ -28,6 +28,10 @@ import java.sql.Statement;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
import javax.sql.rowset.serial.SerialBlob;
import org.slf4j.Logger;
@@ -74,6 +78,18 @@ public class LocalDBPersistenceManager i
}
}
};
+
+ private static final ThreadLocal<MessageDigest> threadLocalDigest = new ThreadLocal<MessageDigest>() {
+ @Override
+ protected MessageDigest initialValue() {
+ try {
+ return MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ logger.error("Could not find MD5 hash", e);
+ return null;
+ }
+ }
+ };
static final String ID_FIELD_NAME = "id";
static final String MSG_FIELD_NAME = "msg";
static final String driver = "org.apache.derby.jdbc.EmbeddedDriver";
@@ -246,14 +262,22 @@ public class LocalDBPersistenceManager i
* sneak in and create the table before us
*/
private void createTable(Connection conn, ByteString topic) {
-
+ Statement stmt = null;
try {
- Statement stmt = conn.createStatement();
+ stmt = conn.createStatement();
String tableName = getTableNameForTopic(topic);
stmt.execute("CREATE TABLE " + tableName + " (" + ID_FIELD_NAME + " BIGINT NOT NULL CONSTRAINT ID_PK_"
- + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)");
+ + tableName + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)");
} catch (SQLException e) {
logger.debug("Could not create table", e);
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ logger.error("Error closing statement", e);
+ }
}
}
@@ -274,7 +298,11 @@ public class LocalDBPersistenceManager i
}
private String getTableNameForTopic(ByteString topic) {
- return (topic.toStringUtf8() + "_" + version);
+ String src = (topic.toStringUtf8() + "_" + version);
+ threadLocalDigest.get().reset();
+ byte[] digest = threadLocalDigest.get().digest(src.getBytes());
+ BigInteger bigInt = new BigInteger(1,digest);
+ return String.format("TABLE_%032X", bigInt);
}
private void scanMessagesInternal(ByteString topic, long startSeqId, int messageLimit, long sizeLimit,
@@ -290,7 +318,7 @@ public class LocalDBPersistenceManager i
long currentSeqId;
currentSeqId = startSeqId;
- PreparedStatement stmt;
+ PreparedStatement stmt = null;
try {
try {
stmt = conn.prepareStatement("SELECT * FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
@@ -367,8 +395,15 @@ public class LocalDBPersistenceManager i
logger.error("Message stored in derby is not parseable", e);
callback.scanFailed(ctx, new ServiceDownException(e));
return;
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ logger.error("Error closing statement", e);
+ }
}
-
}
public void deliveredUntil(ByteString topic, Long seqId) {
@@ -381,20 +416,27 @@ public class LocalDBPersistenceManager i
logger.error("Not connected to derby");
return;
}
- PreparedStatement stmt;
+ PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement("DELETE FROM " + getTableNameForTopic(topic) + " WHERE " + ID_FIELD_NAME
+ " <= ?");
stmt.setLong(1, seqId);
int rowCount = stmt.executeUpdate();
logger.debug("Deleted " + rowCount + " records for topic: " + topic.toStringUtf8() + ", seqId: " + seqId);
- stmt.close();
} catch (SQLException sqle) {
String theError = (sqle).getSQLState();
if (theError.equals("42X05")) {
logger.warn("Table for topic (" + topic + ") does not exist so no consumed messages to delete!");
} else
logger.error("Error while executing derby delete for consumed messages", sqle);
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ logger.error("Error closing statement", e);
+ }
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Thu May 17 16:32:05 2012
@@ -458,7 +458,7 @@ public class ReadAheadCache implements P
}
protected static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> {
- protected static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();
+ protected final static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();
public Set<CacheKey> newInstance() {
return new HashSet<CacheKey>();
@@ -466,7 +466,7 @@ public class ReadAheadCache implements P
}
protected static class TreeSetLongFactory implements Factory<SortedSet<Long>> {
- protected static TreeSetLongFactory instance = new TreeSetLongFactory();
+ protected final static TreeSetLongFactory instance = new TreeSetLongFactory();
public SortedSet<Long> newInstance() {
return new TreeSet<Long>();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Thu May 17 16:32:05 2012
@@ -53,18 +53,24 @@ public class HedwigProxy {
Map<OperationType, Handler> handlers;
ProxyConfiguration cfg;
ChannelTracker tracker;
+ ThreadGroup tg;
- public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler)
- throws InterruptedException {
+ public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler) {
this.cfg = cfg;
- ThreadGroup tg = new ThreadGroup("hedwigproxy") {
+ tg = new ThreadGroup("hedwigproxy") {
@Override
public void uncaughtException(Thread t, Throwable e) {
exceptionHandler.uncaughtException(t, e);
}
};
+ }
+
+ public HedwigProxy(ProxyConfiguration conf) throws InterruptedException {
+ this(conf, new TerminateJVMExceptionHandler());
+ }
+ public void start() throws InterruptedException {
final LinkedBlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
new Thread(tg, new Runnable() {
@@ -84,10 +90,6 @@ public class HedwigProxy {
queue.take();
}
- public HedwigProxy(ProxyConfiguration conf) throws InterruptedException {
- this(conf, new TerminateJVMExceptionHandler());
- }
-
// used for testing
public ChannelTracker getChannelTracker() {
return tracker;
@@ -161,7 +163,7 @@ public class HedwigProxy {
logger.info("Using configuration file " + confFile);
}
try {
- new HedwigProxy(conf);
+ new HedwigProxy(conf).start();
} catch (Throwable t) {
PubSubServer.errorMsgAndExit("Error during startup", t, PubSubServer.RC_OTHER);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java Thu May 17 16:32:05 2012
@@ -21,8 +21,8 @@ import org.apache.hedwig.client.conf.Cli
public class ProxyConfiguration extends ClientConfiguration {
- protected static String PROXY_PORT = "proxy_port";
- protected static String MAX_MESSAGE_SIZE = "max_message_size";
+ protected final static String PROXY_PORT = "proxy_port";
+ protected final static String MAX_MESSAGE_SIZE = "max_message_size";
public int getProxyPort() {
return conf.getInt(PROXY_PORT, 9099);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/regions/RegionManager.java Thu May 17 16:32:05 2012
@@ -26,7 +26,7 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,7 +93,7 @@ public class RegionManager implements Su
if (null == topics || topics.isEmpty()) {
continue;
}
- final AtomicBoolean done = new AtomicBoolean(false);
+ final CountDownLatch done = new CountDownLatch(1);
Callback<Void> postCb = new Callback<Void>() {
@Override
public void operationFinished(Object ctx,
@@ -106,10 +106,7 @@ public class RegionManager implements Su
finish();
}
void finish() {
- synchronized (done) {
- done.set(true);
- done.notifyAll();
- }
+ done.countDown();
}
};
Callback<Void> mcb = CallbackUtils.multiCallback(topics.size(), postCb, null);
@@ -122,14 +119,10 @@ public class RegionManager implements Su
}
retrySubscribe(client, topic, mcb);
}
- synchronized (done) {
- if (done.get()) {
- try {
- done.wait();
- } catch (InterruptedException e) {
- LOGGER.warn("Exception during retrying remote subscriptions : ", e);
- }
- }
+ try {
+ done.await();
+ } catch (InterruptedException e) {
+ LOGGER.warn("Exception during retrying remote subscriptions : ", e);
}
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Thu May 17 16:32:05 2012
@@ -71,7 +71,7 @@ public abstract class AbstractSubscripti
Callback<Void> noopCallback = new NoopCallback<Void>();
- class NoopCallback<T> implements Callback<T> {
+ static class NoopCallback<T> implements Callback<T> {
@Override
public void operationFailed(Object ctx, PubSubException exception) {
logger.warn("Exception found in AbstractSubscriptionManager : ", exception);
@@ -110,6 +110,10 @@ public abstract class AbstractSubscripti
// so it should be safe to run this fairly often.
for (ByteString topic : top2sub2seq.keySet()) {
final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
+ if (topicSubscriptions == null) {
+ continue;
+ }
+
long minConsumedMessage = Long.MAX_VALUE;
boolean hasBound = true;
// Loop through all subscribers to the current topic to find the
@@ -126,9 +130,9 @@ public abstract class AbstractSubscripti
// Don't call the PersistenceManager if nobody is subscribed to
// the topic yet, or the consume pointer has not changed since
// the last time, or if this is the initial subscription.
+ Long minConsumedFromMap = topic2MinConsumedMessagesMap.get(topic);
if (topicSubscriptions.isEmpty()
- || (topic2MinConsumedMessagesMap.containsKey(topic)
- && topic2MinConsumedMessagesMap.get(topic) == minConsumedMessage)
+ || (minConsumedFromMap != null && minConsumedFromMap.equals(minConsumedMessage))
|| minConsumedMessage == 0) {
topic2MinConsumedMessagesMap.put(topic, minConsumedMessage);
pm.consumedUntil(topic, minConsumedMessage);
@@ -404,7 +408,8 @@ public abstract class AbstractSubscripti
// so the following codes only happened when remote subscription failed.
// it is safe to decrement the local count so next subscribe op
// could have the chance to subscribe remote.
- topic2LocalCounts.get(topic).decrementAndGet();
+ AtomicInteger count = topic2LocalCounts.get(topic);
+ if (count != null) { count.decrementAndGet(); }
}
cb.operationFailed(ctx, exception);
}
@@ -422,8 +427,10 @@ public abstract class AbstractSubscripti
};
+ AtomicInteger count = topic2LocalCounts.get(topic);
if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())
- && topic2LocalCounts.get(topic).incrementAndGet() == 1)
+ && count != null
+ && count.incrementAndGet() == 1)
notifySubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
else
cb2.operationFinished(ctx, resultOfOperation);
@@ -537,8 +544,9 @@ public abstract class AbstractSubscripti
public void operationFinished(Object ctx, Void resultOfOperation) {
topicSubscriptions.remove(subscriberId);
// Notify listeners if necessary.
+ AtomicInteger count = topic2LocalCounts.get(topic);
if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)
- && topic2LocalCounts.get(topic).decrementAndGet() == 0)
+ && count != null && count.decrementAndGet() == 0)
notifyUnsubcribe(topic);
updateMessageBound(topic);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/TrueFilter.java Thu May 17 16:32:05 2012
@@ -20,7 +20,7 @@ package org.apache.hedwig.server.subscri
import org.apache.hedwig.protocol.PubSubProtocol.Message;
public class TrueFilter implements MessageFilter {
- protected static TrueFilter instance = new TrueFilter();
+ protected final static TrueFilter instance = new TrueFilter();
public static TrueFilter instance() {
return instance;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Thu May 17 16:32:05 2012
@@ -103,7 +103,7 @@ public class ZkTopicManager extends Abst
// Check for expired connection.
if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
logger.error("ZK client connection to the ZK server has expired!");
- System.exit(1);
+ Runtime.getRuntime().exit(1);
}
}
});
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java Thu May 17 16:32:05 2012
@@ -22,7 +22,7 @@ import java.lang.Thread.UncaughtExceptio
import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
public class SafeAsyncCallback {
- protected static UncaughtExceptionHandler uncaughtExceptionHandler = new TerminateJVMExceptionHandler();
+ static UncaughtExceptionHandler uncaughtExceptionHandler = new TerminateJVMExceptionHandler();
public static void setUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
SafeAsyncCallback.uncaughtExceptionHandler = uncaughtExceptionHandler;
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml?rev=1339691&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/resources/findbugsExclude.xml Thu May 17 16:32:05 2012
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+//-->
+<FindBugsFilter>
+ <Match>
+ <Class name="org.apache.hedwig.server.persistence.LocalDBPersistenceManager" />
+ <Method name="createTable" />
+ <!-- We make is safe by hashing the input before using //-->
+ <Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" />
+ </Match>
+</FindBugsFilter>
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java Thu May 17 16:32:05 2012
@@ -118,7 +118,10 @@ public abstract class HedwigHubTestBase
// Now create the PubSubServer Hubs
serversList = new LinkedList<PubSubServer>();
for (int i = 0; i < numServers; i++) {
- serversList.add(new PubSubServer(getServerConfiguration(initialServerPort + i, initialSSLServerPort + i)));
+ PubSubServer s = new PubSubServer(
+ getServerConfiguration(initialServerPort + i, initialSSLServerPort + i));
+ serversList.add(s);
+ s.start();
}
}
protected void stopHubServers() throws Exception {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java Thu May 17 16:32:05 2012
@@ -232,9 +232,13 @@ public abstract class HedwigRegionTestBa
// servers. We will basically increment through the port numbers
// starting from the initial ones defined.
for (int j = 0; j < numServersPerRegion; j++) {
- serversList.add(new PubSubServer(getServerConfiguration(initialServerPort
- + (j + i * numServersPerRegion), initialSSLServerPort + (j + i * numServersPerRegion),
- regionName)));
+ PubSubServer s = new PubSubServer(
+ getServerConfiguration(initialServerPort
+ + (j + i * numServersPerRegion),
+ initialSSLServerPort + (j + i * numServersPerRegion),
+ regionName));
+ serversList.add(s);
+ s.start();
}
// Store this list of servers created for the current region
regionServersMap.put(regionName, serversList);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java Thu May 17 16:32:05 2012
@@ -48,6 +48,7 @@ public abstract class PubSubServerStandA
public void setUp() throws Exception {
logger.info("STARTING " + getName());
server = new PubSubServer(new StandAloneServerConfiguration());
+ server.start();
logger.info("Standalone PubSubServer test setup finished");
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java Thu May 17 16:32:05 2012
@@ -96,6 +96,7 @@ public class TestPubSubServerStartup {
try {
logger.info("starting hedwig broker!");
hedwigServer = new PubSubServer(serverConf);
+ hedwigServer.start();
} catch (Exception e) {
e.printStackTrace();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Thu May 17 16:32:05 2012
@@ -64,6 +64,10 @@ public class StubDeliveryManager impleme
}
@Override
+ public void start() {
+ }
+
+ @Override
public void stop() {
// do nothing now
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Thu May 17 16:32:05 2012
@@ -212,6 +212,7 @@ public class TestHedwigHub extends Hedwi
super.setUp();
if (mode == Mode.PROXY) {
proxy = new HedwigProxy(proxyConf);
+ proxy.start();
}
client = new HedwigClient(getClientConfiguration());
publisher = client.getPublisher();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java?rev=1339691&r1=1339690&r2=1339691&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestPubSubServer.java Thu May 17 16:32:05 2012
@@ -55,6 +55,7 @@ public class TestPubSubServer extends Pu
return super.getServerPort() + 1;
}
});
+ server1.start();
server1.shutdown();
}
@@ -91,7 +92,7 @@ public class TestPubSubServer extends Pu
return instantiator.instantiateTopicManager();
}
};
-
+ server.start();
return server;
}