You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/08/08 12:57:05 UTC

cassandra git commit: Restore resumable hints delivery

Repository: cassandra
Updated Branches:
  refs/heads/trunk d5c53d2bb -> cee22ad54


Restore resumable hints delivery

patch by Stefan Podkowinski and Branimir Lambov; reviewed by Branimir Lambov for CASSANDRA-11960


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cee22ad5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cee22ad5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cee22ad5

Branch: refs/heads/trunk
Commit: cee22ad54d7c28aaf0868dd45aeb9b5b708b0e78
Parents: d5c53d2
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Thu Jun 16 16:31:54 2016 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Aug 8 13:56:57 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hints/ChecksummedDataInput.java   |  53 +++-
 .../hints/CompressedChecksummedDataInput.java   |  51 +++-
 .../hints/EncryptedChecksummedDataInput.java    |  50 +++-
 .../cassandra/hints/HintsDispatchExecutor.java  |  21 +-
 .../apache/cassandra/hints/HintsDispatcher.java |  48 ++--
 .../org/apache/cassandra/hints/HintsReader.java |  34 +--
 .../apache/cassandra/hints/HintsService.java    |  27 +-
 .../org/apache/cassandra/hints/HintsStore.java  |  14 +-
 .../apache/cassandra/hints/InputPosition.java   |   9 +
 .../cassandra/security/EncryptionUtils.java     |   5 +
 .../cassandra/service/StorageService.java       |   2 +-
 .../apache/cassandra/hints/AlteredHints.java    |  17 ++
 .../cassandra/hints/HintsCatalogTest.java       |  31 +++
 .../cassandra/hints/HintsServiceTest.java       | 254 +++++++++++++++++++
 15 files changed, 540 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 15e5001..6fdc04a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,7 @@
 
 
 3.9
+ * Restore resumable hints delivery (CASSANDRA-11960)
  * Fix nodetool tablestats miss SSTable count (CASSANDRA-12205)
  * Fixed flacky SSTablesIteratedTest (CASSANDRA-12282)
  * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 8bb5b6d..0db95af 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -26,18 +26,19 @@ import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.memory.BufferPool;
 
 /**
- * A {@link RandomAccessReader} wrapper that calctulates the CRC in place.
+ * A {@link RandomAccessReader} wrapper that calculates the CRC in place.
  *
  * Useful for {@link org.apache.cassandra.hints.HintsReader}, for example, where we must verify the CRC, yet don't want
  * to allocate an extra byte array just that purpose. The CRC can be embedded in the input stream and checked via checkCrc().
  *
- * In addition to calculating the CRC, it allows to enforce a maximim known size. This is needed
+ * In addition to calculating the CRC, it allows to enforce a maximum known size. This is needed
  * so that {@link org.apache.cassandra.db.Mutation.MutationSerializer} doesn't blow up the heap when deserializing a
  * corrupted sequence by reading a huge corrupted length of bytes via
- * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
+ * {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
  */
 public class ChecksummedDataInput extends RebufferingInputStream
 {
@@ -81,13 +82,37 @@ public class ChecksummedDataInput extends RebufferingInputStream
         return getPosition() == channel.size();
     }
 
+    static class Position implements InputPosition
+    {
+        final long sourcePosition;
+
+        public Position(long sourcePosition)
+        {
+            super();
+            this.sourcePosition = sourcePosition;
+        }
+
+        @Override
+        public long subtract(InputPosition other)
+        {
+            return sourcePosition - ((Position)other).sourcePosition;
+        }
+    }
+
     /**
-     * Returns the position in the source file, which is different for getPosition() for compressed/encrypted files
-     * and may be imprecise.
+     * Return a seekable representation of the current position. For compressed files this is chunk position
+     * in file and offset within chunk.
      */
-    public long getSourcePosition()
+    public InputPosition getSeekPosition()
     {
-        return getPosition();
+        return new Position(getPosition());
+    }
+
+    public void seek(InputPosition pos)
+    {
+        updateCrc();
+        bufferOffset = ((Position) pos).sourcePosition;
+        buffer.position(0).limit(0);
     }
 
     public void resetCrc()
