You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/19 16:27:14 UTC

[3/4] cassandra git commit: Rewrite hinted handoff

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
new file mode 100644
index 0000000..458d01f
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * Verb handler used both for hint dispatch and streaming.
+ *
+ * With the non-sstable format, we cannot just stream hint sstables on node decommission. So sometimes, at decommission
+ * time, we might have to stream hints to a non-owning host (say, if the owning host B is down during decommission of host A).
+ * In that case the handler just stores the received hint in its local hint store.
+ *
+ * A {@link HintResponse} is sent back on every path that completes without throwing - whether the hint was
+ * applied, stored for later dispatch, or skipped.
+ */
+public final class HintVerbHandler implements IVerbHandler<HintMessage>
+{
+    private static final Logger logger = LoggerFactory.getLogger(HintVerbHandler.class);
+
+    /**
+     * Validates the received hint, then either applies it locally or stores it for later dispatch, and finally
+     * replies to the sender. Hints that could not be decoded or that fail validation are logged and skipped,
+     * but are still replied to.
+     *
+     * @param message the incoming message; its payload carries the target host id and the (possibly null) hint
+     * @param id the id of the incoming message, passed back when replying
+     */
+    public void doVerb(MessageIn<HintMessage> message, int id)
+    {
+        UUID hostId = message.payload.hostId;
+        Hint hint = message.payload.hint;
+
+        // If we see an unknown table id, it means the table, or one of the tables in the mutation, had been dropped.
+        // In that case there is nothing we can really do, or should do, other than log it and go on.
+        // This will *not* happen due to a not-yet-seen table, because we don't transfer hints unless there
+        // is schema agreement between the sender and the receiver.
+        if (hint == null)
+        {
+            logger.debug("Failed to decode and apply a hint for {} - table with id {} is unknown",
+                         hostId,
+                         message.payload.unknownTableID);
+            reply(id, message.from);
+            return;
+        }
+
+        // We must perform validation before applying the hint, and there is no other place to do it other than here.
+        try
+        {
+            hint.mutation.getPartitionUpdates().forEach(PartitionUpdate::validate);
+        }
+        catch (MarshalException e)
+        {
+            // one placeholder per argument: no table id is available here, so report the validation failure instead
+            logger.warn("Failed to validate a hint for {} - skipped: {}", hostId, e.getMessage());
+            reply(id, message.from);
+            return;
+        }
+
+        // Apply the hint if this node is the destination, store for future dispatch if this node isn't (must have gotten
+        // it from a decommissioned node that had streamed it before going out).
+        if (hostId.equals(StorageService.instance.getLocalHostUUID()))
+            hint.apply();
+        else
+            HintsService.instance.write(hostId, hint);
+
+        reply(id, message.from);
+    }
+
+    /**
+     * Sends the shared {@link HintResponse} message back to the sender.
+     *
+     * @param id the id of the message being replied to
+     * @param to the address of the node that sent the hint
+     */
+    private static void reply(int id, InetAddress to)
+    {
+        MessagingService.instance().sendReply(HintResponse.message, id, to);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsBuffer.java b/src/java/org/apache/cassandra/hints/HintsBuffer.java
new file mode 100644
index 0000000..097abce
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsBuffer.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * A shared buffer that temporarily holds the serialized hints before they are flushed to disk.
+ *
+ * Consists of :
+ * - a ByteBuffer holding the serialized hints (length, length checksum and total checksum included)
+ * - a pointer to the current allocation offset
+ * - an {@link OpOrder} appendOrder for {@link HintsWriteExecutor} to wait on for all writes completion
+ * - a map of (host id -> offset queue) for the hints written
+ *
+ * It's possible to write a single hint for two or more hosts at the same time, in which case the same offset will be put
+ * into two or more offset queues.
+ */
+final class HintsBuffer
+{
+    // hint entry overhead in bytes (int length, int length checksum, int body checksum)
+    static final int ENTRY_OVERHEAD_SIZE = 12;
+    // sentinel stored in 'position' once the slab no longer accepts allocations
+    static final int CLOSED = -1;
+
+    private final ByteBuffer slab; // the underlying backing ByteBuffer for all the serialized hints
+    private final AtomicInteger position; // the position in the slab that we currently allocate from
+
+    private final ConcurrentMap<UUID, Queue<Integer>> offsets;
+    private final OpOrder appendOrder;
+
+    private HintsBuffer(ByteBuffer slab)
+    {
+        this.slab = slab;
+
+        position = new AtomicInteger();
+        offsets = new ConcurrentHashMap<>();
+        appendOrder = new OpOrder();
+    }
+
+    /**
+     * Creates an empty buffer backed by a freshly allocated direct ByteBuffer.
+     *
+     * @param slabSize the capacity of the backing buffer, in bytes
+     */
+    static HintsBuffer create(int slabSize)
+    {
+        return new HintsBuffer(ByteBuffer.allocateDirect(slabSize));
+    }
+
+    /**
+     * @return true once an allocation has failed for lack of space and the buffer stopped accepting writes
+     */
+    boolean isClosed()
+    {
+        return position.get() == CLOSED;
+    }
+
+    int capacity()
+    {
+        return slab.capacity();
+    }
+
+    /**
+     * @return the number of bytes still available for allocation; 0 if the buffer is closed
+     */
+    int remaining()
+    {
+        int pos = position.get();
+        return pos == CLOSED ? 0 : capacity() - pos;
+    }
+
+    /**
+     * Wraps the same backing slab in a brand new, empty HintsBuffer (fresh position, offsets and OpOrder).
+     * This instance must not be used after being recycled, as both would share the slab.
+     */
+    HintsBuffer recycle()
+    {
+        slab.clear();
+        return new HintsBuffer(slab);
+    }
+
+    /**
+     * Releases the backing direct ByteBuffer.
+     */
+    void free()
+    {
+        FileUtils.clean(slab);
+    }
+
+    /**
+     * Wait for any appends started before this method was called.
+     */
+    void waitForModifications()
+    {
+        appendOrder.awaitNewBarrier(); // issue a barrier and wait for it
+    }
+
+    /**
+     * @return a live view of the host ids that have had at least one hint offset recorded in this buffer
+     */
+    Set<UUID> hostIds()
+    {
+        return offsets.keySet();
+    }
+
+    /**
+     * Converts the queue of offsets for the selected host id into an iterator of hints encoded as ByteBuffers.
+     *
+     * The iterator drains the host's offset queue as it advances. Every returned ByteBuffer is the same flyweight
+     * duplicate of the slab, re-positioned for each entry, so a returned buffer is only valid until the next
+     * element is requested.
+     */
+    Iterator<ByteBuffer> consumingHintsIterator(UUID hostId)
+    {
+        final Queue<Integer> bufferOffsets = offsets.get(hostId);
+
+        if (bufferOffsets == null)
+            return Collections.emptyIterator();
+
+        return new AbstractIterator<ByteBuffer>()
+        {
+            private final ByteBuffer flyweight = slab.duplicate();
+
+            protected ByteBuffer computeNext()
+            {
+                Integer offset = bufferOffsets.poll();
+
+                if (offset == null)
+                    return endOfData();
+
+                // the first int of an entry is the size of the serialized hint, excluding the entry overhead
+                int totalSize = slab.getInt(offset) + ENTRY_OVERHEAD_SIZE;
+
+                return (ByteBuffer) flyweight.clear().position(offset).limit(offset + totalSize);
+            }
+        };
+    }
+
+    /**
+     * Reserves room for a single hint entry.
+     *
+     * @param hintSize the serialized size of the hint, excluding the entry overhead
+     * @return the allocation to write the hint into, or null if the buffer has no room left (the buffer is then closed)
+     * @throws IllegalArgumentException if the entry would take up more than half of the buffer's capacity
+     */
+    Allocation allocate(int hintSize)
+    {
+        int totalSize = hintSize + ENTRY_OVERHEAD_SIZE;
+
+        if (totalSize > slab.capacity() / 2)
+        {
+            // the limit applies to the whole entry, so the largest acceptable hint is smaller by the entry overhead
+            throw new IllegalArgumentException(String.format("Hint of %s bytes is too large - the maximum size is %s",
+                                                             hintSize,
+                                                             slab.capacity() / 2 - ENTRY_OVERHEAD_SIZE));
+        }
+
+        @SuppressWarnings("resource")
+        OpOrder.Group opGroup = appendOrder.start(); // will eventually be closed by the receiver of the allocation
+        try
+        {
+            return allocate(totalSize, opGroup);
+        }
+        catch (Throwable t)
+        {
+            opGroup.close();
+            throw t;
+        }
+    }
+
+    // Reserves the bytes and wraps them in an Allocation; closes the op group itself when there is no room.
+    private Allocation allocate(int totalSize, OpOrder.Group opGroup)
+    {
+        int offset = allocateBytes(totalSize);
+        if (offset < 0)
+        {
+            opGroup.close();
+            return null;
+        }
+        return new Allocation(offset, totalSize, opGroup);
+    }
+
+    // Advances the position by totalSize with a compare-and-set retry loop.
+    // Returns the start offset of the reserved region, or CLOSED if the region does not fit.
+    private int allocateBytes(int totalSize)
+    {
+        while (true)
+        {
+            int prev = position.get();
+            int next = prev + totalSize;
+
+            if (prev == CLOSED) // the slab has been 'closed'
+                return CLOSED;
+
+            if (next > slab.capacity())
+            {
+                position.set(CLOSED); // mark the slab as no longer allocating if we've exceeded its capacity
+                return CLOSED;
+            }
+
+            if (position.compareAndSet(prev, next))
+                return prev;
+        }
+    }
+
+    // Records the offset of a written hint in the queue of the given host, creating the queue on first use.
+    private void put(UUID hostId, int offset)
+    {
+        // we intentionally don't just return offsets.computeIfAbsent() because it's expensive compared to simple get(),
+        // and the method is on a really hot path
+        Queue<Integer> queue = offsets.get(hostId);
+        if (queue == null)
+            queue = offsets.computeIfAbsent(hostId, (id) -> new ConcurrentLinkedQueue<>());
+        queue.offer(offset);
+    }
+
+    /**
+     * A placeholder for hint serialization. Should always be used in a try-with-resources block.
+     *
+     * Closing the allocation closes the {@link OpOrder.Group} that was started when the allocation was made.
+     */
+    final class Allocation implements AutoCloseable
+    {
+        private final Integer offset;
+        private final int totalSize;
+        private final OpOrder.Group opGroup;
+
+        Allocation(int offset, int totalSize, OpOrder.Group opGroup)
+        {
+            this.offset = offset;
+            this.totalSize = totalSize;
+            this.opGroup = opGroup;
+        }
+
+        /**
+         * Serializes the hint into the reserved region, then records the region's offset for every target host.
+         *
+         * @param hostIds host ids of the hint's target nodes
+         * @param hint the hint to serialize
+         */
+        void write(Iterable<UUID> hostIds, Hint hint)
+        {
+            write(hint);
+            for (UUID hostId : hostIds)
+                put(hostId, offset);
+        }
+
+        public void close()
+        {
+            opGroup.close();
+        }
+
+        // Entry layout: [int hint size][int crc of the size][serialized hint][int crc of size + hint].
+        private void write(Hint hint)
+        {
+            ByteBuffer buffer = (ByteBuffer) slab.duplicate().position(offset).limit(offset + totalSize);
+            DataOutputPlus dop = new DataOutputBufferFixed(buffer);
+            CRC32 crc = new CRC32();
+            int hintSize = totalSize - ENTRY_OVERHEAD_SIZE;
+            try
+            {
+                dop.writeInt(hintSize);
+                updateChecksumInt(crc, hintSize);
+                dop.writeInt((int) crc.getValue());
+
+                Hint.serializer.serialize(hint, dop, MessagingService.current_version);
+                updateChecksum(crc, buffer, buffer.position() - hintSize, hintSize);
+                dop.writeInt((int) crc.getValue());
+            }
+            catch (IOException e)
+            {
+                // not expected when writing to the in-memory slab; keep the cause in case it ever does happen
+                throw new AssertionError(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsBufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsBufferPool.java b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
new file mode 100644
index 0000000..83b155a
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * A minimal pool of {@link HintsBuffer} instances. In the normal case it holds just two of them: the buffer that is
+ * currently being written to, and one spare to swap in once the current buffer cannot fit the next hint.
+ */
+final class HintsBufferPool
+{
+    /**
+     * Invoked with a buffer that has just been swapped out, so that its contents can be flushed.
+     */
+    interface FlushCallback
+    {
+        void flush(HintsBuffer buffer, HintsBufferPool pool);
+    }
+
+    private volatile HintsBuffer currentBuffer;
+    private final Queue<HintsBuffer> reserveBuffers;
+    private final int bufferSize;
+    private final FlushCallback flushCallback;
+
+    HintsBufferPool(int bufferSize, FlushCallback flushCallback)
+    {
+        this.bufferSize = bufferSize;
+        this.flushCallback = flushCallback;
+        this.reserveBuffers = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * Serializes the hint into the current buffer, on behalf of every listed host.
+     *
+     * @param hostIds host ids of the hint's target nodes
+     * @param hint the hint to store
+     */
+    void write(Iterable<UUID> hostIds, Hint hint)
+    {
+        long serializedSize = Hint.serializer.serializedSize(hint, MessagingService.current_version);
+        try (HintsBuffer.Allocation slot = allocate((int) serializedSize))
+        {
+            slot.write(hostIds, hint);
+        }
+    }
+
+    // Keeps trying until some buffer accepts the allocation, swapping the current buffer out whenever it is full.
+    private HintsBuffer.Allocation allocate(int hintSize)
+    {
+        HintsBuffer target = currentBuffer();
+        HintsBuffer.Allocation slot;
+
+        while ((slot = target.allocate(hintSize)) == null)
+        {
+            // the target had no room left; only the thread that actually performs the swap hands it to the callback
+            if (switchCurrentBuffer(target))
+                flushCallback.flush(target, this);
+
+            target = currentBuffer;
+        }
+
+        return slot;
+    }
+
+    /**
+     * Hands a buffer back to the pool as the spare.
+     *
+     * @return true if the buffer was taken, false if the pool already had a spare
+     */
+    boolean offer(HintsBuffer buffer)
+    {
+        if (reserveBuffers.isEmpty())
+        {
+            reserveBuffers.offer(buffer);
+            return true;
+        }
+
+        return false;
+    }
+
+    // Accessor that creates the very first buffer on demand, so that callers never observe null.
+    HintsBuffer currentBuffer()
+    {
+        if (currentBuffer == null)
+        {
+            initializeCurrentBuffer();
+        }
+
+        return currentBuffer;
+    }
+
+    // Re-checks under the monitor so that only one thread creates the initial buffer.
+    private synchronized void initializeCurrentBuffer()
+    {
+        if (currentBuffer != null)
+            return;
+
+        currentBuffer = createBuffer();
+    }
+
+    // Replaces the current buffer with the spare (or a new buffer when there is no spare).
+    // Returns false, changing nothing, if another thread has already replaced 'previous'.
+    private synchronized boolean switchCurrentBuffer(HintsBuffer previous)
+    {
+        if (currentBuffer == previous)
+        {
+            HintsBuffer replacement = reserveBuffers.poll();
+            if (replacement == null)
+                replacement = createBuffer();
+
+            currentBuffer = replacement;
+            return true;
+        }
+
+        return false;
+    }
+
+    private HintsBuffer createBuffer()
+    {
+        return HintsBuffer.create(bufferSize);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsCatalog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java
new file mode 100644
index 0000000..13404ee
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.SyncUtil;
+
+import static java.util.stream.Collectors.groupingBy;
+
+/**
+ * A simple catalog for easy host id -> {@link HintsStore} lookup and manipulation.
+ */
+final class HintsCatalog
+{
+    private final File hintsDirectory;
+    private final Map<UUID, HintsStore> stores;
+
+    private HintsCatalog(File hintsDirectory, Map<UUID, List<HintsDescriptor>> descriptors)
+    {
+        this.hintsDirectory = hintsDirectory;
+        this.stores = new ConcurrentHashMap<>();
+
+        for (Map.Entry<UUID, List<HintsDescriptor>> entry : descriptors.entrySet())
+            stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, entry.getValue()));
+    }
+
+    /**
+     * Loads hints stores from a given directory.
+     *
+     * Every file whose name looks like a hints file has its descriptor read, and the descriptors are grouped
+     * by host id into one store per host.
+     *
+     * @param hintsDirectory the directory to scan
+     * @throws FSReadError if the directory cannot be listed
+     */
+    static HintsCatalog load(File hintsDirectory)
+    {
+        // Files.list() keeps the directory open until the stream is closed, hence the try-with-resources
+        try (Stream<java.nio.file.Path> files = Files.list(hintsDirectory.toPath()))
+        {
+            Map<UUID, List<HintsDescriptor>> stores =
+                files.filter(HintsDescriptor::isHintFileName)
+                     .map(HintsDescriptor::readFromFile)
+                     .collect(groupingBy(h -> h.hostId));
+            return new HintsCatalog(hintsDirectory, stores);
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, hintsDirectory);
+        }
+    }
+
+    /**
+     * @return a stream over all the stores currently in the catalog
+     */
+    Stream<HintsStore> stores()
+    {
+        return stores.values().stream();
+    }
+
+    /**
+     * Makes sure a store exists for each of the given host ids, creating empty ones where needed.
+     */
+    void maybeLoadStores(Iterable<UUID> hostIds)
+    {
+        for (UUID hostId : hostIds)
+            get(hostId);
+    }
+
+    /**
+     * @return the store for the host id, created empty (with no descriptors) if there was none
+     */
+    HintsStore get(UUID hostId)
+    {
+        // we intentionally don't just return stores.computeIfAbsent() because it's expensive compared to simple get(),
+        // and in this case would also allocate for the capturing lambda; the method is on a really hot path
+        HintsStore store = stores.get(hostId);
+        return store == null
+             ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, Collections.emptyList()))
+             : store;
+    }
+
+    /**
+     * Delete all hints for all host ids.
+     *
+     * Will not delete the files that are currently being dispatched, or written to.
+     */
+    void deleteAllHints()
+    {
+        stores.keySet().forEach(this::deleteAllHints);
+    }
+
+    /**
+     * Delete all hints for the specified host id.
+     *
+     * Will not delete the files that are currently being dispatched, or written to.
+     */
+    void deleteAllHints(UUID hostId)
+    {
+        HintsStore store = stores.get(hostId);
+        if (store != null)
+            store.deleteAllHints();
+    }
+
+    /**
+     * Deletes all hints for the specified host id and then drops its store from the catalog.
+     */
+    void exciseStore(UUID hostId)
+    {
+        deleteAllHints(hostId);
+        stores.remove(hostId);
+    }
+
+    /**
+     * Best-effort sync of the hints directory itself; does nothing if the directory cannot be opened.
+     */
+    void fsyncDirectory()
+    {
+        int fd = CLibrary.tryOpenDirectory(hintsDirectory.getAbsolutePath());
+        if (fd != -1)
+        {
+            SyncUtil.trySync(fd);
+            CLibrary.tryCloseFD(fd);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
new file mode 100644
index 0000000..9c27a23
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import java.util.zip.CRC32;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.json.simple.JSONValue;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Metadata of a single hints file: the target host id, the hints format version, the creation timestamp, and a
+ * free-form map of parameters that is stored JSON-encoded.
+ *
+ * The descriptor is written at the very start of each hints file.
+ */
+final class HintsDescriptor
+{
+    static final int VERSION_30 = 1;
+    static final int CURRENT_VERSION = VERSION_30;
+
+    // <host id (uuid)>-<digits>-<digits>.hints
+    static final Pattern pattern =
+        Pattern.compile("^[a-fA-F0-9]{8}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{12}\\-(\\d+)\\-(\\d+)\\.hints$");
+
+    final UUID hostId;
+    final int version;
+    final long timestamp;
+
+    // implemented for future compression support - see CASSANDRA-9428
+    final ImmutableMap<String, Object> parameters;
+
+    HintsDescriptor(UUID hostId, int version, long timestamp, ImmutableMap<String, Object> parameters)
+    {
+        this.hostId = hostId;
+        this.version = version;
+        this.timestamp = timestamp;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Creates a descriptor of the current version with no parameters.
+     */
+    HintsDescriptor(UUID hostId, long timestamp)
+    {
+        this(hostId, CURRENT_VERSION, timestamp, ImmutableMap.<String, Object>of());
+    }
+
+    /**
+     * @return the name of the hints file described by this descriptor
+     */
+    String fileName()
+    {
+        return String.format("%s-%s-%s.hints", hostId, timestamp, version);
+    }
+
+    /**
+     * @return the name of the checksum file that goes with the hints file
+     */
+    String checksumFileName()
+    {
+        return String.format("%s-%s-%s.crc32", hostId, timestamp, version);
+    }
+
+    /**
+     * @return the messaging version that corresponds to this descriptor's hints version
+     */
+    int messagingVersion()
+    {
+        return messagingVersion(version);
+    }
+
+    /**
+     * Maps a hints format version onto the matching messaging version; unknown versions are a programming error.
+     */
+    static int messagingVersion(int hintsVersion)
+    {
+        if (hintsVersion == VERSION_30)
+            return MessagingService.VERSION_30;
+
+        throw new AssertionError();
+    }
+
+    /**
+     * @return true if the last element of the path is a well-formed hints file name
+     */
+    static boolean isHintFileName(Path path)
+    {
+        String name = path.getFileName().toString();
+        return pattern.matcher(name).matches();
+    }
+
+    /**
+     * Reads the descriptor stored at the beginning of the given file.
+     *
+     * @throws FSReadError if the file cannot be read, or the descriptor fails its checksum validation
+     */
+    static HintsDescriptor readFromFile(Path path)
+    {
+        try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r"))
+        {
+            return deserialize(file);
+        }
+        catch (IOException cause)
+        {
+            throw new FSReadError(cause, path.toFile());
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
+        helper.add("hostId", hostId);
+        helper.add("version", version);
+        helper.add("timestamp", timestamp);
+        helper.add("parameters", parameters);
+        return helper.toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+
+        if (o instanceof HintsDescriptor)
+        {
+            HintsDescriptor that = (HintsDescriptor) o;
+
+            return Objects.equal(hostId, that.hostId)
+                && version == that.version
+                && timestamp == that.timestamp
+                && Objects.equal(parameters, that.parameters);
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(hostId, version, timestamp, parameters);
+    }
+
+    /**
+     * Writes the descriptor as: version, timestamp, host id (two longs), parameters length, a checksum of
+     * everything written so far, the JSON-encoded parameters, and a final checksum that also covers the parameters.
+     */
+    void serialize(DataOutputPlus out) throws IOException
+    {
+        long hostIdHigh = hostId.getMostSignificantBits();
+        long hostIdLow = hostId.getLeastSignificantBits();
+        CRC32 checksum = new CRC32();
+
+        out.writeInt(version);
+        updateChecksumInt(checksum, version);
+
+        out.writeLong(timestamp);
+        updateChecksumLong(checksum, timestamp);
+
+        out.writeLong(hostIdHigh);
+        updateChecksumLong(checksum, hostIdHigh);
+        out.writeLong(hostIdLow);
+        updateChecksumLong(checksum, hostIdLow);
+
+        byte[] encodedParams = JSONValue.toJSONString(parameters).getBytes(StandardCharsets.UTF_8);
+        int encodedLength = encodedParams.length;
+        out.writeInt(encodedLength);
+        updateChecksumInt(checksum, encodedLength);
+        // first checksum: guards the fixed-size header, including the parameters length
+        out.writeInt((int) checksum.getValue());
+
+        out.write(encodedParams);
+        checksum.update(encodedParams, 0, encodedLength);
+
+        // second checksum: header plus the encoded parameters
+        out.writeInt((int) checksum.getValue());
+    }
+
+    /**
+     * @return the number of bytes that {@link #serialize(DataOutputPlus)} writes for this descriptor
+     */
+    int serializedSize()
+    {
+        int headerSize = TypeSizes.sizeof(version)
+                       + TypeSizes.sizeof(timestamp)
+                       + TypeSizes.sizeof(hostId.getMostSignificantBits())
+                       + TypeSizes.sizeof(hostId.getLeastSignificantBits());
+
+        int paramsLength = JSONValue.toJSONString(parameters).getBytes(StandardCharsets.UTF_8).length;
+
+        return headerSize
+             + TypeSizes.sizeof(paramsLength)
+             + 4 // size checksum
+             + paramsLength
+             + 4; // total checksum
+    }
+
+    /**
+     * Reads a descriptor back in the format produced by {@link #serialize(DataOutputPlus)}, verifying both checksums.
+     *
+     * @throws IOException if the input cannot be read or either checksum does not match
+     */
+    static HintsDescriptor deserialize(DataInput in) throws IOException
+    {
+        CRC32 checksum = new CRC32();
+
+        int version = in.readInt();
+        updateChecksumInt(checksum, version);
+
+        long timestamp = in.readLong();
+        updateChecksumLong(checksum, timestamp);
+
+        long hostIdHigh = in.readLong();
+        updateChecksumLong(checksum, hostIdHigh);
+        long hostIdLow = in.readLong();
+        updateChecksumLong(checksum, hostIdLow);
+
+        int encodedLength = in.readInt();
+        updateChecksumInt(checksum, encodedLength);
+        // the length is verified before it is used to size the parameters array
+        validateCRC(in.readInt(), (int) checksum.getValue());
+
+        byte[] encodedParams = new byte[encodedLength];
+        in.readFully(encodedParams, 0, encodedLength);
+        checksum.update(encodedParams, 0, encodedLength);
+        validateCRC(in.readInt(), (int) checksum.getValue());
+
+        UUID hostId = new UUID(hostIdHigh, hostIdLow);
+        return new HintsDescriptor(hostId, version, timestamp, decodeJSONBytes(encodedParams));
+    }
+
+    // Parses the UTF-8 encoded JSON object into an immutable parameters map.
+    @SuppressWarnings("unchecked")
+    private static ImmutableMap<String, Object> decodeJSONBytes(byte[] bytes)
+    {
+        String json = new String(bytes, StandardCharsets.UTF_8);
+        Map<String, Object> decoded = (Map<String, Object>) JSONValue.parse(json);
+        return ImmutableMap.copyOf(decoded);
+    }
+
+    // Feeds a long into the checksum as two ints: the low 32 bits first, then the high 32 bits.
+    private static void updateChecksumLong(CRC32 crc, long value)
+    {
+        int low = (int) (value & 0xFFFFFFFFL);
+        int high = (int) (value >>> 32);
+
+        updateChecksumInt(crc, low);
+        updateChecksumInt(crc, high);
+    }
+
+    private static void validateCRC(int expected, int actual) throws IOException
+    {
+        if (expected == actual)
+            return;
+
+        throw new IOException("Hints Descriptor CRC Mismatch");
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
new file mode 100644
index 0000000..d0fdd04
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * A multi-threaded (by default) executor for dispatching hints.
+ *
+ * Most of dispatch is triggered by {@link HintsDispatchTrigger} running every ~10 seconds.
+ */
+final class HintsDispatchExecutor
+{
+    private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);
+
+    private final File hintsDirectory;
+    private final ExecutorService executor;
+    private final AtomicBoolean isPaused;
+    private final Map<UUID, Future> scheduledDispatches;
+
+    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused)
+    {
+        this.hintsDirectory = hintsDirectory;
+        this.isPaused = isPaused;
+
+        scheduledDispatches = new ConcurrentHashMap<>();
+        executor = new JMXEnabledThreadPoolExecutor(1,
+                                                    maxThreads,
+                                                    1,
+                                                    TimeUnit.MINUTES,
+                                                    new LinkedBlockingQueue<>(),
+                                                    new NamedThreadFactory("HintsDispatcher", Thread.MIN_PRIORITY),
+                                                    "internal");
+    }
+
+    /*
+     * It's safe to terminate dispatch in process and to deschedule dispatch.
+     */
+    void shutdownBlocking()
+    {
+        scheduledDispatches.clear();
+        executor.shutdownNow();
+    }
+
+    boolean isScheduled(HintsStore store)
+    {
+        return scheduledDispatches.containsKey(store.hostId);
+    }
+
+    Future dispatch(HintsStore store)
+    {
+        return dispatch(store, store.hostId);
+    }
+
+    Future dispatch(HintsStore store, UUID hostId)
+    {
+        /*
+         * It is safe to perform dispatch for the same host id concurrently in two or more threads,
+         * however there is nothing to win from it - so we don't.
+         *
+         * Additionally, having just one dispatch task per host id ensures that we'll never violate our per-destination
+         * rate limit, without having to share a ratelimiter between threads.
+         *
+         * It also simplifies reasoning about dispatch sessions.
+         */
+        return scheduledDispatches.computeIfAbsent(store.hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
+    }
+
+    void completeDispatchBlockingly(HintsStore store)
+    {
+        Future future = scheduledDispatches.get(store.hostId);
+        try
+        {
+            if (future != null)
+                future.get();
+        }
+        catch (ExecutionException | InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private final class DispatchHintsTask implements Runnable
+    {
+        private final HintsStore store;
+        private final UUID hostId;
+        private final RateLimiter rateLimiter;
+
+        DispatchHintsTask(HintsStore store, UUID hostId)
+        {
+            this.store = store;
+            this.hostId = hostId;
+
+            // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+            // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
+            // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
+            // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between
+            // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time).
+            int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
+            int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / nodesCount;
+            this.rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+        }
+
+        public void run()
+        {
+            try
+            {
+                dispatch();
+            }
+            finally
+            {
+                scheduledDispatches.remove(hostId);
+            }
+        }
+
+        private void dispatch()
+        {
+            while (true)
+            {
+                if (isPaused.get())
+                    break;
+
+                HintsDescriptor descriptor = store.poll();
+                if (descriptor == null)
+                    break;
+
+                try
+                {
+                    dispatch(descriptor);
+                }
+                catch (FSReadError e)
+                {
+                    logger.error("Failed to dispatch hints file {}: file is corrupted ({})", descriptor.fileName(), e);
+                    store.cleanUp(descriptor);
+                    store.blacklist(descriptor);
+                    throw e;
+                }
+            }
+        }
+
+        private void dispatch(HintsDescriptor descriptor)
+        {
+            logger.debug("Dispatching hints file {}", descriptor.fileName());
+
+            File file = new File(hintsDirectory, descriptor.fileName());
+            Long offset = store.getDispatchOffset(descriptor).orElse(null);
+
+            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, isPaused))
+            {
+                if (offset != null)
+                    dispatcher.seek(offset);
+
+                if (dispatcher.dispatch())
+                {
+                    if (!file.delete())
+                        logger.error("Failed to delete hints file {}", descriptor.fileName());
+                    store.cleanUp(descriptor);
+                    logger.info("Finished hinted handoff of file {} to endpoint {}", descriptor.fileName(), hostId);
+                }
+                else
+                {
+                    store.markDispatchOffset(descriptor, dispatcher.dispatchOffset());
+                    store.offerFirst(descriptor);
+                    logger.info("Finished hinted handoff of file {} to endpoint {}, partially", descriptor.fileName(), hostId);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
new file mode 100644
index 0000000..5fe0e27
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+
+import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddress;
+
+/**
+ * A simple dispatch trigger that's being run every 10 seconds.
+ *
+ * Goes through all hint stores and schedules for dispatch all the hints for hosts that are:
+ * 1. Not currently scheduled for dispatch, and
+ * 2. Either have some hint files, or an active hint writer, and
+ * 3. Are live, and
+ * 4. Have matching schema versions
+ *
+ * What does triggering a hints store for dispatch mean?
+ * - If there are existing hint files, it means submitting them for dispatch;
+ * - If there is an active writer, closing it, for the next run to pick it up.
+ */
+final class HintsDispatchTrigger implements Runnable
+{
+    private final HintsCatalog catalog;
+    private final HintsWriteExecutor writeExecutor;
+    private final HintsDispatchExecutor dispatchExecutor;
+    private final AtomicBoolean isPaused;
+
+    HintsDispatchTrigger(HintsCatalog catalog,
+                         HintsWriteExecutor writeExecutor,
+                         HintsDispatchExecutor dispatchExecutor,
+                         AtomicBoolean isPaused)
+    {
+        this.catalog = catalog;
+        this.writeExecutor = writeExecutor;
+        this.dispatchExecutor = dispatchExecutor;
+        this.isPaused = isPaused;
+    }
+
+    public void run()
+    {
+        if (isPaused.get())
+            return;
+
+        catalog.stores()
+               .filter(store -> !isScheduled(store))
+               .filter(HintsStore::isLive)
+               .filter(store -> store.isWriting() || store.hasFiles())
+               .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA))
+               .forEach(this::schedule);
+    }
+
+    private void schedule(HintsStore store)
+    {
+        if (store.hasFiles())
+            dispatchExecutor.dispatch(store);
+
+        if (store.isWriting())
+            writeExecutor.closeWriter(store);
+    }
+
+    private boolean isScheduled(HintsStore store)
+    {
+        return dispatchExecutor.isScheduled(store);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
new file mode 100644
index 0000000..f769e09
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+/**
+ * Dispatches a single hints file to a specified node in a batched manner.
+ *
+ * Uses either {@link EncodedHintMessage} - when dispatching hints into a node with the same messaging version as the hints file,
+ * or {@link HintMessage}, when conversion is required.
+ */
+final class HintsDispatcher implements AutoCloseable
+{
+    // CONTINUE: move on to the next page; ABORT: stop dispatching this file; RETRY: send the same page again
+    private enum Action { CONTINUE, ABORT, RETRY }
+
+    private final HintsReader reader;
+    private final UUID hostId;
+    private final InetAddress address;
+    private final int messagingVersion;
+    private final AtomicBoolean isPaused;
+
+    // offset of the page most recently handed to dispatch(Page)
+    private long currentPageOffset;
+
+    private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, AtomicBoolean isPaused)
+    {
+        currentPageOffset = 0L;
+
+        this.reader = reader;
+        this.hostId = hostId;
+        this.address = address;
+        this.messagingVersion = messagingVersion;
+        this.isPaused = isPaused;
+    }
+
+    /**
+     * Opens the hints file and resolves the endpoint and messaging version of the destination host.
+     *
+     * @param file        the hints file to dispatch
+     * @param rateLimiter passed through to the {@link HintsReader}
+     * @param hostId      host id of the destination node
+     * @param isPaused    shared flag; sending stops once it is set
+     */
+    static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, AtomicBoolean isPaused)
+    {
+        InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
+        int messagingVersion = MessagingService.instance().getVersion(address);
+        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused);
+    }
+
+    public void close()
+    {
+        reader.close();
+    }
+
+    /**
+     * Moves the underlying reader to the given position in the file.
+     */
+    void seek(long bytes)
+    {
+        reader.seek(bytes);
+        currentPageOffset = 0L;
+    }
+
+    /**
+     * @return whether or not dispatch completed entirely and successfully
+     */
+    boolean dispatch()
+    {
+        for (HintsReader.Page page : reader)
+        {
+            currentPageOffset = page.offset;
+            if (dispatch(page) != Action.CONTINUE)
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @return offset of the first non-delivered page
+     */
+    long dispatchOffset()
+    {
+        return currentPageOffset;
+    }
+
+    private boolean isHostAlive()
+    {
+        return FailureDetector.instance.isAlive(address);
+    }
+
+    private boolean isPaused()
+    {
+        return isPaused.get();
+    }
+
+    // retry in case of a timeout; stop in case of a failure, host going down, or delivery paused
+    private Action dispatch(HintsReader.Page page)
+    {
+        Action action = sendHintsAndAwait(page);
+
+        // a loop rather than recursion, so that a long run of timeouts cannot exhaust the stack
+        while (action == Action.RETRY)
+        {
+            // the page iterators read from the reader's current position, which is past the hints that were
+            // just sent; without rewinding to the start of the page a retry would send nothing at all
+            reader.seek(page.offset);
+            action = sendHintsAndAwait(page);
+        }
+
+        return action;
+    }
+
+    /*
+     * Sends every hint of the page and then waits for the responses.
+     *
+     * Returns ABORT if sending was cut short or any hint was reported as failed, RETRY if any response
+     * timed out, and CONTINUE if every hint was acknowledged.
+     */
+    private Action sendHintsAndAwait(HintsReader.Page page)
+    {
+        Collection<Callback> callbacks = new ArrayList<>();
+
+        /*
+         * If hints file messaging version matches the version of the target host, we'll use the optimised path -
+         * skipping the redundant decoding/encoding cycle of the already encoded hint.
+         *
+         * If that is not the case, we'll need to perform conversion to a newer (or an older) format, and decoding the hint
+         * is an unavoidable intermediate step.
+         */
+        Action action = reader.descriptor().messagingVersion() == messagingVersion
+                      ? sendHints(page.buffersIterator(), callbacks, this::sendEncodedHint)
+                      : sendHints(page.hintsIterator(), callbacks, this::sendHint);
+
+        if (action == Action.ABORT)
+            return action;
+
+        for (Callback cb : callbacks)
+        {
+            Callback.Outcome outcome = cb.await();
+
+            // a failure is not retried - the rest of the file is left for a later dispatch attempt
+            if (outcome == Callback.Outcome.FAILURE)
+                return Action.ABORT;
+
+            if (outcome != Callback.Outcome.SUCCESS)
+                return Action.RETRY;
+        }
+
+        return Action.CONTINUE;
+    }
+
+    /*
+     * Sends the hints one by one using the provided send function, collecting a callback per sent hint.
+     * Stops and returns ABORT as soon as the host is no longer alive or delivery is paused.
+     */
+    private <T> Action sendHints(Iterator<T> hints, Collection<Callback> callbacks, Function<T, Callback> sendFunction)
+    {
+        while (hints.hasNext())
+        {
+            if (!isHostAlive() || isPaused())
+                return Action.ABORT;
+            callbacks.add(sendFunction.apply(hints.next()));
+        }
+        return Action.CONTINUE;
+    }
+
+    /*
+     * Sending hints in compatibility mode.
+     */
+
+    private Callback sendHint(Hint hint)
+    {
+        Callback callback = new Callback();
+        HintMessage message = new HintMessage(hostId, hint);
+        MessagingService.instance().sendRRWithFailure(message.createMessageOut(), address, callback);
+        return callback;
+    }
+
+    /*
+     * Sending hints in raw mode.
+     */
+
+    private Callback sendEncodedHint(ByteBuffer hint)
+    {
+        Callback callback = new Callback();
+        EncodedHintMessage message = new EncodedHintMessage(hostId, hint, messagingVersion);
+        MessagingService.instance().sendRRWithFailure(message.createMessageOut(), address, callback);
+        return callback;
+    }
+
+    private static final class Callback implements IAsyncCallbackWithFailure
+    {
+        enum Outcome { SUCCESS, TIMEOUT, FAILURE }
+
+        private final long start = System.nanoTime();
+        private final SimpleCondition condition = new SimpleCondition();
+        private volatile Outcome outcome;
+
+        /**
+         * Waits for a response or a failure, for at most what is left of the HINT verb timeout counted
+         * from the creation of this callback.
+         *
+         * @return the reported outcome, or TIMEOUT if nothing was reported in time
+         */
+        Outcome await()
+        {
+            long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getTimeout(MessagingService.Verb.HINT)) - (System.nanoTime() - start);
+            boolean timedOut;
+
+            try
+            {
+                timedOut = !condition.await(timeout, TimeUnit.NANOSECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            return timedOut ? Outcome.TIMEOUT : outcome;
+        }
+
+        public void onFailure(InetAddress from)
+        {
+            outcome = Outcome.FAILURE;
+            condition.signalAll();
+        }
+
+        public void response(MessageIn msg)
+        {
+            outcome = Outcome.SUCCESS;
+            condition.signalAll();
+        }
+
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
new file mode 100644
index 0000000..7d164b4
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CLibrary;
+
+/**
+ * A paged non-compressed hints reader that provides two iterators:
+ * - a 'raw' ByteBuffer iterator that doesn't deserialize the hints, but returns the pre-encoded hints verbatim
+ * - a decoded iterator, that deserializes the underlying bytes into {@link Hint} instances.
+ *
+ * The former is an optimisation for when the messaging version of the file matches the messaging version of the destination
+ * node. Extra decoding and reencoding is a waste of effort in this scenario, so we avoid it.
+ *
+ * The latter is required for dispatch of hints to nodes that have a different messaging version, and in general is just an
+ * easy way to enable backward and future compatibility.
+ */
+final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
+{
+    private static final Logger logger = LoggerFactory.getLogger(HintsReader.class);
+
+    // don't read more than 512 KB of hints at a time.
+    private static final int PAGE_SIZE = 512 << 10;
+
+    private final HintsDescriptor descriptor;
+    private final File file;
+    private final RandomAccessReader reader;
+    // wraps the same reader; used for the reads that have to contribute to an entry's checksum
+    private final ChecksummedDataInput crcInput;
+
+    // we pass the RateLimiter into HintsReader itself because it's cheaper to calculate the size before the hint is deserialized
+    @Nullable
+    private final RateLimiter rateLimiter;
+
+    private HintsReader(HintsDescriptor descriptor, File file, RandomAccessReader reader, RateLimiter rateLimiter)
+    {
+        this.descriptor = descriptor;
+        this.file = file;
+        this.reader = reader;
+        this.crcInput = ChecksummedDataInput.wrap(reader);
+        this.rateLimiter = rateLimiter;
+    }
+
+    /**
+     * Opens the file and reads its descriptor, leaving the reader positioned right after it.
+     *
+     * @param file        the hints file to open
+     * @param rateLimiter if not null, charged with the size of every entry before the entry is read
+     * @throws FSReadError if the descriptor cannot be read; the underlying reader is closed first
+     */
+    static HintsReader open(File file, RateLimiter rateLimiter)
+    {
+        RandomAccessReader reader = RandomAccessReader.open(file);
+        try
+        {
+            HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
+            return new HintsReader(descriptor, file, reader, rateLimiter);
+        }
+        catch (IOException e)
+        {
+            reader.close();
+            throw new FSReadError(e, file);
+        }
+    }
+
+    /**
+     * Opens the file without any rate limiting.
+     */
+    static HintsReader open(File file)
+    {
+        return open(file, null);
+    }
+
+    public void close()
+    {
+        FileUtils.closeQuietly(reader);
+    }
+
+    public HintsDescriptor descriptor()
+    {
+        return descriptor;
+    }
+
+    void seek(long newPosition)
+    {
+        reader.seek(newPosition);
+    }
+
+    public Iterator<Page> iterator()
+    {
+        return new PagesIterator();
+    }
+
+    /**
+     * A page is identified only by the file offset it starts at. Its iterators do not seek to that offset:
+     * they read from wherever the shared reader currently is, and use the offset to decide when the page ends.
+     */
+    final class Page
+    {
+        public final long offset;
+
+        private Page(long offset)
+        {
+            this.offset = offset;
+        }
+
+        Iterator<Hint> hintsIterator()
+        {
+            return new HintsIterator(offset);
+        }
+
+        Iterator<ByteBuffer> buffersIterator()
+        {
+            return new BuffersIterator(offset);
+        }
+    }
+
+    /**
+     * Produces a page starting at the reader's current position for as long as that position is not the end of file.
+     */
+    final class PagesIterator extends AbstractIterator<Page>
+    {
+        @SuppressWarnings("resource")
+        protected Page computeNext()
+        {
+            CLibrary.trySkipCache(reader.getChannel().getFileDescriptor(), 0, reader.getFilePointer(), reader.getPath());
+
+            if (reader.length() == reader.getFilePointer())
+                return endOfData();
+
+            return new Page(reader.getFilePointer());
+        }
+    }
+
+    /**
+     * A decoding iterator that deserializes the hints as it goes.
+     */
+    final class HintsIterator extends AbstractIterator<Hint>
+    {
+        private final long offset;
+
+        HintsIterator(long offset)
+        {
+            super();
+            this.offset = offset;
+        }
+
+        protected Hint computeNext()
+        {
+            Hint hint;
+
+            // entries that had to be skipped come back as null - keep going until a hint is read or the page ends
+            do
+            {
+                long position = reader.getFilePointer();
+
+                if (reader.length() == position)
+                    return endOfData(); // reached EOF
+
+                if (position - offset >= PAGE_SIZE)
+                    return endOfData(); // read page size or more bytes
+
+                try
+                {
+                    hint = computeNextInternal();
+                }
+                catch (IOException e)
+                {
+                    throw new FSReadError(e, file);
+                }
+            }
+            while (hint == null);
+
+            return hint;
+        }
+
+        // reads the entry's size and the checksum of that size, then the hint itself
+        private Hint computeNextInternal() throws IOException
+        {
+            crcInput.resetCrc();
+            crcInput.resetLimit();
+
+            int size = crcInput.readInt();
+
+            // if we cannot corroborate the size via crc, then we cannot safely skip this hint
+            if (reader.readInt() != crcInput.getCrc())
+                throw new IOException("Digest mismatch exception");
+
+            return readHint(size);
+        }
+
+        /*
+         * Reads the hint body and the checksum that follows it.
+         * Returns null, after consuming the whole entry, if the hint cannot be used.
+         */
+        private Hint readHint(int size) throws IOException
+        {
+            if (rateLimiter != null)
+                rateLimiter.acquire(size);
+            crcInput.limit(size);
+
+            Hint hint;
+            try
+            {
+                hint = Hint.serializer.deserialize(crcInput, descriptor.messagingVersion());
+            }
+            catch (UnknownColumnFamilyException e)
+            {
+                logger.warn("Failed to read a hint for {} - table with id {} is unknown in file {}",
+                            descriptor.hostId,
+                            e.cfId,
+                            descriptor.fileName());
+                reader.skipBytes(crcInput.bytesRemaining());
+                // every entry ends with a 4-byte checksum (read below on the normal path); it has to be consumed
+                // here as well, or the next entry would be read starting from the checksum of this one
+                // NOTE(review): relies on bytesRemaining() being the unread part of the hint body - confirm
+                reader.readInt();
+
+                return null;
+            }
+
+            if (reader.readInt() == crcInput.getCrc())
+                return hint;
+
+            // log a warning and skip the corrupted entry
+            logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
+                        descriptor.hostId,
+                        crcInput.getPosition() - size - 4,
+                        descriptor.fileName());
+            return null;
+        }
+    }
+
+    /**
+     * A verbatim iterator that simply returns the underlying ByteBuffers.
+     */
+    final class BuffersIterator extends AbstractIterator<ByteBuffer>
+    {
+        private final long offset;
+
+        BuffersIterator(long offset)
+        {
+            super();
+            this.offset = offset;
+        }
+
+        protected ByteBuffer computeNext()
+        {
+            ByteBuffer buffer;
+
+            // entries that had to be skipped come back as null - keep going until a buffer is read or the page ends
+            do
+            {
+                long position = reader.getFilePointer();
+
+                if (reader.length() == position)
+                    return endOfData(); // reached EOF
+
+                if (position - offset >= PAGE_SIZE)
+                    return endOfData(); // read page size or more bytes
+
+                try
+                {
+                    buffer = computeNextInternal();
+                }
+                catch (IOException e)
+                {
+                    throw new FSReadError(e, file);
+                }
+            }
+            while (buffer == null);
+
+            return buffer;
+        }
+
+        // reads the entry's size and the checksum of that size, then the encoded hint itself
+        private ByteBuffer computeNextInternal() throws IOException
+        {
+            crcInput.resetCrc();
+            crcInput.resetLimit();
+
+            int size = crcInput.readInt();
+
+            // if we cannot corroborate the size via crc, then we cannot safely skip this hint
+            if (reader.readInt() != crcInput.getCrc())
+                throw new IOException("Digest mismatch exception");
+
+            return readBuffer(size);
+        }
+
+        /*
+         * Reads the encoded hint and the checksum that follows it.
+         * Returns null if the checksum doesn't match.
+         */
+        private ByteBuffer readBuffer(int size) throws IOException
+        {
+            if (rateLimiter != null)
+                rateLimiter.acquire(size);
+            crcInput.limit(size);
+
+            ByteBuffer buffer = ByteBufferUtil.read(crcInput, size);
+            if (reader.readInt() == crcInput.getCrc())
+                return buffer;
+
+            // log a warning and skip the corrupted entry
+            logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
+                        descriptor.hostId,
+                        crcInput.getPosition() - size - 4,
+                        descriptor.fileName());
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
new file mode 100644
index 0000000..3f30c1d
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.HintedHandoffMetrics;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.service.StorageService;
+
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Iterables.size;
+
+/**
+ * A singleton-ish wrapper over various hints components:
+ * - a catalog of all hints stores
+ * - a single-threaded write executor
+ * - a multi-threaded dispatch executor
+ * - the buffer pool for writing hints into
+ *
+ * The front-end for everything hints related.
+ */
+public final class HintsService implements HintsServiceMBean
+{
+    private static final Logger logger = LoggerFactory.getLogger(HintsService.class);
+
+    public static final HintsService instance = new HintsService();
+
+    private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
+
+    private static final int MIN_BUFFER_SIZE = 32 << 20; // 32 MiB
+
+    private final HintsCatalog catalog;
+    private final HintsWriteExecutor writeExecutor;
+    private final HintsBufferPool bufferPool;
+    private final HintsDispatchExecutor dispatchExecutor;
+    private final AtomicBoolean isDispatchPaused;
+
+    private volatile boolean isShutDown = false;
+
+    private final ScheduledFuture<?> triggerFlushingFuture;
+    private volatile ScheduledFuture<?> triggerDispatchFuture;
+
+    public final HintedHandoffMetrics metrics;
+
+    private HintsService()
+    {
+        File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
+        int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
+
+        catalog = HintsCatalog.load(hintsDirectory);
+        writeExecutor = new HintsWriteExecutor(catalog);
+
+        int bufferSize = Math.max(DatabaseDescriptor.getMaxMutationSize() * 2, MIN_BUFFER_SIZE);
+        bufferPool = new HintsBufferPool(bufferSize, writeExecutor::flushBuffer);
+
+        isDispatchPaused = new AtomicBoolean(true); // paused until startDispatch() or resumeDispatch() is called
+        dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused);
+
+        // periodically empty the current content of the buffers
+        int flushPeriod = DatabaseDescriptor.getHintsFlushPeriodInMS();
+        triggerFlushingFuture = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(
+            () -> writeExecutor.flushBufferPool(bufferPool), flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
+        metrics = new HintedHandoffMetrics();
+    }
+
+    /** Registers this instance with the platform MBean server under MBEAN_NAME; failures are rethrown unchecked. */
+    public void registerMBean()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Write a hint for an iterable of nodes.
+     *
+     * @param hostIds host ids of the hint's target nodes
+     * @param hint the hint to store
+     */
+    public void write(Iterable<UUID> hostIds, Hint hint)
+    {
+        if (isShutDown)
+            throw new IllegalStateException("HintsService is shut down and can't accept new hints");
+
+        // we have to make sure that the HintsStore instances get properly initialized - otherwise dispatch will not trigger
+        catalog.maybeLoadStores(hostIds);
+
+        if (hint.isLive())
+            bufferPool.write(hostIds, hint);
+
+        StorageMetrics.totalHints.inc(size(hostIds)); // counted whether or not the hint was live
+    }
+
+    /**
+     * Write a hint for a single node.
+     *
+     * @param hostId host id of the hint's target node
+     * @param hint the hint to store
+     */
+    public void write(UUID hostId, Hint hint)
+    {
+        write(Collections.singleton(hostId), hint);
+    }
+
+    /**
+     * Flush the buffer pool for the selected target nodes, then fsync their writers.
+     *
+     * @param hostIds host ids of the nodes to flush and fsync hints for
+     */
+    public void flushAndFsyncBlockingly(Iterable<UUID> hostIds)
+    {
+        Iterable<HintsStore> stores = transform(hostIds, catalog::get);
+        writeExecutor.flushBufferPool(bufferPool, stores);
+        writeExecutor.fsyncWritersBlockingly(stores);
+    }
+
+    /** Un-pauses dispatch and schedules the dispatch trigger to run every 10 seconds; not allowed after shutdown. */
+    public synchronized void startDispatch()
+    {
+        if (isShutDown)
+            throw new IllegalStateException("HintsService is shut down and cannot be restarted");
+
+        isDispatchPaused.set(false);
+
+        HintsDispatchTrigger trigger = new HintsDispatchTrigger(catalog, writeExecutor, dispatchExecutor, isDispatchPaused);
+        // triggering hint dispatch is now very cheap, so we can do it more often - every 10 seconds vs. every 10 minutes,
+        // previously; this reduces mean time to delivery, and positively affects batchlog delivery latencies, too
+        triggerDispatchFuture = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(trigger, 10, 10, TimeUnit.SECONDS);
+    }
+
+    public void pauseDispatch()
+    {
+        logger.info("Paused hints dispatch");
+        isDispatchPaused.set(true);
+    }
+
+    public void resumeDispatch()
+    {
+        logger.info("Resumed hints dispatch");
+        isDispatchPaused.set(false);
+    }
+
+    /**
+     * Gracefully and blockingly shut down the service.
+     *
+     * Will abort dispatch sessions that are currently in progress (which is okay, it's idempotent),
+     * and make sure the buffers are flushed, hints files written and fsynced.
+     */
+    public synchronized void shutdownBlocking()
+    {
+        if (isShutDown)
+            throw new IllegalStateException("HintsService has already been shut down");
+        isShutDown = true;
+
+        if (triggerDispatchFuture != null)
+            triggerDispatchFuture.cancel(false);
+        pauseDispatch();
+
+        triggerFlushingFuture.cancel(false);
+
+        writeExecutor.flushBufferPool(bufferPool);
+        writeExecutor.closeAllWriters();
+
+        dispatchExecutor.shutdownBlocking();
+        writeExecutor.shutdownBlocking();
+    }
+
+    /** Resumes dispatch; presumably called when this node is being decommissioned (TODO confirm with the caller). */
+    public void decommission()
+    {
+        resumeDispatch();
+    }
+
+    /**
+     * Deletes all hints for all destinations. Doesn't make snapshots - should be used with care.
+     */
+    public void deleteAllHints()
+    {
+        catalog.deleteAllHints();
+    }
+
+    /**
+     * Deletes all hints for the provided destination. Doesn't make snapshots - should be used with care.
+     *
+     * @param address inet address of the target node - encoded as a string for easier JMX consumption
+     */
+    public void deleteAllHintsForEndpoint(String address)
+    {
+        try
+        {
+            deleteAllHintsForEndpoint(InetAddress.getByName(address));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Deletes all hints for the provided destination. Doesn't make snapshots - should be used with care.
+     *
+     * @param target inet address of the target node
+     */
+    public void deleteAllHintsForEndpoint(InetAddress target)
+    {
+        UUID hostId = StorageService.instance.getHostIdForEndpoint(target);
+        if (hostId == null)
+            throw new IllegalArgumentException("Can't delete hints for unknown address " + target);
+        catalog.deleteAllHints(hostId);
+    }
+
+    /**
+     * Cleans up hints-related state after a node with id = hostId left.
+     *
+     * Dispatcher should stop itself (isHostAlive() will start returning false for the leaving host), but we'll wait for
+     * completion anyway.
+     *
+     * We should also flush the buffer if there are any hints for the node there, and close the writer (if any),
+     * so that we don't leave any hint files lying around.
+     *
+     * Once that is done, we can simply delete all hint files and remove the host id from the catalog.
+     *
+     * The worst that can happen if we don't get everything right is a hints file (or two) remaining undeleted.
+     *
+     * @param hostId id of the node being excised
+     */
+    public void excise(UUID hostId)
+    {
+        HintsStore store = catalog.get(hostId);
+        if (store == null)
+            return;
+
+        // flush the buffer, then close the writer, so that no new files appear for this host id after we are done
+        Future<?> flushFuture = writeExecutor.flushBufferPool(bufferPool, Collections.singleton(store));
+        Future<?> closeFuture = writeExecutor.closeWriter(store);
+        try
+        {
+            flushFuture.get();
+            closeFuture.get();
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            if (e instanceof InterruptedException)
+                Thread.currentThread().interrupt(); // wrapping must not swallow the interruption
+            throw new RuntimeException(e);
+        }
+
+        // wait for the current dispatch session to end (if any), so that the currently dispatched file gets removed
+        dispatchExecutor.completeDispatchBlockingly(store);
+
+        // delete all the hints files and remove the HintsStore instance from the map in the catalog
+        catalog.exciseStore(hostId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsServiceMBean.java b/src/java/org/apache/cassandra/hints/HintsServiceMBean.java
new file mode 100644
index 0000000..fe0abcc
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsServiceMBean.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+public interface HintsServiceMBean
+{
+    /**
+     * Pauses dispatch of all hints; {@link #resumeDispatch()} reverses it. Does not affect the creation of hints.
+     */
+    void pauseDispatch();
+
+    /**
+     * Resumes dispatch of all hints after a {@link #pauseDispatch()}. Does not affect the creation of hints.
+     */
+    void resumeDispatch();
+
+    /**
+     * Irrevocably deletes all the stored hints files (with the exception of those that are being dispatched right now,
+     * or being written to).
+     */
+    void deleteAllHints();
+
+    /**
+     * Irrevocably deletes all the stored hints files for the target address (with the exception of those that are
+     * being dispatched right now, or being written to). The address is a string for easier JMX consumption.
+     */
+    void deleteAllHintsForEndpoint(String address);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
new file mode 100644
index 0000000..e19de99
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.SyncUtil;
+
+/**
+ * Encapsulates the state of a peer's hints: the queue of hints files for dispatch, and the current writer (if any).
+ *
+ * The queue for dispatch is multi-threading safe.
+ *
+ * The writer MUST only be accessed by {@link HintsWriteExecutor}.
+ */
+final class HintsStore
+{
+    private static final Logger logger = LoggerFactory.getLogger(HintsStore.class);
+
+    public final UUID hostId;
+    private final File hintsDirectory;
+
+    private final Map<HintsDescriptor, Long> dispatchOffsets;
+    private final Deque<HintsDescriptor> dispatchDequeue;
+    private final Queue<HintsDescriptor> blacklistedFiles;
+
+    // last timestamp used in a descriptor; make sure to not reuse the same timestamp for new descriptors.
+    private volatile long lastUsedTimestamp;
+    private volatile HintsWriter hintsWriter;
+
+    private HintsStore(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+    {
+        this.hostId = hostId;
+        this.hintsDirectory = hintsDirectory;
+
+        dispatchOffsets = new ConcurrentHashMap<>();
+        dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
+        blacklistedFiles = new ConcurrentLinkedQueue<>();
+
+        //noinspection resource
+        lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
+    }
+
+    static HintsStore create(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+    {
+        descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
+        return new HintsStore(hostId, hintsDirectory, descriptors);
+    }
+
+    InetAddress address()
+    {
+        return StorageService.instance.getEndpointForHostId(hostId);
+    }
+
+    boolean isLive()
+    {
+        InetAddress address = address();
+        return address != null && FailureDetector.instance.isAlive(address);
+    }
+
+    HintsDescriptor poll()
+    {
+        return dispatchDequeue.poll();
+    }
+
+    void offerFirst(HintsDescriptor descriptor)
+    {
+        dispatchDequeue.offerFirst(descriptor);
+    }
+
+    void offerLast(HintsDescriptor descriptor)
+    {
+        dispatchDequeue.offerLast(descriptor);
+    }
+
+    void deleteAllHints()
+    {
+        HintsDescriptor descriptor;
+        while ((descriptor = poll()) != null)
+        {
+            cleanUp(descriptor);
+            delete(descriptor);
+        }
+
+        while ((descriptor = blacklistedFiles.poll()) != null)
+        {
+            cleanUp(descriptor);
+            delete(descriptor);
+        }
+    }
+
+    private void delete(HintsDescriptor descriptor)
+    {
+        File hintsFile = new File(hintsDirectory, descriptor.fileName());
+        if (hintsFile.delete())
+            logger.info("Deleted hint file {}", descriptor.fileName());
+        else
+            logger.error("Failed to delete hint file {}", descriptor.fileName());
+
+        //noinspection ResultOfMethodCallIgnored
+        new File(hintsDirectory, descriptor.checksumFileName()).delete();
+    }
+
+    boolean hasFiles()
+    {
+        return !dispatchDequeue.isEmpty();
+    }
+
+    Optional<Long> getDispatchOffset(HintsDescriptor descriptor)
+    {
+        return Optional.ofNullable(dispatchOffsets.get(descriptor));
+    }
+
+    void markDispatchOffset(HintsDescriptor descriptor, long mark)
+    {
+        dispatchOffsets.put(descriptor, mark);
+    }
+
+    void cleanUp(HintsDescriptor descriptor)
+    {
+        dispatchOffsets.remove(descriptor);
+    }
+
+    void blacklist(HintsDescriptor descriptor)
+    {
+        blacklistedFiles.add(descriptor);
+    }
+
+    /*
+     * Methods dealing with HintsWriter.
+     *
+     * All of these, with the exception of isWriting(), are for exclusively single-threaded use by HintsWriteExecutor.
+     */
+
+    boolean isWriting()
+    {
+        return hintsWriter != null;
+    }
+
+    HintsWriter getOrOpenWriter()
+    {
+        if (hintsWriter == null)
+            hintsWriter = openWriter();
+        return hintsWriter;
+    }
+
+    HintsWriter getWriter()
+    {
+        return hintsWriter;
+    }
+
+    private HintsWriter openWriter()
+    {
+        lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp);
+
+        try
+        {
+            return HintsWriter.create(hintsDirectory, descriptor);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, descriptor.fileName());
+        }
+    }
+
+    void closeWriter()
+    {
+        if (hintsWriter != null)
+        {
+            hintsWriter.close();
+            offerLast(hintsWriter.descriptor());
+            hintsWriter = null;
+            SyncUtil.trySyncDir(hintsDirectory);
+        }
+    }
+
+    void fsyncWriter()
+    {
+        if (hintsWriter != null)
+            hintsWriter.fsync();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
new file mode 100644
index 0000000..be52f92
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/**
+ * A single threaded executor that exclusively writes all the hints and otherwise manipulate the writers.
+ *
+ * Flushing demultiplexes the provided {@link HintsBuffer} and sequentially writes to each {@link HintsWriter},
+ * using the same shared write buffer. In the near future, when CASSANDRA-9428 (compression) is implemented,
+ * will also share a compression buffer.
+ */
+final class HintsWriteExecutor
+{
+    private static final int WRITE_BUFFER_SIZE = 256 << 10;
+
+    private final HintsCatalog catalog;
+    private final ByteBuffer writeBuffer;
+    private final ExecutorService executor;
+
+    HintsWriteExecutor(HintsCatalog catalog)
+    {
+        this.catalog = catalog;
+
+        writeBuffer = ByteBuffer.allocateDirect(WRITE_BUFFER_SIZE);
+        executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("HintsWriteExecutor", 1);
+    }
+
+    /*
+     * Should be very fast (worst case scenario - write a few 10s of megabytes to disk).
+     */
+    void shutdownBlocking()
+    {
+        executor.shutdown();
+        try
+        {
+            executor.awaitTermination(1, TimeUnit.MINUTES);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    /**
+     * Flush the provided buffer, recycle it and offer it back to the pool.
+     */
+    Future flushBuffer(HintsBuffer buffer, HintsBufferPool bufferPool)
+    {
+        return executor.submit(new FlushBufferTask(buffer, bufferPool));
+    }
+
+    /**
+     * Flush the current buffer, but without clearing/recycling it.
+     */
+    Future flushBufferPool(HintsBufferPool bufferPool)
+    {
+        return executor.submit(new FlushBufferPoolTask(bufferPool));
+    }
+
+    /**
+     * Flush the current buffer just for the specified hints stores. Without clearing/recycling it.
+     */
+    Future flushBufferPool(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
+    {
+        return executor.submit(new PartiallyFlushBufferPoolTask(bufferPool, stores));
+    }
+
+    void fsyncWritersBlockingly(Iterable<HintsStore> stores)
+    {
+        try
+        {
+            executor.submit(new FsyncWritersTask(stores)).get();
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    Future closeWriter(HintsStore store)
+    {
+        return executor.submit(store::closeWriter);
+    }
+
+    Future closeAllWriters()
+    {
+        return executor.submit(() -> catalog.stores().forEach(HintsStore::closeWriter));
+    }
+
+    private final class FlushBufferTask implements Runnable
+    {
+        private final HintsBuffer buffer;
+        private final HintsBufferPool bufferPool;
+
+        FlushBufferTask(HintsBuffer buffer, HintsBufferPool bufferPool)
+        {
+            this.buffer = buffer;
+            this.bufferPool = bufferPool;
+        }
+
+        public void run()
+        {
+            buffer.waitForModifications();
+
+            try
+            {
+                flush(buffer);
+            }
+            finally
+            {
+                HintsBuffer recycledBuffer = buffer.recycle();
+                if (!bufferPool.offer(recycledBuffer))
+                    recycledBuffer.free();
+            }
+        }
+    }
+
+    private final class FlushBufferPoolTask implements Runnable
+    {
+        private final HintsBufferPool bufferPool;
+
+        FlushBufferPoolTask(HintsBufferPool bufferPool)
+        {
+            this.bufferPool = bufferPool;
+        }
+
+        public void run()
+        {
+            HintsBuffer buffer = bufferPool.currentBuffer();
+            buffer.waitForModifications();
+            flush(buffer);
+        }
+    }
+
+    private final class PartiallyFlushBufferPoolTask implements Runnable
+    {
+        private final HintsBufferPool bufferPool;
+        private final Iterable<HintsStore> stores;
+
+        PartiallyFlushBufferPoolTask(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
+        {
+            this.bufferPool = bufferPool;
+            this.stores = stores;
+        }
+
+        public void run()
+        {
+            HintsBuffer buffer = bufferPool.currentBuffer();
+            buffer.waitForModifications();
+            stores.forEach(store -> flush(buffer.consumingHintsIterator(store.hostId), store));
+        }
+    }
+
+    private final class FsyncWritersTask implements Runnable
+    {
+        private final Iterable<HintsStore> stores;
+
+        FsyncWritersTask(Iterable<HintsStore> stores)
+        {
+            this.stores = stores;
+        }
+
+        public void run()
+        {
+            stores.forEach(HintsStore::fsyncWriter);
+            catalog.fsyncDirectory();
+        }
+    }
+
+    private void flush(HintsBuffer buffer)
+    {
+        buffer.hostIds().forEach(hostId -> flush(buffer.consumingHintsIterator(hostId), catalog.get(hostId)));
+    }
+
+    private void flush(Iterator<ByteBuffer> iterator, HintsStore store)
+    {
+        while (true)
+        {
+            flushInternal(iterator, store);
+
+            if (!iterator.hasNext())
+                break;
+
+            // exceeded the size limit for an individual file, but still have more to write
+            // close the current writer and continue flushing to a new one in the next iteration
+            store.closeWriter();
+        }
+    }
+
+    private void flushInternal(Iterator<ByteBuffer> iterator, HintsStore store)
+    {
+        long maxHintsFileSize = DatabaseDescriptor.getMaxHintsFileSize();
+
+        @SuppressWarnings("resource")
+        HintsWriter writer = store.getOrOpenWriter();
+
+        try (HintsWriter.Session session = writer.newSession(writeBuffer))
+        {
+            while (iterator.hasNext())
+            {
+                session.append(iterator.next());
+                if (session.position() >= maxHintsFileSize)
+                    break;
+            }
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, writer.descriptor().fileName());
+        }
+    }
+}