You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/17 06:43:20 UTC

incubator-distributedlog git commit: DL-87: Introduce periodic keepalive control record in writer

Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 40df29d9b -> b4150fc84


DL-87: Introduce periodic keepalive control record in writer

Merge Twitter's change from Leigh Stewart.

Author: Jordan Bull <jb...@twitter.com>
Author: Sijie Guo <si...@twitter.com>
Author: Leigh Stewart <ls...@twitter.com>

Reviewers: Leigh Stewart <ls...@apache.org>

Closes #59 from sijie/merge/DL-87


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b4150fc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b4150fc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b4150fc8

Branch: refs/heads/master
Commit: b4150fc842e63ea43dc648df21c0c2d00489282b
Parents: 40df29d
Author: Jordan Bull <jb...@twitter.com>
Authored: Fri Dec 16 22:43:24 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 22:43:24 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKLogSegmentWriter.java      | 47 ++++++++++++++++-
 .../DistributedLogConfiguration.java            | 25 +++++++++
 .../distributedlog/DistributedLogConstants.java |  1 +
 .../distributedlog/TestAsyncReaderWriter.java   | 53 ++++++++++++++++++++
 4 files changed, 125 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 004b2fb..1b52951 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction1;
@@ -128,6 +129,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private long numFlushesSinceRestart = 0;
     private long numBytes = 0;
     private long lastEntryId = Long.MIN_VALUE;
+    private long lastTransmitNanos = Long.MIN_VALUE;
+    private final int periodicKeepAliveMs;
 
     // Indicates whether there are writes that have been successfully transmitted that would need
     // a control record to be transmitted to make them visible to the readers by updating the last
@@ -137,7 +140,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private int minDelayBetweenImmediateFlushMs = 0;
     private Stopwatch lastTransmit;
     private boolean streamEnded = false;
-    private ScheduledFuture<?> periodicFlushSchedule = null;
+    private final ScheduledFuture<?> periodicFlushSchedule;
+    private final ScheduledFuture<?> periodicKeepAliveSchedule;
     final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
     final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
     final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null);
@@ -312,11 +316,25 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
             if (periodicFlushFrequency > 0 && scheduler != null) {
                 periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
                         periodicFlushFrequency/2, periodicFlushFrequency/2, TimeUnit.MILLISECONDS);
+            } else {
+                periodicFlushSchedule = null;
             }
         } else {
             // Min delay heuristic applies only when immediate flush is enabled
             // and transmission threshold is zero
             minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs();
+            periodicFlushSchedule = null;
+        }
+        this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
+        if (periodicKeepAliveMs > 0 && scheduler != null) {
+            periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    keepAlive();
+                }
+            }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS);
+        } else {
+            periodicKeepAliveSchedule = null;
         }
 
         this.conf = conf;
@@ -513,6 +531,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private void closeInternal(final boolean abort,
                                final AtomicReference<Throwable> throwExc,
                                final Promise<Void> closePromise) {
+        // Cancel the periodic keep alive schedule first
+        if (null != periodicKeepAliveSchedule) {
+            if (!periodicKeepAliveSchedule.cancel(false)) {
+                LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
+            }
+        }
+
         // Cancel the periodic flush schedule first
         // The task is allowed to exit gracefully
         if (null != periodicFlushSchedule) {
@@ -1079,6 +1104,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
             }
 
             synchronized (this) {
+                // update the transmit timestamp
+                lastTransmitNanos = MathUtils.nowInNano();
+
                 BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
                 packetPrevious = packet;
                 entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
@@ -1293,4 +1321,21 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
         }
     }
 
+    synchronized private void keepAlive() {
+        if (null != closeFuture) {
+            // if the log segment is closing, skip sending any keep alive records.
+            LOG.debug("Skip sending keepAlive control record since log segment {} is closing.",
+                    getFullyQualifiedLogSegment());
+            return;
+        }
+
+        if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) {
+            return;
+        }
+
+        LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
+        controlRec.setControl();
+        asyncWrite(controlRec);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index d2af862..c2057df 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -249,6 +249,8 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
     public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false;
     public static final String BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS = "minimumDelayBetweenImmediateFlushMilliSeconds";
     public static final int BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0;
+    public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS = "periodicKeepAliveMilliSeconds";
+    public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0;
 
     // Retention/Truncation Settings
     public static final String BKDL_RETENTION_PERIOD_IN_HOURS = "logSegmentRetentionHours";
@@ -1893,6 +1895,29 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
         return this;
     }
 
+    /**
+     * Get Periodic Keep Alive Frequency in milliseconds.
+     * <p>If set to a positive value, the writer periodically writes a control record
+     * to keep the stream active. The default value is 0, which disables keepalive records.
+     *
+     * @return periodic keep alive frequency in milliseconds.
+     */
+    public int getPeriodicKeepAliveMilliSeconds() {
+        return this.getInt(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT);
+    }
+
+    /**
+     * Set Periodic Keep Alive Frequency in milliseconds.
+     *
+     * @param keepAliveMs keep alive frequency in milliseconds.
+     * @return distributedlog configuration
+     * @see #getPeriodicKeepAliveMilliSeconds()
+     */
+    public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int keepAliveMs) {
+        setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs);
+        return this;
+    }
+
     //
     // DL Retention/Truncation Settings
     //

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
index 5c50282..32def94 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
@@ -58,6 +58,7 @@ public class DistributedLogConstants {
     public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
     public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
     static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+    public static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);
 
     // An ACL that gives all permissions to node creators and read permissions only to everyone else.
     public static final List<ACL> EVERYONE_READ_CREATOR_ALL =

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b4150fc8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 8011a04..a4832b0 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -2115,4 +2115,57 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             }
         }
     }
+
+    @Test(timeout = 60000)
+    public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws Exception {
+        String name = runtime.getMethodName();
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(testConf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setPeriodicKeepAliveMilliSeconds(0);
+        confLocal.setReaderIdleWarnThresholdMillis(20);
+        confLocal.setReaderIdleErrorThresholdMillis(40);
+
+        URI uri = createDLMURI("/" + name);
+        ensureURICreated(uri);
+
+        DistributedLogManager dlm = createNewDLM(confLocal, name);
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+        writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+        AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        try {
+            FutureUtils.result(reader.readNext());
+            fail("Should fail when stream is idle");
+        } catch (IdleReaderException ire) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws Exception {
+        String name = runtime.getMethodName();
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(testConf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setPeriodicKeepAliveMilliSeconds(1000);
+        confLocal.setReaderIdleWarnThresholdMillis(2000);
+        confLocal.setReaderIdleErrorThresholdMillis(4000);
+
+        URI uri = createDLMURI("/" + name);
+        ensureURICreated(uri);
+
+        DistributedLogManager dlm = createNewDLM(confLocal, name);
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+        writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+        AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        LogRecordWithDLSN record = FutureUtils.result(reader.readNext());
+        assertEquals(1L, record.getTransactionId());
+        DLMTestUtil.verifyLogRecord(record);
+    }
 }