You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/19 16:29:49 UTC

[2/5] cassandra git commit: Rewrite hinted handoff

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java
new file mode 100644
index 0000000..300d9cc
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.SyncUtil;
+import org.apache.cassandra.utils.Throwables;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+import static org.apache.cassandra.utils.Throwables.perform;
+
+final class HintsWriter implements AutoCloseable
+{
+    static final int PAGE_SIZE = 4096;
+
+    private final File directory;
+    private final HintsDescriptor descriptor;
+    private final File file;
+    private final FileChannel channel;
+    private final int fd;
+    private final CRC32 globalCRC;
+
+    private volatile long lastSyncPosition = 0L;
+
+    private HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+    {
+        this.directory = directory;
+        this.descriptor = descriptor;
+        this.file = file;
+        this.channel = channel;
+        this.fd = fd;
+        this.globalCRC = globalCRC;
+    }
+
+    static HintsWriter create(File directory, HintsDescriptor descriptor) throws IOException
+    {
+        File file = new File(directory, descriptor.fileName());
+
+        FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+        int fd = CLibrary.getfd(channel);
+
+        CRC32 crc = new CRC32();
+
+        try
+        {
+            // write the descriptor
+            DataOutputBuffer dob = new DataOutputBuffer();
+            descriptor.serialize(dob);
+            ByteBuffer descriptorBytes = dob.buffer();
+            updateChecksum(crc, descriptorBytes);
+            channel.write(descriptorBytes);
+        }
+        catch (Throwable e)
+        {
+            channel.close();
+            throw e;
+        }
+
+        return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+    }
+
+    HintsDescriptor descriptor()
+    {
+        return descriptor;
+    }
+
+    private void writeChecksum()
+    {
+        File checksumFile = new File(directory, descriptor.checksumFileName());
+        try (OutputStream out = Files.newOutputStream(checksumFile.toPath()))
+        {
+            out.write(Integer.toHexString((int) globalCRC.getValue()).getBytes());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, checksumFile);
+        }
+    }
+
+    public void close()
+    {
+        perform(file, Throwables.FileOpType.WRITE, this::doFsync, channel::close);
+
+        writeChecksum();
+    }
+
+    public void fsync()
+    {
+        perform(file, Throwables.FileOpType.WRITE, this::doFsync);
+    }
+
+    private void doFsync() throws IOException
+    {
+        SyncUtil.force(channel, true);
+        lastSyncPosition = channel.position();
+    }
+
+    Session newSession(ByteBuffer buffer)
+    {
+        try
+        {
+            return new Session(buffer, channel.size());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, file);
+        }
+    }
+
+    /**
+     * The primary goal of the Session class is to be able to share the same buffers among potentially dozens or hundreds
+     * of hints writers, and ensure that their contents are always written to the underlying channels in the end.
+     */
+    final class Session implements AutoCloseable
+    {
+        private final ByteBuffer buffer;
+
+        private final long initialSize;
+        private long bytesWritten;
+
+        Session(ByteBuffer buffer, long initialSize)
+        {
+            buffer.clear();
+            bytesWritten = 0L;
+
+            this.buffer = buffer;
+            this.initialSize = initialSize;
+        }
+
+        long position()
+        {
+            return initialSize + bytesWritten;
+        }
+
+        /**
+         * Appends the serialized hint (with CRC included) to this session's aggregation buffer,
+         * writes to the underlying channel when the buffer is overflown.
+         *
+         * @param hint the serialized hint (with CRC included)
+         * @throws IOException
+         */
+        void append(ByteBuffer hint) throws IOException
+        {
+            bytesWritten += hint.remaining();
+
+            // if the hint fits in the aggregation buffer, then just update the aggregation buffer,
+            // otherwise write both the aggregation buffer and the new buffer to the channel
+            if (hint.remaining() <= buffer.remaining())
+            {
+                buffer.put(hint);
+                return;
+            }
+
+            buffer.flip();
+
+            // update file-global CRC checksum
+            updateChecksum(globalCRC, buffer);
+            updateChecksum(globalCRC, hint);
+
+            channel.write(new ByteBuffer[] { buffer, hint });
+            buffer.clear();
+        }
+
+        /**
+         * Serializes and appends the hint (with CRC included) to this session's aggregation buffer,
+         * writes to the underlying channel when the buffer is overflown.
+         *
+         * Used mainly by tests and {@link LegacyHintsMigrator}
+         *
+         * @param hint the unserialized hint
+         * @throws IOException
+         */
+        void append(Hint hint) throws IOException
+        {
+            int hintSize = (int) Hint.serializer.serializedSize(hint, descriptor.messagingVersion());
+            int totalSize = hintSize + HintsBuffer.ENTRY_OVERHEAD_SIZE;
+
+            if (totalSize > buffer.remaining())
+                flushBuffer();
+
+            ByteBuffer hintBuffer = totalSize <= buffer.remaining()
+                                  ? buffer
+                                  : ByteBuffer.allocate(totalSize);
+
+            CRC32 crc = new CRC32();
+            try (DataOutputBufferFixed out = new DataOutputBufferFixed(hintBuffer))
+            {
+                out.writeInt(hintSize);
+                updateChecksumInt(crc, hintSize);
+                out.writeInt((int) crc.getValue());
+
+                Hint.serializer.serialize(hint, out, descriptor.messagingVersion());
+                updateChecksum(crc, hintBuffer, hintBuffer.position() - hintSize, hintSize);
+                out.writeInt((int) crc.getValue());
+            }
+
+            if (hintBuffer == buffer)
+                bytesWritten += totalSize;
+            else
+                append((ByteBuffer) hintBuffer.flip());
+        }
+
+        /**
+         * Closes the session - flushes the aggregation buffer (if not empty), does page aligning, and potentially fsyncs.
+         * @throws IOException
+         */
+        public void close() throws IOException
+        {
+            flushBuffer();
+            maybeFsync();
+            maybeSkipCache();
+        }
+
+        private void flushBuffer() throws IOException
+        {
+            buffer.flip();
+
+            if (buffer.remaining() > 0)
+            {
+                updateChecksum(globalCRC, buffer);
+                channel.write(buffer);
+            }
+
+            buffer.clear();
+        }
+
+        private void maybeFsync()
+        {
+            if (position() >= lastSyncPosition + DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024)
+                fsync();
+        }
+
+        private void maybeSkipCache()
+        {
+            long position = position();
+
+            // don't skip page cache for tiny files, on the assumption that if they are tiny, the target node is probably
+            // alive, and if so, the file will be closed and dispatched shortly (within a minute), and the file will be dropped.
+            if (position >= DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024)
+                CLibrary.trySkipCache(fd, 0, position - (position % PAGE_SIZE), file.getPath());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
new file mode 100644
index 0000000..082e307
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A migrator that goes through the legacy system.hints table and writes all the hints to the new hints storage format.
+ */
+@SuppressWarnings("deprecation")
+public final class LegacyHintsMigrator
+{
+    private static final Logger logger = LoggerFactory.getLogger(LegacyHintsMigrator.class);
+
+    private final File hintsDirectory;
+    private final long maxHintsFileSize;
+
+    private final ColumnFamilyStore legacyHintsTable;
+    private final int pageSize;
+
+    public LegacyHintsMigrator(File hintsDirectory, long maxHintsFileSize)
+    {
+        this.hintsDirectory = hintsDirectory;
+        this.maxHintsFileSize = maxHintsFileSize;
+
+        legacyHintsTable = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS);
+        pageSize = calculatePageSize(legacyHintsTable);
+    }
+
+    // read fewer columns (mutations) per page if they are very large
+    private static int calculatePageSize(ColumnFamilyStore legacyHintsTable)
+    {
+        int size = 128;
+
+        int meanCellCount = legacyHintsTable.getMeanColumns();
+        double meanPartitionSize = legacyHintsTable.getMeanPartitionSize();
+
+        if (meanCellCount != 0 || meanPartitionSize != 0)
+        {
+            int avgHintSize = (int) meanPartitionSize / meanCellCount;
+            size = Math.max(2, Math.min(size, (512 << 10) / avgHintSize));
+        }
+
+        return size;
+    }
+
+    public void migrate()
+    {
+        // nothing to migrate
+        if (legacyHintsTable.isEmpty())
+            return;
+        logger.info("Migrating legacy hints to new storage");
+
+        // major-compact all of the existing sstables to get rid of the tombstones + expired hints
+        logger.info("Forcing a major compaction of {}.{} table", SystemKeyspace.NAME, SystemKeyspace.LEGACY_HINTS);
+        compactLegacyHints();
+
+        // paginate over legacy hints and write them to the new storage
+        logger.info("Migrating legacy hints to the new storage");
+        migrateLegacyHints();
+
+        // truncate the legacy hints table
+        logger.info("Truncating {}.{} table", SystemKeyspace.NAME, SystemKeyspace.LEGACY_HINTS);
+        legacyHintsTable.truncateBlocking();
+    }
+
+    private void compactLegacyHints()
+    {
+        Collection<Descriptor> descriptors = new ArrayList<>();
+        legacyHintsTable.getTracker().getUncompacting().forEach(sstable -> descriptors.add(sstable.descriptor));
+        if (!descriptors.isEmpty())
+            forceCompaction(descriptors);
+    }
+
+    private void forceCompaction(Collection<Descriptor> descriptors)
+    {
+        try
+        {
+            CompactionManager.instance.submitUserDefined(legacyHintsTable, descriptors, FBUtilities.nowInSeconds()).get();
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void migrateLegacyHints()
+    {
+        ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024);
+        String query = String.format("SELECT DISTINCT target_id FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.LEGACY_HINTS);
+        //noinspection ConstantConditions
+        QueryProcessor.executeInternal(query).forEach(row -> migrateLegacyHints(row.getUUID("target_id"), buffer));
+        FileUtils.clean(buffer);
+    }
+
+    private void migrateLegacyHints(UUID hostId, ByteBuffer buffer)
+    {
+        String query = String.format("SELECT target_id, hint_id, message_version, mutation, ttl(mutation) AS ttl, writeTime(mutation) AS write_time " +
+                                     "FROM %s.%s " +
+                                     "WHERE target_id = ?",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.LEGACY_HINTS);
+
+        // read all the old hints (paged iterator), write them in the new format
+        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize, hostId);
+        migrateLegacyHints(hostId, rows, buffer);
+
+        // delete the whole partition in the legacy table; we would truncate the whole table afterwards, but this allows
+        // to not lose progress in case of a terminated conversion
+        deleteLegacyHintsPartition(hostId);
+    }
+
+    private void migrateLegacyHints(UUID hostId, UntypedResultSet rows, ByteBuffer buffer)
+    {
+        migrateLegacyHints(hostId, rows.iterator(), buffer);
+    }
+
+    private void migrateLegacyHints(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
+    {
+        do
+        {
+            migrateLegacyHintsInternal(hostId, iterator, buffer);
+            // if there are hints that didn't fit in the previous file, keep calling the method to write to a new
+            // file until we get everything written.
+        }
+        while (iterator.hasNext());
+    }
+
+    private void migrateLegacyHintsInternal(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
+    {
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, System.currentTimeMillis());
+
+        try (HintsWriter writer = HintsWriter.create(hintsDirectory, descriptor))
+        {
+            try (HintsWriter.Session session = writer.newSession(buffer))
+            {
+                while (iterator.hasNext())
+                {
+                    Hint hint = convertLegacyHint(iterator.next());
+                    if (hint != null)
+                        session.append(hint);
+
+                    if (session.position() >= maxHintsFileSize)
+                        break;
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, descriptor.fileName());
+        }
+    }
+
+    private static Hint convertLegacyHint(UntypedResultSet.Row row)
+    {
+        Mutation mutation = deserializeLegacyMutation(row);
+        if (mutation == null)
+            return null;
+
+        long creationTime = row.getLong("write_time"); // milliseconds, not micros, for the hints table
+        int expirationTime = FBUtilities.nowInSeconds() + row.getInt("ttl");
+        int originalGCGS = expirationTime - (int) TimeUnit.MILLISECONDS.toSeconds(creationTime);
+
+        int gcgs = Math.min(originalGCGS, mutation.smallestGCGS());
+
+        return Hint.create(mutation, creationTime, gcgs);
+    }
+
+    private static Mutation deserializeLegacyMutation(UntypedResultSet.Row row)
+    {
+        try
+        {
+            Mutation mutation = Mutation.serializer.deserialize(new DataInputBuffer(row.getBlob("mutation"), true),
+                                                                row.getInt("message_version"));
+            mutation.getPartitionUpdates().forEach(PartitionUpdate::validate);
+            return mutation;
+        }
+        catch (IOException e)
+        {
+            logger.error("Failed to migrate a hint for {} from legacy {}.{} table: {}",
+                         row.getUUID("target_id"),
+                         SystemKeyspace.NAME,
+                         SystemKeyspace.LEGACY_HINTS,
+                         e);
+            return null;
+        }
+        catch (MarshalException e)
+        {
+            logger.warn("Failed to validate a hint for {} (table id {}) from legacy {}.{} table - skipping: {})",
+                        row.getUUID("target_id"),
+                        SystemKeyspace.NAME,
+                        SystemKeyspace.LEGACY_HINTS,
+                        e);
+            return null;
+        }
+    }
+
+    private static void deleteLegacyHintsPartition(UUID hostId)
+    {
+        // intentionally use millis, like the rest of the legacy implementation did, just in case
+        Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyHints,
+                                                                             UUIDType.instance.decompose(hostId),
+                                                                             System.currentTimeMillis(),
+                                                                             FBUtilities.nowInSeconds()));
+        mutation.applyUnsafe();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/package-info.java b/src/java/org/apache/cassandra/hints/package-info.java
new file mode 100644
index 0000000..faa7b9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/package-info.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hints subsystem consists of several components.
+ *
+ * {@link org.apache.cassandra.hints.Hint} encodes all the required metadata and the mutation being hinted.
+ *
+ * {@link org.apache.cassandra.hints.HintsBuffer} provides a temporary buffer for writing the hints to in a concurrent manner,
+ * before we flush them to disk.
+ *
+ * {@link org.apache.cassandra.hints.HintsBufferPool} is responsible for submitting {@link org.apache.cassandra.hints.HintsBuffer}
+ * instances for flushing when they exceed their capacity, and for maitaining a reserve {@link org.apache.cassandra.hints.HintsBuffer}
+ * instance, and creating extra ones if flushing cannot keep up with arrival rate.
+ *
+ * {@link org.apache.cassandra.hints.HintsWriteExecutor} is a single-threaded executor that performs all the writing to disk.
+ *
+ * {@link org.apache.cassandra.hints.HintsDispatchExecutor} is a multi-threaded executor responsible for dispatch of
+ * the hints to their destinations.
+ *
+ * {@link org.apache.cassandra.hints.HintsStore} tracks the state of all hints files (written and being written to)
+ * for a given host id destination.
+ *
+ * {@link org.apache.cassandra.hints.HintsCatalog} maintains the mapping of host ids to {@link org.apache.cassandra.hints.HintsStore}
+ * instances, and provides some aggregate APIs.
+ *
+ * {@link org.apache.cassandra.hints.HintsService} wraps the catalog, the pool, and the two executors, acting as a front-end
+ * for hints.
+ */
+package org.apache.cassandra.hints;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
index e44279a..51f6569 100644
--- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
@@ -21,7 +21,6 @@ import java.net.InetAddress;
 import java.util.Map.Entry;
 
 import com.codahale.metrics.Counter;
-import org.apache.cassandra.db.HintedHandOffManager;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.utils.UUIDGen;
 import org.slf4j.Logger;
@@ -34,7 +33,7 @@ import com.google.common.cache.LoadingCache;
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
 /**
- * Metrics for {@link HintedHandOffManager}.
+ * Metrics for {@link org.apache.cassandra.hints.HintsService}.
  */
 public class HintedHandoffMetrics
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
new file mode 100644
index 0000000..062f67d
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+/**
+ * Metrics for {@link HintsService}.
+ */
+public final class HintsServiceMetrics
+{
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b057d98..13632ac 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -55,6 +55,8 @@ import org.apache.cassandra.gms.EchoMessage;
 import org.apache.cassandra.gms.GossipDigestAck;
 import org.apache.cassandra.gms.GossipDigestAck2;
 import org.apache.cassandra.gms.GossipDigestSyn;
+import org.apache.cassandra.hints.HintMessage;
+import org.apache.cassandra.hints.HintResponse;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -97,7 +99,7 @@ public final class MessagingService implements MessagingServiceMBean
     public enum Verb
     {
         MUTATION,
-        @Deprecated BINARY,
+        HINT,
         READ_REPAIR,
         READ,
         REQUEST_RESPONSE, // client-initiated reads and writes
@@ -129,7 +131,6 @@ public final class MessagingService implements MessagingServiceMBean
         _TRACE, // dummy verb so we can use MS.droppedMessagesMap
         ECHO,
         REPAIR_MESSAGE,
-        // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
         PAXOS_PREPARE,
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
@@ -150,6 +151,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
         put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION);
         put(Verb.READ_REPAIR, Stage.MUTATION);
+        put(Verb.HINT, Stage.MUTATION);
         put(Verb.TRUNCATE, Stage.MUTATION);
         put(Verb.PAXOS_PREPARE, Stage.MUTATION);
         put(Verb.PAXOS_PROPOSE, Stage.MUTATION);
@@ -226,6 +228,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.PAXOS_PREPARE, Commit.serializer);
         put(Verb.PAXOS_PROPOSE, Commit.serializer);
         put(Verb.PAXOS_COMMIT, Commit.serializer);
+        put(Verb.HINT, HintMessage.serializer);
     }};
 
     /**
@@ -234,6 +237,7 @@ public final class MessagingService implements MessagingServiceMBean
     public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
     {{
         put(Verb.MUTATION, WriteResponse.serializer);
+        put(Verb.HINT, HintResponse.serializer);
         put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
@@ -299,6 +303,7 @@ public final class MessagingService implements MessagingServiceMBean
                                                                    Verb.MUTATION,
                                                                    Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable??
                                                                    Verb.COUNTER_MUTATION,
+                                                                   Verb.HINT,
                                                                    Verb.READ_REPAIR,
                                                                    Verb.READ,
                                                                    Verb.RANGE_SLICE,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 3b123c3..cf1e021 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.hints.LegacyHintsMigrator;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.util.FileUtils;
@@ -276,6 +277,9 @@ public class CassandraDaemon
             throw new RuntimeException(e);
         }
 
+        // migrate any legacy (pre-3.0) hints from system.hints table into the new store
+        new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
+
         // enable auto compaction
         for (Keyspace keyspace : Keyspace.all())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index 9ffef96..1c07b52 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -179,33 +179,30 @@ public class StartupChecks
         }
     };
 
-    public static final StartupCheck checkDataDirs = new StartupCheck()
+    public static final StartupCheck checkDataDirs = () ->
     {
-        public void execute() throws StartupException
+        // check all directories(data, commitlog, saved cache) for existence and permission
+        Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
+                                                 Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+                                                               DatabaseDescriptor.getSavedCachesLocation(),
+                                                               DatabaseDescriptor.getHintsDirectory().getAbsolutePath()));
+        for (String dataDir : dirs)
         {
-            // check all directories(data, commitlog, saved cache) for existence and permission
-            Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
-                                                     Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
-                                                                   DatabaseDescriptor.getSavedCachesLocation()));
-            for (String dataDir : dirs)
-            {
-                logger.debug("Checking directory {}", dataDir);
-                File dir = new File(dataDir);
-
-                // check that directories exist.
-                if (!dir.exists())
-                {
-                    logger.error("Directory {} doesn't exist", dataDir);
-                    // if they don't, failing their creation, stop cassandra.
-                    if (!dir.mkdirs())
-                        throw new StartupException(3, "Has no permission to create directory "+ dataDir);
-                }
-
-                // if directories exist verify their permissions
-                if (!Directories.verifyFullPermissions(dir, dataDir))
-                    throw new StartupException(3, "Insufficient permissions on directory " + dataDir);
+            logger.debug("Checking directory {}", dataDir);
+            File dir = new File(dataDir);
 
+            // check that directories exist.
+            if (!dir.exists())
+            {
+                logger.error("Directory {} doesn't exist", dataDir);
+                // if they don't, failing their creation, stop cassandra.
+                if (!dir.mkdirs())
+                    throw new StartupException(3, "Has no permission to create directory "+ dataDir);
             }
+
+            // if directories exist verify their permissions
+            if (!Directories.verifyFullPermissions(dir, dataDir))
+                throw new StartupException(3, "Insufficient permissions on directory " + dataDir);
         }
     };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index fc917f0..12c2c24 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.db.view.MaterializedViewManager;
 import org.apache.cassandra.db.view.MaterializedViewUtils;
+import org.apache.cassandra.db.HintedHandOffManager;
 import org.apache.cassandra.metrics.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -57,6 +58,8 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
@@ -99,7 +102,9 @@ public class StorageProxy implements StorageProxyMBean
 
     private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
 
-    private StorageProxy() {}
+    private StorageProxy()
+    {
+    }
 
     static
     {
@@ -113,6 +118,9 @@ public class StorageProxy implements StorageProxyMBean
             throw new RuntimeException(e);
         }
 
+        HintsService.instance.registerMBean();
+        HintedHandOffManager.instance.registerMBean();
+
         standardWritePerformer = new WritePerformer()
         {
             public void apply(IMutation mutation,
@@ -603,32 +611,41 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    /** hint all the mutations (except counters, which can't be safely retried).  This means
-      * we'll re-hint any successful ones; doesn't seem worth it to track individual success
-      * just for this unusual case.
-
-      * @param mutations the mutations that require hints
-      */
+    /**
+     * Hint all the mutations (except counters, which can't be safely retried).  This means
+     * we'll re-hint any successful ones; doesn't seem worth it to track individual success
+     * just for this unusual case.
+     *
+     * Only used for CL.ANY
+     *
+     * @param mutations the mutations that require hints
+     */
     private static void hintMutations(Collection<? extends IMutation> mutations)
     {
         for (IMutation mutation : mutations)
-        {
-            if (mutation instanceof CounterMutation)
-                continue;
+            if (!(mutation instanceof CounterMutation))
+                hintMutation((Mutation) mutation);
 
-            Token tk = mutation.key().getToken();
-            List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
-            Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
-            for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
-            {
-                // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and
-                // CASSANDRA-6510), so there is no need to hint or retry
-                if (!target.equals(FBUtilities.getBroadcastAddress()) && shouldHint(target))
-                    submitHint((Mutation) mutation, target, null);
-            }
-        }
+        Tracing.trace("Wrote hints to satisfy CL.ANY after no replicas acknowledged the write");
+    }
+
+    private static void hintMutation(Mutation mutation)
+    {
+        Token tk = DatabaseDescriptor.getPartitioner().getToken(mutation.key().getKey());
+        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
+        Collection<InetAddress> pendingEndpoints =
+            StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
+
+        Iterable<InetAddress> endpoints = Iterables.concat(naturalEndpoints, pendingEndpoints);
+        ArrayList<InetAddress> endpointsToHint = new ArrayList<>(Iterables.size(endpoints));
+
+        // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510),
+        // so there is no need to hint or retry.
+        for (InetAddress target : endpoints)
+            if (!target.equals(FBUtilities.getBroadcastAddress()) && shouldHint(target))
+                endpointsToHint.add(target);
 
-        Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
+        submitHint(mutation, endpointsToHint, null);
     }
 
     /**
@@ -1071,7 +1088,7 @@ public class StorageProxy implements StorageProxyMBean
         MessageOut<Mutation> message = null;
 
         boolean insertLocal = false;
-
+        ArrayList<InetAddress> endpointsToHint = null;
 
         for (InetAddress destination : targets)
         {
@@ -1115,16 +1132,21 @@ public class StorageProxy implements StorageProxyMBean
                         messages.add(destination);
                     }
                 }
-            } else
+            }
+            else
             {
-                if (!shouldHint(destination))
-                    continue;
-
-                // Schedule a local hint
-                submitHint(mutation, destination, responseHandler);
+                if (shouldHint(destination))
+                {
+                    if (endpointsToHint == null)
+                        endpointsToHint = new ArrayList<>(Iterables.size(targets));
+                    endpointsToHint.add(destination);
+                }
             }
         }
 
+        if (endpointsToHint != null)
+            submitHint(mutation, endpointsToHint, responseHandler);
+
         if (insertLocal)
             insertLocal(stage, mutation, responseHandler);
 
@@ -1139,66 +1161,6 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static AtomicInteger getHintsInProgressFor(InetAddress destination)
-    {
-        try
-        {
-            return hintsInProgress.load(destination);
-        }
-        catch (Exception e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    public static Future<Void> submitHint(final Mutation mutation,
-                                          final InetAddress target,
-                                          final AbstractWriteResponseHandler<IMutation> responseHandler)
-    {
-        // local write that time out should be handled by LocalMutationRunnable
-        assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
-
-        HintRunnable runnable = new HintRunnable(target)
-        {
-            public void runMayThrow()
-            {
-                int ttl = HintedHandOffManager.calculateHintTTL(mutation);
-                if (ttl > 0)
-                {
-                    logger.debug("Adding hint for {}", target);
-                    writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
-                    // Notify the handler only for CL == ANY
-                    if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
-                        responseHandler.response(null);
-                } else
-                {
-                    logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
-                }
-            }
-        };
-
-        return submitHint(runnable);
-    }
-
-    private static Future<Void> submitHint(HintRunnable runnable)
-    {
-        StorageMetrics.totalHintsInProgress.inc();
-        getHintsInProgressFor(runnable.target).incrementAndGet();
-        return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
-    }
-
-    /**
-     * @param now current time in milliseconds - relevant for hint replay handling of truncated CFs
-     */
-    public static void writeHintForMutation(Mutation mutation, long now, int ttl, InetAddress target)
-    {
-        assert ttl > 0;
-        UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
-        assert hostId != null : "Missing host ID for " + target.getHostAddress();
-        HintedHandOffManager.instance.hintFor(mutation, now, ttl, hostId).apply();
-        StorageMetrics.totalHints.inc();
-    }
-
     private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message,
                                                  Collection<InetAddress> targets,
                                                  AbstractWriteResponseHandler<IMutation> handler)
@@ -2209,7 +2171,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         if (!DatabaseDescriptor.hintedHandoffEnabled())
         {
-            HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+            HintsService.instance.metrics.incrPastWindow(ep);
             return false;
         }
 
@@ -2220,7 +2182,7 @@ public class StorageProxy implements StorageProxyMBean
             if (disabledDCs.contains(dc))
             {
                 Tracing.trace("Not hinting {} since its data center {} has been disabled {}", ep, dc, disabledDCs);
-                HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+                HintsService.instance.metrics.incrPastWindow(ep);
                 return false;
             }
         }
