You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:14 UTC
[09/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
new file mode 100644
index 0000000..6b60c77
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -0,0 +1,1348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.BKTransmitException;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+import org.apache.distributedlog.exceptions.FlushException;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException;
+import org.apache.distributedlog.exceptions.WriteCancelledException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.distributedlog.feature.CoreFeatureKeys;
+import org.apache.distributedlog.injector.FailureInjector;
+import org.apache.distributedlog.injector.RandomDelayFailureInjector;
+import org.apache.distributedlog.io.Buffer;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionUtils;
+import org.apache.distributedlog.lock.DistributedLock;
+import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
+import org.apache.distributedlog.logsegment.LogSegmentWriter;
+import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.stats.OpStatsListener;
+import org.apache.distributedlog.util.FailpointUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.util.SafeQueueingFuturePool;
+import org.apache.distributedlog.util.SimplePermitLimiter;
+import org.apache.distributedlog.util.Sizable;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.FuturePool;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
+
+/**
+ * BookKeeper Based Log Segment Writer.
+ *
+ * Multiple log records are packed into a single bookkeeper
+ * entry before sending it over the network. The fact that the log record entries
+ * are complete in the bookkeeper entries means that each bookkeeper log entry
+ * can be read as a complete edit log. This is useful for reading, as we don't
+ * need to read through the entire log segment to get the last written entry.
+ *
+ * <h3>Metrics</h3>
+ *
+ * <ul>
+ * <li> flush/periodic/{success,miss}: counters for periodic flushes.
+ * <li> data/{success,miss}: counters for data transmits.
+ * <li> transmit/packetsize: opstats. characteristics of packet size for transmits.
+ * <li> control/success: counter of success transmit of control records
+ * <li> seg_writer/write: opstats. latency characteristics of write operations in segment writer.
+ * <li> seg_writer/add_complete/{callback,queued,deferred}: opstats. latency components of add completions.
+ * <li> seg_writer/pendings: counter. the number of records pending by the segment writers.
+ * <li> transmit/outstanding/requests: per stream gauge. the number of outstanding transmits each stream.
+ * </ul>
+ */
class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Sizable {
    static final Logger LOG = LoggerFactory.getLogger(BKLogSegmentWriter.class);

    // "<stream>:<segment>" identifier, used for log messages and exceptions.
    private final String fullyQualifiedLogSegment;
    private final String streamName;
    private final int logSegmentMetadataVersion;
    // Most recently transmitted packet; close waits on its completion before aborting the rest.
    private BKTransmitPacket packetPrevious;
    // Output buffer of records accumulated since the last transmit.
    private Entry.Writer recordSetWriter;
    // Number of transmits issued but not yet acknowledged; exported via the "requests" gauge.
    private final AtomicInteger outstandingTransmits;
    // Transmit once the output buffer reaches this many bytes (0 => transmit per record).
    private final int transmissionThreshold;
    protected final LogSegmentEntryWriter entryWriter;
    private final CompressionCodec.Type compressionType;
    // Serializes transmits so only one entry is packaged and sent at a time.
    private final ReentrantLock transmitLock = new ReentrantLock();
    // Sticky result code of the first failed transmit; stays OK while the writer is healthy.
    private final AtomicInteger transmitResult
        = new AtomicInteger(BKException.Code.OK);
    private final DistributedLock lock;
    private final boolean isDurableWriteEnabled;
    private DLSN lastDLSN = DLSN.InvalidDLSN;
    private final long startTxId;
    private long lastTxId = DistributedLogConstants.INVALID_TXID;
    private long lastTxIdAcknowledged = DistributedLogConstants.INVALID_TXID;
    private long outstandingBytes = 0;
    private long numFlushesSinceRestart = 0;
    private long numBytes = 0;
    private long lastEntryId = Long.MIN_VALUE;
    private long lastTransmitNanos = Long.MIN_VALUE;
    private final int periodicKeepAliveMs;

    // Indicates whether there are writes that have been successfully transmitted that would need
    // a control record to be transmitted to make them visible to the readers by updating the last
    // add confirmed
    volatile private boolean controlFlushNeeded = false;
    private boolean immediateFlushEnabled = false;
    private int minDelayBetweenImmediateFlushMs = 0;
    // Measures time since the last transmit; drives the min-delay immediate-flush heuristic.
    private Stopwatch lastTransmit;
    private boolean streamEnded = false;
    private final ScheduledFuture<?> periodicFlushSchedule;
    private final ScheduledFuture<?> periodicKeepAliveSchedule;
    final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
    final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
    final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null);
    private boolean enforceLock = true;
    // Non-null once close/abort has started; doubles as the "writer closed" flag.
    private Promise<Void> closeFuture = null;
    private final boolean enableRecordCounts;
    // Count of user records written so far; stamped onto records when enableRecordCounts is set.
    private int positionWithinLogSegment = 0;
    private final long logSegmentSequenceNumber;
    // Used only for values that *could* change (e.g. buffer size etc.)
    private final DistributedLogConfiguration conf;
    private final OrderedScheduler scheduler;

    // stats
    private final StatsLogger envelopeStatsLogger;
    private final StatsLogger transmitOutstandingLogger;
    private final Counter transmitDataSuccesses;
    private final Counter transmitDataMisses;
    private final Gauge<Number> transmitOutstandingGauge;
    private final OpStatsLogger transmitDataPacketSize;
    private final Counter transmitControlSuccesses;
    private final Counter pFlushSuccesses;
    private final Counter pFlushMisses;
    private final OpStatsLogger writeTime;
    private final OpStatsLogger addCompleteTime;
    private final OpStatsLogger addCompleteQueuedTime;
    private final OpStatsLogger addCompleteDeferredTime;
    private final Counter pendingWrites;

    // add complete processing
    private final SafeQueueingFuturePool<Void> addCompleteFuturePool;

    // Functions

    // Maps a transmit result code to the last acknowledged txid, or to a
    // BKTransmitException future if the transmit failed.
    private final AbstractFunction1<Integer, Future<Long>> GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC =
        new AbstractFunction1<Integer, Future<Long>>() {
            @Override
            public Future<Long> apply(Integer transmitRc) {
                if (BKException.Code.OK == transmitRc) {
                    return Future.value(getLastTxIdAcknowledged());
                } else {
                    return Future.exception(new BKTransmitException("Failed to transmit entry", transmitRc));
                }
            }
        };
    // Chains a commit after a flush completes; used by flushAndCommit-style flows.
    final AbstractFunction1<Long, Future<Long>> COMMIT_AFTER_FLUSH_FUNC =
        new AbstractFunction1<Long, Future<Long>>() {
            @Override
            public Future<Long> apply(Long lastAckedTxId) {
                return commit();
            }
        };

    private final AlertStatsLogger alertStatsLogger;
    private final WriteLimiter writeLimiter;
    // Injects artificial write latency when failure injection is configured; NULL otherwise.
    private final FailureInjector writeDelayInjector;
+
+ /**
+ * Construct an edit log output stream which writes to a ledger.
+ */
+ protected BKLogSegmentWriter(String streamName,
+ String logSegmentName,
+ DistributedLogConfiguration conf,
+ int logSegmentMetadataVersion,
+ LogSegmentEntryWriter entryWriter,
+ DistributedLock lock, /** the lock needs to be acquired **/
+ long startTxId,
+ long logSegmentSequenceNumber,
+ OrderedScheduler scheduler,
+ StatsLogger statsLogger,
+ StatsLogger perLogStatsLogger,
+ AlertStatsLogger alertStatsLogger,
+ PermitLimiter globalWriteLimiter,
+ FeatureProvider featureProvider,
+ DynamicDistributedLogConfiguration dynConf)
+ throws IOException {
+ super();
+
+ // set up a write limiter
+ PermitLimiter streamWriteLimiter = null;
+ if (conf.getPerWriterOutstandingWriteLimit() < 0) {
+ streamWriteLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
+ } else {
+ Feature disableWriteLimitFeature = featureProvider.getFeature(
+ CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
+ streamWriteLimiter = new SimplePermitLimiter(
+ conf.getOutstandingWriteLimitDarkmode(),
+ conf.getPerWriterOutstandingWriteLimit(),
+ statsLogger.scope("streamWriteLimiter"),
+ false,
+ disableWriteLimitFeature);
+ }
+ this.writeLimiter = new WriteLimiter(streamName, streamWriteLimiter, globalWriteLimiter);
+ this.alertStatsLogger = alertStatsLogger;
+ this.envelopeStatsLogger = BroadCastStatsLogger.masterslave(statsLogger, perLogStatsLogger);
+
+ StatsLogger flushStatsLogger = statsLogger.scope("flush");
+ StatsLogger pFlushStatsLogger = flushStatsLogger.scope("periodic");
+ pFlushSuccesses = pFlushStatsLogger.getCounter("success");
+ pFlushMisses = pFlushStatsLogger.getCounter("miss");
+
+ // transmit
+ StatsLogger transmitDataStatsLogger = statsLogger.scope("data");
+ transmitDataSuccesses = transmitDataStatsLogger.getCounter("success");
+ transmitDataMisses = transmitDataStatsLogger.getCounter("miss");
+ StatsLogger transmitStatsLogger = statsLogger.scope("transmit");
+ transmitDataPacketSize = transmitStatsLogger.getOpStatsLogger("packetsize");
+ StatsLogger transmitControlStatsLogger = statsLogger.scope("control");
+ transmitControlSuccesses = transmitControlStatsLogger.getCounter("success");
+ StatsLogger segWriterStatsLogger = statsLogger.scope("seg_writer");
+ writeTime = segWriterStatsLogger.getOpStatsLogger("write");
+ addCompleteTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("callback");
+ addCompleteQueuedTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("queued");
+ addCompleteDeferredTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("deferred");
+ pendingWrites = segWriterStatsLogger.getCounter("pending");
+
+ // outstanding transmit requests
+ transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
+ transmitOutstandingGauge = new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+ @Override
+ public Number getSample() {
+ return outstandingTransmits.get();
+ }
+ };
+ transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge);
+
+ outstandingTransmits = new AtomicInteger(0);
+ this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
+ this.streamName = streamName;
+ this.logSegmentMetadataVersion = logSegmentMetadataVersion;
+ this.entryWriter = entryWriter;
+ this.lock = lock;
+ this.lock.checkOwnershipAndReacquire();
+
+ final int configuredTransmissionThreshold = dynConf.getOutputBufferSize();
+ if (configuredTransmissionThreshold > MAX_LOGRECORDSET_SIZE) {
+ LOG.warn("Setting output buffer size {} greater than max transmission size {} for log segment {}",
+ new Object[] {configuredTransmissionThreshold, MAX_LOGRECORDSET_SIZE, fullyQualifiedLogSegment});
+ this.transmissionThreshold = MAX_LOGRECORDSET_SIZE;
+ } else {
+ this.transmissionThreshold = configuredTransmissionThreshold;
+ }
+ this.compressionType = CompressionUtils.stringToType(conf.getCompressionType());
+
+ this.logSegmentSequenceNumber = logSegmentSequenceNumber;
+ this.recordSetWriter = Entry.newEntry(
+ streamName,
+ Math.max(transmissionThreshold, 1024),
+ envelopeBeforeTransmit(),
+ compressionType,
+ envelopeStatsLogger);
+ this.packetPrevious = null;
+ this.startTxId = startTxId;
+ this.lastTxId = startTxId;
+ this.lastTxIdAcknowledged = startTxId;
+ this.enableRecordCounts = conf.getEnableRecordCounts();
+ this.immediateFlushEnabled = conf.getImmediateFlushEnabled();
+ this.isDurableWriteEnabled = dynConf.isDurableWriteEnabled();
+ this.scheduler = scheduler;
+
+ // Failure injection
+ if (conf.getEIInjectWriteDelay()) {
+ this.writeDelayInjector = new RandomDelayFailureInjector(dynConf);
+ } else {
+ this.writeDelayInjector = FailureInjector.NULL;
+ }
+
+ // If we are transmitting immediately (threshold == 0) and if immediate
+ // flush is enabled, we don't need the periodic flush task
+ final int configuredPeriodicFlushFrequency = dynConf.getPeriodicFlushFrequencyMilliSeconds();
+ if (!immediateFlushEnabled || (0 != this.transmissionThreshold)) {
+ int periodicFlushFrequency = configuredPeriodicFlushFrequency;
+ if (periodicFlushFrequency > 0 && scheduler != null) {
+ periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
+ periodicFlushFrequency/2, periodicFlushFrequency/2, TimeUnit.MILLISECONDS);
+ } else {
+ periodicFlushSchedule = null;
+ }
+ } else {
+ // Min delay heuristic applies only when immediate flush is enabled
+ // and transmission threshold is zero
+ minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs();
+ periodicFlushSchedule = null;
+ }
+ this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
+ if (periodicKeepAliveMs > 0 && scheduler != null) {
+ periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ keepAlive();
+ }
+ }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS);
+ } else {
+ periodicKeepAliveSchedule = null;
+ }
+
+ this.conf = conf;
+ if (null != scheduler) {
+ this.addCompleteFuturePool = new SafeQueueingFuturePool<Void>(scheduler.getFuturePool(streamName));
+ } else {
+ this.addCompleteFuturePool = null;
+ }
+ assert(!this.immediateFlushEnabled || (null != this.scheduler));
+ this.lastTransmit = Stopwatch.createStarted();
+ }
+
+ String getFullyQualifiedLogSegment() {
+ return fullyQualifiedLogSegment;
+ }
+
+ @VisibleForTesting
+ DistributedLock getLock() {
+ return this.lock;
+ }
+
+ @VisibleForTesting
+ FuturePool getFuturePool() {
+ if (null == scheduler) {
+ return null;
+ }
+ return scheduler.getFuturePool(streamName);
+ }
+
+ @VisibleForTesting
+ void setTransmitResult(int rc) {
+ transmitResult.set(rc);
+ }
+
+ @VisibleForTesting
+ protected final LogSegmentEntryWriter getEntryWriter() {
+ return this.entryWriter;
+ }
+
+ @Override
+ public long getLogSegmentId() {
+ return this.entryWriter.getLogSegmentId();
+ }
+
+ protected final long getLogSegmentSequenceNumber() {
+ return logSegmentSequenceNumber;
+ }
+
+ /**
+ * Get the start tx id of the log segment.
+ *
+ * @return start tx id of the log segment.
+ */
+ protected final long getStartTxId() {
+ return startTxId;
+ }
+
+ /**
+ * Get the last tx id that has been written to the log segment buffer but not committed yet.
+ *
+ * @return last tx id that has been written to the log segment buffer but not committed yet.
+ * @see #getLastTxIdAcknowledged()
+ */
+ synchronized long getLastTxId() {
+ return lastTxId;
+ }
+
+ /**
+ * Get the last tx id that has been acknowledged.
+ *
+ * @return last tx id that has been acknowledged.
+ * @see #getLastTxId()
+ */
+ synchronized long getLastTxIdAcknowledged() {
+ return lastTxIdAcknowledged;
+ }
+
+ /**
+ * Get the position-within-logsemgnet of the last written log record.
+ *
+ * @return position-within-logsegment of the last written log record.
+ */
+ synchronized int getPositionWithinLogSegment() {
+ return positionWithinLogSegment;
+ }
+
+ @VisibleForTesting
+ long getLastEntryId() {
+ return lastEntryId;
+ }
+
+ /**
+ * Get the last dlsn of the last acknowledged record.
+ *
+ * @return last dlsn of the last acknowledged record.
+ */
+ synchronized DLSN getLastDLSN() {
+ return lastDLSN;
+ }
+
+ @Override
+ public long size() {
+ return entryWriter.size();
+ }
+
+ private synchronized int getAverageTransmitSize() {
+ if (numFlushesSinceRestart > 0) {
+ long ret = numBytes/numFlushesSinceRestart;
+
+ if (ret < Integer.MIN_VALUE || ret > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException
+ (ret + " transmit size should never exceed max transmit size");
+ }
+ return (int) ret;
+ }
+
+ return 0;
+ }
+
+ private Entry.Writer newRecordSetWriter() {
+ return Entry.newEntry(
+ streamName,
+ Math.max(transmissionThreshold, getAverageTransmitSize()),
+ envelopeBeforeTransmit(),
+ compressionType,
+ envelopeStatsLogger);
+ }
+
+ private boolean envelopeBeforeTransmit() {
+ return LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion);
+ }
+
+ @Override
+ public Future<Void> asyncClose() {
+ return closeInternal(false);
+ }
+
+ @Override
+ public Future<Void> asyncAbort() {
+ return closeInternal(true);
+ }
+
+ private void flushAddCompletes() {
+ if (null != addCompleteFuturePool) {
+ addCompleteFuturePool.close();
+ }
+ }
+
+ private synchronized void abortPacket(BKTransmitPacket packet) {
+ long numRecords = 0;
+ if (null != packet) {
+ EntryBuffer recordSet = packet.getRecordSet();
+ numRecords = recordSet.getNumRecords();
+ int rc = transmitResult.get();
+ if (BKException.Code.OK == rc) {
+ rc = BKException.Code.InterruptedException;
+ }
+ Throwable reason = new WriteCancelledException(streamName, FutureUtils.transmitException(rc));
+ recordSet.abortTransmit(reason);
+ }
+ LOG.info("Stream {} aborted {} writes", fullyQualifiedLogSegment, numRecords);
+ }
+
+ private synchronized long getWritesPendingTransmit() {
+ if (null != recordSetWriter) {
+ return recordSetWriter.getNumRecords();
+ } else {
+ return 0;
+ }
+ }
+
+ private synchronized long getPendingAddCompleteCount() {
+ if (null != addCompleteFuturePool) {
+ return addCompleteFuturePool.size();
+ } else {
+ return 0;
+ }
+ }
+
+ private Future<Void> closeInternal(boolean abort) {
+ Promise<Void> closePromise;
+ synchronized (this) {
+ if (null != closeFuture) {
+ return closeFuture;
+ }
+ closePromise = closeFuture = new Promise<Void>();
+ }
+
+ AtomicReference<Throwable> throwExc = new AtomicReference<Throwable>(null);
+ closeInternal(abort, throwExc, closePromise);
+ return closePromise;
+ }
+
    /**
     * First stage of the shutdown sequence: tear down stats and limiters, cancel the
     * background tasks, then (for a graceful close of a healthy segment) flush and
     * commit buffered data before handing off to {@link #abortTransmitPacketOnClose}.
     *
     * @param abort true to abort (skip flushing), false for graceful close
     * @param throwExc holds the first error observed during shutdown
     * @param closePromise completed by the final stage of the shutdown chain
     */
    private void closeInternal(final boolean abort,
                               final AtomicReference<Throwable> throwExc,
                               final Promise<Void> closePromise) {
        // clean stats resources
        this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
        this.writeLimiter.close();

        // Cancel the periodic keep alive schedule first
        if (null != periodicKeepAliveSchedule) {
            if (!periodicKeepAliveSchedule.cancel(false)) {
                LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
            }
        }

        // Cancel the periodic flush schedule first
        // The task is allowed to exit gracefully
        if (null != periodicFlushSchedule) {
            // we don't need to care about the cancel result here. if the periodic flush task couldn't
            // be cancelled, it means that it is doing flushing. So following flushes would be synchronized
            // to wait until background flush completed.
            if (!periodicFlushSchedule.cancel(false)) {
                LOG.info("Periodic flush for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
            }
        }

        // If it is a normal close and the stream isn't in an error state, we attempt to flush any buffered data
        if (!abort && !isLogSegmentInError()) {
            // Stop enforcing lock ownership: the flush must proceed even if the lock was lost.
            this.enforceLock = false;
            LOG.info("Flushing before closing log segment {}", getFullyQualifiedLogSegment());
            flushAndCommit().addEventListener(new FutureEventListener<Long>() {
                @Override
                public void onSuccess(Long value) {
                    abortTransmitPacketOnClose(abort, throwExc, closePromise);
                }

                @Override
                public void onFailure(Throwable cause) {
                    // Record the flush failure but continue the shutdown sequence regardless.
                    throwExc.set(cause);
                    abortTransmitPacketOnClose(abort, throwExc, closePromise);
                }
            });
        } else {
            abortTransmitPacketOnClose(abort, throwExc, closePromise);
        }

    }
+
    /**
     * Second stage of shutdown: swap out the current record set (so concurrent
     * add-complete processing sees a fresh, empty buffer), wait for the last
     * transmitted packet to complete, then abort any records still buffered and
     * proceed to closing the ledger.
     *
     * @param abort whether this is an abort (vs graceful close)
     * @param throwExc holds the first error observed during shutdown
     * @param closePromise completed by the final stage of the shutdown chain
     */
    private void abortTransmitPacketOnClose(final boolean abort,
                                            final AtomicReference<Throwable> throwExc,
                                            final Promise<Void> closePromise) {
        LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} :" +
                " lastDLSN = {} outstandingTransmits = {} writesPendingTransmit = {} addCompletesPending = {}",
            new Object[]{abort, fullyQualifiedLogSegment, getLastDLSN(),
                outstandingTransmits.get(), getWritesPendingTransmit(), getPendingAddCompleteCount()});

        // Save the current packet to reset, leave a new empty packet to avoid a race with
        // addCompleteDeferredProcessing.
        final BKTransmitPacket packetPreviousSaved;
        final BKTransmitPacket packetCurrentSaved;
        synchronized (this) {
            packetPreviousSaved = packetPrevious;
            packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
            recordSetWriter = newRecordSetWriter();
        }

        // Once the last packet been transmitted, apply any remaining promises asynchronously
        // to avoid blocking close if bk client is slow for some reason.
        if (null != packetPreviousSaved) {
            packetPreviousSaved.addTransmitCompleteListener(new FutureEventListener<Integer>() {
                @Override
                public void onSuccess(Integer transmitResult) {
                    flushAddCompletes();
                    abortPacket(packetCurrentSaved);
                }
                @Override
                public void onFailure(Throwable cause) {
                    // NOTE(review): shutdown does not proceed to abortPacket on this path;
                    // the error is only logged. Presumably transmit-complete never fails
                    // this way in practice — confirm against BKTransmitPacket.
                    LOG.error("Unexpected error on transmit completion ", cause);
                }
            });
        } else {
            // In this case there are no pending add completes, but we still need to abort the
            // current packet.
            abortPacket(packetCurrentSaved);
        }
        closeLedgerOnClose(abort, throwExc, closePromise);
    }
