You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/21 08:00:12 UTC

[04/29] incubator-distributedlog git commit: Introduce periodic keepalive control record in writer

Introduce periodic keepalive control record in writer

    * The writer periodically writes a 'keepalive' control record to make sure the stream is alive. If the write proxy is disconnected from the bookies, the control record fails to write, which gives the write proxy a chance to drop ownership of the stream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/517c77c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/517c77c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/517c77c1

Branch: refs/heads/merge/DL-98
Commit: 517c77c164cb989ae9829cbd80bf2e492eb8e364
Parents: 7b46a9a
Author: Leigh Stewart <ls...@twitter.com>
Authored: Mon Dec 12 16:33:33 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:33:33 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKLogSegmentWriter.java      | 47 ++++++++++++++++-
 .../DistributedLogConfiguration.java            | 25 +++++++++
 .../distributedlog/DistributedLogConstants.java |  1 +
 .../distributedlog/TestAsyncReaderWriter.java   | 53 ++++++++++++++++++++
 4 files changed, 125 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 004b2fb..1b52951 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction1;
@@ -128,6 +129,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private long numFlushesSinceRestart = 0;
     private long numBytes = 0;
     private long lastEntryId = Long.MIN_VALUE;
+    private long lastTransmitNanos = Long.MIN_VALUE;
+    private final int periodicKeepAliveMs;
 
     // Indicates whether there are writes that have been successfully transmitted that would need
     // a control record to be transmitted to make them visible to the readers by updating the last
@@ -137,7 +140,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private int minDelayBetweenImmediateFlushMs = 0;
     private Stopwatch lastTransmit;
     private boolean streamEnded = false;
-    private ScheduledFuture<?> periodicFlushSchedule = null;
+    private final ScheduledFuture<?> periodicFlushSchedule;
+    private final ScheduledFuture<?> periodicKeepAliveSchedule;
     final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
     final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
     final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null);
@@ -312,11 +316,25 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
             if (periodicFlushFrequency > 0 && scheduler != null) {
                 periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
                         periodicFlushFrequency/2, periodicFlushFrequency/2, TimeUnit.MILLISECONDS);
+            } else {
+                periodicFlushSchedule = null;
             }
         } else {
             // Min delay heuristic applies only when immediate flush is enabled
             // and transmission threshold is zero
             minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs();
+            periodicFlushSchedule = null;
+        }
+        this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
+        if (periodicKeepAliveMs > 0 && scheduler != null) {
+            periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    keepAlive();
+                }
+            }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS);
+        } else {
+            periodicKeepAliveSchedule = null;
         }
 
         this.conf = conf;
@@ -513,6 +531,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private void closeInternal(final boolean abort,
                                final AtomicReference<Throwable> throwExc,
                                final Promise<Void> closePromise) {
+        // Cancel the periodic keep alive schedule first
+        if (null != periodicKeepAliveSchedule) {
+            if (!periodicKeepAliveSchedule.cancel(false)) {
+                LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
+            }
+        }
+
         // Cancel the periodic flush schedule first
         // The task is allowed to exit gracefully
         if (null != periodicFlushSchedule) {
@@ -1079,6 +1104,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
             }
 
             synchronized (this) {
+                // update the transmit timestamp
+                lastTransmitNanos = MathUtils.nowInNano();
+
                 BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
                 packetPrevious = packet;
                 entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
@@ -1293,4 +1321,21 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
         }
     }
 
+    synchronized private void keepAlive() {
+        if (null != closeFuture) {
+            // if the log segment is closing, skip sending any keep alive records.
+            LOG.debug("Skip sending keepAlive control record since log segment {} is closing.",
+                    getFullyQualifiedLogSegment());
+            return;
+        }
+
+        if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) {
+            return;
+        }
+
+        LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
+        controlRec.setControl();
+        asyncWrite(controlRec);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index d2af862..c2057df 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -249,6 +249,8 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
     public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false;
     public static final String BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS = "minimumDelayBetweenImmediateFlushMilliSeconds";
     public static final int BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0;
+    public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS = "periodicKeepAliveMilliSeconds";
+    public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0;
 
     // Retention/Truncation Settings
     public static final String BKDL_RETENTION_PERIOD_IN_HOURS = "logSegmentRetentionHours";
@@ -1893,6 +1895,29 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
         return this;
     }
 
+    /**
+     * Get Periodic Keep Alive Frequency in milliseconds.
+     * <p>If the setting is set with a positive value, it would periodically write a control record
+     * to keep the stream active. The default value is 0.
+     *
+     * @return periodic keep alive frequency in milliseconds.
+     */
+    public int getPeriodicKeepAliveMilliSeconds() {
+        return this.getInt(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT);
+    }
+
+    /**
+     * Set Periodic Keep Alive Frequency in milliseconds.
+     *
+     * @param keepAliveMs keep alive frequency in milliseconds.
+     * @return distributedlog configuration
+     * @see #getPeriodicKeepAliveMilliSeconds()
+     */
+    public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int keepAliveMs) {
+        setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs);
+        return this;
+    }
+
     //
     // DL Retention/Truncation Settings
     //

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
index 5c50282..32def94 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
@@ -58,6 +58,7 @@ public class DistributedLogConstants {
     public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
     public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
     static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+    public static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);
 
     // An ACL that gives all permissions to node creators and read permissions only to everyone else.
     public static final List<ACL> EVERYONE_READ_CREATOR_ALL =

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 8011a04..a4832b0 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -2115,4 +2115,57 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             }
         }
     }
+
+    @Test(timeout = 60000)
+    public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws Exception {
+        String name = runtime.getMethodName();
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(testConf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setPeriodicKeepAliveMilliSeconds(0);
+        confLocal.setReaderIdleWarnThresholdMillis(20);
+        confLocal.setReaderIdleErrorThresholdMillis(40);
+
+        URI uri = createDLMURI("/" + name);
+        ensureURICreated(uri);
+
+        DistributedLogManager dlm = createNewDLM(confLocal, name);
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+        writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+        AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        try {
+            FutureUtils.result(reader.readNext());
+            fail("Should fail when stream is idle");
+        } catch (IdleReaderException ire) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws Exception {
+        String name = runtime.getMethodName();
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(testConf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setPeriodicKeepAliveMilliSeconds(1000);
+        confLocal.setReaderIdleWarnThresholdMillis(2000);
+        confLocal.setReaderIdleErrorThresholdMillis(4000);
+
+        URI uri = createDLMURI("/" + name);
+        ensureURICreated(uri);
+
+        DistributedLogManager dlm = createNewDLM(confLocal, name);
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+        writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+        AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        LogRecordWithDLSN record = FutureUtils.result(reader.readNext());
+        assertEquals(1L, record.getTransactionId());
+        DLMTestUtil.verifyLogRecord(record);
+    }
 }