You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:55 UTC

[23/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

DL-124: Use Java8 Future rather than twitter Future

Switch to use Java8 CompletableFuture, to reduce dependencies introduced by twitter future and make it more friendly to users (users don't think of using which version of scala).

This change is based on #132 . Gitsha ce0686e is the change to review.

The changes:

- Change Future to CompletableFuture
- Map to thenApply
- flatMap to thenCompose
- Added a FutureEventListener, and switch addEvenListener to whenComplete (or whenCompleteAsync)
- setValue to complete
- setException to completeExceptionally
- add rescue, ignore, ensure to FutureUtils as util functions.

Author: Sijie Guo <si...@apache.org>

Reviewers: Jia Zhai <None>, Leigh Stewart <ls...@apache.org>

Closes #133 from sijie/change_twitter_future_to_java_future


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/53fca4ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/53fca4ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/53fca4ac

Branch: refs/heads/master
Commit: 53fca4ac30c0d0f769940669be79c53dcaee3a23
Parents: 0f4ea28
Author: Sijie Guo <si...@apache.org>
Authored: Wed Jun 21 10:20:15 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Wed Jun 21 10:20:15 2017 -0700

----------------------------------------------------------------------
 distributedlog-benchmark/conf/log4j.properties  |   4 -
 .../benchmark/DLWriterWorker.java               |  26 +-
 .../distributedlog/benchmark/ReaderWorker.java  |  26 +-
 .../distributedlog/benchmark/WriterWorker.java  |   2 +-
 .../benchmark/stream/AsyncReaderBenchmark.java  |  16 +-
 .../benchmark/stream/LedgerReadBenchmark.java   |   6 +-
 .../benchmark/stream/StreamBenchmark.java       |  10 +-
 .../benchmark/stream/SyncReaderBenchmark.java   |  10 +-
 .../resources/distributedlog/checkstyle.xml     |   6 +-
 distributedlog-common/pom.xml                   | 167 ++++++
 .../annotations/DistributedLogAnnotations.java  |  35 ++
 .../common/annotations/package-info.java        |  21 +
 .../common/concurrent/AsyncSemaphore.java       | 160 +++++
 .../common/concurrent/FutureEventListener.java  |  44 ++
 .../common/concurrent/FutureUtils.java          | 376 ++++++++++++
 .../common/concurrent/package-info.java         |  22 +
 .../config/ConcurrentBaseConfiguration.java     |  71 +++
 .../config/ConcurrentConstConfiguration.java    |  32 +
 .../common/config/ConfigurationListener.java    |  32 +
 .../config/ConfigurationSubscription.java       | 187 ++++++
 .../common/config/FileConfigurationBuilder.java |  28 +
 .../config/PropertiesConfigurationBuilder.java  |  39 ++
 .../common/config/package-info.java             |  21 +
 .../common/functions/VoidFunctions.java         |  31 +
 .../common/functions/package-info.java          |  21 +
 .../distributedlog/common/package-info.java     |  22 +
 .../common/rate/MovingAverageRate.java          |  27 +
 .../common/rate/MovingAverageRateFactory.java   |  62 ++
 .../common/rate/SampledMovingAverageRate.java   |  95 +++
 .../common/rate/package-info.java               |  22 +
 .../common/stats/BroadCastStatsLogger.java      | 194 ++++++
 .../common/stats/OpStatsListener.java           |  54 ++
 .../common/stats/package-info.java              |  21 +
 .../common/util/BitMaskUtils.java               |  57 ++
 .../distributedlog/common/util/MathUtil.java    |  36 ++
 .../distributedlog/common/util/Permit.java      |  28 +
 .../common/util/PermitLimiter.java              |  57 ++
 .../common/util/PermitManager.java              |  94 +++
 .../common/util/SchedulerUtils.java             |  43 ++
 .../distributedlog/common/util/Sequencer.java   |  31 +
 .../distributedlog/common/util/Sizable.java     |  31 +
 .../common/util/package-info.java               |  22 +
 .../org/apache/distributedlog/io/Abortable.java |  41 ++
 .../apache/distributedlog/io/Abortables.java    | 185 ++++++
 .../distributedlog/io/AsyncAbortable.java       |  48 ++
 .../distributedlog/io/AsyncCloseable.java       |  46 ++
 .../distributedlog/io/AsyncDeleteable.java      |  34 ++
 .../apache/distributedlog/io/package-info.java  |  21 +
 .../distributedlog/util/OrderedScheduler.java   | 353 +++++++++++
 .../distributedlog/util/package-info.java       |  21 +
 .../src/main/resources/findbugsExclude.xml      |  32 +
 .../common/concurrent/TestFutureUtils.java      | 384 ++++++++++++
 .../common/config/PropertiesWriter.java         |  69 +++
 .../config/TestConcurrentBaseConfiguration.java |  47 ++
 .../config/TestConfigurationSubscription.java   | 173 ++++++
 .../common/util/TestTimedOutTestsListener.java  | 183 ++++++
 .../common/util/TimedOutTestsListener.java      | 168 ++++++
 distributedlog-core-twitter/pom.xml             | 141 +++++
 .../apache/distributedlog/AsyncLogReader.java   |  79 +++
 .../distributedlog/AsyncLogReaderImpl.java      |  78 +++
 .../apache/distributedlog/AsyncLogWriter.java   |  87 +++
 .../distributedlog/AsyncLogWriterImpl.java      |  88 +++
 .../distributedlog/DistributedLogManager.java   | 317 ++++++++++
 .../DistributedLogManagerImpl.java              | 227 +++++++
 .../org/apache/distributedlog/LogReader.java    | 204 +++++++
 .../apache/distributedlog/LogReaderImpl.java    |  63 ++
 .../org/apache/distributedlog/LogWriter.java    |  77 +++
 .../apache/distributedlog/LogWriterImpl.java    |  75 +++
 .../distributedlog/SubscriptionsStoreImpl.java  |  67 +++
 .../namespace/DistributedLogNamespace.java      | 190 ++++++
 .../DistributedLogNamespaceBuilder.java         | 177 ++++++
 .../namespace/DistributedLogNamespaceImpl.java  | 101 ++++
 .../distributedlog/namespace/package-info.java  |  22 +
 .../org/apache/distributedlog/package-info.java |  22 +
 .../distributedlog/stats/OpStatsListener.java   |  55 ++
 .../distributedlog/stats/package-info.java      |  22 +
 .../subscription/SubscriptionsStore.java        |  67 +++
 .../subscription/package-info.java              |  22 +
 .../apache/distributedlog/util/FutureUtils.java | 596 +++++++++++++++++++
 .../distributedlog/util/package-info.java       |  22 +
 .../src/main/resources/findbugsExclude.xml      |  19 +
 .../distributedlog/TestAsyncLogReaderImpl.java  |  91 +++
 .../distributedlog/TestAsyncLogWriterImpl.java  | 117 ++++
 .../TestDistributedLogManagerImpl.java          | 351 +++++++++++
 .../distributedlog/TestLogReaderImpl.java       |  66 ++
 .../distributedlog/TestLogWriterImpl.java       |  82 +++
 .../TestSubscriptionStoreImpl.java              |  91 +++
 .../TestDistributedLogNamespaceBuilder.java     | 104 ++++
 .../TestDistributedLogNamespaceImpl.java        | 119 ++++
 distributedlog-core/conf/log4j.properties       |   4 -
 distributedlog-core/pom.xml                     |  85 +--
 .../distributedlog/AppendOnlyStreamReader.java  |   4 +-
 .../distributedlog/AppendOnlyStreamWriter.java  |  20 +-
 .../apache/distributedlog/AsyncLogReader.java   |  69 ---
 .../apache/distributedlog/AsyncLogWriter.java   |  70 ---
 .../distributedlog/AsyncNotification.java       |   2 +-
 .../distributedlog/BKAbstractLogWriter.java     | 239 ++++----
 .../apache/distributedlog/BKAsyncLogReader.java | 114 ++--
 .../apache/distributedlog/BKAsyncLogWriter.java | 181 +++---
 .../distributedlog/BKDistributedLogManager.java | 405 ++++++-------
 .../BKDistributedLogNamespace.java              |  29 +-
 .../org/apache/distributedlog/BKLogHandler.java | 227 ++++---
 .../apache/distributedlog/BKLogReadHandler.java | 115 ++--
 .../distributedlog/BKLogSegmentWriter.java      | 199 +++----
 .../distributedlog/BKLogWriteHandler.java       | 322 +++++-----
 .../apache/distributedlog/BKSyncLogReader.java  |  41 +-
 .../apache/distributedlog/BKSyncLogWriter.java  |  12 +-
 .../apache/distributedlog/BKTransmitPacket.java |  21 +-
 .../apache/distributedlog/BookKeeperClient.java |  48 +-
 .../DistributedLogConfiguration.java            |   7 +-
 .../distributedlog/DistributedLogManager.java   | 308 ----------
 .../java/org/apache/distributedlog/Entry.java   |   4 +-
 .../apache/distributedlog/EnvelopedEntry.java   |   4 +-
 .../distributedlog/EnvelopedEntryWriter.java    |  12 +-
 .../org/apache/distributedlog/LogReader.java    | 195 ------
 .../distributedlog/LogSegmentMetadata.java      |  23 +-
 .../org/apache/distributedlog/LogWriter.java    |  78 ---
 .../apache/distributedlog/MetadataAccessor.java |  43 --
 .../distributedlog/ReadAheadEntryReader.java    | 115 ++--
 .../org/apache/distributedlog/ReadUtils.java    | 104 ++--
 .../org/apache/distributedlog/WriteLimiter.java |   2 +-
 .../admin/DistributedLogAdmin.java              |  58 +-
 .../distributedlog/api/AsyncLogReader.java      |  69 +++
 .../distributedlog/api/AsyncLogWriter.java      |  70 +++
 .../api/DistributedLogManager.java              | 311 ++++++++++
 .../apache/distributedlog/api/LogReader.java    | 198 ++++++
 .../apache/distributedlog/api/LogWriter.java    |  79 +++
 .../distributedlog/api/MetadataAccessor.java    |  43 ++
 .../distributedlog/api/namespace/Namespace.java | 191 ++++++
 .../api/namespace/NamespaceBuilder.java         | 275 +++++++++
 .../api/namespace/package-info.java             |  22 +
 .../apache/distributedlog/api/package-info.java |  25 +
 .../subscription/SubscriptionStateStore.java    |  39 ++
 .../api/subscription/SubscriptionsStore.java    |  67 +++
 .../distributedlog/auditor/DLAuditor.java       |  49 +-
 .../bk/LedgerAllocatorDelegator.java            |  13 +-
 .../distributedlog/bk/LedgerAllocatorPool.java  |  59 +-
 .../bk/SimpleLedgerAllocator.java               | 126 ++--
 .../callback/ReadAheadCallback.java             |  25 -
 .../config/ConcurrentBaseConfiguration.java     |  76 ---
 .../config/ConcurrentConstConfiguration.java    |  31 -
 .../config/ConfigurationListener.java           |  32 -
 .../config/ConfigurationSubscription.java       | 186 ------
 .../config/DynamicConfigurationFactory.java     |   6 +-
 .../DynamicDistributedLogConfiguration.java     |   1 +
 .../config/FileConfigurationBuilder.java        |  28 -
 .../config/PropertiesConfigurationBuilder.java  |  40 --
 .../feature/ConfigurationFeatureProvider.java   |   2 +-
 .../DynamicConfigurationFeatureProvider.java    |  10 +-
 .../function/CloseAsyncCloseableFunction.java   |  51 --
 .../function/DefaultValueMapFunction.java       |  41 --
 .../function/GetLastTxIdFunction.java           |   7 +-
 .../function/GetVersionedValueFunction.java     |  39 --
 .../distributedlog/function/VoidFunctions.java  |  34 --
 .../distributedlog/impl/BKNamespaceDriver.java  |   4 +-
 .../distributedlog/impl/ZKLogMetadataStore.java |  37 +-
 .../impl/ZKLogSegmentMetadataStore.java         |  33 +-
 .../distributedlog/impl/ZKMetadataAccessor.java |  19 +-
 .../impl/acl/ZKAccessControl.java               |  57 +-
 .../impl/acl/ZKAccessControlManager.java        |  59 +-
 .../federated/FederatedZKLogMetadataStore.java  | 228 ++++---
 .../impl/logsegment/BKLogSegmentAllocator.java  |  22 +-
 .../logsegment/BKLogSegmentEntryReader.java     |  78 +--
 .../impl/logsegment/BKLogSegmentEntryStore.java |  41 +-
 .../BKLogSegmentRandomAccessEntryReader.java    |  30 +-
 .../distributedlog/impl/logsegment/BKUtils.java |  22 +-
 .../impl/metadata/ZKLogStreamMetadataStore.java | 168 +++---
 .../subscription/ZKSubscriptionStateStore.java  |  49 +-
 .../impl/subscription/ZKSubscriptionsStore.java |  84 ++-
 .../org/apache/distributedlog/io/Abortable.java |  41 --
 .../apache/distributedlog/io/Abortables.java    | 183 ------
 .../distributedlog/io/AsyncAbortable.java       |  57 --
 .../distributedlog/io/AsyncCloseable.java       |  60 --
 .../distributedlog/io/AsyncDeleteable.java      |  34 --
 .../apache/distributedlog/io/package-info.java  |  21 -
 .../limiter/ComposableRequestLimiter.java       |   8 +-
 .../distributedlog/lock/DistributedLock.java    |   4 +-
 .../apache/distributedlog/lock/LockWaiter.java  |  26 +-
 .../distributedlog/lock/NopDistributedLock.java |  11 +-
 .../apache/distributedlog/lock/SessionLock.java |  12 +-
 .../distributedlog/lock/SessionLockFactory.java |   4 +-
 .../distributedlog/lock/ZKDistributedLock.java  | 276 ++++-----
 .../distributedlog/lock/ZKSessionLock.java      | 248 ++++----
 .../lock/ZKSessionLockFactory.java              |  41 +-
 .../logsegment/LogSegmentEntryReader.java       |   7 +-
 .../logsegment/LogSegmentEntryStore.java        |   9 +-
 .../logsegment/LogSegmentEntryWriter.java       |   2 +-
 .../logsegment/LogSegmentMetadataStore.java     |  12 +-
 .../LogSegmentRandomAccessEntryReader.java      |   7 +-
 .../logsegment/LogSegmentWriter.java            |  11 +-
 .../logsegment/RollingPolicy.java               |   2 +-
 .../logsegment/SizeBasedRollingPolicy.java      |   2 +-
 .../logsegment/TimeBasedRollingPolicy.java      |   2 +-
 .../distributedlog/metadata/DLMetadata.java     |  11 +-
 .../DryrunLogSegmentMetadataStoreUpdater.java   |   7 +-
 .../metadata/LogMetadataStore.java              |  11 +-
 .../LogSegmentMetadataStoreUpdater.java         |  33 +-
 .../metadata/LogStreamMetadataStore.java        |  16 +-
 .../metadata/MetadataUpdater.java               |  14 +-
 .../namespace/DistributedLogNamespace.java      | 190 ------
 .../DistributedLogNamespaceBuilder.java         | 278 ---------
 .../namespace/NamespaceDriver.java              |   5 +-
 .../distributedlog/rate/MovingAverageRate.java  |  24 -
 .../rate/MovingAverageRateFactory.java          |  65 --
 .../rate/SampledMovingAverageRate.java          |  58 --
 .../distributedlog/readahead/package-info.java  |  21 -
 .../stats/BKExceptionStatsLogger.java           | 109 ----
 .../stats/BroadCastStatsLogger.java             | 194 ------
 .../distributedlog/stats/OpStatsListener.java   |  51 --
 .../subscription/SubscriptionStateStore.java    |  42 --
 .../subscription/SubscriptionsStore.java        |  69 ---
 .../distributedlog/thrift/package-info.java     |   2 +-
 .../tools/DistributedLogTool.java               |  56 +-
 .../apache/distributedlog/util/Allocator.java   |   9 +-
 .../apache/distributedlog/util/ConfUtils.java   |   2 +-
 .../apache/distributedlog/util/FutureUtils.java | 534 -----------------
 .../util/MonitoredFuturePool.java               | 131 ----
 .../MonitoredScheduledThreadPoolExecutor.java   | 257 --------
 .../distributedlog/util/OrderedScheduler.java   | 490 ---------------
 .../distributedlog/util/PermitLimiter.java      |  57 --
 .../distributedlog/util/PermitManager.java      |  93 ---
 .../util/SafeQueueingFuturePool.java            | 115 ----
 .../distributedlog/util/SchedulerUtils.java     |  56 --
 .../apache/distributedlog/util/Sequencer.java   |  31 -
 .../util/SimplePermitLimiter.java               |   5 +-
 .../org/apache/distributedlog/util/Sizable.java |  31 -
 .../distributedlog/util/TimeSequencer.java      |   1 +
 .../apache/distributedlog/util/Transaction.java |  10 +-
 .../org/apache/distributedlog/util/Utils.java   | 239 ++++++--
 .../distributedlog/zk/LimitedPermitManager.java |   2 +-
 .../apache/distributedlog/zk/ZKTransaction.java |  27 +-
 .../src/main/resources/findbugsExclude.xml      |  73 +++
 .../org/apache/distributedlog/DLMTestUtil.java  |  55 +-
 .../NonBlockingReadsTestUtil.java               |  13 +-
 .../TestAppendOnlyStreamReader.java             |   5 +-
 .../TestAppendOnlyStreamWriter.java             |  28 +-
 .../distributedlog/TestAsyncBulkWrite.java      |  32 +-
 .../distributedlog/TestAsyncReaderLock.java     | 165 +++--
 .../distributedlog/TestAsyncReaderWriter.java   | 232 ++++----
 .../TestBKDistributedLogManager.java            | 134 +++--
 .../TestBKDistributedLogNamespace.java          |  21 +-
 .../distributedlog/TestBKLogReadHandler.java    | 119 ++--
 .../distributedlog/TestBKLogSegmentWriter.java  | 101 ++--
 .../distributedlog/TestBKLogWriteHandler.java   |  13 +-
 .../distributedlog/TestBKSyncLogReader.java     |   2 +
 .../distributedlog/TestDistributedLogBase.java  |  24 +-
 .../org/apache/distributedlog/TestEntry.java    |  40 +-
 .../distributedlog/TestInterleavedReaders.java  |  52 +-
 .../distributedlog/TestLogSegmentCreation.java  |  11 +-
 .../distributedlog/TestLogSegmentMetadata.java  |   9 +-
 .../distributedlog/TestLogSegmentsZK.java       |  13 +-
 .../distributedlog/TestNonBlockingReads.java    |  12 +-
 .../TestNonBlockingReadsMultiReader.java        |  12 +-
 .../TestReadAheadEntryReader.java               |  32 +-
 .../apache/distributedlog/TestReadUtils.java    |  77 ++-
 .../org/apache/distributedlog/TestReader.java   |  20 +-
 .../distributedlog/TestRollLogSegments.java     |  32 +-
 .../apache/distributedlog/TestSequenceID.java   |  21 +-
 .../org/apache/distributedlog/TestTruncate.java |  31 +-
 .../apache/distributedlog/TestWriteLimiter.java |   2 -
 .../distributedlog/TestZooKeeperClient.java     |   2 +-
 .../distributedlog/acl/TestZKAccessControl.java |  40 +-
 .../acl/TestZKAccessControlManager.java         |   4 +-
 .../apache/distributedlog/admin/TestDLCK.java   |  11 +-
 .../admin/TestDistributedLogAdmin.java          |  33 +-
 .../distributedlog/bk/TestLedgerAllocator.java  |  64 +-
 .../bk/TestLedgerAllocatorPool.java             |  23 +-
 .../distributedlog/config/PropertiesWriter.java |   8 +-
 .../config/TestConcurrentBaseConfiguration.java |  46 --
 .../config/TestConfigurationSubscription.java   | 171 ------
 .../config/TestDynamicConfigurationFactory.java |   5 +-
 .../TestDynamicDistributedLogConfiguration.java |   2 +
 .../TestConfigurationFeatureProvider.java       |   2 +-
 ...TestDynamicConfigurationFeatureProvider.java |   4 +-
 .../impl/TestZKLogMetadataStore.java            |   7 +-
 .../impl/TestZKLogSegmentMetadataStore.java     | 137 +++--
 .../impl/TestZKNamespaceWatcher.java            |   1 -
 .../TestFederatedZKLogMetadataStore.java        |  70 ++-
 .../logsegment/TestBKLogSegmentEntryReader.java |  57 +-
 .../metadata/TestZKLogStreamMetadataStore.java  |  20 +-
 .../lock/TestDistributedLock.java               | 138 +++--
 .../distributedlog/lock/TestZKSessionLock.java  |  91 ++-
 .../logsegment/TestRollingPolicy.java           |   2 +-
 .../TestLogSegmentMetadataStoreUpdater.java     |  29 +-
 .../TestDistributedLogNamespaceBuilder.java     | 124 ----
 .../namespace/TestNamespaceBuilder.java         | 126 ++++
 .../rate/TestMovingAverageRate.java             |  99 ---
 .../tools/TestDistributedLogTool.java           |   4 +-
 .../distributedlog/util/TestFutureUtils.java    |  71 ---
 .../distributedlog/util/TestPermitManager.java  |   1 +
 .../util/TestSafeQueueingFuturePool.java        | 205 -------
 .../apache/distributedlog/util/TestUtils.java   |   6 +-
 distributedlog-protocol/pom.xml                 |  46 +-
 .../EnvelopedRecordSetWriter.java               |  28 +-
 .../org/apache/distributedlog/LogRecordSet.java |  10 +-
 .../annotations/DistributedLogAnnotations.java  |  35 --
 .../annotations/package-info.java               |  21 -
 .../distributedlog/util/BitMaskUtils.java       |  57 --
 .../distributedlog/util/package-info.java       |  21 -
 .../apache/distributedlog/TestLogRecordSet.java |  21 +-
 .../TestTimedOutTestsListener.java              | 183 ------
 .../distributedlog/TimedOutTestsListener.java   | 168 ------
 .../src/test/resources/log4j.properties         |  51 ++
 distributedlog-proxy-client/pom.xml             |   4 +-
 .../client/DistributedLogMultiStreamWriter.java |   3 +-
 .../protocol/util/TwitterFutureUtils.java       |  91 +++
 distributedlog-proxy-server/pom.xml             |   4 +-
 .../service/DistributedLogServer.java           |   2 +-
 .../service/DistributedLogServiceImpl.java      |  20 +-
 .../distributedlog/service/MonitorService.java  |  10 +-
 .../distributedlog/service/ResponseUtils.java   |   3 +
 .../config/DefaultStreamConfigProvider.java     |   8 +-
 .../service/config/ServerConfiguration.java     |   9 +-
 .../placement/LeastLoadPlacementPolicy.java     |   4 +-
 .../service/placement/PlacementPolicy.java      |   6 +-
 .../placement/ZKPlacementStateManager.java      |   5 +-
 .../service/stream/AbstractStreamOp.java        |   4 +-
 .../service/stream/BulkWriteOp.java             |   8 +-
 .../distributedlog/service/stream/DeleteOp.java |   4 +-
 .../service/stream/HeartbeatOp.java             |   7 +-
 .../service/stream/ReleaseOp.java               |   4 +-
 .../service/stream/StreamFactoryImpl.java       |   6 +-
 .../service/stream/StreamImpl.java              |  72 ++-
 .../service/stream/StreamManagerImpl.java       |   6 +-
 .../distributedlog/service/stream/StreamOp.java |   4 +-
 .../service/stream/StreamOpStats.java           |   2 +-
 .../service/stream/TruncateOp.java              |  14 +-
 .../distributedlog/service/stream/WriteOp.java  |   8 +-
 .../stream/limiter/ServiceRequestLimiter.java   |   2 +-
 .../stream/limiter/StreamAcquireLimiter.java    |   2 +-
 .../service/TestDistributedLogServerBase.java   |  20 +-
 .../service/TestDistributedLogService.java      |  14 +-
 .../placement/TestLeastLoadPlacementPolicy.java |   8 +-
 .../service/stream/TestStreamManager.java       |   6 +-
 .../service/stream/TestStreamOp.java            |   8 +-
 .../limiter/TestServiceRequestLimiter.java      |   2 +-
 .../distributedlog-basic/conf/log4j.properties  |   4 -
 .../distributedlog/basic/AtomicWriter.java      |  31 +-
 .../distributedlog/basic/ConsoleWriter.java     |  27 +-
 .../distributedlog/basic/MultiReader.java       |  18 +-
 .../distributedlog/basic/StreamRewinder.java    |  27 +-
 .../apache/distributedlog/basic/TailReader.java |  30 +-
 .../distributedlog-kafka/conf/log4j.properties  |   4 -
 .../kafka/DLFutureRecordMetadata.java           |  27 +-
 .../mapreduce/DistributedLogInputFormat.java    |  10 +-
 .../conf/log4j.properties                       |   4 -
 .../messaging/ReaderWithOffsets.java            |  37 +-
 .../messaging/StreamTransformer.java            |  48 +-
 pom.xml                                         |  12 +-
 349 files changed, 13188 insertions(+), 9781 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-benchmark/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/conf/log4j.properties b/distributedlog-benchmark/conf/log4j.properties
index 930db8d..af1cf5f 100644
--- a/distributedlog-benchmark/conf/log4j.properties
+++ b/distributedlog-benchmark/conf/log4j.properties
@@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO
 log4j.logger.org.apache.bookkeeper=INFO
 
 # redirect executor output to executors.log since slow op warnings can be quite verbose
-log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors
-log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
 log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
-log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
-log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
 log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
 
 log4j.appender.Executors=org.apache.log4j.RollingFileAppender

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
index a5e7a0a..5c9b2a9 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
@@ -19,17 +19,17 @@ package org.apache.distributedlog.benchmark;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.SchedulerUtils;
-import com.twitter.util.FutureEventListener;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -66,7 +66,7 @@ public class DLWriterWorker implements Worker {
     final ScheduledExecutorService rescueService;
     final ShiftableRateLimiter rateLimiter;
     final Random random;
-    final DistributedLogNamespace namespace;
+    final Namespace namespace;
     final List<DistributedLogManager> dlms;
     final List<AsyncLogWriter> streamWriters;
     final int numStreams;
@@ -98,7 +98,7 @@ public class DLWriterWorker implements Worker {
         this.rescueService = Executors.newSingleThreadScheduledExecutor();
         this.random = new Random(System.currentTimeMillis());
 
-        this.namespace = DistributedLogNamespaceBuilder.newBuilder()
+        this.namespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .statsLogger(statsLogger.scope("dl"))
@@ -120,7 +120,7 @@ public class DLWriterWorker implements Worker {
                             FutureUtils.result(writer.asyncClose());
                         }
                         latch.countDown();
-                    } catch (IOException e) {
+                    } catch (Exception e) {
                         LOG.error("Failed to intialize writer for stream : {}", streamName, e);
                     }
 
@@ -148,7 +148,7 @@ public class DLWriterWorker implements Worker {
         if (streamWriters.get(idx) == writer) {
             try {
                 FutureUtils.result(writer.asyncClose());
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOG.error("Failed to close writer for stream {}.", idx);
             }
             AsyncLogWriter newWriter = null;
@@ -185,7 +185,7 @@ public class DLWriterWorker implements Worker {
         SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
         SchedulerUtils.shutdownScheduler(this.rescueService, 2, TimeUnit.MINUTES);
         for (AsyncLogWriter writer : streamWriters) {
-            FutureUtils.result(writer.asyncClose());
+            org.apache.distributedlog.util.Utils.ioResult(writer.asyncClose());
         }
         for (DistributedLogManager dlm : dlms) {
             dlm.close();
@@ -225,7 +225,7 @@ public class DLWriterWorker implements Worker {
                     LOG.error("Error on generating message : ", e);
                     break;
                 }
-                writer.write(new LogRecord(requestMillis, data)).addEventListener(new FutureEventListener<DLSN>() {
+                writer.write(new LogRecord(requestMillis, data)).whenComplete(new FutureEventListener<DLSN>() {
                     @Override
                     public void onSuccess(DLSN value) {
                         requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
index 11cba6f..ad95a59 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
@@ -22,21 +22,21 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogReader;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.LogRecordSet;
 import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.benchmark.thrift.Message;
 import org.apache.distributedlog.client.serverset.DLZkServerSet;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.service.DistributedLogClient;
 import org.apache.distributedlog.service.DistributedLogClientBuilder;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import com.twitter.finagle.builder.ClientBuilder;
 import com.twitter.finagle.stats.StatsReceiver;
 import com.twitter.finagle.thrift.ClientId$;
@@ -75,7 +75,7 @@ public class ReaderWorker implements Worker {
     final int endStreamId;
     final ScheduledExecutorService executorService;
     final ExecutorService callbackExecutor;
-    final DistributedLogNamespace namespace;
+    final Namespace namespace;
     final DistributedLogManager[] dlms;
     final AsyncLogReader[] logReaders;
     final StreamReader[] streamReaders;
@@ -100,7 +100,9 @@ public class ReaderWorker implements Worker {
     final Counter invalidRecordsCounter;
     final Counter outOfOrderSequenceIdCounter;
 
-    class StreamReader implements FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> {
+    class StreamReader implements
+        org.apache.distributedlog.common.concurrent.FutureEventListener<List<LogRecordWithDLSN>>,
+        Runnable, Gauge<Number> {
 
         final int streamIdx;
         final String streamName;
@@ -184,7 +186,7 @@ public class ReaderWorker implements Worker {
             if (!running) {
                 return;
             }
-            logReaders[streamIdx].readBulk(10).addEventListener(this);
+            logReaders[streamIdx].readBulk(10).whenComplete(this);
         }
 
         @Override
@@ -305,7 +307,7 @@ public class ReaderWorker implements Worker {
         }
 
         // construct the factory
-        this.namespace = DistributedLogNamespaceBuilder.newBuilder()
+        this.namespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .statsLogger(statsLogger.scope("dl"))
@@ -369,7 +371,7 @@ public class ReaderWorker implements Worker {
         if (logReaders[idx] != null) {
             try {
                 FutureUtils.result(logReaders[idx].asyncClose());
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOG.warn("Failed on closing stream reader {} : ", streamName, e);
             }
             logReaders[idx] = null;
@@ -434,7 +436,7 @@ public class ReaderWorker implements Worker {
         this.running = false;
         for (AsyncLogReader reader : logReaders) {
             if (null != reader) {
-                FutureUtils.result(reader.asyncClose());
+                org.apache.distributedlog.util.Utils.ioResult(reader.asyncClose());
             }
         }
         for (DistributedLogManager dlm : dlms) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
index fa96dfb..46f9dfc 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
@@ -29,7 +29,7 @@ import org.apache.distributedlog.io.CompressionCodec;
 import org.apache.distributedlog.service.DistributedLogClient;
 import org.apache.distributedlog.service.DistributedLogClientBuilder;
 import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import com.twitter.finagle.builder.ClientBuilder;
 import com.twitter.finagle.stats.StatsReceiver;
 import com.twitter.finagle.thrift.ClientId;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
index 4930b8a..4c8e372 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
@@ -18,12 +18,12 @@
 package org.apache.distributedlog.benchmark.stream;
 
 import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogReader;
 import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.LogRecordWithDLSN;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -34,14 +34,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Benchmark on {@link org.apache.distributedlog.AsyncLogReader} reading from a stream.
+ * Benchmark on {@link AsyncLogReader} reading from a stream.
  */
 public class AsyncReaderBenchmark extends AbstractReaderBenchmark {
 
     private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);
 
     @Override
-    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
+    protected void benchmark(Namespace namespace, String logName, StatsLogger statsLogger) {
         DistributedLogManager dlm = null;
         while (null == dlm) {
             try {
@@ -112,7 +112,7 @@ public class AsyncReaderBenchmark extends AbstractReaderBenchmark {
                 openReaderStats.registerSuccessfulEvent(elapsedMs);
                 logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
                         lastTxId, lastDLSN);
-            } catch (IOException ioe) {
+            } catch (Exception ioe) {
                 openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
                 logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
                         new Object[] { streamName, lastTxId, lastDLSN });
@@ -141,7 +141,7 @@ public class AsyncReaderBenchmark extends AbstractReaderBenchmark {
                         lastDLSN = lastRecord.getDlsn();
                     }
                     stopwatch.reset();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     logger.warn("Encountered reading record from stream {} : ", streamName, e);
                     reader = null;
                     break;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
index 489e5af..9fb46ad 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
@@ -21,12 +21,12 @@ import static com.google.common.base.Charsets.UTF_8;
 
 import com.google.common.base.Stopwatch;
 import org.apache.distributedlog.BookKeeperClientBuilder;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClientBuilder;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -47,7 +47,7 @@ public class LedgerReadBenchmark extends AbstractReaderBenchmark {
     private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);
 
     @Override
-    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
+    protected void benchmark(Namespace namespace, String logName, StatsLogger statsLogger) {
         DistributedLogManager dlm = null;
         while (null == dlm) {
             try {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
index d3083ca..427b9f3 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
@@ -18,8 +18,8 @@
 package org.apache.distributedlog.benchmark.stream;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import java.io.File;
 import java.net.URI;
 import org.apache.bookkeeper.stats.NullStatsProvider;
@@ -107,8 +107,8 @@ public abstract class StreamBenchmark {
         statsProvider.start(conf);
         // run the benchmark
         StatsLogger statsLogger = statsProvider.getStatsLogger("dl");
-        DistributedLogNamespace namespace =
-                DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace =
+                NamespaceBuilder.newBuilder()
                         .conf(conf)
                         .uri(uri)
                         .statsLogger(statsLogger)
@@ -121,7 +121,7 @@ public abstract class StreamBenchmark {
         }
     }
 
-    protected abstract void benchmark(DistributedLogNamespace namespace,
+    protected abstract void benchmark(Namespace namespace,
                                       String logName,
                                       StatsLogger statsLogger);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
index 4abb317..cbd7f67 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
@@ -18,10 +18,10 @@
 package org.apache.distributedlog.benchmark.stream;
 
 import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.api.namespace.Namespace;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.stats.Counter;
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Benchmark on {@link org.apache.distributedlog.LogReader} reading from a stream.
+ * Benchmark on {@link LogReader} reading from a stream.
  */
 public class SyncReaderBenchmark extends AbstractReaderBenchmark {
 
@@ -40,7 +40,7 @@ public class SyncReaderBenchmark extends AbstractReaderBenchmark {
     public SyncReaderBenchmark() {}
 
     @Override
-    protected void benchmark(DistributedLogNamespace namespace, String streamName, StatsLogger statsLogger) {
+    protected void benchmark(Namespace namespace, String streamName, StatsLogger statsLogger) {
         DistributedLogManager dlm = null;
         while (null == dlm) {
             try {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
----------------------------------------------------------------------
diff --git a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
index e520b2c..28cd4ec 100644
--- a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
+++ b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
@@ -240,7 +240,7 @@ page at http://checkstyle.sourceforge.net/config.html -->
     <module name="MethodNameCheck">
       <!-- Validates identifiers for method names. -->
       <metadata name="altname" value="MethodName"/>
-      <property name="format" value="^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$"/>
+      <property name="format" value="(^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$|Void)"/>
       <property name="severity" value="error"/>
     </module>
 
@@ -271,12 +271,12 @@ page at http://checkstyle.sourceforge.net/config.html -->
     </module>
 
     <module name="MethodTypeParameterName">
-      <property name="format" value="^(((T|K|V|W|X)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
+      <property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
       <property name="severity" value="error"/>
     </module>
 
     <module name="InterfaceTypeParameterName">
-      <property name="format" value="^(((T|K|V|W|X)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
+      <property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
       <property name="severity" value="error"/>
     </module>
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-common/pom.xml b/distributedlog-common/pom.xml
new file mode 100644
index 0000000..cad2bc8
--- /dev/null
+++ b/distributedlog-common/pom.xml
@@ -0,0 +1,167 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.distributedlog</groupId>
+    <artifactId>distributedlog</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+  </parent>
+  <artifactId>distributedlog-common</artifactId>
+  <name>Apache DistributedLog :: Common</name>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.bookkeeper.stats</groupId>
+      <artifactId>bookkeeper-stats-api</artifactId>
+      <version>${bookkeeper.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>${lombok.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.inferred</groupId>
+      <artifactId>freebuilder</artifactId>
+      <version>${freebuilder.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>${commons-lang.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>${commons-codec.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock</artifactId>
+      <version>${jmock.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${maven-compiler-plugin.version}</version>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>${maven-jar-plugin.version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin.version}</version>
+        <configuration>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+          <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine>
+          <forkMode>always</forkMode>
+          <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>${maven-checkstyle-plugin.version}</version>
+        <dependencies>
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>${puppycrawl.checkstyle.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.distributedlog</groupId>
+            <artifactId>distributedlog-build-tools</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <configLocation>distributedlog/checkstyle.xml</configLocation>
+          <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeResources>false</includeResources>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/annotations/DistributedLogAnnotations.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/annotations/DistributedLogAnnotations.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/annotations/DistributedLogAnnotations.java
new file mode 100644
index 0000000..a5144b8
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/annotations/DistributedLogAnnotations.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.annotations;
+
+/**
+ * Common annotation types.
+ */
+public class DistributedLogAnnotations {
+    /**
+     * Annotation to identify flaky tests in DistributedLog.
+     * As and when we find that a test is flaky, we'll add this annotation to it for reference.
+     */
+    public @interface FlakyTest {}
+
+    /**
+     * Annotation to specify the occurrence of a compression operation. These are CPU intensive
+     * and should be avoided in low-latency paths.
+     */
+    public @interface Compression {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/annotations/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/annotations/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/annotations/package-info.java
new file mode 100644
index 0000000..a390700
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/annotations/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines annotations used across distributedlog project.
+ */
+package org.apache.distributedlog.common.annotations;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/AsyncSemaphore.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/AsyncSemaphore.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/AsyncSemaphore.java
new file mode 100644
index 0000000..d6a9b34
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/AsyncSemaphore.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.concurrent;
+
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.distributedlog.common.util.Permit;
+
+/**
+ * An AsyncSemaphore is a traditional semaphore but with asynchronous
+ * execution.
+ *
+ * <p>Grabbing a permit returns a `Future[Permit]`.
+ *
+ * <p>Basic usage:
+ * {{{
+ *   val semaphore = new AsyncSemaphore(n)
+ *   ...
+ *   semaphore.acquireAndRun() {
+ *     somethingThatReturnsFutureT()
+ *   }
+ * }}}
+ *
+ * <p>Calls to acquire() and acquireAndRun are serialized, and tickets are
+ * given out fairly (in order of arrival).
+ */
+public class AsyncSemaphore {
+
+    private final Optional<Integer> maxWaiters;
+
+    private final Permit semaphorePermit = new Permit() {
+        @Override
+        public void release() {
+            releasePermit(this);
+        }
+    };
+
+    @GuardedBy("this")
+    private Optional<Throwable> closed = Optional.empty();
+    @GuardedBy("this")
+    private final LinkedList<CompletableFuture<Permit>> waitq;
+    @GuardedBy("this")
+    private int availablePermits;
+
+    public AsyncSemaphore(int initialPermits,
+                          Optional<Integer> maxWaiters) {
+        this.availablePermits = initialPermits;
+        this.waitq = new LinkedList<>();
+        this.maxWaiters = maxWaiters;
+    }
+
+    private synchronized void releasePermit(Permit permit) {
+        CompletableFuture<Permit> next = waitq.pollFirst();
+        if (null != next) {
+            next.complete(permit);
+        } else {
+            availablePermits += 1;
+        }
+    }
+
+    private CompletableFuture<Permit> newFuturePermit() {
+        return FutureUtils.value(semaphorePermit);
+    }
+
+    /**
+     * Acquire a [[Permit]], asynchronously.
+     *
+     * <p>Be sure to `permit.release()` in a
+     * - `finally` block of your `onSuccess` callback
+     * - `ensure` block of your future chain
+     *
+     * <p>Interrupting this future is only advisory, and will not release the permit
+     * if the future has already been satisfied.
+     *
+     * @note This method always return the same instance of [[Permit]].
+     * @return a `Future[Permit]` when the `Future` is satisfied, computation can proceed,
+     *         or a Future.Exception[RejectedExecutionException]` if the configured maximum
+     *         number of waiters would be exceeded.
+     */
+    public synchronized CompletableFuture<Permit> acquire() {
+        if (closed.isPresent()) {
+            return FutureUtils.exception(closed.get());
+        }
+
+        if (availablePermits > 0) {
+            availablePermits -= 1;
+            return newFuturePermit();
+        } else {
+            if (maxWaiters.isPresent() && waitq.size() >= maxWaiters.get()) {
+                return FutureUtils.exception(new RejectedExecutionException("Max waiters exceeded"));
+            } else {
+                CompletableFuture<Permit> future = FutureUtils.createFuture();
+                future.whenComplete((value, cause) -> {
+                    synchronized (AsyncSemaphore.this) {
+                        waitq.remove(future);
+                    }
+                });
+                waitq.addLast(future);
+                return future;
+            }
+        }
+    }
+
+    /**
+     * Fail the semaphore and stop it from distributing further permits. Subsequent
+     * attempts to acquire a permit fail with `exc`. This semaphore's queued waiters
+     * are also failed with `exc`.
+     */
+    public synchronized void fail(Throwable exc) {
+        closed = Optional.of(exc);
+        for (CompletableFuture<Permit> future : waitq) {
+            future.cancel(true);
+        }
+        waitq.clear();
+    }
+
+    /**
+     * Execute the function asynchronously when a permit becomes available.
+     *
+     * <p>If the function throws a non-fatal exception, the exception is returned as part of the Future.
+     * For all exceptions, the permit would be released before returning.
+     *
+     * @return a Future[T] equivalent to the return value of the input function. If the configured
+     *         maximum value of waitq is reached, Future.Exception[RejectedExecutionException] is
+     *         returned.
+     */
+    public <T> CompletableFuture<T> acquireAndRun(Supplier<CompletableFuture<T>> func) {
+        return acquire().thenCompose(permit -> {
+            CompletableFuture<T> future;
+            try {
+                future = func.get();
+                future.whenComplete((value, cause) -> permit.release());
+                return future;
+            } catch (Throwable cause) {
+                permit.release();
+                throw cause;
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/FutureEventListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/FutureEventListener.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/FutureEventListener.java
new file mode 100644
index 0000000..ed5b9ac
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/FutureEventListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.concurrent;
+
+import java.util.concurrent.CompletionException;
+import java.util.function.BiConsumer;
+
+/**
+ * Provide similar interface (as twitter future) over java future.
+ */
+public interface FutureEventListener<T> extends BiConsumer<T, Throwable> {
+
+  void onSuccess(T value);
+
+  void onFailure(Throwable cause);
+
+  @Override
+  default void accept(T t, Throwable throwable) {
+    if (null != throwable) {
+      if (throwable instanceof CompletionException && null != throwable.getCause()) {
+        onFailure(throwable.getCause());
+      } else {
+        onFailure(throwable);
+      }
+      return;
+    }
+    onSuccess(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/FutureUtils.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/FutureUtils.java
new file mode 100644
index 0000000..15ecf1d
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/FutureUtils.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.common.concurrent;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.distributedlog.common.stats.OpStatsListener;
+import org.apache.distributedlog.util.OrderedScheduler;
+
+/**
+ * Future related utils.
+ */
+@Slf4j
+public final class FutureUtils {
+
+    private FutureUtils() {}
+
+    private static final Function<Throwable, Exception> DEFAULT_EXCEPTION_HANDLER = cause -> {
+        if (cause instanceof Exception) {
+            return (Exception) cause;
+        } else {
+            return new Exception(cause);
+        }
+    };
+
+    public static CompletableFuture<Void> Void() {
+        return value(null);
+    }
+
+    public static <T> T result(CompletableFuture<T> future) throws Exception {
+        return FutureUtils.result(future, DEFAULT_EXCEPTION_HANDLER);
+    }
+
+    public static <T> T result(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) throws Exception {
+        return FutureUtils.result(future, DEFAULT_EXCEPTION_HANDLER, timeout, timeUnit);
+    }
+
+    @SneakyThrows(InterruptedException.class)
+    public static <T, ExceptionT extends Throwable> T result(
+        CompletableFuture<T> future, Function<Throwable, ExceptionT> exceptionHandler) throws ExceptionT {
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw e;
+        } catch (ExecutionException e) {
+            ExceptionT cause = exceptionHandler.apply(e.getCause());
+            if (null == cause) {
+                return null;
+            } else {
+                throw cause;
+            }
+        }
+    }
+
+    @SneakyThrows(InterruptedException.class)
+    public static <T, ExceptionT extends Throwable> T result(
+        CompletableFuture<T> future,
+        Function<Throwable, ExceptionT> exceptionHandler,
+        long timeout,
+        TimeUnit timeUnit) throws ExceptionT, TimeoutException {
+        try {
+            return future.get(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw e;
+        } catch (ExecutionException e) {
+            ExceptionT cause = exceptionHandler.apply(e.getCause());
+            if (null == cause) {
+                return null;
+            } else {
+                throw cause;
+            }
+        }
+    }
+
+    public static <T> CompletableFuture<T> createFuture() {
+        return new CompletableFuture<T>();
+    }
+
+    public static <T> CompletableFuture<T> value(T value) {
+        return CompletableFuture.completedFuture(value);
+    }
+
+    public static <T> CompletableFuture<T> exception(Throwable cause) {
+        CompletableFuture<T> future = FutureUtils.createFuture();
+        future.completeExceptionally(cause);
+        return future;
+    }
+
+    public static <T> void complete(CompletableFuture<T> result,
+                                    T value) {
+        if (null == result) {
+            return;
+        }
+        result.complete(value);
+    }
+
+    public static <T> void completeExceptionally(CompletableFuture<T> result,
+                                                 Throwable cause) {
+        if (null == result) {
+            return;
+        }
+        result.completeExceptionally(cause);
+    }
+
+    /**
+     * Completing the {@code future} in the thread in the scheduler identified by
+     * the {@code scheduleKey}.
+     *
+     * @param future      future to complete
+     * @param action      action to execute when complete
+     * @param scheduler   scheduler to execute the action.
+     * @param scheduleKey key to choose the thread to execute the action
+     * @param <T>
+     * @return
+     */
+    public static <T> CompletableFuture<T> whenCompleteAsync(
+        CompletableFuture<T> future,
+        BiConsumer<? super T, ? super Throwable> action,
+        OrderedScheduler scheduler,
+        Object scheduleKey) {
+        return future.whenCompleteAsync(action, scheduler.chooseExecutor(scheduleKey));
+    }
+
+    public static <T> CompletableFuture<List<T>> collect(List<CompletableFuture<T>> futureList) {
+        CompletableFuture<Void> finalFuture =
+            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
+        return finalFuture.thenApply(result ->
+            futureList
+                .stream()
+                .map(CompletableFuture::join)
+                .collect(Collectors.toList()));
+    }
+
+    public static <T> void proxyTo(CompletableFuture<T> src,
+                                   CompletableFuture<T> target) {
+        src.whenComplete((value, cause) -> {
+            if (null == cause) {
+                target.complete(value);
+            } else {
+                target.completeExceptionally(cause);
+            }
+        });
+    }
+
+    //
+    // Process futures
+    //
+
+    private static class ListFutureProcessor<T, R>
+        implements FutureEventListener<R>, Runnable {
+
+        private volatile boolean done = false;
+        private final Iterator<T> itemsIter;
+        private final Function<T, CompletableFuture<R>> processFunc;
+        private final CompletableFuture<List<R>> promise;
+        private final List<R> results;
+        private final ExecutorService callbackExecutor;
+
+        ListFutureProcessor(List<T> items,
+                            Function<T, CompletableFuture<R>> processFunc,
+                            ExecutorService callbackExecutor) {
+            this.itemsIter = items.iterator();
+            this.processFunc = processFunc;
+            this.promise = new CompletableFuture<>();
+            this.results = Lists.newArrayListWithExpectedSize(items.size());
+            this.callbackExecutor = callbackExecutor;
+        }
+
+        @Override
+        public void onSuccess(R value) {
+            results.add(value);
+            if (null == callbackExecutor) {
+                run();
+            } else {
+                callbackExecutor.submit(this);
+            }
+        }
+
+        @Override
+        public void onFailure(final Throwable cause) {
+            done = true;
+
+            if (null == callbackExecutor) {
+                promise.completeExceptionally(cause);
+            } else {
+                callbackExecutor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        promise.completeExceptionally(cause);
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void run() {
+            if (done) {
+                log.debug("ListFutureProcessor is interrupted.");
+                return;
+            }
+            if (!itemsIter.hasNext()) {
+                promise.complete(results);
+                done = true;
+                return;
+            }
+            processFunc.apply(itemsIter.next()).whenComplete(this);
+        }
+    }
+
+    /**
+     * Process the list of items one by one using the process function <i>processFunc</i>.
+     * The process will be stopped immediately if it fails on processing any one.
+     *
+     * @param collection       list of items
+     * @param processFunc      process function
+     * @param callbackExecutor executor to process the item
+     * @return future presents the list of processed results
+     */
+    public static <T, R> CompletableFuture<List<R>> processList(List<T> collection,
+                                                                Function<T, CompletableFuture<R>> processFunc,
+                                                                @Nullable ExecutorService callbackExecutor) {
+        ListFutureProcessor<T, R> processor =
+            new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor);
+        if (null != callbackExecutor) {
+            callbackExecutor.submit(processor);
+        } else {
+            processor.run();
+        }
+        return processor.promise;
+    }
+
+    /**
+     * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period.
+     * If the promise has been satisfied before raising, it won't change the state of the promise.
+     *
+     * @param promise   promise to raise exception
+     * @param timeout   timeout period
+     * @param unit      timeout period unit
+     * @param cause     cause to raise
+     * @param scheduler scheduler to execute raising exception
+     * @param key       the submit key used by the scheduler
+     * @return the promise applied with the raise logic
+     */
+    public static <T> CompletableFuture<T> within(final CompletableFuture<T> promise,
+                                                  final long timeout,
+                                                  final TimeUnit unit,
+                                                  final Throwable cause,
+                                                  final OrderedScheduler scheduler,
+                                                  final Object key) {
+        if (timeout < 0 || promise.isDone()) {
+            return promise;
+        }
+        // schedule a timeout to raise timeout exception
+        final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() {
+            @Override
+            public void run() {
+                if (!promise.isDone() && promise.completeExceptionally(cause)) {
+                    log.info("Raise exception", cause);
+                }
+            }
+        }, timeout, unit);
+        // when the promise is satisfied, cancel the timeout task
+        promise.whenComplete((value, throwable) -> {
+                if (!task.cancel(true)) {
+                    log.debug("Failed to cancel the timeout task");
+                }
+            }
+        );
+        return promise;
+    }
+
+    /**
+     * Ignore exception from the <i>future</i>.
+     *
+     * @param future the original future
+     * @return a transformed future ignores exceptions
+     */
+    public static <T> CompletableFuture<Void> ignore(CompletableFuture<T> future) {
+        return ignore(future, null);
+    }
+
+    /**
+     * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions.
+     *
+     * @param future   the original future
+     * @param errorMsg the error message to log on exceptions
+     * @return a transformed future ignores exceptions
+     */
+    public static <T> CompletableFuture<Void> ignore(CompletableFuture<T> future,
+                                                     final String errorMsg) {
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
+        future.whenComplete(new FutureEventListener<T>() {
+            @Override
+            public void onSuccess(T value) {
+                promise.complete(null);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                if (null != errorMsg) {
+                    log.error(errorMsg, cause);
+                }
+                promise.complete(null);
+            }
+        });
+        return promise;
+    }
+
+    public static <T> CompletableFuture<T> ensure(CompletableFuture<T> future,
+                                                  Runnable ensureBlock) {
+        return future.whenComplete((value, cause) -> {
+            ensureBlock.run();
+        });
+    }
+
+    public static <T> CompletableFuture<T> rescue(CompletableFuture<T> future,
+                                                  Function<Throwable, CompletableFuture<T>> rescueFuc) {
+        CompletableFuture<T> result = FutureUtils.createFuture();
+        future.whenComplete((value, cause) -> {
+            if (null == cause) {
+                result.complete(value);
+                return;
+            }
+            proxyTo(rescueFuc.apply(cause), result);
+        });
+        return result;
+    }
+
+    /**
+      * Add a event listener over <i>result</i> for collecting the operation stats.
+      *
+      * @param result result to listen on
+      * @param opStatsLogger stats logger to record operations stats
+      * @param stopwatch stop watch to time operation
+      * @param <T>
+      * @return result after registered the event listener
+      */
+    public static <T> CompletableFuture<T> stats(CompletableFuture<T> result,
+                                                 OpStatsLogger opStatsLogger,
+                                                 Stopwatch stopwatch) {
+        return result.whenComplete(new OpStatsListener<T>(opStatsLogger, stopwatch));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/package-info.java
new file mode 100644
index 0000000..dff0ace
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/concurrent/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes commonly useful in concurrent programming.
+ */
+package org.apache.distributedlog.common.concurrent;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
new file mode 100644
index 0000000..83e8e0e
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.configuration.AbstractConfiguration;
+
+/**
+ * Configuration view built on concurrent hash map for fast thread-safe access.
+ * Notes:
+ * 1. Multi-property list aggregation will not work in this class. I.e. commons config
+ * normally combines all properties with the same key into one list property automatically.
+ * This class simply overwrites any existing mapping.
+ */
+public class ConcurrentBaseConfiguration extends AbstractConfiguration {
+
+    private final ConcurrentHashMap<String, Object> map;
+
+    public ConcurrentBaseConfiguration() {
+        this.map = new ConcurrentHashMap<String, Object>();
+    }
+
+    @Override
+    protected void addPropertyDirect(String key, Object value) {
+        checkNotNull(value);
+        map.put(key, value);
+    }
+
+    @Override
+    public Object getProperty(String key) {
+        return map.get(key);
+    }
+
+    @Override
+    public Iterator getKeys() {
+        return map.keySet().iterator();
+    }
+
+    @Override
+    public boolean containsKey(String key) {
+        return map.containsKey(key);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return map.isEmpty();
+    }
+
+    @Override
+    protected void clearPropertyDirect(String key) {
+        map.remove(key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConcurrentConstConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConcurrentConstConfiguration.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConcurrentConstConfiguration.java
new file mode 100644
index 0000000..1131409
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConcurrentConstConfiguration.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Invariant thread-safe view of some configuration.
+ */
+public class ConcurrentConstConfiguration extends ConcurrentBaseConfiguration {
+    public ConcurrentConstConfiguration(Configuration conf) {
+        checkNotNull(conf);
+        copy(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationListener.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationListener.java
new file mode 100644
index 0000000..71bb12a
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+/**
+ * Configuration listener triggered when reloading configuration settings.
+ */
+public interface ConfigurationListener {
+
+    /**
+     * Reload the configuration.
+     *
+     * @param conf configuration to reload
+     */
+    void onReload(ConcurrentBaseConfiguration conf);
+
+}