+
    /**
     * Third stage of shutdown: close the underlying entry writer (ledger) so any
     * outstanding addEntry calls complete, then finish the close promise.
     * Skipped when an error has already been recorded or the segment is in error.
     *
     * @param abort whether this is an abort (close failures are not recorded on abort)
     * @param throwExc holds the first error observed during shutdown
     * @param closePromise completed by {@link #completeClosePromise}
     */
    private void closeLedgerOnClose(final boolean abort,
                                    final AtomicReference<Throwable> throwExc,
                                    final Promise<Void> closePromise) {
        // close the log segment if it isn't in error state, so all the outstanding addEntry(s) will callback.
        if (null == throwExc.get() && !isLogSegmentInError()) {
            // Synchronous closing the ledger handle, if we couldn't close a ledger handle successfully.
            // we should throw the exception to #closeToFinalize, so it would fail completing a log segment.
            entryWriter.asyncClose(new CloseCallback() {
                @Override
                public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
                    // LedgerClosedException is benign here: the ledger was already closed.
                    if (BKException.Code.OK != rc && BKException.Code.LedgerClosedException != rc) {
                        if (!abort) {
                            throwExc.set(new IOException("Failed to close ledger for " + fullyQualifiedLogSegment + " : " +
                                BKException.getMessage(rc)));
                        }
                    }
                    completeClosePromise(abort, throwExc, closePromise);
                }
            }, null);
        } else {
            completeClosePromise(abort, throwExc, closePromise);
        }
    }