@@ -2228,7 +2190,7 @@ public class StorageProxy implements StorageProxyMBean
         boolean hintWindowExpired = Gossiper.instance.getEndpointDowntime(ep) > DatabaseDescriptor.getMaxHintWindow();
         if (hintWindowExpired)
         {
-            HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+            HintsService.instance.metrics.incrPastWindow(ep);
             Tracing.trace("Not hinting {} which has been down {} ms", ep, Gossiper.instance.getEndpointDowntime(ep));
         }
         return !hintWindowExpired;
@@ -2363,7 +2325,7 @@ public class StorageProxy implements StorageProxyMBean
             if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
             {
                 MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
-                HintRunnable runnable = new HintRunnable(FBUtilities.getBroadcastAddress())
+                HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
                     protected void runMayThrow() throws Exception
                     {
@@ -2393,11 +2355,11 @@ public class StorageProxy implements StorageProxyMBean
      */
     private abstract static class HintRunnable implements Runnable
     {
-        public final InetAddress target;
+        public final Collection<InetAddress> targets;
 
-        protected HintRunnable(InetAddress target)
+        protected HintRunnable(Collection<InetAddress> targets)
         {
-            this.target = target;
+            this.targets = targets;
         }
 
         public void run()
@@ -2412,8 +2374,9 @@ public class StorageProxy implements StorageProxyMBean
             }
             finally
             {
-                StorageMetrics.totalHintsInProgress.dec();
-                getHintsInProgressFor(target).decrementAndGet();
+                StorageMetrics.totalHintsInProgress.dec(targets.size());
+                for (InetAddress target : targets)
+                    getHintsInProgressFor(target).decrementAndGet();
             }
         }
 
@@ -2446,6 +2409,52 @@ public class StorageProxy implements StorageProxyMBean
             logger.warn("Some hints were not written before shutdown.  This is not supposed to happen.  You should (a) run repair, and (b) file a bug report");
     }
 
