You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/17 06:43:20 UTC
incubator-distributedlog git commit: DL-87: Introduce periodic keepalive control record in writer
Repository: incubator-distributedlog
Updated Branches:
refs/heads/master 40df29d9b -> b4150fc84
DL-87: Introduce periodic keepalive control record in writer
Merge Twitter's change from Leigh Stewart.
Author: Jordan Bull <jb...@twitter.com>
Author: Sijie Guo <si...@twitter.com>
Author: Leigh Stewart <ls...@twitter.com>
Reviewers: Leigh Stewart <ls...@apache.org>
Closes #59 from sijie/merge/DL-87
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b4150fc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b4150fc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b4150fc8
Branch: refs/heads/master
Commit: b4150fc842e63ea43dc648df21c0c2d00489282b
Parents: 40df29d
Author: Jordan Bull <jb...@twitter.com>
Authored: Fri Dec 16 22:43:24 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 22:43:24 2016 -0800
----------------------------------------------------------------------
.../distributedlog/BKLogSegmentWriter.java | 47 ++++++++++++++++-
.../DistributedLogConfiguration.java | 25 +++++++++
.../distributedlog/DistributedLogConstants.java | 1 +
.../distributedlog/TestAsyncReaderWriter.java | 53 ++++++++++++++++++++
4 files changed, 125 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 004b2fb..1b52951 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
@@ -128,6 +129,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
private long numFlushesSinceRestart = 0;
private long numBytes = 0;
private long lastEntryId = Long.MIN_VALUE;
+ private long lastTransmitNanos = Long.MIN_VALUE;
+ private final int periodicKeepAliveMs;
// Indicates whether there are writes that have been successfully transmitted that would need
// a control record to be transmitted to make them visible to the readers by updating the last
@@ -137,7 +140,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
private int minDelayBetweenImmediateFlushMs = 0;
private Stopwatch lastTransmit;
private boolean streamEnded = false;
- private ScheduledFuture<?> periodicFlushSchedule = null;
+ private final ScheduledFuture<?> periodicFlushSchedule;
+ private final ScheduledFuture<?> periodicKeepAliveSchedule;
final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null);
@@ -312,11 +316,25 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
if (periodicFlushFrequency > 0 && scheduler != null) {
periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
periodicFlushFrequency/2, periodicFlushFrequency/2, TimeUnit.MILLISECONDS);
+ } else {
+ periodicFlushSchedule = null;
}
} else {
// Min delay heuristic applies only when immediate flush is enabled
// and transmission threshold is zero
minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs();
+ periodicFlushSchedule = null;
+ }
+ this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
+ if (periodicKeepAliveMs > 0 && scheduler != null) {
+ periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ keepAlive();
+ }
+ }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS);
+ } else {
+ periodicKeepAliveSchedule = null;
}
this.conf = conf;
@@ -513,6 +531,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
private void closeInternal(final boolean abort,
final AtomicReference<Throwable> throwExc,
final Promise<Void> closePromise) {
+ // Cancel the periodic keep alive schedule first
+ if (null != periodicKeepAliveSchedule) {
+ if (!periodicKeepAliveSchedule.cancel(false)) {
+ LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
+ }
+ }
+
// Cancel the periodic flush schedule first
// The task is allowed to exit gracefully
if (null != periodicFlushSchedule) {
@@ -1079,6 +1104,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
}
synchronized (this) {
+ // update the transmit timestamp
+ lastTransmitNanos = MathUtils.nowInNano();
+
BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
packetPrevious = packet;
entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
@@ -1293,4 +1321,21 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
}
}
+ synchronized private void keepAlive() {
+ if (null != closeFuture) {
+ // if the log segment is closing, skip sending any keep alive records.
+ LOG.debug("Skip sending keepAlive control record since log segment {} is closing.",
+ getFullyQualifiedLogSegment());
+ return;
+ }
+
+ if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) {
+ return;
+ }
+
+ LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
+ controlRec.setControl();
+ asyncWrite(controlRec);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index d2af862..c2057df 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -249,6 +249,8 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false;
public static final String BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS = "minimumDelayBetweenImmediateFlushMilliSeconds";
public static final int BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0;
+ public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS = "periodicKeepAliveMilliSeconds";
+ public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0;
// Retention/Truncation Settings
public static final String BKDL_RETENTION_PERIOD_IN_HOURS = "logSegmentRetentionHours";
@@ -1893,6 +1895,29 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
return this;
}
+ /**
+ * Get Periodic Keep Alive Frequency in milliseconds.
+ * <p>If the setting is set with a positive value, it would periodically write a control record
+ * to keep the stream active. The default value is 0.
+ *
+ * @return periodic keep alive frequency in milliseconds.
+ */
+ public int getPeriodicKeepAliveMilliSeconds() {
+ return this.getInt(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT);
+ }
+
+ /**
+ * Set Periodic Keep Alive Frequency in milliseconds.
+ *
+ * @param keepAliveMs keep alive frequency in milliseconds.
+ * @return distributedlog configuration
+ * @see #getPeriodicKeepAliveMilliSeconds()
+ */
+ public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int keepAliveMs) {
+ setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs);
+ return this;
+ }
+
//
// DL Retention/Truncation Settings
//
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
index 5c50282..32def94 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
@@ -58,6 +58,7 @@ public class DistributedLogConstants {
public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+ public static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);
// An ACL that gives all permissions to node creators and read permissions only to everyone else.
public static final List<ACL> EVERYONE_READ_CREATOR_ALL =
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 8011a04..a4832b0 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -2115,4 +2115,57 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
}
}
}
+
+ @Test(timeout = 60000)
+ public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws Exception {
+ String name = runtime.getMethodName();
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(testConf);
+ confLocal.setOutputBufferSize(0);
+ confLocal.setImmediateFlushEnabled(false);
+ confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+ confLocal.setPeriodicKeepAliveMilliSeconds(0);
+ confLocal.setReaderIdleWarnThresholdMillis(20);
+ confLocal.setReaderIdleErrorThresholdMillis(40);
+
+ URI uri = createDLMURI("/" + name);
+ ensureURICreated(uri);
+
+ DistributedLogManager dlm = createNewDLM(confLocal, name);
+ BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+ writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+ AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+ try {
+ FutureUtils.result(reader.readNext());
+ fail("Should fail when stream is idle");
+ } catch (IdleReaderException ire) {
+ // expected
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws Exception {
+ String name = runtime.getMethodName();
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(testConf);
+ confLocal.setOutputBufferSize(0);
+ confLocal.setImmediateFlushEnabled(false);
+ confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+ confLocal.setPeriodicKeepAliveMilliSeconds(1000);
+ confLocal.setReaderIdleWarnThresholdMillis(2000);
+ confLocal.setReaderIdleErrorThresholdMillis(4000);
+
+ URI uri = createDLMURI("/" + name);
+ ensureURICreated(uri);
+
+ DistributedLogManager dlm = createNewDLM(confLocal, name);
+ BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+ writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+ AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+ LogRecordWithDLSN record = FutureUtils.result(reader.readNext());
+ assertEquals(1L, record.getTransactionId());
+ DLMTestUtil.verifyLogRecord(record);
+ }
}