You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:14 UTC

[09/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
new file mode 100644
index 0000000..6b60c77
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -0,0 +1,1348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.BKTransmitException;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+import org.apache.distributedlog.exceptions.FlushException;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException;
+import org.apache.distributedlog.exceptions.WriteCancelledException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.distributedlog.feature.CoreFeatureKeys;
+import org.apache.distributedlog.injector.FailureInjector;
+import org.apache.distributedlog.injector.RandomDelayFailureInjector;
+import org.apache.distributedlog.io.Buffer;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionUtils;
+import org.apache.distributedlog.lock.DistributedLock;
+import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
+import org.apache.distributedlog.logsegment.LogSegmentWriter;
+import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.stats.OpStatsListener;
+import org.apache.distributedlog.util.FailpointUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.util.SafeQueueingFuturePool;
+import org.apache.distributedlog.util.SimplePermitLimiter;
+import org.apache.distributedlog.util.Sizable;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.FuturePool;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
+
+/**
+ * BookKeeper Based Log Segment Writer.
+ *
+ * Multiple log records are packed into a single bookkeeper
+ * entry before sending it over the network. The fact that the log record entries
+ * are complete in the bookkeeper entries means that each bookkeeper log entry
+ * can be read as a complete edit log. This is useful for reading, as we don't
+ * need to read through the entire log segment to get the last written entry.
+ *
+ * <h3>Metrics</h3>
+ *
+ * <ul>
+ * <li> flush/periodic/{success,miss}: counters for periodic flushes.
+ * <li> data/{success,miss}: counters for data transmits.
+ * <li> transmit/packetsize: opstats. characteristics of packet size for transmits.
+ * <li> control/success: counter of success transmit of control records
+ * <li> seg_writer/write: opstats. latency characteristics of write operations in segment writer.
+ * <li> seg_writer/add_complete/{callback,queued,deferred}: opstats. latency components of add completions.
+ * <li> seg_writer/pendings: counter. the number of records pending by the segment writers.
+ * <li> transmit/outstanding/requests: per stream gauge. the number of outstanding transmits each stream.
+ * </ul>
+ */
+class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Sizable {
+    static final Logger LOG = LoggerFactory.getLogger(BKLogSegmentWriter.class);
+
+    private final String fullyQualifiedLogSegment;
+    private final String streamName;
+    private final int logSegmentMetadataVersion;
+    private BKTransmitPacket packetPrevious;
+    private Entry.Writer recordSetWriter;
+    private final AtomicInteger outstandingTransmits;
+    private final int transmissionThreshold;
+    protected final LogSegmentEntryWriter entryWriter;
+    private final CompressionCodec.Type compressionType;
+    private final ReentrantLock transmitLock = new ReentrantLock();
+    private final AtomicInteger transmitResult
+        = new AtomicInteger(BKException.Code.OK);
+    private final DistributedLock lock;
+    private final boolean isDurableWriteEnabled;
+    private DLSN lastDLSN = DLSN.InvalidDLSN;
+    private final long startTxId;
+    private long lastTxId = DistributedLogConstants.INVALID_TXID;
+    private long lastTxIdAcknowledged = DistributedLogConstants.INVALID_TXID;
+    private long outstandingBytes = 0;
+    private long numFlushesSinceRestart = 0;
+    private long numBytes = 0;
+    private long lastEntryId = Long.MIN_VALUE;
+    private long lastTransmitNanos = Long.MIN_VALUE;
+    private final int periodicKeepAliveMs;
+
+    // Indicates whether there are writes that have been successfully transmitted that would need
+    // a control record to be transmitted to make them visible to the readers by updating the last
+    // add confirmed
+    volatile private boolean controlFlushNeeded = false;
+    private boolean immediateFlushEnabled = false;
+    private int minDelayBetweenImmediateFlushMs = 0;
+    private Stopwatch lastTransmit;
+    private boolean streamEnded = false;
+    private final ScheduledFuture<?> periodicFlushSchedule;
+    private final ScheduledFuture<?> periodicKeepAliveSchedule;
+    final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
+    final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
+    final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null);
+    private boolean enforceLock = true;
+    private Promise<Void> closeFuture = null;
+    private final boolean enableRecordCounts;
+    private int positionWithinLogSegment = 0;
+    private final long logSegmentSequenceNumber;
+    // Used only for values that *could* change (e.g. buffer size etc.)
+    private final DistributedLogConfiguration conf;
+    private final OrderedScheduler scheduler;
+
+    // stats
+    private final StatsLogger envelopeStatsLogger;
+    private final StatsLogger transmitOutstandingLogger;
+    private final Counter transmitDataSuccesses;
+    private final Counter transmitDataMisses;
+    private final Gauge<Number> transmitOutstandingGauge;
+    private final OpStatsLogger transmitDataPacketSize;
+    private final Counter transmitControlSuccesses;
+    private final Counter pFlushSuccesses;
+    private final Counter pFlushMisses;
+    private final OpStatsLogger writeTime;
+    private final OpStatsLogger addCompleteTime;
+    private final OpStatsLogger addCompleteQueuedTime;
+    private final OpStatsLogger addCompleteDeferredTime;
+    private final Counter pendingWrites;
+
+    // add complete processing
+    private final SafeQueueingFuturePool<Void> addCompleteFuturePool;
+
+    // Functions
+    private final AbstractFunction1<Integer, Future<Long>> GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC =
+            new AbstractFunction1<Integer, Future<Long>>() {
+                @Override
+                public Future<Long> apply(Integer transmitRc) {
+                    if (BKException.Code.OK == transmitRc) {
+                        return Future.value(getLastTxIdAcknowledged());
+                    } else {
+                        return Future.exception(new BKTransmitException("Failed to transmit entry", transmitRc));
+                    }
+                }
+            };
+    final AbstractFunction1<Long, Future<Long>> COMMIT_AFTER_FLUSH_FUNC =
+            new AbstractFunction1<Long, Future<Long>>() {
+                @Override
+                public Future<Long> apply(Long lastAckedTxId) {
+                    return commit();
+                }
+            };
+
+    private final AlertStatsLogger alertStatsLogger;
+    private final WriteLimiter writeLimiter;
+    private final FailureInjector writeDelayInjector;
+
+    /**
+     * Construct an edit log output stream which writes to a ledger.
+     */
+    protected BKLogSegmentWriter(String streamName,
+                                 String logSegmentName,
+                                 DistributedLogConfiguration conf,
+                                 int logSegmentMetadataVersion,
+                                 LogSegmentEntryWriter entryWriter,
+                                 DistributedLock lock, /** the lock needs to be acquired **/
+                                 long startTxId,
+                                 long logSegmentSequenceNumber,
+                                 OrderedScheduler scheduler,
+                                 StatsLogger statsLogger,
+                                 StatsLogger perLogStatsLogger,
+                                 AlertStatsLogger alertStatsLogger,
+                                 PermitLimiter globalWriteLimiter,
+                                 FeatureProvider featureProvider,
+                                 DynamicDistributedLogConfiguration dynConf)
+        throws IOException {
+        super();
+
+        // set up a write limiter
+        PermitLimiter streamWriteLimiter = null;
+        if (conf.getPerWriterOutstandingWriteLimit() < 0) {
+            streamWriteLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
+        } else {
+            Feature disableWriteLimitFeature = featureProvider.getFeature(
+                CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
+            streamWriteLimiter = new SimplePermitLimiter(
+                conf.getOutstandingWriteLimitDarkmode(),
+                conf.getPerWriterOutstandingWriteLimit(),
+                statsLogger.scope("streamWriteLimiter"),
+                false,
+                disableWriteLimitFeature);
+        }
+        this.writeLimiter = new WriteLimiter(streamName, streamWriteLimiter, globalWriteLimiter);
+        this.alertStatsLogger = alertStatsLogger;
+        this.envelopeStatsLogger = BroadCastStatsLogger.masterslave(statsLogger, perLogStatsLogger);
+
+        StatsLogger flushStatsLogger = statsLogger.scope("flush");
+        StatsLogger pFlushStatsLogger = flushStatsLogger.scope("periodic");
+        pFlushSuccesses = pFlushStatsLogger.getCounter("success");
+        pFlushMisses = pFlushStatsLogger.getCounter("miss");
+
+        // transmit
+        StatsLogger transmitDataStatsLogger = statsLogger.scope("data");
+        transmitDataSuccesses = transmitDataStatsLogger.getCounter("success");
+        transmitDataMisses = transmitDataStatsLogger.getCounter("miss");
+        StatsLogger transmitStatsLogger = statsLogger.scope("transmit");
+        transmitDataPacketSize =  transmitStatsLogger.getOpStatsLogger("packetsize");
+        StatsLogger transmitControlStatsLogger = statsLogger.scope("control");
+        transmitControlSuccesses = transmitControlStatsLogger.getCounter("success");
+        StatsLogger segWriterStatsLogger = statsLogger.scope("seg_writer");
+        writeTime = segWriterStatsLogger.getOpStatsLogger("write");
+        addCompleteTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("callback");
+        addCompleteQueuedTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("queued");
+        addCompleteDeferredTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("deferred");
+        pendingWrites = segWriterStatsLogger.getCounter("pending");
+
+        // outstanding transmit requests
+        transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
+        transmitOutstandingGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+            @Override
+            public Number getSample() {
+                return outstandingTransmits.get();
+            }
+        };
+        transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge);
+
+        outstandingTransmits = new AtomicInteger(0);
+        this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
+        this.streamName = streamName;
+        this.logSegmentMetadataVersion = logSegmentMetadataVersion;
+        this.entryWriter = entryWriter;
+        this.lock = lock;
+        this.lock.checkOwnershipAndReacquire();
+
+        final int configuredTransmissionThreshold = dynConf.getOutputBufferSize();
+        if (configuredTransmissionThreshold > MAX_LOGRECORDSET_SIZE) {
+            LOG.warn("Setting output buffer size {} greater than max transmission size {} for log segment {}",
+                new Object[] {configuredTransmissionThreshold, MAX_LOGRECORDSET_SIZE, fullyQualifiedLogSegment});
+            this.transmissionThreshold = MAX_LOGRECORDSET_SIZE;
+        } else {
+            this.transmissionThreshold = configuredTransmissionThreshold;
+        }
+        this.compressionType = CompressionUtils.stringToType(conf.getCompressionType());
+
+        this.logSegmentSequenceNumber = logSegmentSequenceNumber;
+        this.recordSetWriter = Entry.newEntry(
+                streamName,
+                Math.max(transmissionThreshold, 1024),
+                envelopeBeforeTransmit(),
+                compressionType,
+                envelopeStatsLogger);
+        this.packetPrevious = null;
+        this.startTxId = startTxId;
+        this.lastTxId = startTxId;
+        this.lastTxIdAcknowledged = startTxId;
+        this.enableRecordCounts = conf.getEnableRecordCounts();
+        this.immediateFlushEnabled = conf.getImmediateFlushEnabled();
+        this.isDurableWriteEnabled = dynConf.isDurableWriteEnabled();
+        this.scheduler = scheduler;
+
+        // Failure injection
+        if (conf.getEIInjectWriteDelay()) {
+            this.writeDelayInjector = new RandomDelayFailureInjector(dynConf);
+        } else {
+            this.writeDelayInjector = FailureInjector.NULL;
+        }
+
+        // If we are transmitting immediately (threshold == 0) and if immediate
+        // flush is enabled, we don't need the periodic flush task
+        final int configuredPeriodicFlushFrequency = dynConf.getPeriodicFlushFrequencyMilliSeconds();
+        if (!immediateFlushEnabled || (0 != this.transmissionThreshold)) {
+            int periodicFlushFrequency = configuredPeriodicFlushFrequency;
+            if (periodicFlushFrequency > 0 && scheduler != null) {
+                periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
+                        periodicFlushFrequency/2, periodicFlushFrequency/2, TimeUnit.MILLISECONDS);
+            } else {
+                periodicFlushSchedule = null;
+            }
+        } else {
+            // Min delay heuristic applies only when immediate flush is enabled
+            // and transmission threshold is zero
+            minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs();
+            periodicFlushSchedule = null;
+        }
+        this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
+        if (periodicKeepAliveMs > 0 && scheduler != null) {
+            periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    keepAlive();
+                }
+            }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS);
+        } else {
+            periodicKeepAliveSchedule = null;
+        }
+
+        this.conf = conf;
+        if (null != scheduler) {
+            this.addCompleteFuturePool = new SafeQueueingFuturePool<Void>(scheduler.getFuturePool(streamName));
+        } else {
+            this.addCompleteFuturePool = null;
+        }
+        assert(!this.immediateFlushEnabled || (null != this.scheduler));
+        this.lastTransmit = Stopwatch.createStarted();
+    }
+
+    String getFullyQualifiedLogSegment() {
+        return fullyQualifiedLogSegment;
+    }
+
+    @VisibleForTesting
+    DistributedLock getLock() {
+        return this.lock;
+    }
+
+    @VisibleForTesting
+    FuturePool getFuturePool() {
+        if (null == scheduler) {
+            return null;
+        }
+        return scheduler.getFuturePool(streamName);
+    }
+
+    @VisibleForTesting
+    void setTransmitResult(int rc) {
+        transmitResult.set(rc);
+    }
+
+    @VisibleForTesting
+    protected final LogSegmentEntryWriter getEntryWriter() {
+        return this.entryWriter;
+    }
+
+    @Override
+    public long getLogSegmentId() {
+        return this.entryWriter.getLogSegmentId();
+    }
+
+    protected final long getLogSegmentSequenceNumber() {
+        return logSegmentSequenceNumber;
+    }
+
+    /**
+     * Get the start tx id of the log segment.
+     *
+     * @return start tx id of the log segment.
+     */
+    protected final long getStartTxId() {
+        return startTxId;
+    }
+
+    /**
+     * Get the last tx id that has been written to the log segment buffer but not committed yet.
+     *
+     * @return last tx id that has been written to the log segment buffer but not committed yet.
+     * @see #getLastTxIdAcknowledged()
+     */
+    synchronized long getLastTxId() {
+        return lastTxId;
+    }
+
+    /**
+     * Get the last tx id that has been acknowledged.
+     *
+     * @return last tx id that has been acknowledged.
+     * @see #getLastTxId()
+     */
+    synchronized long getLastTxIdAcknowledged() {
+        return lastTxIdAcknowledged;
+    }
+
+    /**
+     * Get the position-within-logsemgnet of the last written log record.
+     *
+     * @return position-within-logsegment of the last written log record.
+     */
+    synchronized int getPositionWithinLogSegment() {
+        return positionWithinLogSegment;
+    }
+
+    @VisibleForTesting
+    long getLastEntryId() {
+        return lastEntryId;
+    }
+
+    /**
+     * Get the last dlsn of the last acknowledged record.
+     *
+     * @return last dlsn of the last acknowledged record.
+     */
+    synchronized DLSN getLastDLSN() {
+        return lastDLSN;
+    }
+
+    @Override
+    public long size() {
+        return entryWriter.size();
+    }
+
+    private synchronized int getAverageTransmitSize() {
+        if (numFlushesSinceRestart > 0) {
+            long ret = numBytes/numFlushesSinceRestart;
+
+            if (ret < Integer.MIN_VALUE || ret > Integer.MAX_VALUE) {
+                throw new IllegalArgumentException
+                    (ret + " transmit size should never exceed max transmit size");
+            }
+            return (int) ret;
+        }
+
+        return 0;
+    }
+
+    private Entry.Writer newRecordSetWriter() {
+        return Entry.newEntry(
+                streamName,
+                Math.max(transmissionThreshold, getAverageTransmitSize()),
+                envelopeBeforeTransmit(),
+                compressionType,
+                envelopeStatsLogger);
+    }
+
+    private boolean envelopeBeforeTransmit() {
+        return LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion);
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        return closeInternal(false);
+    }
+
+    @Override
+    public Future<Void> asyncAbort() {
+        return closeInternal(true);
+    }
+
+    private void flushAddCompletes() {
+        if (null != addCompleteFuturePool) {
+            addCompleteFuturePool.close();
+        }
+    }
+
+    private synchronized void abortPacket(BKTransmitPacket packet) {
+        long numRecords = 0;
+        if (null != packet) {
+            EntryBuffer recordSet = packet.getRecordSet();
+            numRecords = recordSet.getNumRecords();
+            int rc = transmitResult.get();
+            if (BKException.Code.OK == rc) {
+                rc = BKException.Code.InterruptedException;
+            }
+            Throwable reason = new WriteCancelledException(streamName, FutureUtils.transmitException(rc));
+            recordSet.abortTransmit(reason);
+        }
+        LOG.info("Stream {} aborted {} writes", fullyQualifiedLogSegment, numRecords);
+    }
+
+    private synchronized long getWritesPendingTransmit() {
+        if (null != recordSetWriter) {
+            return recordSetWriter.getNumRecords();
+        } else {
+            return 0;
+        }
+    }
+
+    private synchronized long getPendingAddCompleteCount() {
+        if (null != addCompleteFuturePool) {
+            return addCompleteFuturePool.size();
+        } else {
+            return 0;
+        }
+    }
+
+    private Future<Void> closeInternal(boolean abort) {
+        Promise<Void> closePromise;
+        synchronized (this) {
+            if (null != closeFuture) {
+                return closeFuture;
+            }
+            closePromise = closeFuture = new Promise<Void>();
+        }
+
+        AtomicReference<Throwable> throwExc = new AtomicReference<Throwable>(null);
+        closeInternal(abort, throwExc, closePromise);
+        return closePromise;
+    }
+
+    private void closeInternal(final boolean abort,
+                               final AtomicReference<Throwable> throwExc,
+                               final Promise<Void> closePromise) {
+        // clean stats resources
+        this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
+        this.writeLimiter.close();
+
+        // Cancel the periodic keep alive schedule first
+        if (null != periodicKeepAliveSchedule) {
+            if (!periodicKeepAliveSchedule.cancel(false)) {
+                LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
+            }
+        }
+
+        // Cancel the periodic flush schedule first
+        // The task is allowed to exit gracefully
+        if (null != periodicFlushSchedule) {
+            // we don't need to care about the cancel result here. if the periodicl flush task couldn't
+            // be cancelled, it means that it is doing flushing. So following flushes would be synchronized
+            // to wait until background flush completed.
+            if (!periodicFlushSchedule.cancel(false)) {
+                LOG.info("Periodic flush for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
+            }
+        }
+
+        // If it is a normal close and the stream isn't in an error state, we attempt to flush any buffered data
+        if (!abort && !isLogSegmentInError()) {
+            this.enforceLock = false;
+            LOG.info("Flushing before closing log segment {}", getFullyQualifiedLogSegment());
+            flushAndCommit().addEventListener(new FutureEventListener<Long>() {
+                @Override
+                public void onSuccess(Long value) {
+                    abortTransmitPacketOnClose(abort, throwExc, closePromise);
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    throwExc.set(cause);
+                    abortTransmitPacketOnClose(abort, throwExc, closePromise);
+                }
+            });
+        } else {
+            abortTransmitPacketOnClose(abort, throwExc, closePromise);
+        }
+
+    }
+
+    private void abortTransmitPacketOnClose(final boolean abort,
+                                            final AtomicReference<Throwable> throwExc,
+                                            final Promise<Void> closePromise) {
+        LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} :" +
+                        " lastDLSN = {} outstandingTransmits = {} writesPendingTransmit = {} addCompletesPending = {}",
+                new Object[]{abort, fullyQualifiedLogSegment, getLastDLSN(),
+                        outstandingTransmits.get(), getWritesPendingTransmit(), getPendingAddCompleteCount()});
+
+        // Save the current packet to reset, leave a new empty packet to avoid a race with
+        // addCompleteDeferredProcessing.
+        final BKTransmitPacket packetPreviousSaved;
+        final BKTransmitPacket packetCurrentSaved;
+        synchronized (this) {
+            packetPreviousSaved = packetPrevious;
+            packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
+            recordSetWriter = newRecordSetWriter();
+        }
+
+        // Once the last packet been transmitted, apply any remaining promises asynchronously
+        // to avoid blocking close if bk client is slow for some reason.
+        if (null != packetPreviousSaved) {
+            packetPreviousSaved.addTransmitCompleteListener(new FutureEventListener<Integer>() {
+                @Override
+                public void onSuccess(Integer transmitResult) {
+                    flushAddCompletes();
+                    abortPacket(packetCurrentSaved);
+                }
+                @Override
+                public void onFailure(Throwable cause) {
+                    LOG.error("Unexpected error on transmit completion ", cause);
+                }
+            });
+        } else {
+            // In this case there are no pending add completes, but we still need to abort the
+            // current packet.
+            abortPacket(packetCurrentSaved);
+        }
+        closeLedgerOnClose(abort, throwExc, closePromise);
+    }
+
+    private void closeLedgerOnClose(final boolean abort,
+                                    final AtomicReference<Throwable> throwExc,
+                                    final Promise<Void> closePromise) {
+        // close the log segment if it isn't in error state, so all the outstanding addEntry(s) will callback.
+        if (null == throwExc.get() && !isLogSegmentInError()) {
+            // Synchronous closing the ledger handle, if we couldn't close a ledger handle successfully.
+            // we should throw the exception to #closeToFinalize, so it would fail completing a log segment.
+            entryWriter.asyncClose(new CloseCallback() {
+                @Override
+                public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+                    if (BKException.Code.OK != rc && BKException.Code.LedgerClosedException != rc) {
+                        if (!abort) {
+                            throwExc.set(new IOException("Failed to close ledger for " + fullyQualifiedLogSegment + " : " +
+                                    BKException.getMessage(rc)));
+                        }
+                    }
+                    completeClosePromise(abort, throwExc, closePromise);
+                }
+            }, null);
+        } else {
+            completeClosePromise(abort, throwExc, closePromise);
+        }
+    }
+
+    private void completeClosePromise(final boolean abort,
+                                      final AtomicReference<Throwable> throwExc,
+                                      final Promise<Void> closePromise) {
+        // If add entry failed because of closing ledger above, we don't need to fail the close operation
+        if (!abort && null == throwExc.get() && shouldFailCompleteLogSegment()) {
+            throwExc.set(new BKTransmitException("Closing an errored stream : ", transmitResult.get()));
+        }
+
+        if (null == throwExc.get()) {
+            FutureUtils.setValue(closePromise, null);
+        } else {
+            FutureUtils.setException(closePromise, throwExc.get());
+        }
+    }
+
+    @Override
+    synchronized public void write(LogRecord record) throws IOException {
+        writeUserRecord(record);
+        flushIfNeeded();
+    }
+
+    @Override
+    synchronized public Future<DLSN> asyncWrite(LogRecord record) {
+        return asyncWrite(record, true);
+    }
+
+    synchronized public Future<DLSN> asyncWrite(LogRecord record, boolean flush) {
+        Future<DLSN> result = null;
+        try {
+            if (record.isControl()) {
+                // we don't pack control records with user records together
+                // so transmit current output buffer if possible
+                try {
+                    transmit();
+                } catch (IOException ioe) {
+                    return Future.exception(new WriteCancelledException(fullyQualifiedLogSegment, ioe));
+                }
+                result = writeControlLogRecord(record);
+                transmit();
+            } else {
+                result = writeUserRecord(record);
+                if (!isDurableWriteEnabled) {
+                    // we have no idea about the DLSN if durability is turned off.
+                    result = Future.value(DLSN.InvalidDLSN);
+                }
+                if (flush) {
+                    flushIfNeeded();
+                }
+            }
+        } catch (IOException ioe) {
+            // We may incorrectly report transmit failure here, but only if we happened to hit
+            // packet/xmit size limit conditions AND fail flush above, which should happen rarely
+            if (null != result) {
+                LOG.error("Overriding first result with flush failure {}", result);
+            }
+            result = Future.exception(ioe);
+
+            // Flush to ensure any prev. writes with flush=false are flushed despite failure.
+            flushIfNeededNoThrow();
+        }
+        return result;
+    }
+
+    synchronized private Future<DLSN> writeUserRecord(LogRecord record) throws IOException {
+        if (null != closeFuture) {
+            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(BKException.Code.LedgerClosedException));
+        }
+
+        if (BKException.Code.OK != transmitResult.get()) {
+            // Failfast if the stream already encountered error with safe retry on the client
+            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(transmitResult.get()));
+        }
+
+        if (streamEnded) {
+            throw new EndOfStreamException("Writing to a stream after it has been marked as completed");
+        }
+
+        if ((record.getTransactionId() < 0) ||
+            (record.getTransactionId() == DistributedLogConstants.MAX_TXID)) {
+            throw new TransactionIdOutOfOrderException(record.getTransactionId());
+        }
+
+        // Inject write delay if configured to do so
+        writeDelayInjector.inject();
+
+        // Will check write rate limits and throw if exceeded.
+        writeLimiter.acquire();
+        pendingWrites.inc();
+
+        // The count represents the number of user records up to the
+        // current record
+        // Increment the record count only when writing a user log record
+        // Internally generated log records don't increment the count
+        // writeInternal will always set a count regardless of whether it was
+        // incremented or not.
+        Future<DLSN> future = null;
+        try {
+            // increment the position for the record to write
+            // if the record is failed to write, it would be decremented.
+            positionWithinLogSegment++;
+            int numRecords = 1;
+            if (record.isRecordSet()) {
+                numRecords = LogRecordSet.numRecords(record);
+            }
+            future = writeInternal(record);
+            // after the record (record set) is written, the position should be
+            // moved for {numRecords}, but since we already moved the record by 1
+            // so advance the position for other {numRecords - 1}.
+            positionWithinLogSegment += (numRecords - 1);
+        } catch (IOException ex) {
+            writeLimiter.release();
+            pendingWrites.dec();
+            positionWithinLogSegment--;
+            throw ex;
+        }
+
+        // Track outstanding requests and return the future.
+        return future.ensure(new Function0<BoxedUnit>() {
+            public BoxedUnit apply() {
+                pendingWrites.dec();
+                writeLimiter.release();
+                return null;
+            }
+        });
+    }
+
+    boolean isLogSegmentInError() {
+        return (transmitResult.get() != BKException.Code.OK);
+    }
+
+    boolean shouldFailCompleteLogSegment() {
+        return (transmitResult.get() != BKException.Code.OK) &&
+                (transmitResult.get() != BKException.Code.LedgerClosedException);
+    }
+
+    synchronized public Future<DLSN> writeInternal(LogRecord record)
+            throws LogRecordTooLongException, LockingException, BKTransmitException,
+                   WriteException, InvalidEnvelopedEntryException {
+        int logRecordSize = record.getPersistentSize();
+
+        if (logRecordSize > MAX_LOGRECORD_SIZE) {
+            throw new LogRecordTooLongException(String.format(
+                    "Log Record of size %d written when only %d is allowed",
+                    logRecordSize, MAX_LOGRECORD_SIZE));
+        }
+
+        // If we will exceed the max number of bytes allowed per entry
+        // initiate a transmit before accepting the new log record
+        if ((recordSetWriter.getNumBytes() + logRecordSize) > MAX_LOGRECORDSET_SIZE) {
+            checkStateAndTransmit();
+        }
+
+        checkWriteLock();
+
+        if (enableRecordCounts) {
+            // Set the count here. The caller would appropriately increment it
+            // if this log record is to be counted
+            record.setPositionWithinLogSegment(positionWithinLogSegment);
+        }
+
+        Promise<DLSN> writePromise = new Promise<DLSN>();
+        writePromise.addEventListener(new OpStatsListener<DLSN>(writeTime));
+        recordSetWriter.writeRecord(record, writePromise);
+
+        if (record.getTransactionId() < lastTxId) {
+            LOG.info("Log Segment {} TxId decreased Last: {} Record: {}",
+                    new Object[] {fullyQualifiedLogSegment, lastTxId, record.getTransactionId()});
+        }
+        if (!record.isControl()) {
+            // only update last tx id for user records
+            lastTxId = record.getTransactionId();
+            outstandingBytes += (20 + record.getPayload().length);
+        }
+        return writePromise;
+    }
+
+    synchronized private Future<DLSN> writeControlLogRecord()
+            throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
+                   LockingException, LogRecordTooLongException {
+        LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.CONTROL_RECORD_CONTENT);
+        controlRec.setControl();
+        return writeControlLogRecord(controlRec);
+    }
+
+    synchronized private Future<DLSN> writeControlLogRecord(LogRecord record)
+            throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
+                   LockingException, LogRecordTooLongException {
+        return writeInternal(record);
+    }
+
+    /**
+     * We write a special log record that marks the end of the stream. Since this is the last
+     * log record in the stream, it is marked with MAX_TXID. MAX_TXID also has the useful
+     * side-effect of disallowing future startLogSegment calls through the MaxTxID check
+     *
+     * @throws IOException
+     */
+    synchronized private void writeEndOfStreamMarker() throws IOException {
+        LogRecord endOfStreamRec = new LogRecord(DistributedLogConstants.MAX_TXID, "endOfStream".getBytes(UTF_8));
+        endOfStreamRec.setEndOfStream();
+        writeInternal(endOfStreamRec);
+    }
+
+    /**
+     * Flushes all the data up to this point,
+     * adds the end of stream marker and marks the stream
+     * as read-only in the metadata. No appends to the
+     * stream will be allowed after this point
+     */
+    public Future<Long> markEndOfStream() {
+        synchronized (this) {
+            try {
+                writeEndOfStreamMarker();
+            } catch (IOException e) {
+                return Future.exception(e);
+            }
+            streamEnded = true;
+        }
+        return flushAndCommit();
+    }
+
+    /**
+     * Write bulk of records.
+     *
+     * (TODO: moved this method to log writer level)
+     *
+     * @param records list of records to write
+     * @return number of records that has been written
+     * @throws IOException when there is I/O errors during writing records.
+     */
+    synchronized public int writeBulk(List<LogRecord> records) throws IOException {
+        int numRecords = 0;
+        for (LogRecord r : records) {
+            write(r);
+            numRecords++;
+        }
+        return numRecords;
+    }
+
+    private void checkStateBeforeTransmit() throws WriteException {
+        try {
+            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitBeforeAddEntry);
+        } catch (IOException e) {
+            throw new WriteException(streamName, "Fail transmit before adding entries");
+        }
+    }
+
+    /**
+     * Transmit the output buffer data to the backend.
+     *
+     * @return last txn id that already acknowledged
+     * @throws BKTransmitException if the segment writer is already in error state
+     * @throws LockingException if the segment writer lost lock before transmit
+     * @throws WriteException if failed to create the envelope for the data to transmit
+     * @throws InvalidEnvelopedEntryException when built an invalid enveloped entry
+     */
+    synchronized void checkStateAndTransmit()
+            throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException {
+        checkStateBeforeTransmit();
+        transmit();
+    }
+
+    @Override
+    public synchronized Future<Long> flush() {
+        try {
+            checkStateBeforeTransmit();
+        } catch (WriteException e) {
+            return Future.exception(e);
+        }
+
+        Future<Integer> transmitFuture;
+        try {
+            transmitFuture = transmit();
+        } catch (BKTransmitException e) {
+            return Future.exception(e);
+        } catch (LockingException e) {
+            return Future.exception(e);
+        } catch (WriteException e) {
+            return Future.exception(e);
+        } catch (InvalidEnvelopedEntryException e) {
+            return Future.exception(e);
+        }
+
+        if (null == transmitFuture) {
+            if (null != packetPrevious) {
+                transmitFuture = packetPrevious.getTransmitFuture();
+            }  else {
+                return Future.value(getLastTxIdAcknowledged());
+            }
+        }
+
+        return transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
+    }
+
+    @Override
+    public synchronized Future<Long> commit() {
+        // we don't pack control records with user records together
+        // so transmit current output buffer if possible
+        Future<Integer> transmitFuture;
+        try {
+            try {
+                transmitFuture = transmit();
+            } catch (IOException ioe) {
+                return Future.exception(ioe);
+            }
+            if (null == transmitFuture) {
+                writeControlLogRecord();
+                return flush();
+            }
+        } catch (IOException ioe) {
+            return Future.exception(ioe);
+        }
+        return transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
+    }
+
+    Future<Long> flushAndCommit() {
+        return flush().flatMap(COMMIT_AFTER_FLUSH_FUNC);
+    }
+
+    void flushIfNeededNoThrow() {
+        try {
+            flushIfNeeded();
+        } catch (IOException ioe) {
+            LOG.error("Encountered exception while flushing log records to stream {}",
+                fullyQualifiedLogSegment, ioe);
+        }
+    }
+
+    void scheduleFlushWithDelayIfNeeded(final Callable<?> callable,
+                                        final AtomicReference<ScheduledFuture<?>> scheduledFutureRef) {
+        final long delayMs = Math.max(0, minDelayBetweenImmediateFlushMs - lastTransmit.elapsed(TimeUnit.MILLISECONDS));
+        final ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
+        if ((null == scheduledFuture) || scheduledFuture.isDone()) {
+            scheduledFutureRef.set(scheduler.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    synchronized(this) {
+                        scheduledFutureRef.set(null);
+                        try {
+                            callable.call();
+
+                            // Flush was successful or wasn't needed, the exception should be unset.
+                            scheduledFlushException.set(null);
+                        } catch (Exception exc) {
+                            scheduledFlushException.set(exc);
+                            LOG.error("Delayed flush failed", exc);
+                        }
+                    }
+                }
+            }, delayMs, TimeUnit.MILLISECONDS));
+        }
+    }
+
+    // Based on transmit buffer size, immediate flush, etc., should we flush the current
+    // packet now.
+    void flushIfNeeded() throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
+            LockingException, FlushException {
+        if (outstandingBytes > transmissionThreshold) {
+            // If flush delay is disabled, flush immediately, else schedule appropriately.
+            if (0 == minDelayBetweenImmediateFlushMs) {
+                checkStateAndTransmit();
+            } else {
+                scheduleFlushWithDelayIfNeeded(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        checkStateAndTransmit();
+                        return null;
+                    }
+                }, transmitSchedFutureRef);
+
+                // Timing here is not very important--the last flush failed and we should
+                // indicate this to the caller. The next flush may succeed and unset the
+                // scheduledFlushException in which case the next write will succeed (if the caller
+                // hasn't already closed the writer).
+                if (scheduledFlushException.get() != null) {
+                    throw new FlushException("Last flush encountered an error while writing data to the backend",
+                        getLastTxId(), getLastTxIdAcknowledged(), scheduledFlushException.get());
+                }
+            }
+        }
+    }
+
+    private void checkWriteLock() throws LockingException {
+        try {
+            if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_WriteInternalLostLock)) {
+                throw new LockingException("/failpoint/lockpath", "failpoint is simulating a lost lock"
+                        + getFullyQualifiedLogSegment());
+            }
+        } catch (IOException e) {
+            throw new LockingException("/failpoint/lockpath", "failpoint is simulating a lost lock for "
+                    + getFullyQualifiedLogSegment());
+        }
+        if (enforceLock) {
+            lock.checkOwnershipAndReacquire();
+        }
+    }
+
+    /**
+     * Transmit the current buffer to bookkeeper.
+     * Synchronised at the class. #write() and #setReadyToFlush()
+     * are never called at the same time.
+     *
+     * NOTE: This method should only throw known exceptions so that we don't accidentally
+     *       add new code that throws in an inappropriate place.
+     *
+     * @return a transmit future for caller to wait for transmit result if we transmit successfully,
+     *         null if no data to transmit
+     * @throws BKTransmitException if the segment writer is already in error state
+     * @throws LockingException if the segment writer lost lock before transmit
+     * @throws WriteException if failed to create the envelope for the data to transmit
+     * @throws InvalidEnvelopedEntryException when built an invalid enveloped entry
+     */
+    private Future<Integer> transmit()
+        throws BKTransmitException, LockingException, WriteException, InvalidEnvelopedEntryException {
+        EntryBuffer recordSetToTransmit;
+        transmitLock.lock();
+        try {
+            synchronized (this) {
+                checkWriteLock();
+                // If transmitResult is anything other than BKException.Code.OK, it means that the
+                // stream has encountered an error and cannot be written to.
+                if (!transmitResult.compareAndSet(BKException.Code.OK,
+                                                  BKException.Code.OK)) {
+                    LOG.error("Log Segment {} Trying to write to an errored stream; Error is {}",
+                              fullyQualifiedLogSegment,
+                              BKException.getMessage(transmitResult.get()));
+                    throw new BKTransmitException("Trying to write to an errored stream;"
+                                                          + " Error code : (" + transmitResult.get()
+                                                          + ") " + BKException.getMessage(transmitResult.get()), transmitResult.get());
+                }
+
+                if (recordSetWriter.getNumRecords() == 0) {
+                    // Control flushes always have at least the control record to flush
+                    transmitDataMisses.inc();
+                    return null;
+                }
+
+                recordSetToTransmit = recordSetWriter;
+                recordSetWriter = newRecordSetWriter();
+                outstandingBytes = 0;
+
+                if (recordSetToTransmit.hasUserRecords()) {
+                    numBytes += recordSetToTransmit.getNumBytes();
+                    numFlushesSinceRestart++;
+                }
+            }
+
+            Buffer toSend;
+            try {
+                toSend = recordSetToTransmit.getBuffer();
+                FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer);
+            } catch (IOException e) {
+                if (e instanceof InvalidEnvelopedEntryException) {
+                    alertStatsLogger.raise("Invalid enveloped entry for segment {} : ", fullyQualifiedLogSegment, e);
+                }
+                LOG.error("Exception while enveloping entries for segment: {}",
+                          new Object[] {fullyQualifiedLogSegment}, e);
+                // If a write fails here, we need to set the transmit result to an error so that
+                // no future writes go through and violate ordering guarantees.
+                transmitResult.set(BKException.Code.WriteException);
+                if (e instanceof InvalidEnvelopedEntryException) {
+                    alertStatsLogger.raise("Invalid enveloped entry for segment {} : ", fullyQualifiedLogSegment, e);
+                    throw (InvalidEnvelopedEntryException) e;
+                } else {
+                    throw new WriteException(streamName, "Envelope Error");
+                }
+            }
+
+            synchronized (this) {
+                // update the transmit timestamp
+                lastTransmitNanos = MathUtils.nowInNano();
+
+                BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
+                packetPrevious = packet;
+                entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
+                                          this, packet);
+
+                if (recordSetToTransmit.hasUserRecords()) {
+                    transmitDataSuccesses.inc();
+                } else {
+                    transmitControlSuccesses.inc();
+                }
+
+                lastTransmit.reset().start();
+                outstandingTransmits.incrementAndGet();
+                controlFlushNeeded = false;
+                return packet.getTransmitFuture();
+            }
+        } finally {
+            transmitLock.unlock();
+        }
+    }
+
+    /**
+     *  Checks if there is any data to transmit so that the periodic flush
+     *  task can determine if there is anything it needs to do
+     */
+    synchronized private boolean haveDataToTransmit() {
+        if (!transmitResult.compareAndSet(BKException.Code.OK, BKException.Code.OK)) {
+            // Even if there is data it cannot be transmitted, so effectively nothing to send
+            return false;
+        }
+
+        return (recordSetWriter.getNumRecords() > 0);
+    }
+
+    @Override
+    public void addComplete(final int rc, LedgerHandle handle,
+                            final long entryId, final Object ctx) {
+        final AtomicReference<Integer> effectiveRC = new AtomicReference<Integer>(rc);
+        try {
+            if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitComplete)) {
+                effectiveRC.set(BKException.Code.UnexpectedConditionException);
+            }
+        } catch (Exception exc) {
+            effectiveRC.set(BKException.Code.UnexpectedConditionException);
+        }
+
+        // Sanity check to make sure we're receiving these callbacks in order.
+        if (entryId > -1 && lastEntryId >= entryId) {
+            LOG.error("Log segment {} saw out of order entry {} lastEntryId {}",
+                new Object[] {fullyQualifiedLogSegment, entryId, lastEntryId});
+        }
+        lastEntryId = entryId;
+
+        assert (ctx instanceof BKTransmitPacket);
+        final BKTransmitPacket transmitPacket = (BKTransmitPacket) ctx;
+
+        // Time from transmit until receipt of addComplete callback
+        addCompleteTime.registerSuccessfulEvent(TimeUnit.MICROSECONDS.convert(
+            System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS));
+
+        if (BKException.Code.OK == rc) {
+            EntryBuffer recordSet = transmitPacket.getRecordSet();
+            if (recordSet.hasUserRecords()) {
+                synchronized (this) {
+                    lastTxIdAcknowledged = Math.max(lastTxIdAcknowledged, recordSet.getMaxTxId());
+                }
+            }
+        }
+
+        if (null != addCompleteFuturePool) {
+            final Stopwatch queuedTime = Stopwatch.createStarted();
+            addCompleteFuturePool.apply(new Function0<Void>() {
+                public Void apply() {
+                    final Stopwatch deferredTime = Stopwatch.createStarted();
+                    addCompleteQueuedTime.registerSuccessfulEvent(queuedTime.elapsed(TimeUnit.MICROSECONDS));
+                    addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
+                    addCompleteDeferredTime.registerSuccessfulEvent(deferredTime.elapsed(TimeUnit.MICROSECONDS));
+                    return null;
+                }
+                @Override
+                public String toString() {
+                    return String.format("AddComplete(Stream=%s, entryId=%d, rc=%d)",
+                            fullyQualifiedLogSegment, entryId, rc);
+                }
+            }).addEventListener(new FutureEventListener<Void>() {
+                @Override
+                public void onSuccess(Void done) {
+                }
+                @Override
+                public void onFailure(Throwable cause) {
+                    LOG.error("addComplete processing failed for {} entry {} lastTxId {} rc {} with error",
+                        new Object[] {fullyQualifiedLogSegment, entryId, transmitPacket.getRecordSet().getMaxTxId(), rc, cause});
+                }
+            });
+            // Race condition if we notify before the addComplete is enqueued.
+            transmitPacket.notifyTransmitComplete(effectiveRC.get());
+            outstandingTransmits.getAndDecrement();
+        } else {
+            // Notify transmit complete must be called before deferred processing in the
+            // sync case since otherwise callbacks in deferred processing may deadlock.
+            transmitPacket.notifyTransmitComplete(effectiveRC.get());
+            outstandingTransmits.getAndDecrement();
+            addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
+        }
+    }
+
+    private void addCompleteDeferredProcessing(final BKTransmitPacket transmitPacket,
+                                               final long entryId,
+                                               final int rc) {
+        boolean cancelPendingPromises = false;
+        EntryBuffer recordSet = transmitPacket.getRecordSet();
+        synchronized (this) {
+            if (transmitResult.compareAndSet(BKException.Code.OK, rc)) {
+                // If this is the first time we are setting an error code in the transmitResult then
+                // we must cancel pending promises; once this error has been set, more records will not
+                // be enqueued; they will be failed with WriteException
+                cancelPendingPromises = (BKException.Code.OK != rc);
+            } else {
+                LOG.warn("Log segment {} entryId {}: Tried to set transmit result to ({}) but is already ({})",
+                    new Object[] {fullyQualifiedLogSegment, entryId, rc, transmitResult.get()});
+            }
+
+            if (transmitResult.get() != BKException.Code.OK) {
+                if (recordSet.hasUserRecords()) {
+                    transmitDataPacketSize.registerFailedEvent(recordSet.getNumBytes());
+                }
+            } else {
+                // If we had data that we flushed then we need it to make sure that
+                // background flush in the next pass will make the previous writes
+                // visible by advancing the lastAck
+                if (recordSet.hasUserRecords()) {
+                    transmitDataPacketSize.registerSuccessfulEvent(recordSet.getNumBytes());
+                    controlFlushNeeded = true;
+                    if (immediateFlushEnabled) {
+                        if (0 == minDelayBetweenImmediateFlushMs) {
+                            backgroundFlush(true);
+                        } else {
+                            scheduleFlushWithDelayIfNeeded(new Callable<Void>() {
+                                @Override
+                                public Void call() throws Exception {
+                                    backgroundFlush(true);
+                                    return null;
+                                }
+                            }, immFlushSchedFutureRef);
+                        }
+                    }
+                }
+            }
+
+            // update last dlsn before satisifying future
+            if (BKException.Code.OK == transmitResult.get()) {
+                DLSN lastDLSNInPacket = recordSet.finalizeTransmit(
+                        logSegmentSequenceNumber, entryId);
+                if (recordSet.hasUserRecords()) {
+                    if (null != lastDLSNInPacket && lastDLSN.compareTo(lastDLSNInPacket) < 0) {
+                        lastDLSN = lastDLSNInPacket;
+                    }
+                }
+            }
+        }
+
+        if (BKException.Code.OK == transmitResult.get()) {
+            recordSet.completeTransmit(logSegmentSequenceNumber, entryId);
+        } else {
+            recordSet.abortTransmit(FutureUtils.transmitException(transmitResult.get()));
+        }
+
+        if (cancelPendingPromises) {
+            // Since the writer is in a bad state no more packets will be tramsitted, and its safe to
+            // assign a new empty packet. This is to avoid a race with closeInternal which may also
+            // try to cancel the current packet;
+            final BKTransmitPacket packetCurrentSaved;
+            synchronized (this) {
+                packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
+                recordSetWriter = newRecordSetWriter();
+            }
+            packetCurrentSaved.getRecordSet().abortTransmit(
+                    new WriteCancelledException(streamName,
+                            FutureUtils.transmitException(transmitResult.get())));
+        }
+    }
+
+    @Override
+    synchronized public void run()  {
+        backgroundFlush(false);
+    }
+
+    synchronized private void backgroundFlush(boolean controlFlushOnly)  {
+        if (null != closeFuture) {
+            // if the log segment is closing, skip any background flushing
+            LOG.debug("Skip background flushing since log segment {} is closing.", getFullyQualifiedLogSegment());
+            return;
+        }
+        try {
+            boolean newData = haveDataToTransmit();
+
+            if (controlFlushNeeded || (!controlFlushOnly && newData)) {
+                // If we need this periodic transmit to persist previously written data but
+                // there is no new data (which would cause the transmit to be skipped) generate
+                // a control record
+                if (!newData) {
+                    writeControlLogRecord();
+                }
+
+                transmit();
+                pFlushSuccesses.inc();
+            } else {
+                pFlushMisses.inc();
+            }
+        } catch (IOException exc) {
+            LOG.error("Log Segment {}: Error encountered by the periodic flush", fullyQualifiedLogSegment, exc);
+        }
+    }
+
+    synchronized private void keepAlive() {
+        if (null != closeFuture) {
+            // if the log segment is closing, skip sending any keep alive records.
+            LOG.debug("Skip sending keepAlive control record since log segment {} is closing.",
+                    getFullyQualifiedLogSegment());
+            return;
+        }
+
+        if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) {
+            return;
+        }
+
+        LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
+        controlRec.setControl();
+        asyncWrite(controlRec);
+    }
+
+}