+    private static AtomicInteger getHintsInProgressFor(InetAddress destination)
+    {
+        try
+        {
+            return hintsInProgress.load(destination);
+        }
+        catch (Exception e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public static Future<Void> submitHint(Mutation mutation, InetAddress target, AbstractWriteResponseHandler<IMutation> responseHandler)
+    {
+        return submitHint(mutation, Collections.singleton(target), responseHandler);
+    }
+
+    public static Future<Void> submitHint(Mutation mutation,
+                                          Collection<InetAddress> targets,
+                                          AbstractWriteResponseHandler<IMutation> responseHandler)
+    {
+        HintRunnable runnable = new HintRunnable(targets)
+        {
+            public void runMayThrow()
+            {
+                logger.debug("Adding hints for {}", targets);
+                HintsService.instance.write(Iterables.transform(targets, StorageService.instance::getHostIdForEndpoint),
+                                            Hint.create(mutation, System.currentTimeMillis()));
+                targets.forEach(HintsService.instance.metrics::incrCreatedHints);
+                // Notify the handler only for CL == ANY
+                if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
+                    responseHandler.response(null);
+            }
+        };
+
+        return submitHint(runnable);
+    }
+
+    private static Future<Void> submitHint(HintRunnable runnable)
+    {
+        StorageMetrics.totalHintsInProgress.inc(runnable.targets.size());
+        for (InetAddress target : runnable.targets)
+            getHintsInProgressFor(target).incrementAndGet();
+        return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
+    }
+
     public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(); }
     public void setRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRpcTimeout(timeoutInMillis); }
 
