You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:13 UTC
[03/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
deleted file mode 100644
index 08f287c..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/MessageBoundedPersistenceTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-
-import org.apache.hedwig.client.api.Client;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.util.Callback;
-
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-
-public class MessageBoundedPersistenceTest extends HedwigHubTestBase {
- protected static final Logger logger = LoggerFactory.getLogger(MessageBoundedPersistenceTest.class);
-
-    /**
-     * Hub server configuration used by this test: it reports a maximum cache
-     * size of 1, a read-ahead count of 1, and a messages-consumed thread
-     * interval of 1000.
-     */
-    protected class SmallReadAheadServerConfiguration
-        extends HedwigHubTestBase.HubServerConfiguration {
-
-        SmallReadAheadServerConfiguration(int serverPort, int sslServerPort) {
-            super(serverPort, sslServerPort);
-        }
-
-        /** Interval for the messages-consumed thread: run every second. */
-        public int getMessagesConsumedThreadRunInterval() {
-            return 1000;
-        }
-
-        /** Read-ahead count: always 1. */
-        public int getReadAheadCount() {
-            return 1;
-        }
-
-        /** Maximum cache size: always 1. */
-        public long getMaximumCacheSize() {
-            return 1;
-        }
-    }
-
-    /**
-     * Builds the server configuration for the given ports; this test always
-     * hands back a {@link SmallReadAheadServerConfiguration}.
-     */
-    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
-        ServerConfiguration smallReadAheadConf =
-            new SmallReadAheadServerConfiguration(serverPort, sslServerPort);
-        return smallReadAheadConf;
-    }
-
-    /**
-     * Client configuration whose subscription message bound is fixed at
-     * construction time (5 when no bound is given).
-     */
-    private class MessageBoundClientConfiguration extends HubClientConfiguration {
-        final int messageBound;
-
-        /** Uses a message bound of 5. */
-        public MessageBoundClientConfiguration() {
-            this(5);
-        }
-
-        /** @param bound value reported by {@link #getSubscriptionMessageBound()} */
-        public MessageBoundClientConfiguration(int bound) {
-            this.messageBound = bound;
-        }
-
-        public int getSubscriptionMessageBound() {
-            return messageBound;
-        }
-    }
-
-    /**
-     * Publishes X messages whose bodies are the decimal strings "0" to "X-1",
-     * attaches to the (already existing) subscription and checks that the
-     * values delivered start at X - Y and increase by one up to X - 1.
-     * Delivery is then stopped and the subscription closed.
-     *
-     * @param pub   publisher used to send the messages
-     * @param sub   subscriber used to attach and receive
-     * @param topic topic to publish to and subscribe on
-     * @param subid subscriber id to attach with
-     * @param X     number of messages to publish
-     * @param Y     number of trailing messages expected to be delivered
-     * @throws Exception on any publish/subscribe failure or failed assertion
-     */
-    private void sendXExpectLastY(Publisher pub, Subscriber sub,
-                                  ByteString topic, ByteString subid,
-                                  final int X, final int Y) throws Exception {
-        int published = 0;
-        while (published < X) {
-            ByteString body = ByteString.copyFromUtf8(String.valueOf(published));
-            pub.publish(topic, Message.newBuilder().setBody(body).build());
-            published++;
-        }
-
-        SubscriptionOptions attachOnly = SubscriptionOptions.newBuilder()
-            .setCreateOrAttach(CreateOrAttach.ATTACH).build();
-        sub.subscribe(topic, subid, attachOnly);
-
-        // Next value we expect to see; the first delivered value must be X - Y.
-        final AtomicInteger expected = new AtomicInteger(X - Y);
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        MessageHandler inOrderChecker = new MessageHandler () {
-            synchronized public void deliver(ByteString topic, ByteString subscriberId,
-                                             Message msg, Callback<Void> callback,
-                                             Object context) {
-                try {
-                    int value = Integer.valueOf(msg.getBody().toStringUtf8());
-
-                    if (value != expected.get()) {
-                        // Out-of-order value: reset the counter so the final
-                        // equality check fails, and release the waiter.
-                        logger.error("Did not receive expected value, expected {}, got {}",
-                                     expected.get(), value);
-                        expected.set(0);
-                        latch.countDown();
-                    } else {
-                        expected.incrementAndGet();
-                    }
-
-                    if (expected.get() == X) {
-                        latch.countDown();
-                    }
-                    callback.operationFinished(context, null);
-                } catch (Exception e) {
-                    logger.error("Received bad message", e);
-                    // Release the waiter; the equality assertion will then fail.
-                    latch.countDown();
-                }
-            }
-        };
-        sub.startDelivery(topic, subid, inOrderChecker);
-
-        // The message is built before waiting, so it reports the counter as it
-        // stood when the wait began.
-        String timeoutMessage = "Timed out waiting for messages Y is " + Y
-            + " expected is currently " + expected.get();
-        boolean finished = latch.await(10, TimeUnit.SECONDS);
-        assertTrue(timeoutMessage, finished);
-        assertEquals("Should be expected message with " + X, X, expected.get());
-
-        sub.stopDelivery(topic, subid);
-        sub.closeSubscription(topic, subid);
-    }
-
-    /**
-     * Creates a subscription through a client whose configured message bound
-     * is 5, closes it, publishes 1000 messages and expects only the last 5 to
-     * be delivered on re-attach.
-     */
-    @Test(timeout=60000)
-    public void testBasicBounding() throws Exception {
-        Client client = new HedwigClient(new MessageBoundClientConfiguration(5));
-        try {
-            Publisher pub = client.getPublisher();
-            Subscriber sub = client.getSubscriber();
-
-            ByteString topic = ByteString.copyFromUtf8("basicBoundingTopic");
-            ByteString subid = ByteString.copyFromUtf8("basicBoundingSubId");
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE).build();
-            sub.subscribe(topic, subid, opts);
-            sub.closeSubscription(topic, subid);
-
-            sendXExpectLastY(pub, sub, topic, subid, 1000, 5);
-        } finally {
-            // Release the client even when an assertion above fails.
-            client.close();
-        }
-    }
-
-    /**
-     * Exercises three subscribers on one topic: one with a message bound of 5,
-     * one with a bound of 20 and one created without a bound. Each bounded
-     * subscriber must only see the last 5 / 20 of 1000 published messages, the
-     * unbounded one must see all 10000. The bounded checks are repeated as the
-     * other subscribers are unsubscribed one by one.
-     */
-    @Test(timeout=60000)
-    public void testMultipleSubscribers() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("multiSubTopic");
-
-        Client client = new HedwigClient(new HubClientConfiguration());
-        try {
-            Publisher pub = client.getPublisher();
-            Subscriber sub = client.getSubscriber();
-
-            SubscriptionOptions options5 = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(5).build();
-            SubscriptionOptions options20 = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(20).build();
-            SubscriptionOptions optionsUnbounded = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE).build();
-
-            ByteString subid5 = ByteString.copyFromUtf8("bound5SubId");
-            ByteString subid20 = ByteString.copyFromUtf8("bound20SubId");
-            ByteString subidUnbounded = ByteString.copyFromUtf8("noboundSubId");
-
-            sub.subscribe(topic, subid5, options5);
-            sub.closeSubscription(topic, subid5);
-            sendXExpectLastY(pub, sub, topic, subid5, 1000, 5);
-
-            sub.subscribe(topic, subid20, options20);
-            sub.closeSubscription(topic, subid20);
-            sendXExpectLastY(pub, sub, topic, subid20, 1000, 20);
-
-            sub.subscribe(topic, subidUnbounded, optionsUnbounded);
-            sub.closeSubscription(topic, subidUnbounded);
-
-            sendXExpectLastY(pub, sub, topic, subidUnbounded, 10000, 10000);
-            sub.unsubscribe(topic, subidUnbounded);
-
-            sendXExpectLastY(pub, sub, topic, subid20, 1000, 20);
-            sub.unsubscribe(topic, subid20);
-
-            sendXExpectLastY(pub, sub, topic, subid5, 1000, 5);
-            sub.unsubscribe(topic, subid5);
-        } finally {
-            // Release the client even when an assertion above fails.
-            client.close();
-        }
-    }
-
-    /**
-     * Re-subscribes the same subscriber with CREATE_OR_ATTACH and message
-     * bounds of 5, 20 and 10 in turn, checking after each that only that many
-     * of 50 published messages are delivered. A final subscribe without a
-     * bound must leave the bound at 10.
-     */
-    @Test(timeout=60000)
-    public void testUpdateMessageBound() throws Exception {
-        ByteString topic = ByteString.copyFromUtf8("UpdateMessageBound");
-
-        Client client = new HedwigClient(new HubClientConfiguration());
-        try {
-            Publisher pub = client.getPublisher();
-            Subscriber sub = client.getSubscriber();
-
-            SubscriptionOptions options5 = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).setMessageBound(5).build();
-            SubscriptionOptions options20 = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).setMessageBound(20).build();
-            SubscriptionOptions options10 = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).setMessageBound(10).build();
-
-            ByteString subid = ByteString.copyFromUtf8("updateSubId");
-
-            sub.subscribe(topic, subid, options5);
-            sub.closeSubscription(topic, subid);
-            sendXExpectLastY(pub, sub, topic, subid, 50, 5);
-
-            // update bound to 20
-            sub.subscribe(topic, subid, options20);
-            sub.closeSubscription(topic, subid);
-            sendXExpectLastY(pub, sub, topic, subid, 50, 20);
-
-            // update bound to 10
-            sub.subscribe(topic, subid, options10);
-            sub.closeSubscription(topic, subid);
-            sendXExpectLastY(pub, sub, topic, subid, 50, 10);
-
-            // message bound is not provided, no update
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            sub.subscribe(topic, subid, opts);
-            sub.closeSubscription(topic, subid);
-            sendXExpectLastY(pub, sub, topic, subid, 50, 10);
-        } finally {
-            // Release the client even when an assertion above fails.
-            client.close();
-        }
-    }
-
-    /**
-     * Publishes to a topic, restarts the hub servers so that a second ledger
-     * range appears, publishes more, waits, and then checks that the topic's
-     * ledger ranges are back to one entry and that the first ledger's znode
-     * no longer exists while the remaining ledger's znode does.
-     */
-    @Test(timeout=60000)
-    public void testLedgerGC() throws Exception {
-        Client client = new HedwigClient(new MessageBoundClientConfiguration());
-        try {
-            Publisher pub = client.getPublisher();
-            Subscriber sub = client.getSubscriber();
-
-            String ledgersPath = "/hedwig/standalone/topics/testGCTopic/ledgers";
-            ByteString topic = ByteString.copyFromUtf8("testGCTopic");
-            ByteString subid = ByteString.copyFromUtf8("testGCSubId");
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            sub.subscribe(topic, subid, opts);
-            sub.closeSubscription(topic, subid);
-
-            for (int i = 1; i <= 100; i++) {
-                pub.publish(topic, Message.newBuilder().setBody(
-                                ByteString.copyFromUtf8(String.valueOf(i))).build());
-            }
-            LedgerRanges r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
-            assertEquals("Should only have 1 ledger yet", 1, r.getRangesList().size());
-            long firstLedger = r.getRangesList().get(0).getLedgerId();
-
-            stopHubServers();
-            startHubServers();
-
-            pub.publish(topic, Message.newBuilder().setBody(
-                            ByteString.copyFromUtf8(String.valueOf(0xdeadbeef))).build());
-
-            r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
-            assertEquals("Should have 2 ledgers after restart", 2, r.getRangesList().size());
-
-            for (int i = 100; i <= 200; i++) {
-                pub.publish(topic, Message.newBuilder().setBody(
-                                ByteString.copyFromUtf8(String.valueOf(i))).build());
-            }
-            Thread.sleep(5000); // give GC a chance to happen
-
-            r = LedgerRanges.parseFrom(bktb.getZooKeeperClient().getData(ledgersPath, false, null));
-            // Check the count before indexing so an empty list fails with the
-            // assertion message rather than an IndexOutOfBoundsException.
-            assertEquals("Should only have 1 ledger after GC", 1, r.getRangesList().size());
-            long secondLedger = r.getRangesList().get(0).getLedgerId();
-
-            // ensure original ledger doesn't exist
-            String firstLedgerPath = String.format("/ledgers/L%010d", firstLedger);
-            String secondLedgerPath = String.format("/ledgers/L%010d", secondLedger);
-            assertNull("Ledger should not exist", bktb.getZooKeeperClient().exists(firstLedgerPath, false));
-            assertNotNull("Ledger should exist", bktb.getZooKeeperClient().exists(secondLedgerPath, false));
-        } finally {
-            // Release the client even when an assertion above fails.
-            client.close();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
deleted file mode 100644
index 827677f..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubPersistenceManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
-
-/**
- * In-memory persistence manager for tests. Messages are kept in a per-topic
- * list; the message persisted with local sequence id N sits at index N - 1.
- * Setting {@link #failure} makes persist and scan requests fail with
- * {@link #exception}.
- */
-public class StubPersistenceManager implements PersistenceManagerWithRangeScan {
-    Map<ByteString, List<Message>> messages = new HashMap<ByteString, List<Message>>();
-    boolean failure = false;
-    ServiceDownException exception = new ServiceDownException("Asked to fail");
-
-    public void deliveredUntil(ByteString topic, Long seqId) {
-        // noop
-    }
-
-    public void consumedUntil(ByteString topic, Long seqId) {
-        // noop
-    }
-
-    public void setMessageBound(ByteString topic, Integer bound) {
-        // noop
-    }
-
-    public void clearMessageBound(ByteString topic) {
-        // noop
-    }
-
-    public void consumeToBound(ByteString topic) {
-        // noop
-    }
-
-    /** Factory producing the empty per-topic message lists. */
-    protected static class ArrayListMessageFactory implements Factory<List<Message>> {
-        static ArrayListMessageFactory instance = new ArrayListMessageFactory();
-
-        public List<Message> newInstance() {
-            return new ArrayList<Message>();
-        }
-    }
-
-    /**
-     * Returns a sequence id whose local component is the number of messages
-     * stored for the topic (an empty list is registered for unknown topics).
-     */
-    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) {
-        long seqId = MapMethods.getAfterInsertingIfAbsent(messages, topic, ArrayListMessageFactory.instance).size();
-        return MessageSeqId.newBuilder().setLocalComponent(seqId).build();
-    }
-
-    /** Returns {@code seqId + skipAmount}; the topic is not consulted. */
-    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
-        return seqId + skipAmount;
-    }
-
-    /**
-     * Appends the message to the topic's list and reports the new list size as
-     * its local sequence id, or fails the callback when {@link #failure} is set.
-     */
-    public void persistMessage(PersistRequest request) {
-        if (failure) {
-            request.getCallback().operationFailed(request.getCtx(), exception);
-            return;
-        }
-
-        MapMethods.addToMultiMap(messages, request.getTopic(), request.getMessage(), ArrayListMessageFactory.instance);
-        request.getCallback().operationFinished(request.getCtx(), MessageIdUtils.mergeLocalSeqId(request.getMessage(),
-                (long) messages.get(request.getTopic()).size()).getMsgId());
-    }
-
-    /**
-     * Delivers the single message stored at the requested start sequence id,
-     * or finishes the scan with NO_MORE_MESSAGES when the topic holds no such
-     * message (including a topic nothing has been published to).
-     */
-    public void scanSingleMessage(ScanRequest request) {
-        if (failure) {
-            request.getCallback().scanFailed(request.getCtx(), exception);
-            return;
-        }
-
-        long index = request.getStartSeqId() - 1;
-        List<Message> messageList = messages.get(request.getTopic());
-        // A topic that was never published to has no list at all; treat it the
-        // same as an exhausted list instead of dereferencing null.
-        if (messageList == null || index >= messageList.size()) {
-            request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES);
-            return;
-        }
-
-        Message msg = messageList.get((int) index);
-        Message toDeliver = MessageIdUtils.mergeLocalSeqId(msg, request.getStartSeqId());
-        request.getCallback().messageScanned(request.getCtx(), toDeliver);
-    }
-
-    /**
-     * Delivers messages starting at the requested sequence id until the list
-     * is exhausted, the accumulated body size exceeds the size limit, or the
-     * message limit is reached, and reports the matching finish reason.
-     */
-    public void scanMessages(RangeScanRequest request) {
-        if (failure) {
-            request.getCallback().scanFailed(request.getCtx(), exception);
-            return;
-        }
-
-        // The list instance for a topic never changes once registered, so it
-        // is looked up once; its size is still re-read on every iteration.
-        List<Message> messageList = MapMethods.getAfterInsertingIfAbsent(messages, request.getTopic(),
-                                    ArrayListMessageFactory.instance);
-        long totalSize = 0;
-        long startSeqId = request.getStartSeqId();
-        for (int i = 0; i < request.getMessageLimit(); i++) {
-            if (startSeqId + i > messageList.size()) {
-                request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NO_MORE_MESSAGES);
-                return;
-            }
-            Message msg = messageList.get((int) startSeqId + i - 1);
-            Message toDeliver = MessageIdUtils.mergeLocalSeqId(msg, startSeqId + i);
-            request.getCallback().messageScanned(request.getCtx(), toDeliver);
-
-            totalSize += toDeliver.getBody().size();
-
-            // The size check runs after delivery, so the message that crosses
-            // the limit is still handed to the callback.
-            if (totalSize > request.getSizeLimit()) {
-                request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.SIZE_LIMIT_EXCEEDED);
-                return;
-            }
-        }
-        request.getCallback().scanFinished(request.getCtx(), ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
-
-    }
-
-    @Override
-    public void stop() {
-        // do nothing
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java
deleted file mode 100644
index e9fbd08..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/StubScanCallback.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-
-/**
- * Scan callback that records every outcome on a queue: scanned messages,
- * scan failures, and {@link #END_MESSAGE} when a scan finishes.
- */
-public class StubScanCallback implements ScanCallback {
-
-    /** Marker with an empty body that is queued when a scan finishes. */
-    public static Message END_MESSAGE = Message.newBuilder().setBody(ByteString.EMPTY).build();
-
-    LinkedBlockingQueue<Either<Message, Exception>> queue = new LinkedBlockingQueue<Either<Message,Exception>>();
-
-    /** Queues one outcome; exactly one of the two arguments is expected to be non-null. */
-    private void enqueue(Message message, Exception exception) {
-        ConcurrencyUtils.put(queue, Either.of(message, exception));
-    }
-
-    @Override
-    public void messageScanned(Object ctx, Message message) {
-        enqueue(message, null);
-    }
-
-    @Override
-    public void scanFailed(Object ctx, Exception exception) {
-        enqueue(null, exception);
-    }
-
-    @Override
-    public void scanFinished(Object ctx, ReasonForFinish reason) {
-        enqueue(END_MESSAGE, null);
-    }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
deleted file mode 100644
index d65750b..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-
-import org.apache.hedwig.HelperMethods;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.SubscriptionDataManager;
-import org.apache.hedwig.server.meta.TopicOwnershipManager;
-import org.apache.hedwig.server.meta.TopicPersistenceManager;
-import org.apache.hedwig.server.subscriptions.MMSubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestBookKeeperPersistenceManager {
-    // Logger for this test class.
-    private static final Logger logger = LoggerFactory.getLogger(TestBookKeeperPersistenceManager.class);
-
-    BookKeeperTestBase bktb;
-    private final int numBookies = 3;
-    // Base timeout unit (milliseconds) used when waiting for scan results.
-    private final long readDelay = 2000L;
-    // Returned by the test ServerConfiguration's getMaxEntriesPerLedger().
-    private final int maxEntriesPerLedger = 10;
-
-    ServerConfiguration conf;
-    ScheduledExecutorService scheduler;
-
-    TopicManager tm;
-    BookkeeperPersistenceManager manager;
-    PubSubException failureException = null;
-    TestMetadataManagerFactory metadataManagerFactory;
-    TopicPersistenceManager tpManager;
-    MMSubscriptionManager sm;
-
-    // Parameter supplied by the Parameterized runner (see configs()).
-    boolean removeStartSeqId;
-
-    /**
-     * Metadata manager factory that wraps a real factory and can inject
-     * failures: while {@code serviceDownCount} is positive, each intercepted
-     * operation decrements it and fails with a ServiceDownException instead of
-     * reaching the wrapped manager. Intercepted operations are the topic
-     * persistence read/write/delete and the subscription data update/replace;
-     * everything else is delegated unchanged.
-     */
-    static class TestMetadataManagerFactory extends MetadataManagerFactory {
-
-        final MetadataManagerFactory factory;
-        int serviceDownCount = 0;
-
-        TestMetadataManagerFactory(ServerConfiguration conf, ZooKeeper zk) throws Exception {
-            factory = MetadataManagerFactory.newMetadataManagerFactory(conf, zk);
-        }
-
-        /** Sets how many upcoming intercepted operations should fail. */
-        public void setServiceDownCount(int count) {
-            this.serviceDownCount = count;
-        }
-
-        /**
-         * Consumes one pending injected failure, if any, and reports it to the
-         * callback.
-         *
-         * @return true when the callback was failed and the caller must not
-         *         forward the operation; false when no failure is pending
-         */
-        private <T> boolean failIfServiceDown(Callback<T> callback, Object ctx) {
-            if (serviceDownCount <= 0) {
-                return false;
-            }
-            --serviceDownCount;
-            callback.operationFailed(ctx, new PubSubException.ServiceDownException("Metadata Store is down"));
-            return true;
-        }
-
-        @Override
-        public int getCurrentVersion() {
-            return factory.getCurrentVersion();
-        }
-
-        @Override
-        protected MetadataManagerFactory initialize(
-            ServerConfiguration cfg, ZooKeeper zk, int version) throws IOException {
-            // do nothing
-            return factory;
-        }
-
-        @Override
-        public void shutdown() throws IOException {
-            factory.shutdown();
-        }
-
-        @Override
-        public Iterator<ByteString> getTopics() throws IOException {
-            return factory.getTopics();
-        }
-
-        @Override
-        public TopicPersistenceManager newTopicPersistenceManager() {
-            final TopicPersistenceManager manager = factory.newTopicPersistenceManager();
-            return new TopicPersistenceManager() {
-
-                @Override
-                public void close() throws IOException {
-                    manager.close();
-                }
-
-                @Override
-                public void readTopicPersistenceInfo(ByteString topic,
-                                                     Callback<Versioned<LedgerRanges>> callback, Object ctx) {
-                    if (failIfServiceDown(callback, ctx)) {
-                        return;
-                    }
-                    manager.readTopicPersistenceInfo(topic, callback, ctx);
-                }
-
-                @Override
-                public void writeTopicPersistenceInfo(ByteString topic, LedgerRanges ranges, Version version,
-                                                      Callback<Version> callback, Object ctx) {
-                    if (failIfServiceDown(callback, ctx)) {
-                        return;
-                    }
-                    manager.writeTopicPersistenceInfo(topic, ranges, version, callback, ctx);
-                }
-
-                @Override
-                public void deleteTopicPersistenceInfo(ByteString topic, Version version,
-                                                       Callback<Void> callback, Object ctx) {
-                    if (failIfServiceDown(callback, ctx)) {
-                        return;
-                    }
-                    manager.deleteTopicPersistenceInfo(topic, version, callback, ctx);
-                }
-            };
-        }
-
-        @Override
-        public SubscriptionDataManager newSubscriptionDataManager() {
-            final SubscriptionDataManager sdm = factory.newSubscriptionDataManager();
-            return new SubscriptionDataManager() {
-                @Override
-                public void close() throws IOException {
-                    sdm.close();
-                }
-
-                @Override
-                public void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
-                                                   Callback<Version> callback, Object ctx) {
-                    sdm.createSubscriptionData(topic, subscriberId, data, callback, ctx);
-                }
-
-                @Override
-                public boolean isPartialUpdateSupported() {
-                    return sdm.isPartialUpdateSupported();
-                }
-
-                @Override
-                public void updateSubscriptionData(ByteString topic, ByteString subscriberId,
-                                                   SubscriptionData dataToUpdate, Version version, Callback<Version> callback, Object ctx) {
-                    if (failIfServiceDown(callback, ctx)) {
-                        return;
-                    }
-                    sdm.updateSubscriptionData(topic, subscriberId, dataToUpdate, version, callback, ctx);
-                }
-
-                @Override
-                public void replaceSubscriptionData(ByteString topic, ByteString subscriberId,
-                                                    SubscriptionData dataToReplace, Version version, Callback<Version> callback, Object ctx) {
-                    if (failIfServiceDown(callback, ctx)) {
-                        return;
-                    }
-                    sdm.replaceSubscriptionData(topic, subscriberId, dataToReplace, version, callback, ctx);
-                }
-
-                @Override
-                public void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version,
-                                                   Callback<Void> callback, Object ctx) {
-                    sdm.deleteSubscriptionData(topic, subscriberId, version, callback, ctx);
-                }
-
-                @Override
-                public void readSubscriptionData(ByteString topic, ByteString subscriberId,
-                                                 Callback<Versioned<SubscriptionData>> callback, Object ctx) {
-                    sdm.readSubscriptionData(topic, subscriberId, callback, ctx);
-                }
-
-                @Override
-                public void readSubscriptions(ByteString topic,
-                                              Callback<Map<ByteString, Versioned<SubscriptionData>>> cb, Object ctx) {
-                    sdm.readSubscriptions(topic, cb, ctx);
-                }
-            };
-        }
-
-        @Override
-        public TopicOwnershipManager newTopicOwnershipManager() {
-            return factory.newTopicOwnershipManager();
-        }
-
-        @Override
-        public void format(ServerConfiguration cfg, ZooKeeper zk) throws IOException {
-            factory.format(cfg, zk);
-        }
-    }
-
-    /**
-     * Invoked by the Parameterized runner once per entry of {@link #configs()}.
-     *
-     * @param removeStartSeqId stored as-is in the field of the same name
-     */
-    public TestBookKeeperPersistenceManager(boolean removeStartSeqId) {
-        this.removeStartSeqId = removeStartSeqId;
-    }
-
- @Parameters
- public static Collection<Object[]> configs() {
- return Arrays.asList(new Object[][] {
- { true }, { false }
- });
- }
-
-    /**
-     * Brings up the BookKeeper test cluster and wires the topic manager,
-     * persistence manager and subscription manager on top of it, using a
-     * failure-injecting metadata manager factory.
-     *
-     * @param delay NOTE(review): not used in this method; the test base is
-     *              always built with 0L. Confirm whether it was meant to be
-     *              forwarded.
-     */
-    @SuppressWarnings("deprecation")
-    private void startCluster(long delay) throws Exception {
-        bktb = new BookKeeperTestBase(numBookies, 0L);
-        bktb.setUp();
-
-        conf = new ServerConfiguration() {
-            @Override
-            public long getMaxEntriesPerLedger() {
-                return maxEntriesPerLedger;
-            }
-            @Override
-            public int getConsumeInterval() {
-                return 0;
-            }
-            @Override
-            public int getMessagesConsumedThreadRunInterval() {
-                return 2000;
-            }
-        };
-
-        org.apache.bookkeeper.conf.ClientConfiguration bookieClientSettings =
-            new org.apache.bookkeeper.conf.ClientConfiguration();
-        bookieClientSettings
-            .setNumWorkerThreads(1)
-            .setReadTimeout(9999)
-            .setThrottleValue(3);
-        conf.addConf(bookieClientSettings);
-
-        metadataManagerFactory = new TestMetadataManagerFactory(conf, bktb.getZooKeeperClient());
-        tpManager = metadataManagerFactory.newTopicPersistenceManager();
-
-        // A single scheduler thread is shared by all three managers.
-        scheduler = Executors.newScheduledThreadPool(1);
-        tm = new TrivialOwnAllTopicManager(conf, scheduler);
-        manager = new BookkeeperPersistenceManager(
-            bktb.bk, metadataManagerFactory, tm, conf, scheduler);
-        sm = new MMSubscriptionManager(
-            conf, metadataManagerFactory, tm, manager, null, scheduler);
-    }
-
- /**
-  * Stops everything started by startCluster: the managers first, then the
-  * metadata manager factory and scheduler, and finally the BookKeeper test
-  * base. The calls are not guarded, so an exception from any step skips the
-  * remaining ones.
-  */
- private void stopCluster() throws Exception {
- tm.stop();
- manager.stop();
- sm.stop();
- tpManager.close();
- metadataManagerFactory.shutdown();
- scheduler.shutdown();
- bktb.tearDown();
- }
-
-    /** Starts a fresh cluster (with a delay argument of 0) before every test. */
-    @Before
-    public void setUp() throws Exception {
-        final long noDelay = 0L;
-        startCluster(noDelay);
-    }
-
-    /** Shuts the cluster down after every test. */
-    @After
-    public void tearDown() throws Exception {
-        stopCluster();
-    }
-
- class RangeScanVerifier implements ScanCallback {
- LinkedList<Message> pubMsgs;
- boolean runNextScan = false;
- RangeScanRequest nextScan = null;
-
- public RangeScanVerifier(LinkedList<Message> pubMsgs, RangeScanRequest nextScan) {
- this.pubMsgs = pubMsgs;
- this.nextScan = nextScan;
- }
-
- @Override
- public void messageScanned(Object ctx, Message recvMessage) {
- logger.info("Scanned message : {}", recvMessage.getMsgId().getLocalComponent());
- if (null != nextScan && !runNextScan) {
- runNextScan = true;
- manager.scanMessages(nextScan);
- }
-
- if (pubMsgs.size() == 0) {
- return;
- }
-
- Message pubMsg = pubMsgs.removeFirst();
- if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
- fail("Scanned message not equal to expected");
- }
- }
-
- @Override
- public void scanFailed(Object ctx, Exception exception) {
- fail("Failed to scan messages.");
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void scanFinished(Object ctx, ReasonForFinish reason) {
- LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
- try {
- statusQueue.put(pubMsgs.isEmpty());
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- }
-
-    /**
-     * Copies the messages at positions {@code start} through {@code end},
-     * both inclusive, into a new linked list.
-     */
-    private LinkedList<Message> subMessages(List<Message> msgs, int start, int end) {
-        LinkedList<Message> slice = new LinkedList<Message>();
-        int position = start;
-        while (position <= end) {
-            slice.add(msgs.get(position));
-            position++;
-        }
-        return slice;
-    }
-
-    /** Runs the scan-after-delete scenario with two release/acquire rounds. */
-    @Test(timeout=60000)
-    public void testScanMessagesOnClosedLedgerAfterDeleteLedger() throws Exception {
-        final int releaseRounds = 2;
-        scanMessagesAfterDeleteLedgerTest(releaseRounds);
-    }
-
-    /** Runs the scan-after-delete scenario with a single release/acquire round. */
-    @Test(timeout=60000)
-    public void testScanMessagesOnUnclosedLedgerAfterDeleteLedger() throws Exception {
-        final int releaseRounds = 1;
-        scanMessagesAfterDeleteLedgerTest(releaseRounds);
-    }
-
- private void scanMessagesAfterDeleteLedgerTest(int numLedgers) throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestScanMessagesAfterDeleteLedger");
-
- List<Message> msgs = new ArrayList<Message>();
-
- acquireTopic(topic);
- msgs.addAll(publishMessages(topic, 2));
-
- for (int i=0; i<numLedgers; i++) {
- releaseTopic(topic);
- // acquire topic again to force a new ledger
- acquireTopic(topic);
- msgs.addAll(publishMessages(topic, 2));
- }
-
- consumedUntil(topic, 2L);
- // Wait until ledger ranges is updated.
- Thread.sleep(2000L);
- releaseTopic(topic);
-
- // acquire topic again
- acquireTopic(topic);
- // scan messages starting from 3
- LinkedBlockingQueue<Boolean> statusQueue =
- new LinkedBlockingQueue<Boolean>();
- manager.scanMessages(new RangeScanRequest(topic, 3, 2, Long.MAX_VALUE,
- new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue));
- Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
- assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
- }
-
- @Test(timeout=60000)
- public void testScanMessagesOnEmptyLedgerAfterDeleteLedger() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnEmptyLedgerAfterDeleteLedger");
-
- List<Message> msgs = new ArrayList<Message>();
-
- acquireTopic(topic);
- msgs.addAll(publishMessages(topic, 2));
- releaseTopic(topic);
-
- // acquire topic again to force a new ledger
- acquireTopic(topic);
- logger.info("Consumed messages.");
- consumedUntil(topic, 2L);
- // Wait until ledger ranges is updated.
- Thread.sleep(2000L);
- logger.info("Released topic with an empty ledger.");
- // release topic to force an empty ledger
- releaseTopic(topic);
-
- // publish 2 more messages, these message expected to be id 3 and 4
- acquireTopic(topic);
- logger.info("Published more messages.");
- msgs.addAll(publishMessages(topic, 2));
- releaseTopic(topic);
-
- // acquire topic again
- acquireTopic(topic);
- // scan messages starting from 3
- LinkedBlockingQueue<Boolean> statusQueue =
- new LinkedBlockingQueue<Boolean>();
- long startSeqId = removeStartSeqId ? 1 : 3;
- manager.scanMessages(new RangeScanRequest(topic, startSeqId, 2, Long.MAX_VALUE,
- new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue));
- Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
- assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
- }
-
- @Test(timeout=60000)
- public void testFailedToDeleteLedger1() throws Exception {
- failedToDeleteLedgersTest(1);
- }
-
- @Test(timeout=60000)
- public void testFailedToDeleteLedger2() throws Exception {
- // succeed to delete second ledger
- failedToDeleteLedgersTest(2);
- }
-
- private void failedToDeleteLedgersTest(int numLedgers) throws Exception {
- final ByteString topic = ByteString.copyFromUtf8("TestFailedToDeleteLedger");
- final int serviceDownCount = 1;
-
- List<Message> msgs = new ArrayList<Message>();
-
- for (int i=0; i<numLedgers; i++) {
- acquireTopic(topic);
- msgs.addAll(publishMessages(topic, 2));
- releaseTopic(topic);
- }
-
- // acquire topic again to force a new ledger
- acquireTopic(topic);
- logger.info("Consumed messages.");
- metadataManagerFactory.setServiceDownCount(serviceDownCount);
- // failed consumed
- consumedUntil(topic, 2L * numLedgers);
- // Wait until ledger ranges is updated.
- Thread.sleep(2000L);
- logger.info("Released topic with an empty ledger.");
- // release topic to force an empty ledger
- releaseTopic(topic);
-
- // publish 2 more messages, these message expected to be id 3 and 4
- acquireTopic(topic);
- logger.info("Published more messages.");
- msgs.addAll(publishMessages(topic, 2));
- releaseTopic(topic);
-
- // acquire topic again
- acquireTopic(topic);
- LinkedBlockingQueue<Boolean> statusQueue =
- new LinkedBlockingQueue<Boolean>();
- manager.scanMessages(new RangeScanRequest(topic, numLedgers * 2 + 1, 2, Long.MAX_VALUE,
- new RangeScanVerifier(subMessages(msgs, numLedgers * 2, numLedgers * 2 + 1), null), statusQueue));
- Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
- assertTrue("Should succeed to scan messages after deleted consumed ledger.", b);
-
- // consumed
- consumedUntil(topic, (numLedgers + 1) * 2L);
- // Wait until ledger ranges is updated.
- Thread.sleep(2000L);
-
- Semaphore latch = new Semaphore(1);
- latch.acquire();
- tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
- @Override
- public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
- if (null == ranges || ranges.getValue().getRangesList().size() > 1) {
- failureException = new PubSubException.NoTopicPersistenceInfoException("Invalid persistence info found for topic " + topic.toStringUtf8());
- ((Semaphore)ctx).release();
- return;
- }
- failureException = null;
- ((Semaphore)ctx).release();
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- failureException = exception;
- ((Semaphore)ctx).release();
- }
- }, latch);
- latch.acquire();
- latch.release();
- assertNull("Should not fail with exception.", failureException);
- }
-
- @Test(timeout=60000)
- public void testScanMessagesOnTwoLedgers() throws Exception {
- stopCluster();
- startCluster(readDelay);
-
- ByteString topic = ByteString.copyFromUtf8("TestScanMessagesOnTwoLedgers");
-
- List<Message> msgs = new ArrayList<Message>();
-
- acquireTopic(topic);
- msgs.addAll(publishMessages(topic, 1));
- releaseTopic(topic);
-
- // acquire topic again to force a new ledger
- acquireTopic(topic);
- msgs.addAll(publishMessages(topic, 3));
-
- // scan messages
- LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
- RangeScanRequest nextScan = new RangeScanRequest(topic, 3, 2, Long.MAX_VALUE,
- new RangeScanVerifier(subMessages(msgs, 2, 3), null), statusQueue);
- manager.scanMessages(new RangeScanRequest(topic, 1, 2, Long.MAX_VALUE,
- new RangeScanVerifier(subMessages(msgs, 0, 1), nextScan), statusQueue));
- Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
- if (b == null) {
- fail("One scan request doesn't finish");
- }
- b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
- if (b == null) {
- fail("One scan request doesn't finish");
- }
- }
-
- @Test(timeout=60000)
- public void testInconsistentSubscriptionStateAndLedgerRanges1() throws Exception {
- // See the comment of inconsistentSubscriptionStateAndLedgerRanges.
- // For this case, Step (2) failed to update subscription state metadata,
- // but LedgerRanges is updated success.
- // Result: scan messages from 1 to 4 take place on ledger L2.
- inconsistentSubscriptionStateAndLedgerRanges(1);
- }
-
- @Test(timeout=60000)
- public void testInconsistentSubscriptionStateAndLedgerRanges2() throws Exception {
- // See the comment of inconsistentSubscriptionStateAndLedgerRanges.
- // For this case, step (2) failed to update subscription state metadata,
- // step (3) successfully delete L1 but failed to update LedgerRanges.
- // Result: scan messages from 1 to 4 falls in L1 and L2,
- // but BookKeeper may complain L1 not found.
- inconsistentSubscriptionStateAndLedgerRanges(2);
- }
-
- /**
- * Since InMemorySubscriptionState and LedgerRanges is maintained
- * separately, there may exist such inconsistent state:
- * (1). Topic ledgers: L1 [1 ~ 2], L2 [3 ~ ]
- * (2). Subscriber consumes to 2 and InMemorySubscriptionState is updated
- * successfully but failed when updating subscription state metadata
- * (3). AbstractSubscriptionManager#MessagesConsumedTask use
- * InMemorySubscriptionState to do garbage collection
- * and L1 is delete
- * (4). If Hub restarts at this time, old subscription state is read and
- * Hub will try to deliver message from 1
- */
- public void inconsistentSubscriptionStateAndLedgerRanges(int failedCount) throws Exception {
- final ByteString topic = ByteString.copyFromUtf8("inconsistentSubscriptionStateAndLedgerRanges");
- final ByteString subscriberId = ByteString.copyFromUtf8("subId");
- LinkedList<Message> msgs = new LinkedList<Message>();
-
- // make ledger L1 [1 ~ 2]
- acquireTopic(topic);
- msgs.addAll(publishMessages(topic, 2));
- releaseTopic(topic);
-
- // acquire topic again to force a new ledger L2 [3 ~ ]
- acquireTopic(topic);
- msgs.addAll(publishMessages(topic, 2));
-
- StubCallback<Void> voidCb = new StubCallback<Void>();
- StubCallback<SubscriptionData> subDataCb = new StubCallback<SubscriptionData>();
- Either<Void, PubSubException> voidResult;
- Either<SubscriptionData, PubSubException> subDataResult;
-
- // prepare for subscription
- sm.acquiredTopic(topic, voidCb, null);
- voidResult = ConcurrencyUtils.take(voidCb.queue);
- assertNull(voidResult.right()); // no exception
-
- // Do subscription
- SubscribeRequest subRequest = SubscribeRequest.newBuilder().setSubscriberId(subscriberId)
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- sm.serveSubscribeRequest(topic, subRequest, MessageSeqId.newBuilder().setLocalComponent(0).build(), subDataCb,
- null);
- subDataResult = ConcurrencyUtils.take(subDataCb.queue);
- assertNotNull(subDataResult.left()); // serveSubscribeRequest success
- // and return a SubscriptionData
- // object
- assertNull(subDataResult.right()); // no exception
-
- // simulate inconsistent situation between InMemorySubscriptionState and
- // LedgerRanges
- metadataManagerFactory.setServiceDownCount(failedCount);
- sm.setConsumeSeqIdForSubscriber(topic, subscriberId, MessageSeqId.newBuilder().setLocalComponent(2).build(),
- voidCb, null);
- voidResult = ConcurrencyUtils.take(voidCb.queue);
- assertNotNull(voidResult.right()); // update subscription state failed
- // and expect a exception
-
- // wait AbstractSubscriptionManager#MessagesConsumedTask to garbage
- // collect ledger L1
- Thread.sleep(conf.getMessagesConsumedThreadRunInterval() * 2);
-
- // simulate hub restart: read old subscription state metadata and deliver
- // messages from 1
- LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
- RangeScanRequest scan = new RangeScanRequest(topic, 1, 4, Long.MAX_VALUE, new RangeScanVerifier(msgs, null),
- statusQueue);
- manager.scanMessages(scan);
- Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
- if (b == null) {
- fail("Scan request doesn't finish");
- }
- }
-
- @Test(timeout=60000)
- // Add this test case for BOOKKEEPER-458
- public void testReadWhenTopicChangeLedger() throws Exception {
- final ByteString topic = ByteString.copyFromUtf8("testReadWhenTopicChangeLedger");
- LinkedList<Message> msgs = new LinkedList<Message>();
-
- // Write maxEntriesPerLedger entries to make topic change ledger
- acquireTopic(topic);
- msgs.addAll(publishMessages(topic, maxEntriesPerLedger));
-
- // Notice, change ledger operation is asynchronous, so we should wait!!!
- Thread.sleep(2000);
-
- // Issue a scan request right start from the new ledger
- LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
- RangeScanRequest scan = new RangeScanRequest(topic, maxEntriesPerLedger + 1, 1, Long.MAX_VALUE,
- new RangeScanVerifier(msgs, null), statusQueue);
- manager.scanMessages(scan);
- Boolean b = statusQueue.poll(10 * readDelay, TimeUnit.MILLISECONDS);
- if (b == null) {
- fail("Scan request timeout");
- }
- assertFalse("Expect none message is scanned on the new created ledger", b);
- }
-
- class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
-
- @Override
- @SuppressWarnings("unchecked")
- public void operationFailed(Object ctx, PubSubException exception) {
- LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
- try {
- statusQueue.put(false);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
- LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
- try {
- statusQueue.put(true);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- protected List<Message> publishMessages(ByteString topic, int numMsgs) throws Exception {
- List<Message> msgs = HelperMethods.getRandomPublishedMessages(numMsgs, 1024);
- LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
- for (Message msg : msgs) {
-
- try {
- manager.persistMessage(new PersistRequest(topic, msg, new TestCallback(), statusQueue));
- // wait a maximum of a minute
- Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
- if (b == null) {
- throw new RuntimeException("Publish timed out");
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- return msgs;
- }
-
- protected void acquireTopic(ByteString topic) throws Exception {
- Semaphore latch = new Semaphore(1);
- latch.acquire();
- manager.acquiredTopic(topic, new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- failureException = null;
- ((Semaphore)ctx).release();
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- failureException = exception;
- ((Semaphore)ctx).release();
- }
- }, latch);
- latch.acquire();
- latch.release();
- if (null != failureException) {
- throw failureException;
- }
- }
-
- protected void releaseTopic(final ByteString topic) throws Exception {
- manager.lostTopic(topic);
- // backward testing ledger ranges without start seq id
- if (removeStartSeqId) {
- Semaphore latch = new Semaphore(1);
- latch.acquire();
- tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
- @Override
- public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
- if (null == ranges) {
- failureException = new PubSubException.NoTopicPersistenceInfoException("No persistence info found for topic " + topic.toStringUtf8());
- ((Semaphore)ctx).release();
- return;
- }
-
- // build a new ledger ranges w/o start seq id.
- LedgerRanges.Builder builder = LedgerRanges.newBuilder();
- final List<LedgerRange> rangesList = ranges.getValue().getRangesList();
- for (LedgerRange range : rangesList) {
- LedgerRange.Builder newRangeBuilder = LedgerRange.newBuilder();
- newRangeBuilder.setLedgerId(range.getLedgerId());
- if (range.hasEndSeqIdIncluded()) {
- newRangeBuilder.setEndSeqIdIncluded(range.getEndSeqIdIncluded());
- }
- builder.addRanges(newRangeBuilder.build());
- }
- tpManager.writeTopicPersistenceInfo(topic, builder.build(), ranges.getVersion(),
- new Callback<Version>() {
- @Override
- public void operationFinished(Object ctx, Version newVersion) {
- failureException = null;
- ((Semaphore)ctx).release();
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- failureException = exception;
- ((Semaphore)ctx).release();
- }
- }, ctx);
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- failureException = exception;
- ((Semaphore)ctx).release();
- }
- }, latch);
- latch.acquire();
- latch.release();
- if (null != failureException) {
- throw failureException;
- }
- }
- }
-
- protected void consumedUntil(ByteString topic, long seqId) throws Exception {
- manager.consumedUntil(topic, seqId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
deleted file mode 100644
index 1f30b5b..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManagerBlackBox.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import org.junit.After;
-import org.junit.Before;
-
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-
-public class TestBookKeeperPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox {
- BookKeeperTestBase bktb;
- private final int numBookies = 3;
-
- MetadataManagerFactory metadataManagerFactory = null;
-
- @Before
- @Override
- public void setUp() throws Exception {
- // We need to setUp this class first since the super.setUp() method will
- // need the BookKeeperTestBase to be instantiated.
- bktb = new BookKeeperTestBase(numBookies);
- bktb.setUp();
- super.setUp();
- }
-
-
- @After
- @Override
- public void tearDown() throws Exception {
- bktb.tearDown();
- super.tearDown();
- if (null != metadataManagerFactory) {
- metadataManagerFactory.shutdown();
- }
- }
-
- @Override
- long getLowestSeqId() {
- return 1;
- }
-
- @Override
- PersistenceManager instantiatePersistenceManager() throws Exception {
- ServerConfiguration conf = new ServerConfiguration();
- ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
- metadataManagerFactory =
- MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient());
-
- return new BookkeeperPersistenceManager(bktb.bk, metadataManagerFactory,
- new TrivialOwnAllTopicManager(conf, scheduler),
- conf, scheduler);
- }
-
- @Override
- public long getExpectedSeqId(int numPublished) {
- return numPublished;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
deleted file mode 100644
index da2b06c..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookkeeperPersistenceManagerWhiteBox.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.hedwig.util.Either;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.HelperMethods;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-public class TestBookkeeperPersistenceManagerWhiteBox {
-
- protected static final Logger logger =
- LoggerFactory.getLogger(TestBookkeeperPersistenceManagerWhiteBox.class);
-
- BookKeeperTestBase bktb;
- private final int numBookies = 3;
- BookkeeperPersistenceManager bkpm;
- MetadataManagerFactory mm;
- ServerConfiguration conf;
- ScheduledExecutorService scheduler;
- TopicManager tm;
- ByteString topic = ByteString.copyFromUtf8("topic0");
-
- @Before
- public void setUp() throws Exception {
- bktb = new BookKeeperTestBase(numBookies);
- bktb.setUp();
-
- conf = new ServerConfiguration();
- scheduler = Executors.newScheduledThreadPool(1);
- tm = new TrivialOwnAllTopicManager(conf, scheduler);
-
- mm = MetadataManagerFactory.newMetadataManagerFactory(conf, bktb.getZooKeeperClient());
-
- bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm, conf, scheduler);
- }
-
- @After
- public void tearDown() throws Exception {
- mm.shutdown();
- bktb.tearDown();
- }
-
- @Test(timeout=60000)
- public void testEmptyDirtyLedger() throws Exception {
-
- StubCallback<Void> stubCallback = new StubCallback<Void>();
- bkpm.acquiredTopic(topic, stubCallback, null);
- assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
- // now abandon, and try another time, the prev ledger should be dirty
-
- bkpm = new BookkeeperPersistenceManager(new BookKeeper(bktb.getZkHostPort()), mm, tm,
- conf, scheduler);
- bkpm.acquiredTopic(topic, stubCallback, null);
- assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
- assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
- }
-
- @Test(timeout=60000)
- public void testNonEmptyDirtyLedger() throws Exception {
-
- Random r = new Random();
- int NUM_MESSAGES_TO_TEST = 100;
- int SIZE_OF_MESSAGES_TO_TEST = 100;
- int index = 0;
- int numPrevLedgers = 0;
- List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
- SIZE_OF_MESSAGES_TO_TEST);
-
- while (index < messages.size()) {
-
- StubCallback<Void> stubCallback = new StubCallback<Void>();
- bkpm.acquiredTopic(topic, stubCallback, null);
- assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
- assertEquals(numPrevLedgers, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
- StubCallback<PubSubProtocol.MessageSeqId> persistCallback = new StubCallback<PubSubProtocol.MessageSeqId>();
- bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback, null));
- assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
-
- index++;
- // once in every 10 times, give up ledger
- if (r.nextInt(10) == 9) {
- // should not release topic when the message is last message
- // otherwise when we call scan, bookkeeper persistence manager doesn't own the topic
- if (index < messages.size()) {
- // Make the bkpm lose its memory
- bkpm.topicInfos.clear();
- numPrevLedgers++;
- }
- }
- }
-
- // Lets scan now
- StubScanCallback scanCallback = new StubScanCallback();
- bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null));
- for (int i = 0; i < messages.size(); i++) {
- Message scannedMessage = ConcurrencyUtils.take(scanCallback.queue).left();
- assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody()));
- assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent());
- }
- assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
-
- }
-
- static final long maxEntriesPerLedger = 10;
-
- class ChangeLedgerServerConfiguration extends ServerConfiguration {
- @Override
- public long getMaxEntriesPerLedger() {
- return maxEntriesPerLedger;
- }
- }
-
- @Test(timeout=60000)
- public void testSyncChangeLedgers() throws Exception {
- int NUM_MESSAGES_TO_TEST = 101;
- int SIZE_OF_MESSAGES_TO_TEST = 100;
- int index = 0;
- List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
- SIZE_OF_MESSAGES_TO_TEST);
-
- bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm,
- new ChangeLedgerServerConfiguration(), scheduler);
-
- // acquire the topic
- StubCallback<Void> stubCallback = new StubCallback<Void>();
- bkpm.acquiredTopic(topic, stubCallback, null);
- assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
- assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
- while (index < messages.size()) {
- logger.debug("Persist message {}", (index + 1));
- StubCallback<MessageSeqId> persistCallback = new StubCallback<MessageSeqId>();
- bkpm.persistMessage(new PersistRequest(topic, messages.get(index), persistCallback, null));
- assertEquals(index + 1, ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
-
- index++;
- if (index % maxEntriesPerLedger == 1) {
- assertEquals(index / maxEntriesPerLedger, bkpm.topicInfos.get(topic).ledgerRanges.size());
- }
- }
- assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
- // Lets scan now
- StubScanCallback scanCallback = new StubScanCallback();
- bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null));
- for (int i = 0; i < messages.size(); i++) {
- Message scannedMessage = ConcurrencyUtils.take(scanCallback.queue).left();
- assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody()));
- assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent());
- }
- assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
-
- // Make the bkpm lose its memory
- bkpm.topicInfos.clear();
-
- // acquire the topic again
- stubCallback = new StubCallback<Void>();
- bkpm.acquiredTopic(topic, stubCallback, null);
- assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
- assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger + 1, bkpm.topicInfos.get(topic).ledgerRanges.size());
- }
-
- class OrderCheckingCallback extends StubCallback<MessageSeqId> {
- long curMsgId;
- int numMessages;
- int numProcessed;
- int numSuccess;
- int numFailed;
-
- OrderCheckingCallback(long startMsgId, int numMessages) {
- this.curMsgId = startMsgId;
- this.numMessages = numMessages;
- numProcessed = numSuccess = numFailed = 0;
- }
-
- @Override
- public void operationFailed(Object ctx, final PubSubException exception) {
- synchronized (this) {
- ++numFailed;
- ++numProcessed;
- if (numProcessed == numMessages) {
- MessageSeqId.Builder seqIdBuilder =
- MessageSeqId.newBuilder().setLocalComponent(curMsgId);
- super.operationFinished(ctx, seqIdBuilder.build());
- }
- }
- }
-
- @Override
- public void operationFinished(Object ctx, final MessageSeqId seqId) {
- synchronized(this) {
- long msgId = seqId.getLocalComponent();
- if (msgId == curMsgId) {
- ++curMsgId;
- }
- ++numSuccess;
- ++numProcessed;
- if (numProcessed == numMessages) {
- MessageSeqId.Builder seqIdBuilder =
- MessageSeqId.newBuilder().setLocalComponent(curMsgId);
- super.operationFinished(ctx, seqIdBuilder.build());
- }
- }
- }
- }
-
- @Test(timeout=60000)
- public void testAsyncChangeLedgers() throws Exception {
- int NUM_MESSAGES_TO_TEST = 101;
- int SIZE_OF_MESSAGES_TO_TEST = 100;
- List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
- SIZE_OF_MESSAGES_TO_TEST);
-
- bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm,
- new ChangeLedgerServerConfiguration(), scheduler);
-
- // acquire the topic
- StubCallback<Void> stubCallback = new StubCallback<Void>();
- bkpm.acquiredTopic(topic, stubCallback, null);
- assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
- assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
- OrderCheckingCallback persistCallback =
- new OrderCheckingCallback(1, NUM_MESSAGES_TO_TEST);
- for (Message message : messages) {
- bkpm.persistMessage(new PersistRequest(topic, message, persistCallback, null));
- }
- assertEquals(NUM_MESSAGES_TO_TEST + 1,
- ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
- assertEquals(NUM_MESSAGES_TO_TEST, persistCallback.numSuccess);
- assertEquals(0, persistCallback.numFailed);
- assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger,
- bkpm.topicInfos.get(topic).ledgerRanges.size());
-
- // ensure the bkpm has the topic before scanning
- stubCallback = new StubCallback<Void>();
- bkpm.acquiredTopic(topic, stubCallback, null);
-
- // Lets scan now
- StubScanCallback scanCallback = new StubScanCallback();
- bkpm.scanMessages(new RangeScanRequest(topic, 1, NUM_MESSAGES_TO_TEST, Long.MAX_VALUE, scanCallback, null));
- for (int i = 0; i < messages.size(); i++) {
- Either<Message,Exception> e = ConcurrencyUtils.take(scanCallback.queue);
- Message scannedMessage = e.left();
- if (scannedMessage == null) {
- throw e.right();
- }
-
- assertTrue(messages.get(i).getBody().equals(scannedMessage.getBody()));
- assertEquals(i + 1, scannedMessage.getMsgId().getLocalComponent());
- }
- assertTrue(StubScanCallback.END_MESSAGE == ConcurrencyUtils.take(scanCallback.queue).left());
-
- // Make the bkpm lose its memory
- bkpm.topicInfos.clear();
-
- // acquire the topic again
- stubCallback = new StubCallback<Void>();
- bkpm.acquiredTopic(topic, stubCallback, null);
- assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
- assertEquals(NUM_MESSAGES_TO_TEST / maxEntriesPerLedger + 1,
- bkpm.topicInfos.get(topic).ledgerRanges.size());
- }
-
- class ChangeLedgerCallback extends OrderCheckingCallback {
- boolean tearDown = false;
- ChangeLedgerCallback(long startMsgId, int numMessages) {
- super(startMsgId, numMessages);
- }
-
- @Override
- public void operationFinished(Object ctx, final MessageSeqId msgId) {
- super.operationFinished(ctx, msgId);
- // shutdown bookie server when changing ledger
- // so following requests should fail
- if (msgId.getLocalComponent() >= maxEntriesPerLedger && !tearDown) {
- try {
- bktb.tearDownOneBookieServer();
- bktb.tearDownOneBookieServer();
- } catch (Exception e) {
- logger.error("Failed to tear down bookie server.");
- }
- tearDown = true;
- }
- }
- }
-
- @Test(timeout=60000)
- public void testChangeLedgerFailure() throws Exception {
- int NUM_MESSAGES_TO_TEST = 101;
- int SIZE_OF_MESSAGES_TO_TEST = 100;
- List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST,
- SIZE_OF_MESSAGES_TO_TEST);
-
- bkpm = new BookkeeperPersistenceManager(bktb.bk, mm, tm,
- new ChangeLedgerServerConfiguration(), scheduler);
-
- // acquire the topic
- StubCallback<Void> stubCallback = new StubCallback<Void>();
- bkpm.acquiredTopic(topic, stubCallback, null);
- assertNull(ConcurrencyUtils.take(stubCallback.queue).right());
- assertEquals(0, bkpm.topicInfos.get(topic).ledgerRanges.size());
-
- ChangeLedgerCallback persistCallback =
- new ChangeLedgerCallback(1, NUM_MESSAGES_TO_TEST);
- for (Message message : messages) {
- bkpm.persistMessage(new PersistRequest(topic, message, persistCallback, null));
- }
- assertEquals(maxEntriesPerLedger + 1,
- ConcurrencyUtils.take(persistCallback.queue).left().getLocalComponent());
- assertEquals(maxEntriesPerLedger, persistCallback.numSuccess);
- assertEquals(NUM_MESSAGES_TO_TEST - maxEntriesPerLedger, persistCallback.numFailed);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java
deleted file mode 100644
index 90c1817..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestDeadlock.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-public class TestDeadlock extends HedwigHubTestBase {
-
- protected static final Logger logger = LoggerFactory.getLogger(TestDeadlock.class);
-
- // Client side variables
- protected HedwigClient client;
- protected Publisher publisher;
- protected Subscriber subscriber;
-
- ByteString topic = ByteString.copyFromUtf8("DeadLockTopic");
- ByteString subscriberId = ByteString.copyFromUtf8("dl");
-
- public TestDeadlock() {
- super(1);
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- numBookies = 1;
- readDelay = 1000L; // 1s
- super.setUp();
- client = new HedwigClient(new HubClientConfiguration());
- publisher = client.getPublisher();
- subscriber = client.getSubscriber();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- client.close();
- super.tearDown();
- }
-
- // Test implementation of Callback for async client actions.
- static class TestCallback implements Callback<Void> {
- private final SynchronousQueue<Boolean> queue;
-
- public TestCallback(SynchronousQueue<Boolean> queue) {
- this.queue = queue;
- }
-
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- if (logger.isDebugEnabled())
- logger.debug("Operation finished!");
- ConcurrencyUtils.put(queue, true);
- }
- }).start();
- }
-
- @Override
- public void operationFailed(Object ctx, final PubSubException exception) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- logger.error("Operation failed!", exception);
- ConcurrencyUtils.put(queue, false);
- }
- }).start();
- }
- }
-
- // Test implementation of subscriber's message handler.
- class TestMessageHandler implements MessageHandler {
- private final SynchronousQueue<Boolean> consumeQueue;
- boolean doAdd = false;
-
- public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) {
- this.consumeQueue = consumeQueue;
- }
-
- public void deliver(ByteString t, ByteString sub, final Message msg, Callback<Void> callback,
- Object context) {
- if (!doAdd) {
- // after receiving first message, we send a publish
- // to obtain permit of second ledger
- doAdd = true;
- new Thread(new Runnable() {
- @Override
- public void run() {
- // publish messages again to obtain permits
- logger.info("Start publishing message to obtain permit");
- // it obtains the permit and wait for a response,
- // but the response is delayed and readEntries is called
- // in the readComplete callback to read entries of the
- // same ledger. since there is no permit, it blocks
- try {
- CountDownLatch latch = new CountDownLatch(1);
- sleepBookies(8, latch);
- latch.await();
- SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
- for (int i=0; i<3; i++) {
- publisher.asyncPublish(topic, getMsg(9999), new TestCallback(queue), null);
- }
- for (int i=0; i<3; i++) {
- if (!queue.take()) {
- logger.error("Error publishing to topic {}", topic);
- ConcurrencyUtils.put(consumeQueue, false);
- }
- }
- } catch (Exception e) {
- logger.error("Failed to publish message to obtain permit.");
- }
- }
- }).start();
- }
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- ConcurrencyUtils.put(consumeQueue, true);
- }
- }).start();
- callback.operationFinished(context, null);
- }
- }
-
- // Helper function to generate Messages
- protected Message getMsg(int msgNum) {
- return Message.newBuilder().setBody(ByteString.copyFromUtf8("Message" + msgNum)).build();
- }
-
- // Helper function to generate Topics
- protected ByteString getTopic(int topicNum) {
- return ByteString.copyFromUtf8("DeadLockTopic" + topicNum);
- }
-
- class TestServerConfiguration extends HubServerConfiguration {
- public TestServerConfiguration(int serverPort, int sslServerPort) {
- super(serverPort, sslServerPort);
- }
- @Override
- public int getBkEnsembleSize() {
- return 1;
- }
- @Override
- public int getBkWriteQuorumSize() {
- return 1;
- }
- @Override
- public int getBkAckQuorumSize() {
- return 1;
- }
- @Override
- public int getReadAheadCount() {
- return 4;
- }
- @Override
- public long getMaximumCacheSize() {
- return 32;
- }
- }
-
- @SuppressWarnings("deprecation")
- @Override
- protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
- ServerConfiguration serverConf = new TestServerConfiguration(serverPort, sslServerPort);
-
- org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
- new org.apache.bookkeeper.conf.ClientConfiguration();
- bkClientConf.setNumWorkerThreads(1).setReadTimeout(9999)
- .setThrottleValue(3);
- try {
- serverConf.addConf(bkClientConf);
- } catch (Exception e) {
- }
- return serverConf;
- }
-
- @Test(timeout=60000)
- public void testDeadlock() throws Exception {
- int numMessages = 5;
-
- SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
-
- // subscribe to topic
- logger.info("Setup subscriptions");
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subscriberId, opts);
- subscriber.closeSubscription(topic, subscriberId);
-
- // publish 5 messages to form first ledger
- for (int i=0; i<numMessages; i++) {
- logger.info("Start publishing message {}", i);
- publisher.publish(topic, getMsg(i));
- }
-
- stopHubServers();
- Thread.sleep(1000);
- startHubServers();
-
- logger.info("Start publishing messages");
- // publish enough messages to second ledger
- // so a scan request need to scan over two ledgers, which
- // cause readEntries executed in the previous readEntries
- for (int i=0; i<numMessages; i++) {
- logger.info("Start publishing message {}", i+5);
- publisher.publish(topic, getMsg(i));
- }
-
- logger.info("Start subscribe topics again and receive messages");
- // subscribe to topic
- subscriber.subscribe(topic, subscriberId, opts);
- subscriber.startDelivery(topic, subscriberId,
- new TestMessageHandler(consumeQueue));
- for (int i=0; i<(2*numMessages+3); i++) {
- assertTrue(consumeQueue.take());
- }
- }
-
- protected void sleepBookies(final int seconds, final CountDownLatch l)
- throws InterruptedException, IOException {
- Thread sleeper = new Thread() {
- public void run() {
- try {
- bktb.suspendAllBookieServers();
- l.countDown();
- Thread.sleep(seconds * 1000);
- bktb.resumeAllBookieServers();
- } catch (Exception e) {
- logger.error("Error suspending thread", e);
- }
- }
- };
- sleeper.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java
deleted file mode 100644
index 856eab4..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestLocalDBPersistenceManagerBlackBox.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import org.junit.After;
-import org.junit.Before;
-
-public class TestLocalDBPersistenceManagerBlackBox extends TestPersistenceManagerBlackBox {
-
- @After
- @Override
- public void tearDown() throws Exception {
- super.tearDown();
- ((LocalDBPersistenceManager) persistenceManager).reset();
- }
-
- @Before
- @Override
- public void setUp() throws Exception {
- super.setUp();
- }
-
- @Override
- long getLowestSeqId() {
- return 1;
- }
-
- @Override
- PersistenceManager instantiatePersistenceManager() {
- return LocalDBPersistenceManager.instance();
- }
-
- @Override
- public long getExpectedSeqId(int numPublished) {
- return numPublished;
- }
-
-}