+
+ private void completeClosePromise(final boolean abort,
+ final AtomicReference<Throwable> throwExc,
+ final Promise<Void> closePromise) {
+ // If add entry failed because of closing ledger above, we don't need to fail the close operation
+ if (!abort && null == throwExc.get() && shouldFailCompleteLogSegment()) {
+ throwExc.set(new BKTransmitException("Closing an errored stream : ", transmitResult.get()));
+ }
+
+ if (null == throwExc.get()) {
+ FutureUtils.setValue(closePromise, null);
+ } else {
+ FutureUtils.setException(closePromise, throwExc.get());
+ }
+ }
+
+ @Override
+ synchronized public void write(LogRecord record) throws IOException {
+ writeUserRecord(record);
+ flushIfNeeded();
+ }
+
+ @Override
+ synchronized public Future<DLSN> asyncWrite(LogRecord record) {
+ return asyncWrite(record, true);
+ }
+
    /**
     * Asynchronously write a log record.
     *
     * <p>Control records are never packed together with user records: the current
     * buffer is transmitted first, then the control record is written and transmitted
     * on its own. User records are buffered and optionally flushed.
     *
     * @param record record to write
     * @param flush whether to flush the output buffer after buffering a user record
     * @return future satisfied with the record's DLSN (InvalidDLSN when durable
     *         writes are disabled), or an exception future on failure
     */
    synchronized public Future<DLSN> asyncWrite(LogRecord record, boolean flush) {
        Future<DLSN> result = null;
        try {
            if (record.isControl()) {
                // we don't pack control records with user records together
                // so transmit current output buffer if possible
                try {
                    transmit();
                } catch (IOException ioe) {
                    return Future.exception(new WriteCancelledException(fullyQualifiedLogSegment, ioe));
                }
                result = writeControlLogRecord(record);
                transmit();
            } else {
                result = writeUserRecord(record);
                if (!isDurableWriteEnabled) {
                    // we have no idea about the DLSN if durability is turned off.
                    result = Future.value(DLSN.InvalidDLSN);
                }
                if (flush) {
                    flushIfNeeded();
                }
            }
        } catch (IOException ioe) {
            // We may incorrectly report transmit failure here, but only if we happened to hit
            // packet/xmit size limit conditions AND fail flush above, which should happen rarely
            if (null != result) {
                LOG.error("Overriding first result with flush failure {}", result);
            }
            result = Future.exception(ioe);

            // Flush to ensure any prev. writes with flush=false are flushed despite failure.
            flushIfNeededNoThrow();
        }
        return result;
    }