@@ -2478,11 +2487,11 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairAttempted() {
         return ReadRepairMetrics.attempted.getCount();
     }
-    
+
     public long getReadRepairRepairedBlocking() {
         return ReadRepairMetrics.repairedBlocking.getCount();
     }
-    
+
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5966e49..0346645 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -28,22 +28,8 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -54,6 +40,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.annotation.Nullable;
 import javax.management.JMX;
 import javax.management.MBeanServer;
 import javax.management.NotificationBroadcasterSupport;
@@ -95,6 +82,8 @@ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.gms.TokenSerializer;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.hints.HintVerbHandler;
+import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -303,6 +292,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PROPOSE, new ProposeVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_COMMIT, new CommitVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.HINT, new HintVerbHandler());
 
         // see BootStrapper for a summary of how the bootstrap verbs interact
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler());
@@ -585,7 +575,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString());
         logger.info("Thrift API version: {}", cassandraConstants.VERSION);
         logger.info("CQL supported versions: {} (default: {})",
-                    StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
+                StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
 
         initialized = true;
 
@@ -649,6 +639,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 MessagingService.instance().shutdown();
                 materializedViewMutationStage.shutdown();
                 batchlogMutationStage.shutdown();
+                HintsService.instance.pauseDispatch();
                 counterMutationStage.shutdown();
                 mutationStage.shutdown();
                 materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
@@ -681,6 +672,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 if (FBUtilities.isWindows())
                     WindowsTimer.endTimerPeriod(DatabaseDescriptor.getWindowsTimerInterval());
 
+                HintsService.instance.shutdownBlocking();
+
                 // wait for miscellaneous tasks like sstable and commitlog segment deletion
                 ScheduledExecutors.nonPeriodicTasks.shutdown();
                 if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
@@ -792,7 +785,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 MessagingService.instance().listen(FBUtilities.getLocalAddress());
             LoadBroadcaster.instance.startBroadcasting();
 
-            HintedHandOffManager.instance.start();
+            HintsService.instance.startDispatch();
             BatchlogManager.instance.start();
         }
     }
