You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/19 16:27:14 UTC
[3/4] cassandra git commit: Rewrite hinted handoff
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
new file mode 100644
index 0000000..458d01f
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * Verb handler used both for hint dispatch and streaming.
+ *
+ * With the non-sstable format, we cannot just stream hint sstables on node decommission. So sometimes, at decommission
+ * time, we might have to stream hints to a non-owning host (say, if the owning host B is down during decommission of host A).
+ * In that case the handler just stores the received hint in its local hint store.
+ *
+ * Every received message is answered with a {@link HintResponse}, including hints that are skipped because they
+ * reference an unknown table or fail validation.
+ */
+public final class HintVerbHandler implements IVerbHandler<HintMessage>
+{
+    private static final Logger logger = LoggerFactory.getLogger(HintVerbHandler.class);
+
+    /**
+     * Handles an incoming hint: skips it if it references an unknown table or fails validation, applies it if this
+     * node is the hint's destination, and otherwise stores it locally for future dispatch. Always replies to the sender.
+     *
+     * @param message the incoming message carrying the destination host id and the decoded hint (null if undecodable)
+     * @param id      the message id, used to route the reply back to the sender
+     */
+    @Override
+    public void doVerb(MessageIn<HintMessage> message, int id)
+    {
+        UUID hostId = message.payload.hostId;
+        Hint hint = message.payload.hint;
+
+        // If we see an unknown table id, it means the table, or one of the tables in the mutation, had been dropped.
+        // In that case there is nothing we can really do, or should do, other than log it and go on.
+        // This will *not* happen due to a not-yet-seen table, because we don't transfer hints unless there
+        // is schema agreement between the sender and the receiver.
+        if (hint == null)
+        {
+            logger.debug("Failed to decode and apply a hint for {} - table with id {} is unknown",
+                         hostId,
+                         message.payload.unknownTableID);
+            reply(id, message.from);
+            return;
+        }
+
+        // We must perform validation before applying the hint, and there is no other place to do it other than here.
+        try
+        {
+            hint.mutation.getPartitionUpdates().forEach(PartitionUpdate::validate);
+        }
+        catch (MarshalException e)
+        {
+            // one placeholder per argument; the exception goes last so SLF4J logs it together with its stack trace
+            logger.warn("Failed to validate a hint for {} - skipped", hostId, e);
+            reply(id, message.from);
+            return;
+        }
+
+        // Apply the hint if this node is the destination, store for future dispatch if this node isn't (must have gotten
+        // it from a decommissioned node that had streamed it before going out).
+        if (hostId.equals(StorageService.instance.getLocalHostUUID()))
+            hint.apply();
+        else
+            HintsService.instance.write(hostId, hint);
+
+        reply(id, message.from);
+    }
+
+    // Acknowledges the message with the given id to the given sender.
+    private static void reply(int id, InetAddress to)
+    {
+        MessagingService.instance().sendReply(HintResponse.message, id, to);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsBuffer.java b/src/java/org/apache/cassandra/hints/HintsBuffer.java
new file mode 100644
index 0000000..097abce
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsBuffer.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * A shared buffer that temporarily holds the serialized hints before they are flushed to disk.
+ *
+ * Consists of :
+ * - a ByteBuffer holding the serialized hints (length, length checksum and total checksum included)
+ * - a pointer to the current allocation offset
+ * - an {@link OpOrder} appendOrder for {@link HintsWriteExecutor} to wait on for all writes completion
+ * - a map of (host id -> offset queue) for the hints written
+ *
+ * It's possible to write a single hint for two or more hosts at the same time, in which case the same offset will be put
+ * into two or more offset queues.
+ *
+ * Each entry in the slab is laid out as: int hint size, int CRC32 of the size, the serialized hint, and an int CRC32
+ * covering both the size and the serialized hint.
+ */
+final class HintsBuffer
+{
+    // hint entry overhead in bytes (int length, int length checksum, int body checksum)
+    static final int ENTRY_OVERHEAD_SIZE = 12;
+    // value of position once the buffer no longer accepts allocations
+    static final int CLOSED = -1;
+
+    private final ByteBuffer slab; // the underlying backing ByteBuffer for all the serialized hints
+    private final AtomicInteger position; // the position in the slab that we currently allocate from
+
+    private final ConcurrentMap<UUID, Queue<Integer>> offsets;
+    private final OpOrder appendOrder;
+
+    private HintsBuffer(ByteBuffer slab)
+    {
+        this.slab = slab;
+
+        position = new AtomicInteger();
+        offsets = new ConcurrentHashMap<>();
+        appendOrder = new OpOrder();
+    }
+
+    static HintsBuffer create(int slabSize)
+    {
+        return new HintsBuffer(ByteBuffer.allocateDirect(slabSize));
+    }
+
+    boolean isClosed()
+    {
+        return position.get() == CLOSED;
+    }
+
+    int capacity()
+    {
+        return slab.capacity();
+    }
+
+    /**
+     * @return the number of bytes still available for allocation, or 0 once the buffer is closed
+     */
+    int remaining()
+    {
+        int pos = position.get();
+        return pos == CLOSED ? 0 : capacity() - pos;
+    }
+
+    /**
+     * Creates a new, empty HintsBuffer backed by this buffer's (cleared) slab. Both instances share the same memory,
+     * so this instance should not be written to afterwards.
+     */
+    HintsBuffer recycle()
+    {
+        slab.clear();
+        return new HintsBuffer(slab);
+    }
+
+    /**
+     * Releases the slab's memory via {@code FileUtils.clean}.
+     */
+    void free()
+    {
+        FileUtils.clean(slab);
+    }
+
+    /**
+     * Wait for any appends started before this method was called.
+     */
+    void waitForModifications()
+    {
+        appendOrder.awaitNewBarrier(); // issue a barrier and wait for it
+    }
+
+    Set<UUID> hostIds()
+    {
+        return offsets.keySet();
+    }
+
+    /**
+     * Converts the queue of offsets for the selected host id into an iterator of hints encoded as ByteBuffers.
+     *
+     * Offsets are polled (removed) from the queue as the iterator advances. Each returned buffer is a view into the
+     * slab spanning one whole entry (including the overhead bytes); the same view instance is reused between calls.
+     */
+    Iterator<ByteBuffer> consumingHintsIterator(UUID hostId)
+    {
+        final Queue<Integer> bufferOffsets = offsets.get(hostId);
+
+        if (bufferOffsets == null)
+            return Collections.emptyIterator();
+
+        return new AbstractIterator<ByteBuffer>()
+        {
+            private final ByteBuffer flyweight = slab.duplicate();
+
+            protected ByteBuffer computeNext()
+            {
+                Integer offset = bufferOffsets.poll();
+
+                if (offset == null)
+                    return endOfData();
+
+                int totalSize = slab.getInt(offset) + ENTRY_OVERHEAD_SIZE;
+
+                return (ByteBuffer) flyweight.clear().position(offset).limit(offset + totalSize);
+            }
+        };
+    }
+
+    /**
+     * Reserves room in the slab for a serialized hint of {@code hintSize} bytes plus the per-entry overhead.
+     *
+     * @param hintSize the serialized size of the hint body, in bytes
+     * @return an allocation that the caller must close, or {@code null} if the buffer is closed (or has just been
+     *         closed because the entry did not fit)
+     * @throws IllegalArgumentException if the entry would take up more than half of the slab
+     */
+    Allocation allocate(int hintSize)
+    {
+        int totalSize = hintSize + ENTRY_OVERHEAD_SIZE;
+
+        if (totalSize > slab.capacity() / 2)
+        {
+            // report the largest hint *body* that is accepted, so the limit is comparable with the reported hint size
+            throw new IllegalArgumentException(String.format("Hint of %s bytes is too large - the maximum size is %s",
+                                                             hintSize,
+                                                             slab.capacity() / 2 - ENTRY_OVERHEAD_SIZE));
+        }
+
+        @SuppressWarnings("resource")
+        OpOrder.Group opGroup = appendOrder.start(); // will eventually be closed by the receiver of the allocation
+        try
+        {
+            return allocate(totalSize, opGroup);
+        }
+        catch (Throwable t)
+        {
+            opGroup.close();
+            throw t;
+        }
+    }
+
+    // Returns null (after closing opGroup) when no room is left in the slab for totalSize bytes.
+    private Allocation allocate(int totalSize, OpOrder.Group opGroup)
+    {
+        int offset = allocateBytes(totalSize);
+        if (offset < 0)
+        {
+            opGroup.close();
+            return null;
+        }
+        return new Allocation(offset, totalSize, opGroup);
+    }
+
+    // Atomically advances position by totalSize using CAS; returns the start offset of the reserved range, or CLOSED
+    // if the slab was already closed or has just been closed because the request did not fit.
+    private int allocateBytes(int totalSize)
+    {
+        while (true)
+        {
+            int prev = position.get();
+            int next = prev + totalSize;
+
+            if (prev == CLOSED) // the slab has been 'closed'
+                return CLOSED;
+
+            if (next > slab.capacity())
+            {
+                position.set(CLOSED); // mark the slab as no longer allocating if we've exceeded its capacity
+                return CLOSED;
+            }
+
+            if (position.compareAndSet(prev, next))
+                return prev;
+        }
+    }
+
+    // Takes the boxed offset so the same Integer instance is queued for every host instead of re-boxing per host.
+    private void put(UUID hostId, Integer offset)
+    {
+        // we intentionally don't just return offsets.computeIfAbsent() because it's expensive compared to simple get(),
+        // and the method is on a really hot path
+        Queue<Integer> queue = offsets.get(hostId);
+        if (queue == null)
+            queue = offsets.computeIfAbsent(hostId, (id) -> new ConcurrentLinkedQueue<>());
+        queue.offer(offset);
+    }
+
+    /**
+     * A placeholder for hint serialization. Should always be used in a try-with-resources block.
+     */
+    final class Allocation implements AutoCloseable
+    {
+        // boxed once here so that the same Integer can be offered to every target host's queue
+        private final Integer offset;
+        private final int totalSize;
+        private final OpOrder.Group opGroup;
+
+        Allocation(int offset, int totalSize, OpOrder.Group opGroup)
+        {
+            this.offset = offset;
+            this.totalSize = totalSize;
+            this.opGroup = opGroup;
+        }
+
+        /**
+         * Serializes the hint into the reserved range, then records its offset for each of the given host ids.
+         */
+        void write(Iterable<UUID> hostIds, Hint hint)
+        {
+            write(hint);
+            for (UUID hostId : hostIds)
+                put(hostId, offset);
+        }
+
+        @Override
+        public void close()
+        {
+            opGroup.close();
+        }
+
+        // Writes size, size checksum, hint body and the running checksum over size + body into the reserved range.
+        private void write(Hint hint)
+        {
+            ByteBuffer buffer = (ByteBuffer) slab.duplicate().position(offset).limit(offset + totalSize);
+            DataOutputPlus dop = new DataOutputBufferFixed(buffer);
+            CRC32 crc = new CRC32();
+            int hintSize = totalSize - ENTRY_OVERHEAD_SIZE;
+            try
+            {
+                dop.writeInt(hintSize);
+                updateChecksumInt(crc, hintSize);
+                dop.writeInt((int) crc.getValue());
+
+                Hint.serializer.serialize(hint, dop, MessagingService.current_version);
+                updateChecksum(crc, buffer, buffer.position() - hintSize, hintSize);
+                dop.writeInt((int) crc.getValue());
+            }
+            catch (IOException e)
+            {
+                // not expected: the output is an in-memory buffer sized for this entry; keep the cause if it happens
+                throw new AssertionError(e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsBufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsBufferPool.java b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
new file mode 100644
index 0000000..83b155a
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * A minimal pool of {@link HintsBuffer} instances. In the steady state it holds just two buffers: the one currently
+ * being written to, and a spare that takes over once the current one runs out of room.
+ */
+final class HintsBufferPool
+{
+    /**
+     * Receives a buffer that has just been swapped out of the pool because it ran out of room.
+     */
+    interface FlushCallback
+    {
+        void flush(HintsBuffer buffer, HintsBufferPool pool);
+    }
+
+    private volatile HintsBuffer currentBuffer;
+    private final Queue<HintsBuffer> reserveBuffers;
+    private final int bufferSize;
+    private final FlushCallback flushCallback;
+
+    HintsBufferPool(int bufferSize, FlushCallback flushCallback)
+    {
+        this.bufferSize = bufferSize;
+        this.flushCallback = flushCallback;
+        this.reserveBuffers = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * Serializes a hint into the current buffer once, on behalf of all of its target hosts.
+     *
+     * @param hostIds host ids of the hint's target nodes
+     * @param hint the hint to store
+     */
+    void write(Iterable<UUID> hostIds, Hint hint)
+    {
+        final int serializedSize = (int) Hint.serializer.serializedSize(hint, MessagingService.current_version);
+        try (HintsBuffer.Allocation slot = allocate(serializedSize))
+        {
+            slot.write(hostIds, hint);
+        }
+    }
+
+    // Retries until some buffer accepts the allocation. Whenever the buffer in hand is full, it is swapped out, and
+    // the thread that performed the swap hands the full buffer to the flush callback.
+    private HintsBuffer.Allocation allocate(int hintSize)
+    {
+        for (HintsBuffer buffer = currentBuffer(); ; buffer = currentBuffer)
+        {
+            HintsBuffer.Allocation slot = buffer.allocate(hintSize);
+            if (slot != null)
+                return slot;
+
+            // not enough room left in this buffer - only the thread that wins the swap flushes it
+            if (switchCurrentBuffer(buffer))
+                flushCallback.flush(buffer, this);
+        }
+    }
+
+    /**
+     * Returns a buffer to the pool as the spare; it is rejected if a spare is already being held.
+     *
+     * @return true if the buffer was accepted, false otherwise
+     */
+    boolean offer(HintsBuffer buffer)
+    {
+        if (reserveBuffers.isEmpty())
+        {
+            reserveBuffers.offer(buffer);
+            return true;
+        }
+        return false;
+    }
+
+    // Returns the buffer currently being written to, creating it on first use.
+    HintsBuffer currentBuffer()
+    {
+        if (currentBuffer != null)
+            return currentBuffer;
+
+        initializeCurrentBuffer();
+        return currentBuffer;
+    }
+
+    private synchronized void initializeCurrentBuffer()
+    {
+        // re-checked under the lock so that racing first callers end up creating a single buffer
+        if (currentBuffer == null)
+            currentBuffer = createBuffer();
+    }
+
+    // Replaces 'previous' with the spare (or a brand-new buffer when there is no spare), unless another thread has
+    // already replaced it. Returns true only for the caller that actually performed the swap.
+    private synchronized boolean switchCurrentBuffer(HintsBuffer previous)
+    {
+        if (currentBuffer == previous)
+        {
+            HintsBuffer spare = reserveBuffers.poll();
+            currentBuffer = (spare != null) ? spare : createBuffer();
+            return true;
+        }
+        return false;
+    }
+
+    private HintsBuffer createBuffer()
+    {
+        return HintsBuffer.create(bufferSize);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsCatalog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java
new file mode 100644
index 0000000..13404ee
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.SyncUtil;
+
+import static java.util.stream.Collectors.groupingBy;
+
+/**
+ * A simple catalog for easy host id -> {@link HintsStore} lookup and manipulation.
+ */
+final class HintsCatalog
+{
+    private final File hintsDirectory;
+    private final Map<UUID, HintsStore> stores;
+
+    private HintsCatalog(File hintsDirectory, Map<UUID, List<HintsDescriptor>> descriptors)
+    {
+        this.hintsDirectory = hintsDirectory;
+        this.stores = new ConcurrentHashMap<>();
+
+        for (Map.Entry<UUID, List<HintsDescriptor>> entry : descriptors.entrySet())
+            stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, entry.getValue()));
+    }
+
+    /**
+     * Loads hints stores from a given directory.
+     *
+     * Reads the descriptor of every file whose name matches the hints file name pattern, and creates one store per
+     * host id from the descriptors grouped by host id.
+     *
+     * @throws FSReadError if the directory cannot be listed, or a hints file's descriptor cannot be read
+     */
+    static HintsCatalog load(File hintsDirectory)
+    {
+        // the stream returned by Files.list() holds an open directory handle until it is closed
+        try (Stream<java.nio.file.Path> files = Files.list(hintsDirectory.toPath()))
+        {
+            Map<UUID, List<HintsDescriptor>> stores =
+                files.filter(HintsDescriptor::isHintFileName)
+                     .map(HintsDescriptor::readFromFile)
+                     .collect(groupingBy(h -> h.hostId));
+            return new HintsCatalog(hintsDirectory, stores);
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, hintsDirectory);
+        }
+    }
+
+    Stream<HintsStore> stores()
+    {
+        return stores.values().stream();
+    }
+
+    /**
+     * Ensures a store exists for each of the given host ids, creating empty ones where missing.
+     */
+    void maybeLoadStores(Iterable<UUID> hostIds)
+    {
+        for (UUID hostId : hostIds)
+            get(hostId);
+    }
+
+    /**
+     * Returns the store for the given host id, creating an empty one if there is none yet.
+     */
+    HintsStore get(UUID hostId)
+    {
+        // we intentionally don't just return stores.computeIfAbsent() because it's expensive compared to simple get(),
+        // and in this case would also allocate for the capturing lambda; the method is on a really hot path
+        HintsStore store = stores.get(hostId);
+        return store == null
+             ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, Collections.emptyList()))
+             : store;
+    }
+
+    /**
+     * Delete all hints for all host ids.
+     *
+     * Will not delete the files that are currently being dispatched, or written to.
+     */
+    void deleteAllHints()
+    {
+        stores.keySet().forEach(this::deleteAllHints);
+    }
+
+    /**
+     * Delete all hints for the specified host id.
+     *
+     * Will not delete the files that are currently being dispatched, or written to.
+     */
+    void deleteAllHints(UUID hostId)
+    {
+        HintsStore store = stores.get(hostId);
+        if (store != null)
+            store.deleteAllHints();
+    }
+
+    /**
+     * Deletes all hints for the specified host id and removes its store from the catalog.
+     */
+    void exciseStore(UUID hostId)
+    {
+        deleteAllHints(hostId);
+        stores.remove(hostId);
+    }
+
+    /**
+     * Best-effort fsync of the hints directory itself; does nothing if the directory cannot be opened.
+     */
+    void fsyncDirectory()
+    {
+        int fd = CLibrary.tryOpenDirectory(hintsDirectory.getAbsolutePath());
+        if (fd != -1)
+        {
+            SyncUtil.trySync(fd);
+            CLibrary.tryCloseFD(fd);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
new file mode 100644
index 0000000..9c27a23
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import java.util.zip.CRC32;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.json.simple.JSONValue;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Describes the host id, the version, the timestamp of creation, and an arbitrary map of JSON-encoded parameters of a
+ * hints file.
+ *
+ * Written in the beginning of each hints file, with the following layout:
+ * - int version, long timestamp, long host id most significant bits, long host id least significant bits,
+ *   int length of the encoded parameters
+ * - int CRC32 of the fields above
+ * - the parameters, as UTF-8 encoded JSON
+ * - int CRC32 of the fields and the parameters (the first checksum itself is not included)
+ */
+final class HintsDescriptor
+{
+    static final int VERSION_30 = 1;
+    static final int CURRENT_VERSION = VERSION_30;
+
+    // matches names produced by fileName(): <host id>-<timestamp>-<version>.hints
+    static final Pattern pattern =
+        Pattern.compile("^[a-fA-F0-9]{8}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{12}\\-(\\d+)\\-(\\d+)\\.hints$");
+
+    final UUID hostId;
+    final int version;
+    final long timestamp;
+
+    // implemented for future compression support - see CASSANDRA-9428
+    final ImmutableMap<String, Object> parameters;
+
+    HintsDescriptor(UUID hostId, int version, long timestamp, ImmutableMap<String, Object> parameters)
+    {
+        this.hostId = hostId;
+        this.version = version;
+        this.timestamp = timestamp;
+        this.parameters = parameters;
+    }
+
+    HintsDescriptor(UUID hostId, long timestamp)
+    {
+        this(hostId, CURRENT_VERSION, timestamp, ImmutableMap.<String, Object>of());
+    }
+
+    String fileName()
+    {
+        return String.format("%s-%s-%s.hints", hostId, timestamp, version);
+    }
+
+    String checksumFileName()
+    {
+        return String.format("%s-%s-%s.crc32", hostId, timestamp, version);
+    }
+
+    int messagingVersion()
+    {
+        return messagingVersion(version);
+    }
+
+    /**
+     * Maps a hints file version to the messaging version its hints were serialized with.
+     *
+     * @throws AssertionError for an unknown hints version
+     */
+    static int messagingVersion(int hintsVersion)
+    {
+        switch (hintsVersion)
+        {
+            case VERSION_30:
+                return MessagingService.VERSION_30;
+            default:
+                throw new AssertionError("Unsupported hints version: " + hintsVersion);
+        }
+    }
+
+    static boolean isHintFileName(Path path)
+    {
+        return pattern.matcher(path.getFileName().toString()).matches();
+    }
+
+    /**
+     * Reads the descriptor from the beginning of the given hints file.
+     *
+     * @throws FSReadError if the file cannot be read or its descriptor checksums do not match
+     */
+    static HintsDescriptor readFromFile(Path path)
+    {
+        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "r"))
+        {
+            return deserialize(raf);
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, path.toFile());
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return MoreObjects.toStringHelper(this)
+                          .add("hostId", hostId)
+                          .add("version", version)
+                          .add("timestamp", timestamp)
+                          .add("parameters", parameters)
+                          .toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof HintsDescriptor))
+            return false;
+
+        HintsDescriptor hd = (HintsDescriptor) o;
+
+        // compare primitives directly rather than boxing them through Objects.equal
+        return version == hd.version
+            && timestamp == hd.timestamp
+            && Objects.equal(hostId, hd.hostId)
+            && Objects.equal(parameters, hd.parameters);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(hostId, version, timestamp, parameters);
+    }
+
+    /**
+     * Writes this descriptor using the layout documented on the class.
+     */
+    void serialize(DataOutputPlus out) throws IOException
+    {
+        CRC32 crc = new CRC32();
+
+        out.writeInt(version);
+        updateChecksumInt(crc, version);
+
+        out.writeLong(timestamp);
+        updateChecksumLong(crc, timestamp);
+
+        out.writeLong(hostId.getMostSignificantBits());
+        updateChecksumLong(crc, hostId.getMostSignificantBits());
+        out.writeLong(hostId.getLeastSignificantBits());
+        updateChecksumLong(crc, hostId.getLeastSignificantBits());
+
+        byte[] paramsBytes = JSONValue.toJSONString(parameters).getBytes(StandardCharsets.UTF_8);
+        out.writeInt(paramsBytes.length);
+        updateChecksumInt(crc, paramsBytes.length);
+        out.writeInt((int) crc.getValue());
+
+        out.write(paramsBytes);
+        crc.update(paramsBytes, 0, paramsBytes.length);
+
+        out.writeInt((int) crc.getValue());
+    }
+
+    /**
+     * @return the number of bytes {@link #serialize(DataOutputPlus)} writes for this descriptor
+     */
+    int serializedSize()
+    {
+        int size = TypeSizes.sizeof(version);
+        size += TypeSizes.sizeof(timestamp);
+
+        size += TypeSizes.sizeof(hostId.getMostSignificantBits());
+        size += TypeSizes.sizeof(hostId.getLeastSignificantBits());
+
+        byte[] paramsBytes = JSONValue.toJSONString(parameters).getBytes(StandardCharsets.UTF_8);
+        size += TypeSizes.sizeof(paramsBytes.length);
+        size += 4; // size checksum
+        size += paramsBytes.length;
+        size += 4; // total checksum
+
+        return size;
+    }
+
+    /**
+     * Reads a descriptor written by {@link #serialize(DataOutputPlus)}.
+     *
+     * @throws IOException if reading fails or either checksum does not match
+     */
+    static HintsDescriptor deserialize(DataInput in) throws IOException
+    {
+        CRC32 crc = new CRC32();
+
+        int version = in.readInt();
+        updateChecksumInt(crc, version);
+
+        long timestamp = in.readLong();
+        updateChecksumLong(crc, timestamp);
+
+        long msb = in.readLong();
+        updateChecksumLong(crc, msb);
+        long lsb = in.readLong();
+        updateChecksumLong(crc, lsb);
+
+        UUID hostId = new UUID(msb, lsb);
+
+        int paramsLength = in.readInt();
+        updateChecksumInt(crc, paramsLength);
+        // verify the fixed-size fields (including the length) before trusting the length for an allocation
+        validateCRC(in.readInt(), (int) crc.getValue());
+
+        byte[] paramsBytes = new byte[paramsLength];
+        in.readFully(paramsBytes, 0, paramsLength);
+        crc.update(paramsBytes, 0, paramsLength);
+        validateCRC(in.readInt(), (int) crc.getValue());
+
+        return new HintsDescriptor(hostId, version, timestamp, decodeJSONBytes(paramsBytes));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static ImmutableMap<String, Object> decodeJSONBytes(byte[] bytes)
+    {
+        return ImmutableMap.copyOf((Map<String, Object>) JSONValue.parse(new String(bytes, StandardCharsets.UTF_8)));
+    }
+
+    // Feeds a long into the checksum as two ints: low 32 bits first, then high 32 bits.
+    private static void updateChecksumLong(CRC32 crc, long value)
+    {
+        updateChecksumInt(crc, (int) (value & 0xFFFFFFFFL));
+        updateChecksumInt(crc, (int) (value >>> 32));
+    }
+
+    private static void validateCRC(int expected, int actual) throws IOException
+    {
+        if (expected != actual)
+            throw new IOException("Hints Descriptor CRC Mismatch");
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
new file mode 100644
index 0000000..d0fdd04
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * A multi-threaded (by default) executor for dispatching hints.
+ *
+ * Most of dispatch is triggered by {@link HintsDispatchTrigger} running every ~10 seconds.
+ */
+final class HintsDispatchExecutor
+{
+ private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);
+
+ private final File hintsDirectory;
+ private final ExecutorService executor;
+ private final AtomicBoolean isPaused;
+ private final Map<UUID, Future> scheduledDispatches;
+
+ HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused)
+ {
+ this.hintsDirectory = hintsDirectory;
+ this.isPaused = isPaused;
+
+ scheduledDispatches = new ConcurrentHashMap<>();
+ executor = new JMXEnabledThreadPoolExecutor(1,
+ maxThreads,
+ 1,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("HintsDispatcher", Thread.MIN_PRIORITY),
+ "internal");
+ }
+
+ /*
+ * It's safe to terminate dispatch in process and to deschedule dispatch.
+ */
+ void shutdownBlocking()
+ {
+ scheduledDispatches.clear();
+ executor.shutdownNow();
+ }
+
+ boolean isScheduled(HintsStore store)
+ {
+ return scheduledDispatches.containsKey(store.hostId);
+ }
+
+ Future dispatch(HintsStore store)
+ {
+ return dispatch(store, store.hostId);
+ }
+
+ Future dispatch(HintsStore store, UUID hostId)
+ {
+ /*
+ * It is safe to perform dispatch for the same host id concurrently in two or more threads,
+ * however there is nothing to win from it - so we don't.
+ *
+ * Additionally, having just one dispatch task per host id ensures that we'll never violate our per-destination
+ * rate limit, without having to share a ratelimiter between threads.
+ *
+ * It also simplifies reasoning about dispatch sessions.
+ */
+ return scheduledDispatches.computeIfAbsent(store.hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
+ }
+
+ void completeDispatchBlockingly(HintsStore store)
+ {
+ Future future = scheduledDispatches.get(store.hostId);
+ try
+ {
+ if (future != null)
+ future.get();
+ }
+ catch (ExecutionException | InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private final class DispatchHintsTask implements Runnable
+ {
+ private final HintsStore store;
+ private final UUID hostId;
+ private final RateLimiter rateLimiter;
+
+ DispatchHintsTask(HintsStore store, UUID hostId)
+ {
+ this.store = store;
+ this.hostId = hostId;
+
+ // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+ // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
+ // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
+ // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between
+ // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time).
+ int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
+ int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / nodesCount;
+ this.rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+ }
+
+ public void run()
+ {
+ try
+ {
+ dispatch();
+ }
+ finally
+ {
+ scheduledDispatches.remove(hostId);
+ }
+ }
+
+ private void dispatch()
+ {
+ while (true)
+ {
+ if (isPaused.get())
+ break;
+
+ HintsDescriptor descriptor = store.poll();
+ if (descriptor == null)
+ break;
+
+ try
+ {
+ dispatch(descriptor);
+ }
+ catch (FSReadError e)
+ {
+ logger.error("Failed to dispatch hints file {}: file is corrupted ({})", descriptor.fileName(), e);
+ store.cleanUp(descriptor);
+ store.blacklist(descriptor);
+ throw e;
+ }
+ }
+ }
+
+ private void dispatch(HintsDescriptor descriptor)
+ {
+ logger.debug("Dispatching hints file {}", descriptor.fileName());
+
+ File file = new File(hintsDirectory, descriptor.fileName());
+ Long offset = store.getDispatchOffset(descriptor).orElse(null);
+
+ try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, isPaused))
+ {
+ if (offset != null)
+ dispatcher.seek(offset);
+
+ if (dispatcher.dispatch())
+ {
+ if (!file.delete())
+ logger.error("Failed to delete hints file {}", descriptor.fileName());
+ store.cleanUp(descriptor);
+ logger.info("Finished hinted handoff of file {} to endpoint {}", descriptor.fileName(), hostId);
+ }
+ else
+ {
+ store.markDispatchOffset(descriptor, dispatcher.dispatchOffset());
+ store.offerFirst(descriptor);
+ logger.info("Finished hinted handoff of file {} to endpoint {}, partially", descriptor.fileName(), hostId);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
new file mode 100644
index 0000000..5fe0e27
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+
+import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddress;
+
+/**
+ * Periodic task, run every 10 seconds, that kicks off hints dispatch for every eligible hints store.
+ *
+ * A store is eligible when all of the following hold, checked in this order:
+ * 1. no dispatch is currently scheduled for it;
+ * 2. its host is live;
+ * 3. it has an active writer, or at least one hints file;
+ * 4. gossip reports the same schema version for its host as for this node.
+ *
+ * Triggering an eligible store means:
+ * - submitting its existing hints files (if any) to the dispatch executor;
+ * - closing its active writer (if any), so that the resulting file is picked up by a subsequent run.
+ */
+final class HintsDispatchTrigger implements Runnable
+{
+    private final HintsCatalog catalog;
+    private final HintsWriteExecutor writeExecutor;
+    private final HintsDispatchExecutor dispatchExecutor;
+    private final AtomicBoolean isPaused;
+
+    HintsDispatchTrigger(HintsCatalog catalog,
+                         HintsWriteExecutor writeExecutor,
+                         HintsDispatchExecutor dispatchExecutor,
+                         AtomicBoolean isPaused)
+    {
+        this.catalog = catalog;
+        this.writeExecutor = writeExecutor;
+        this.dispatchExecutor = dispatchExecutor;
+        this.isPaused = isPaused;
+    }
+
+    public void run()
+    {
+        if (!isPaused.get())
+            catalog.stores().filter(this::isEligible).forEach(this::schedule);
+    }
+
+    // && short-circuits, so the conditions are evaluated in the same order as the list above and the gossip
+    // schema comparison only runs for live, unscheduled stores that have something to dispatch
+    private boolean isEligible(HintsStore store)
+    {
+        return !dispatchExecutor.isScheduled(store)
+            && store.isLive()
+            && (store.isWriting() || store.hasFiles())
+            && Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA);
+    }
+
+    private void schedule(HintsStore store)
+    {
+        // hand any existing files over for dispatch first...
+        if (store.hasFiles())
+            dispatchExecutor.dispatch(store);
+
+        // ...then close the active writer, so its file becomes dispatchable on a later run
+        if (store.isWriting())
+            writeExecutor.closeWriter(store);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
new file mode 100644
index 0000000..f769e09
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+/**
+ * Dispatches a single hints file to a specified node in a batched manner.
+ *
+ * Uses either {@link EncodedHintMessage} - when dispatching hints into a node with the same messaging version as the hints file,
+ * or {@link HintMessage}, when conversion is required.
+ *
+ * The file is dispatched one {@link HintsReader.Page} at a time. Every hint in a page is sent, and then all of the
+ * page's callbacks are awaited before the next page is read.
+ */
+final class HintsDispatcher implements AutoCloseable
+{
+    private enum Action { CONTINUE, ABORT, RETRY }
+
+    private final HintsReader reader;
+    private final UUID hostId;
+    private final InetAddress address;
+    private final int messagingVersion;
+    private final AtomicBoolean isPaused;
+
+    // offset of the page currently being dispatched - where to resume from if dispatch stops part-way
+    private long currentPageOffset;
+
+    private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, AtomicBoolean isPaused)
+    {
+        currentPageOffset = 0L;
+
+        this.reader = reader;
+        this.hostId = hostId;
+        this.address = address;
+        this.messagingVersion = messagingVersion;
+        this.isPaused = isPaused;
+    }
+
+    /**
+     * Opens the hints file and resolves the target node's address and messaging version from its host id.
+     */
+    static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, AtomicBoolean isPaused)
+    {
+        InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
+        int messagingVersion = MessagingService.instance().getVersion(address);
+        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused);
+    }
+
+    public void close()
+    {
+        reader.close();
+    }
+
+    /**
+     * Positions the underlying reader at the given byte offset, e.g. to resume an earlier, partial dispatch.
+     */
+    void seek(long bytes)
+    {
+        reader.seek(bytes);
+        currentPageOffset = 0L;
+    }
+
+    /**
+     * @return whether or not dispatch completed entirely and successfully
+     */
+    boolean dispatch()
+    {
+        for (HintsReader.Page page : reader)
+        {
+            currentPageOffset = page.offset;
+            if (dispatch(page) != Action.CONTINUE)
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @return offset of the first non-delivered page
+     */
+    long dispatchOffset()
+    {
+        return currentPageOffset;
+    }
+
+    private boolean isHostAlive()
+    {
+        return FailureDetector.instance.isAlive(address);
+    }
+
+    private boolean isPaused()
+    {
+        return isPaused.get();
+    }
+
+    /*
+     * Dispatches a single page. Retries the whole page in case of a timeout. Stops in case of a failure response,
+     * the host going down, or delivery being paused.
+     *
+     * The page iterators read from the reader's current position. After an attempt, that position is already at
+     * the end of the page, so the reader must be rewound to the page start before retrying. Otherwise the retry
+     * would send nothing and the timed-out hints would be silently skipped.
+     *
+     * A loop (rather than recursion) keeps a long run of timeouts from growing the stack.
+     */
+    private Action dispatch(HintsReader.Page page)
+    {
+        Action action = sendHintsAndAwait(page);
+        while (action == Action.RETRY)
+        {
+            reader.seek(page.offset);
+            action = sendHintsAndAwait(page);
+        }
+        return action;
+    }
+
+    private Action sendHintsAndAwait(HintsReader.Page page)
+    {
+        Collection<Callback> callbacks = new ArrayList<>();
+
+        /*
+         * If hints file messaging version matches the version of the target host, we'll use the optimised path -
+         * skipping the redundant decoding/encoding cycle of the already encoded hint.
+         *
+         * If that is not the case, we'll need to perform conversion to a newer (or an older) format, and decoding the hint
+         * is an unavoidable intermediate step.
+         */
+        Action action = reader.descriptor().messagingVersion() == messagingVersion
+                      ? sendHints(page.buffersIterator(), callbacks, this::sendEncodedHint)
+                      : sendHints(page.hintsIterator(), callbacks, this::sendHint);
+
+        if (action == Action.ABORT)
+            return action;
+
+        for (Callback cb : callbacks)
+        {
+            Callback.Outcome outcome = cb.await();
+            // a failure response is not retried in place; abort, so the page is retried by a later dispatch
+            if (outcome == Callback.Outcome.FAILURE)
+                return Action.ABORT;
+            if (outcome == Callback.Outcome.TIMEOUT)
+                return Action.RETRY;
+        }
+
+        return Action.CONTINUE;
+    }
+
+    /*
+     * Sends every hint produced by the iterator using sendFunction, collecting the callbacks.
+     * Aborts as soon as the target host is no longer alive or dispatch has been paused.
+     */
+    private <T> Action sendHints(Iterator<T> hints, Collection<Callback> callbacks, Function<T, Callback> sendFunction)
+    {
+        while (hints.hasNext())
+        {
+            if (!isHostAlive() || isPaused())
+                return Action.ABORT;
+            callbacks.add(sendFunction.apply(hints.next()));
+        }
+        return Action.CONTINUE;
+    }
+
+    /*
+     * Sending hints in compatibility mode (decoded hints, re-encoded for the target's messaging version).
+     */
+    private Callback sendHint(Hint hint)
+    {
+        Callback callback = new Callback();
+        HintMessage message = new HintMessage(hostId, hint);
+        MessagingService.instance().sendRRWithFailure(message.createMessageOut(), address, callback);
+        return callback;
+    }
+
+    /*
+     * Sending hints in raw mode (pre-encoded bytes, sent verbatim).
+     */
+    private Callback sendEncodedHint(ByteBuffer hint)
+    {
+        Callback callback = new Callback();
+        EncodedHintMessage message = new EncodedHintMessage(hostId, hint, messagingVersion);
+        MessagingService.instance().sendRRWithFailure(message.createMessageOut(), address, callback);
+        return callback;
+    }
+
+    private static final class Callback implements IAsyncCallbackWithFailure
+    {
+        enum Outcome { SUCCESS, TIMEOUT, FAILURE }
+
+        private final long start = System.nanoTime();
+        private final SimpleCondition condition = new SimpleCondition();
+        private volatile Outcome outcome;
+
+        // waits for whatever remains of the HINT verb timeout, measured from when this callback was created
+        Outcome await()
+        {
+            long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getTimeout(MessagingService.Verb.HINT)) - (System.nanoTime() - start);
+            boolean timedOut;
+
+            try
+            {
+                timedOut = !condition.await(timeout, TimeUnit.NANOSECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            return timedOut ? Outcome.TIMEOUT : outcome;
+        }
+
+        public void onFailure(InetAddress from)
+        {
+            outcome = Outcome.FAILURE;
+            condition.signalAll();
+        }
+
+        public void response(MessageIn msg)
+        {
+            outcome = Outcome.SUCCESS;
+            condition.signalAll();
+        }
+
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
new file mode 100644
index 0000000..7d164b4
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CLibrary;
+
+/**
+ * A paged non-compressed hints reader that provides two iterators:
+ * - a 'raw' ByteBuffer iterator that doesn't deserialize the hints, but returns the pre-encoded hints verbatim
+ * - a decoded iterator, that deserializes the underlying bytes into {@link Hint} instances.
+ *
+ * The former is an optimisation for when the messaging version of the file matches the messaging version of the destination
+ * node. Extra decoding and reencoding is a waste of effort in this scenario, so we avoid it.
+ *
+ * The latter is required for dispatch of hints to nodes that have a different messaging version, and in general is just an
+ * easy way to enable backward and future compatibility.
+ *
+ * Each entry is read as: size (int), checksum of the size (int), the hint itself (size bytes), trailing checksum (int).
+ * Both iterators must consume every entry in full, trailing checksum included, so that the next read starts at the
+ * next entry.
+ */
+final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
+{
+    private static final Logger logger = LoggerFactory.getLogger(HintsReader.class);
+
+    // don't read more than 512 KB of hints at a time.
+    private static final int PAGE_SIZE = 512 << 10;
+
+    private final HintsDescriptor descriptor;
+    private final File file;
+    private final RandomAccessReader reader;
+    private final ChecksummedDataInput crcInput;
+
+    // we pass the RateLimiter into HintsReader itself because it's cheaper to calculate the size before the hint is deserialized
+    @Nullable
+    private final RateLimiter rateLimiter;
+
+    private HintsReader(HintsDescriptor descriptor, File file, RandomAccessReader reader, RateLimiter rateLimiter)
+    {
+        this.descriptor = descriptor;
+        this.file = file;
+        this.reader = reader;
+        this.crcInput = ChecksummedDataInput.wrap(reader);
+        this.rateLimiter = rateLimiter;
+    }
+
+    /**
+     * Opens the file and reads its descriptor header; the reader is closed again if the header can't be read.
+     *
+     * @param rateLimiter optional limiter (may be null) charged with each entry's size before it is read
+     * @throws FSReadError if the descriptor cannot be deserialized
+     */
+    static HintsReader open(File file, RateLimiter rateLimiter)
+    {
+        RandomAccessReader reader = RandomAccessReader.open(file);
+        try
+        {
+            HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
+            return new HintsReader(descriptor, file, reader, rateLimiter);
+        }
+        catch (IOException e)
+        {
+            reader.close();
+            throw new FSReadError(e, file);
+        }
+    }
+
+    static HintsReader open(File file)
+    {
+        return open(file, null);
+    }
+
+    public void close()
+    {
+        FileUtils.closeQuietly(reader);
+    }
+
+    public HintsDescriptor descriptor()
+    {
+        return descriptor;
+    }
+
+    void seek(long newPosition)
+    {
+        reader.seek(newPosition);
+    }
+
+    public Iterator<Page> iterator()
+    {
+        return new PagesIterator();
+    }
+
+    /**
+     * A page of hints starting at {@code offset}. Its iterators read from the reader's current position, and stop
+     * at EOF or once at least PAGE_SIZE bytes past {@code offset} have been read.
+     */
+    final class Page
+    {
+        public final long offset;
+
+        private Page(long offset)
+        {
+            this.offset = offset;
+        }
+
+        Iterator<Hint> hintsIterator()
+        {
+            return new HintsIterator(offset);
+        }
+
+        Iterator<ByteBuffer> buffersIterator()
+        {
+            return new BuffersIterator(offset);
+        }
+    }
+
+    final class PagesIterator extends AbstractIterator<Page>
+    {
+        @SuppressWarnings("resource")
+        protected Page computeNext()
+        {
+            // drop already-consumed file contents from the OS page cache
+            CLibrary.trySkipCache(reader.getChannel().getFileDescriptor(), 0, reader.getFilePointer(), reader.getPath());
+
+            if (reader.length() == reader.getFilePointer())
+                return endOfData();
+
+            return new Page(reader.getFilePointer());
+        }
+    }
+
+    /**
+     * A decoding iterator that deserializes the hints as it goes.
+     */
+    final class HintsIterator extends AbstractIterator<Hint>
+    {
+        private final long offset;
+
+        HintsIterator(long offset)
+        {
+            super();
+            this.offset = offset;
+        }
+
+        protected Hint computeNext()
+        {
+            Hint hint;
+
+            do
+            {
+                long position = reader.getFilePointer();
+
+                if (reader.length() == position)
+                    return endOfData(); // reached EOF
+
+                if (position - offset >= PAGE_SIZE)
+                    return endOfData(); // read page size or more bytes
+
+                try
+                {
+                    hint = computeNextInternal();
+                }
+                catch (IOException e)
+                {
+                    throw new FSReadError(e, file);
+                }
+            }
+            while (hint == null);
+
+            return hint;
+        }
+
+        private Hint computeNextInternal() throws IOException
+        {
+            crcInput.resetCrc();
+            crcInput.resetLimit();
+
+            int size = crcInput.readInt();
+
+            // if we cannot corroborate the size via crc, then we cannot safely skip this hint
+            if (reader.readInt() != crcInput.getCrc())
+                throw new IOException("Digest mismatch exception");
+
+            return readHint(size);
+        }
+
+        // returns null (after logging) for a hint that must be skipped; the entry is always consumed in full
+        private Hint readHint(int size) throws IOException
+        {
+            if (rateLimiter != null)
+                rateLimiter.acquire(size);
+            crcInput.limit(size);
+
+            Hint hint;
+            try
+            {
+                hint = Hint.serializer.deserialize(crcInput, descriptor.messagingVersion());
+            }
+            catch (UnknownColumnFamilyException e)
+            {
+                logger.warn("Failed to read a hint for {} - table with id {} is unknown in file {}",
+                            descriptor.hostId,
+                            e.cfId,
+                            descriptor.fileName());
+                reader.skipBytes(crcInput.bytesRemaining());
+                // also consume the trailing checksum; left unread, it would be taken as the next entry's size and
+                // fail with a digest mismatch. It can't be verified here, as the skipped bytes bypassed crcInput.
+                reader.readInt();
+
+                return null;
+            }
+
+            if (reader.readInt() == crcInput.getCrc())
+                return hint;
+
+            // log a warning and skip the corrupted entry
+            logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
+                        descriptor.hostId,
+                        crcInput.getPosition() - size - 4,
+                        descriptor.fileName());
+            return null;
+        }
+    }
+
+    /**
+     * A verbatim iterator that simply returns the underlying ByteBuffers.
+     */
+    final class BuffersIterator extends AbstractIterator<ByteBuffer>
+    {
+        private final long offset;
+
+        BuffersIterator(long offset)
+        {
+            super();
+            this.offset = offset;
+        }
+
+        protected ByteBuffer computeNext()
+        {
+            ByteBuffer buffer;
+
+            do
+            {
+                long position = reader.getFilePointer();
+
+                if (reader.length() == position)
+                    return endOfData(); // reached EOF
+
+                if (position - offset >= PAGE_SIZE)
+                    return endOfData(); // read page size or more bytes
+
+                try
+                {
+                    buffer = computeNextInternal();
+                }
+                catch (IOException e)
+                {
+                    throw new FSReadError(e, file);
+                }
+            }
+            while (buffer == null);
+
+            return buffer;
+        }
+
+        private ByteBuffer computeNextInternal() throws IOException
+        {
+            crcInput.resetCrc();
+            crcInput.resetLimit();
+
+            int size = crcInput.readInt();
+
+            // if we cannot corroborate the size via crc, then we cannot safely skip this hint
+            if (reader.readInt() != crcInput.getCrc())
+                throw new IOException("Digest mismatch exception");
+
+            return readBuffer(size);
+        }
+
+        // returns null (after logging) when the trailing checksum doesn't match
+        private ByteBuffer readBuffer(int size) throws IOException
+        {
+            if (rateLimiter != null)
+                rateLimiter.acquire(size);
+            crcInput.limit(size);
+
+            ByteBuffer buffer = ByteBufferUtil.read(crcInput, size);
+            if (reader.readInt() == crcInput.getCrc())
+                return buffer;
+
+            // log a warning and skip the corrupted entry
+            logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
+                        descriptor.hostId,
+                        crcInput.getPosition() - size - 4,
+                        descriptor.fileName());
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
new file mode 100644
index 0000000..3f30c1d
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.HintedHandoffMetrics;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.service.StorageService;
+
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Iterables.size;
+
+/**
+ * A singleton-ish wrapper over various hints components:
+ * - a catalog of all hints stores
+ * - a single-threaded write executor
+ * - a multi-threaded dispatch executor
+ * - the buffer pool for writing hints into
+ *
+ * The front-end for everything hints related.
+ */
+public final class HintsService implements HintsServiceMBean
+{
+    private static final Logger logger = LoggerFactory.getLogger(HintsService.class);
+
+    public static final HintsService instance = new HintsService();
+
+    private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
+
+    // lower bound on the size of each hints buffer: 32 MB
+    private static final int MIN_BUFFER_SIZE = 32 << 20;
+
+    private final HintsCatalog catalog;
+    private final HintsWriteExecutor writeExecutor;
+    private final HintsBufferPool bufferPool;
+    private final HintsDispatchExecutor dispatchExecutor;
+    private final AtomicBoolean isDispatchPaused;
+
+    private volatile boolean isShutDown = false;
+
+    private final ScheduledFuture<?> triggerFlushingFuture;
+    private volatile ScheduledFuture<?> triggerDispatchFuture;
+
+    public final HintedHandoffMetrics metrics;
+
+    private HintsService()
+    {
+        File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
+        int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
+
+        catalog = HintsCatalog.load(hintsDirectory);
+        writeExecutor = new HintsWriteExecutor(catalog);
+
+        int bufferSize = Math.max(DatabaseDescriptor.getMaxMutationSize() * 2, MIN_BUFFER_SIZE);
+        bufferPool = new HintsBufferPool(bufferSize, writeExecutor::flushBuffer);
+
+        // dispatch starts out paused; startDispatch() unpauses it
+        isDispatchPaused = new AtomicBoolean(true);
+        dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused);
+
+        // periodically empty the current content of the buffers
+        int flushPeriod = DatabaseDescriptor.getHintsFlushPeriodInMS();
+        triggerFlushingFuture = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(() -> writeExecutor.flushBufferPool(bufferPool),
+                                                                                        flushPeriod,
+                                                                                        flushPeriod,
+                                                                                        TimeUnit.MILLISECONDS);
+        metrics = new HintedHandoffMetrics();
+    }
+
+    public void registerMBean()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Write a hint for an iterable of nodes.
+     *
+     * @param hostIds host ids of the hint's target nodes
+     * @param hint the hint to store
+     * @throws IllegalStateException if the service has been shut down
+     */
+    public void write(Iterable<UUID> hostIds, Hint hint)
+    {
+        if (isShutDown)
+            throw new IllegalStateException("HintsService is shut down and can't accept new hints");
+
+        // we have to make sure that the HintsStore instances get properly initialized - otherwise dispatch will not trigger
+        catalog.maybeLoadStores(hostIds);
+
+        if (hint.isLive())
+            bufferPool.write(hostIds, hint);
+
+        StorageMetrics.totalHints.inc(size(hostIds));
+    }
+
+    /**
+     * Write a hint for a single node.
+     *
+     * @param hostId host id of the hint's target node
+     * @param hint the hint to store
+     */
+    public void write(UUID hostId, Hint hint)
+    {
+        write(Collections.singleton(hostId), hint);
+    }
+
+    /**
+     * Flush the buffer pool for the selected target nodes, then fsync their writers.
+     *
+     * @param hostIds host ids of the nodes to flush and fsync hints for
+     */
+    public void flushAndFsyncBlockingly(Iterable<UUID> hostIds)
+    {
+        Iterable<HintsStore> stores = transform(hostIds, catalog::get);
+        writeExecutor.flushBufferPool(bufferPool, stores);
+        writeExecutor.fsyncWritersBlockingly(stores);
+    }
+
+    /**
+     * Unpauses dispatch and schedules the periodic dispatch trigger.
+     *
+     * @throws IllegalStateException if the service has been shut down
+     */
+    public synchronized void startDispatch()
+    {
+        if (isShutDown)
+            throw new IllegalStateException("HintsService is shut down and cannot be restarted");
+
+        isDispatchPaused.set(false);
+
+        HintsDispatchTrigger trigger = new HintsDispatchTrigger(catalog, writeExecutor, dispatchExecutor, isDispatchPaused);
+        // triggering hint dispatch is now very cheap, so we can do it more often - every 10 seconds vs. every 10 minutes,
+        // previously; this reduces mean time to delivery, and positively affects batchlog delivery latencies, too
+        triggerDispatchFuture = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(trigger, 10, 10, TimeUnit.SECONDS);
+    }
+
+    public void pauseDispatch()
+    {
+        logger.info("Paused hints dispatch");
+        isDispatchPaused.set(true);
+    }
+
+    public void resumeDispatch()
+    {
+        logger.info("Resumed hints dispatch");
+        isDispatchPaused.set(false);
+    }
+
+    /**
+     * Gracefully and blockingly shut down the service.
+     *
+     * Will abort dispatch sessions that are currently in progress (which is okay, it's idempotent),
+     * and make sure the buffers are flushed, hints files written and fsynced.
+     *
+     * @throws IllegalStateException if the service has already been shut down
+     */
+    public synchronized void shutdownBlocking()
+    {
+        if (isShutDown)
+            throw new IllegalStateException("HintsService has already been shut down");
+        isShutDown = true;
+
+        if (triggerDispatchFuture != null)
+            triggerDispatchFuture.cancel(false);
+        pauseDispatch();
+
+        triggerFlushingFuture.cancel(false);
+
+        writeExecutor.flushBufferPool(bufferPool);
+        writeExecutor.closeAllWriters();
+
+        dispatchExecutor.shutdownBlocking();
+        writeExecutor.shutdownBlocking();
+    }
+
+    public void decommission()
+    {
+        resumeDispatch();
+    }
+
+    /**
+     * Deletes all hints for all destinations. Doesn't make snapshots - should be used with care.
+     */
+    public void deleteAllHints()
+    {
+        catalog.deleteAllHints();
+    }
+
+    /**
+     * Deletes all hints for the provided destination. Doesn't make snapshots - should be used with care.
+     *
+     * @param address inet address of the target node - encoded as a string for easier JMX consumption
+     * @throws IllegalArgumentException if the address can't be resolved, or belongs to no known host
+     */
+    public void deleteAllHintsForEndpoint(String address)
+    {
+        InetAddress target;
+        try
+        {
+            target = InetAddress.getByName(address);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new IllegalArgumentException(e);
+        }
+        deleteAllHintsForEndpoint(target);
+    }
+
+    /**
+     * Deletes all hints for the provided destination. Doesn't make snapshots - should be used with care.
+     *
+     * @param target inet address of the target node
+     * @throws IllegalArgumentException if no host id is known for the address
+     */
+    public void deleteAllHintsForEndpoint(InetAddress target)
+    {
+        UUID hostId = StorageService.instance.getHostIdForEndpoint(target);
+        if (hostId == null)
+            throw new IllegalArgumentException("Can't delete hints for unknown address " + target);
+        catalog.deleteAllHints(hostId);
+    }
+
+    /**
+     * Cleans up hints-related state after a node with id = hostId left.
+     *
+     * Dispatcher should stop itself (isHostAlive() will start returning false for the leaving host), but we'll wait for
+     * completion anyway.
+     *
+     * We should also flush the buffer if there are any hints for the node there, and close the writer (if any),
+     * so that we don't leave any hint files lying around.
+     *
+     * Once that is done, we can simply delete all hint files and remove the host id from the catalog.
+     *
+     * The worst that can happen if we don't get everything right is a hints file (or two) remaining undeleted.
+     *
+     * @param hostId id of the node being excised
+     */
+    public void excise(UUID hostId)
+    {
+        HintsStore store = catalog.get(hostId);
+        if (store == null)
+            return;
+
+        // flush the buffer and then close the writer for the excised host id, to make sure that no new files will appear
+        // for this host id after we are done
+        Future<?> flushFuture = writeExecutor.flushBufferPool(bufferPool, Collections.singleton(store));
+        Future<?> closeFuture = writeExecutor.closeWriter(store);
+        try
+        {
+            flushFuture.get();
+            closeFuture.get();
+        }
+        catch (InterruptedException e)
+        {
+            // preserve the interrupt for callers further up the stack
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        // wait for the current dispatch session to end (if any), so that the currently dispatched file gets removed
+        dispatchExecutor.completeDispatchBlockingly(store);
+
+        // delete all the hints files and remove the HintsStore instance from the map in the catalog
+        catalog.exciseStore(hostId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsServiceMBean.java b/src/java/org/apache/cassandra/hints/HintsServiceMBean.java
new file mode 100644
index 0000000..fe0abcc
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsServiceMBean.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+/**
+ * JMX management interface for the hints service.
+ */
+public interface HintsServiceMBean
+{
+    /**
+     * Stops the dispatch of hints to their target nodes. Hints continue to be created.
+     */
+    void pauseDispatch();
+
+    /**
+     * Restarts the dispatch of hints after {@link #pauseDispatch()}. Hints creation is unaffected either way.
+     */
+    void resumeDispatch();
+
+    /**
+     * Permanently removes every stored hints file, except files currently being dispatched or written.
+     */
+    void deleteAllHints();
+
+    /**
+     * Permanently removes the stored hints files destined for one node, except files currently being dispatched
+     * or written.
+     *
+     * @param address address of the target node, as a string
+     */
+    void deleteAllHintsForEndpoint(String address);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
new file mode 100644
index 0000000..e19de99
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.SyncUtil;
+
+/**
+ * Encapsulates the state of a peer's hints: the queue of hints files for dispatch, and the current writer (if any).
+ *
+ * The queue for dispatch is multi-threading safe.
+ *
+ * The writer MUST only be accessed by {@link HintsWriteExecutor}.
+ */
+final class HintsStore
+{
+ private static final Logger logger = LoggerFactory.getLogger(HintsStore.class);
+
+ public final UUID hostId;
+ private final File hintsDirectory;
+
+ private final Map<HintsDescriptor, Long> dispatchOffsets;
+ private final Deque<HintsDescriptor> dispatchDequeue;
+ private final Queue<HintsDescriptor> blacklistedFiles;
+
+ // last timestamp used in a descriptor; make sure to not reuse the same timestamp for new descriptors.
+ private volatile long lastUsedTimestamp;
+ private volatile HintsWriter hintsWriter;
+
+ private HintsStore(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+ {
+ this.hostId = hostId;
+ this.hintsDirectory = hintsDirectory;
+
+ dispatchOffsets = new ConcurrentHashMap<>();
+ dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
+ blacklistedFiles = new ConcurrentLinkedQueue<>();
+
+ //noinspection resource
+ lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
+ }
+
+ static HintsStore create(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+ {
+ descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
+ return new HintsStore(hostId, hintsDirectory, descriptors);
+ }
+
+ InetAddress address()
+ {
+ return StorageService.instance.getEndpointForHostId(hostId);
+ }
+
+ boolean isLive()
+ {
+ InetAddress address = address();
+ return address != null && FailureDetector.instance.isAlive(address);
+ }
+
+ HintsDescriptor poll()
+ {
+ return dispatchDequeue.poll();
+ }
+
+ void offerFirst(HintsDescriptor descriptor)
+ {
+ dispatchDequeue.offerFirst(descriptor);
+ }
+
+ void offerLast(HintsDescriptor descriptor)
+ {
+ dispatchDequeue.offerLast(descriptor);
+ }
+
+ void deleteAllHints()
+ {
+ HintsDescriptor descriptor;
+ while ((descriptor = poll()) != null)
+ {
+ cleanUp(descriptor);
+ delete(descriptor);
+ }
+
+ while ((descriptor = blacklistedFiles.poll()) != null)
+ {
+ cleanUp(descriptor);
+ delete(descriptor);
+ }
+ }
+
+ private void delete(HintsDescriptor descriptor)
+ {
+ File hintsFile = new File(hintsDirectory, descriptor.fileName());
+ if (hintsFile.delete())
+ logger.info("Deleted hint file {}", descriptor.fileName());
+ else
+ logger.error("Failed to delete hint file {}", descriptor.fileName());
+
+ //noinspection ResultOfMethodCallIgnored
+ new File(hintsDirectory, descriptor.checksumFileName()).delete();
+ }
+
+ boolean hasFiles()
+ {
+ return !dispatchDequeue.isEmpty();
+ }
+
+ Optional<Long> getDispatchOffset(HintsDescriptor descriptor)
+ {
+ return Optional.ofNullable(dispatchOffsets.get(descriptor));
+ }
+
+ void markDispatchOffset(HintsDescriptor descriptor, long mark)
+ {
+ dispatchOffsets.put(descriptor, mark);
+ }
+
+ void cleanUp(HintsDescriptor descriptor)
+ {
+ dispatchOffsets.remove(descriptor);
+ }
+
+ void blacklist(HintsDescriptor descriptor)
+ {
+ blacklistedFiles.add(descriptor);
+ }
+
+ /*
+ * Methods dealing with HintsWriter.
+ *
+ * All of these, with the exception of isWriting(), are for exclusively single-threaded use by HintsWriteExecutor.
+ */
+
+ boolean isWriting()
+ {
+ return hintsWriter != null;
+ }
+
+ HintsWriter getOrOpenWriter()
+ {
+ if (hintsWriter == null)
+ hintsWriter = openWriter();
+ return hintsWriter;
+ }
+
+ HintsWriter getWriter()
+ {
+ return hintsWriter;
+ }
+
+ private HintsWriter openWriter()
+ {
+ lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
+ HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp);
+
+ try
+ {
+ return HintsWriter.create(hintsDirectory, descriptor);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, descriptor.fileName());
+ }
+ }
+
+ void closeWriter()
+ {
+ if (hintsWriter != null)
+ {
+ hintsWriter.close();
+ offerLast(hintsWriter.descriptor());
+ hintsWriter = null;
+ SyncUtil.trySyncDir(hintsDirectory);
+ }
+ }
+
+ void fsyncWriter()
+ {
+ if (hintsWriter != null)
+ hintsWriter.fsync();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
new file mode 100644
index 0000000..be52f92
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/**
+ * A single-threaded executor that exclusively writes all the hints and otherwise manipulates the writers.
+ *
+ * Flushing demultiplexes the provided {@link HintsBuffer} and sequentially writes to each {@link HintsWriter},
+ * using the same shared write buffer. In the near future, when CASSANDRA-9428 (compression) is implemented,
+ * it will also share a compression buffer.
+ */
+final class HintsWriteExecutor
+{
+    private static final int WRITE_BUFFER_SIZE = 256 << 10; // 256 KiB
+
+    private final HintsCatalog catalog;
+    // reused by every flush; only ever touched from tasks running on the single executor thread
+    private final ByteBuffer writeBuffer;
+    private final ExecutorService executor;
+
+    HintsWriteExecutor(HintsCatalog catalog)
+    {
+        this.catalog = catalog;
+
+        writeBuffer = ByteBuffer.allocateDirect(WRITE_BUFFER_SIZE);
+        executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("HintsWriteExecutor", 1);
+    }
+
+    /**
+     * Shuts the executor down and waits up to one minute for already-submitted tasks to finish.
+     * Should be very fast (worst case scenario - write a few 10s of megabytes to disk).
+     *
+     * NOTE(review): the result of awaitTermination() is ignored, so a timeout goes unreported.
+     */
+    void shutdownBlocking()
+    {
+        executor.shutdown();
+        try
+        {
+            executor.awaitTermination(1, TimeUnit.MINUTES);
+        }
+        catch (InterruptedException e)
+        {
+            // preserve the interrupt so code further up the stack can still observe it
+            Thread.currentThread().interrupt();
+            throw new AssertionError(e);
+        }
+    }
+
+    /**
+     * Flush the provided buffer, recycle it and offer it back to the pool; the recycled buffer is freed if the pool
+     * does not accept it.
+     */
+    Future flushBuffer(HintsBuffer buffer, HintsBufferPool bufferPool)
+    {
+        return executor.submit(new FlushBufferTask(buffer, bufferPool));
+    }
+
+    /**
+     * Flush the current buffer, but without clearing/recycling it.
+     */
+    Future flushBufferPool(HintsBufferPool bufferPool)
+    {
+        return executor.submit(new FlushBufferPoolTask(bufferPool));
+    }
+
+    /**
+     * Flush the current buffer just for the specified hints stores. Without clearing/recycling it.
+     */
+    Future flushBufferPool(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
+    {
+        return executor.submit(new PartiallyFlushBufferPoolTask(bufferPool, stores));
+    }
+
+    /**
+     * Fsyncs the writers of the given stores, then the catalog's directory, on the executor thread, and waits for
+     * completion.
+     *
+     * @throws RuntimeException wrapping the InterruptedException or ExecutionException if waiting fails
+     */
+    void fsyncWritersBlockingly(Iterable<HintsStore> stores)
+    {
+        try
+        {
+            executor.submit(new FsyncWritersTask(stores)).get();
+        }
+        catch (InterruptedException e)
+        {
+            // preserve the interrupt so code further up the stack can still observe it
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Asynchronously closes the store's current writer (see HintsStore.closeWriter()).
+     */
+    Future closeWriter(HintsStore store)
+    {
+        return executor.submit(store::closeWriter);
+    }
+
+    /**
+     * Asynchronously closes the current writer of every store in the catalog.
+     */
+    Future closeAllWriters()
+    {
+        return executor.submit(() -> catalog.stores().forEach(HintsStore::closeWriter));
+    }
+
+    // flushes a whole buffer, then always recycles it (freeing it if the pool won't take it back)
+    private final class FlushBufferTask implements Runnable
+    {
+        private final HintsBuffer buffer;
+        private final HintsBufferPool bufferPool;
+
+        FlushBufferTask(HintsBuffer buffer, HintsBufferPool bufferPool)
+        {
+            this.buffer = buffer;
+            this.bufferPool = bufferPool;
+        }
+
+        public void run()
+        {
+            buffer.waitForModifications();
+
+            try
+            {
+                flush(buffer);
+            }
+            finally
+            {
+                HintsBuffer recycledBuffer = buffer.recycle();
+                if (!bufferPool.offer(recycledBuffer))
+                    recycledBuffer.free();
+            }
+        }
+    }
+
+    // flushes the pool's current buffer for every host in it, without recycling the buffer
+    private final class FlushBufferPoolTask implements Runnable
+    {
+        private final HintsBufferPool bufferPool;
+
+        FlushBufferPoolTask(HintsBufferPool bufferPool)
+        {
+            this.bufferPool = bufferPool;
+        }
+
+        public void run()
+        {
+            HintsBuffer buffer = bufferPool.currentBuffer();
+            buffer.waitForModifications();
+            flush(buffer);
+        }
+    }
+
+    // flushes the pool's current buffer only for the given stores, without recycling the buffer
+    private final class PartiallyFlushBufferPoolTask implements Runnable
+    {
+        private final HintsBufferPool bufferPool;
+        private final Iterable<HintsStore> stores;
+
+        PartiallyFlushBufferPoolTask(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
+        {
+            this.bufferPool = bufferPool;
+            this.stores = stores;
+        }
+
+        public void run()
+        {
+            HintsBuffer buffer = bufferPool.currentBuffer();
+            buffer.waitForModifications();
+            stores.forEach(store -> flush(buffer.consumingHintsIterator(store.hostId), store));
+        }
+    }
+
+    // fsyncs each store's current writer, then the catalog's directory
+    private final class FsyncWritersTask implements Runnable
+    {
+        private final Iterable<HintsStore> stores;
+
+        FsyncWritersTask(Iterable<HintsStore> stores)
+        {
+            this.stores = stores;
+        }
+
+        public void run()
+        {
+            stores.forEach(HintsStore::fsyncWriter);
+            catalog.fsyncDirectory();
+        }
+    }
+
+    // writes each host's hints from the buffer into that host's store
+    private void flush(HintsBuffer buffer)
+    {
+        buffer.hostIds().forEach(hostId -> flush(buffer.consumingHintsIterator(hostId), catalog.get(hostId)));
+    }
+
+    /**
+     * Writes every hint from the iterator to the store, rolling over to a new file whenever the current one reaches
+     * the configured maximum hints file size. If the iterator is empty, nothing happens: in particular no writer is
+     * opened, so no empty hints file is created (e.g. for a store with nothing in a partially flushed buffer).
+     */
+    private void flush(Iterator<ByteBuffer> iterator, HintsStore store)
+    {
+        while (iterator.hasNext())
+        {
+            flushInternal(iterator, store);
+
+            // exceeded the size limit for an individual file, but still have more to write
+            // close the current writer and continue flushing to a new one in the next iteration
+            if (iterator.hasNext())
+                store.closeWriter();
+        }
+    }
+
+    /**
+     * Appends hints to the store's current writer (opening one if needed) until the iterator is exhausted or the
+     * session position reaches DatabaseDescriptor.getMaxHintsFileSize(). The limit is checked after each append, so a
+     * file can exceed it by up to one hint.
+     *
+     * @throws FSWriteError if writing fails with an IOException
+     */
+    private void flushInternal(Iterator<ByteBuffer> iterator, HintsStore store)
+    {
+        long maxHintsFileSize = DatabaseDescriptor.getMaxHintsFileSize();
+
+        @SuppressWarnings("resource")
+        HintsWriter writer = store.getOrOpenWriter();
+
+        try (HintsWriter.Session session = writer.newSession(writeBuffer))
+        {
+            while (iterator.hasNext())
+            {
+                session.append(iterator.next());
+                if (session.position() >= maxHintsFileSize)
+                    break;
+            }
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, writer.descriptor().fileName());
+        }
+    }
+}