+
    /**
     * Validate and buffer a user log record.
     *
     * <p>Rejects writes to a closed, errored, or ended segment, and transaction ids
     * outside the valid range. Acquires a write permit and tracks the record in the
     * pending-writes counter until its future completes.
     *
     * @param record user record to write
     * @return future satisfied with the record's DLSN once acknowledged
     * @throws IOException on validation failure, rate limiting, or buffering failure
     */
    synchronized private Future<DLSN> writeUserRecord(LogRecord record) throws IOException {
        if (null != closeFuture) {
            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(BKException.Code.LedgerClosedException));
        }

        if (BKException.Code.OK != transmitResult.get()) {
            // Failfast if the stream already encountered error with safe retry on the client
            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(transmitResult.get()));
        }

        if (streamEnded) {
            throw new EndOfStreamException("Writing to a stream after it has been marked as completed");
        }

        // Negative txids and MAX_TXID are reserved (MAX_TXID marks end-of-stream).
        if ((record.getTransactionId() < 0) ||
            (record.getTransactionId() == DistributedLogConstants.MAX_TXID)) {
            throw new TransactionIdOutOfOrderException(record.getTransactionId());
        }

        // Inject write delay if configured to do so
        writeDelayInjector.inject();

        // Will check write rate limits and throw if exceeded.
        writeLimiter.acquire();
        pendingWrites.inc();

        // The count represents the number of user records up to the
        // current record
        // Increment the record count only when writing a user log record
        // Internally generated log records don't increment the count
        // writeInternal will always set a count regardless of whether it was
        // incremented or not.
        Future<DLSN> future = null;
        try {
            // increment the position for the record to write
            // if the record is failed to write, it would be decremented.
            positionWithinLogSegment++;
            int numRecords = 1;
            if (record.isRecordSet()) {
                numRecords = LogRecordSet.numRecords(record);
            }
            future = writeInternal(record);
            // after the record (record set) is written, the position should be
            // moved for {numRecords}, but since we already moved the record by 1
            // so advance the position for other {numRecords - 1}.
            positionWithinLogSegment += (numRecords - 1);
        } catch (IOException ex) {
            // Roll back the permit, pending counter and position on failure.
            writeLimiter.release();
            pendingWrites.dec();
            positionWithinLogSegment--;
            throw ex;
        }

        // Track outstanding requests and return the future.
        return future.ensure(new Function0<BoxedUnit>() {
            public BoxedUnit apply() {
                pendingWrites.dec();
                writeLimiter.release();
                return null;
            }
        });
    }