@@ -1564,6 +1557,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress()).toString();
     }
 
+    public UUID getLocalHostUUID()
+    {
+        return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress());
+    }
+
     public Map<String, String> getHostIdMap()
     {
         Map<String, String> mapOut = new HashMap<>();
@@ -2119,11 +2117,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void excise(Collection<Token> tokens, InetAddress endpoint)
     {
         logger.info("Removing tokens {} for {}", tokens, endpoint);
-        HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
+
+        if (tokenMetadata.isMember(endpoint))
+            HintsService.instance.excise(tokenMetadata.getHostId(endpoint));
+
         removeEndpoint(endpoint);
         tokenMetadata.removeEndpoint(endpoint);
         tokenMetadata.removeBootstrapTokens(tokens);
-
         notifyLeft(endpoint);
         PendingRangeCalculatorService.instance.update();
     }
@@ -2337,10 +2337,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MigrationManager.instance.scheduleSchemaPull(endpoint, state);
 
         if (tokenMetadata.isMember(endpoint))
-        {
-            HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true);
             notifyUp(endpoint);
-        }
     }
 
     public void onRemove(InetAddress endpoint)
@@ -2380,9 +2377,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return map;
     }
 
+    // TODO
     public final void deliverHints(String host) throws UnknownHostException
     {
-        HintedHandOffManager.instance.scheduleHintDelivery(host);
+        throw new UnsupportedOperationException();
     }
 
     public Collection<Token> getLocalTokens()
