You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:16 UTC
[11/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
new file mode 100644
index 0000000..00e6b5c
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -0,0 +1,1106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.function.CloseAsyncCloseableFunction;
+import org.apache.distributedlog.function.GetVersionedValueFunction;
+import org.apache.distributedlog.injector.AsyncFailureInjector;
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
+import org.apache.distributedlog.metadata.LogMetadataForReader;
+import org.apache.distributedlog.metadata.LogMetadataForWriter;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.lock.DistributedLock;
+import org.apache.distributedlog.lock.NopDistributedLock;
+import org.apache.distributedlog.lock.ZKDistributedLock;
+import org.apache.distributedlog.logsegment.LogSegmentFilter;
+import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
+import org.apache.distributedlog.metadata.LogStreamMetadataStore;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.subscription.SubscriptionsStore;
+import org.apache.distributedlog.util.Allocator;
+import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.MonitoredFuturePool;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.ExceptionalFunction;
+import com.twitter.util.ExceptionalFunction0;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.distributedlog.namespace.NamespaceDriver.Role.READER;
+import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
+
+/**
+ * <h3>Metrics</h3>
+ * <ul>
+ * <li> `log_writer/*`: all asynchronous writer related metrics are exposed under scope `log_writer`.
+ * See {@link BKAsyncLogWriter} for detail stats.
+ * <li> `async_reader/*`: all asyncrhonous reader related metrics are exposed under scope `async_reader`.
+ * See {@link BKAsyncLogReader} for detail stats.
+ * <li> `writer_future_pool/*`: metrics about the future pools that used by writers are exposed under
+ * scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats.
+ * <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under
+ * scope `reader_future_pool`. See {@link MonitoredFuturePool} for detail stats.
+ * <li> `lock/*`: metrics about the locks used by writers. See {@link ZKDistributedLock} for detail
+ * stats.
+ * <li> `read_lock/*`: metrics about the locks used by readers. See {@link ZKDistributedLock} for
+ * detail stats.
+ * <li> `logsegments/*`: metrics about basic operations on log segments. See {@link BKLogHandler} for details.
+ * <li> `segments/*`: metrics about write operations on log segments. See {@link BKLogWriteHandler} for details.
+ * <li> `readahead_worker/*`: metrics about readahead workers used by readers. See {@link BKLogReadHandler}
+ * for details.
+ * </ul>
+ */
+class BKDistributedLogManager implements DistributedLogManager {
+ static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class);
+
+ static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION =
+ new Function<LogRecordWithDLSN, Long>() {
+ @Override
+ public Long apply(LogRecordWithDLSN record) {
+ return record.getTransactionId();
+ }
+ };
+
+ static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION =
+ new Function<LogRecordWithDLSN, DLSN>() {
+ @Override
+ public DLSN apply(LogRecordWithDLSN record) {
+ return record.getDlsn();
+ }
+ };
+
+ private final URI uri;
+ private final String name;
+ private final String clientId;
+ private final int regionId;
+ private final String streamIdentifier;
+ private final DistributedLogConfiguration conf;
+ private final DynamicDistributedLogConfiguration dynConf;
+ private final NamespaceDriver driver;
+ private Promise<Void> closePromise;
+ private final OrderedScheduler scheduler;
+ private final FeatureProvider featureProvider;
+ private final AsyncFailureInjector failureInjector;
+ private final StatsLogger statsLogger;
+ private final StatsLogger perLogStatsLogger;
+ final AlertStatsLogger alertStatsLogger;
+
+ // log segment metadata cache
+ private final LogSegmentMetadataCache logSegmentMetadataCache;
+
+ //
+ // Writer Related Variables
+ //
+ private final PermitLimiter writeLimiter;
+
+ //
+ // Reader Related Variables
+ ///
+ // read handler for listener.
+ private BKLogReadHandler readHandlerForListener = null;
+ private final PendingReaders pendingReaders;
+
+ // resource to close
+ private final Optional<AsyncCloseable> resourcesCloseable;
+
+ /**
+ * Create a {@link DistributedLogManager} with supplied resources.
+ *
+ * @param name log name
+ * @param conf distributedlog configuration
+ * @param dynConf dynamic distributedlog configuration
+ * @param uri uri location for the log
+ * @param driver namespace driver
+ * @param logSegmentMetadataCache log segment metadata cache
+ * @param scheduler ordered scheduled used by readers and writers
+ * @param clientId client id that used to initiate the locks
+ * @param regionId region id that would be encrypted as part of log segment metadata
+ * to indicate which region that the log segment will be created
+ * @param writeLimiter write limiter
+ * @param featureProvider provider to offer features
+ * @param statsLogger stats logger to receive stats
+ * @param perLogStatsLogger stats logger to receive per log stats
+ * @throws IOException
+ */
+ BKDistributedLogManager(String name,
+ DistributedLogConfiguration conf,
+ DynamicDistributedLogConfiguration dynConf,
+ URI uri,
+ NamespaceDriver driver,
+ LogSegmentMetadataCache logSegmentMetadataCache,
+ OrderedScheduler scheduler,
+ String clientId,
+ Integer regionId,
+ PermitLimiter writeLimiter,
+ FeatureProvider featureProvider,
+ AsyncFailureInjector failureInjector,
+ StatsLogger statsLogger,
+ StatsLogger perLogStatsLogger,
+ Optional<AsyncCloseable> resourcesCloseable) {
+ this.name = name;
+ this.conf = conf;
+ this.dynConf = dynConf;
+ this.uri = uri;
+ this.driver = driver;
+ this.logSegmentMetadataCache = logSegmentMetadataCache;
+ this.scheduler = scheduler;
+ this.statsLogger = statsLogger;
+ this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger);
+ this.pendingReaders = new PendingReaders(scheduler);
+ this.regionId = regionId;
+ this.clientId = clientId;
+ this.streamIdentifier = conf.getUnpartitionedStreamName();
+ this.writeLimiter = writeLimiter;
+ // Feature Provider
+ this.featureProvider = featureProvider;
+ // Failure Injector
+ this.failureInjector = failureInjector;
+ // Stats
+ this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert");
+ this.resourcesCloseable = resourcesCloseable;
+ }
+
+ @Override
+ public String getStreamName() {
+ return name;
+ }
+
+ @Override
+ public NamespaceDriver getNamespaceDriver() {
+ return driver;
+ }
+
+ URI getUri() {
+ return uri;
+ }
+
+ DistributedLogConfiguration getConf() {
+ return conf;
+ }
+
+ OrderedScheduler getScheduler() {
+ return scheduler;
+ }
+
+ AsyncFailureInjector getFailureInjector() {
+ return failureInjector;
+ }
+
+ //
+ // Test Methods
+ //
+
+ @VisibleForTesting
+ LogStreamMetadataStore getWriterMetadataStore() {
+ return driver.getLogStreamMetadataStore(WRITER);
+ }
+
+ @VisibleForTesting
+ LogSegmentEntryStore getReaderEntryStore() {
+ return driver.getLogSegmentEntryStore(READER);
+ }
+
+ @VisibleForTesting
+ FeatureProvider getFeatureProvider() {
+ return this.featureProvider;
+ }
+
+ private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(
+ boolean create, LogSegmentListener listener) {
+ if (null == readHandlerForListener && create) {
+ readHandlerForListener = createReadHandler();
+ readHandlerForListener.registerListener(listener);
+ // start fetch the log segments after created the listener
+ readHandlerForListener.asyncStartFetchLogSegments();
+ return readHandlerForListener;
+ }
+ if (null != readHandlerForListener && null != listener) {
+ readHandlerForListener.registerListener(listener);
+ }
+ return readHandlerForListener;
+ }
+
+ @Override
+ public List<LogSegmentMetadata> getLogSegments() throws IOException {
+ return FutureUtils.result(getLogSegmentsAsync());
+ }
+
+ protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() {
+ final BKLogReadHandler readHandler = createReadHandler();
+ return readHandler.readLogSegmentsFromStore(
+ LogSegmentMetadata.COMPARATOR,
+ LogSegmentFilter.DEFAULT_FILTER,
+ null)
+ .map(GetVersionedValueFunction.GET_LOGSEGMENT_LIST_FUNC)
+ .ensure(CloseAsyncCloseableFunction.of(readHandler));
+ }
+
+ @Override
+ public void registerListener(LogSegmentListener listener) throws IOException {
+ getReadHandlerAndRegisterListener(true, listener);
+ }
+
+ @Override
+ public synchronized void unregisterListener(LogSegmentListener listener) {
+ if (null != readHandlerForListener) {
+ readHandlerForListener.unregisterListener(listener);
+ }
+ }
+
+ public void checkClosedOrInError(String operation) throws AlreadyClosedException {
+ synchronized (this) {
+ if (null != closePromise) {
+ throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager");
+ }
+ }
+ }
+
+ // Create Read Handler
+
+ synchronized BKLogReadHandler createReadHandler() {
+ Optional<String> subscriberId = Optional.absent();
+ return createReadHandler(subscriberId, false);
+ }
+
+ synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId) {
+ return createReadHandler(subscriberId, false);
+ }
+
+ synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
+ boolean isHandleForReading) {
+ return createReadHandler(
+ subscriberId,
+ null,
+ isHandleForReading);
+ }
+
+ synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
+ AsyncNotification notification,
+ boolean isHandleForReading) {
+ LogMetadataForReader logMetadata = LogMetadataForReader.of(uri, name, streamIdentifier);
+ return new BKLogReadHandler(
+ logMetadata,
+ subscriberId,
+ conf,
+ dynConf,
+ driver.getLogStreamMetadataStore(READER),
+ logSegmentMetadataCache,
+ driver.getLogSegmentEntryStore(READER),
+ scheduler,
+ alertStatsLogger,
+ statsLogger,
+ perLogStatsLogger,
+ clientId,
+ notification,
+ isHandleForReading);
+ }
+
+ // Create Ledger Allocator
+
+
+
+ // Create Write Handler
+
+ public BKLogWriteHandler createWriteHandler(boolean lockHandler)
+ throws IOException {
+ return FutureUtils.result(asyncCreateWriteHandler(lockHandler));
+ }
+
+ Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) {
+ // Fetching Log Metadata (create if not exists)
+ return driver.getLogStreamMetadataStore(WRITER).getLog(
+ uri,
+ name,
+ true,
+ conf.getCreateStreamIfNotExists()
+ ).flatMap(new AbstractFunction1<LogMetadataForWriter, Future<BKLogWriteHandler>>() {
+ @Override
+ public Future<BKLogWriteHandler> apply(LogMetadataForWriter logMetadata) {
+ Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>();
+ createWriteHandler(logMetadata, lockHandler, createPromise);
+ return createPromise;
+ }
+ });
+ }
+
+ private void createWriteHandler(LogMetadataForWriter logMetadata,
+ boolean lockHandler,
+ final Promise<BKLogWriteHandler> createPromise) {
+ // Build the locks
+ DistributedLock lock;
+ if (conf.isWriteLockEnabled()) {
+ lock = driver.getLogStreamMetadataStore(WRITER).createWriteLock(logMetadata);
+ } else {
+ lock = NopDistributedLock.INSTANCE;
+ }
+
+ Allocator<LogSegmentEntryWriter, Object> segmentAllocator;
+ try {
+ segmentAllocator = driver.getLogSegmentEntryStore(WRITER)
+ .newLogSegmentAllocator(logMetadata, dynConf);
+ } catch (IOException ioe) {
+ FutureUtils.setException(createPromise, ioe);
+ return;
+ }
+
+ // Make sure writer handler created before resources are initialized
+ final BKLogWriteHandler writeHandler = new BKLogWriteHandler(
+ logMetadata,
+ conf,
+ driver.getLogStreamMetadataStore(WRITER),
+ logSegmentMetadataCache,
+ driver.getLogSegmentEntryStore(WRITER),
+ scheduler,
+ segmentAllocator,
+ statsLogger,
+ perLogStatsLogger,
+ alertStatsLogger,
+ clientId,
+ regionId,
+ writeLimiter,
+ featureProvider,
+ dynConf,
+ lock);
+ if (lockHandler) {
+ writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() {
+ @Override
+ public void onSuccess(DistributedLock lock) {
+ FutureUtils.setValue(createPromise, writeHandler);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ writeHandler.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ FutureUtils.setException(createPromise, cause);
+ return BoxedUnit.UNIT;
+ }
+ });
+ }
+ });
+ } else {
+ FutureUtils.setValue(createPromise, writeHandler);
+ }
+ }
+
+ PermitManager getLogSegmentRollingPermitManager() {
+ return driver.getLogStreamMetadataStore(WRITER).getPermitManager();
+ }
+
+ <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) {
+ return scheduler.apply(new ExceptionalFunction0<BKLogReadHandler>() {
+ @Override
+ public BKLogReadHandler applyE() throws Throwable {
+ return getReadHandlerAndRegisterListener(true, null);
+ }
+ }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() {
+ @Override
+ public Future<T> applyE(final BKLogReadHandler readHandler) throws Throwable {
+ return func.apply(readHandler);
+ }
+ });
+ }
+
+ /**
+ * Check if an end of stream marker was added to the stream
+ * A stream with an end of stream marker cannot be appended to
+ *
+ * @return true if the marker was added to the stream, false otherwise
+ */
+ @Override
+ public boolean isEndOfStreamMarked() throws IOException {
+ checkClosedOrInError("isEndOfStreamMarked");
+ long lastTxId = FutureUtils.result(getLastLogRecordAsyncInternal(false, true)).getTransactionId();
+ return lastTxId == DistributedLogConstants.MAX_TXID;
+ }
+
+ /**
+ * Begin appending to the end of the log stream which is being treated as a sequence of bytes
+ *
+ * @return the writer interface to generate log records
+ */
+ public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException {
+ long position;
+ try {
+ position = FutureUtils.result(getLastLogRecordAsyncInternal(true, false)).getTransactionId();
+ if (DistributedLogConstants.INVALID_TXID == position ||
+ DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID == position) {
+ position = 0;
+ }
+ } catch (LogEmptyException ex) {
+ position = 0;
+ } catch (LogNotFoundException ex) {
+ position = 0;
+ }
+ return new AppendOnlyStreamWriter(startAsyncLogSegmentNonPartitioned(), position);
+ }
+
+ /**
+ * Get a reader to read a log stream as a sequence of bytes
+ *
+ * @return the writer interface to generate log records
+ */
+ public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException {
+ return new AppendOnlyStreamReader(this);
+ }
+
+ /**
+ * Begin writing to the log stream identified by the name
+ *
+ * @return the writer interface to generate log records
+ */
+ @Override
+ public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException {
+ checkClosedOrInError("startLogSegmentNonPartitioned");
+ BKSyncLogWriter writer = new BKSyncLogWriter(conf, dynConf, this);
+ boolean success = false;
+ try {
+ writer.createAndCacheWriteHandler();
+ BKLogWriteHandler writeHandler = writer.getWriteHandler();
+ FutureUtils.result(writeHandler.lockHandler());
+ success = true;
+ return writer;
+ } finally {
+ if (!success) {
+ writer.abort();
+ }
+ }
+ }
+
+ /**
+ * Begin writing to the log stream identified by the name
+ *
+ * @return the writer interface to generate log records
+ */
+ @Override
+ public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
+ return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
+ }
+
+ @Override
+ public Future<AsyncLogWriter> openAsyncLogWriter() {
+ try {
+ checkClosedOrInError("startLogSegmentNonPartitioned");
+ } catch (AlreadyClosedException e) {
+ return Future.exception(e);
+ }
+
+ Future<BKLogWriteHandler> createWriteHandleFuture;
+ synchronized (this) {
+ // 1. create the locked write handler
+ createWriteHandleFuture = asyncCreateWriteHandler(true);
+ }
+ return createWriteHandleFuture.flatMap(new AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() {
+ @Override
+ public Future<AsyncLogWriter> apply(final BKLogWriteHandler writeHandler) {
+ final BKAsyncLogWriter writer;
+ synchronized (BKDistributedLogManager.this) {
+ // 2. create the writer with the handler
+ writer = new BKAsyncLogWriter(
+ conf,
+ dynConf,
+ BKDistributedLogManager.this,
+ writeHandler,
+ featureProvider,
+ statsLogger);
+ }
+ // 3. recover the incomplete log segments
+ return writeHandler.recoverIncompleteLogSegments()
+ .map(new AbstractFunction1<Long, AsyncLogWriter>() {
+ @Override
+ public AsyncLogWriter apply(Long lastTxId) {
+ // 4. update last tx id if successfully recovered
+ writer.setLastTxId(lastTxId);
+ return writer;
+ }
+ }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Throwable cause) {
+ // 5. close the writer if recovery failed
+ writer.asyncAbort();
+ return BoxedUnit.UNIT;
+ }
+ });
+ }
+ });
+ }
+
+ @Override
+ public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
+ return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() {
+ @Override
+ public Future<DLSN> apply(List<LogSegmentMetadata> segments) {
+ return getDLSNNotLessThanTxId(fromTxnId, segments);
+ }
+ });
+ }
+
+ private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
+ final List<LogSegmentMetadata> segments) {
+ if (segments.isEmpty()) {
+ return getLastDLSNAsync();
+ }
+ final int segmentIdx = DLUtils.findLogSegmentNotLessThanTxnId(segments, fromTxnId);
+ if (segmentIdx < 0) {
+ return Future.value(new DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L));
+ }
+ return getDLSNNotLessThanTxIdInSegment(
+ fromTxnId,
+ segmentIdx,
+ segments,
+ driver.getLogSegmentEntryStore(READER)
+ );
+ }
+
+ private Future<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromTxnId,
+ final int segmentIdx,
+ final List<LogSegmentMetadata> segments,
+ final LogSegmentEntryStore entryStore) {
+ final LogSegmentMetadata segment = segments.get(segmentIdx);
+ return ReadUtils.getLogRecordNotLessThanTxId(
+ name,
+ segment,
+ fromTxnId,
+ scheduler,
+ entryStore,
+ Math.max(2, dynConf.getReadAheadBatchSize())
+ ).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, Future<DLSN>>() {
+ @Override
+ public Future<DLSN> apply(Optional<LogRecordWithDLSN> foundRecord) {
+ if (foundRecord.isPresent()) {
+ return Future.value(foundRecord.get().getDlsn());
+ }
+ if ((segments.size() - 1) == segmentIdx) {
+ return getLastLogRecordAsync().map(new AbstractFunction1<LogRecordWithDLSN, DLSN>() {
+ @Override
+ public DLSN apply(LogRecordWithDLSN record) {
+ if (record.getTransactionId() >= fromTxnId) {
+ return record.getDlsn();
+ }
+ return record.getDlsn().getNextDLSN();
+ }
+ });
+ } else {
+ return getDLSNNotLessThanTxIdInSegment(
+ fromTxnId,
+ segmentIdx + 1,
+ segments,
+ entryStore);
+ }
+ }
+ });
+ }
+
+ /**
+ * Get the input stream starting with fromTxnId for the specified log
+ *
+ * @param fromTxnId - the first transaction id we want to read
+ * @return the stream starting with transaction fromTxnId
+ * @throws IOException if a stream cannot be found.
+ */
+ @Override
+ public LogReader getInputStream(long fromTxnId)
+ throws IOException {
+ return getInputStreamInternal(fromTxnId);
+ }
+
+ @Override
+ public LogReader getInputStream(DLSN fromDLSN) throws IOException {
+ return getInputStreamInternal(fromDLSN, Optional.<Long>absent());
+ }
+
+ @Override
+ public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException {
+ return FutureUtils.result(openAsyncLogReader(fromTxnId));
+ }
+
+ /**
+ * Opening a log reader positioning by transaction id <code>fromTxnId</code>.
+ *
+ * <p>
+ * - retrieve log segments for the stream
+ * - if the log segment list is empty, positioning by the last dlsn
+ * - otherwise, find the first log segment that contains the records whose transaction ids are not less than
+ * the provided transaction id <code>fromTxnId</code>
+ * - if all log segments' records' transaction ids are more than <code>fromTxnId</code>, positioning
+ * on the first record.
+ * - otherwise, search the log segment to find the log record
+ * - if the log record is found, positioning the reader by that found record's dlsn
+ * - otherwise, positioning by the last dlsn
+ * </p>
+ *
+ * @see DLUtils#findLogSegmentNotLessThanTxnId(List, long)
+ * @see ReadUtils#getLogRecordNotLessThanTxId(String, LogSegmentMetadata, long, ExecutorService, LogSegmentEntryStore, int)
+ * @param fromTxnId
+ * transaction id to start reading from
+ * @return future representing the open result.
+ */
+ @Override
+ public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) {
+ final Promise<DLSN> dlsnPromise = new Promise<DLSN>();
+ getDLSNNotLessThanTxId(fromTxnId).addEventListener(new FutureEventListener<DLSN>() {
+
+ @Override
+ public void onSuccess(DLSN dlsn) {
+ dlsnPromise.setValue(dlsn);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ if (cause instanceof LogEmptyException) {
+ dlsnPromise.setValue(DLSN.InitialDLSN);
+ } else {
+ dlsnPromise.setException(cause);
+ }
+ }
+ });
+ return dlsnPromise.flatMap(new AbstractFunction1<DLSN, Future<AsyncLogReader>>() {
+ @Override
+ public Future<AsyncLogReader> apply(DLSN dlsn) {
+ return openAsyncLogReader(dlsn);
+ }
+ });
+ }
+
+ @Override
+ public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException {
+ return FutureUtils.result(openAsyncLogReader(fromDLSN));
+ }
+
+ @Override
+ public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
+ Optional<String> subscriberId = Optional.absent();
+ AsyncLogReader reader = new BKAsyncLogReader(
+ this,
+ scheduler,
+ fromDLSN,
+ subscriberId,
+ false,
+ statsLogger);
+ pendingReaders.add(reader);
+ return Future.value(reader);
+ }
+
+ /**
+ * Note the lock here is a sort of elective exclusive lock. I.e. acquiring this lock will only prevent other
+ * people who try to acquire the lock from reading from the stream. Normal readers (and writers) will not be
+ * blocked.
+ */
+ @Override
+ public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN) {
+ Optional<String> subscriberId = Optional.absent();
+ return getAsyncLogReaderWithLock(Optional.of(fromDLSN), subscriberId);
+ }
+
+ @Override
+ public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN, final String subscriberId) {
+ return getAsyncLogReaderWithLock(Optional.of(fromDLSN), Optional.of(subscriberId));
+ }
+
+ @Override
+ public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) {
+ Optional<DLSN> fromDLSN = Optional.absent();
+ return getAsyncLogReaderWithLock(fromDLSN, Optional.of(subscriberId));
+ }
+
+ protected Future<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> fromDLSN,
+ final Optional<String> subscriberId) {
+ if (!fromDLSN.isPresent() && !subscriberId.isPresent()) {
+ return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided."));
+ }
+ final BKAsyncLogReader reader = new BKAsyncLogReader(
+ BKDistributedLogManager.this,
+ scheduler,
+ fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
+ subscriberId,
+ false,
+ statsLogger);
+ pendingReaders.add(reader);
+ final Future<Void> lockFuture = reader.lockStream();
+ final Promise<AsyncLogReader> createPromise = new Promise<AsyncLogReader>(
+ new Function<Throwable, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Throwable cause) {
+ // cancel the lock when the creation future is cancelled
+ lockFuture.cancel();
+ return BoxedUnit.UNIT;
+ }
+ });
+ // lock the stream - fetch the last commit position on success
+ lockFuture.flatMap(new Function<Void, Future<AsyncLogReader>>() {
+ @Override
+ public Future<AsyncLogReader> apply(Void complete) {
+ if (fromDLSN.isPresent()) {
+ return Future.value((AsyncLogReader) reader);
+ }
+ LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.",
+ subscriberId.get(), name);
+ // we acquired lock
+ final SubscriptionsStore subscriptionsStore = driver.getSubscriptionsStore(getStreamName());
+ return subscriptionsStore.getLastCommitPosition(subscriberId.get())
+ .map(new ExceptionalFunction<DLSN, AsyncLogReader>() {
+ @Override
+ public AsyncLogReader applyE(DLSN lastCommitPosition) throws UnexpectedException {
+ LOG.info("Reader {} @ {} positioned to last commit position {}.",
+ new Object[] { subscriberId.get(), name, lastCommitPosition });
+ reader.setStartDLSN(lastCommitPosition);
+ return reader;
+ }
+ });
+ }
+ }).addEventListener(new FutureEventListener<AsyncLogReader>() {
+ @Override
+ public void onSuccess(AsyncLogReader r) {
+ pendingReaders.remove(reader);
+ FutureUtils.setValue(createPromise, r);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ pendingReaders.remove(reader);
+ reader.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ FutureUtils.setException(createPromise, cause);
+ return BoxedUnit.UNIT;
+ }
+ });
+ }
+ });
+ return createPromise;
+ }
+
+ /**
+ * Get the input stream starting with fromTxnId for the specified log
+ *
+ * @param fromTxnId
+ * transaction id to start reading from
+ * @return log reader
+ * @throws IOException
+ */
+ LogReader getInputStreamInternal(long fromTxnId)
+ throws IOException {
+ DLSN fromDLSN;
+ try {
+ fromDLSN = FutureUtils.result(getDLSNNotLessThanTxId(fromTxnId));
+ } catch (LogEmptyException lee) {
+ fromDLSN = DLSN.InitialDLSN;
+ }
+ return getInputStreamInternal(fromDLSN, Optional.of(fromTxnId));
+ }
+
+ LogReader getInputStreamInternal(DLSN fromDLSN, Optional<Long> fromTxnId)
+ throws IOException {
+ LOG.info("Create sync reader starting from {}", fromDLSN);
+ checkClosedOrInError("getInputStream");
+ return new BKSyncLogReader(
+ conf,
+ this,
+ fromDLSN,
+ fromTxnId,
+ statsLogger);
+ }
+
+ /**
+ * Get the last log record in the stream
+ *
+ * @return the last log record in the stream
+ * @throws java.io.IOException if a stream cannot be found.
+ */
+ @Override
+ public LogRecordWithDLSN getLastLogRecord() throws IOException {
+ checkClosedOrInError("getLastLogRecord");
+ return FutureUtils.result(getLastLogRecordAsync());
+ }
+
+ @Override
+ public long getFirstTxId() throws IOException {
+ checkClosedOrInError("getFirstTxId");
+ return FutureUtils.result(getFirstRecordAsyncInternal()).getTransactionId();
+ }
+
+ @Override
+ public long getLastTxId() throws IOException {
+ checkClosedOrInError("getLastTxId");
+ return FutureUtils.result(getLastTxIdAsync());
+ }
+
+ @Override
+ public DLSN getLastDLSN() throws IOException {
+ checkClosedOrInError("getLastDLSN");
+ return FutureUtils.result(getLastLogRecordAsyncInternal(false, false)).getDlsn();
+ }
+
+ /**
+ * Get Latest log record in the log
+ *
+ * @return latest log record
+ */
+ @Override
+ public Future<LogRecordWithDLSN> getLastLogRecordAsync() {
+ return getLastLogRecordAsyncInternal(false, false);
+ }
+
+ private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover,
+ final boolean includeEndOfStream) {
+ return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
+ @Override
+ public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
+ return ledgerHandler.getLastLogRecordAsync(recover, includeEndOfStream);
+ }
+ });
+ }
+
+ /**
+ * Get Latest Transaction Id in the log
+ *
+ * @return latest transaction id
+ */
+ @Override
+ public Future<Long> getLastTxIdAsync() {
+ return getLastLogRecordAsyncInternal(false, false)
+ .map(RECORD_2_TXID_FUNCTION);
+ }
+
+ /**
+ * Get first DLSN in the log.
+ *
+ * @return first dlsn in the stream
+ */
+ @Override
+ public Future<DLSN> getFirstDLSNAsync() {
+ return getFirstRecordAsyncInternal().map(RECORD_2_DLSN_FUNCTION);
+ }
+
+ private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
+ return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
+ @Override
+ public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
+ return ledgerHandler.asyncGetFirstLogRecord();
+ }
+ });
+ }
+
+ /**
+ * Get Latest DLSN in the log.
+ *
+ * @return latest transaction id
+ */
+ @Override
+ public Future<DLSN> getLastDLSNAsync() {
+ return getLastLogRecordAsyncInternal(false, false)
+ .map(RECORD_2_DLSN_FUNCTION);
+ }
+
+ /**
+ * Get the number of log records in the active portion of the log
+ * Any log segments that have already been truncated will not be included
+ *
+ * @return number of log records
+ * @throws IOException
+ */
+ @Override
+ public long getLogRecordCount() throws IOException {
+ checkClosedOrInError("getLogRecordCount");
+ return FutureUtils.result(getLogRecordCountAsync(DLSN.InitialDLSN));
+ }
+
+ /**
+ * Get the number of log records in the active portion of the log asynchronously.
+ * Any log segments that have already been truncated will not be included
+ *
+ * @return future number of log records
+ * @throws IOException
+ */
+ @Override
+ public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN) {
+ return processReaderOperation(new Function<BKLogReadHandler, Future<Long>>() {
+ @Override
+ public Future<Long> apply(BKLogReadHandler ledgerHandler) {
+ return ledgerHandler.asyncGetLogRecordCount(beginDLSN);
+ }
+ });
+ }
+
+ @Override
+ public void recover() throws IOException {
+ recoverInternal(conf.getUnpartitionedStreamName());
+ }
+
+ /**
+ * Recover a specified stream within the log container
+ * The writer implicitly recovers a topic when it resumes writing.
+ * This allows applications to recover a container explicitly so
+ * that application may read a fully recovered log before resuming
+ * the writes
+ *
+ * @throws IOException if the recovery fails
+ */
+ private void recoverInternal(String streamIdentifier) throws IOException {
+ checkClosedOrInError("recoverInternal");
+ BKLogWriteHandler ledgerHandler = createWriteHandler(true);
+ try {
+ FutureUtils.result(ledgerHandler.recoverIncompleteLogSegments());
+ } finally {
+ Utils.closeQuietly(ledgerHandler);
+ }
+ }
+
+ /**
+ * Delete all the partitions of the specified log
+ *
+ * @throws IOException if the deletion fails
+ */
+ @Override
+ public void delete() throws IOException {
+ FutureUtils.result(driver.getLogStreamMetadataStore(WRITER)
+ .deleteLog(uri, getStreamName()));
+ }
+
+ /**
+ * The DistributedLogManager may archive/purge any logs for transactionId
+ * less than or equal to minImageTxId.
+ * This is to be used only when the client explicitly manages deletion. If
+ * the cleanup policy is based on sliding time window, then this method need
+ * not be called.
+ *
+ * @param minTxIdToKeep the earliest txid that must be retained
+ * @throws IOException if purging fails
+ */
+ @Override
+ public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
+ Preconditions.checkArgument(minTxIdToKeep > 0, "Invalid transaction id " + minTxIdToKeep);
+ checkClosedOrInError("purgeLogSegmentsOlderThan");
+ BKLogWriteHandler ledgerHandler = createWriteHandler(true);
+ try {
+ LOG.info("Purging logs for {} older than {}", ledgerHandler.getFullyQualifiedName(), minTxIdToKeep);
+ FutureUtils.result(ledgerHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep));
+ } finally {
+ Utils.closeQuietly(ledgerHandler);
+ }
+ }
+
+ static class PendingReaders implements AsyncCloseable {
+
+ final ExecutorService executorService;
+ final Set<AsyncCloseable> readers = new HashSet<AsyncCloseable>();
+
+ PendingReaders(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public synchronized void remove(AsyncCloseable reader) {
+ readers.remove(reader);
+ }
+
+ public synchronized void add(AsyncCloseable reader) {
+ readers.add(reader);
+ }
+
+ @Override
+ public Future<Void> asyncClose() {
+ return Utils.closeSequence(executorService, true, readers.toArray(new AsyncLogReader[readers.size()]))
+ .onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Void value) {
+ readers.clear();
+ return BoxedUnit.UNIT;
+ }
+ });
+ }
+ };
+
+ /**
+ * Close the distributed log manager, freeing any resources it may hold.
+ */
+ @Override
+ public Future<Void> asyncClose() {
+ Promise<Void> closeFuture;
+ BKLogReadHandler readHandlerToClose;
+ synchronized (this) {
+ if (null != closePromise) {
+ return closePromise;
+ }
+ closeFuture = closePromise = new Promise<Void>();
+ readHandlerToClose = readHandlerForListener;
+ }
+
+ Future<Void> closeResult = Utils.closeSequence(null, true,
+ readHandlerToClose,
+ pendingReaders,
+ resourcesCloseable.or(AsyncCloseable.NULL));
+ closeResult.proxyTo(closeFuture);
+ return closeFuture;
+ }
+
+ @Override
+ public void close() throws IOException {
+ FutureUtils.result(asyncClose());
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DLM:%s:%s", getUri(), getStreamName());
+ }
+
+ public void raiseAlert(String msg, Object... args) {
+ alertStatsLogger.raise(msg, args);
+ }
+
+ @Override
+ public SubscriptionsStore getSubscriptionsStore() {
+ return driver.getSubscriptionsStore(getStreamName());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
new file mode 100644
index 0000000..0a4608e
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.InvalidStreamNameException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.injector.AsyncFailureInjector;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.util.Utils;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
+import static org.apache.distributedlog.util.DLUtils.validateName;
+
+/**
+ * BKDistributedLogNamespace is the default implementation of {@link DistributedLogNamespace}. It uses
+ * zookeeper for metadata storage and bookkeeper for data storage.
+ * <h3>Metrics</h3>
+ *
+ * <h4>ZooKeeper Client</h4>
+ * See {@link ZooKeeperClient} for detail sub-stats.
+ * <ul>
+ * <li> `scope`/dlzk_factory_writer_shared/* : stats about the zookeeper client shared by all DL writers.
+ * <li> `scope`/dlzk_factory_reader_shared/* : stats about the zookeeper client shared by all DL readers.
+ * <li> `scope`/bkzk_factory_writer_shared/* : stats about the zookeeper client used by bookkeeper client
+ * shared by all DL writers.
+ * <li> `scope`/bkzk_factory_reader_shared/* : stats about the zookeeper client used by bookkeeper client
+ * shared by all DL readers.
+ * </ul>
+ *
+ * <h4>BookKeeper Client</h4>
+ * BookKeeper client stats are exposed directly to current scope. See {@link BookKeeperClient} for detail stats.
+ *
+ * <h4>Utils</h4>
+ * <ul>
+ * <li> `scope`/factory/thread_pool/* : stats about the ordered scheduler used by this namespace.
+ * See {@link OrderedScheduler}.
+ * <li> `scope`/factory/readahead_thread_pool/* : stats about the readahead thread pool executor
+ * used by this namespace. See {@link MonitoredScheduledThreadPoolExecutor}.
+ * <li> `scope`/writeLimiter/* : stats about the global write limiter used by this namespace.
+ * See {@link PermitLimiter}.
+ * </ul>
+ *
+ * <h4>DistributedLogManager</h4>
+ *
+ * All the core stats about reader and writer are exposed under current scope via {@link BKDistributedLogManager}.
+ */
+public class BKDistributedLogNamespace implements DistributedLogNamespace {
+ static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogNamespace.class);
+
+ private final String clientId;
+ private final int regionId;
+ private final DistributedLogConfiguration conf;
+ private final URI namespace;
+ // namespace driver
+ private final NamespaceDriver driver;
+ // resources
+ private final OrderedScheduler scheduler;
+ private final PermitLimiter writeLimiter;
+ private final AsyncFailureInjector failureInjector;
+ // log segment metadata store
+ private final LogSegmentMetadataCache logSegmentMetadataCache;
+ // feature provider
+ private final FeatureProvider featureProvider;
+ // Stats Loggers
+ private final StatsLogger statsLogger;
+ private final StatsLogger perLogStatsLogger;
+
+ protected final AtomicBoolean closed = new AtomicBoolean(false);
+
+ public BKDistributedLogNamespace(
+ DistributedLogConfiguration conf,
+ URI uri,
+ NamespaceDriver driver,
+ OrderedScheduler scheduler,
+ FeatureProvider featureProvider,
+ PermitLimiter writeLimiter,
+ AsyncFailureInjector failureInjector,
+ StatsLogger statsLogger,
+ StatsLogger perLogStatsLogger,
+ String clientId,
+ int regionId) {
+ this.conf = conf;
+ this.namespace = uri;
+ this.driver = driver;
+ this.scheduler = scheduler;
+ this.featureProvider = featureProvider;
+ this.writeLimiter = writeLimiter;
+ this.failureInjector = failureInjector;
+ this.statsLogger = statsLogger;
+ this.perLogStatsLogger = perLogStatsLogger;
+ this.clientId = clientId;
+ this.regionId = regionId;
+
+ // create a log segment metadata cache
+ this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, Ticker.systemTicker());
+ }
+
+ @Override
+ public NamespaceDriver getNamespaceDriver() {
+ return driver;
+ }
+
+ //
+ // Namespace Methods
+ //
+
+ @Override
+ public void createLog(String logName)
+ throws InvalidStreamNameException, IOException {
+ checkState();
+ validateName(logName);
+ URI uri = FutureUtils.result(driver.getLogMetadataStore().createLog(logName));
+ FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, logName, true, true));
+ }
+
+ @Override
+ public void deleteLog(String logName)
+ throws InvalidStreamNameException, LogNotFoundException, IOException {
+ checkState();
+ validateName(logName);
+ Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
+ if (!uri.isPresent()) {
+ throw new LogNotFoundException("Log " + logName + " isn't found.");
+ }
+ DistributedLogManager dlm = openLogInternal(
+ uri.get(),
+ logName,
+ Optional.<DistributedLogConfiguration>absent(),
+ Optional.<DynamicDistributedLogConfiguration>absent());
+ dlm.delete();
+ }
+
+ @Override
+ public DistributedLogManager openLog(String logName)
+ throws InvalidStreamNameException, IOException {
+ return openLog(logName,
+ Optional.<DistributedLogConfiguration>absent(),
+ Optional.<DynamicDistributedLogConfiguration>absent(),
+ Optional.<StatsLogger>absent());
+ }
+
+ @Override
+ public DistributedLogManager openLog(String logName,
+ Optional<DistributedLogConfiguration> logConf,
+ Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+ Optional<StatsLogger> perStreamStatsLogger)
+ throws InvalidStreamNameException, IOException {
+ checkState();
+ validateName(logName);
+ Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
+ if (!uri.isPresent()) {
+ throw new LogNotFoundException("Log " + logName + " isn't found.");
+ }
+ return openLogInternal(
+ uri.get(),
+ logName,
+ logConf,
+ dynamicLogConf);
+ }
+
+ @Override
+ public boolean logExists(String logName)
+ throws IOException, IllegalArgumentException {
+ checkState();
+ Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
+ if (uri.isPresent()) {
+ try {
+ FutureUtils.result(driver.getLogStreamMetadataStore(WRITER)
+ .logExists(uri.get(), logName));
+ return true;
+ } catch (LogNotFoundException lnfe) {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public Iterator<String> getLogs() throws IOException {
+ checkState();
+ return FutureUtils.result(driver.getLogMetadataStore().getLogs());
+ }
+
+ @Override
+ public void registerNamespaceListener(NamespaceListener listener) {
+ driver.getLogMetadataStore().registerNamespaceListener(listener);
+ }
+
+ @Override
+ public synchronized AccessControlManager createAccessControlManager() throws IOException {
+ checkState();
+ return driver.getAccessControlManager();
+ }
+
+ /**
+ * Open the log in location <i>uri</i>.
+ *
+ * @param uri
+ * location to store the log
+ * @param nameOfLogStream
+ * name of the log
+ * @param logConfiguration
+ * optional stream configuration
+ * @param dynamicLogConfiguration
+ * dynamic stream configuration overrides.
+ * @return distributedlog manager instance.
+ * @throws InvalidStreamNameException if the stream name is invalid
+ * @throws IOException
+ */
+ protected DistributedLogManager openLogInternal(
+ URI uri,
+ String nameOfLogStream,
+ Optional<DistributedLogConfiguration> logConfiguration,
+ Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration)
+ throws InvalidStreamNameException, IOException {
+ // Make sure the name is well formed
+ checkState();
+ validateName(nameOfLogStream);
+
+ DistributedLogConfiguration mergedConfiguration = new DistributedLogConfiguration();
+ mergedConfiguration.addConfiguration(conf);
+ mergedConfiguration.loadStreamConf(logConfiguration);
+ // If dynamic config was not provided, default to a static view of the global configuration.
+ DynamicDistributedLogConfiguration dynConf = null;
+ if (dynamicLogConfiguration.isPresent()) {
+ dynConf = dynamicLogConfiguration.get();
+ } else {
+ dynConf = ConfUtils.getConstDynConf(mergedConfiguration);
+ }
+
+ return new BKDistributedLogManager(
+ nameOfLogStream, /* Log Name */
+ mergedConfiguration, /* Configuration */
+ dynConf, /* Dynamic Configuration */
+ uri, /* Namespace URI */
+ driver, /* Namespace Driver */
+ logSegmentMetadataCache, /* Log Segment Metadata Cache */
+ scheduler, /* DL scheduler */
+ clientId, /* Client Id */
+ regionId, /* Region Id */
+ writeLimiter, /* Write Limiter */
+ featureProvider.scope("dl"), /* Feature Provider */
+ failureInjector, /* Failure Injector */
+ statsLogger, /* Stats Logger */
+ perLogStatsLogger, /* Per Log Stats Logger */
+ Optional.<AsyncCloseable>absent() /* shared resources, we don't need to close any resources in dlm */
+ );
+ }
+
+ /**
+ * Check the namespace state.
+ *
+ * @throws IOException
+ */
+ private void checkState() throws IOException {
+ if (closed.get()) {
+ LOG.error("BK namespace {} is already closed", namespace);
+ throw new AlreadyClosedException("BK namespace " + namespace + " is already closed");
+ }
+ }
+
+ /**
+ * Close the distributed log manager factory, freeing any resources it may hold.
+ */
+ @Override
+ public void close() {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+ // shutdown the driver
+ Utils.close(driver);
+ // close the write limiter
+ this.writeLimiter.close();
+ // Shutdown the schedulers
+ SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(),
+ TimeUnit.MILLISECONDS);
+ LOG.info("Executor Service Stopped.");
+ }
+}