@@ -110,6 +135,15 @@ public class ChecksummedDataInput extends RebufferingInputStream
         return bufferOffset + buffer.position();
     }
 
+    /**
+     * Returns the position in the source file, which is different for getPosition() for compressed/encrypted files
+     * and may be imprecise.
+     */
+    protected long getSourcePosition()
+    {
+        return bufferOffset;
+    }
+
     public void resetLimit()
     {
         limit = Long.MAX_VALUE;
@@ -179,6 +213,11 @@ public class ChecksummedDataInput extends RebufferingInputStream
         buffer.flip();
     }
 
+    public void tryUncacheRead()
+    {
+        CLibrary.trySkipCache(getChannel().getFileDescriptor(), 0, getSourcePosition(), getPath());
+    }
+
     private void updateCrc()
     {
         if (crcPosition == buffer.position() || crcUpdateDisabled)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
index f584dd1..0766fa5 100644
--- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.hints.ChecksummedDataInput.Position;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
@@ -31,7 +32,8 @@ import org.apache.cassandra.utils.memory.BufferPool;
 public final class CompressedChecksummedDataInput extends ChecksummedDataInput
 {
     private final ICompressor compressor;
-    private volatile long filePosition = 0;
+    private volatile long filePosition = 0;     // Current position in file, advanced when reading chunk.
+    private volatile long sourcePosition = 0;   // Current position in file to report, advanced after consuming chunk.
     private volatile ByteBuffer compressedBuffer = null;
     private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE);
 
@@ -39,7 +41,7 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
     {
         super(channel, compressor.preferredBufferType());
         this.compressor = compressor;
-        this.filePosition = filePosition;
+        this.sourcePosition = this.filePosition = filePosition;
     }
 
     /**
@@ -53,12 +55,55 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
 
     public long getSourcePosition()
     {
-        return filePosition;
+        return sourcePosition;
+    }
+
+    static class Position extends ChecksummedDataInput.Position
+    {
+        final long bufferStart;
+        final int bufferPosition;
+
+        public Position(long sourcePosition, long bufferStart, int bufferPosition)
+        {
+            super(sourcePosition);
+            this.bufferStart = bufferStart;
+            this.bufferPosition = bufferPosition;
+        }
+
+        @Override
+        public long subtract(InputPosition o)
+        {
+            Position other = (Position) o;
+            return bufferStart - other.bufferStart + bufferPosition - other.bufferPosition;
+        }
+    }
+
+    public InputPosition getSeekPosition()
+    {
+        return new Position(sourcePosition, bufferOffset, buffer.position());
+    }
+
+    public void seek(InputPosition p)
+    {
+        Position pos = (Position) p;
+        bufferOffset = pos.bufferStart;
+        filePosition = pos.sourcePosition;
+        buffer.position(0).limit(0);
+        resetCrc();
+        reBuffer();
+        buffer.position(pos.bufferPosition);
+        assert sourcePosition == pos.sourcePosition;
+        assert bufferOffset == pos.bufferStart;
+        assert buffer.position() == pos.bufferPosition;
     }
 
     @Override
     protected void readBuffer()
     {
+        sourcePosition = filePosition;
+        if (isEOF())
+            return;
+
         metadataBuffer.clear();
         channel.read(metadataBuffer, filePosition);
         filePosition += CompressedHintsWriter.METADATA_SIZE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
index 7ecfbfe..b335226 100644
--- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
@@ -24,6 +24,7 @@ import javax.crypto.Cipher;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.hints.CompressedChecksummedDataInput.Position;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
@@ -42,6 +43,7 @@ public class EncryptedChecksummedDataInput extends ChecksummedDataInput
     private final ICompressor compressor;
 
     private final EncryptionUtils.ChannelProxyReadChannel readChannel;
+    private long sourcePosition;
 
     protected EncryptedChecksummedDataInput(ChannelProxy channel, Cipher cipher, ICompressor compressor, long filePosition)
     {
@@ -49,6 +51,7 @@ public class EncryptedChecksummedDataInput extends ChecksummedDataInput
         this.cipher = cipher;
         this.compressor = compressor;
         readChannel = new EncryptionUtils.ChannelProxyReadChannel(channel, filePosition);
+        this.sourcePosition = filePosition;
         assert cipher != null;
         assert compressor != null;
     }
@@ -59,17 +62,60 @@ public class EncryptedChecksummedDataInput extends ChecksummedDataInput
      */
     public boolean isEOF()
     {
-        return getSourcePosition() == channel.size() && buffer.remaining() == 0;
+        return readChannel.getCurrentPosition() == channel.size() && buffer.remaining() == 0;
     }
 
     public long getSourcePosition()
     {
-        return readChannel.getCurrentPosition();
+        return sourcePosition;
+    }
+
+    static class Position extends ChecksummedDataInput.Position
+    {
+        final long bufferStart;
+        final int bufferPosition;
+
+        public Position(long sourcePosition, long bufferStart, int bufferPosition)
+        {
+            super(sourcePosition);
+            this.bufferStart = bufferStart;
+            this.bufferPosition = bufferPosition;
+        }
+
+        @Override
+        public long subtract(InputPosition o)
+        {
+            Position other = (Position) o;
+            return bufferStart - other.bufferStart + bufferPosition - other.bufferPosition;
+        }
+    }
+
+    public InputPosition getSeekPosition()
+    {
+        return new Position(sourcePosition, bufferOffset, buffer.position());
+    }
+
+    public void seek(InputPosition p)
+    {
+        Position pos = (Position) p;
+        bufferOffset = pos.bufferStart;
+        readChannel.setPosition(pos.sourcePosition);
+        buffer.position(0).limit(0);
+        resetCrc();
+        reBuffer();
+        buffer.position(pos.bufferPosition);
+        assert sourcePosition == pos.sourcePosition;
+        assert bufferOffset == pos.bufferStart;
+        assert buffer.position() == pos.bufferPosition;
     }
 
     @Override
     protected void readBuffer()
     {
+        this.sourcePosition = readChannel.getCurrentPosition();
+        if (isEOF())
+            return;
+
         try
         {
             ByteBuffer byteBuffer = reusableBuffers.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 5292dc1..d7ccf81 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import com.google.common.util.concurrent.RateLimiter;
@@ -48,12 +50,14 @@ final class HintsDispatchExecutor
     private final File hintsDirectory;
     private final ExecutorService executor;
     private final AtomicBoolean isPaused;
+    private final Function<InetAddress, Boolean> isAlive;
     private final Map<UUID, Future> scheduledDispatches;
 
-    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused)
+    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Function<InetAddress, Boolean> isAlive)
     {
         this.hintsDirectory = hintsDirectory;
         this.isPaused = isPaused;
+        this.isAlive = isAlive;
 
         scheduledDispatches = new ConcurrentHashMap<>();
         executor = new JMXEnabledThreadPoolExecutor(1,
@@ -72,6 +76,14 @@ final class HintsDispatchExecutor
     {
         scheduledDispatches.clear();
         executor.shutdownNow();
+        try
+        {
+            executor.awaitTermination(1, TimeUnit.MINUTES);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
     }
 
     boolean isScheduled(HintsStore store)
@@ -249,9 +261,10 @@ final class HintsDispatchExecutor
         private boolean deliver(HintsDescriptor descriptor, InetAddress address)
         {
             File file = new File(hintsDirectory, descriptor.fileName());
-            Long offset = store.getDispatchOffset(descriptor).orElse(null);
+            InputPosition offset = store.getDispatchOffset(descriptor);
 
-            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, isPaused))
+            BooleanSupplier shouldAbort = () -> !isAlive.apply(address) || isPaused.get();
+            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, shouldAbort))
             {
                 if (offset != null)
                     dispatcher.seek(offset);
@@ -265,7 +278,7 @@ final class HintsDispatchExecutor
                 }
                 else
                 {
-                    store.markDispatchOffset(descriptor, dispatcher.dispatchOffset());
+                    store.markDispatchOffset(descriptor, dispatcher.dispatchPosition());
                     store.offerFirst(descriptor);
                     logger.info("Finished hinted handoff of file {} to endpoint {}, partially", descriptor.fileName(), hostId);
                     return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index e582d88..00ef52b 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -22,13 +22,12 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
@@ -42,31 +41,31 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
  */
 final class HintsDispatcher implements AutoCloseable
 {
-    private enum Action { CONTINUE, ABORT, RETRY }
+    private enum Action { CONTINUE, ABORT }
 
     private final HintsReader reader;
     private final UUID hostId;
     private final InetAddress address;
     private final int messagingVersion;
-    private final AtomicBoolean isPaused;
+    private final BooleanSupplier abortRequested;
 
-    private long currentPageOffset;
+    private InputPosition currentPagePosition;
 
-    private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, AtomicBoolean isPaused)
+    private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, BooleanSupplier abortRequested)
     {
-        currentPageOffset = 0L;
+        currentPagePosition = null;
 
         this.reader = reader;
         this.hostId = hostId;
         this.address = address;
         this.messagingVersion = messagingVersion;
-        this.isPaused = isPaused;
+        this.abortRequested = abortRequested;
     }
 
-    static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, AtomicBoolean isPaused)
+    static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, BooleanSupplier abortRequested)
     {
         int messagingVersion = MessagingService.instance().getVersion(address);
-        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused);
+        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested);
     }
 
     public void close()