@@ -2392,6 +2390,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return tokens;
     }
 
+    @Nullable
+    public InetAddress getEndpointForHostId(UUID hostId)
+    {
+        return tokenMetadata.getEndpointForHostId(hostId);
+    }
+
+    @Nullable
+    public UUID getHostIdForEndpoint(InetAddress address)
+    {
+        return tokenMetadata.getHostId(address);
+    }
+
     /* These methods belong to the MBean interface */
 
     public List<String> getTokens()
@@ -3418,7 +3428,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private Future<StreamState> streamHints()
     {
         // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well)
-        ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
+        ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS);
         FBUtilities.waitOnFuture(hintsCF.forceFlush());
 
         // gather all live nodes in the cluster that aren't also leaving
@@ -3451,7 +3461,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                                           preferred,
                                                           SystemKeyspace.NAME,
                                                           ranges,
-                                                          SystemKeyspace.HINTS)
+                                                          SystemKeyspace.LEGACY_HINTS)
                                           .execute();
         }
     }
@@ -3835,7 +3845,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public synchronized void drain() throws IOException, InterruptedException, ExecutionException
     {
         inShutdownHook = true;
-        
+
+        BatchlogManager.shutdown();
+
+        HintsService.instance.pauseDispatch();
+
         ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
         ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
         ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
@@ -3899,7 +3913,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         FBUtilities.waitOnFutures(flushes);
 
-        BatchlogManager.shutdown();
+        HintsService.instance.shutdownBlocking();
 
         // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
         // there are no segments to replay, so we force the recycling of any remaining (should be at most one)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 22b3455..c1e0a0d 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -57,7 +57,6 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.BatchlogManager;
 import org.apache.cassandra.db.BatchlogManagerMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
-import org.apache.cassandra.db.HintedHandOffManager;
 import org.apache.cassandra.db.HintedHandOffManagerMBean;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.CompactionManagerMBean;
@@ -65,6 +64,7 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.FailureDetectorMBean;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.GossiperMBean;
+import org.apache.cassandra.db.HintedHandOffManager;
 import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index d1462d2..4f7a97a 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -26,10 +26,10 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
 import com.google.common.base.Joiner;
-import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -332,7 +332,7 @@ public class FBUtilities
 
     public static int nowInSeconds()
     {
-        return (int)(System.currentTimeMillis() / 1000);
+        return (int) (System.currentTimeMillis() / 1000);
     }
 
     public static void waitOnFutures(Iterable<Future<?>> futures)
@@ -595,6 +595,34 @@ public class FBUtilities
         checksum.update((v >>> 0) & 0xFF);
     }
 