+
+ boolean isLogSegmentInError() {
+ return (transmitResult.get() != BKException.Code.OK);
+ }
+
+ boolean shouldFailCompleteLogSegment() {
+ return (transmitResult.get() != BKException.Code.OK) &&
+ (transmitResult.get() != BKException.Code.LedgerClosedException);
+ }
+
    /**
     * Buffer a record (user or control) into the current record set, transmitting the
     * buffer first if the record would overflow the max entry size.
     *
     * @param record record to buffer
     * @return promise satisfied with the record's DLSN once acknowledged
     * @throws LogRecordTooLongException if a single record exceeds MAX_LOGRECORD_SIZE
     * @throws LockingException if the write lock is lost (while enforced)
     * @throws BKTransmitException if the segment is already in a transmit-error state
     * @throws WriteException if the transmit envelope cannot be built
     * @throws InvalidEnvelopedEntryException if an invalid enveloped entry was built
     */
    synchronized public Future<DLSN> writeInternal(LogRecord record)
        throws LogRecordTooLongException, LockingException, BKTransmitException,
        WriteException, InvalidEnvelopedEntryException {
        int logRecordSize = record.getPersistentSize();

        if (logRecordSize > MAX_LOGRECORD_SIZE) {
            throw new LogRecordTooLongException(String.format(
                "Log Record of size %d written when only %d is allowed",
                logRecordSize, MAX_LOGRECORD_SIZE));
        }

        // If we will exceed the max number of bytes allowed per entry
        // initiate a transmit before accepting the new log record
        if ((recordSetWriter.getNumBytes() + logRecordSize) > MAX_LOGRECORDSET_SIZE) {
            checkStateAndTransmit();
        }

        checkWriteLock();

        if (enableRecordCounts) {
            // Set the count here. The caller would appropriately increment it
            // if this log record is to be counted
            record.setPositionWithinLogSegment(positionWithinLogSegment);
        }

        Promise<DLSN> writePromise = new Promise<DLSN>();
        writePromise.addEventListener(new OpStatsListener<DLSN>(writeTime));
        recordSetWriter.writeRecord(record, writePromise);

        // Out-of-order txids are logged but tolerated.
        if (record.getTransactionId() < lastTxId) {
            LOG.info("Log Segment {} TxId decreased Last: {} Record: {}",
                new Object[] {fullyQualifiedLogSegment, lastTxId, record.getTransactionId()});
        }
        if (!record.isControl()) {
            // only update last tx id for user records
            lastTxId = record.getTransactionId();
            // 20 bytes of per-record overhead on top of the payload — presumably the
            // serialized record header size; confirm against LogRecord's wire format.
            outstandingBytes += (20 + record.getPayload().length);
        }
        return writePromise;
    }
+
+ synchronized private Future<DLSN> writeControlLogRecord()
+ throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
+ LockingException, LogRecordTooLongException {
+ LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.CONTROL_RECORD_CONTENT);
+ controlRec.setControl();
+ return writeControlLogRecord(controlRec);
+ }
+
+ synchronized private Future<DLSN> writeControlLogRecord(LogRecord record)
+ throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
+ LockingException, LogRecordTooLongException {
+ return writeInternal(record);
+ }
+
+ /**
+ * We write a special log record that marks the end of the stream. Since this is the last
+ * log record in the stream, it is marked with MAX_TXID. MAX_TXID also has the useful
+ * side-effect of disallowing future startLogSegment calls through the MaxTxID check
+ *
+ * @throws IOException
+ */
+ synchronized private void writeEndOfStreamMarker() throws IOException {
+ LogRecord endOfStreamRec = new LogRecord(DistributedLogConstants.MAX_TXID, "endOfStream".getBytes(UTF_8));
+ endOfStreamRec.setEndOfStream();
+ writeInternal(endOfStreamRec);
+ }
+
+ /**
+ * Flushes all the data up to this point,
+ * adds the end of stream marker and marks the stream
+ * as read-only in the metadata. No appends to the
+ * stream will be allowed after this point
+ */
+ public Future<Long> markEndOfStream() {
+ synchronized (this) {
+ try {
+ writeEndOfStreamMarker();
+ } catch (IOException e) {
+ return Future.exception(e);
+ }
+ streamEnded = true;
+ }
+ return flushAndCommit();
+ }
+
+ /**
+ * Write bulk of records.
+ *
+ * (TODO: moved this method to log writer level)
+ *
+ * @param records list of records to write
+ * @return number of records that has been written
+ * @throws IOException when there is I/O errors during writing records.
+ */
+ synchronized public int writeBulk(List<LogRecord> records) throws IOException {
+ int numRecords = 0;
+ for (LogRecord r : records) {
+ write(r);
+ numRecords++;
+ }
+ return numRecords;
+ }
+
    /**
     * Pre-transmit state hook; currently only used to inject the
     * FP_TransmitBeforeAddEntry failpoint in tests.
     *
     * @throws WriteException if the failpoint fires
     */
    private void checkStateBeforeTransmit() throws WriteException {
        try {
            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitBeforeAddEntry);
        } catch (IOException e) {
            // NOTE(review): the triggering IOException is not chained as a cause here;
            // confirm whether WriteException accepts a cause before changing this.
            throw new WriteException(streamName, "Fail transmit before adding entries");
        }
    }