@@ -74,10 +73,9 @@ final class HintsDispatcher implements AutoCloseable
         reader.close();
     }
 
-    void seek(long bytes)
+    void seek(InputPosition position)
     {
-        reader.seek(bytes);
-        currentPageOffset = 0L;
+        reader.seek(position);
     }
 
     /**
@@ -87,7 +85,7 @@ final class HintsDispatcher implements AutoCloseable
     {
         for (HintsReader.Page page : reader)
         {
-            currentPageOffset = page.offset;
+            currentPagePosition = page.position;
             if (dispatch(page) != Action.CONTINUE)
                 return false;
         }
@@ -98,28 +96,16 @@ final class HintsDispatcher implements AutoCloseable
     /**
      * @return offset of the first non-delivered page
      */
-    long dispatchOffset()
+    InputPosition dispatchPosition()
     {
-        return currentPageOffset;
+        return currentPagePosition;
     }
 
-    private boolean isHostAlive()
-    {
-        return FailureDetector.instance.isAlive(address);
-    }
-
-    private boolean isPaused()
-    {
-        return isPaused.get();
-    }
 
     // retry in case of a timeout; stop in case of a failure, host going down, or delivery paused
     private Action dispatch(HintsReader.Page page)
     {
-        Action action = sendHintsAndAwait(page);
-        return action == Action.RETRY
-             ? dispatch(page)
-             : action;
+        return sendHintsAndAwait(page);
     }
 
     private Action sendHintsAndAwait(HintsReader.Page page)