+    /**
+      * Updates checksum with the provided ByteBuffer at the given offset + length.
+      * Resets position and limit back to their original values on return.
+      * This method is *NOT* thread-safe.
+      */
+    public static void updateChecksum(CRC32 checksum, ByteBuffer buffer, int offset, int length)
+    {
+        int position = buffer.position();
+        int limit = buffer.limit();
+
+        buffer.position(offset).limit(offset + length);
+        checksum.update(buffer);
+
+        buffer.position(position).limit(limit);
+    }
+
+    /**
+     * Updates checksum with the provided ByteBuffer.
+     * Resets position back to its original values on return.
+     * This method is *NOT* thread-safe.
+     */
+    public static void updateChecksum(CRC32 checksum, ByteBuffer buffer)
+    {
+        int position = buffer.position();
+        checksum.update(buffer);
+        buffer.position(position);
+    }
+
     private static final ThreadLocal<byte[]> threadLocalScratchBuffer = new ThreadLocal<byte[]>()
     {
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 0a2bd28..d6ce7b4 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -18,10 +18,25 @@
 */
 package org.apache.cassandra.utils;
 
-public class Throwables
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+
+public final class Throwables
 {
+    public enum FileOpType { READ, WRITE }
 
-    public static Throwable merge(Throwable existingFail, Throwable newFail)
+    public interface DiscreteAction<E extends Exception>
+    {
+        void perform() throws E;
+    }
+
+    public static <T extends Throwable> T merge(T existingFail, T newFail)
     {
         if (existingFail == null)
             return newFail;
@@ -31,7 +46,74 @@ public class Throwables
 
     public static void maybeFail(Throwable fail)
     {
-        if (fail != null)
-            com.google.common.base.Throwables.propagate(fail);
+        if (failIfCanCast(fail, null))
+            throw new RuntimeException(fail);
+    }
+
+    public static <T extends Throwable> void maybeFail(Throwable fail, Class<T> checked) throws T
+    {
+        if (failIfCanCast(fail, checked))
+            throw new RuntimeException(fail);
+    }
+
+    public static <T extends Throwable> boolean failIfCanCast(Throwable fail, Class<T> checked) throws T
+    {
+        if (fail == null)
+            return false;
+
+        if (fail instanceof Error)
+            throw (Error) fail;
+
+        if (fail instanceof RuntimeException)
+            throw (RuntimeException) fail;
+
+        if (checked != null && checked.isInstance(fail))
+            throw checked.cast(fail);
+
+        return true;
+    }
+
+    @SafeVarargs
+    public static <E extends Exception> void perform(DiscreteAction<? extends E> ... actions) throws E
+    {
+        perform(Arrays.stream(actions));
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <E extends Exception> void perform(Stream<DiscreteAction<? extends E>> actions) throws E
+    {
+        Throwable fail = null;
+        Iterator<DiscreteAction<? extends E>> iter = actions.iterator();
+        while (iter.hasNext())
+        {
+            DiscreteAction<? extends E> action = iter.next();
+            try
+            {
+                action.perform();
+            }
+            catch (Throwable t)
+            {
+                fail = merge(fail, t);
+            }
+        }
+
+        if (failIfCanCast(fail, null))
+            throw (E) fail;
+    }
+
+    @SafeVarargs
+    public static void perform(File against, FileOpType opType, DiscreteAction<? extends IOException> ... actions)
+    {
+        perform(Arrays.stream(actions).map((action) -> () ->
+        {
+            try
+            {
+                action.perform();
+            }
+            catch (IOException e)
+            {
+                throw (opType == FileOpType.WRITE) ? new FSWriteError(e, against) : new FSReadError(e, against);
+            }
+        }));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 0bbaee4..1dba284 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -8,6 +8,7 @@ commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0
 commitlog_segment_size_in_mb: 5
 commitlog_directory: build/test/cassandra/commitlog
+hints_directory: build/test/cassandra/hints
 partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
 listen_address: 127.0.0.1
 storage_port: 7010

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java b/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
new file mode 100644
index 0000000..fd880cb
--- /dev/null
+++ b/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class HintsWriteThenReadTest
+{
+    private static final String KEYSPACE = "hints_write_then_read_test";
+    private static final String TABLE = "table";
+
+    private static final int HINTS_COUNT = 10_000_000;
+
+    @Test
+    public void testWriteReadCycle() throws IOException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+
+        HintsDescriptor descriptor = new HintsDescriptor(UUID.randomUUID(), System.currentTimeMillis());
+
+        File directory = Files.createTempDirectory(null).toFile();
+        try
+        {
+            testWriteReadCycle(directory, descriptor);
+        }
+        finally
+        {
+            directory.deleteOnExit();
+        }
+    }
+
+    private void testWriteReadCycle(File directory, HintsDescriptor descriptor) throws IOException
+    {
+        // write HINTS_COUNT hints to a file
+        writeHints(directory, descriptor);
+
+        // calculate the checksum of the file, then compare to the .crc32 checksum file content
+        verifyChecksum(directory, descriptor);
+
+        // iterate over the written hints, make sure they are all present
+        verifyHints(directory, descriptor);
+    }
+
+    private void writeHints(File directory, HintsDescriptor descriptor) throws IOException
+    {
+        try (HintsWriter writer = HintsWriter.create(directory, descriptor))
+        {
+            write(writer, descriptor.timestamp);
+        }
+    }
+
+    private static void verifyChecksum(File directory, HintsDescriptor descriptor) throws IOException
+    {
+        File hintsFile = new File(directory, descriptor.fileName());
+        File checksumFile = new File(directory, descriptor.checksumFileName());
+
+        assertTrue(checksumFile.exists());
+
+        String actualChecksum = Integer.toHexString(calculateChecksum(hintsFile));
+        String expectedChecksum = Files.readAllLines(checksumFile.toPath()).iterator().next();
+
+        assertEquals(expectedChecksum, actualChecksum);
+    }
+
+    private void verifyHints(File directory, HintsDescriptor descriptor)
+    {
+        long baseTimestamp = descriptor.timestamp;
+        int index = 0;
+
+        try (HintsReader reader = HintsReader.open(new File(directory, descriptor.fileName())))
+        {
+            for (HintsReader.Page page : reader)
+            {
+                Iterator<Hint> hints = page.hintsIterator();
+                while (hints.hasNext())
+                {
+                    Hint hint = hints.next();
+
+                    long timestamp = baseTimestamp + index;
+                    Mutation mutation = hint.mutation;
+
+                    assertEquals(timestamp, hint.creationTime);
+                    assertEquals(dk(bytes(index)), mutation.key());
+
+                    Row row = mutation.getPartitionUpdates().iterator().next().iterator().next();
+                    assertEquals(1, Iterables.size(row.cells()));
+                    assertEquals(bytes(index), row.clustering().get(0));
+                    Cell cell = row.cells().iterator().next();
+                    assertNotNull(cell);
+                    assertEquals(bytes(index), cell.value());
+                    assertEquals(timestamp * 1000, cell.timestamp());
+
+                    index++;
+                }
+            }
+        }
+
+        assertEquals(index, HINTS_COUNT);
+    }
+
+    private void write(HintsWriter writer, long timestamp) throws IOException
+    {
+        ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024);
+        try (HintsWriter.Session session = writer.newSession(buffer))
+        {
+            write(session, timestamp);
+        }
+        FileUtils.clean(buffer);
+    }
+
+    private void write(HintsWriter.Session session, long timestamp) throws IOException
+    {
+        for (int i = 0; i < HINTS_COUNT; i++)
+            session.append(createHint(i, timestamp));
+    }
+
+    private static Hint createHint(int idx, long baseTimestamp)
+    {
+        long timestamp = baseTimestamp + idx;
+        return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);
+    }
+
+    private static Mutation createMutation(int index, long timestamp)
+    {
+        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        return new RowUpdateBuilder(table, timestamp, bytes(index))
+               .clustering(bytes(index))
+               .add("val", bytes(index))
+               .build();
+    }
+
+    private static int calculateChecksum(File file) throws IOException
+    {
+        CRC32 crc = new CRC32();
+        byte[] buffer = new byte[FBUtilities.MAX_UNSIGNED_SHORT];
+
+        try (InputStream in = Files.newInputStream(file.toPath()))
+        {
+            int bytesRead;
+            while((bytesRead = in.read(buffer)) != -1)
+                crc.update(buffer, 0, bytesRead);
+        }
+
+        return (int) crc.getValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
index 9023b11..3bdb192 100644
--- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
+++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
@@ -54,6 +54,7 @@ public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader
 
         config.commitlog_directory += File.pathSeparator + offset;
         config.saved_caches_directory += File.pathSeparator + offset;
+        config.hints_directory += File.pathSeparator + offset;
         for (int i = 0; i < config.data_file_directories.length; i++)
             config.data_file_directories[i] += File.pathSeparator + offset;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
deleted file mode 100644
index e06c95a..0000000
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.cassandra.db;
-
-import java.net.InetAddress;
-import java.util.Map;
-import java.util.UUID;
-
-import com.google.common.collect.Iterators;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
-import static org.junit.Assert.assertEquals;
-
-public class HintedHandOffTest
-{
-
-    public static final String KEYSPACE4 = "HintedHandOffTest4";
-    public static final String STANDARD1_CF = "Standard1";
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE4,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE4, STANDARD1_CF));
-    }
-
-    @Before
-    public void clearHints()
-    {
-        Keyspace systemKeyspace = Keyspace.open("system");
-        ColumnFamilyStore hintStore = systemKeyspace.getColumnFamilyStore(SystemKeyspace.HINTS);
-        hintStore.clearUnsafe();
-    }
-
-    // Test compaction of hints column family. It shouldn't remove all columns on compaction.
-    @Test
-    public void testCompactionOfHintsCF() throws Exception
-    {
-        // prepare hints column family
-        Keyspace systemKeyspace = Keyspace.open("system");
-        ColumnFamilyStore hintStore = systemKeyspace.getColumnFamilyStore(SystemKeyspace.HINTS);
-        hintStore.clearUnsafe();
-        hintStore.metadata.gcGraceSeconds(36000); // 10 hours
-        hintStore.disableAutoCompaction();
-
-        // insert 1 hint
-        Mutation rm = mutation();
-        HintedHandOffManager.instance.hintFor(rm,
-                                              System.currentTimeMillis(),
-                                              HintedHandOffManager.calculateHintTTL(rm),
-                                              UUID.randomUUID())
-                                     .applyUnsafe();
-
-        // flush data to disk
-        hintStore.forceBlockingFlush();
-        assertEquals(1, hintStore.getLiveSSTables().size());
-
-        // submit compaction
-        HintedHandOffManager.instance.compact();
-
-        // single row should not be removed because of gc_grace_seconds
-        // is 10 hours and there are no any tombstones in sstable
-        assertEquals(1, hintStore.getLiveSSTables().size());
-    }
-
-    @Test
-    public void testHintsMetrics() throws Exception
-    {
-        for (int i = 0; i < 99; i++)
-            HintedHandOffManager.instance.metrics.incrPastWindow(InetAddress.getLocalHost());
-        HintedHandOffManager.instance.metrics.log();
-
-        UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS);
-        Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
-        assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
-    }
-
-    @Test(timeout = 5000)
-    public void testTruncateHints() throws Exception
-    {
-        // insert 1 hint
-        Mutation rm = mutation();
-        HintedHandOffManager.instance.hintFor(rm,
-                                              System.currentTimeMillis(),
-                                              HintedHandOffManager.calculateHintTTL(rm),
-                                              UUID.randomUUID())
-                                     .applyUnsafe();
-
-        assert getNoOfHints() == 1;
-
-        HintedHandOffManager.instance.truncateAllHints();
-
-        while(getNoOfHints() > 0)
-        {
-            Thread.sleep(100);
-        }
-
-        assert getNoOfHints() == 0;
-    }
-
-    private Mutation mutation()
-    {
-        // get a random mutation to write a hint for
-        return new RowUpdateBuilder(Keyspace.open(KEYSPACE4).getColumnFamilyStore(STANDARD1_CF).metadata,
-                                    FBUtilities.timestampMicros(),
-                                    ByteBufferUtil.bytes(1))
-               .clustering("cluster_col0")
-               .add("val", "value0")
-               .build();
-    }
-
-    private int getNoOfHints()
-    {
-        String req = "SELECT * FROM system.%s";
-        UntypedResultSet resultSet = executeInternal(String.format(req, SystemKeyspace.HINTS));
-        return resultSet.size();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index b41b7b3..fcdab62 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -250,7 +250,7 @@ public class CommitLogTest
         // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
         Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
                        .clustering("bytes")
-                       .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200))
+                       .add("val", ByteBuffer.allocate(DatabaseDescriptor.getMaxMutationSize() - 200))
                        .build();
         CommitLog.instance.add(rm2);
         // also forces a new segment, since each entry-with-overhead is just under half the CL size