+
+ /**
+ * Transmit the output buffer data to the backend.
+ *
+ * @return last txn id that already acknowledged
+ * @throws BKTransmitException if the segment writer is already in error state
+ * @throws LockingException if the segment writer lost lock before transmit
+ * @throws WriteException if failed to create the envelope for the data to transmit
+ * @throws InvalidEnvelopedEntryException when built an invalid enveloped entry
+ */
+ synchronized void checkStateAndTransmit()
+ throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException {
+ checkStateBeforeTransmit();
+ transmit();
+ }
+
+ @Override
+ public synchronized Future<Long> flush() {
+ try {
+ checkStateBeforeTransmit();
+ } catch (WriteException e) {
+ return Future.exception(e);
+ }
+
+ Future<Integer> transmitFuture;
+ try {
+ transmitFuture = transmit();
+ } catch (BKTransmitException e) {
+ return Future.exception(e);
+ } catch (LockingException e) {
+ return Future.exception(e);
+ } catch (WriteException e) {
+ return Future.exception(e);
+ } catch (InvalidEnvelopedEntryException e) {
+ return Future.exception(e);
+ }
+
+ if (null == transmitFuture) {
+ if (null != packetPrevious) {
+ transmitFuture = packetPrevious.getTransmitFuture();
+ } else {
+ return Future.value(getLastTxIdAcknowledged());
+ }
+ }
+
+ return transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
+ }
+
+ @Override
+ public synchronized Future<Long> commit() {
+ // we don't pack control records with user records together
+ // so transmit current output buffer if possible
+ Future<Integer> transmitFuture;
+ try {
+ try {
+ transmitFuture = transmit();
+ } catch (IOException ioe) {
+ return Future.exception(ioe);
+ }
+ if (null == transmitFuture) {
+ writeControlLogRecord();
+ return flush();
+ }
+ } catch (IOException ioe) {
+ return Future.exception(ioe);
+ }
+ return transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
+ }
+
+ Future<Long> flushAndCommit() {
+ return flush().flatMap(COMMIT_AFTER_FLUSH_FUNC);
+ }
+
+ void flushIfNeededNoThrow() {
+ try {
+ flushIfNeeded();
+ } catch (IOException ioe) {
+ LOG.error("Encountered exception while flushing log records to stream {}",
+ fullyQualifiedLogSegment, ioe);
+ }
+ }
+
    /**
     * Schedule {@code callable} to run once the configured minimum delay since
     * the last transmit has elapsed, unless a previously scheduled task
     * (tracked in {@code scheduledFutureRef}) is still outstanding.
     *
     * @param callable flush action to run after the delay
     * @param scheduledFutureRef holder for the in-flight scheduled task, used to
     *                           avoid scheduling more than one task at a time
     */
    void scheduleFlushWithDelayIfNeeded(final Callable<?> callable,
                                        final AtomicReference<ScheduledFuture<?>> scheduledFutureRef) {
        // remaining portion of minDelayBetweenImmediateFlushMs not yet elapsed
        final long delayMs = Math.max(0, minDelayBetweenImmediateFlushMs - lastTransmit.elapsed(TimeUnit.MILLISECONDS));
        final ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
        if ((null == scheduledFuture) || scheduledFuture.isDone()) {
            scheduledFutureRef.set(scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    // NOTE(review): this synchronizes on the anonymous Runnable, not the
                    // enclosing writer; each schedule creates a fresh Runnable, so this lock
                    // guards nothing across tasks. Do NOT "fix" it to the writer monitor
                    // without checking lock order: transmit() acquires transmitLock before
                    // the writer monitor, so taking the writer monitor here first could
                    // invert the order and deadlock. Confirm intent before changing.
                    synchronized(this) {
                        scheduledFutureRef.set(null);
                        try {
                            callable.call();

                            // Flush was successful or wasn't needed, the exception should be unset.
                            scheduledFlushException.set(null);
                        } catch (Exception exc) {
                            scheduledFlushException.set(exc);
                            LOG.error("Delayed flush failed", exc);
                        }
                    }
                }
            }, delayMs, TimeUnit.MILLISECONDS));
        }
    }
+
    /**
     * Based on transmit buffer size, immediate flush, etc., decide whether to
     * flush the current packet now.
     *
     * <p>When the outstanding bytes exceed the transmission threshold, either
     * transmits immediately (no flush delay configured) or schedules a delayed
     * flush. In the delayed case, a failure from a previous scheduled flush
     * (latched in {@code scheduledFlushException}) is surfaced to the caller
     * as a FlushException.
     *
     * @throws FlushException if the last scheduled flush failed
     */
    void flushIfNeeded() throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
            LockingException, FlushException {
        if (outstandingBytes > transmissionThreshold) {
            // If flush delay is disabled, flush immediately, else schedule appropriately.
            if (0 == minDelayBetweenImmediateFlushMs) {
                checkStateAndTransmit();
            } else {
                scheduleFlushWithDelayIfNeeded(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        checkStateAndTransmit();
                        return null;
                    }
                }, transmitSchedFutureRef);

                // Timing here is not very important--the last flush failed and we should
                // indicate this to the caller. The next flush may succeed and unset the
                // scheduledFlushException in which case the next write will succeed (if the caller
                // hasn't already closed the writer).
                if (scheduledFlushException.get() != null) {
                    throw new FlushException("Last flush encountered an error while writing data to the backend",
                        getLastTxId(), getLastTxIdAcknowledged(), scheduledFlushException.get());
                }
            }
        }
    }