@@ -142,7 +128,7 @@ final class HintsDispatcher implements AutoCloseable
 
         for (Callback cb : callbacks)
             if (cb.await() != Callback.Outcome.SUCCESS)
-                return Action.RETRY;
+                return Action.ABORT;
 
         return Action.CONTINUE;
     }
@@ -155,7 +141,7 @@ final class HintsDispatcher implements AutoCloseable
     {
         while (hints.hasNext())
         {
-            if (!isHostAlive() || isPaused())
+            if (abortRequested.getAsBoolean())
                 return Action.ABORT;
             callbacks.add(sendFunction.apply(hints.next()));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index 5e73805..e0a73c1 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -109,9 +109,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         return descriptor;
     }
 
-    void seek(long newPosition)
+    void seek(InputPosition newPosition)
     {
-        throw new UnsupportedOperationException("Hints are not seekable.");
+        input.seek(newPosition);
     }
 
     public Iterator<Page> iterator()
@@ -126,21 +126,21 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
     final class Page
     {
-        public final long offset;
+        public final InputPosition position;
 
-        private Page(long offset)
+        private Page(InputPosition inputPosition)
         {
-            this.offset = offset;
+            this.position = inputPosition;
         }
 
         Iterator<Hint> hintsIterator()
         {
-            return new HintsIterator(offset);
+            return new HintsIterator(position);
         }
 
         Iterator<ByteBuffer> buffersIterator()
         {
-            return new BuffersIterator(offset);
+            return new BuffersIterator(position);
         }
     }
 
@@ -149,12 +149,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         @SuppressWarnings("resource")
         protected Page computeNext()
         {
-            CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getSourcePosition(), input.getPath());
+            input.tryUncacheRead();
 
             if (input.isEOF())
                 return endOfData();
 
-            return new Page(input.getSourcePosition());
+            return new Page(input.getSeekPosition());
         }
     }
 