@@ -280,7 +280,7 @@ public class CommitLogTest
                       .clustering(colName)
                       .add("val", ByteBuffer.allocate(allocSize)).build();
 
-        int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
+        int max = DatabaseDescriptor.getMaxMutationSize();
         max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
 
         // Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
new file mode 100644
index 0000000..e431924
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.CRC32;
+
+import org.junit.Test;
+
+import org.apache.cassandra.hints.ChecksummedDataInput;
+import org.apache.cassandra.io.util.AbstractDataInput;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+public class ChecksummedDataInputTest
+{
+    @Test
+    public void testThatItWorks() throws IOException
+    {
+        // fill a bytebuffer with some input
+        DataOutputBuffer out = new DataOutputBuffer();
+        out.write(127);
+        out.write(new byte[]{ 0, 1, 2, 3, 4, 5, 6 });
+        out.writeBoolean(false);
+        out.writeByte(10);
+        out.writeChar('t');
+        out.writeDouble(3.3);
+        out.writeFloat(2.2f);
+        out.writeInt(42);
+        out.writeLong(Long.MAX_VALUE);
+        out.writeShort(Short.MIN_VALUE);
+        out.writeUTF("utf");
+        ByteBuffer buffer = out.buffer();
+
+        // calculate resulting CRC
+        CRC32 crc = new CRC32();
+        FBUtilities.updateChecksum(crc, buffer);
+        int expectedCRC = (int) crc.getValue();
+
+        ChecksummedDataInput crcInput = ChecksummedDataInput.wrap(new DummyByteBufferDataInput(buffer.duplicate()));
+        crcInput.limit(buffer.remaining());
+
+        // assert that we read all the right values back
+        assertEquals(127, crcInput.read());
+        byte[] bytes = new byte[7];
+        crcInput.readFully(bytes);
+        assertTrue(Arrays.equals(new byte[]{ 0, 1, 2, 3, 4, 5, 6 }, bytes));
+        assertEquals(false, crcInput.readBoolean());
+        assertEquals(10, crcInput.readByte());
+        assertEquals('t', crcInput.readChar());
+        assertEquals(3.3, crcInput.readDouble());
+        assertEquals(2.2f, crcInput.readFloat());
+        assertEquals(42, crcInput.readInt());
+        assertEquals(Long.MAX_VALUE, crcInput.readLong());
+        assertEquals(Short.MIN_VALUE, crcInput.readShort());
+        assertEquals("utf", crcInput.readUTF());
+
+        // assert that the crc matches, and that we've read exactly as many bytes as expected
+        assertEquals(0, crcInput.bytesRemaining());
+        assertEquals(expectedCRC, crcInput.getCrc());
+    }
+
+    private static final class DummyByteBufferDataInput extends AbstractDataInput
+    {
+        private final ByteBuffer buffer;
+
+        DummyByteBufferDataInput(ByteBuffer buffer)
+        {
+            this.buffer = buffer;
+        }
+
+        public void seek(long position)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getPosition()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getPositionLimit()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public int read()
+        {
+            return buffer.get() & 0xFF;
+        }
+    }
+}