+
+ private void checkWriteLock() throws LockingException {
+ try {
+ if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_WriteInternalLostLock)) {
+ throw new LockingException("/failpoint/lockpath", "failpoint is simulating a lost lock"
+ + getFullyQualifiedLogSegment());
+ }
+ } catch (IOException e) {
+ throw new LockingException("/failpoint/lockpath", "failpoint is simulating a lost lock for "
+ + getFullyQualifiedLogSegment());
+ }
+ if (enforceLock) {
+ lock.checkOwnershipAndReacquire();
+ }
+ }
+
+ /**
+ * Transmit the current buffer to bookkeeper.
+ * Synchronised at the class. #write() and #setReadyToFlush()
+ * are never called at the same time.
+ *
+ * NOTE: This method should only throw known exceptions so that we don't accidentally
+ * add new code that throws in an inappropriate place.
+ *
+ * @return a transmit future for caller to wait for transmit result if we transmit successfully,
+ * null if no data to transmit
+ * @throws BKTransmitException if the segment writer is already in error state
+ * @throws LockingException if the segment writer lost lock before transmit
+ * @throws WriteException if failed to create the envelope for the data to transmit
+ * @throws InvalidEnvelopedEntryException when built an invalid enveloped entry
+ */
+ private Future<Integer> transmit()
+ throws BKTransmitException, LockingException, WriteException, InvalidEnvelopedEntryException {
+ EntryBuffer recordSetToTransmit;
+ transmitLock.lock();
+ try {
+ synchronized (this) {
+ checkWriteLock();
+ // If transmitResult is anything other than BKException.Code.OK, it means that the
+ // stream has encountered an error and cannot be written to.
+ if (!transmitResult.compareAndSet(BKException.Code.OK,
+ BKException.Code.OK)) {
+ LOG.error("Log Segment {} Trying to write to an errored stream; Error is {}",
+ fullyQualifiedLogSegment,
+ BKException.getMessage(transmitResult.get()));
+ throw new BKTransmitException("Trying to write to an errored stream;"
+ + " Error code : (" + transmitResult.get()
+ + ") " + BKException.getMessage(transmitResult.get()), transmitResult.get());
+ }
+
+ if (recordSetWriter.getNumRecords() == 0) {
+ // Control flushes always have at least the control record to flush
+ transmitDataMisses.inc();
+ return null;
+ }
+
+ recordSetToTransmit = recordSetWriter;
+ recordSetWriter = newRecordSetWriter();
+ outstandingBytes = 0;
+
+ if (recordSetToTransmit.hasUserRecords()) {
+ numBytes += recordSetToTransmit.getNumBytes();
+ numFlushesSinceRestart++;
+ }
+ }
+
+ Buffer toSend;
+ try {
+ toSend = recordSetToTransmit.getBuffer();
+ FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer);
+ } catch (IOException e) {
+ if (e instanceof InvalidEnvelopedEntryException) {
+ alertStatsLogger.raise("Invalid enveloped entry for segment {} : ", fullyQualifiedLogSegment, e);
+ }
+ LOG.error("Exception while enveloping entries for segment: {}",
+ new Object[] {fullyQualifiedLogSegment}, e);
+ // If a write fails here, we need to set the transmit result to an error so that
+ // no future writes go through and violate ordering guarantees.
+ transmitResult.set(BKException.Code.WriteException);
+ if (e instanceof InvalidEnvelopedEntryException) {
+ alertStatsLogger.raise("Invalid enveloped entry for segment {} : ", fullyQualifiedLogSegment, e);
+ throw (InvalidEnvelopedEntryException) e;
+ } else {
+ throw new WriteException(streamName, "Envelope Error");
+ }
+ }
+
+ synchronized (this) {
+ // update the transmit timestamp
+ lastTransmitNanos = MathUtils.nowInNano();
+
+ BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
+ packetPrevious = packet;
+ entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
+ this, packet);
+
+ if (recordSetToTransmit.hasUserRecords()) {
+ transmitDataSuccesses.inc();
+ } else {
+ transmitControlSuccesses.inc();
+ }
+
+ lastTransmit.reset().start();
+ outstandingTransmits.incrementAndGet();
+ controlFlushNeeded = false;
+ return packet.getTransmitFuture();
+ }
+ } finally {
+ transmitLock.unlock();
+ }
+ }
+
+ /**
+ * Checks if there is any data to transmit so that the periodic flush
+ * task can determine if there is anything it needs to do
+ */
+ synchronized private boolean haveDataToTransmit() {
+ if (!transmitResult.compareAndSet(BKException.Code.OK, BKException.Code.OK)) {
+ // Even if there is data it cannot be transmitted, so effectively nothing to send
+ return false;
+ }
+
+ return (recordSetWriter.getNumRecords() > 0);
+ }
+
    /**
     * BookKeeper add callback, invoked when a transmitted entry has been
     * added to the ledger (or has failed). Records callback latency, updates
     * the acknowledged txid for successful user-record packets, and runs the
     * deferred post-processing either on {@code addCompleteFuturePool} (when
     * configured) or inline.
     *
     * @param rc      BookKeeper result code for the add
     * @param handle  ledger handle the entry was added to
     * @param entryId id of the added entry (-1 on failure)
     * @param ctx     the BKTransmitPacket passed to asyncAddEntry
     */
    @Override
    public void addComplete(final int rc, LedgerHandle handle,
                            final long entryId, final Object ctx) {
        // the failpoint may override the real result code to simulate failures
        final AtomicReference<Integer> effectiveRC = new AtomicReference<Integer>(rc);
        try {
            if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitComplete)) {
                effectiveRC.set(BKException.Code.UnexpectedConditionException);
            }
        } catch (Exception exc) {
            effectiveRC.set(BKException.Code.UnexpectedConditionException);
        }

        // Sanity check to make sure we're receiving these callbacks in order.
        if (entryId > -1 && lastEntryId >= entryId) {
            LOG.error("Log segment {} saw out of order entry {} lastEntryId {}",
                new Object[] {fullyQualifiedLogSegment, entryId, lastEntryId});
        }
        lastEntryId = entryId;

        assert (ctx instanceof BKTransmitPacket);
        final BKTransmitPacket transmitPacket = (BKTransmitPacket) ctx;

        // Time from transmit until receipt of addComplete callback
        addCompleteTime.registerSuccessfulEvent(TimeUnit.MICROSECONDS.convert(
            System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS));

        if (BKException.Code.OK == rc) {
            EntryBuffer recordSet = transmitPacket.getRecordSet();
            if (recordSet.hasUserRecords()) {
                synchronized (this) {
                    // advance acked txid only for packets carrying user records
                    lastTxIdAcknowledged = Math.max(lastTxIdAcknowledged, recordSet.getMaxTxId());
                }
            }
        }

        if (null != addCompleteFuturePool) {
            final Stopwatch queuedTime = Stopwatch.createStarted();
            addCompleteFuturePool.apply(new Function0<Void>() {
                public Void apply() {
                    final Stopwatch deferredTime = Stopwatch.createStarted();
                    addCompleteQueuedTime.registerSuccessfulEvent(queuedTime.elapsed(TimeUnit.MICROSECONDS));
                    addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
                    addCompleteDeferredTime.registerSuccessfulEvent(deferredTime.elapsed(TimeUnit.MICROSECONDS));
                    return null;
                }
                @Override
                public String toString() {
                    return String.format("AddComplete(Stream=%s, entryId=%d, rc=%d)",
                            fullyQualifiedLogSegment, entryId, rc);
                }
            }).addEventListener(new FutureEventListener<Void>() {
                @Override
                public void onSuccess(Void done) {
                }
                @Override
                public void onFailure(Throwable cause) {
                    LOG.error("addComplete processing failed for {} entry {} lastTxId {} rc {} with error",
                        new Object[] {fullyQualifiedLogSegment, entryId, transmitPacket.getRecordSet().getMaxTxId(), rc, cause});
                }
            });
            // Race condition if we notify before the addComplete is enqueued.
            transmitPacket.notifyTransmitComplete(effectiveRC.get());
            outstandingTransmits.getAndDecrement();
        } else {
            // Notify transmit complete must be called before deferred processing in the
            // sync case since otherwise callbacks in deferred processing may deadlock.
            transmitPacket.notifyTransmitComplete(effectiveRC.get());
            outstandingTransmits.getAndDecrement();
            addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
        }
    }