@@ -163,9 +163,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
      */
     final class HintsIterator extends AbstractIterator<Hint>
     {
-        private final long offset;
+        private final InputPosition offset;
 
-        HintsIterator(long offset)
+        HintsIterator(InputPosition offset)
         {
             super();
             this.offset = offset;
@@ -177,12 +177,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
             do
             {
-                long position = input.getSourcePosition();
+                InputPosition position = input.getSeekPosition();
 
                 if (input.isEOF())
                     return endOfData(); // reached EOF
 
-                if (position - offset >= PAGE_SIZE)
+                if (position.subtract(offset) >= PAGE_SIZE)
                     return endOfData(); // read page size or more bytes
 
                 try
@@ -253,9 +253,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
      */
     final class BuffersIterator extends AbstractIterator<ByteBuffer>
     {
-        private final long offset;
+        private final InputPosition offset;
 
-        BuffersIterator(long offset)
+        BuffersIterator(InputPosition offset)
         {
             super();
             this.offset = offset;
@@ -267,12 +267,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
             do
             {
-                long position = input.getSourcePosition();
+                InputPosition position = input.getSeekPosition();
 
                 if (input.isEOF())
                     return endOfData(); // reached EOF
 
-                if (position - offset >= PAGE_SIZE)
+                if (position.subtract(offset) >= PAGE_SIZE)
                     return endOfData(); // read page size or more bytes
 
                 try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 5a32786..1a3a403 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -30,6 +30,7 @@ import java.util.function.Supplier;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.dht.Token;
@@ -60,7 +63,7 @@ public final class HintsService implements HintsServiceMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(HintsService.class);
 
-    public static final HintsService instance = new HintsService();
+    public static HintsService instance = new HintsService();
 
     private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
 
@@ -82,6 +85,12 @@ public final class HintsService implements HintsServiceMBean
 
     private HintsService()
     {
+        this(FailureDetector.instance);
+    }
+
+    @VisibleForTesting
+    HintsService(IFailureDetector failureDetector)
+    {
         File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
         int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
 
@@ -92,7 +101,7 @@ public final class HintsService implements HintsServiceMBean
         bufferPool = new HintsBufferPool(bufferSize, writeExecutor::flushBuffer);
 
         isDispatchPaused = new AtomicBoolean(true);
-        dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused);
+        dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused, failureDetector::isAlive);
 
         // periodically empty the current content of the buffers
         int flushPeriod = DatabaseDescriptor.getHintsFlushPeriodInMS();
@@ -225,7 +234,7 @@ public final class HintsService implements HintsServiceMBean
      * Will abort dispatch sessions that are currently in progress (which is okay, it's idempotent),
      * and make sure the buffers are flushed, hints files written and fsynced.
      */
-    public synchronized void shutdownBlocking()
+    public synchronized void shutdownBlocking() throws ExecutionException, InterruptedException
     {
         if (isShutDown)
             throw new IllegalStateException("HintsService has already been shut down");
@@ -237,8 +246,8 @@ public final class HintsService implements HintsServiceMBean
 
         triggerFlushingFuture.cancel(false);
 
-        writeExecutor.flushBufferPool(bufferPool);
-        writeExecutor.closeAllWriters();
+        writeExecutor.flushBufferPool(bufferPool).get();
+        writeExecutor.closeAllWriters().get();
 
         dispatchExecutor.shutdownBlocking();
         writeExecutor.shutdownBlocking();
@@ -369,4 +378,12 @@ public final class HintsService implements HintsServiceMBean
     {
         return catalog;
     }
+
+    /**
+     * Returns true in case service is shut down.
+     */
+    public boolean isShutDown()
+    {
+        return isShutDown;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index c066331..3572172 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -49,7 +49,7 @@ final class HintsStore
     private final File hintsDirectory;
     private final ImmutableMap<String, Object> writerParams;
 
-    private final Map<HintsDescriptor, Long> dispatchOffsets;
+    private final Map<HintsDescriptor, InputPosition> dispatchPositions;
     private final Deque<HintsDescriptor> dispatchDequeue;
     private final Queue<HintsDescriptor> blacklistedFiles;
 
@@ -63,7 +63,7 @@ final class HintsStore
         this.hintsDirectory = hintsDirectory;
         this.writerParams = writerParams;
 
-        dispatchOffsets = new ConcurrentHashMap<>();
+        dispatchPositions = new ConcurrentHashMap<>();
         dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
         blacklistedFiles = new ConcurrentLinkedQueue<>();
 
@@ -136,19 +136,19 @@ final class HintsStore
         return !dispatchDequeue.isEmpty();
     }
 
-    Optional<Long> getDispatchOffset(HintsDescriptor descriptor)
+    InputPosition getDispatchOffset(HintsDescriptor descriptor)
     {
-        return Optional.ofNullable(dispatchOffsets.get(descriptor));
+        return dispatchPositions.get(descriptor);
     }
 
-    void markDispatchOffset(HintsDescriptor descriptor, long mark)
+    void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
     {
-        dispatchOffsets.put(descriptor, mark);
+        dispatchPositions.put(descriptor, inputPosition);
     }
 
     void cleanUp(HintsDescriptor descriptor)
     {
-        dispatchOffsets.remove(descriptor);
+        dispatchPositions.remove(descriptor);
     }
 
     void blacklist(HintsDescriptor descriptor)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/InputPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/InputPosition.java b/src/java/org/apache/cassandra/hints/InputPosition.java
new file mode 100644
index 0000000..05f9db0
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/InputPosition.java
@@ -0,0 +1,9 @@
+package org.apache.cassandra.hints;
+
+/**
+ * Marker interface for file positions as provided by the various ChecksummedDataReader implementations.
+ */
+public interface InputPosition
+{
+    long subtract(InputPosition other);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/security/EncryptionUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java
index 7e72b3e..b262259 100644
--- a/src/java/org/apache/cassandra/security/EncryptionUtils.java
+++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java
@@ -309,5 +309,10 @@ public class EncryptionUtils
         {
             // nop
         }
+
+        public void setPosition(long sourcePosition)
+        {
+            this.currentPosition = sourcePosition;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0a95827..e3b4752 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -570,7 +570,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         drainOnShutdown = new Thread(new WrappedRunnable()
         {
             @Override
-            public void runMayThrow() throws InterruptedException
+            public void runMayThrow() throws InterruptedException, ExecutionException
             {
                 inShutdownHook = true;
                 ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/test/unit/org/apache/cassandra/hints/AlteredHints.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/AlteredHints.java b/test/unit/org/apache/cassandra/hints/AlteredHints.java
index 23dc32a..7efe08f 100644
--- a/test/unit/org/apache/cassandra/hints/AlteredHints.java
+++ b/test/unit/org/apache/cassandra/hints/AlteredHints.java
@@ -107,9 +107,11 @@ public abstract class AlteredHints
         {
             Assert.assertTrue(looksLegit(reader.getInput()));
             List<Hint> deserialized = new ArrayList<>(hintNum);
+            List<InputPosition> pagePositions = new ArrayList<>(hintNum);
 
             for (HintsReader.Page page: reader)
             {
+                pagePositions.add(page.position);
                 Iterator<Hint> iterator = page.hintsIterator();
                 while (iterator.hasNext())
                 {
@@ -124,6 +126,21 @@ public abstract class AlteredHints
                 HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum));
                 hintNum++;
             }
+
+            // explicitely seek to each page by iterating collected page positions and check if hints still match as expected
+            int hintOffset = 0;
+            for (InputPosition pos : pagePositions)
+            {
+                reader.seek(pos);
+                HintsReader.Page page = reader.iterator().next();
+                Iterator<Hint> iterator = page.hintsIterator();
+                while (iterator.hasNext())
+                {
+                    Hint seekedHint = iterator.next();
+                    HintsTestUtil.assertHintsEqual(hints.get(hintOffset), seekedHint);
+                    hintOffset++;
+                }
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index 51b6aa3..a255338 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -79,6 +79,37 @@ public class HintsCatalogTest
         assertNull(store2.poll());
     }
 
+    @Test
+    public void deleteHintsTest() throws IOException
+    {
+        File directory = Files.createTempDirectory(null).toFile();
+        UUID hostId1 = UUID.randomUUID();
+        UUID hostId2 = UUID.randomUUID();
+        long now = System.currentTimeMillis();
+        writeDescriptor(directory, new HintsDescriptor(hostId1, now));
+        writeDescriptor(directory, new HintsDescriptor(hostId1, now+1));
+        writeDescriptor(directory, new HintsDescriptor(hostId2, now+2));
+        writeDescriptor(directory, new HintsDescriptor(hostId2, now+3));
+
+        // load catalog containing two stores (one for each host)
+        HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of());
+        assertEquals(2, catalog.stores().count());
+        assertTrue(catalog.hasFiles());
+
+        // delete all hints from store 1
+        assertTrue(catalog.get(hostId1).hasFiles());
+        catalog.deleteAllHints(hostId1);
+        assertFalse(catalog.get(hostId1).hasFiles());
+        // stores are still keepts for each host, even after deleting hints
+        assertEquals(2, catalog.stores().count());
+        assertTrue(catalog.hasFiles());
+
+        // delete all hints from all stores
+        catalog.deleteAllHints();
+        assertEquals(2, catalog.stores().count());
+        assertFalse(catalog.hasFiles());
+    }
+
     @SuppressWarnings("EmptyTryBlock")
     private static void writeDescriptor(File directory, HintsDescriptor descriptor) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
new file mode 100644
index 0000000..ffb7f73
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.utils.MoreFutures;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.MockMessagingService;
+import org.apache.cassandra.net.MockMessagingSpy;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.net.MockMessagingService.verb;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class HintsServiceTest
+{
+    private static final String KEYSPACE = "hints_service_test";
+    private static final String TABLE = "table";
+
+    private final MockFailureDetector failureDetector = new MockFailureDetector();
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                KeyspaceParams.simple(1),
+                SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+    }
+
+    @After
+    public void cleanup()
+    {
+        MockMessagingService.cleanup();
+    }
+
+    @Before
+    public void reinstanciateService() throws ExecutionException, InterruptedException
+    {
+        MessagingService.instance().clearMessageSinks();
+
+        if (!HintsService.instance.isShutDown())
+        {
+            HintsService.instance.shutdownBlocking();
+            HintsService.instance.deleteAllHints();
+        }
+
+        failureDetector.isAlive = true;
+        HintsService.instance = new HintsService(failureDetector);
+        HintsService.instance.startDispatch();
+    }
+
+    @Test
+    public void testDispatchHints() throws InterruptedException, ExecutionException
+    {
+        long cnt = StorageMetrics.totalHints.getCount();
+
+        // create spy for hint messages
+        MockMessagingSpy spy = sendHintsAndResponses(100, -1);
+
+        // metrics should have been updated with number of create hints
+        assertEquals(cnt + 100, StorageMetrics.totalHints.getCount());
+
+        // wait until hints have been send
+        spy.interceptMessageOut(100).get();
+        spy.interceptNoMsg(500, TimeUnit.MILLISECONDS).get();
+    }
+
+    @Test
+    public void testPauseAndResume() throws InterruptedException, ExecutionException
+    {
+        HintsService.instance.pauseDispatch();
+
+        // create spy for hint messages
+        MockMessagingSpy spy = sendHintsAndResponses(100, -1);
+
+        // we should not send any hints while paused
+        ListenableFuture<Boolean> noMessagesWhilePaused = spy.interceptNoMsg(15, TimeUnit.SECONDS);
+        Futures.addCallback(noMessagesWhilePaused, new MoreFutures.SuccessCallback<Boolean>()
+        {
+            public void onSuccess(@Nullable Boolean aBoolean)
+            {
+                HintsService.instance.resumeDispatch();
+            }
+        });
+
+        Futures.allAsList(
+                noMessagesWhilePaused,
+                spy.interceptMessageOut(100),
+                spy.interceptNoMsg(200, TimeUnit.MILLISECONDS)
+        ).get();
+    }
+
+    @Test
+    public void testPageRetry() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        // create spy for hint messages, but only create responses for 5 hints
+        MockMessagingSpy spy = sendHintsAndResponses(20, 5);
+
+        Futures.allAsList(
+                // the dispatcher will always send all hints within the current page
+                // and only wait for the acks before going to the next page
+                spy.interceptMessageOut(20),
+                spy.interceptNoMsg(200, TimeUnit.MILLISECONDS),
+
+                // next tick will trigger a retry of the same page as we only replied with 5/20 acks
+                spy.interceptMessageOut(20)
+        ).get();
+
+        // marking the destination node as dead should stop sending hints
+        failureDetector.isAlive = false;
+        spy.interceptNoMsg(20, TimeUnit.SECONDS).get();
+    }
+
+    @Test
+    public void testPageSeek() throws InterruptedException, ExecutionException
+    {
+        // create spy for hint messages, stop replying after 12k (should be on 3rd page)
+        MockMessagingSpy spy = sendHintsAndResponses(20000, 12000);
+
+        // At this point the dispatcher will constantly retry the page we stopped acking,
+        // thus we receive the same hints from the page multiple times and in total more than
+        // all written hints. Lets just consume them for a while and then pause the dispatcher.
+        spy.interceptMessageOut(22000).get();
+        HintsService.instance.pauseDispatch();
+        Thread.sleep(1000);
+
+        // verify that we have a dispatch offset set for the page we're currently stuck at
+        HintsStore store = HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID());
+        HintsDescriptor descriptor = store.poll();
+        store.offerFirst(descriptor); // add again for cleanup during re-instanciation
+        InputPosition dispatchOffset = store.getDispatchOffset(descriptor);
+        assertTrue(dispatchOffset != null);
+        assertTrue(((ChecksummedDataInput.Position) dispatchOffset).sourcePosition > 0);
+    }
+
+    private MockMessagingSpy sendHintsAndResponses(int noOfHints, int noOfResponses)
+    {
+        // create spy for hint messages, but only create responses for noOfResponses hints
+        MessageIn<HintResponse> messageIn = MessageIn.create(FBUtilities.getBroadcastAddress(),
+                HintResponse.instance,
+                Collections.emptyMap(),
+                MessagingService.Verb.REQUEST_RESPONSE,
+                MessagingService.current_version,
+                MessageIn.createTimestamp());
+
+        MockMessagingSpy spy;
+        if (noOfResponses != -1)
+        {
+            spy = MockMessagingService.when(verb(MessagingService.Verb.HINT)).respondN(messageIn, noOfResponses);
+        }
+        else
+        {
+            spy = MockMessagingService.when(verb(MessagingService.Verb.HINT)).respond(messageIn);
+        }
+
+        // create and write noOfHints using service
+        UUID hostId = StorageService.instance.getLocalHostUUID();
+        for (int i = 0; i < noOfHints; i++)
+        {
+            long now = System.currentTimeMillis();
+            DecoratedKey dkey = dk(String.valueOf(i));
+            CFMetaData cfMetaData = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+            PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfMetaData, dkey).timestamp(now);
+            builder.row("column0").add("val", "value0");
+            Hint hint = Hint.create(builder.buildAsMutation(), now);
+            HintsService.instance.write(hostId, hint);
+        }
+        return spy;
+    }
+
+    private static class MockFailureDetector implements IFailureDetector
+    {
+        private boolean isAlive = true;
+
+        public boolean isAlive(InetAddress ep)
+        {
+            return isAlive;
+        }
+
+        public void interpret(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void report(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void remove(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void forceConviction(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}