You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/08/08 12:57:05 UTC
cassandra git commit: Restore resumable hints delivery
Repository: cassandra
Updated Branches:
refs/heads/trunk d5c53d2bb -> cee22ad54
Restore resumable hints delivery
patch by Stefan Podkowinski and Branimir Lambov; reviewed by Branimir Lambov for CASSANDRA-11960
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cee22ad5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cee22ad5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cee22ad5
Branch: refs/heads/trunk
Commit: cee22ad54d7c28aaf0868dd45aeb9b5b708b0e78
Parents: d5c53d2
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Thu Jun 16 16:31:54 2016 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Aug 8 13:56:57 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hints/ChecksummedDataInput.java | 53 +++-
.../hints/CompressedChecksummedDataInput.java | 51 +++-
.../hints/EncryptedChecksummedDataInput.java | 50 +++-
.../cassandra/hints/HintsDispatchExecutor.java | 21 +-
.../apache/cassandra/hints/HintsDispatcher.java | 48 ++--
.../org/apache/cassandra/hints/HintsReader.java | 34 +--
.../apache/cassandra/hints/HintsService.java | 27 +-
.../org/apache/cassandra/hints/HintsStore.java | 14 +-
.../apache/cassandra/hints/InputPosition.java | 9 +
.../cassandra/security/EncryptionUtils.java | 5 +
.../cassandra/service/StorageService.java | 2 +-
.../apache/cassandra/hints/AlteredHints.java | 17 ++
.../cassandra/hints/HintsCatalogTest.java | 31 +++
.../cassandra/hints/HintsServiceTest.java | 254 +++++++++++++++++++
15 files changed, 540 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 15e5001..6fdc04a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,7 @@
3.9
+ * Restore resumable hints delivery (CASSANDRA-11960)
* Fix nodetool tablestats miss SSTable count (CASSANDRA-12205)
* Fixed flacky SSTablesIteratedTest (CASSANDRA-12282)
* Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 8bb5b6d..0db95af 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -26,18 +26,19 @@ import com.google.common.base.Preconditions;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.memory.BufferPool;
/**
- * A {@link RandomAccessReader} wrapper that calctulates the CRC in place.
+ * A {@link RandomAccessReader} wrapper that calculates the CRC in place.
*
* Useful for {@link org.apache.cassandra.hints.HintsReader}, for example, where we must verify the CRC, yet don't want
* to allocate an extra byte array just that purpose. The CRC can be embedded in the input stream and checked via checkCrc().
*
- * In addition to calculating the CRC, it allows to enforce a maximim known size. This is needed
+ * In addition to calculating the CRC, it allows to enforce a maximum known size. This is needed
* so that {@link org.apache.cassandra.db.Mutation.MutationSerializer} doesn't blow up the heap when deserializing a
* corrupted sequence by reading a huge corrupted length of bytes via
- * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
+ * {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
*/
public class ChecksummedDataInput extends RebufferingInputStream
{
@@ -81,13 +82,37 @@ public class ChecksummedDataInput extends RebufferingInputStream
return getPosition() == channel.size();
}
+ static class Position implements InputPosition
+ {
+ final long sourcePosition;
+
+ public Position(long sourcePosition)
+ {
+ super();
+ this.sourcePosition = sourcePosition;
+ }
+
+ @Override
+ public long subtract(InputPosition other)
+ {
+ return sourcePosition - ((Position)other).sourcePosition;
+ }
+ }
+
/**
- * Returns the position in the source file, which is different for getPosition() for compressed/encrypted files
- * and may be imprecise.
+ * Returns a seekable representation of the current position. For compressed files this is the chunk position
+ * in the file plus the offset within the chunk.
*/
- public long getSourcePosition()
+ public InputPosition getSeekPosition()
{
- return getPosition();
+ return new Position(getPosition());
+ }
+
+ public void seek(InputPosition pos)
+ {
+ updateCrc();
+ bufferOffset = ((Position) pos).sourcePosition;
+ buffer.position(0).limit(0);
}
public void resetCrc()
@@ -110,6 +135,15 @@ public class ChecksummedDataInput extends RebufferingInputStream
return bufferOffset + buffer.position();
}
+ /**
+ * Returns the position in the source file, which is different from getPosition() for compressed/encrypted files
+ * and may be imprecise.
+ */
+ protected long getSourcePosition()
+ {
+ return bufferOffset;
+ }
+
public void resetLimit()
{
limit = Long.MAX_VALUE;
@@ -179,6 +213,11 @@ public class ChecksummedDataInput extends RebufferingInputStream
buffer.flip();
}
+ public void tryUncacheRead()
+ {
+ CLibrary.trySkipCache(getChannel().getFileDescriptor(), 0, getSourcePosition(), getPath());
+ }
+
private void updateCrc()
{
if (crcPosition == buffer.position() || crcUpdateDisabled)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
index f584dd1..0766fa5 100644
--- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.hints.ChecksummedDataInput.Position;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
@@ -31,7 +32,8 @@ import org.apache.cassandra.utils.memory.BufferPool;
public final class CompressedChecksummedDataInput extends ChecksummedDataInput
{
private final ICompressor compressor;
- private volatile long filePosition = 0;
+ private volatile long filePosition = 0; // Current position in file, advanced when reading chunk.
+ private volatile long sourcePosition = 0; // Current position in file to report, advanced after consuming chunk.
private volatile ByteBuffer compressedBuffer = null;
private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE);
@@ -39,7 +41,7 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
{
super(channel, compressor.preferredBufferType());
this.compressor = compressor;
- this.filePosition = filePosition;
+ this.sourcePosition = this.filePosition = filePosition;
}
/**
@@ -53,12 +55,55 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
public long getSourcePosition()
{
- return filePosition;
+ return sourcePosition;
+ }
+
+ static class Position extends ChecksummedDataInput.Position
+ {
+ final long bufferStart;
+ final int bufferPosition;
+
+ public Position(long sourcePosition, long bufferStart, int bufferPosition)
+ {
+ super(sourcePosition);
+ this.bufferStart = bufferStart;
+ this.bufferPosition = bufferPosition;
+ }
+
+ @Override
+ public long subtract(InputPosition o)
+ {
+ Position other = (Position) o;
+ return bufferStart - other.bufferStart + bufferPosition - other.bufferPosition;
+ }
+ }
+
+ public InputPosition getSeekPosition()
+ {
+ return new Position(sourcePosition, bufferOffset, buffer.position());
+ }
+
+ public void seek(InputPosition p)
+ {
+ Position pos = (Position) p;
+ bufferOffset = pos.bufferStart;
+ filePosition = pos.sourcePosition;
+ buffer.position(0).limit(0);
+ resetCrc();
+ reBuffer();
+ buffer.position(pos.bufferPosition);
+ assert sourcePosition == pos.sourcePosition;
+ assert bufferOffset == pos.bufferStart;
+ assert buffer.position() == pos.bufferPosition;
}
@Override
protected void readBuffer()
{
+ sourcePosition = filePosition;
+ if (isEOF())
+ return;
+
metadataBuffer.clear();
channel.read(metadataBuffer, filePosition);
filePosition += CompressedHintsWriter.METADATA_SIZE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
index 7ecfbfe..b335226 100644
--- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
@@ -24,6 +24,7 @@ import javax.crypto.Cipher;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.hints.CompressedChecksummedDataInput.Position;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
@@ -42,6 +43,7 @@ public class EncryptedChecksummedDataInput extends ChecksummedDataInput
private final ICompressor compressor;
private final EncryptionUtils.ChannelProxyReadChannel readChannel;
+ private long sourcePosition;
protected EncryptedChecksummedDataInput(ChannelProxy channel, Cipher cipher, ICompressor compressor, long filePosition)
{
@@ -49,6 +51,7 @@ public class EncryptedChecksummedDataInput extends ChecksummedDataInput
this.cipher = cipher;
this.compressor = compressor;
readChannel = new EncryptionUtils.ChannelProxyReadChannel(channel, filePosition);
+ this.sourcePosition = filePosition;
assert cipher != null;
assert compressor != null;
}
@@ -59,17 +62,60 @@ public class EncryptedChecksummedDataInput extends ChecksummedDataInput
*/
public boolean isEOF()
{
- return getSourcePosition() == channel.size() && buffer.remaining() == 0;
+ return readChannel.getCurrentPosition() == channel.size() && buffer.remaining() == 0;
}
public long getSourcePosition()
{
- return readChannel.getCurrentPosition();
+ return sourcePosition;
+ }
+
+ static class Position extends ChecksummedDataInput.Position
+ {
+ final long bufferStart;
+ final int bufferPosition;
+
+ public Position(long sourcePosition, long bufferStart, int bufferPosition)
+ {
+ super(sourcePosition);
+ this.bufferStart = bufferStart;
+ this.bufferPosition = bufferPosition;
+ }
+
+ @Override
+ public long subtract(InputPosition o)
+ {
+ Position other = (Position) o;
+ return bufferStart - other.bufferStart + bufferPosition - other.bufferPosition;
+ }
+ }
+
+ public InputPosition getSeekPosition()
+ {
+ return new Position(sourcePosition, bufferOffset, buffer.position());
+ }
+
+ public void seek(InputPosition p)
+ {
+ Position pos = (Position) p;
+ bufferOffset = pos.bufferStart;
+ readChannel.setPosition(pos.sourcePosition);
+ buffer.position(0).limit(0);
+ resetCrc();
+ reBuffer();
+ buffer.position(pos.bufferPosition);
+ assert sourcePosition == pos.sourcePosition;
+ assert bufferOffset == pos.bufferStart;
+ assert buffer.position() == pos.bufferPosition;
}
@Override
protected void readBuffer()
{
+ this.sourcePosition = readChannel.getCurrentPosition();
+ if (isEOF())
+ return;
+
try
{
ByteBuffer byteBuffer = reusableBuffers.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 5292dc1..d7ccf81 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -23,6 +23,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
import java.util.function.Supplier;
import com.google.common.util.concurrent.RateLimiter;
@@ -48,12 +50,14 @@ final class HintsDispatchExecutor
private final File hintsDirectory;
private final ExecutorService executor;
private final AtomicBoolean isPaused;
+ private final Function<InetAddress, Boolean> isAlive;
private final Map<UUID, Future> scheduledDispatches;
- HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused)
+ HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Function<InetAddress, Boolean> isAlive)
{
this.hintsDirectory = hintsDirectory;
this.isPaused = isPaused;
+ this.isAlive = isAlive;
scheduledDispatches = new ConcurrentHashMap<>();
executor = new JMXEnabledThreadPoolExecutor(1,
@@ -72,6 +76,14 @@ final class HintsDispatchExecutor
{
scheduledDispatches.clear();
executor.shutdownNow();
+ try
+ {
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
}
boolean isScheduled(HintsStore store)
@@ -249,9 +261,10 @@ final class HintsDispatchExecutor
private boolean deliver(HintsDescriptor descriptor, InetAddress address)
{
File file = new File(hintsDirectory, descriptor.fileName());
- Long offset = store.getDispatchOffset(descriptor).orElse(null);
+ InputPosition offset = store.getDispatchOffset(descriptor);
- try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, isPaused))
+ BooleanSupplier shouldAbort = () -> !isAlive.apply(address) || isPaused.get();
+ try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, shouldAbort))
{
if (offset != null)
dispatcher.seek(offset);
@@ -265,7 +278,7 @@ final class HintsDispatchExecutor
}
else
{
- store.markDispatchOffset(descriptor, dispatcher.dispatchOffset());
+ store.markDispatchOffset(descriptor, dispatcher.dispatchPosition());
store.offerFirst(descriptor);
logger.info("Finished hinted handoff of file {} to endpoint {}, partially", descriptor.fileName(), hostId);
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index e582d88..00ef52b 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -22,13 +22,12 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
import java.util.function.Function;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -42,31 +41,31 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
*/
final class HintsDispatcher implements AutoCloseable
{
- private enum Action { CONTINUE, ABORT, RETRY }
+ private enum Action { CONTINUE, ABORT }
private final HintsReader reader;
private final UUID hostId;
private final InetAddress address;
private final int messagingVersion;
- private final AtomicBoolean isPaused;
+ private final BooleanSupplier abortRequested;
- private long currentPageOffset;
+ private InputPosition currentPagePosition;
- private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, AtomicBoolean isPaused)
+ private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, BooleanSupplier abortRequested)
{
- currentPageOffset = 0L;
+ currentPagePosition = null;
this.reader = reader;
this.hostId = hostId;
this.address = address;
this.messagingVersion = messagingVersion;
- this.isPaused = isPaused;
+ this.abortRequested = abortRequested;
}
- static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, AtomicBoolean isPaused)
+ static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, BooleanSupplier abortRequested)
{
int messagingVersion = MessagingService.instance().getVersion(address);
- return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused);
+ return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested);
}
public void close()
@@ -74,10 +73,9 @@ final class HintsDispatcher implements AutoCloseable
reader.close();
}
- void seek(long bytes)
+ void seek(InputPosition position)
{
- reader.seek(bytes);
- currentPageOffset = 0L;
+ reader.seek(position);
}
/**
@@ -87,7 +85,7 @@ final class HintsDispatcher implements AutoCloseable
{
for (HintsReader.Page page : reader)
{
- currentPageOffset = page.offset;
+ currentPagePosition = page.position;
if (dispatch(page) != Action.CONTINUE)
return false;
}
@@ -98,28 +96,16 @@ final class HintsDispatcher implements AutoCloseable
/**
* @return offset of the first non-delivered page
*/
- long dispatchOffset()
+ InputPosition dispatchPosition()
{
- return currentPageOffset;
+ return currentPagePosition;
}
- private boolean isHostAlive()
- {
- return FailureDetector.instance.isAlive(address);
- }
-
- private boolean isPaused()
- {
- return isPaused.get();
- }
// retry in case of a timeout; stop in case of a failure, host going down, or delivery paused
private Action dispatch(HintsReader.Page page)
{
- Action action = sendHintsAndAwait(page);
- return action == Action.RETRY
- ? dispatch(page)
- : action;
+ return sendHintsAndAwait(page);
}
private Action sendHintsAndAwait(HintsReader.Page page)
@@ -142,7 +128,7 @@ final class HintsDispatcher implements AutoCloseable
for (Callback cb : callbacks)
if (cb.await() != Callback.Outcome.SUCCESS)
- return Action.RETRY;
+ return Action.ABORT;
return Action.CONTINUE;
}
@@ -155,7 +141,7 @@ final class HintsDispatcher implements AutoCloseable
{
while (hints.hasNext())
{
- if (!isHostAlive() || isPaused())
+ if (abortRequested.getAsBoolean())
return Action.ABORT;
callbacks.add(sendFunction.apply(hints.next()));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index 5e73805..e0a73c1 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -109,9 +109,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
return descriptor;
}
- void seek(long newPosition)
+ void seek(InputPosition newPosition)
{
- throw new UnsupportedOperationException("Hints are not seekable.");
+ input.seek(newPosition);
}
public Iterator<Page> iterator()
@@ -126,21 +126,21 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
final class Page
{
- public final long offset;
+ public final InputPosition position;
- private Page(long offset)
+ private Page(InputPosition inputPosition)
{
- this.offset = offset;
+ this.position = inputPosition;
}
Iterator<Hint> hintsIterator()
{
- return new HintsIterator(offset);
+ return new HintsIterator(position);
}
Iterator<ByteBuffer> buffersIterator()
{
- return new BuffersIterator(offset);
+ return new BuffersIterator(position);
}
}
@@ -149,12 +149,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
@SuppressWarnings("resource")
protected Page computeNext()
{
- CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getSourcePosition(), input.getPath());
+ input.tryUncacheRead();
if (input.isEOF())
return endOfData();
- return new Page(input.getSourcePosition());
+ return new Page(input.getSeekPosition());
}
}
@@ -163,9 +163,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
*/
final class HintsIterator extends AbstractIterator<Hint>
{
- private final long offset;
+ private final InputPosition offset;
- HintsIterator(long offset)
+ HintsIterator(InputPosition offset)
{
super();
this.offset = offset;
@@ -177,12 +177,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
do
{
- long position = input.getSourcePosition();
+ InputPosition position = input.getSeekPosition();
if (input.isEOF())
return endOfData(); // reached EOF
- if (position - offset >= PAGE_SIZE)
+ if (position.subtract(offset) >= PAGE_SIZE)
return endOfData(); // read page size or more bytes
try
@@ -253,9 +253,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
*/
final class BuffersIterator extends AbstractIterator<ByteBuffer>
{
- private final long offset;
+ private final InputPosition offset;
- BuffersIterator(long offset)
+ BuffersIterator(InputPosition offset)
{
super();
this.offset = offset;
@@ -267,12 +267,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
do
{
- long position = input.getSourcePosition();
+ InputPosition position = input.getSeekPosition();
if (input.isEOF())
return endOfData(); // reached EOF
- if (position - offset >= PAGE_SIZE)
+ if (position.subtract(offset) >= PAGE_SIZE)
return endOfData(); // read page size or more bytes
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 5a32786..1a3a403 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -30,6 +30,7 @@ import java.util.function.Supplier;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.dht.Token;
@@ -60,7 +63,7 @@ public final class HintsService implements HintsServiceMBean
{
private static final Logger logger = LoggerFactory.getLogger(HintsService.class);
- public static final HintsService instance = new HintsService();
+ public static HintsService instance = new HintsService();
private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
@@ -82,6 +85,12 @@ public final class HintsService implements HintsServiceMBean
private HintsService()
{
+ this(FailureDetector.instance);
+ }
+
+ @VisibleForTesting
+ HintsService(IFailureDetector failureDetector)
+ {
File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
@@ -92,7 +101,7 @@ public final class HintsService implements HintsServiceMBean
bufferPool = new HintsBufferPool(bufferSize, writeExecutor::flushBuffer);
isDispatchPaused = new AtomicBoolean(true);
- dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused);
+ dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused, failureDetector::isAlive);
// periodically empty the current content of the buffers
int flushPeriod = DatabaseDescriptor.getHintsFlushPeriodInMS();
@@ -225,7 +234,7 @@ public final class HintsService implements HintsServiceMBean
* Will abort dispatch sessions that are currently in progress (which is okay, it's idempotent),
* and make sure the buffers are flushed, hints files written and fsynced.
*/
- public synchronized void shutdownBlocking()
+ public synchronized void shutdownBlocking() throws ExecutionException, InterruptedException
{
if (isShutDown)
throw new IllegalStateException("HintsService has already been shut down");
@@ -237,8 +246,8 @@ public final class HintsService implements HintsServiceMBean
triggerFlushingFuture.cancel(false);
- writeExecutor.flushBufferPool(bufferPool);
- writeExecutor.closeAllWriters();
+ writeExecutor.flushBufferPool(bufferPool).get();
+ writeExecutor.closeAllWriters().get();
dispatchExecutor.shutdownBlocking();
writeExecutor.shutdownBlocking();
@@ -369,4 +378,12 @@ public final class HintsService implements HintsServiceMBean
{
return catalog;
}
+
+ /**
+ * Returns true in case service is shut down.
+ */
+ public boolean isShutDown()
+ {
+ return isShutDown;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index c066331..3572172 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -49,7 +49,7 @@ final class HintsStore
private final File hintsDirectory;
private final ImmutableMap<String, Object> writerParams;
- private final Map<HintsDescriptor, Long> dispatchOffsets;
+ private final Map<HintsDescriptor, InputPosition> dispatchPositions;
private final Deque<HintsDescriptor> dispatchDequeue;
private final Queue<HintsDescriptor> blacklistedFiles;
@@ -63,7 +63,7 @@ final class HintsStore
this.hintsDirectory = hintsDirectory;
this.writerParams = writerParams;
- dispatchOffsets = new ConcurrentHashMap<>();
+ dispatchPositions = new ConcurrentHashMap<>();
dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
blacklistedFiles = new ConcurrentLinkedQueue<>();
@@ -136,19 +136,19 @@ final class HintsStore
return !dispatchDequeue.isEmpty();
}
- Optional<Long> getDispatchOffset(HintsDescriptor descriptor)
+ InputPosition getDispatchOffset(HintsDescriptor descriptor)
{
- return Optional.ofNullable(dispatchOffsets.get(descriptor));
+ return dispatchPositions.get(descriptor);
}
- void markDispatchOffset(HintsDescriptor descriptor, long mark)
+ void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
{
- dispatchOffsets.put(descriptor, mark);
+ dispatchPositions.put(descriptor, inputPosition);
}
void cleanUp(HintsDescriptor descriptor)
{
- dispatchOffsets.remove(descriptor);
+ dispatchPositions.remove(descriptor);
}
void blacklist(HintsDescriptor descriptor)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/hints/InputPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/InputPosition.java b/src/java/org/apache/cassandra/hints/InputPosition.java
new file mode 100644
index 0000000..05f9db0
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/InputPosition.java
@@ -0,0 +1,9 @@
+package org.apache.cassandra.hints;
+
+/**
+ * Opaque file position as provided by the various ChecksummedDataInput implementations; supports subtraction to measure the distance between two positions.
+ */
+public interface InputPosition
+{
+ long subtract(InputPosition other);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/security/EncryptionUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java
index 7e72b3e..b262259 100644
--- a/src/java/org/apache/cassandra/security/EncryptionUtils.java
+++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java
@@ -309,5 +309,10 @@ public class EncryptionUtils
{
// nop
}
+
+ public void setPosition(long sourcePosition)
+ {
+ this.currentPosition = sourcePosition;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0a95827..e3b4752 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -570,7 +570,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
drainOnShutdown = new Thread(new WrappedRunnable()
{
@Override
- public void runMayThrow() throws InterruptedException
+ public void runMayThrow() throws InterruptedException, ExecutionException
{
inShutdownHook = true;
ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/test/unit/org/apache/cassandra/hints/AlteredHints.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/AlteredHints.java b/test/unit/org/apache/cassandra/hints/AlteredHints.java
index 23dc32a..7efe08f 100644
--- a/test/unit/org/apache/cassandra/hints/AlteredHints.java
+++ b/test/unit/org/apache/cassandra/hints/AlteredHints.java
@@ -107,9 +107,11 @@ public abstract class AlteredHints
{
Assert.assertTrue(looksLegit(reader.getInput()));
List<Hint> deserialized = new ArrayList<>(hintNum);
+ List<InputPosition> pagePositions = new ArrayList<>(hintNum);
for (HintsReader.Page page: reader)
{
+ pagePositions.add(page.position);
Iterator<Hint> iterator = page.hintsIterator();
while (iterator.hasNext())
{
@@ -124,6 +126,21 @@ public abstract class AlteredHints
HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum));
hintNum++;
}
+
+ // explicitly seek to each page by iterating collected page positions and check if hints still match as expected
+ int hintOffset = 0;
+ for (InputPosition pos : pagePositions)
+ {
+ reader.seek(pos);
+ HintsReader.Page page = reader.iterator().next();
+ Iterator<Hint> iterator = page.hintsIterator();
+ while (iterator.hasNext())
+ {
+ Hint seekedHint = iterator.next();
+ HintsTestUtil.assertHintsEqual(hints.get(hintOffset), seekedHint);
+ hintOffset++;
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index 51b6aa3..a255338 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -79,6 +79,37 @@ public class HintsCatalogTest
assertNull(store2.poll());
}
+    /**
+     * Writes two descriptors for each of two hosts, then checks that deleting hints
+     * (first for one host, then for all hosts) removes the files while the catalog
+     * keeps reporting one store per host.
+     */
+    @Test
+    public void deleteHintsTest() throws IOException
+    {
+        File hintsDir = Files.createTempDirectory(null).toFile();
+        UUID firstHost = UUID.randomUUID();
+        UUID secondHost = UUID.randomUUID();
+        long baseTimestamp = System.currentTimeMillis();
+
+        // every descriptor gets its own timestamp: two for the first host, two for the second
+        writeDescriptor(hintsDir, new HintsDescriptor(firstHost, baseTimestamp));
+        writeDescriptor(hintsDir, new HintsDescriptor(firstHost, baseTimestamp + 1));
+        writeDescriptor(hintsDir, new HintsDescriptor(secondHost, baseTimestamp + 2));
+        writeDescriptor(hintsDir, new HintsDescriptor(secondHost, baseTimestamp + 3));
+
+        // the loaded catalog is expected to hold one store per host
+        HintsCatalog loaded = HintsCatalog.load(hintsDir, ImmutableMap.of());
+        assertEquals(2, loaded.stores().count());
+        assertTrue(loaded.hasFiles());
+
+        // drop everything that belongs to the first host only
+        assertTrue(loaded.get(firstHost).hasFiles());
+        loaded.deleteAllHints(firstHost);
+        assertFalse(loaded.get(firstHost).hasFiles());
+
+        // stores are still kept for each host, even after deleting hints;
+        // the second host's files keep the catalog non-empty
+        assertEquals(2, loaded.stores().count());
+        assertTrue(loaded.hasFiles());
+
+        // drop the hints of every remaining store
+        loaded.deleteAllHints();
+        assertEquals(2, loaded.stores().count());
+        assertFalse(loaded.hasFiles());
+    }
+
@SuppressWarnings("EmptyTryBlock")
private static void writeDescriptor(File directory, HintsDescriptor descriptor) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cee22ad5/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
new file mode 100644
index 0000000..ffb7f73
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.utils.MoreFutures;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.MockMessagingService;
+import org.apache.cassandra.net.MockMessagingSpy;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.net.MockMessagingService.verb;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for hint dispatch through {@link HintsService}.
+ *
+ * Hints are written for the local host id, outgoing HINT messages are intercepted with
+ * {@link MockMessagingService}, and a {@link MockFailureDetector} lets a test decide
+ * whether the destination is reported as alive.
+ */
+public class HintsServiceTest
+{
+    private static final String KEYSPACE = "hints_service_test";
+    private static final String TABLE = "table";
+
+    private final MockFailureDetector failureDetector = new MockFailureDetector();
+
+    /**
+     * Prepares the server and creates the keyspace/table the hinted mutations are built against.
+     */
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+    }
+
+    /**
+     * Removes the mock messaging rules installed by the test that just ran.
+     */
+    @After
+    public void cleanup()
+    {
+        MockMessagingService.cleanup();
+    }
+
+    /**
+     * Replaces {@code HintsService.instance} with a fresh service wired to the mock failure
+     * detector. A still-running previous instance is shut down and its hints are deleted first.
+     */
+    @Before
+    public void reinstanciateService() throws ExecutionException, InterruptedException
+    {
+        MessagingService.instance().clearMessageSinks();
+
+        if (!HintsService.instance.isShutDown())
+        {
+            HintsService.instance.shutdownBlocking();
+            HintsService.instance.deleteAllHints();
+        }
+
+        failureDetector.isAlive = true;
+        HintsService.instance = new HintsService(failureDetector);
+        HintsService.instance.startDispatch();
+    }
+
+    /**
+     * Writes 100 hints with a response configured for every HINT message and expects the
+     * hints metric to grow by 100 and 100 outgoing messages to be intercepted.
+     */
+    @Test
+    public void testDispatchHints() throws InterruptedException, ExecutionException
+    {
+        long cnt = StorageMetrics.totalHints.getCount();
+
+        // create spy for hint messages
+        MockMessagingSpy spy = sendHintsAndResponses(100, -1);
+
+        // metrics should have been updated with number of created hints
+        assertEquals(cnt + 100, StorageMetrics.totalHints.getCount());
+
+        // wait until hints have been sent
+        spy.interceptMessageOut(100).get();
+        spy.interceptNoMsg(500, TimeUnit.MILLISECONDS).get();
+    }
+
+    /**
+     * Pauses dispatch, writes 100 hints, expects no outgoing message for 15 seconds,
+     * then resumes dispatch and expects the 100 messages to go out.
+     */
+    @Test
+    public void testPauseAndResume() throws InterruptedException, ExecutionException
+    {
+        HintsService.instance.pauseDispatch();
+
+        // create spy for hint messages
+        MockMessagingSpy spy = sendHintsAndResponses(100, -1);
+
+        // we should not send any hints while paused
+        ListenableFuture<Boolean> noMessagesWhilePaused = spy.interceptNoMsg(15, TimeUnit.SECONDS);
+        Futures.addCallback(noMessagesWhilePaused, new MoreFutures.SuccessCallback<Boolean>()
+        {
+            public void onSuccess(@Nullable Boolean aBoolean)
+            {
+                // only resume once the quiet period has completed successfully
+                HintsService.instance.resumeDispatch();
+            }
+        });
+
+        Futures.allAsList(
+            noMessagesWhilePaused,
+            spy.interceptMessageOut(100),
+            spy.interceptNoMsg(200, TimeUnit.MILLISECONDS)
+        ).get();
+    }
+
+    /**
+     * Writes 20 hints but only answers 5 of them, expects the 20 messages to be sent twice,
+     * and then expects sending to stop once the destination is marked as dead.
+     */
+    @Test
+    public void testPageRetry() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        // create spy for hint messages, but only create responses for 5 hints
+        MockMessagingSpy spy = sendHintsAndResponses(20, 5);
+
+        Futures.allAsList(
+            // the dispatcher will always send all hints within the current page
+            // and only wait for the acks before going to the next page
+            spy.interceptMessageOut(20),
+            spy.interceptNoMsg(200, TimeUnit.MILLISECONDS),
+
+            // next tick will trigger a retry of the same page as we only replied with 5/20 acks
+            spy.interceptMessageOut(20)
+        ).get();
+
+        // marking the destination node as dead should stop sending hints
+        failureDetector.isAlive = false;
+        spy.interceptNoMsg(20, TimeUnit.SECONDS).get();
+    }
+
+    /**
+     * Writes 20000 hints but stops answering after 12000, lets the dispatcher retry for a
+     * while, pauses it, and checks that a dispatch offset with a positive source position
+     * has been recorded for the polled descriptor.
+     */
+    @Test
+    public void testPageSeek() throws InterruptedException, ExecutionException
+    {
+        // create spy for hint messages, stop replying after 12k (should be on 3rd page)
+        MockMessagingSpy spy = sendHintsAndResponses(20000, 12000);
+
+        // At this point the dispatcher will constantly retry the page we stopped acking,
+        // thus we receive the same hints from the page multiple times and in total more than
+        // all written hints. Lets just consume them for a while and then pause the dispatcher.
+        spy.interceptMessageOut(22000).get();
+        HintsService.instance.pauseDispatch();
+        Thread.sleep(1000);
+
+        // verify that we have a dispatch offset set for the page we're currently stuck at
+        HintsStore store = HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID());
+        HintsDescriptor descriptor = store.poll();
+        store.offerFirst(descriptor); // add again for cleanup during re-instantiation
+        InputPosition dispatchOffset = store.getDispatchOffset(descriptor);
+        assertTrue(dispatchOffset != null);
+        assertTrue(((ChecksummedDataInput.Position) dispatchOffset).sourcePosition > 0);
+    }
+
+    /**
+     * Installs a mock response rule for HINT messages and writes hints for the local host id.
+     *
+     * @param noOfHints     number of hints to create and hand to {@code HintsService.instance}
+     * @param noOfResponses number of responses passed to {@code respondN}; {@code -1} selects
+     *                      the {@code respond} variant that takes no count
+     * @return the spy registered for outgoing HINT messages
+     */
+    private MockMessagingSpy sendHintsAndResponses(int noOfHints, int noOfResponses)
+    {
+        // create spy for hint messages, but only create responses for noOfResponses hints
+        MessageIn<HintResponse> messageIn = MessageIn.create(FBUtilities.getBroadcastAddress(),
+                                                             HintResponse.instance,
+                                                             Collections.emptyMap(),
+                                                             MessagingService.Verb.REQUEST_RESPONSE,
+                                                             MessagingService.current_version,
+                                                             MessageIn.createTimestamp());
+
+        MockMessagingSpy spy;
+        if (noOfResponses != -1)
+        {
+            spy = MockMessagingService.when(verb(MessagingService.Verb.HINT)).respondN(messageIn, noOfResponses);
+        }
+        else
+        {
+            spy = MockMessagingService.when(verb(MessagingService.Verb.HINT)).respond(messageIn);
+        }
+
+        // create and write noOfHints using service
+        UUID hostId = StorageService.instance.getLocalHostUUID();
+        // the table metadata does not depend on the loop index, so look it up only once
+        CFMetaData cfMetaData = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        for (int i = 0; i < noOfHints; i++)
+        {
+            long now = System.currentTimeMillis();
+            DecoratedKey dkey = dk(String.valueOf(i));
+            PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfMetaData, dkey).timestamp(now);
+            builder.row("column0").add("val", "value0");
+            Hint hint = Hint.create(builder.buildAsMutation(), now);
+            HintsService.instance.write(hostId, hint);
+        }
+        return spy;
+    }
+
+    /**
+     * Failure detector stub whose liveness answer is a single flag the tests flip directly.
+     * Every operation other than {@link #isAlive(InetAddress)} is unsupported.
+     */
+    private static class MockFailureDetector implements IFailureDetector
+    {
+        // Written by the test thread and read through isAlive(InetAddress) by the service under
+        // test (presumably from its dispatch threads - confirm against HintsService); volatile
+        // makes a flag change visible to those readers.
+        private volatile boolean isAlive = true;
+
+        public boolean isAlive(InetAddress ep)
+        {
+            return isAlive;
+        }
+
+        public void interpret(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void report(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void remove(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void forceConviction(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}