You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/19 16:29:49 UTC
[2/5] cassandra git commit: Rewrite hinted handoff
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java
new file mode 100644
index 0000000..300d9cc
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.SyncUtil;
+import org.apache.cassandra.utils.Throwables;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+import static org.apache.cassandra.utils.Throwables.perform;
+
+final class HintsWriter implements AutoCloseable
+{
+ static final int PAGE_SIZE = 4096;
+
+ private final File directory;
+ private final HintsDescriptor descriptor;
+ private final File file;
+ private final FileChannel channel;
+ private final int fd;
+ private final CRC32 globalCRC;
+
+ private volatile long lastSyncPosition = 0L;
+
+ private HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+ {
+ this.directory = directory;
+ this.descriptor = descriptor;
+ this.file = file;
+ this.channel = channel;
+ this.fd = fd;
+ this.globalCRC = globalCRC;
+ }
+
+ static HintsWriter create(File directory, HintsDescriptor descriptor) throws IOException
+ {
+ File file = new File(directory, descriptor.fileName());
+
+ FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+ int fd = CLibrary.getfd(channel);
+
+ CRC32 crc = new CRC32();
+
+ try
+ {
+ // write the descriptor
+ DataOutputBuffer dob = new DataOutputBuffer();
+ descriptor.serialize(dob);
+ ByteBuffer descriptorBytes = dob.buffer();
+ updateChecksum(crc, descriptorBytes);
+ channel.write(descriptorBytes);
+ }
+ catch (Throwable e)
+ {
+ channel.close();
+ throw e;
+ }
+
+ return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+ }
+
+ HintsDescriptor descriptor()
+ {
+ return descriptor;
+ }
+
+ private void writeChecksum()
+ {
+ File checksumFile = new File(directory, descriptor.checksumFileName());
+ try (OutputStream out = Files.newOutputStream(checksumFile.toPath()))
+ {
+ out.write(Integer.toHexString((int) globalCRC.getValue()).getBytes());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, checksumFile);
+ }
+ }
+
+ public void close()
+ {
+ perform(file, Throwables.FileOpType.WRITE, this::doFsync, channel::close);
+
+ writeChecksum();
+ }
+
+ public void fsync()
+ {
+ perform(file, Throwables.FileOpType.WRITE, this::doFsync);
+ }
+
+ private void doFsync() throws IOException
+ {
+ SyncUtil.force(channel, true);
+ lastSyncPosition = channel.position();
+ }
+
+ Session newSession(ByteBuffer buffer)
+ {
+ try
+ {
+ return new Session(buffer, channel.size());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, file);
+ }
+ }
+
+ /**
+ * The primary goal of the Session class is to be able to share the same buffers among potentially dozens or hundreds
+ * of hints writers, and ensure that their contents are always written to the underlying channels in the end.
+ */
+ final class Session implements AutoCloseable
+ {
+ private final ByteBuffer buffer;
+
+ private final long initialSize;
+ private long bytesWritten;
+
+ Session(ByteBuffer buffer, long initialSize)
+ {
+ buffer.clear();
+ bytesWritten = 0L;
+
+ this.buffer = buffer;
+ this.initialSize = initialSize;
+ }
+
+ long position()
+ {
+ return initialSize + bytesWritten;
+ }
+
+ /**
+ * Appends the serialized hint (with CRC included) to this session's aggregation buffer,
+ * writes to the underlying channel when the buffer is overflown.
+ *
+ * @param hint the serialized hint (with CRC included)
+ * @throws IOException
+ */
+ void append(ByteBuffer hint) throws IOException
+ {
+ bytesWritten += hint.remaining();
+
+ // if the hint fits in the aggregation buffer, then just update the aggregation buffer,
+ // otherwise write both the aggregation buffer and the new buffer to the channel
+ if (hint.remaining() <= buffer.remaining())
+ {
+ buffer.put(hint);
+ return;
+ }
+
+ buffer.flip();
+
+ // update file-global CRC checksum
+ updateChecksum(globalCRC, buffer);
+ updateChecksum(globalCRC, hint);
+
+ channel.write(new ByteBuffer[] { buffer, hint });
+ buffer.clear();
+ }
+
+ /**
+ * Serializes and appends the hint (with CRC included) to this session's aggregation buffer,
+ * writes to the underlying channel when the buffer is overflown.
+ *
+ * Used mainly by tests and {@link LegacyHintsMigrator}
+ *
+ * @param hint the unserialized hint
+ * @throws IOException
+ */
+ void append(Hint hint) throws IOException
+ {
+ int hintSize = (int) Hint.serializer.serializedSize(hint, descriptor.messagingVersion());
+ int totalSize = hintSize + HintsBuffer.ENTRY_OVERHEAD_SIZE;
+
+ if (totalSize > buffer.remaining())
+ flushBuffer();
+
+ ByteBuffer hintBuffer = totalSize <= buffer.remaining()
+ ? buffer
+ : ByteBuffer.allocate(totalSize);
+
+ CRC32 crc = new CRC32();
+ try (DataOutputBufferFixed out = new DataOutputBufferFixed(hintBuffer))
+ {
+ out.writeInt(hintSize);
+ updateChecksumInt(crc, hintSize);
+ out.writeInt((int) crc.getValue());
+
+ Hint.serializer.serialize(hint, out, descriptor.messagingVersion());
+ updateChecksum(crc, hintBuffer, hintBuffer.position() - hintSize, hintSize);
+ out.writeInt((int) crc.getValue());
+ }
+
+ if (hintBuffer == buffer)
+ bytesWritten += totalSize;
+ else
+ append((ByteBuffer) hintBuffer.flip());
+ }
+
+ /**
+ * Closes the session - flushes the aggregation buffer (if not empty), does page aligning, and potentially fsyncs.
+ * @throws IOException
+ */
+ public void close() throws IOException
+ {
+ flushBuffer();
+ maybeFsync();
+ maybeSkipCache();
+ }
+
+ private void flushBuffer() throws IOException
+ {
+ buffer.flip();
+
+ if (buffer.remaining() > 0)
+ {
+ updateChecksum(globalCRC, buffer);
+ channel.write(buffer);
+ }
+
+ buffer.clear();
+ }
+
+ private void maybeFsync() // fsyncs once the session position is at least one trickle-fsync interval past the last synced position
+ {
+ if (position() >= lastSyncPosition + DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024L) // 1024L: KB -> bytes in long, no int overflow
+ fsync();
+ }
+
+ private void maybeSkipCache() // asks the OS to drop the page-aligned prefix of the file from the page cache
+ {
+ long position = position();
+
+ // don't skip page cache for tiny files, on the assumption that if they are tiny, the target node is probably
+ // alive, and if so, the file will be closed and dispatched shortly (within a minute), and the file will be dropped.
+ if (position >= DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024L) // 1024L: KB -> bytes in long, no int overflow
+ CLibrary.trySkipCache(fd, 0, position - (position % PAGE_SIZE), file.getPath());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
new file mode 100644
index 0000000..082e307
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A migrator that goes through the legacy system.hints table and writes all the hints to the new hints storage format.
+ */
+@SuppressWarnings("deprecation")
+public final class LegacyHintsMigrator
+{
+ private static final Logger logger = LoggerFactory.getLogger(LegacyHintsMigrator.class);
+
+ private final File hintsDirectory;
+ private final long maxHintsFileSize;
+
+ private final ColumnFamilyStore legacyHintsTable;
+ private final int pageSize;
+
+ public LegacyHintsMigrator(File hintsDirectory, long maxHintsFileSize)
+ {
+ this.hintsDirectory = hintsDirectory;
+ this.maxHintsFileSize = maxHintsFileSize;
+
+ legacyHintsTable = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS);
+ pageSize = calculatePageSize(legacyHintsTable);
+ }
+
+ // read fewer columns (mutations) per page if they are very large: aims at ~512 KiB per page, clamped to [2, 128]
+ private static int calculatePageSize(ColumnFamilyStore legacyHintsTable)
+ {
+ int size = 128; // default, kept when the table statistics are unavailable (either mean is zero)
+
+ int meanCellCount = legacyHintsTable.getMeanColumns();
+ double meanPartitionSize = legacyHintsTable.getMeanPartitionSize();
+
+ if (meanCellCount != 0 && meanPartitionSize != 0) // both must be known; a zero cell count would divide by zero below
+ {
+ int avgHintSize = Math.max(1, (int) meanPartitionSize / meanCellCount); // floor at 1 so tiny hints can't cause division by zero
+ size = Math.max(2, Math.min(size, (512 << 10) / avgHintSize));
+ }
+
+ return size;
+ }
+
+ public void migrate()
+ {
+ // nothing to migrate
+ if (legacyHintsTable.isEmpty())
+ return;
+ logger.info("Migrating legacy hints to new storage");
+
+ // major-compact all of the existing sstables to get rid of the tombstones + expired hints
+ logger.info("Forcing a major compaction of {}.{} table", SystemKeyspace.NAME, SystemKeyspace.LEGACY_HINTS);
+ compactLegacyHints();
+
+ // paginate over legacy hints and write them to the new storage
+ logger.info("Migrating legacy hints to the new storage");
+ migrateLegacyHints();
+
+ // truncate the legacy hints table
+ logger.info("Truncating {}.{} table", SystemKeyspace.NAME, SystemKeyspace.LEGACY_HINTS);
+ legacyHintsTable.truncateBlocking();
+ }
+
+ private void compactLegacyHints()
+ {
+ Collection<Descriptor> descriptors = new ArrayList<>();
+ legacyHintsTable.getTracker().getUncompacting().forEach(sstable -> descriptors.add(sstable.descriptor));
+ if (!descriptors.isEmpty())
+ forceCompaction(descriptors);
+ }
+
+ private void forceCompaction(Collection<Descriptor> descriptors)
+ {
+ try
+ {
+ CompactionManager.instance.submitUserDefined(legacyHintsTable, descriptors, FBUtilities.nowInSeconds()).get();
+ }
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void migrateLegacyHints()
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024);
+ String query = String.format("SELECT DISTINCT target_id FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.LEGACY_HINTS);
+ //noinspection ConstantConditions
+ QueryProcessor.executeInternal(query).forEach(row -> migrateLegacyHints(row.getUUID("target_id"), buffer));
+ FileUtils.clean(buffer);
+ }
+
+ private void migrateLegacyHints(UUID hostId, ByteBuffer buffer)
+ {
+ String query = String.format("SELECT target_id, hint_id, message_version, mutation, ttl(mutation) AS ttl, writeTime(mutation) AS write_time " +
+ "FROM %s.%s " +
+ "WHERE target_id = ?",
+ SystemKeyspace.NAME,
+ SystemKeyspace.LEGACY_HINTS);
+
+ // read all the old hints (paged iterator), write them in the new format
+ UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize, hostId);
+ migrateLegacyHints(hostId, rows, buffer);
+
+ // delete the whole partition in the legacy table; we would truncate the whole table afterwards, but this allows
+ // to not lose progress in case of a terminated conversion
+ deleteLegacyHintsPartition(hostId);
+ }
+
+ private void migrateLegacyHints(UUID hostId, UntypedResultSet rows, ByteBuffer buffer)
+ {
+ migrateLegacyHints(hostId, rows.iterator(), buffer);
+ }
+
+ private void migrateLegacyHints(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
+ {
+ do
+ {
+ migrateLegacyHintsInternal(hostId, iterator, buffer);
+ // if there are hints that didn't fit in the previous file, keep calling the method to write to a new
+ // file until we get everything written.
+ }
+ while (iterator.hasNext());
+ }
+
+ private void migrateLegacyHintsInternal(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
+ {
+ HintsDescriptor descriptor = new HintsDescriptor(hostId, System.currentTimeMillis());
+
+ try (HintsWriter writer = HintsWriter.create(hintsDirectory, descriptor))
+ {
+ try (HintsWriter.Session session = writer.newSession(buffer))
+ {
+ while (iterator.hasNext())
+ {
+ Hint hint = convertLegacyHint(iterator.next());
+ if (hint != null)
+ session.append(hint);
+
+ if (session.position() >= maxHintsFileSize)
+ break;
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, descriptor.fileName());
+ }
+ }
+
+ private static Hint convertLegacyHint(UntypedResultSet.Row row)
+ {
+ Mutation mutation = deserializeLegacyMutation(row);
+ if (mutation == null)
+ return null;
+
+ long creationTime = row.getLong("write_time"); // milliseconds, not micros, for the hints table
+ int expirationTime = FBUtilities.nowInSeconds() + row.getInt("ttl");
+ int originalGCGS = expirationTime - (int) TimeUnit.MILLISECONDS.toSeconds(creationTime);
+
+ int gcgs = Math.min(originalGCGS, mutation.smallestGCGS());
+
+ return Hint.create(mutation, creationTime, gcgs);
+ }
+
+ private static Mutation deserializeLegacyMutation(UntypedResultSet.Row row) // returns null (after logging) if the hint can't be read or validated
+ {
+ try
+ {
+ Mutation mutation = Mutation.serializer.deserialize(new DataInputBuffer(row.getBlob("mutation"), true),
+ row.getInt("message_version"));
+ mutation.getPartitionUpdates().forEach(PartitionUpdate::validate);
+ return mutation;
+ }
+ catch (IOException e) // the stored blob could not be deserialized with its recorded messaging version
+ {
+ logger.error("Failed to migrate a hint for {} from legacy {}.{} table: {}",
+ row.getUUID("target_id"),
+ SystemKeyspace.NAME,
+ SystemKeyspace.LEGACY_HINTS,
+ e);
+ return null;
+ }
+ catch (MarshalException e) // raised while deserializing/validating; the caller skips null hints instead of aborting
+ {
+ logger.warn("Failed to validate a hint for {} from legacy {}.{} table - skipping: {}",
+ row.getUUID("target_id"),
+ SystemKeyspace.NAME,
+ SystemKeyspace.LEGACY_HINTS,
+ e);
+ return null;
+ }
+ }
+
+ private static void deleteLegacyHintsPartition(UUID hostId)
+ {
+ // intentionally use millis, like the rest of the legacy implementation did, just in case
+ Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyHints,
+ UUIDType.instance.decompose(hostId),
+ System.currentTimeMillis(),
+ FBUtilities.nowInSeconds()));
+ mutation.applyUnsafe();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/package-info.java b/src/java/org/apache/cassandra/hints/package-info.java
new file mode 100644
index 0000000..faa7b9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/package-info.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Hints subsystem consists of several components.
+ *
+ * {@link org.apache.cassandra.hints.Hint} encodes all the required metadata and the mutation being hinted.
+ *
+ * {@link org.apache.cassandra.hints.HintsBuffer} provides a temporary buffer for writing the hints to in a concurrent manner,
+ * before we flush them to disk.
+ *
+ * {@link org.apache.cassandra.hints.HintsBufferPool} is responsible for submitting {@link org.apache.cassandra.hints.HintsBuffer}
+ * instances for flushing when they exceed their capacity, and for maintaining a reserve {@link org.apache.cassandra.hints.HintsBuffer}
+ * instance, and creating extra ones if flushing cannot keep up with arrival rate.
+ *
+ * {@link org.apache.cassandra.hints.HintsWriteExecutor} is a single-threaded executor that performs all the writing to disk.
+ *
+ * {@link org.apache.cassandra.hints.HintsDispatchExecutor} is a multi-threaded executor responsible for dispatch of
+ * the hints to their destinations.
+ *
+ * {@link org.apache.cassandra.hints.HintsStore} tracks the state of all hints files (written and being written to)
+ * for a given host id destination.
+ *
+ * {@link org.apache.cassandra.hints.HintsCatalog} maintains the mapping of host ids to {@link org.apache.cassandra.hints.HintsStore}
+ * instances, and provides some aggregate APIs.
+ *
+ * {@link org.apache.cassandra.hints.HintsService} wraps the catalog, the pool, and the two executors, acting as a front-end
+ * for hints.
+ */
+package org.apache.cassandra.hints;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
index e44279a..51f6569 100644
--- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
@@ -21,7 +21,6 @@ import java.net.InetAddress;
import java.util.Map.Entry;
import com.codahale.metrics.Counter;
-import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.utils.UUIDGen;
import org.slf4j.Logger;
@@ -34,7 +33,7 @@ import com.google.common.cache.LoadingCache;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
/**
- * Metrics for {@link HintedHandOffManager}.
+ * Metrics for {@link org.apache.cassandra.hints.HintsService}.
*/
public class HintedHandoffMetrics
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
new file mode 100644
index 0000000..062f67d
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+/**
+ * Metrics for {@link org.apache.cassandra.hints.HintsService}; currently an empty placeholder with no metrics defined.
+ */
+public final class HintsServiceMetrics
+{
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b057d98..13632ac 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -55,6 +55,8 @@ import org.apache.cassandra.gms.EchoMessage;
import org.apache.cassandra.gms.GossipDigestAck;
import org.apache.cassandra.gms.GossipDigestAck2;
import org.apache.cassandra.gms.GossipDigestSyn;
+import org.apache.cassandra.hints.HintMessage;
+import org.apache.cassandra.hints.HintResponse;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -97,7 +99,7 @@ public final class MessagingService implements MessagingServiceMBean
public enum Verb
{
MUTATION,
- @Deprecated BINARY,
+ HINT,
READ_REPAIR,
READ,
REQUEST_RESPONSE, // client-initiated reads and writes
@@ -129,7 +131,6 @@ public final class MessagingService implements MessagingServiceMBean
_TRACE, // dummy verb so we can use MS.droppedMessagesMap
ECHO,
REPAIR_MESSAGE,
- // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
PAXOS_PREPARE,
PAXOS_PROPOSE,
PAXOS_COMMIT,
@@ -150,6 +151,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION);
put(Verb.READ_REPAIR, Stage.MUTATION);
+ put(Verb.HINT, Stage.MUTATION);
put(Verb.TRUNCATE, Stage.MUTATION);
put(Verb.PAXOS_PREPARE, Stage.MUTATION);
put(Verb.PAXOS_PROPOSE, Stage.MUTATION);
@@ -226,6 +228,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.PAXOS_PREPARE, Commit.serializer);
put(Verb.PAXOS_PROPOSE, Commit.serializer);
put(Verb.PAXOS_COMMIT, Commit.serializer);
+ put(Verb.HINT, HintMessage.serializer);
}};
/**
@@ -234,6 +237,7 @@ public final class MessagingService implements MessagingServiceMBean
public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
{{
put(Verb.MUTATION, WriteResponse.serializer);
+ put(Verb.HINT, HintResponse.serializer);
put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
put(Verb.READ_REPAIR, WriteResponse.serializer);
put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
@@ -299,6 +303,7 @@ public final class MessagingService implements MessagingServiceMBean
Verb.MUTATION,
Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable??
Verb.COUNTER_MUTATION,
+ Verb.HINT,
Verb.READ_REPAIR,
Verb.READ,
Verb.RANGE_SLICE,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 3b123c3..cf1e021 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.StartupException;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.hints.LegacyHintsMigrator;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.FileUtils;
@@ -276,6 +277,9 @@ public class CassandraDaemon
throw new RuntimeException(e);
}
+ // migrate any legacy (pre-3.0) hints from system.hints table into the new store
+ new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
+
// enable auto compaction
for (Keyspace keyspace : Keyspace.all())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index 9ffef96..1c07b52 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -179,33 +179,30 @@ public class StartupChecks
}
};
- public static final StartupCheck checkDataDirs = new StartupCheck()
+ public static final StartupCheck checkDataDirs = () ->
{
- public void execute() throws StartupException
+ // check all directories (data, commitlog, saved cache, hints) for existence and permission
+ Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
+ Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
+ DatabaseDescriptor.getSavedCachesLocation(),
+ DatabaseDescriptor.getHintsDirectory().getAbsolutePath()));
+ for (String dataDir : dirs)
{
- // check all directories(data, commitlog, saved cache) for existence and permission
- Iterable<String> dirs = Iterables.concat(Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()),
- Arrays.asList(DatabaseDescriptor.getCommitLogLocation(),
- DatabaseDescriptor.getSavedCachesLocation()));
- for (String dataDir : dirs)
- {
- logger.debug("Checking directory {}", dataDir);
- File dir = new File(dataDir);
-
- // check that directories exist.
- if (!dir.exists())
- {
- logger.error("Directory {} doesn't exist", dataDir);
- // if they don't, failing their creation, stop cassandra.
- if (!dir.mkdirs())
- throw new StartupException(3, "Has no permission to create directory "+ dataDir);
- }
-
- // if directories exist verify their permissions
- if (!Directories.verifyFullPermissions(dir, dataDir))
- throw new StartupException(3, "Insufficient permissions on directory " + dataDir);
+ logger.debug("Checking directory {}", dataDir);
+ File dir = new File(dataDir);
+ // check that directories exist.
+ if (!dir.exists())
+ {
+ logger.error("Directory {} doesn't exist", dataDir);
+ // if they don't, failing their creation, stop cassandra.
+ if (!dir.mkdirs())
+ throw new StartupException(3, "Has no permission to create directory "+ dataDir);
}
+
+ // if directories exist verify their permissions
+ if (!Directories.verifyFullPermissions(dir, dataDir))
+ throw new StartupException(3, "Insufficient permissions on directory " + dataDir);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index fc917f0..12c2c24 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -34,6 +34,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.db.view.MaterializedViewManager;
import org.apache.cassandra.db.view.MaterializedViewUtils;
+import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.metrics.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -57,6 +58,8 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
@@ -99,7 +102,9 @@ public class StorageProxy implements StorageProxyMBean
private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
- private StorageProxy() {}
+ private StorageProxy()
+ {
+ }
static
{
@@ -113,6 +118,9 @@ public class StorageProxy implements StorageProxyMBean
throw new RuntimeException(e);
}
+ HintsService.instance.registerMBean();
+ HintedHandOffManager.instance.registerMBean();
+
standardWritePerformer = new WritePerformer()
{
public void apply(IMutation mutation,
@@ -603,32 +611,41 @@ public class StorageProxy implements StorageProxyMBean
}
}
- /** hint all the mutations (except counters, which can't be safely retried). This means
- * we'll re-hint any successful ones; doesn't seem worth it to track individual success
- * just for this unusual case.
-
- * @param mutations the mutations that require hints
- */
+ /**
+ * Hint all the mutations (except counters, which can't be safely retried). This means
+ * we'll re-hint any successful ones; doesn't seem worth it to track individual success
+ * just for this unusual case.
+ *
+ * Only used for CL.ANY
+ *
+ * @param mutations the mutations that require hints
+ */
private static void hintMutations(Collection<? extends IMutation> mutations)
{
for (IMutation mutation : mutations)
- {
- if (mutation instanceof CounterMutation)
- continue;
+ if (!(mutation instanceof CounterMutation))
+ hintMutation((Mutation) mutation);
- Token tk = mutation.key().getToken();
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
- for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
- {
- // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and
- // CASSANDRA-6510), so there is no need to hint or retry
- if (!target.equals(FBUtilities.getBroadcastAddress()) && shouldHint(target))
- submitHint((Mutation) mutation, target, null);
- }
- }
+ Tracing.trace("Wrote hints to satisfy CL.ANY after no replicas acknowledged the write");
+ }
+
+ private static void hintMutation(Mutation mutation)
+ {
+ Token tk = DatabaseDescriptor.getPartitioner().getToken(mutation.key().getKey());
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
+ Collection<InetAddress> pendingEndpoints =
+ StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
+
+ Iterable<InetAddress> endpoints = Iterables.concat(naturalEndpoints, pendingEndpoints);
+ ArrayList<InetAddress> endpointsToHint = new ArrayList<>(Iterables.size(endpoints));
+
+ // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510),
+ // so there is no need to hint or retry.
+ for (InetAddress target : endpoints)
+ if (!target.equals(FBUtilities.getBroadcastAddress()) && shouldHint(target))
+ endpointsToHint.add(target);
- Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
+ submitHint(mutation, endpointsToHint, null);
}
/**
@@ -1071,7 +1088,7 @@ public class StorageProxy implements StorageProxyMBean
MessageOut<Mutation> message = null;
boolean insertLocal = false;
-
+ ArrayList<InetAddress> endpointsToHint = null;
for (InetAddress destination : targets)
{
@@ -1115,16 +1132,21 @@ public class StorageProxy implements StorageProxyMBean
messages.add(destination);
}
}
- } else
+ }
+ else
{
- if (!shouldHint(destination))
- continue;
-
- // Schedule a local hint
- submitHint(mutation, destination, responseHandler);
+ if (shouldHint(destination))
+ {
+ if (endpointsToHint == null)
+ endpointsToHint = new ArrayList<>(Iterables.size(targets));
+ endpointsToHint.add(destination);
+ }
}
}
+ if (endpointsToHint != null)
+ submitHint(mutation, endpointsToHint, responseHandler);
+
if (insertLocal)
insertLocal(stage, mutation, responseHandler);
@@ -1139,66 +1161,6 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static AtomicInteger getHintsInProgressFor(InetAddress destination)
- {
- try
- {
- return hintsInProgress.load(destination);
- }
- catch (Exception e)
- {
- throw new AssertionError(e);
- }
- }
-
- public static Future<Void> submitHint(final Mutation mutation,
- final InetAddress target,
- final AbstractWriteResponseHandler<IMutation> responseHandler)
- {
- // local write that time out should be handled by LocalMutationRunnable
- assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
-
- HintRunnable runnable = new HintRunnable(target)
- {
- public void runMayThrow()
- {
- int ttl = HintedHandOffManager.calculateHintTTL(mutation);
- if (ttl > 0)
- {
- logger.debug("Adding hint for {}", target);
- writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
- // Notify the handler only for CL == ANY
- if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
- responseHandler.response(null);
- } else
- {
- logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
- }
- }
- };
-
- return submitHint(runnable);
- }
-
- private static Future<Void> submitHint(HintRunnable runnable)
- {
- StorageMetrics.totalHintsInProgress.inc();
- getHintsInProgressFor(runnable.target).incrementAndGet();
- return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
- }
-
- /**
- * @param now current time in milliseconds - relevant for hint replay handling of truncated CFs
- */
- public static void writeHintForMutation(Mutation mutation, long now, int ttl, InetAddress target)
- {
- assert ttl > 0;
- UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
- assert hostId != null : "Missing host ID for " + target.getHostAddress();
- HintedHandOffManager.instance.hintFor(mutation, now, ttl, hostId).apply();
- StorageMetrics.totalHints.inc();
- }
-
private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message,
Collection<InetAddress> targets,
AbstractWriteResponseHandler<IMutation> handler)
@@ -2209,7 +2171,7 @@ public class StorageProxy implements StorageProxyMBean
{
if (!DatabaseDescriptor.hintedHandoffEnabled())
{
- HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+ HintsService.instance.metrics.incrPastWindow(ep);
return false;
}
@@ -2220,7 +2182,7 @@ public class StorageProxy implements StorageProxyMBean
if (disabledDCs.contains(dc))
{
Tracing.trace("Not hinting {} since its data center {} has been disabled {}", ep, dc, disabledDCs);
- HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+ HintsService.instance.metrics.incrPastWindow(ep);
return false;
}
}
@@ -2228,7 +2190,7 @@ public class StorageProxy implements StorageProxyMBean
boolean hintWindowExpired = Gossiper.instance.getEndpointDowntime(ep) > DatabaseDescriptor.getMaxHintWindow();
if (hintWindowExpired)
{
- HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+ HintsService.instance.metrics.incrPastWindow(ep);
Tracing.trace("Not hinting {} which has been down {} ms", ep, Gossiper.instance.getEndpointDowntime(ep));
}
return !hintWindowExpired;
@@ -2363,7 +2325,7 @@ public class StorageProxy implements StorageProxyMBean
if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
{
MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
- HintRunnable runnable = new HintRunnable(FBUtilities.getBroadcastAddress())
+ HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
{
protected void runMayThrow() throws Exception
{
@@ -2393,11 +2355,11 @@ public class StorageProxy implements StorageProxyMBean
*/
private abstract static class HintRunnable implements Runnable
{
- public final InetAddress target;
+ public final Collection<InetAddress> targets;
- protected HintRunnable(InetAddress target)
+ protected HintRunnable(Collection<InetAddress> targets)
{
- this.target = target;
+ this.targets = targets;
}
public void run()
@@ -2412,8 +2374,9 @@ public class StorageProxy implements StorageProxyMBean
}
finally
{
- StorageMetrics.totalHintsInProgress.dec();
- getHintsInProgressFor(target).decrementAndGet();
+ StorageMetrics.totalHintsInProgress.dec(targets.size());
+ for (InetAddress target : targets)
+ getHintsInProgressFor(target).decrementAndGet();
}
}
@@ -2446,6 +2409,52 @@ public class StorageProxy implements StorageProxyMBean
logger.warn("Some hints were not written before shutdown. This is not supposed to happen. You should (a) run repair, and (b) file a bug report");
}
+    /**
+     * Looks up the in-progress hint counter kept for {@code destination} by calling
+     * {@code hintsInProgress.load(destination)}.
+     *
+     * Any exception raised by the lookup is rethrown wrapped in an {@link AssertionError}.
+     */
+    private static AtomicInteger getHintsInProgressFor(InetAddress destination)
+    {
+        AtomicInteger counter;
+        try
+        {
+            counter = hintsInProgress.load(destination);
+        }
+        catch (Exception failure)
+        {
+            throw new AssertionError(failure);
+        }
+        return counter;
+    }
+
+    /**
+     * Single-endpoint convenience overload: wraps {@code target} in a singleton collection and delegates to
+     * the collection-based {@code submitHint}.
+     */
+    public static Future<Void> submitHint(Mutation mutation, InetAddress target, AbstractWriteResponseHandler<IMutation> responseHandler)
+    {
+        Collection<InetAddress> singleTarget = Collections.singleton(target);
+        return submitHint(mutation, singleTarget, responseHandler);
+    }
+
+    /**
+     * Schedules the writing of one hint for {@code mutation} on behalf of every endpoint in {@code targets}.
+     *
+     * Endpoints whose host ID cannot be resolved ({@code getHostIdForEndpoint} is {@code @Nullable}) are
+     * skipped and logged rather than being passed on as {@code null} host IDs; the created-hints metric is
+     * only bumped for endpoints a hint was actually written for.
+     *
+     * @param mutation        the mutation to store as a hint
+     * @param targets         the endpoints the hint is destined for
+     * @param responseHandler notified once the hint is written, but only for CL.ANY; may be null
+     * @return the future of the task submitted via {@code submitHint(HintRunnable)}
+     */
+    public static Future<Void> submitHint(Mutation mutation,
+                                          Collection<InetAddress> targets,
+                                          AbstractWriteResponseHandler<IMutation> responseHandler)
+    {
+        HintRunnable runnable = new HintRunnable(targets)
+        {
+            public void runMayThrow()
+            {
+                List<InetAddress> validTargets = new ArrayList<>(targets.size());
+                List<UUID> hostIds = new ArrayList<>(targets.size());
+                for (InetAddress target : targets)
+                {
+                    UUID hostId = StorageService.instance.getHostIdForEndpoint(target);
+                    if (hostId == null)
+                    {
+                        logger.debug("Discarding hint for endpoint with no known host ID: {}", target);
+                        continue;
+                    }
+                    validTargets.add(target);
+                    hostIds.add(hostId);
+                }
+
+                logger.debug("Adding hints for {}", validTargets);
+                HintsService.instance.write(hostIds, Hint.create(mutation, System.currentTimeMillis()));
+                validTargets.forEach(HintsService.instance.metrics::incrCreatedHints);
+                // Notify the handler only for CL == ANY
+                if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
+                    responseHandler.response(null);
+            }
+        };
+
+        return submitHint(runnable);
+    }
+
+    /**
+     * Adds the runnable's targets to the in-progress hint accounting (the global metric plus one counter per
+     * endpoint) and then submits the runnable to the MUTATION stage.
+     */
+    private static Future<Void> submitHint(HintRunnable runnable)
+    {
+        Collection<InetAddress> pending = runnable.targets;
+        StorageMetrics.totalHintsInProgress.inc(pending.size());
+        pending.forEach(endpoint -> getHintsInProgressFor(endpoint).incrementAndGet());
+        return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
+    }
+
public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(); }
public void setRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRpcTimeout(timeoutInMillis); }
@@ -2478,11 +2487,11 @@ public class StorageProxy implements StorageProxyMBean
public long getReadRepairAttempted() {
return ReadRepairMetrics.attempted.getCount();
}
-
+
public long getReadRepairRepairedBlocking() {
return ReadRepairMetrics.repairedBlocking.getCount();
}
-
+
public long getReadRepairRepairedBackground() {
return ReadRepairMetrics.repairedBackground.getCount();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5966e49..0346645 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -28,22 +28,8 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -54,6 +40,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.NotificationBroadcasterSupport;
@@ -95,6 +82,8 @@ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.gms.TokenSerializer;
import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.hints.HintVerbHandler;
+import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -303,6 +292,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PROPOSE, new ProposeVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_COMMIT, new CommitVerbHandler());
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.HINT, new HintVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler());
@@ -585,7 +575,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString());
logger.info("Thrift API version: {}", cassandraConstants.VERSION);
logger.info("CQL supported versions: {} (default: {})",
- StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
+ StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
initialized = true;
@@ -649,6 +639,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().shutdown();
materializedViewMutationStage.shutdown();
batchlogMutationStage.shutdown();
+ HintsService.instance.pauseDispatch();
counterMutationStage.shutdown();
mutationStage.shutdown();
materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
@@ -681,6 +672,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (FBUtilities.isWindows())
WindowsTimer.endTimerPeriod(DatabaseDescriptor.getWindowsTimerInterval());
+ HintsService.instance.shutdownBlocking();
+
// wait for miscellaneous tasks like sstable and commitlog segment deletion
ScheduledExecutors.nonPeriodicTasks.shutdown();
if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
@@ -792,7 +785,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
- HintedHandOffManager.instance.start();
+ HintsService.instance.startDispatch();
BatchlogManager.instance.start();
}
}
@@ -1564,6 +1557,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress()).toString();
}
+    /**
+     * Returns the host ID that token metadata holds for this node's broadcast address, as a {@link UUID}
+     * rather than in string form.
+     */
+    public UUID getLocalHostUUID()
+    {
+        InetAddress self = FBUtilities.getBroadcastAddress();
+        return getTokenMetadata().getHostId(self);
+    }
+
public Map<String, String> getHostIdMap()
{
Map<String, String> mapOut = new HashMap<>();
@@ -2119,11 +2117,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void excise(Collection<Token> tokens, InetAddress endpoint)
{
logger.info("Removing tokens {} for {}", tokens, endpoint);
- HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
+
+ if (tokenMetadata.isMember(endpoint))
+ HintsService.instance.excise(tokenMetadata.getHostId(endpoint));
+
removeEndpoint(endpoint);
tokenMetadata.removeEndpoint(endpoint);
tokenMetadata.removeBootstrapTokens(tokens);
-
notifyLeft(endpoint);
PendingRangeCalculatorService.instance.update();
}
@@ -2337,10 +2337,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MigrationManager.instance.scheduleSchemaPull(endpoint, state);
if (tokenMetadata.isMember(endpoint))
- {
- HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true);
notifyUp(endpoint);
- }
}
public void onRemove(InetAddress endpoint)
@@ -2380,9 +2377,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return map;
}
+ // TODO
public final void deliverHints(String host) throws UnknownHostException
{
- HintedHandOffManager.instance.scheduleHintDelivery(host);
+ throw new UnsupportedOperationException();
}
public Collection<Token> getLocalTokens()
@@ -2392,6 +2390,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return tokens;
}
+    /**
+     * Resolves a host ID to an endpoint address by asking token metadata.
+     *
+     * @return the endpoint token metadata reports for {@code hostId}; may be null
+     */
+    @Nullable
+    public InetAddress getEndpointForHostId(UUID hostId)
+    {
+        InetAddress endpoint = tokenMetadata.getEndpointForHostId(hostId);
+        return endpoint;
+    }
+
+    /**
+     * Resolves an endpoint address to a host ID by asking token metadata.
+     *
+     * @return the host ID token metadata reports for {@code address}; may be null
+     */
+    @Nullable
+    public UUID getHostIdForEndpoint(InetAddress address)
+    {
+        UUID hostId = tokenMetadata.getHostId(address);
+        return hostId;
+    }
+
/* These methods belong to the MBean interface */
public List<String> getTokens()
@@ -3418,7 +3428,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private Future<StreamState> streamHints()
{
// StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well)
- ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
+ ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS);
FBUtilities.waitOnFuture(hintsCF.forceFlush());
// gather all live nodes in the cluster that aren't also leaving
@@ -3451,7 +3461,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
preferred,
SystemKeyspace.NAME,
ranges,
- SystemKeyspace.HINTS)
+ SystemKeyspace.LEGACY_HINTS)
.execute();
}
}
@@ -3835,7 +3845,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public synchronized void drain() throws IOException, InterruptedException, ExecutionException
{
inShutdownHook = true;
-
+
+ BatchlogManager.shutdown();
+
+ HintsService.instance.pauseDispatch();
+
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
@@ -3899,7 +3913,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
FBUtilities.waitOnFutures(flushes);
- BatchlogManager.shutdown();
+ HintsService.instance.shutdownBlocking();
// whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
// there are no segments to replay, so we force the recycling of any remaining (should be at most one)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 22b3455..c1e0a0d 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -57,7 +57,6 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.BatchlogManager;
import org.apache.cassandra.db.BatchlogManagerMBean;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
-import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.db.HintedHandOffManagerMBean;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
@@ -65,6 +64,7 @@ import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.FailureDetectorMBean;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.GossiperMBean;
+import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.TableMetrics.Sampler;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index d1462d2..4f7a97a 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -26,10 +26,10 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.*;
+import java.util.zip.CRC32;
import java.util.zip.Checksum;
import com.google.common.base.Joiner;
-import org.apache.cassandra.utils.AbstractIterator;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -332,7 +332,7 @@ public class FBUtilities
public static int nowInSeconds()
{
- return (int)(System.currentTimeMillis() / 1000);
+ return (int) (System.currentTimeMillis() / 1000);
}
public static void waitOnFutures(Iterable<Future<?>> futures)
@@ -595,6 +595,34 @@ public class FBUtilities
checksum.update((v >>> 0) & 0xFF);
}
+    /**
+     * Updates checksum with the bytes of the provided ByteBuffer in the range [offset, offset + length).
+     * Restores position and limit to their original values on return, wherever they lie relative to
+     * the checksummed range (the range may start past the current limit or end before the current position).
+     * This method is *NOT* thread-safe.
+     *
+     * @param checksum the checksum to update
+     * @param buffer   the buffer to read from; its position and limit are moved temporarily
+     * @param offset   absolute index of the first byte to checksum
+     * @param length   number of bytes to checksum
+     */
+    public static void updateChecksum(CRC32 checksum, ByteBuffer buffer, int offset, int length)
+    {
+        int position = buffer.position();
+        int limit = buffer.limit();
+
+        // limit first: position(offset) would throw if offset lies past the current limit
+        buffer.limit(offset + length).position(offset);
+        checksum.update(buffer);
+
+        // limit first: position(position) would throw if the original position lies past the temporary limit
+        buffer.limit(limit).position(position);
+    }
+
+    /**
+     * Updates checksum with the remaining bytes of the provided ByteBuffer.
+     * Resets position back to its original value on return.
+     * This method is *NOT* thread-safe.
+     */
+    public static void updateChecksum(CRC32 checksum, ByteBuffer buffer)
+    {
+        final int savedPosition = buffer.position();
+
+        // CRC32.update(ByteBuffer) consumes the buffer up to its limit, so rewind afterwards
+        checksum.update(buffer);
+        buffer.position(savedPosition);
+    }
+
private static final ThreadLocal<byte[]> threadLocalScratchBuffer = new ThreadLocal<byte[]>()
{
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 0a2bd28..d6ce7b4 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -18,10 +18,25 @@
*/
package org.apache.cassandra.utils;
-public class Throwables
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+
+public final class Throwables
{
+ /** Kind of file operation being performed; perform(File, FileOpType, ...) uses it to choose between FSWriteError (WRITE) and FSReadError. */
+ public enum FileOpType { READ, WRITE }
- public static Throwable merge(Throwable existingFail, Throwable newFail)
+ /**
+ * A single action that takes no arguments, returns nothing, and may throw an exception of type {@code E}.
+ */
+ public interface DiscreteAction<E extends Exception>
+ {
+ void perform() throws E;
+ }
+
+ public static <T extends Throwable> T merge(T existingFail, T newFail)
{
if (existingFail == null)
return newFail;
@@ -31,7 +46,74 @@ public class Throwables
public static void maybeFail(Throwable fail)
{
- if (fail != null)
- com.google.common.base.Throwables.propagate(fail);
+ if (failIfCanCast(fail, null))
+ throw new RuntimeException(fail);
+ }
+
+    /**
+     * Does nothing when {@code fail} is null; otherwise always throws.
+     *
+     * Errors, RuntimeExceptions and instances of {@code checked} are rethrown as-is by
+     * {@code failIfCanCast}; anything else is wrapped in a {@link RuntimeException}.
+     */
+    public static <T extends Throwable> void maybeFail(Throwable fail, Class<T> checked) throws T
+    {
+        boolean mustWrap = failIfCanCast(fail, checked);
+        if (!mustWrap)
+            return;
+
+        throw new RuntimeException(fail);
+    }
+
+    /**
+     * Rethrows {@code fail} directly when that can be done without wrapping: when it is an {@link Error},
+     * a {@link RuntimeException}, or an instance of {@code checked} (if {@code checked} is non-null).
+     *
+     * @return false when {@code fail} is null; true when {@code fail} is non-null but could not be
+     *         rethrown directly, leaving it to the caller to wrap it
+     */
+    public static <T extends Throwable> boolean failIfCanCast(Throwable fail, Class<T> checked) throws T
+    {
+        if (fail != null)
+        {
+            if (fail instanceof Error)
+                throw (Error) fail;
+            if (fail instanceof RuntimeException)
+                throw (RuntimeException) fail;
+            if (checked != null && checked.isInstance(fail))
+                throw checked.cast(fail);
+        }
+
+        return fail != null;
+    }
+
+    /**
+     * Varargs convenience overload: streams {@code actions} in array order and delegates to
+     * the stream-based {@code perform}.
+     */
+    @SafeVarargs
+    public static <E extends Exception> void perform(DiscreteAction<? extends E> ... actions) throws E
+    {
+        Stream<DiscreteAction<? extends E>> sequence = Arrays.stream(actions);
+        perform(sequence);
+    }
+
+    /**
+     * Performs every action in the stream, even when earlier ones throw: each throwable is combined
+     * with the previous failures via {@code merge}, and the combined failure is thrown only after the
+     * last action has run. Completes normally when no action threw.
+     */
+    @SuppressWarnings("unchecked")
+    public static <E extends Exception> void perform(Stream<DiscreteAction<? extends E>> actions) throws E
+    {
+        Throwable accumulated = null;
+        for (Iterator<DiscreteAction<? extends E>> remaining = actions.iterator(); remaining.hasNext(); )
+        {
+            // fetched outside the try block: a failure of the iterator itself is not an action failure
+            DiscreteAction<? extends E> step = remaining.next();
+            try
+            {
+                step.perform();
+            }
+            catch (Throwable t)
+            {
+                accumulated = merge(accumulated, t);
+            }
+        }
+
+        // Errors and RuntimeExceptions are rethrown by failIfCanCast; what is left is cast to E
+        if (failIfCanCast(accumulated, null))
+            throw (E) accumulated;
+    }
+
+    /**
+     * Performs all the given I/O actions against {@code against}, converting any {@link IOException}
+     * into an {@code FSWriteError} when {@code opType} is WRITE and into an {@code FSReadError} otherwise.
+     * The wrapped actions are run through the stream-based {@code perform}.
+     */
+    @SafeVarargs
+    public static void perform(File against, FileOpType opType, DiscreteAction<? extends IOException> ... actions)
+    {
+        perform(Arrays.stream(actions).map((action) -> () ->
+        {
+            try
+            {
+                action.perform();
+            }
+            catch (IOException ioFailure)
+            {
+                if (opType == FileOpType.WRITE)
+                    throw new FSWriteError(ioFailure, against);
+                throw new FSReadError(ioFailure, against);
+            }
+        }));
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 0bbaee4..1dba284 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -8,6 +8,7 @@ commitlog_sync: batch
commitlog_sync_batch_window_in_ms: 1.0
commitlog_segment_size_in_mb: 5
commitlog_directory: build/test/cassandra/commitlog
+hints_directory: build/test/cassandra/hints
partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
listen_address: 127.0.0.1
storage_port: 7010
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java b/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
new file mode 100644
index 0000000..fd880cb
--- /dev/null
+++ b/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+/**
+ * Long-running test that writes HINTS_COUNT hints to a single hints file, checks the file against its
+ * checksum file, then reads the hints back and verifies each one.
+ */
+public class HintsWriteThenReadTest
+{
+    private static final String KEYSPACE = "hints_write_then_read_test";
+    private static final String TABLE = "table";
+
+    private static final int HINTS_COUNT = 10_000_000;
+
+    @Test
+    public void testWriteReadCycle() throws IOException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+
+        HintsDescriptor descriptor = new HintsDescriptor(UUID.randomUUID(), System.currentTimeMillis());
+
+        File directory = Files.createTempDirectory(null).toFile();
+        try
+        {
+            testWriteReadCycle(directory, descriptor);
+        }
+        finally
+        {
+            // deleteOnExit() alone cannot remove a directory that still contains the hints and checksum files
+            deleteRecursively(directory);
+        }
+    }
+
+    private void testWriteReadCycle(File directory, HintsDescriptor descriptor) throws IOException
+    {
+        // write HINTS_COUNT hints to a file
+        writeHints(directory, descriptor);
+
+        // calculate the checksum of the file, then compare to the .crc32 checksum file content
+        verifyChecksum(directory, descriptor);
+
+        // iterate over the written hints, make sure they are all present
+        verifyHints(directory, descriptor);
+    }
+
+    private void writeHints(File directory, HintsDescriptor descriptor) throws IOException
+    {
+        try (HintsWriter writer = HintsWriter.create(directory, descriptor))
+        {
+            write(writer, descriptor.timestamp);
+        }
+    }
+
+    /** Recomputes the CRC32 of the hints file and compares it, as hex, to the first line of the checksum file. */
+    private static void verifyChecksum(File directory, HintsDescriptor descriptor) throws IOException
+    {
+        File hintsFile = new File(directory, descriptor.fileName());
+        File checksumFile = new File(directory, descriptor.checksumFileName());
+
+        assertTrue(checksumFile.exists());
+
+        String actualChecksum = Integer.toHexString(calculateChecksum(hintsFile));
+        String expectedChecksum = Files.readAllLines(checksumFile.toPath()).iterator().next();
+
+        assertEquals(expectedChecksum, actualChecksum);
+    }
+
+    /** Reads the hints back in order and checks each against what createHint() produced for its index. */
+    private void verifyHints(File directory, HintsDescriptor descriptor)
+    {
+        long baseTimestamp = descriptor.timestamp;
+        int index = 0;
+
+        try (HintsReader reader = HintsReader.open(new File(directory, descriptor.fileName())))
+        {
+            for (HintsReader.Page page : reader)
+            {
+                Iterator<Hint> hints = page.hintsIterator();
+                while (hints.hasNext())
+                {
+                    Hint hint = hints.next();
+
+                    long timestamp = baseTimestamp + index;
+                    Mutation mutation = hint.mutation;
+
+                    assertEquals(timestamp, hint.creationTime);
+                    assertEquals(dk(bytes(index)), mutation.key());
+
+                    Row row = mutation.getPartitionUpdates().iterator().next().iterator().next();
+                    assertEquals(1, Iterables.size(row.cells()));
+                    assertEquals(bytes(index), row.clustering().get(0));
+                    Cell cell = row.cells().iterator().next();
+                    assertNotNull(cell);
+                    assertEquals(bytes(index), cell.value());
+                    assertEquals(timestamp * 1000, cell.timestamp());
+
+                    index++;
+                }
+            }
+        }
+
+        // expected value first, so that a failure message reports the counts the right way round
+        assertEquals(HINTS_COUNT, index);
+    }
+
+    private void write(HintsWriter writer, long timestamp) throws IOException
+    {
+        ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024);
+        try (HintsWriter.Session session = writer.newSession(buffer))
+        {
+            write(session, timestamp);
+        }
+        finally
+        {
+            // runs after the session has been closed; releases the direct buffer even if writing failed
+            FileUtils.clean(buffer);
+        }
+    }
+
+    private void write(HintsWriter.Session session, long timestamp) throws IOException
+    {
+        for (int i = 0; i < HINTS_COUNT; i++)
+            session.append(createHint(i, timestamp));
+    }
+
+    private static Hint createHint(int idx, long baseTimestamp)
+    {
+        long timestamp = baseTimestamp + idx;
+        return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);
+    }
+
+    private static Mutation createMutation(int index, long timestamp)
+    {
+        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        return new RowUpdateBuilder(table, timestamp, bytes(index))
+               .clustering(bytes(index))
+               .add("val", bytes(index))
+               .build();
+    }
+
+    private static int calculateChecksum(File file) throws IOException
+    {
+        CRC32 crc = new CRC32();
+        byte[] buffer = new byte[FBUtilities.MAX_UNSIGNED_SHORT];
+
+        try (InputStream in = Files.newInputStream(file.toPath()))
+        {
+            int bytesRead;
+            while ((bytesRead = in.read(buffer)) != -1)
+                crc.update(buffer, 0, bytesRead);
+        }
+
+        return (int) crc.getValue();
+    }
+
+    /** Deletes {@code file} and, if it is a directory, everything beneath it. */
+    private static void deleteRecursively(File file)
+    {
+        File[] children = file.listFiles(); // null when file is not a directory
+        if (children != null)
+            for (File child : children)
+                deleteRecursively(child);
+
+        // best effort: if the immediate delete fails, retry when the JVM exits
+        if (!file.delete())
+            file.deleteOnExit();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
index 9023b11..3bdb192 100644
--- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
+++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
@@ -54,6 +54,7 @@ public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader
config.commitlog_directory += File.pathSeparator + offset;
config.saved_caches_directory += File.pathSeparator + offset;
+ config.hints_directory += File.pathSeparator + offset;
for (int i = 0; i < config.data_file_directories.length; i++)
config.data_file_directories[i] += File.pathSeparator + offset;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
deleted file mode 100644
index e06c95a..0000000
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.cassandra.db;
-
-import java.net.InetAddress;
-import java.util.Map;
-import java.util.UUID;
-
-import com.google.common.collect.Iterators;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
-import static org.junit.Assert.assertEquals;
-
-public class HintedHandOffTest
-{
-
- public static final String KEYSPACE4 = "HintedHandOffTest4";
- public static final String STANDARD1_CF = "Standard1";
-
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE4,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE4, STANDARD1_CF));
- }
-
- @Before
- public void clearHints()
- {
- Keyspace systemKeyspace = Keyspace.open("system");
- ColumnFamilyStore hintStore = systemKeyspace.getColumnFamilyStore(SystemKeyspace.HINTS);
- hintStore.clearUnsafe();
- }
-
- // Test compaction of hints column family. It shouldn't remove all columns on compaction.
- @Test
- public void testCompactionOfHintsCF() throws Exception
- {
- // prepare hints column family
- Keyspace systemKeyspace = Keyspace.open("system");
- ColumnFamilyStore hintStore = systemKeyspace.getColumnFamilyStore(SystemKeyspace.HINTS);
- hintStore.clearUnsafe();
- hintStore.metadata.gcGraceSeconds(36000); // 10 hours
- hintStore.disableAutoCompaction();
-
- // insert 1 hint
- Mutation rm = mutation();
- HintedHandOffManager.instance.hintFor(rm,
- System.currentTimeMillis(),
- HintedHandOffManager.calculateHintTTL(rm),
- UUID.randomUUID())
- .applyUnsafe();
-
- // flush data to disk
- hintStore.forceBlockingFlush();
- assertEquals(1, hintStore.getLiveSSTables().size());
-
- // submit compaction
- HintedHandOffManager.instance.compact();
-
- // single row should not be removed because of gc_grace_seconds
- // is 10 hours and there are no any tombstones in sstable
- assertEquals(1, hintStore.getLiveSSTables().size());
- }
-
- @Test
- public void testHintsMetrics() throws Exception
- {
- for (int i = 0; i < 99; i++)
- HintedHandOffManager.instance.metrics.incrPastWindow(InetAddress.getLocalHost());
- HintedHandOffManager.instance.metrics.log();
-
- UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS);
- Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
- assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
- }
-
- @Test(timeout = 5000)
- public void testTruncateHints() throws Exception
- {
- // insert 1 hint
- Mutation rm = mutation();
- HintedHandOffManager.instance.hintFor(rm,
- System.currentTimeMillis(),
- HintedHandOffManager.calculateHintTTL(rm),
- UUID.randomUUID())
- .applyUnsafe();
-
- assert getNoOfHints() == 1;
-
- HintedHandOffManager.instance.truncateAllHints();
-
- while(getNoOfHints() > 0)
- {
- Thread.sleep(100);
- }
-
- assert getNoOfHints() == 0;
- }
-
- private Mutation mutation()
- {
- // get a random mutation to write a hint for
- return new RowUpdateBuilder(Keyspace.open(KEYSPACE4).getColumnFamilyStore(STANDARD1_CF).metadata,
- FBUtilities.timestampMicros(),
- ByteBufferUtil.bytes(1))
- .clustering("cluster_col0")
- .add("val", "value0")
- .build();
- }
-
- private int getNoOfHints()
- {
- String req = "SELECT * FROM system.%s";
- UntypedResultSet resultSet = executeInternal(String.format(req, SystemKeyspace.HINTS));
- return resultSet.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index b41b7b3..fcdab62 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -250,7 +250,7 @@ public class CommitLogTest
// Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
.clustering("bytes")
- .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200))
+ .add("val", ByteBuffer.allocate(DatabaseDescriptor.getMaxMutationSize() - 200))
.build();
CommitLog.instance.add(rm2);
// also forces a new segment, since each entry-with-overhead is just under half the CL size
@@ -280,7 +280,7 @@ public class CommitLogTest
.clustering(colName)
.add("val", ByteBuffer.allocate(allocSize)).build();
- int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
+ int max = DatabaseDescriptor.getMaxMutationSize();
max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
// Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size
http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
new file mode 100644
index 0000000..e431924
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.CRC32;
+
+import org.junit.Test;
+
+import org.apache.cassandra.hints.ChecksummedDataInput;
+import org.apache.cassandra.io.util.AbstractDataInput;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+public class ChecksummedDataInputTest
+{
+ @Test
+ public void testThatItWorks() throws IOException
+ {
+ // fill a bytebuffer with some input
+ DataOutputBuffer out = new DataOutputBuffer();
+ out.write(127);
+ out.write(new byte[]{ 0, 1, 2, 3, 4, 5, 6 });
+ out.writeBoolean(false);
+ out.writeByte(10);
+ out.writeChar('t');
+ out.writeDouble(3.3);
+ out.writeFloat(2.2f);
+ out.writeInt(42);
+ out.writeLong(Long.MAX_VALUE);
+ out.writeShort(Short.MIN_VALUE);
+ out.writeUTF("utf");
+ ByteBuffer buffer = out.buffer();
+
+ // calculate resulting CRC
+ CRC32 crc = new CRC32();
+ FBUtilities.updateChecksum(crc, buffer);
+ int expectedCRC = (int) crc.getValue();
+
+ ChecksummedDataInput crcInput = ChecksummedDataInput.wrap(new DummyByteBufferDataInput(buffer.duplicate()));
+ crcInput.limit(buffer.remaining());
+
+ // assert that we read all the right values back
+ assertEquals(127, crcInput.read());
+ byte[] bytes = new byte[7];
+ crcInput.readFully(bytes);
+ assertTrue(Arrays.equals(new byte[]{ 0, 1, 2, 3, 4, 5, 6 }, bytes));
+ assertEquals(false, crcInput.readBoolean());
+ assertEquals(10, crcInput.readByte());
+ assertEquals('t', crcInput.readChar());
+ assertEquals(3.3, crcInput.readDouble());
+ assertEquals(2.2f, crcInput.readFloat());
+ assertEquals(42, crcInput.readInt());
+ assertEquals(Long.MAX_VALUE, crcInput.readLong());
+ assertEquals(Short.MIN_VALUE, crcInput.readShort());
+ assertEquals("utf", crcInput.readUTF());
+
+ // assert that the crc matches, and that we've read exactly as many bytes as expected
+ assertEquals(0, crcInput.bytesRemaining());
+ assertEquals(expectedCRC, crcInput.getCrc());
+ }
+
+ private static final class DummyByteBufferDataInput extends AbstractDataInput
+ {
+ private final ByteBuffer buffer;
+
+ DummyByteBufferDataInput(ByteBuffer buffer)
+ {
+ this.buffer = buffer;
+ }
+
+ public void seek(long position)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public long getPosition()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public long getPositionLimit()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public int read()
+ {
+ return buffer.get() & 0xFF;
+ }
+ }
+}