You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:23 UTC
[13/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
deleted file mode 100644
index 00a52bf..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.netty;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelHandler.Sharable;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-import org.apache.hedwig.exceptions.PubSubException.MalformedRequestException;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
-import org.apache.hedwig.server.handlers.Handler;
-
-@Sharable
-public class UmbrellaHandler extends SimpleChannelHandler {
- private static final Logger logger = LoggerFactory.getLogger(UmbrellaHandler.class);
-
- private final Map<OperationType, Handler> handlers;
- private final ChannelGroup allChannels;
- private final ChannelDisconnectListener channelDisconnectListener;
- private final boolean isSSLEnabled;
-
- public UmbrellaHandler(ChannelGroup allChannels, Map<OperationType, Handler> handlers,
- ChannelDisconnectListener channelDisconnectListener,
- boolean isSSLEnabled) {
- this.allChannels = allChannels;
- this.isSSLEnabled = isSSLEnabled;
- this.handlers = handlers;
- this.channelDisconnectListener = channelDisconnectListener;
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- Throwable throwable = e.getCause();
-
- // Add here if there are more exceptions we need to be able to tolerate.
- // 1. IOException may be thrown when a channel is forcefully closed by
- // the other end, or by the ProtobufDecoder when an invalid protobuf is
- // received
- // 2. TooLongFrameException is thrown by the LengthBasedDecoder if it
- // receives a packet that is too big
- // 3. CorruptedFramException is thrown by the LengthBasedDecoder when
- // the length is negative etc.
- if (throwable instanceof IOException || throwable instanceof TooLongFrameException
- || throwable instanceof CorruptedFrameException) {
- e.getChannel().close();
- logger.debug("Uncaught exception", throwable);
- } else {
- // call our uncaught exception handler, which might decide to
- // shutdown the system
- Thread thread = Thread.currentThread();
- thread.getUncaughtExceptionHandler().uncaughtException(thread, throwable);
- }
-
- }
-
- @Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- // If SSL is NOT enabled, then we can add this channel to the
- // ChannelGroup. Otherwise, that is done when the channel is connected
- // and the SSL handshake has completed successfully.
- if (!isSSLEnabled) {
- allChannels.add(ctx.getChannel());
- }
- }
-
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- if (isSSLEnabled) {
- ctx.getPipeline().get(SslHandler.class).handshake().addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- logger.debug("SSL handshake has completed successfully!");
- allChannels.add(future.getChannel());
- } else {
- future.getChannel().close();
- }
- }
- });
- }
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- Channel channel = ctx.getChannel();
- // subscribe handler needs to know about channel disconnects
- channelDisconnectListener.channelDisconnected(channel);
- channel.close();
- }
-
- public static void sendErrorResponseToMalformedRequest(Channel channel, long txnId, String msg) {
- logger.debug("Malformed request from {}, msg = {}", channel.getRemoteAddress(), msg);
- MalformedRequestException mre = new MalformedRequestException(msg);
- PubSubResponse response = PubSubResponseUtils.getResponseForException(mre, txnId);
- channel.write(response);
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-
- if (!(e.getMessage() instanceof PubSubProtocol.PubSubRequest)) {
- ctx.sendUpstream(e);
- return;
- }
-
- PubSubProtocol.PubSubRequest request = (PubSubProtocol.PubSubRequest) e.getMessage();
-
- Handler handler = handlers.get(request.getType());
- Channel channel = ctx.getChannel();
- long txnId = request.getTxnId();
-
- if (handler == null) {
- sendErrorResponseToMalformedRequest(channel, txnId, "Request type " + request.getType().getNumber()
- + " unknown");
- return;
- }
-
- handler.handleRequest(request, channel);
- ServerStats.getInstance().incrementRequestsReceived();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
deleted file mode 100644
index b0b5a80..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
+++ /dev/null
@@ -1,1263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.server.common.UnexpectedError;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.TopicPersistenceManager;
-import org.apache.hedwig.server.persistence.ScanCallback.ReasonForFinish;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.zookeeper.SafeAsynBKCallback;
-import static org.apache.hedwig.util.VarArgs.va;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * This persistence manager uses zookeeper and bookkeeper to store messages.
- *
- * Information about topics are stored in zookeeper with a znode named after the
- * topic that contains an ASCII encoded list with records of the following form:
- *
- * <pre>
- * startSeqId(included)\tledgerId\n
- * </pre>
- *
- */
-
-public class BookkeeperPersistenceManager implements PersistenceManagerWithRangeScan, TopicOwnershipChangeListener {
- private static final Logger logger = LoggerFactory.getLogger(BookkeeperPersistenceManager.class);
- static byte[] passwd = "sillysecret".getBytes(UTF_8);
- private BookKeeper bk;
- private TopicPersistenceManager tpManager;
- private ServerConfiguration cfg;
- private TopicManager tm;
-
- private static final long START_SEQ_ID = 1L;
- // max number of entries allowed in a ledger
- private static final long UNLIMITED_ENTRIES = 0L;
- private final long maxEntriesPerLedger;
-
- static class InMemoryLedgerRange {
- LedgerRange range;
- LedgerHandle handle;
-
- public InMemoryLedgerRange(LedgerRange range, LedgerHandle handle) {
- this.range = range;
- this.handle = handle;
- }
-
- public InMemoryLedgerRange(LedgerRange range) {
- this(range, null);
- }
-
- public long getStartSeqIdIncluded() {
- assert range.hasStartSeqIdIncluded();
- return range.getStartSeqIdIncluded();
- }
- }
-
- static class TopicInfo {
- /**
- * stores the last message-seq-id vector that has been pushed to BK for
- * persistence (but not necessarily acked yet by BK)
- *
- */
- MessageSeqId lastSeqIdPushed;
-
- /**
- * stores the last message-id that has been acked by BK. This number is
- * basically used for limiting scans to not read past what has been
- * persisted by BK
- */
- long lastEntryIdAckedInCurrentLedger = -1; // because BK ledgers starts
- // at 0
-
- /**
- * stores a sorted structure of the ledgers for a topic, mapping from
- * the endSeqIdIncluded to the ledger info. This structure does not
- * include the current ledger
- */
- TreeMap<Long, InMemoryLedgerRange> ledgerRanges = new TreeMap<Long, InMemoryLedgerRange>();
- Version ledgerRangesVersion = Version.NEW;
-
- /**
- * This is the handle of the current ledger that is being used to write
- * messages
- */
- InMemoryLedgerRange currentLedgerRange;
-
- /**
- * Flag to release topic when encountering unrecoverable exceptions
- */
- AtomicBoolean doRelease = new AtomicBoolean(false);
-
- /**
- * Flag indicats the topic is changing ledger
- */
- AtomicBoolean doChangeLedger = new AtomicBoolean(false);
- /**
- * Last seq id to change ledger.
- */
- long lastSeqIdBeforeLedgerChange = -1;
- /**
- * List to buffer all persist requests during changing ledger.
- */
- LinkedList<PersistRequest> deferredRequests = null;
-
- final static int UNLIMITED = 0;
- int messageBound = UNLIMITED;
- }
-
- Map<ByteString, TopicInfo> topicInfos = new ConcurrentHashMap<ByteString, TopicInfo>();
-
- TopicOpQueuer queuer;
-
- /**
- * Instantiates a BookKeeperPersistence manager.
- *
- * @param bk
- * a reference to bookkeeper to use.
- * @param metaManagerFactory
- * a metadata manager factory handle to use.
- * @param tm
- * a reference to topic manager.
- * @param cfg
- * Server configuration object
- * @param executor
- * A executor
- */
- public BookkeeperPersistenceManager(BookKeeper bk, MetadataManagerFactory metaManagerFactory,
- TopicManager tm, ServerConfiguration cfg,
- ScheduledExecutorService executor) {
- this.bk = bk;
- this.tpManager = metaManagerFactory.newTopicPersistenceManager();
- this.cfg = cfg;
- this.tm = tm;
- this.maxEntriesPerLedger = cfg.getMaxEntriesPerLedger();
- queuer = new TopicOpQueuer(executor);
- tm.addTopicOwnershipChangeListener(this);
- }
-
- private static LedgerRange buildLedgerRange(long ledgerId, long startOfLedger,
- MessageSeqId endOfLedger) {
- LedgerRange.Builder builder =
- LedgerRange.newBuilder().setLedgerId(ledgerId).setStartSeqIdIncluded(startOfLedger)
- .setEndSeqIdIncluded(endOfLedger);
- return builder.build();
- }
-
- class RangeScanOp extends TopicOpQueuer.SynchronousOp {
- RangeScanRequest request;
- int numMessagesRead = 0;
- long totalSizeRead = 0;
- TopicInfo topicInfo;
- long startSeqIdToScan;
-
- public RangeScanOp(RangeScanRequest request) {
- this(request, -1L, 0, 0L);
- }
-
- public RangeScanOp(RangeScanRequest request, long startSeqId, int numMessagesRead, long totalSizeRead) {
- queuer.super(request.topic);
- this.request = request;
- this.startSeqIdToScan = startSeqId;
- this.numMessagesRead = numMessagesRead;
- this.totalSizeRead = totalSizeRead;
- }
-
- @Override
- protected void runInternal() {
- topicInfo = topicInfos.get(topic);
-
- if (topicInfo == null) {
- request.callback.scanFailed(request.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
- return;
- }
-
- // if startSeqIdToScan is less than zero, which means it is an unfinished scan request
- // we continue the scan from the provided position
- startReadingFrom(startSeqIdToScan < 0 ? request.startSeqId : startSeqIdToScan);
- }
-
- protected void read(final InMemoryLedgerRange imlr, final long startSeqId, final long endSeqId) {
- // Verify whether startSeqId falls in ledger range.
- // Only the left endpoint of range needs to be checked.
- if (imlr.getStartSeqIdIncluded() > startSeqId) {
- logger.error(
- "Invalid RangeScan read, startSeqId {} doesn't fall in ledger range [{} ~ {}]",
- va(startSeqId, imlr.getStartSeqIdIncluded(), imlr.range.hasEndSeqIdIncluded() ? imlr.range
- .getEndSeqIdIncluded().getLocalComponent() : ""));
- request.callback.scanFailed(request.ctx, new PubSubException.UnexpectedConditionException("Scan request is out of range"));
-
- // try release topic to reset the state
- lostTopic(topic);
- return;
- }
-
- if (imlr.handle == null) {
-
- bk.asyncOpenLedger(imlr.range.getLedgerId(), DigestType.CRC32, passwd,
- new SafeAsynBKCallback.OpenCallback() {
- @Override
- public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
- if (rc == BKException.Code.OK) {
- imlr.handle = ledgerHandle;
- read(imlr, startSeqId, endSeqId);
- return;
- }
- BKException bke = BKException.create(rc);
- logger.error("Could not open ledger: " + imlr.range.getLedgerId() + " for topic: "
- + topic);
- request.callback.scanFailed(ctx, new PubSubException.ServiceDownException(bke));
- return;
- }
- }, request.ctx);
- return;
- }
-
- // ledger handle is not null, we can read from it
- long correctedEndSeqId = Math.min(startSeqId + request.messageLimit - numMessagesRead - 1, endSeqId);
-
- if (logger.isDebugEnabled()) {
- logger.debug("Issuing a bk read for ledger: " + imlr.handle.getId() + " from entry-id: "
- + (startSeqId - imlr.getStartSeqIdIncluded()) + " to entry-id: "
- + (correctedEndSeqId - imlr.getStartSeqIdIncluded()));
- }
-
- imlr.handle.asyncReadEntries(startSeqId - imlr.getStartSeqIdIncluded(), correctedEndSeqId
- - imlr.getStartSeqIdIncluded(), new SafeAsynBKCallback.ReadCallback() {
-
- long expectedEntryId = startSeqId - imlr.getStartSeqIdIncluded();
-
- @Override
- public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
- if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
- if (rc == BKException.Code.OK) {
- // means that there is no entries read, provide a meaningful exception
- rc = BKException.Code.NoSuchEntryException;
- }
- BKException bke = BKException.create(rc);
- logger.error("Error while reading from ledger: " + imlr.range.getLedgerId() + " for topic: "
- + topic.toStringUtf8(), bke);
- request.callback.scanFailed(request.ctx, new PubSubException.ServiceDownException(bke));
- return;
- }
-
- LedgerEntry entry = null;
- while (seq.hasMoreElements()) {
- entry = seq.nextElement();
- Message message;
- try {
- message = Message.parseFrom(entry.getEntryInputStream());
- } catch (IOException e) {
- String msg = "Unreadable message found in ledger: " + imlr.range.getLedgerId()
- + " for topic: " + topic.toStringUtf8();
- logger.error(msg, e);
- request.callback.scanFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- return;
- }
-
- logger.debug("Read response from ledger: {} entry-id: {}",
- lh.getId(), entry.getEntryId());
-
- assert expectedEntryId == entry.getEntryId() : "expectedEntryId (" + expectedEntryId
- + ") != entry.getEntryId() (" + entry.getEntryId() + ")";
- assert (message.getMsgId().getLocalComponent() - imlr.getStartSeqIdIncluded()) == expectedEntryId;
-
- expectedEntryId++;
- request.callback.messageScanned(ctx, message);
- numMessagesRead++;
- totalSizeRead += message.getBody().size();
-
- if (numMessagesRead >= request.messageLimit) {
- request.callback.scanFinished(ctx, ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
- return;
- }
-
- if (totalSizeRead >= request.sizeLimit) {
- request.callback.scanFinished(ctx, ReasonForFinish.SIZE_LIMIT_EXCEEDED);
- return;
- }
- }
-
- // continue scanning messages
- scanMessages(request, imlr.getStartSeqIdIncluded() + entry.getEntryId() + 1, numMessagesRead, totalSizeRead);
- }
- }, request.ctx);
- }
-
- protected void startReadingFrom(long startSeqId) {
-
- Map.Entry<Long, InMemoryLedgerRange> entry = topicInfo.ledgerRanges.ceilingEntry(startSeqId);
-
- if (entry == null) {
- // None of the old ledgers have this seq-id, we must use the
- // current ledger
- long endSeqId = topicInfo.currentLedgerRange.getStartSeqIdIncluded()
- + topicInfo.lastEntryIdAckedInCurrentLedger;
-
- if (endSeqId < startSeqId) {
- request.callback.scanFinished(request.ctx, ReasonForFinish.NO_MORE_MESSAGES);
- return;
- }
-
- read(topicInfo.currentLedgerRange, startSeqId, endSeqId);
- } else {
- read(entry.getValue(), startSeqId, entry.getValue().range.getEndSeqIdIncluded().getLocalComponent());
- }
-
- }
-
- }
-
- @Override
- public void scanMessages(RangeScanRequest request) {
- queuer.pushAndMaybeRun(request.topic, new RangeScanOp(request));
- }
-
- protected void scanMessages(RangeScanRequest request, long scanSeqId, int numMsgsRead, long totalSizeRead) {
- queuer.pushAndMaybeRun(request.topic, new RangeScanOp(request, scanSeqId, numMsgsRead, totalSizeRead));
- }
-
- public void deliveredUntil(ByteString topic, Long seqId) {
- // Nothing to do here. this is just a hint that we cannot use.
- }
-
- class UpdateLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
- private Set<Long> ledgersDeleted;
-
- public UpdateLedgerOp(ByteString topic, final Callback<Void> cb, final Object ctx,
- Set<Long> ledgersDeleted) {
- queuer.super(topic, cb, ctx);
- this.ledgersDeleted = ledgersDeleted;
- }
-
- @Override
- public void run() {
- final TopicInfo topicInfo = topicInfos.get(topic);
- if (topicInfo == null) {
- logger.error("Server is not responsible for topic!");
- cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
- return;
- }
- LedgerRanges.Builder builder = LedgerRanges.newBuilder();
- final Set<Long> keysToRemove = new HashSet<Long>();
- boolean foundUnconsumedLedger = false;
- for (Map.Entry<Long, InMemoryLedgerRange> e : topicInfo.ledgerRanges.entrySet()) {
- LedgerRange lr = e.getValue().range;
- long ledgerId = lr.getLedgerId();
- if (!foundUnconsumedLedger && ledgersDeleted.contains(ledgerId)) {
- keysToRemove.add(e.getKey());
- if (!lr.hasEndSeqIdIncluded()) {
- String msg = "Should not remove unclosed ledger " + ledgerId + " for topic " + topic.toStringUtf8();
- logger.error(msg);
- cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- return;
- }
- } else {
- foundUnconsumedLedger = true;
- builder.addRanges(lr);
- }
- }
- builder.addRanges(topicInfo.currentLedgerRange.range);
-
- if (!keysToRemove.isEmpty()) {
- final LedgerRanges newRanges = builder.build();
- tpManager.writeTopicPersistenceInfo(
- topic, newRanges, topicInfo.ledgerRangesVersion, new Callback<Version>() {
- public void operationFinished(Object ctx, Version newVersion) {
- // Finally, all done
- for (Long k : keysToRemove) {
- topicInfo.ledgerRanges.remove(k);
- }
- topicInfo.ledgerRangesVersion = newVersion;
- cb.operationFinished(ctx, null);
- }
- public void operationFailed(Object ctx, PubSubException exception) {
- cb.operationFailed(ctx, exception);
- }
- }, ctx);
- } else {
- cb.operationFinished(ctx, null);
- }
- }
- }
-
- class ConsumeUntilOp extends TopicOpQueuer.SynchronousOp {
- private final long seqId;
-
- public ConsumeUntilOp(ByteString topic, long seqId) {
- queuer.super(topic);
- this.seqId = seqId;
- }
-
- @Override
- public void runInternal() {
- TopicInfo topicInfo = topicInfos.get(topic);
- if (topicInfo == null) {
- logger.error("Server is not responsible for topic!");
- return;
- }
-
- final LinkedList<Long> ledgersToDelete = new LinkedList<Long>();
- for (Long endSeqIdIncluded : topicInfo.ledgerRanges.keySet()) {
- if (endSeqIdIncluded <= seqId) {
- // This ledger's message entries have all been consumed already
- // so it is safe to delete it from BookKeeper.
- long ledgerId = topicInfo.ledgerRanges.get(endSeqIdIncluded).range.getLedgerId();
- ledgersToDelete.add(ledgerId);
- } else {
- break;
- }
- }
-
- // no ledgers need to delete
- if (ledgersToDelete.isEmpty()) {
- return;
- }
-
- Set<Long> ledgersDeleted = new HashSet<Long>();
- deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
- }
- }
-
- private void deleteLedgersAndUpdateLedgersRange(final ByteString topic,
- final LinkedList<Long> ledgersToDelete,
- final Set<Long> ledgersDeleted) {
- if (ledgersToDelete.isEmpty()) {
- Callback<Void> cb = new Callback<Void>() {
- public void operationFinished(Object ctx, Void result) {
- // do nothing, op is async to stop other ops
- // occurring on the topic during the update
- }
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("Failed to update ledger znode for topic {} deleting ledgers {} : {}",
- va(topic.toStringUtf8(), ledgersDeleted, exception.getMessage()));
- }
- };
- queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgersDeleted));
- return;
- }
-
- final Long ledger = ledgersToDelete.poll();
- if (null == ledger) {
- deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
- return;
- }
-
- bk.asyncDeleteLedger(ledger, new DeleteCallback() {
- @Override
- public void deleteComplete(int rc, Object ctx) {
- if (BKException.Code.NoSuchLedgerExistsException == rc ||
- BKException.Code.OK == rc) {
- ledgersDeleted.add(ledger);
- deleteLedgersAndUpdateLedgersRange(topic, ledgersToDelete, ledgersDeleted);
- return;
- } else {
- logger.warn("Exception while deleting consumed ledger {}, stop deleting other ledgers {} "
- + "and update ledger ranges with deleted ledgers {} : {}",
- va(ledger, ledgersToDelete, ledgersDeleted, BKException.create(rc)));
- // We should not continue when failed to delete ledger
- Callback<Void> cb = new Callback<Void>() {
- public void operationFinished(Object ctx, Void result) {
- // do nothing, op is async to stop other ops
- // occurring on the topic during the update
- }
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("Failed to update ledger znode for topic {} deleting ledgers {} : {}",
- va(topic, ledgersDeleted, exception.getMessage()));
- }
- };
- queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, cb, null, ledgersDeleted));
- return;
- }
- }
- }, null);
- }
-
- public void consumedUntil(ByteString topic, Long seqId) {
- queuer.pushAndMaybeRun(topic, new ConsumeUntilOp(topic, Math.max(seqId, getMinSeqIdForTopic(topic))));
- }
-
- public void consumeToBound(ByteString topic) {
- TopicInfo topicInfo = topicInfos.get(topic);
-
- if (topicInfo == null || topicInfo.messageBound == topicInfo.UNLIMITED) {
- return;
- }
- queuer.pushAndMaybeRun(topic, new ConsumeUntilOp(topic, getMinSeqIdForTopic(topic)));
- }
-
- public long getMinSeqIdForTopic(ByteString topic) {
- TopicInfo topicInfo = topicInfos.get(topic);
-
- if (topicInfo == null || topicInfo.messageBound == topicInfo.UNLIMITED) {
- return Long.MIN_VALUE;
- } else {
- return (topicInfo.lastSeqIdPushed.getLocalComponent() - topicInfo.messageBound) + 1;
- }
- }
-
- public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException {
- TopicInfo topicInfo = topicInfos.get(topic);
-
- if (topicInfo == null) {
- throw new PubSubException.ServerNotResponsibleForTopicException("");
- }
-
- return topicInfo.lastSeqIdPushed;
- }
-
- public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
- return Math.max(seqId + skipAmount, getMinSeqIdForTopic(topic));
- }
-
- /**
- * Release topic on failure
- *
- * @param topic
- * Topic Name
- * @param e
- * Failure Exception
- * @param ctx
- * Callback context
- */
- protected void releaseTopicIfRequested(final ByteString topic, Exception e, Object ctx) {
- TopicInfo topicInfo = topicInfos.get(topic);
- if (topicInfo == null) {
- logger.warn("No topic found when trying to release ownership of topic " + topic.toStringUtf8()
- + " on failure.");
- return;
- }
- // do release owner ship of topic
- if (topicInfo.doRelease.compareAndSet(false, true)) {
- logger.info("Release topic " + topic.toStringUtf8() + " when bookkeeper persistence mananger encounters failure :",
- e);
- tm.releaseTopic(topic, new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("Exception found on releasing topic " + topic.toStringUtf8()
- + " when encountering exception from bookkeeper:", exception);
- }
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- logger.info("successfully releasing topic {} when encountering"
- + " exception from bookkeeper", topic.toStringUtf8());
- }
- }, null);
- }
- // if release happens when the topic is changing ledger
- // we need to fail all queued persist requests
- if (topicInfo.doChangeLedger.get()) {
- for (PersistRequest pr : topicInfo.deferredRequests) {
- pr.getCallback().operationFailed(ctx, new PubSubException.ServiceDownException(e));
- }
- topicInfo.deferredRequests.clear();
- topicInfo.lastSeqIdBeforeLedgerChange = -1;
- }
- }
-
- public class PersistOp extends TopicOpQueuer.SynchronousOp {
- PersistRequest request;
-
- public PersistOp(PersistRequest request) {
- queuer.super(request.topic);
- this.request = request;
- }
-
- @Override
- public void runInternal() {
- doPersistMessage(request);
- }
- }
-
- /**
- * Persist a message by executing a persist request.
- */
- protected void doPersistMessage(final PersistRequest request) {
- final ByteString topic = request.topic;
- final TopicInfo topicInfo = topicInfos.get(topic);
-
- if (topicInfo == null) {
- request.getCallback().operationFailed(request.ctx,
- new PubSubException.ServerNotResponsibleForTopicException(""));
- return;
- }
-
- if (topicInfo.doRelease.get()) {
- request.getCallback().operationFailed(request.ctx, new PubSubException.ServiceDownException(
- "The ownership of the topic is releasing due to unrecoverable issue."));
- return;
- }
-
- // if the topic is changing ledger, queue following persist requests until ledger is changed
- if (topicInfo.doChangeLedger.get()) {
- logger.info("Topic {} is changing ledger, so queue persist request for message.",
- topic.toStringUtf8());
- topicInfo.deferredRequests.add(request);
- return;
- }
-
- final long localSeqId = topicInfo.lastSeqIdPushed.getLocalComponent() + 1;
- MessageSeqId.Builder builder = MessageSeqId.newBuilder();
- if (request.message.hasMsgId()) {
- MessageIdUtils.takeRegionMaximum(builder, topicInfo.lastSeqIdPushed, request.message.getMsgId());
- } else {
- builder.addAllRemoteComponents(topicInfo.lastSeqIdPushed.getRemoteComponentsList());
- }
- builder.setLocalComponent(localSeqId);
-
- // check whether reach the threshold of a ledger, if it does,
- // open a ledger to write
- long entriesInThisLedger = localSeqId - topicInfo.currentLedgerRange.getStartSeqIdIncluded() + 1;
- if (UNLIMITED_ENTRIES != maxEntriesPerLedger &&
- entriesInThisLedger >= maxEntriesPerLedger) {
- if (topicInfo.doChangeLedger.compareAndSet(false, true)) {
- // for order guarantees, we should wait until all the adding operations for current ledger
- // are succeed. so we just mark it as lastSeqIdBeforeLedgerChange
- // when the lastSeqIdBeforeLedgerChange acked, we do changing the ledger
- if (null == topicInfo.deferredRequests) {
- topicInfo.deferredRequests = new LinkedList<PersistRequest>();
- }
- topicInfo.lastSeqIdBeforeLedgerChange = localSeqId;
- }
- }
-
- topicInfo.lastSeqIdPushed = builder.build();
- Message msgToSerialize = Message.newBuilder(request.message).setMsgId(topicInfo.lastSeqIdPushed).build();
-
- final MessageSeqId responseSeqId = msgToSerialize.getMsgId();
- topicInfo.currentLedgerRange.handle.asyncAddEntry(msgToSerialize.toByteArray(),
- new SafeAsynBKCallback.AddCallback() {
- AtomicBoolean processed = new AtomicBoolean(false);
- @Override
- public void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
-
- // avoid double callback by mistake, since we may do change ledger in this callback.
- if (!processed.compareAndSet(false, true)) {
- return;
- }
- if (rc != BKException.Code.OK) {
- BKException bke = BKException.create(rc);
- logger.error("Error while persisting entry to ledger: " + lh.getId() + " for topic: "
- + topic.toStringUtf8(), bke);
- request.getCallback().operationFailed(ctx, new PubSubException.ServiceDownException(bke));
-
- // To preserve ordering guarantees, we
- // should give up the topic and not let
- // other operations through
- releaseTopicIfRequested(request.topic, bke, ctx);
- return;
- }
-
- if (entryId + topicInfo.currentLedgerRange.getStartSeqIdIncluded() != localSeqId) {
- String msg = "Expected BK to assign entry-id: "
- + (localSeqId - topicInfo.currentLedgerRange.getStartSeqIdIncluded())
- + " but it instead assigned entry-id: " + entryId + " topic: "
- + topic.toStringUtf8() + "ledger: " + lh.getId();
- logger.error(msg);
- throw new UnexpectedError(msg);
- }
-
- topicInfo.lastEntryIdAckedInCurrentLedger = entryId;
- request.getCallback().operationFinished(ctx, responseSeqId);
- // if this acked entry is the last entry of current ledger
- // we can add a ChangeLedgerOp to execute to change ledger
- if (topicInfo.doChangeLedger.get() &&
- entryId + topicInfo.currentLedgerRange.getStartSeqIdIncluded() == topicInfo.lastSeqIdBeforeLedgerChange) {
- // change ledger
- changeLedger(topic, new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("Failed to change ledger for topic " + topic.toStringUtf8(), exception);
- // change ledger failed, we should give up topic
- releaseTopicIfRequested(request.topic, exception, ctx);
- }
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- topicInfo.doChangeLedger.set(false);
- topicInfo.lastSeqIdBeforeLedgerChange = -1;
- // the ledger is changed, persist queued requests
- // if the number of queued persist requests is more than maxEntriesPerLedger
- // we just persist maxEntriesPerLedger requests, other requests are still queued
- // until next ledger changed.
- int numRequests = 0;
- while (!topicInfo.deferredRequests.isEmpty() &&
- numRequests < maxEntriesPerLedger) {
- PersistRequest pr = topicInfo.deferredRequests.removeFirst();
- doPersistMessage(pr);
- ++numRequests;
- }
- logger.debug("Finished persisting {} queued requests, but there are still {} requests in queue.",
- numRequests, topicInfo.deferredRequests.size());
- }
- }, ctx);
- }
- }
- }, request.ctx);
- }
-
- public void persistMessage(PersistRequest request) {
- queuer.pushAndMaybeRun(request.topic, new PersistOp(request));
- }
-
- public void scanSingleMessage(ScanRequest request) {
- throw new RuntimeException("Not implemented");
- }
-
- static SafeAsynBKCallback.CloseCallback noOpCloseCallback = new SafeAsynBKCallback.CloseCallback() {
- @Override
- public void safeCloseComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
- };
- };
-
- class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
- public AcquireOp(ByteString topic, Callback<Void> cb, Object ctx) {
- queuer.super(topic, cb, ctx);
- }
-
- @Override
- public void run() {
- if (topicInfos.containsKey(topic)) {
- // Already acquired, do nothing
- cb.operationFinished(ctx, null);
- return;
- }
-
- // read persistence info
- tpManager.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
- @Override
- public void operationFinished(Object ctx, Versioned<LedgerRanges> ranges) {
- if (null != ranges) {
- processTopicLedgerRanges(ranges.getValue(), ranges.getVersion());
- } else {
- processTopicLedgerRanges(LedgerRanges.getDefaultInstance(), Version.NEW);
- }
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- cb.operationFailed(ctx, exception);
- }
- }, ctx);
- }
-
- void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
- final List<LedgerRange> rangesList = ranges.getRangesList();
- if (!rangesList.isEmpty()) {
- LedgerRange range = rangesList.get(0);
- if (range.hasStartSeqIdIncluded()) {
- // we already have start seq id
- processTopicLedgerRanges(rangesList, version, range.getStartSeqIdIncluded());
- return;
- }
- getStartSeqIdToProcessTopicLedgerRanges(rangesList, version);
- return;
- }
- // process topic ledger ranges directly
- processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
- }
-
- /**
- * Process old version ledger ranges to fetch start seq id.
- */
- void getStartSeqIdToProcessTopicLedgerRanges(
- final List<LedgerRange> rangesList, final Version version) {
-
- final LedgerRange range = rangesList.get(0);
-
- if (!range.hasEndSeqIdIncluded()) {
- // process topic ledger ranges directly
- processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
- return;
- }
-
- final long ledgerId = range.getLedgerId();
- // open the first ledger to compute right start seq id
- bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd,
- new SafeAsynBKCallback.OpenCallback() {
-
- @Override
- public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
-
- if (rc == BKException.Code.NoSuchLedgerExistsException) {
- // process next ledger
- processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
- return;
- } else if (rc != BKException.Code.OK) {
- BKException bke = BKException.create(rc);
- logger.error("Could not open ledger {} to get start seq id while acquiring topic {} : {}",
- va(ledgerId, topic.toStringUtf8(), bke));
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
- return;
- }
-
- final long numEntriesInLastLedger = ledgerHandle.getLastAddConfirmed() + 1;
-
- // the ledger is closed before, calling close is just a nop operation.
- try {
- ledgerHandle.close();
- } catch (InterruptedException ie) {
- // the exception would never be thrown for a read only ledger handle.
- } catch (BKException bke) {
- // the exception would never be thrown for a read only ledger handle.
- }
-
- if (numEntriesInLastLedger <= 0) {
- String msg = "No entries found in a have-end-seq-id ledger " + ledgerId
- + " when acquiring topic " + topic.toStringUtf8() + ".";
- logger.error(msg);
- cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- return;
- }
- long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
- long startOfLedger = endOfLedger - numEntriesInLastLedger + 1;
-
- processTopicLedgerRanges(rangesList, version, startOfLedger);
- }
-
- }, ctx);
- }
-
- void processTopicLedgerRanges(final List<LedgerRange> rangesList, final Version version,
- long startOfLedger) {
- logger.info("Process {} ledgers for topic {} starting from seq id {}.",
- va(rangesList.size(), topic.toStringUtf8(), startOfLedger));
-
- Iterator<LedgerRange> lrIterator = rangesList.iterator();
-
- TopicInfo topicInfo = new TopicInfo();
- while (lrIterator.hasNext()) {
- LedgerRange range = lrIterator.next();
-
- if (range.hasEndSeqIdIncluded()) {
- // this means it was a valid and completely closed ledger
- long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
- if (range.hasStartSeqIdIncluded()) {
- startOfLedger = range.getStartSeqIdIncluded();
- } else {
- range = buildLedgerRange(range.getLedgerId(), startOfLedger,
- range.getEndSeqIdIncluded());
- }
- topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range));
- if (startOfLedger < endOfLedger + 1) {
- startOfLedger = endOfLedger + 1;
- }
- continue;
- }
-
- // If it doesn't have a valid end, it must be the last ledger
- if (lrIterator.hasNext()) {
- String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.toStringUtf8()
- + " is not the last one but still does not have an end seq-id";
- logger.error(msg);
- cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- return;
- }
-
- if (range.hasStartSeqIdIncluded()) {
- startOfLedger = range.getStartSeqIdIncluded();
- }
-
- // The last ledger does not have a valid seq-id, lets try to
- // find it out
- recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), startOfLedger,
- version, topicInfo);
- return;
- }
-
- // All ledgers were found properly closed, just start a new one
- openNewTopicLedger(topic, version, topicInfo, startOfLedger, false, cb, ctx);
- }
-
- /**
- * Recovers the last ledger, opens a new one, and persists the new
- * information to ZK
- *
- * @param ledgerId
- * Ledger to be recovered
- * @param expectedStartSeqId
- * Start seq id of the ledger to recover
- * @param expectedVersionOfLedgerNode
- * Expected version to update ledgers range
- * @param topicInfo
- * Topic info
- */
- private void recoverLastTopicLedgerAndOpenNewOne(final long ledgerId, final long expectedStartSeqId,
- final Version expectedVersionOfLedgerNode, final TopicInfo topicInfo) {
-
- bk.asyncOpenLedger(ledgerId, DigestType.CRC32, passwd, new SafeAsynBKCallback.OpenCallback() {
- @Override
- public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
-
- if (rc != BKException.Code.OK) {
- BKException bke = BKException.create(rc);
- logger.error("While acquiring topic: " + topic.toStringUtf8()
- + ", could not open unrecovered ledger: " + ledgerId, bke);
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
- return;
- }
-
- final long numEntriesInLastLedger = ledgerHandle.getLastAddConfirmed() + 1;
-
- if (numEntriesInLastLedger <= 0) {
- // this was an empty ledger that someone created but
- // couldn't write to, so just ignore it
- logger.info("Pruning empty ledger: " + ledgerId + " for topic: " + topic.toStringUtf8());
- closeLedger(ledgerHandle);
- openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo,
- expectedStartSeqId, false, cb, ctx);
- return;
- }
-
- // we have to read the last entry of the ledger to find
- // out the last seq-id
-
- ledgerHandle.asyncReadEntries(numEntriesInLastLedger - 1, numEntriesInLastLedger - 1,
- new SafeAsynBKCallback.ReadCallback() {
- @Override
- public void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
- Object ctx) {
- if (rc != BKException.Code.OK || !seq.hasMoreElements()) {
- if (rc == BKException.Code.OK) {
- // means that there is no entries read, provide a meaningful exception
- rc = BKException.Code.NoSuchEntryException;
- }
- logger.info("Received error code {}", rc);
- BKException bke = BKException.create(rc);
- logger.error("While recovering ledger: " + ledgerId + " for topic: "
- + topic.toStringUtf8() + ", could not read last entry", bke);
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
- return;
- }
-
- Message lastMessage;
- try {
- lastMessage = Message.parseFrom(seq.nextElement().getEntry());
- } catch (InvalidProtocolBufferException e) {
- String msg = "While recovering ledger: " + ledgerId + " for topic: "
- + topic.toStringUtf8() + ", could not deserialize last message";
- logger.error(msg, e);
- cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
- return;
- }
-
- long endOfLedger = lastMessage.getMsgId().getLocalComponent();
- long startOfLedger = endOfLedger - numEntriesInLastLedger + 1;
-
- if (startOfLedger != expectedStartSeqId) {
- // gap would be introduced by old version when gc consumed ledgers
- String msg = "Expected start seq id of recovered ledger " + ledgerId
- + " to be " + expectedStartSeqId + " but it was "
- + startOfLedger + ".";
- logger.warn(msg);
- }
-
- LedgerRange lr = buildLedgerRange(ledgerId, startOfLedger, lastMessage.getMsgId());
- topicInfo.ledgerRanges.put(endOfLedger,
- new InMemoryLedgerRange(lr, lh));
-
- logger.info("Recovered unclosed ledger: {} for topic: {} with {} entries starting from seq id {}",
- va(ledgerId, topic.toStringUtf8(), numEntriesInLastLedger, startOfLedger));
-
- openNewTopicLedger(topic, expectedVersionOfLedgerNode, topicInfo, endOfLedger + 1, false, cb, ctx);
- }
- }, ctx);
-
- }
-
- }, ctx);
- }
- }
-
- /**
- * Open New Ledger to write for a topic.
- *
- * @param topic
- * Topic Name
- * @param expectedVersionOfLedgersNode
- * Expected Version to Update Ledgers Node.
- * @param topicInfo
- * Topic Information
- * @param startSeqId
- * Start of sequence id for new ledger
- * @param changeLedger
- * Whether is it called when changing ledger
- * @param cb
- * Callback to trigger after opening new ledger.
- * @param ctx
- * Callback context.
- */
- void openNewTopicLedger(final ByteString topic,
- final Version expectedVersionOfLedgersNode, final TopicInfo topicInfo,
- final long startSeqId, final boolean changeLedger,
- final Callback<Void> cb, final Object ctx) {
- bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkWriteQuorumSize(),
- cfg.getBkAckQuorumSize(), DigestType.CRC32, passwd,
- new SafeAsynBKCallback.CreateCallback() {
- AtomicBoolean processed = new AtomicBoolean(false);
-
- @Override
- public void safeCreateComplete(int rc, LedgerHandle lh, Object ctx) {
- if (!processed.compareAndSet(false, true)) {
- return;
- }
-
- if (rc != BKException.Code.OK) {
- BKException bke = BKException.create(rc);
- logger.error("Could not create new ledger while acquiring topic: "
- + topic.toStringUtf8(), bke);
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
- return;
- }
-
- // compute last seq id
- if (!changeLedger) {
- topicInfo.lastSeqIdPushed = topicInfo.ledgerRanges.isEmpty() ? MessageSeqId.newBuilder()
- .setLocalComponent(startSeqId - 1).build() : topicInfo.ledgerRanges.lastEntry().getValue().range
- .getEndSeqIdIncluded();
- }
-
- LedgerRange lastRange = LedgerRange.newBuilder().setLedgerId(lh.getId())
- .setStartSeqIdIncluded(startSeqId).build();
- topicInfo.currentLedgerRange = new InMemoryLedgerRange(lastRange, lh);
- topicInfo.lastEntryIdAckedInCurrentLedger = -1;
-
- // Persist the fact that we started this new
- // ledger to ZK
-
- LedgerRanges.Builder builder = LedgerRanges.newBuilder();
- for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
- builder.addRanges(imlr.range);
- }
- builder.addRanges(lastRange);
-
- tpManager.writeTopicPersistenceInfo(
- topic, builder.build(), expectedVersionOfLedgersNode, new Callback<Version>() {
- @Override
- public void operationFinished(Object ctx, Version newVersion) {
- // Finally, all done
- topicInfo.ledgerRangesVersion = newVersion;
- topicInfos.put(topic, topicInfo);
- cb.operationFinished(ctx, null);
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- cb.operationFailed(ctx, exception);
- }
- }, ctx);
- return;
- }
- }, ctx);
- }
-
- /**
- * acquire ownership of a topic, doing whatever is needed to be able to
- * perform reads and writes on that topic from here on
- *
- * @param topic
- * @param callback
- * @param ctx
- */
- @Override
- public void acquiredTopic(ByteString topic, Callback<Void> callback, Object ctx) {
- queuer.pushAndMaybeRun(topic, new AcquireOp(topic, callback, ctx));
- }
-
- /**
- * Change ledger to write for a topic.
- */
- class ChangeLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
-
- public ChangeLedgerOp(ByteString topic, Callback<Void> cb, Object ctx) {
- queuer.super(topic, cb, ctx);
- }
-
- @Override
- public void run() {
- TopicInfo topicInfo = topicInfos.get(topic);
- if (null == topicInfo) {
- logger.error("Weired! hub server doesn't own topic " + topic.toStringUtf8()
- + " when changing ledger to write.");
- cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
- return;
- }
- closeLastTopicLedgerAndOpenNewOne(topicInfo);
- }
-
- private void closeLastTopicLedgerAndOpenNewOne(final TopicInfo topicInfo) {
- final long ledgerId = topicInfo.currentLedgerRange.handle.getId();
- topicInfo.currentLedgerRange.handle.asyncClose(new CloseCallback() {
- AtomicBoolean processed = new AtomicBoolean(false);
- @Override
- public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
- if (!processed.compareAndSet(false, true)) {
- return;
- }
- if (BKException.Code.OK != rc) {
- BKException bke = BKException.create(rc);
- logger.error("Could not close ledger " + ledgerId
- + " while changing ledger of topic " + topic.toStringUtf8(), bke);
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
- return;
- }
- long endSeqId = topicInfo.lastSeqIdPushed.getLocalComponent();
- // update last range
- LedgerRange lastRange =
- buildLedgerRange(ledgerId, topicInfo.currentLedgerRange.getStartSeqIdIncluded(),
- topicInfo.lastSeqIdPushed);
-
- topicInfo.currentLedgerRange.range = lastRange;
- // put current ledger to ledger ranges
- topicInfo.ledgerRanges.put(endSeqId, topicInfo.currentLedgerRange);
- logger.info("Closed written ledger " + ledgerId + " for topic "
- + topic.toStringUtf8() + " to change ledger.");
- openNewTopicLedger(topic, topicInfo.ledgerRangesVersion,
- topicInfo, endSeqId + 1, true, cb, ctx);
- }
- }, ctx);
- }
-
- }
-
- /**
- * Change ledger to write for a topic.
- *
- * @param topic
- * Topic Name
- */
- protected void changeLedger(ByteString topic, Callback<Void> cb, Object ctx) {
- queuer.pushAndMaybeRun(topic, new ChangeLedgerOp(topic, cb, ctx));
- }
-
- public void closeLedger(LedgerHandle lh) {
- // try {
- // lh.asyncClose(noOpCloseCallback, null);
- // } catch (InterruptedException e) {
- // logger.error(e);
- // Thread.currentThread().interrupt();
- // }
- }
-
- class ReleaseOp extends TopicOpQueuer.SynchronousOp {
-
- public ReleaseOp(ByteString topic) {
- queuer.super(topic);
- }
-
- @Override
- public void runInternal() {
- TopicInfo topicInfo = topicInfos.remove(topic);
-
- if (topicInfo == null) {
- return;
- }
-
- for (InMemoryLedgerRange imlr : topicInfo.ledgerRanges.values()) {
- if (imlr.handle != null) {
- closeLedger(imlr.handle);
- }
- }
-
- if (topicInfo.currentLedgerRange != null && topicInfo.currentLedgerRange.handle != null) {
- closeLedger(topicInfo.currentLedgerRange.handle);
- }
- }
- }
-
- /**
- * Release any resources for the topic that might be currently held. There
- * wont be any subsequent reads or writes on that topic coming
- *
- * @param topic
- */
- @Override
- public void lostTopic(ByteString topic) {
- queuer.pushAndMaybeRun(topic, new ReleaseOp(topic));
- }
-
- class SetMessageBoundOp extends TopicOpQueuer.SynchronousOp {
- final int bound;
-
- public SetMessageBoundOp(ByteString topic, int bound) {
- queuer.super(topic);
- this.bound = bound;
- }
-
- @Override
- public void runInternal() {
- TopicInfo topicInfo = topicInfos.get(topic);
- if (topicInfo != null) {
- topicInfo.messageBound = bound;
- }
- }
- }
-
- public void setMessageBound(ByteString topic, Integer bound) {
- queuer.pushAndMaybeRun(topic, new SetMessageBoundOp(topic, bound));
- }
-
- public void clearMessageBound(ByteString topic) {
- queuer.pushAndMaybeRun(topic, new SetMessageBoundOp(topic, TopicInfo.UNLIMITED));
- }
-
- @Override
- public void stop() {
- try {
- tpManager.close();
- } catch (IOException ioe) {
- logger.warn("Exception closing topic persistence manager : ", ioe);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java
deleted file mode 100644
index 26bdb94..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheKey.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.server.common.ByteStringInterner;
-
-public class CacheKey {
-
- ByteString topic;
- long seqId;
-
- public CacheKey(ByteString topic, long seqId) {
- this.topic = ByteStringInterner.intern(topic);
- this.seqId = seqId;
- }
-
- public ByteString getTopic() {
- return topic;
- }
-
- public long getSeqId() {
- return seqId;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (seqId ^ (seqId >>> 32));
- result = prime * result + ((topic == null) ? 0 : topic.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- CacheKey other = (CacheKey) obj;
- if (seqId != other.seqId)
- return false;
- if (topic == null) {
- if (other.topic != null)
- return false;
- } else if (!topic.equals(other.topic))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "(" + topic.toStringUtf8() + "," + seqId + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java
deleted file mode 100644
index 992ff11..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CacheValue.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.server.common.UnexpectedError;
-
-/**
- * This class is NOT thread safe. It need not be thread-safe because our
- * read-ahead cache will operate with only 1 thread
- *
- */
-public class CacheValue {
-
- private static final Logger logger = LoggerFactory.getLogger(ReadAheadCache.class);
-
- // Actually we don't care the order of callbacks
- // when a scan callback, it should be delivered to both callbacks
- Set<ScanCallbackWithContext> callbacks = new HashSet<ScanCallbackWithContext>();
- Message message;
- long timeOfAddition = 0;
-
- public CacheValue() {
- }
-
- public boolean isStub() {
- return message == null;
- }
-
- public long getTimeOfAddition() {
- if (message == null) {
- throw new UnexpectedError("Time of add requested from a stub");
- }
- return timeOfAddition;
- }
-
- public void setMessageAndInvokeCallbacks(Message message, long currTime) {
- if (this.message != null) {
- // Duplicate read for the same message coming back
- return;
- }
-
- this.message = message;
- this.timeOfAddition = currTime;
-
- logger.debug("Invoking {} callbacks for {} message added to cache", callbacks.size(), message);
- for (ScanCallbackWithContext callbackWithCtx : callbacks) {
- if (null != callbackWithCtx) {
- callbackWithCtx.getScanCallback().messageScanned(callbackWithCtx.getCtx(), message);
- }
- }
- }
-
- public boolean removeCallback(ScanCallback callback, Object ctx) {
- return callbacks.remove(new ScanCallbackWithContext(callback, ctx));
- }
-
- public void addCallback(ScanCallback callback, Object ctx) {
- if (!isStub()) {
- // call the callback right away
- callback.messageScanned(ctx, message);
- return;
- }
-
- callbacks.add(new ScanCallbackWithContext(callback, ctx));
- }
-
- public Message getMessage() {
- return message;
- }
-
- public void setErrorAndInvokeCallbacks(Exception exception) {
- for (ScanCallbackWithContext callbackWithCtx : callbacks) {
- if (null != callbackWithCtx) {
- callbackWithCtx.getScanCallback().scanFailed(callbackWithCtx.getCtx(), exception);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CancelScanRequest.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CancelScanRequest.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CancelScanRequest.java
deleted file mode 100644
index c3b5214..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/CancelScanRequest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-public interface CancelScanRequest {
-
- /**
- * @return the scan request to cancel
- */
- public ScanRequest getScanRequest();
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/Factory.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/Factory.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/Factory.java
deleted file mode 100644
index c1ee24c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/Factory.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-public interface Factory<T> {
- public T newInstance();
-}