You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/03/25 06:27:24 UTC
svn commit: r1460523 [1/2] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ bookke...
Author: sijie
Date: Mon Mar 25 05:27:23 2013
New Revision: 1460523
URL: http://svn.apache.org/r1460523
Log:
BOOKKEEPER-557: Compiler error showing up badly with jdk 7 (ivank via sijie)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
zookeeper/bookkeeper/trunk/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java
zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestSubAfterCloseSub.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigRegion.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
zookeeper/bookkeeper/trunk/pom.xml
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Mar 25 05:27:23 2013
@@ -50,6 +50,8 @@ Trunk (unreleased changes)
BOOKKEEPER-573: Script to start a bookkeeper cluster (ivank via sijie)
+ BOOKKEEPER-557: Compiler error showing up badly with jdk 7 (ivank via sijie)
+
Release 4.2.0 - 2013-01-14
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Mon Mar 25 05:27:23 2013
@@ -120,7 +120,7 @@ class BookieWatcher implements Watcher,
final HashSet<InetSocketAddress> deadBookies;
synchronized (this) {
- deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
+ deadBookies = new HashSet<InetSocketAddress>(knownBookies);
deadBookies.removeAll(newBookieAddrs);
// No need to close readonly bookie clients.
deadBookies.removeAll(readOnlyBookieWatcher.getReadOnlyBookies());
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Mon Mar 25 05:27:23 2013
@@ -188,7 +188,7 @@ public class ClientConfiguration extends
* @return zookeeper servers
*/
public String getZkServers() {
- List<Object> servers = getList(ZK_SERVERS, null);
+ List servers = getList(ZK_SERVERS, null);
if (null == servers || 0 == servers.size()) {
return "localhost";
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Mon Mar 25 05:27:23 2013
@@ -387,7 +387,7 @@ public class ServerConfiguration extends
* @return zookeeper servers
*/
public String getZkServers() {
- List<Object> servers = getList(ZK_SERVERS, null);
+ List servers = getList(ZK_SERVERS, null);
if (null == servers || 0 == servers.size()) {
return null;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Mon Mar 25 05:27:23 2013
@@ -177,7 +177,7 @@ abstract class AbstractZkLedgerManager i
new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
- if (KeeperException.Code.BadVersion == rc) {
+ if (KeeperException.Code.BADVERSION.intValue() == rc) {
cb.operationComplete(BKException.Code.MetadataVersionException, null);
} else if (KeeperException.Code.OK.intValue() == rc) {
// update metadata version
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java Mon Mar 25 05:27:23 2013
@@ -133,16 +133,17 @@ public abstract class LedgerManagerFacto
// handle pre V2 layout
if (layout.getLayoutFormatVersion() <= V1) {
// pre V2 layout we use type of ledger manager
+ @SuppressWarnings("deprecation")
String lmType = conf.getLedgerManagerType();
- if (lmType != null && !layout.getManagerType().equals(lmType)) {
+ if (lmType != null && !layout.getManagerFactoryClass().equals(lmType)) {
throw new IOException("Configured layout " + lmType
- + " does not match existing layout " + layout.getManagerType());
+ + " does not match existing layout " + layout.getManagerFactoryClass());
}
// create the ledger manager
- if (FlatLedgerManagerFactory.NAME.equals(layout.getManagerType())) {
+ if (FlatLedgerManagerFactory.NAME.equals(layout.getManagerFactoryClass())) {
lmFactory = new FlatLedgerManagerFactory();
- } else if (HierarchicalLedgerManagerFactory.NAME.equals(layout.getManagerType())) {
+ } else if (HierarchicalLedgerManagerFactory.NAME.equals(layout.getManagerFactoryClass())) {
lmFactory = new HierarchicalLedgerManagerFactory();
} else {
throw new IOException("Unknown ledger manager type: " + lmType);
@@ -189,6 +190,7 @@ public abstract class LedgerManagerFacto
// use default ledger manager factory if no one provided
if (factoryClass == null) {
// for backward compatibility, check manager type
+ @SuppressWarnings("deprecation")
String lmType = conf.getLedgerManagerType();
if (lmType == null) {
factoryClass = FlatLedgerManagerFactory.class;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Mon Mar 25 05:27:23 2013
@@ -42,7 +42,6 @@ import org.jboss.netty.channel.ChannelFu
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
@@ -66,8 +65,6 @@ import org.slf4j.LoggerFactory;
* has reconnect logic if a connection to a bookie fails.
*
*/
-
-@ChannelPipelineCoverage("one")
public class PerChannelBookieClient extends SimpleChannelHandler implements ChannelPipelineFactory {
static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java Mon Mar 25 05:27:23 2013
@@ -151,6 +151,7 @@ public class Auditor implements Watcher
return;
}
executor.submit(new Runnable() {
+ @SuppressWarnings("unchecked")
public void run() {
try {
waitIfLedgerReplicationDisabled();
@@ -267,6 +268,7 @@ public class Auditor implements Watcher
return zkc.getChildren(conf.getZkAvailableBookiesPath(), this);
}
+ @SuppressWarnings("unchecked")
private void auditingBookies(List<String> availableBookies)
throws BKAuditException, KeeperException, InterruptedException {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java Mon Mar 25 05:27:23 2013
@@ -287,9 +287,9 @@ public class TestSpeculativeRead extends
LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
ArrayList<InetSocketAddress> ensemble = l.getLedgerMetadata().getEnsembles().get(0L);
- Set<InetSocketAddress> allHosts = new HashSet(ensemble);
- Set<InetSocketAddress> noHost = new HashSet();
- Set<InetSocketAddress> secondHostOnly = new HashSet();
+ Set<InetSocketAddress> allHosts = new HashSet<InetSocketAddress>(ensemble);
+ Set<InetSocketAddress> noHost = new HashSet<InetSocketAddress>();
+ Set<InetSocketAddress> secondHostOnly = new HashSet<InetSocketAddress>();
secondHostOnly.add(ensemble.get(1));
PendingReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
try {
@@ -339,4 +339,4 @@ public class TestSpeculativeRead extends
bkspec.close();
}
}
-}
\ No newline at end of file
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java Mon Mar 25 05:27:23 2013
@@ -56,7 +56,7 @@ public class LedgerLayoutTest extends Bo
layout2.store(zkc, ledgerRootPath);
layout = LedgerLayout.readLayout(zkc, ledgerRootPath);
- assertEquals(testName, layout.getManagerType());
+ assertEquals(testName, layout.getManagerFactoryClass());
assertEquals(testVersion, layout.getManagerVersion());
}
@@ -141,7 +141,7 @@ public class LedgerLayoutTest extends Bo
LedgerLayout layout = LedgerLayout.readLayout(zkc, conf.getZkLedgersRootPath());
assertNotNull("Should not be null", layout);
- assertEquals(FlatLedgerManagerFactory.NAME, layout.getManagerType());
+ assertEquals(FlatLedgerManagerFactory.NAME, layout.getManagerFactoryClass());
assertEquals(FlatLedgerManagerFactory.CUR_VERSION, layout.getManagerVersion());
assertEquals(1, layout.getLayoutFormatVersion());
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java Mon Mar 25 05:27:23 2013
@@ -111,6 +111,7 @@ public class TestLedgerManager extends B
/**
* Test bad client configuration
*/
+ @SuppressWarnings("deprecation")
@Test(timeout=60000)
public void testBadConfV1() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java Mon Mar 25 05:27:23 2013
@@ -42,6 +42,7 @@ public class BookieZKExpireTest extends
baseClientConf.setZkTimeout(6000);
}
+ @SuppressWarnings("deprecation")
@Test(timeout=60000)
public void testBookieServerZKExpireBehaviour() throws Exception {
BookieServer server = null;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Mon Mar 25 05:27:23 2013
@@ -108,6 +108,7 @@ public class ZooKeeperUtil {
for (final Thread t : allthreads) {
if (t.getName().contains("SyncThread:0")) {
Thread sleeper = new Thread() {
+ @SuppressWarnings("deprecation")
public void run() {
try {
t.suspend();
Modified: zookeeper/bookkeeper/trunk/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java Mon Mar 25 05:27:23 2013
@@ -31,6 +31,8 @@ import org.apache.hedwig.jms.DebugUtil;
import org.apache.hedwig.jms.message.MessageImpl;
import org.apache.hedwig.jms.message.MessageUtil;
import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -357,9 +359,10 @@ public class HedwigMessagingSessionFacad
}
try {
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
hedwigClient.getSubscriber().subscribe(ByteString.copyFromUtf8(topicName),
- ByteString.copyFromUtf8(subscribedId),
- PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH);
+ ByteString.copyFromUtf8(subscribedId), opts);
} catch (PubSubException.CouldNotConnectException e) {
JMSException je = new JMSException("receive failed, could not connect .. " + e);
je.setLinkedException(e);
Modified: zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java Mon Mar 25 05:27:23 2013
@@ -40,7 +40,7 @@ public class ConnectionChurnTest extends
public void testPerformance() throws Exception {
ConnectionFactory factory = createConnectionFactory();
- List<Connection> list = new ArrayList();
+ List<Connection> list = new ArrayList<Connection>();
for (int i = 0; i < CONNECTION_COUNT; i++) {
Connection connection = factory.createConnection();
connection.start();
Modified: zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java Mon Mar 25 05:27:23 2013
@@ -42,6 +42,7 @@ public class CompositePublishTest extend
protected MessageConsumer[] consumers;
protected List[] messageLists;
+ @SuppressWarnings("unchecked")
protected void setUp() throws Exception {
super.setUp();
@@ -116,6 +117,7 @@ public class CompositePublishTest extend
return super.getSubject() + ".";
}
+ @SuppressWarnings("unchecked")
protected void assertMessagesAreReceived() throws JMSException {
waitForMessagesToBeDelivered();
int size = messageLists.length;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java Mon Mar 25 05:27:23 2013
@@ -26,6 +26,7 @@ import org.apache.hedwig.client.api.Subs
import org.apache.hedwig.client.benchmark.BenchmarkUtils.BenchmarkCallback;
import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputLatencyAggregator;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.util.Callback;
@@ -50,7 +51,9 @@ public class BenchmarkPublisher extends
public void warmup(int nWarmup) throws Exception {
ByteString topic = ByteString.copyFromUtf8("warmup" + partitionIndex);
ByteString subId = ByteString.copyFromUtf8("sub");
- subscriber.subscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subId, opts);
subscriber.startDelivery(topic, subId, new MessageHandler() {
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java Mon Mar 25 05:27:23 2013
@@ -35,6 +35,7 @@ import org.apache.hedwig.client.benchmar
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.util.Callback;
public class BenchmarkSubscriber extends BenchmarkWorker implements Callable<Void> {
@@ -73,7 +74,9 @@ public class BenchmarkSubscriber extends
final String topic = HedwigBenchmark.TOPIC_PREFIX + i;
- subscriber.subscribe(ByteString.copyFromUtf8(topic), subId, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(ByteString.copyFromUtf8(topic), subId, opts);
subscriber.startDelivery(ByteString.copyFromUtf8(topic), subId, new MessageHandler() {
@Override
@@ -130,7 +133,10 @@ public class BenchmarkSubscriber extends
if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)) {
continue;
}
- subscriber.asyncSubscribe(ByteString.copyFromUtf8(topicPrefix + i), subId, CreateOrAttach.CREATE_OR_ATTACH,
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.asyncSubscribe(ByteString.copyFromUtf8(topicPrefix + i),
+ subId, opts,
new BenchmarkCallback(agg), null);
}
// Wait till the benchmark test has completed
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Mon Mar 25 05:27:23 2013
@@ -52,6 +52,7 @@ import org.apache.hedwig.util.Subscripti
* This is the Hedwig Netty specific implementation of the Subscriber interface.
*
*/
+@SuppressWarnings("deprecation") // so that we can implement the Deprecated subscribe methods without a warning
public class HedwigSubscriber implements Subscriber {
private static Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/HChannelHandler.java Mon Mar 25 05:27:23 2013
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
@@ -48,7 +47,6 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEventResponse;
import static org.apache.hedwig.util.VarArgs.va;
-@ChannelPipelineCoverage("all")
public class HChannelHandler extends SimpleChannelHandler {
private static Logger logger = LoggerFactory.getLogger(HChannelHandler.class);
@@ -267,7 +265,7 @@ public class HChannelHandler extends Sim
// explicitly or the client has been stopped.
if (cfg.isSSLEnabled() && !channelClosedExplicitly && !channelManager.isClosed()) {
logger.debug("Initiating the SSL handshake");
- ctx.getPipeline().get(SslHandler.class).handshake(e.getChannel());
+ ctx.getPipeline().get(SslHandler.class).handshake();
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Mon Mar 25 05:27:23 2013
@@ -346,7 +346,9 @@ public class HedwigConsole {
try {
for (int j=startSub; j<=endSub; j++) {
ByteString sub = ByteString.copyFromUtf8(subPrefix + j);
- subscriber.subscribe(topic, sub, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, sub, opts);
subscriber.unsubscribe(topic, sub);
}
System.out.println("RMSUB " + topic.toStringUtf8() + " DONE");
@@ -471,7 +473,9 @@ public class HedwigConsole {
System.out.println("Starting PUBSUB test ...");
try {
// sub the topic
- subscriber.subscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subId, opts);
subscribed = true;
System.out.println("Sub topic " + topic.toStringUtf8() + ", subscriber id " + subId.toStringUtf8());
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java Mon Mar 25 05:27:23 2013
@@ -68,21 +68,24 @@ public class JLineHedwigCompletor implem
READTOPIC.equalsIgnoreCase(tokens[0]))) {
return completeTopic(buffer, tokens[1], candidates);
}
- List<String> cmds = HedwigCommands.findCandidateCommands(tokens);
+ List cmds = HedwigCommands.findCandidateCommands(tokens);
return completeCommand(buffer, tokens[tokens.length - 1], cmds, candidates);
}
+ @SuppressWarnings("unchecked")
private int completeCommand(String buffer, String token,
- List<String> commands, List<String> candidates) {
- for (String cmd : commands) {
- if (cmd.startsWith(token)) {
- candidates.add(cmd);
+ List commands, List candidates) {
+ for (Object cmdo : commands) {
+ assert (cmdo instanceof String);
+ if (((String)cmdo).startsWith(token)) {
+ candidates.add(cmdo);
}
}
return buffer.lastIndexOf(" ") + 1;
}
- private int completeTopic(String buffer, String token, List<String> candidates) {
+ @SuppressWarnings("unchecked")
+ private int completeTopic(String buffer, String token, List candidates) {
try {
Iterator<ByteString> children = admin.getTopics();
int i = 0;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/FakeBookie.java Mon Mar 25 05:27:23 2013
@@ -26,7 +26,7 @@ import org.jboss.netty.bootstrap.ServerB
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
@@ -38,7 +38,7 @@ import org.jboss.netty.handler.codec.fra
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Log4JLoggerFactory;
-@ChannelPipelineCoverage("all")
+@Sharable
public class FakeBookie extends SimpleChannelHandler implements
ChannelPipelineFactory {
static final Logger logger = LoggerFactory.getLogger(FakeBookie.class);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Mon Mar 25 05:27:23 2013
@@ -264,7 +264,7 @@ public class ServerConfiguration extends
* @return String
*/
public String getZkHost() {
- List<Object> servers = conf.getList(ZK_HOST, null);
+ List servers = conf.getList(ZK_HOST, null);
if (null == servers || 0 == servers.size()) {
return "localhost";
}
@@ -447,6 +447,7 @@ public class ServerConfiguration extends
* copies of each ledger entry is written).
*
* @return int
+ * @deprecated please use #getBkWriteQuorumSize() and #getBkAckQuorumSize()
*/
@Deprecated
protected int getBkQuorumSize() {
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java Mon Mar 25 05:27:23 2013
@@ -274,7 +274,7 @@ public class ZkMetadataManagerFactory ex
callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
"No persistence info found for topic " + topic.toStringUtf8()));
return;
- } else if (rc == Code.BadVersion) {
+ } else if (rc == Code.BADVERSION.intValue()) {
// bad version
callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
"Bad version provided to update persistence info of topic " + topic.toStringUtf8()));
@@ -318,7 +318,7 @@ public class ZkMetadataManagerFactory ex
callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_PERSISTENCE_INFO,
"No persistence info found for topic " + topic.toStringUtf8()));
return;
- } else if (rc == Code.BadVersion) {
+ } else if (rc == Code.BADVERSION.intValue()) {
// bad version
callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
"Bad version provided to delete persistence info of topic " + topic.toStringUtf8()));
@@ -449,7 +449,7 @@ public class ZkMetadataManagerFactory ex
"No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:"
+ subscriberId.toStringUtf8() + ")."));
return;
- } else if (rc == Code.BadVersion) {
+ } else if (rc == Code.BADVERSION.intValue()) {
// bad version
callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
"Bad version provided to replace subscription data of topic "
@@ -503,7 +503,7 @@ public class ZkMetadataManagerFactory ex
"No subscription state found for (topic:" + topic.toStringUtf8() + ", subscriber:"
+ subscriberId.toStringUtf8() + ")."));
return;
- } else if (rc == Code.BadVersion) {
+ } else if (rc == Code.BADVERSION.intValue()) {
// bad version
callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
"Bad version provided to delete subscription data of topic "
@@ -737,7 +737,7 @@ public class ZkMetadataManagerFactory ex
callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO,
"No owner info found for topic " + topic.toStringUtf8()));
return;
- } else if (rc == Code.BadVersion) {
+ } else if (rc == Code.BADVERSION.intValue()) {
// bad version
callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
"Bad version provided to update owner info of topic " + topic.toStringUtf8()));
@@ -811,7 +811,7 @@ public class ZkMetadataManagerFactory ex
callback.operationFailed(ctx, PubSubException.create(StatusCode.NO_TOPIC_OWNER_INFO,
"No owner info found for topic " + topic.toStringUtf8()));
return;
- } else if (Code.BadVersion == rc) {
+ } else if (Code.BADVERSION.intValue() == rc) {
// bad version
callback.operationFailed(ctx, PubSubException.create(StatusCode.BAD_VERSION,
"Bad version provided to delete owner info of topic " + topic.toStringUtf8()));
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java Mon Mar 25 05:27:23 2013
@@ -26,7 +26,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
@@ -44,7 +44,7 @@ import org.apache.hedwig.protoextensions
import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
import org.apache.hedwig.server.handlers.Handler;
-@ChannelPipelineCoverage("all")
+@Sharable
public class UmbrellaHandler extends SimpleChannelHandler {
static Logger logger = LoggerFactory.getLogger(UmbrellaHandler.class);
@@ -100,7 +100,7 @@ public class UmbrellaHandler extends Sim
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if (isSSLEnabled) {
- ctx.getPipeline().get(SslHandler.class).handshake(e.getChannel()).addListener(new ChannelFutureListener() {
+ ctx.getPipeline().get(SslHandler.class).handshake().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
logger.debug("SSL handshake has completed successfully!");
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java Mon Mar 25 05:27:23 2013
@@ -26,6 +26,7 @@ import org.apache.hedwig.exceptions.PubS
import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protoextensions.PubSubResponseUtils;
import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
import org.apache.hedwig.server.handlers.Handler;
@@ -59,9 +60,11 @@ public class ProxySubscribeHandler imple
SubscribeRequest subRequest = request.getSubscribeRequest();
final TopicSubscriber topicSubscriber = new TopicSubscriber(request.getTopic(), subRequest.getSubscriberId());
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(subRequest.getCreateOrAttach()).build();
- subscriber.asyncSubscribe(topicSubscriber.getTopic(), subRequest.getSubscriberId(), subRequest
- .getCreateOrAttach(), new Callback<Void>() {
+ subscriber.asyncSubscribe(topicSubscriber.getTopic(), subRequest.getSubscriberId(),
+ opts, new Callback<Void>() {
@Override
public void operationFailed(Object ctx, PubSubException exception) {
channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java Mon Mar 25 05:27:23 2013
@@ -68,7 +68,7 @@ public class ZkUtils {
}
if (Code.OK.intValue() != syncObj.rc) {
- throw KeeperException.create(syncObj.rc, syncObj.path);
+ throw KeeperException.create(KeeperException.Code.get(syncObj.rc), syncObj.path);
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Mon Mar 25 05:27:23 2013
@@ -222,7 +222,9 @@ public class TestPubSubClient extends Pu
final Map<String, MessageSeqId> receivedMsgs =
new HashMap<String, MessageSeqId>();
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
subscriber.startDelivery(topic, subid, new MessageHandler() {
synchronized public void deliver(ByteString topic, ByteString subscriberId,
Message msg, Callback<Void> callback,
@@ -286,7 +288,9 @@ public class TestPubSubClient extends Pu
final Map<String, MessageSeqId> receivedMsgs =
new HashMap<String, MessageSeqId>();
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
subscriber.startDelivery(topic, subid, new MessageHandler() {
synchronized public void deliver(ByteString topic, ByteString subscriberId,
Message msg, Callback<Void> callback,
@@ -361,7 +365,9 @@ public class TestPubSubClient extends Pu
public void testSyncSubscribe() throws Exception {
boolean subscribeSuccess = true;
try {
- subscriber.subscribe(ByteString.copyFromUtf8("mySyncSubscribeTopic"), ByteString.copyFromUtf8("1"), CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(ByteString.copyFromUtf8("mySyncSubscribeTopic"), ByteString.copyFromUtf8("1"), opts);
} catch (Exception e) {
subscribeSuccess = false;
}
@@ -370,8 +376,10 @@ public class TestPubSubClient extends Pu
@Test(timeout=60000)
public void testAsyncSubscribe() throws Exception {
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
subscriber.asyncSubscribe(ByteString.copyFromUtf8("myAsyncSubscribeTopic"), ByteString.copyFromUtf8("1"),
- CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+ opts, new TestCallback(), null);
assertTrue(queue.take());
}
@@ -379,7 +387,9 @@ public class TestPubSubClient extends Pu
public void testStartDeliveryAfterCloseSub() throws Exception {
ByteString topic = ByteString.copyFromUtf8("testStartDeliveryAfterCloseSub");
ByteString subid = ByteString.copyFromUtf8("mysubid");
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
// Start delivery for the subscriber
subscriber.startDelivery(topic, subid, new TestMessageHandler());
@@ -394,7 +404,7 @@ public class TestPubSubClient extends Pu
subscriber.closeSubscription(topic, subid);
// subscribe again
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.subscribe(topic, subid, opts);
subscriber.startDelivery(topic, subid, new TestMessageHandler());
publisher.publish(topic, Message.newBuilder()
@@ -406,7 +416,10 @@ public class TestPubSubClient extends Pu
public void testSubscribeAndConsume() throws Exception {
ByteString topic = ByteString.copyFromUtf8("myConsumeTopic");
ByteString subscriberId = ByteString.copyFromUtf8("1");
- subscriber.asyncSubscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+
+ subscriber.asyncSubscribe(topic, subscriberId, opts, new TestCallback(), null);
assertTrue(queue.take());
// Start delivery for the subscriber
@@ -440,7 +453,9 @@ public class TestPubSubClient extends Pu
public void testAsyncSubscribeAndUnsubscribe() throws Exception {
ByteString topic = ByteString.copyFromUtf8("myAsyncUnsubTopic");
ByteString subscriberId = ByteString.copyFromUtf8("1");
- subscriber.asyncSubscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.asyncSubscribe(topic, subscriberId, opts, new TestCallback(), null);
assertTrue(queue.take());
subscriber.asyncUnsubscribe(topic, subscriberId, new TestCallback(), null);
assertTrue(queue.take());
@@ -463,7 +478,9 @@ public class TestPubSubClient extends Pu
public void testAsyncSubscribeAndCloseSubscription() throws Exception {
ByteString topic = ByteString.copyFromUtf8("myAsyncSubAndCloseSubTopic");
ByteString subscriberId = ByteString.copyFromUtf8("1");
- subscriber.asyncSubscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.asyncSubscribe(topic, subscriberId, opts, new TestCallback(), null);
assertTrue(queue.take());
subscriber.closeSubscription(topic, subscriberId);
assertTrue(true);
@@ -474,11 +491,13 @@ public class TestPubSubClient extends Pu
ByteString topic = ByteString.copyFromUtf8("mySubClosesubAndPublish");
ByteString subid = ByteString.copyFromUtf8("mysub");
// to populate startServing/stopServing sequeuence
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
for (int i=0; i<5; i++) {
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.subscribe(topic, subid, opts);
subscriber.closeSubscription(topic, subid);
}
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.subscribe(topic, subid, opts);
subscriber.startDelivery(topic, subid, new TestMessageHandler());
for (int i=0; i<3; i++) {
publisher.asyncPublish(topic,
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestSubAfterCloseSub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestSubAfterCloseSub.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestSubAfterCloseSub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/TestSubAfterCloseSub.java Mon Mar 25 05:27:23 2013
@@ -27,6 +27,7 @@ import org.apache.hedwig.client.api.Subs
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.server.HedwigHubTestBase;
import org.apache.hedwig.server.delivery.DeliveryManager;
import org.apache.hedwig.server.delivery.FIFODeliveryManager;
@@ -106,7 +107,9 @@ public class TestSubAfterCloseSub extend
final CountDownLatch deliverLatch = new CountDownLatch(1);
try {
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
sleepDeliveryManager(wakeupLatch);
subscriber.asyncCloseSubscription(topic, subid, new Callback<Void>() {
@Override
@@ -118,7 +121,10 @@ public class TestSubAfterCloseSub extend
logger.error("Closesub failed : ", exception);
}
}, null);
- subscriber.asyncSubscribe(topic, subid, CreateOrAttach.ATTACH, new Callback<Void>() {
+
+ SubscriptionOptions optsAttach = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.ATTACH).build();
+ subscriber.asyncSubscribe(topic, subid, optsAttach, new Callback<Void>() {
@Override
public void operationFinished(Object ctx, Void resultOfOperation) {
try {
@@ -181,9 +187,14 @@ public class TestSubAfterCloseSub extend
final ByteString topic = ByteString.copyFromUtf8("TestSimpleClientTopicBusy");
final ByteString subid = ByteString.copyFromUtf8("mysub");
- subscriber1.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts1 = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber1.subscribe(topic, subid, opts1);
subscriber1.closeSubscription(topic, subid);
- subscriber2.subscribe(topic, subid, CreateOrAttach.ATTACH);
+
+ SubscriptionOptions opts2 = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.ATTACH).build();
+ subscriber2.subscribe(topic, subid, opts2);
subscriber2.closeSubscription(topic, subid);
client1.close();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestMultiplexing.java Mon Mar 25 05:27:23 2013
@@ -33,6 +33,7 @@ import org.apache.hedwig.client.api.Subs
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.server.HedwigHubTestBase;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.util.Callback;
@@ -270,10 +271,13 @@ public class TestMultiplexing extends He
TestMessageHandler csHandler22 =
new TestMessageHandler(1, X, true, X);
- subscriber.subscribe(topic1, subid1, CreateOrAttach.CREATE);
- subscriber.subscribe(topic1, subid2, CreateOrAttach.CREATE);
- subscriber.subscribe(topic2, subid1, CreateOrAttach.CREATE);
- subscriber.subscribe(topic2, subid2, CreateOrAttach.CREATE);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE).build();
+
+ subscriber.subscribe(topic1, subid1, opts);
+ subscriber.subscribe(topic1, subid2, opts);
+ subscriber.subscribe(topic2, subid1, opts);
+ subscriber.subscribe(topic2, subid2, opts);
// start deliveries
subscriber.startDelivery(topic1, subid1, csHandler11);
@@ -330,10 +334,12 @@ public class TestMultiplexing extends He
TestMessageHandler csHandler22 =
new TestMessageHandler(1, X, true, X);
- subscriber.subscribe(topic1, subid1, CreateOrAttach.CREATE);
- subscriber.subscribe(topic1, subid2, CreateOrAttach.CREATE);
- subscriber.subscribe(topic2, subid1, CreateOrAttach.CREATE);
- subscriber.subscribe(topic2, subid2, CreateOrAttach.CREATE);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE).build();
+ subscriber.subscribe(topic1, subid1, opts);
+ subscriber.subscribe(topic1, subid2, opts);
+ subscriber.subscribe(topic2, subid1, opts);
+ subscriber.subscribe(topic2, subid2, opts);
// start deliveries
subscriber.startDelivery(topic1, subid1, csHandler11);
@@ -390,10 +396,12 @@ public class TestMultiplexing extends He
ThrottleMessageHandler csHandler22 =
new ThrottleMessageHandler(1, 3*X, false, X);
- subscriber.subscribe(topic1, subid1, CreateOrAttach.CREATE);
- subscriber.subscribe(topic1, subid2, CreateOrAttach.CREATE);
- subscriber.subscribe(topic2, subid1, CreateOrAttach.CREATE);
- subscriber.subscribe(topic2, subid2, CreateOrAttach.CREATE);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE).build();
+ subscriber.subscribe(topic1, subid1, opts);
+ subscriber.subscribe(topic1, subid2, opts);
+ subscriber.subscribe(topic2, subid1, opts);
+ subscriber.subscribe(topic2, subid2, opts);
// start deliveries
subscriber.startDelivery(topic1, subid1, csHandler11);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Mon Mar 25 05:27:23 2013
@@ -675,7 +675,11 @@ public class TestBackwardCompat extends
publisher.publish(topic, org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(
ByteString.copyFromUtf8(String.valueOf(i))).build());
}
- subscriber.subscribe(topic, subid, org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH);
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions opts
+ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH)
+ .build();
+ subscriber.subscribe(topic, subid, opts);
final AtomicInteger expected = new AtomicInteger(x - y);
final CountDownLatch latch = new CountDownLatch(1);
@@ -744,7 +748,12 @@ public class TestBackwardCompat extends
};
filter.initialize(conf.getConf());
- subscriber.subscribe(topic, subid, org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH);
+
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions opts
+ = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH)
+ .build();
+ subscriber.subscribe(topic, subid, opts);
final int base = start + M - start % M;
final AtomicInteger expected = new AtomicInteger(base);
final CountDownLatch latch = new CountDownLatch(1);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java Mon Mar 25 05:27:23 2013
@@ -106,7 +106,9 @@ public class TestThrottlingDelivery exte
pub.publish(topic, Message.newBuilder().setBody(
ByteString.copyFromUtf8(String.valueOf(i))).build());
}
- sub.subscribe(topic, subid, CreateOrAttach.ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.ATTACH).build();
+ sub.subscribe(topic, subid, opts);
final AtomicInteger expected = new AtomicInteger(1);
final CountDownLatch throttleLatch = new CountDownLatch(1);
@@ -207,7 +209,9 @@ public class TestThrottlingDelivery exte
ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle");
ByteString subid = ByteString.copyFromUtf8("serverThrottleSub");
- sub.subscribe(topic, subid, CreateOrAttach.CREATE);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE).build();
+ sub.subscribe(topic, subid, opts);
sub.closeSubscription(topic, subid);
// throttle with hub server's setting
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/filter/TestMessageFilter.java Mon Mar 25 05:27:23 2013
@@ -253,7 +253,9 @@ public class TestMessageFilter extends H
ByteString topic = ByteString.copyFromUtf8("TestMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 2);
receiveNumModM(topic, subid, ModMessageFilter.class.getName(), null, 0, 50, 2, true);
@@ -282,7 +284,9 @@ public class TestMessageFilter extends H
ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferences");
ByteString subid = ByteString.copyFromUtf8("mysub");
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 2);
@@ -304,7 +308,9 @@ public class TestMessageFilter extends H
ByteString topic = ByteString.copyFromUtf8("TestChangeMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 3);
@@ -321,7 +327,9 @@ public class TestMessageFilter extends H
ByteString topic = ByteString.copyFromUtf8("TestFixMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 3);
@@ -340,7 +348,9 @@ public class TestMessageFilter extends H
public void testNullClientMessageFilter() throws Exception {
ByteString topic = ByteString.copyFromUtf8("TestNullClientMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
try {
subscriber.startDeliveryWithFilter(topic, subid, null, new ModMessageFilter());
fail("Should fail start delivery with filter using null message handler.");
@@ -364,7 +374,9 @@ public class TestMessageFilter extends H
ByteString topic = ByteString.copyFromUtf8("TestClientMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 2);
receiveNumModM(topic, subid, null, new ModMessageFilter(), 0, 50, 2, true);
@@ -375,7 +387,9 @@ public class TestMessageFilter extends H
ByteString topic = ByteString.copyFromUtf8("TestChangeSubscriptionPreferencesForClientFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 2);
@@ -389,7 +403,9 @@ public class TestMessageFilter extends H
ByteString topic = ByteString.copyFromUtf8("TestChangeClientSideMessageFilter");
ByteString subid = ByteString.copyFromUtf8("mysub");
- subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(topic, subid, opts);
subscriber.closeSubscription(topic, subid);
publishNums(topic, 0, 100, 3);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1460523&r1=1460522&r2=1460523&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Mon Mar 25 05:27:23 2013
@@ -48,6 +48,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest;
import org.apache.hedwig.protocol.PubSubProtocol.StopDeliveryRequest;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.HedwigHubTestBase;
@@ -300,7 +301,9 @@ public abstract class TestHedwigHub exte
if (logger.isDebugEnabled())
logger.debug("Subscribing to topics and starting delivery.");
for (int i = 0; i < batchSize; i++) {
- subscriber.asyncSubscribe(getTopic(i), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.asyncSubscribe(getTopic(i), localSubscriberId, opts,
new TestCallback(queue), null);
assertTrue(queue.take());
}
@@ -347,7 +350,9 @@ public abstract class TestHedwigHub exte
Publisher myPublisher = myClient.getPublisher();
ByteString myTopic = getTopic(0);
// Subscribe to a topic and start delivery on it
- mySubscriber.asyncSubscribe(myTopic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ mySubscriber.asyncSubscribe(myTopic, localSubscriberId, opts,
new TestCallback(queue), null);
assertTrue(queue.take());
startDelivery(mySubscriber, myTopic, localSubscriberId, new TestMessageHandler(consumeQueue));
@@ -384,14 +389,20 @@ public abstract class TestHedwigHub exte
@Test(timeout=10000)
public void testAttachToSubscriptionSuccess() throws Exception {
ByteString topic = getTopic(0);
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
+ SubscriptionOptions opts1 = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts1, new TestCallback(queue),
null);
assertTrue(queue.take());
// Close the subscription asynchronously
subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
assertTrue(queue.take());
+
+ SubscriptionOptions opts2 = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.ATTACH).build();
// Now try to attach to the subscription
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.ATTACH, new TestCallback(queue), null);
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts2, new TestCallback(queue), null);
assertTrue(queue.take());
// Start delivery and publish some messages. Make sure they are consumed
// correctly.
@@ -437,8 +448,10 @@ public abstract class TestHedwigHub exte
@Test(timeout=10000)
public void testUnsubscribe() throws Exception {
ByteString topic = getTopic(0);
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
@@ -488,8 +501,9 @@ public abstract class TestHedwigHub exte
@Test(timeout=10000)
public void testCloseSubscription() throws Exception {
ByteString topic = getTopic(0);
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
@@ -520,8 +534,10 @@ public abstract class TestHedwigHub exte
@Test(timeout=10000)
public void testStartDeliveryTwice() throws Exception {
ByteString topic = getTopic(0);
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
try {
@@ -534,8 +550,10 @@ public abstract class TestHedwigHub exte
@Test(timeout=10000)
public void testStopDelivery() throws Exception {
ByteString topic = getTopic(0);
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
publisher.asyncPublish(topic, getMsg(0), new TestCallback(queue), null);
@@ -575,8 +593,10 @@ public abstract class TestHedwigHub exte
@Test(timeout=10000)
public void testConsumedMessagesInOrder() throws Exception {
ByteString topic = getTopic(0);
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
// Now publish some messages and verify that they are delivered in order
@@ -596,21 +616,28 @@ public abstract class TestHedwigHub exte
@Test(timeout=10000)
public void testCreateSubscriptionFailure() throws Exception {
ByteString topic = getTopic(0);
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(queue),
- null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
assertTrue(queue.take());
// Close the subscription asynchronously
subscriber.asyncCloseSubscription(topic, localSubscriberId, new TestCallback(queue), null);
assertTrue(queue.take());
// Now try to create the subscription when it already exists
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE, new TestCallback(queue), null);
+ SubscriptionOptions optsCreate = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE).build();
+
+ subscriber.asyncSubscribe(topic, localSubscriberId, optsCreate, new TestCallback(queue), null);
assertFalse(queue.take());
}
@Test(timeout=10000)
public void testCreateSubscriptionSuccess() throws Exception {
ByteString topic = getTopic(0);
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.CREATE, new TestCallback(queue), null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE).build();
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
assertTrue(queue.take());
startDelivery(topic, localSubscriberId, new TestMessageHandler(consumeQueue));
int batchSize = 5;
@@ -624,7 +651,10 @@ public abstract class TestHedwigHub exte
@Test(timeout=10000)
public void testAttachToSubscriptionFailure() throws Exception {
ByteString topic = getTopic(0);
- subscriber.asyncSubscribe(topic, localSubscriberId, CreateOrAttach.ATTACH, new TestCallback(queue), null);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.ATTACH).build();
+
+ subscriber.asyncSubscribe(topic, localSubscriberId, opts, new TestCallback(queue), null);
assertFalse(queue.take());
}
@@ -635,7 +665,9 @@ public abstract class TestHedwigHub exte
public void testSyncSubscribeWithInvalidSubscriberId() throws Exception {
boolean subscribeSuccess = false;
try {
- subscriber.subscribe(getTopic(0), hubSubscriberId, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.subscribe(getTopic(0), hubSubscriberId, opts);
} catch (InvalidSubscriberIdException e) {
subscribeSuccess = true;
} catch (Exception ex) {
@@ -646,7 +678,9 @@ public abstract class TestHedwigHub exte
@Test(timeout=10000)
public void testAsyncSubscribeWithInvalidSubscriberId() throws Exception {
- subscriber.asyncSubscribe(getTopic(0), hubSubscriberId, CreateOrAttach.CREATE_OR_ATTACH,
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ subscriber.asyncSubscribe(getTopic(0), hubSubscriberId, opts,
new TestCallback(queue), null);
assertFalse(queue.take());
}
@@ -679,7 +713,9 @@ public abstract class TestHedwigHub exte
Subscriber hubSubscriber = hubClient.getSubscriber();
boolean subscribeSuccess = false;
try {
- hubSubscriber.subscribe(getTopic(0), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH);
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ hubSubscriber.subscribe(getTopic(0), localSubscriberId, opts);
} catch (InvalidSubscriberIdException e) {
subscribeSuccess = true;
} catch (Exception ex) {
@@ -693,7 +729,9 @@ public abstract class TestHedwigHub exte
public void testAsyncHubSubscribeWithInvalidSubscriberId() throws Exception {
Client hubClient = new HedwigHubClient(new HubClientConfiguration());
Subscriber hubSubscriber = hubClient.getSubscriber();
- hubSubscriber.asyncSubscribe(getTopic(0), localSubscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
+ hubSubscriber.asyncSubscribe(getTopic(0), localSubscriberId, opts, new TestCallback(
queue), null);
assertFalse(queue.take());
hubClient.close();