+
    /**
     * Deferred portion of the addComplete callback: latches the first error
     * into {@code transmitResult}, updates stats and the last DLSN, completes
     * or aborts the packet's record set, schedules a follow-up control flush
     * when needed, and — on the first error — cancels promises pending in the
     * current (untransmitted) record set.
     *
     * @param transmitPacket packet whose transmit completed
     * @param entryId        entry id assigned by BookKeeper (-1 on failure)
     * @param rc             effective result code for the transmit
     */
    private void addCompleteDeferredProcessing(final BKTransmitPacket transmitPacket,
                                               final long entryId,
                                               final int rc) {
        boolean cancelPendingPromises = false;
        EntryBuffer recordSet = transmitPacket.getRecordSet();
        synchronized (this) {
            if (transmitResult.compareAndSet(BKException.Code.OK, rc)) {
                // If this is the first time we are setting an error code in the transmitResult then
                // we must cancel pending promises; once this error has been set, more records will not
                // be enqueued; they will be failed with WriteException
                cancelPendingPromises = (BKException.Code.OK != rc);
            } else {
                LOG.warn("Log segment {} entryId {}: Tried to set transmit result to ({}) but is already ({})",
                    new Object[] {fullyQualifiedLogSegment, entryId, rc, transmitResult.get()});
            }

            if (transmitResult.get() != BKException.Code.OK) {
                if (recordSet.hasUserRecords()) {
                    transmitDataPacketSize.registerFailedEvent(recordSet.getNumBytes());
                }
            } else {
                // If we had data that we flushed then we need it to make sure that
                // background flush in the next pass will make the previous writes
                // visible by advancing the lastAck
                if (recordSet.hasUserRecords()) {
                    transmitDataPacketSize.registerSuccessfulEvent(recordSet.getNumBytes());
                    controlFlushNeeded = true;
                    if (immediateFlushEnabled) {
                        if (0 == minDelayBetweenImmediateFlushMs) {
                            backgroundFlush(true);
                        } else {
                            scheduleFlushWithDelayIfNeeded(new Callable<Void>() {
                                @Override
                                public Void call() throws Exception {
                                    backgroundFlush(true);
                                    return null;
                                }
                            }, immFlushSchedFutureRef);
                        }
                    }
                }
            }

            // update last dlsn before satisfying future
            if (BKException.Code.OK == transmitResult.get()) {
                DLSN lastDLSNInPacket = recordSet.finalizeTransmit(
                        logSegmentSequenceNumber, entryId);
                if (recordSet.hasUserRecords()) {
                    if (null != lastDLSNInPacket && lastDLSN.compareTo(lastDLSNInPacket) < 0) {
                        lastDLSN = lastDLSNInPacket;
                    }
                }
            }
        }

        if (BKException.Code.OK == transmitResult.get()) {
            recordSet.completeTransmit(logSegmentSequenceNumber, entryId);
        } else {
            recordSet.abortTransmit(FutureUtils.transmitException(transmitResult.get()));
        }

        if (cancelPendingPromises) {
            // Since the writer is in a bad state no more packets will be transmitted, and it's safe to
            // assign a new empty packet. This is to avoid a race with closeInternal which may also
            // try to cancel the current packet;
            final BKTransmitPacket packetCurrentSaved;
            synchronized (this) {
                packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
                recordSetWriter = newRecordSetWriter();
            }
            packetCurrentSaved.getRecordSet().abortTransmit(
                    new WriteCancelledException(streamName,
                            FutureUtils.transmitException(transmitResult.get())));
        }
    }
+
+ @Override
+ synchronized public void run() {
+ backgroundFlush(false);
+ }
+
+ synchronized private void backgroundFlush(boolean controlFlushOnly) {
+ if (null != closeFuture) {
+ // if the log segment is closing, skip any background flushing
+ LOG.debug("Skip background flushing since log segment {} is closing.", getFullyQualifiedLogSegment());
+ return;
+ }
+ try {
+ boolean newData = haveDataToTransmit();
+
+ if (controlFlushNeeded || (!controlFlushOnly && newData)) {
+ // If we need this periodic transmit to persist previously written data but
+ // there is no new data (which would cause the transmit to be skipped) generate
+ // a control record
+ if (!newData) {
+ writeControlLogRecord();
+ }
+
+ transmit();
+ pFlushSuccesses.inc();
+ } else {
+ pFlushMisses.inc();
+ }
+ } catch (IOException exc) {
+ LOG.error("Log Segment {}: Error encountered by the periodic flush", fullyQualifiedLogSegment, exc);
+ }
+ }
+
+ synchronized private void keepAlive() {
+ if (null != closeFuture) {
+ // if the log segment is closing, skip sending any keep alive records.
+ LOG.debug("Skip sending keepAlive control record since log segment {} is closing.",
+ getFullyQualifiedLogSegment());
+ return;
+ }
+
+ if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) {
+ return;
+ }
+
+ LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
+ controlRec.setControl();
+ asyncWrite(controlRec);
+ }
+
+}