You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2018/07/03 13:23:45 UTC
[6/9] cassandra git commit: Restore resumable hints delivery
Restore resumable hints delivery
Backport of CASSANDRA-11960
patch by Tommy Stendahl, Stefan Podkowinski and Branimir Lambov; reviewed by Aleksey Yeschenko for CASSANDRA-14419
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c4982587
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c4982587
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c4982587
Branch: refs/heads/trunk
Commit: c4982587bfe3cb6946daa2912fe46146edef7fbf
Parents: 1e478d3
Author: tommy stendahl <to...@ericsson.com>
Authored: Tue Jul 3 12:53:59 2018 +0200
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Tue Jul 3 14:22:18 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hints/ChecksummedDataInput.java | 53 +++-
.../hints/CompressedChecksummedDataInput.java | 53 +++-
.../cassandra/hints/HintsDispatchExecutor.java | 21 +-
.../apache/cassandra/hints/HintsDispatcher.java | 39 +--
.../org/apache/cassandra/hints/HintsReader.java | 32 +--
.../apache/cassandra/hints/HintsService.java | 27 +-
.../org/apache/cassandra/hints/HintsStore.java | 14 +-
.../apache/cassandra/hints/InputPosition.java | 26 ++
.../cassandra/hints/HintsCatalogTest.java | 31 +++
.../cassandra/hints/HintsCompressionTest.java | 17 ++
.../cassandra/hints/HintsServiceTest.java | 261 +++++++++++++++++++
12 files changed, 511 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d694f3b..ee95718 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.17
+ * Restore resumable hints delivery, backport CASSANDRA-11960 (CASSANDRA-14419)
* Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515)
* Reverse order queries with range tombstones can cause data loss (CASSANDRA-14513)
* Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 39f46a4..a78256b 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -25,17 +25,18 @@ import java.util.zip.CRC32;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.NativeLibrary;
/**
- * A {@link RandomAccessReader} wrapper that calctulates the CRC in place.
+ * A {@link RandomAccessReader} wrapper that calculates the CRC in place.
*
* Useful for {@link org.apache.cassandra.hints.HintsReader}, for example, where we must verify the CRC, yet don't want
* to allocate an extra byte array just that purpose. The CRC can be embedded in the input stream and checked via checkCrc().
*
- * In addition to calculating the CRC, it allows to enforce a maximim known size. This is needed
+ * In addition to calculating the CRC, it allows to enforce a maximum known size. This is needed
* so that {@link org.apache.cassandra.db.Mutation.MutationSerializer} doesn't blow up the heap when deserializing a
* corrupted sequence by reading a huge corrupted length of bytes via
- * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
+ * {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
*/
public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
{
@@ -63,9 +64,37 @@ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderW
return new Builder(new ChannelProxy(file)).build();
}
- protected void releaseBuffer()
+ static class Position implements InputPosition
{
- super.releaseBuffer();
+ final long sourcePosition;
+
+ public Position(long sourcePosition)
+ {
+ super();
+ this.sourcePosition = sourcePosition;
+ }
+
+ @Override
+ public long subtract(InputPosition other)
+ {
+ return sourcePosition - ((Position)other).sourcePosition;
+ }
+ }
+
+ /**
+ * Return a seekable representation of the current position. For compressed files this is chunk position
+ * in file and offset within chunk.
+ */
+ public InputPosition getSeekPosition()
+ {
+ return new Position(getPosition());
+ }
+
+ public void seek(InputPosition pos)
+ {
+ updateCrc();
+ bufferOffset = ((Position) pos).sourcePosition;
+ buffer.position(0).limit(0);
}
public void resetCrc()
@@ -80,6 +109,15 @@ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderW
limitMark = mark();
}
+ /**
+ * Returns the position in the source file, which is different from getPosition() for compressed/encrypted files
+ * and may be imprecise.
+ */
+ protected long getSourcePosition()
+ {
+ return bufferOffset;
+ }
+
public void resetLimit()
{
limit = Long.MAX_VALUE;
@@ -141,6 +179,11 @@ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderW
crcPosition = buffer.position();
}
+ public void tryUncacheRead()
+ {
+ NativeLibrary.trySkipCache(getChannel().getFileDescriptor(), 0, getSourcePosition(), getPath());
+ }
+
private void updateCrc()
{
if (crcPosition == buffer.position() || crcUpdateDisabled)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
index cc4a6bd..c0de1cf 100644
--- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -29,7 +29,8 @@ import org.apache.cassandra.utils.memory.BufferPool;
public final class CompressedChecksummedDataInput extends ChecksummedDataInput
{
private final ICompressor compressor;
- private volatile long filePosition = 0;
+ private volatile long filePosition = 0; // Current position in file, advanced when reading chunk.
+ private volatile long sourcePosition = 0; // Current position in file to report, advanced after consuming chunk.
private volatile ByteBuffer compressedBuffer = null;
private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE);
@@ -39,7 +40,7 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
assert regions == null; //mmapped regions are not supported
compressor = builder.compressor;
- filePosition = builder.position;
+ sourcePosition = filePosition = builder.position;
}
/**
@@ -51,8 +52,56 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
return filePosition == channel.size() && buffer.remaining() == 0;
}
+ public long getSourcePosition()
+ {
+ return sourcePosition;
+ }
+
+ static class Position extends ChecksummedDataInput.Position
+ {
+ final long bufferStart;
+ final int bufferPosition;
+
+ public Position(long sourcePosition, long bufferStart, int bufferPosition)
+ {
+ super(sourcePosition);
+ this.bufferStart = bufferStart;
+ this.bufferPosition = bufferPosition;
+ }
+
+ @Override
+ public long subtract(InputPosition o)
+ {
+ Position other = (Position) o;
+ return bufferStart - other.bufferStart + bufferPosition - other.bufferPosition;
+ }
+ }
+
+ public InputPosition getSeekPosition()
+ {
+ return new Position(sourcePosition, bufferOffset, buffer.position());
+ }
+
+ public void seek(InputPosition p)
+ {
+ Position pos = (Position) p;
+ bufferOffset = pos.bufferStart;
+ filePosition = pos.sourcePosition;
+ buffer.position(0).limit(0);
+ resetCrc();
+ reBuffer();
+ buffer.position(pos.bufferPosition);
+ assert sourcePosition == pos.sourcePosition;
+ assert bufferOffset == pos.bufferStart;
+ assert buffer.position() == pos.bufferPosition;
+ }
+
protected void reBufferStandard()
{
+ sourcePosition = filePosition;
+ if (isEOF())
+ return;
+
metadataBuffer.clear();
channel.read(metadataBuffer, filePosition);
filePosition += CompressedHintsWriter.METADATA_SIZE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index c9e69f5..d6c28d4 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -23,6 +23,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
import java.util.function.Supplier;
import com.google.common.util.concurrent.RateLimiter;
@@ -48,12 +50,14 @@ final class HintsDispatchExecutor
private final File hintsDirectory;
private final ExecutorService executor;
private final AtomicBoolean isPaused;
+ private final Function<InetAddress, Boolean> isAlive;
private final Map<UUID, Future> scheduledDispatches;
- HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused)
+ HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Function<InetAddress, Boolean> isAlive)
{
this.hintsDirectory = hintsDirectory;
this.isPaused = isPaused;
+ this.isAlive = isAlive;
scheduledDispatches = new ConcurrentHashMap<>();
executor = new JMXEnabledThreadPoolExecutor(maxThreads, 1, TimeUnit.MINUTES,
@@ -69,6 +73,14 @@ final class HintsDispatchExecutor
{
scheduledDispatches.clear();
executor.shutdownNow();
+ try
+ {
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
}
boolean isScheduled(HintsStore store)
@@ -255,9 +267,10 @@ final class HintsDispatchExecutor
private boolean deliver(HintsDescriptor descriptor, InetAddress address)
{
File file = new File(hintsDirectory, descriptor.fileName());
- Long offset = store.getDispatchOffset(descriptor).orElse(null);
+ InputPosition offset = store.getDispatchOffset(descriptor);
- try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, isPaused))
+ BooleanSupplier shouldAbort = () -> !isAlive.apply(address) || isPaused.get();
+ try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, shouldAbort))
{
if (offset != null)
dispatcher.seek(offset);
@@ -271,7 +284,7 @@ final class HintsDispatchExecutor
}
else
{
- store.markDispatchOffset(descriptor, dispatcher.dispatchOffset());
+ store.markDispatchOffset(descriptor, dispatcher.dispatchPosition());
store.offerFirst(descriptor);
logger.info("Finished hinted handoff of file {} to endpoint {}: {}, partially", descriptor.fileName(), address, hostId);
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 351b3fa..db5f42f 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -22,7 +22,7 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
import java.util.function.Function;
import com.google.common.util.concurrent.RateLimiter;
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -52,25 +51,25 @@ final class HintsDispatcher implements AutoCloseable
private final UUID hostId;
private final InetAddress address;
private final int messagingVersion;
- private final AtomicBoolean isPaused;
+ private final BooleanSupplier abortRequested;
- private long currentPageOffset;
+ private InputPosition currentPagePosition;
- private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, AtomicBoolean isPaused)
+ private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, BooleanSupplier abortRequested)
{
- currentPageOffset = 0L;
+ currentPagePosition = null;
this.reader = reader;
this.hostId = hostId;
this.address = address;
this.messagingVersion = messagingVersion;
- this.isPaused = isPaused;
+ this.abortRequested = abortRequested;
}
- static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, AtomicBoolean isPaused)
+ static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, BooleanSupplier abortRequested)
{
int messagingVersion = MessagingService.instance().getVersion(address);
- return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused);
+ return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested);
}
public void close()
@@ -78,10 +77,9 @@ final class HintsDispatcher implements AutoCloseable
reader.close();
}
- void seek(long bytes)
+ void seek(InputPosition position)
{
- reader.seek(bytes);
- currentPageOffset = 0L;
+ reader.seek(position);
}
/**
@@ -91,7 +89,7 @@ final class HintsDispatcher implements AutoCloseable
{
for (HintsReader.Page page : reader)
{
- currentPageOffset = page.offset;
+ currentPagePosition = page.position;
if (dispatch(page) != Action.CONTINUE)
return false;
}
@@ -102,20 +100,11 @@ final class HintsDispatcher implements AutoCloseable
/**
* @return offset of the first non-delivered page
*/
- long dispatchOffset()
+ InputPosition dispatchPosition()
{
- return currentPageOffset;
+ return currentPagePosition;
}
- private boolean isHostAlive()
- {
- return FailureDetector.instance.isAlive(address);
- }
-
- private boolean isPaused()
- {
- return isPaused.get();
- }
// retry in case of a timeout; stop in case of a failure, host going down, or delivery paused
private Action dispatch(HintsReader.Page page)
@@ -156,7 +145,7 @@ final class HintsDispatcher implements AutoCloseable
{
while (hints.hasNext())
{
- if (!isHostAlive() || isPaused())
+ if (abortRequested.getAsBoolean())
return Action.ABORT;
callbacks.add(sendFunction.apply(hints.next()));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index 973a15e..7003e04 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -109,7 +109,7 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
return descriptor;
}
- void seek(long newPosition)
+ void seek(InputPosition newPosition)
{
input.seek(newPosition);
}
@@ -126,21 +126,21 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
final class Page
{
- public final long offset;
+ public final InputPosition position;
- private Page(long offset)
+ private Page(InputPosition inputPosition)
{
- this.offset = offset;
+ this.position = inputPosition;
}
Iterator<Hint> hintsIterator()
{
- return new HintsIterator(offset);
+ return new HintsIterator(position);
}
Iterator<ByteBuffer> buffersIterator()
{
- return new BuffersIterator(offset);
+ return new BuffersIterator(position);
}
}
@@ -149,12 +149,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
@SuppressWarnings("resource")
protected Page computeNext()
{
- NativeLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath());
+ input.tryUncacheRead();
if (input.isEOF())
return endOfData();
- return new Page(input.getFilePointer());
+ return new Page(input.getSeekPosition());
}
}
@@ -163,9 +163,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
*/
final class HintsIterator extends AbstractIterator<Hint>
{
- private final long offset;
+ private final InputPosition offset;
- HintsIterator(long offset)
+ HintsIterator(InputPosition offset)
{
super();
this.offset = offset;
@@ -177,12 +177,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
do
{
- long position = input.getFilePointer();
+ InputPosition position = input.getSeekPosition();
if (input.isEOF())
return endOfData(); // reached EOF
- if (position - offset >= PAGE_SIZE)
+ if (position.subtract(offset) >= PAGE_SIZE)
return endOfData(); // read page size or more bytes
try
@@ -260,9 +260,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
*/
final class BuffersIterator extends AbstractIterator<ByteBuffer>
{
- private final long offset;
+ private final InputPosition offset;
- BuffersIterator(long offset)
+ BuffersIterator(InputPosition offset)
{
super();
this.offset = offset;
@@ -274,12 +274,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
do
{
- long position = input.getFilePointer();
+ InputPosition position = input.getSeekPosition();
if (input.isEOF())
return endOfData(); // reached EOF
- if (position - offset >= PAGE_SIZE)
+ if (position.subtract(offset) >= PAGE_SIZE)
return endOfData(); // read page size or more bytes
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 268ee1f..7ec359d 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -30,6 +30,7 @@ import java.util.function.Supplier;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.dht.Token;
@@ -60,7 +63,7 @@ public final class HintsService implements HintsServiceMBean
{
private static final Logger logger = LoggerFactory.getLogger(HintsService.class);
- public static final HintsService instance = new HintsService();
+ public static HintsService instance = new HintsService();
private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
@@ -82,6 +85,12 @@ public final class HintsService implements HintsServiceMBean
private HintsService()
{
+ this(FailureDetector.instance);
+ }
+
+ @VisibleForTesting
+ HintsService(IFailureDetector failureDetector)
+ {
File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
@@ -92,7 +101,7 @@ public final class HintsService implements HintsServiceMBean
bufferPool = new HintsBufferPool(bufferSize, writeExecutor::flushBuffer);
isDispatchPaused = new AtomicBoolean(true);
- dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused);
+ dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused, failureDetector::isAlive);
// periodically empty the current content of the buffers
int flushPeriod = DatabaseDescriptor.getHintsFlushPeriodInMS();
@@ -225,7 +234,7 @@ public final class HintsService implements HintsServiceMBean
* Will abort dispatch sessions that are currently in progress (which is okay, it's idempotent),
* and make sure the buffers are flushed, hints files written and fsynced.
*/
- public synchronized void shutdownBlocking()
+ public synchronized void shutdownBlocking() throws ExecutionException, InterruptedException
{
if (isShutDown)
throw new IllegalStateException("HintsService has already been shut down");
@@ -237,8 +246,8 @@ public final class HintsService implements HintsServiceMBean
triggerFlushingFuture.cancel(false);
- writeExecutor.flushBufferPool(bufferPool);
- writeExecutor.closeAllWriters();
+ writeExecutor.flushBufferPool(bufferPool).get();
+ writeExecutor.closeAllWriters().get();
dispatchExecutor.shutdownBlocking();
writeExecutor.shutdownBlocking();
@@ -370,4 +379,12 @@ public final class HintsService implements HintsServiceMBean
{
return catalog;
}
+
+ /**
+ * Returns true if the service has been shut down.
+ */
+ public boolean isShutDown()
+ {
+ return isShutDown;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index bb3aa0f..032de5a 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -50,7 +50,7 @@ final class HintsStore
private final File hintsDirectory;
private final ImmutableMap<String, Object> writerParams;
- private final Map<HintsDescriptor, Long> dispatchOffsets;
+ private final Map<HintsDescriptor, InputPosition> dispatchPositions;
private final Deque<HintsDescriptor> dispatchDequeue;
private final Queue<HintsDescriptor> blacklistedFiles;
@@ -64,7 +64,7 @@ final class HintsStore
this.hintsDirectory = hintsDirectory;
this.writerParams = writerParams;
- dispatchOffsets = new ConcurrentHashMap<>();
+ dispatchPositions = new ConcurrentHashMap<>();
dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
blacklistedFiles = new ConcurrentLinkedQueue<>();
@@ -143,19 +143,19 @@ final class HintsStore
return !dispatchDequeue.isEmpty();
}
- Optional<Long> getDispatchOffset(HintsDescriptor descriptor)
+ InputPosition getDispatchOffset(HintsDescriptor descriptor)
{
- return Optional.ofNullable(dispatchOffsets.get(descriptor));
+ return dispatchPositions.get(descriptor);
}
- void markDispatchOffset(HintsDescriptor descriptor, long mark)
+ void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
{
- dispatchOffsets.put(descriptor, mark);
+ dispatchPositions.put(descriptor, inputPosition);
}
void cleanUp(HintsDescriptor descriptor)
{
- dispatchOffsets.remove(descriptor);
+ dispatchPositions.remove(descriptor);
}
void blacklist(HintsDescriptor descriptor)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/InputPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/InputPosition.java b/src/java/org/apache/cassandra/hints/InputPosition.java
new file mode 100644
index 0000000..0b8953c
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/InputPosition.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+/**
+ * Seekable file position as provided by the various ChecksummedDataInput implementations; subtract() yields the distance between two positions.
+ */
+public interface InputPosition
+{
+ long subtract(InputPosition other);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index 928fd31..dcd31cf 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -152,6 +152,37 @@ public class HintsCatalogTest
assertEquals(0, store.getDispatchQueueSize());
}
+ @Test
+ public void deleteHintsTest() throws IOException
+ {
+ File directory = Files.createTempDirectory(null).toFile();
+ UUID hostId1 = UUID.randomUUID();
+ UUID hostId2 = UUID.randomUUID();
+ long now = System.currentTimeMillis();
+ writeDescriptor(directory, new HintsDescriptor(hostId1, now));
+ writeDescriptor(directory, new HintsDescriptor(hostId1, now+1));
+ writeDescriptor(directory, new HintsDescriptor(hostId2, now+2));
+ writeDescriptor(directory, new HintsDescriptor(hostId2, now+3));
+
+ // load catalog containing two stores (one for each host)
+ HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of());
+ assertEquals(2, catalog.stores().count());
+ assertTrue(catalog.hasFiles());
+
+ // delete all hints from store 1
+ assertTrue(catalog.get(hostId1).hasFiles());
+ catalog.deleteAllHints(hostId1);
+ assertFalse(catalog.get(hostId1).hasFiles());
+ // stores are still kept for each host, even after deleting hints
+ assertEquals(2, catalog.stores().count());
+ assertTrue(catalog.hasFiles());
+
+ // delete all hints from all stores
+ catalog.deleteAllHints();
+ assertEquals(2, catalog.stores().count());
+ assertFalse(catalog.hasFiles());
+ }
+
@SuppressWarnings("EmptyTryBlock")
private static void writeDescriptor(File directory, HintsDescriptor descriptor) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
index d6a08ca..656d7cd 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
@@ -117,9 +117,11 @@ public class HintsCompressionTest
try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName())))
{
List<Hint> deserialized = new ArrayList<>(hintNum);
+ List<InputPosition> pagePositions = new ArrayList<>(hintNum);
for (HintsReader.Page page: reader)
{
+ pagePositions.add(page.position);
Iterator<Hint> iterator = page.hintsIterator();
while (iterator.hasNext())
{
@@ -134,6 +136,21 @@ public class HintsCompressionTest
HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum));
hintNum++;
}
+
+ // explicitly seek to each page by iterating collected page positions and check if hints still match as expected
+ int hintOffset = 0;
+ for (InputPosition pos : pagePositions)
+ {
+ reader.seek(pos);
+ HintsReader.Page page = reader.iterator().next();
+ Iterator<Hint> iterator = page.hintsIterator();
+ while (iterator.hasNext())
+ {
+ Hint seekedHint = iterator.next();
+ HintsTestUtil.assertHintsEqual(hints.get(hintOffset), seekedHint);
+ hintOffset++;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
new file mode 100644
index 0000000..ab1cbd0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.utils.MoreFutures;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.MockMessagingService;
+import org.apache.cassandra.net.MockMessagingSpy;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.net.MockMessagingService.verb;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class HintsServiceTest
+{
+    private static final String KEYSPACE = "hints_service_test";
+    private static final String TABLE = "table";
+    // handed to every HintsService built in reinstanciateService(); tests toggle isAlive to simulate a dead target
+    private final MockFailureDetector failureDetector = new MockFailureDetector();
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+    }
+
+    @After
+    public void cleanup()
+    {
+        MockMessagingService.cleanup();
+    }
+
+    @Before
+    public void reinstanciateService() throws ExecutionException, InterruptedException
+    {
+        MessagingService.instance().clearMessageSinks();
+
+        if (!HintsService.instance.isShutDown())
+        {
+            HintsService.instance.shutdownBlocking();
+            HintsService.instance.deleteAllHints();
+        }
+
+        failureDetector.isAlive = true; // a previous test (e.g. testPageRetry) may have marked the node dead
+        HintsService.instance = new HintsService(failureDetector);
+        HintsService.instance.startDispatch();
+    }
+
+    @Test
+    public void testDispatchHints() throws InterruptedException, ExecutionException
+    {
+        long cnt = StorageMetrics.totalHints.getCount();
+
+        // create spy for hint messages that responds to every hint
+        MockMessagingSpy spy = sendHintsAndResponses(100, -1);
+
+        // metrics should have been updated with the number of created hints
+        assertEquals(cnt + 100, StorageMetrics.totalHints.getCount());
+
+        // wait until all hints have been sent, then make sure no further messages follow
+        spy.interceptMessageOut(100).get();
+        spy.interceptNoMsg(500, TimeUnit.MILLISECONDS).get();
+    }
+
+    @Test
+    public void testPauseAndResume() throws InterruptedException, ExecutionException
+    {
+        HintsService.instance.pauseDispatch();
+
+        // create spy for hint messages that responds to every hint
+        MockMessagingSpy spy = sendHintsAndResponses(100, -1);
+
+        // we should not send any hints while paused; resume dispatch once that has been observed
+        ListenableFuture<Boolean> noMessagesWhilePaused = spy.interceptNoMsg(15, TimeUnit.SECONDS);
+        Futures.addCallback(noMessagesWhilePaused, new MoreFutures.SuccessCallback<Boolean>()
+        {
+            public void onSuccess(@Nullable Boolean aBoolean)
+            {
+                HintsService.instance.resumeDispatch();
+            }
+        });
+
+        Futures.allAsList(
+            noMessagesWhilePaused,
+            spy.interceptMessageOut(100),
+            spy.interceptNoMsg(200, TimeUnit.MILLISECONDS)
+        ).get();
+    }
+
+    @Test
+    public void testPageRetry() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        // create spy for hint messages, but only create responses for 5 hints
+        MockMessagingSpy spy = sendHintsAndResponses(20, 5);
+
+        Futures.allAsList(
+            // the dispatcher will always send all hints within the current page
+            // and only wait for the acks before going to the next page
+            spy.interceptMessageOut(20),
+            spy.interceptNoMsg(200, TimeUnit.MILLISECONDS),
+
+            // next tick will trigger a retry of the same page as we only replied with 5/20 acks
+            spy.interceptMessageOut(20)
+        ).get();
+
+        // marking the destination node as dead should stop sending hints
+        failureDetector.isAlive = false;
+        spy.interceptNoMsg(20, TimeUnit.SECONDS).get();
+    }
+
+    @Test
+    public void testPageSeek() throws InterruptedException, ExecutionException
+    {
+        // create spy for hint messages, stop replying after 12k (should be on 3rd page)
+        MockMessagingSpy spy = sendHintsAndResponses(20000, 12000);
+
+        // At this point the dispatcher will constantly retry the page we stopped acking,
+        // thus we receive the same hints from the page multiple times and in total more than
+        // all written hints. Let's just consume them for a while and then pause the dispatcher.
+        spy.interceptMessageOut(22000).get();
+        HintsService.instance.pauseDispatch();
+        Thread.sleep(1000); // NOTE(review): timing-based wait after pausing; presumably lets in-flight dispatch settle
+
+        // verify that we have a dispatch offset set for the page we're currently stuck at
+        HintsStore store = HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID());
+        HintsDescriptor descriptor = store.poll();
+        store.offerFirst(descriptor); // add again for cleanup during re-instantiation
+        InputPosition dispatchOffset = store.getDispatchOffset(descriptor);
+        assertTrue("expected a dispatch offset for the partially acked page", dispatchOffset != null);
+        assertTrue("expected a non-zero dispatch source position", ((ChecksummedDataInput.Position) dispatchOffset).sourcePosition > 0);
+    }
+
+    private MockMessagingSpy sendHintsAndResponses(int noOfHints, int noOfResponses)
+    {
+        // create spy for hint messages; noOfResponses == -1 responds to every hint, otherwise only to noOfResponses hints
+        MessageIn<HintResponse> messageIn = MessageIn.create(FBUtilities.getBroadcastAddress(),
+                                                             HintResponse.instance,
+                                                             Collections.emptyMap(),
+                                                             MessagingService.Verb.REQUEST_RESPONSE,
+                                                             MessagingService.current_version);
+
+        MockMessagingSpy spy;
+        if (noOfResponses != -1)
+        {
+            spy = MockMessagingService.when(verb(MessagingService.Verb.HINT)).respondN(messageIn, noOfResponses);
+        }
+        else
+        {
+            spy = MockMessagingService.when(verb(MessagingService.Verb.HINT)).respond(messageIn);
+        }
+
+        // create and write noOfHints hints addressed to the local host id using the service
+        UUID hostId = StorageService.instance.getLocalHostUUID();
+        CFMetaData cfMetaData = Schema.instance.getCFMetaData(KEYSPACE, TABLE); // loop-invariant, look it up once
+        for (int i = 0; i < noOfHints; i++)
+        {
+            long now = System.currentTimeMillis();
+            DecoratedKey dkey = dk(String.valueOf(i));
+
+            UpdateBuilder builder = UpdateBuilder.create(cfMetaData, dkey)
+                                                 .withTimestamp(now)
+                                                 .newRow("column0")
+                                                 .add("val", "value0");
+            Hint hint = Hint.create((Mutation) builder.makeMutation(), now);
+
+            HintsService.instance.write(hostId, hint);
+        }
+        return spy;
+    }
+
+    private static class MockFailureDetector implements IFailureDetector
+    {
+        private volatile boolean isAlive = true; // written by the test thread, read concurrently by the hints dispatcher
+
+        public boolean isAlive(InetAddress ep)
+        {
+            return isAlive;
+        }
+
+        public void interpret(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void report(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void remove(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void forceConviction(InetAddress ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org