Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/18 02:15:50 UTC
[3/4] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/db/BatchlogManager.java
src/java/org/apache/cassandra/db/ColumnFamilyStore.java
src/java/org/apache/cassandra/db/HintedHandOffManager.java
src/java/org/apache/cassandra/db/RowMutation.java
src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
src/java/org/apache/cassandra/service/StorageProxy.java
test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
test/unit/org/apache/cassandra/db/HintedHandOffTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/384de4b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/384de4b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/384de4b8
Branch: refs/heads/cassandra-2.1
Commit: 384de4b85d93dd3a5d23fcc93bcd0f55aba59941
Parents: 7dbbe92 8709706
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Apr 18 02:40:19 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Apr 18 02:40:19 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 5 +
.../apache/cassandra/db/BatchlogManager.java | 102 +++++++++++--------
.../apache/cassandra/db/ColumnFamilyStore.java | 6 --
.../cassandra/db/HintedHandOffManager.java | 21 +---
.../org/apache/cassandra/db/SystemKeyspace.java | 60 +++++++----
.../db/commitlog/CommitLogReplayer.java | 12 +--
.../apache/cassandra/service/StorageProxy.java | 9 +-
.../cassandra/db/BatchlogManagerTest.java | 78 ++++++++++++--
.../apache/cassandra/db/HintedHandOffTest.java | 19 ++--
9 files changed, 205 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384de4b8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 451d046,bb08a37..ad26f6d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,4 +1,63 @@@
-1.2.17
++2.0.8
++Merged from 1.2:
++ * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
++
++
+2.0.7
+ * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
+ * Allow compaction of system tables during startup (CASSANDRA-6913)
+ * Restrict Windows to parallel repairs (CASSANDRA-6907)
+ * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
+ * Fix NPE in MeteredFlusher (CASSANDRA-6820)
+ * Fix race processing range scan responses (CASSANDRA-6820)
+ * Allow deleting snapshots from dropped keyspaces (CASSANDRA-6821)
+ * Add uuid() function (CASSANDRA-6473)
+ * Omit tombstones from schema digests (CASSANDRA-6862)
+ * Include correct consistencyLevel in LWT timeout (CASSANDRA-6884)
+ * Lower chances for losing new SSTables during nodetool refresh and
+ ColumnFamilyStore.loadNewSSTables (CASSANDRA-6514)
+ * Add support for DELETE ... IF EXISTS to CQL3 (CASSANDRA-5708)
+ * Update hadoop_cql3_word_count example (CASSANDRA-6793)
+ * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788)
+ * Log more information when exceeding tombstone_warn_threshold (CASSANDRA-6865)
+ * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864)
+ * Fix schema concurrency exceptions (CASSANDRA-6841)
+ * Fix leaking validator FH in StreamWriter (CASSANDRA-6832)
+ * Fix saving triggers to schema (CASSANDRA-6789)
+ * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)
+ * Fix accounting in FileCacheService to allow re-using RAR (CASSANDRA-6838)
+ * Fix static counter columns (CASSANDRA-6827)
+ * Restore expiring->deleted (cell) compaction optimization (CASSANDRA-6844)
+ * Fix CompactionManager.needsCleanup (CASSANDRA-6845)
+ * Correctly compare BooleanType values other than 0 and 1 (CASSANDRA-6779)
+ * Read message id as string from earlier versions (CASSANDRA-6840)
+ * Properly use the Paxos consistency for (non-protocol) batch (CASSANDRA-6837)
+ * Add paranoid disk failure option (CASSANDRA-6646)
+ * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
+ * Extend triggers to support CAS updates (CASSANDRA-6882)
+ * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
+ * Fix paging with SELECT DISTINCT (CASSANDRA-6857)
+ * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
+ * Improve MeteredFlusher handling of MF-unaffected column families
+ (CASSANDRA-6867)
+ * Add CqlRecordReader using native pagination (CASSANDRA-6311)
+ * Add QueryHandler interface (CASSANDRA-6659)
+ * Track liveRatio per-memtable, not per-CF (CASSANDRA-6945)
+ * Make sure upgradesstables keeps sstable level (CASSANDRA-6958)
+ * Fix LIMIT with static columns (CASSANDRA-6956)
+ * Fix clash with CQL column name in thrift validation (CASSANDRA-6892)
+ * Fix error with super columns in mixed 1.2-2.0 clusters (CASSANDRA-6966)
+ * Fix bad skip of sstables on slice query with composite start/finish (CASSANDRA-6825)
+ * Fix unintended update with conditional statement (CASSANDRA-6893)
+ * Fix map element access in IF (CASSANDRA-6914)
+ * Avoid costly range calculations for range queries on system keyspaces
+ (CASSANDRA-6906)
+ * Fix SSTable not released if stream session fails (CASSANDRA-6818)
+ * Avoid build failure due to ANTLR timeout (CASSANDRA-6991)
+Merged from 1.2:
+ * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
+ * add extra SSL cipher suites (CASSANDRA-6613)
+ * fix nodetool getsstables for blob PK (CASSANDRA-6803)
* Fix BatchlogManager#deleteBatch() use of millisecond timestamps
(CASSANDRA-6822)
* Continue assassinating even if the endpoint vanishes (CASSANDRA-6787)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384de4b8/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index d65a2b0,ea32e9d..5770994
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -35,7 -32,8 +32,6 @@@ import javax.management.ObjectName
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
--import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -251,45 -252,72 +247,72 @@@ public class BatchlogManager implement
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
int size = in.readInt();
- List<RowMutation> mutations = new ArrayList<RowMutation>(size);
++ List<RowMutation> mutations = new ArrayList<>(size);
+
for (int i = 0; i < size; i++)
- replaySerializedMutation(RowMutation.serializer.deserialize(in, version), writtenAt, version, rateLimiter);
+ {
- RowMutation mutation = RowMutation.serializer.deserialize(in, VERSION);
++ RowMutation mutation = RowMutation.serializer.deserialize(in, version);
+
+ // Remove CFs that have been truncated since. writtenAt and SystemKeyspace#getTruncatedAt() both return millis.
+ // We don't abort the replay entirely b/c this can be considered a success (truncated is the same as
+ // delivered, then truncated).
+ for (UUID cfId : mutation.getColumnFamilyIds())
- if (writtenAt <= SystemTable.getTruncatedAt(cfId))
++ if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+ mutation = mutation.without(cfId);
+
+ if (!mutation.isEmpty())
+ mutations.add(mutation);
+ }
+
+ if (!mutations.isEmpty())
- replayMutations(mutations, writtenAt, rateLimiter);
++ replayMutations(mutations, writtenAt, version, rateLimiter);
}
/*
* We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
* when a replica is down or a write request times out.
*/
- private void replaySerializedMutation(RowMutation mutation, long writtenAt, int version, RateLimiter rateLimiter)
- private void replayMutations(List<RowMutation> mutations, long writtenAt, RateLimiter rateLimiter) throws IOException
++ private void replayMutations(List<RowMutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
{
- int ttl = calculateHintTTL(mutation, writtenAt);
+ int ttl = calculateHintTTL(mutations, writtenAt);
if (ttl <= 0)
- return; // the mutation isn't safe to replay.
-
- Set<InetAddress> liveEndpoints = new HashSet<>();
- String ks = mutation.getKeyspaceName();
- Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
- int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version);
+ return; // this batchlog entry has 'expired'
- for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
- StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+ for (RowMutation mutation : mutations)
{
- rateLimiter.acquire(mutationSize);
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- mutation.apply();
- else if (FailureDetector.instance.isAlive(endpoint))
- liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
- else
- StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
- }
- Set<InetAddress> liveEndpoints = Sets.newHashSet();
- List<InetAddress> hintEndpoints = Lists.newArrayList();
++ List<InetAddress> liveEndpoints = new ArrayList<>();
++ List<InetAddress> hintEndpoints = new ArrayList<>();
- if (!liveEndpoints.isEmpty())
- attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
- String ks = mutation.getTable();
++ String ks = mutation.getKeyspaceName();
+ Token tk = StorageService.getPartitioner().getToken(mutation.key());
- int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
++ int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version);
+
+ for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+ StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+ {
+ rateLimiter.acquire(mutationSize);
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ mutation.apply();
+ else if (FailureDetector.instance.isAlive(endpoint))
+ liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+ else
+ hintEndpoints.add(endpoint);
+ }
+
+ if (!liveEndpoints.isEmpty())
+ hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints));
+
+ for (InetAddress endpoint : hintEndpoints)
+ StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
+ }
}
- private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints)
+ // Returns the endpoints we failed to deliver to.
- private Set<InetAddress> attemptDirectDelivery(RowMutation mutation, Set<InetAddress> endpoints) throws IOException
++ private Set<InetAddress> attemptDirectDelivery(RowMutation mutation, List<InetAddress> endpoints) throws IOException
{
- List<WriteResponseHandler> handlers = Lists.newArrayList();
- final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<>(endpoints);
- final List<WriteResponseHandler> handlers = Lists.newArrayList();
++ final List<WriteResponseHandler> handlers = new ArrayList<>();
+ final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>());
+
for (final InetAddress ep : endpoints)
{
Runnable callback = new Runnable()
@@@ -317,20 -345,19 +340,19 @@@
}
}
- if (!undelivered.isEmpty())
- {
- int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl
- if (ttl > 0)
- for (InetAddress endpoint : undelivered)
- StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
- }
+ return undelivered;
}
- // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog).
- // this ensures that deletes aren't "undone" by an old batch replay.
- private int calculateHintTTL(RowMutation mutation, long writtenAt)
+ /*
+ * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
+ * This ensures that deletes aren't "undone" by an old batch replay.
+ */
+ private int calculateHintTTL(List<RowMutation> mutations, long writtenAt)
{
- return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
+ int unadjustedTTL = Integer.MAX_VALUE;
+ for (RowMutation mutation : mutations)
- unadjustedTTL = Math.min(unadjustedTTL, mutation.calculateHintTTL());
++ unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
+ return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
}
private static ByteBuffer columnName(String name)
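For reference, the adjusted-TTL arithmetic introduced in calculateHintTTL() above can be sketched standalone. This is a minimal illustration, not the Cassandra API: the per-mutation gc_grace_seconds values are passed in as plain ints instead of being derived from RowMutation metadata via HintedHandOffManager.calculateHintTTL().

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    public class HintTtlSketch
    {
        // Smallest gc_grace_seconds across the batch, reduced by the time the
        // batch has already spent in the batchlog. A result <= 0 means the
        // entry has 'expired': replaying it could resurrect data that a delete
        // (followed by gc_grace expiry) has already purged.
        static int calculateHintTTL(List<Integer> gcGraceSecondsPerMutation, long writtenAtMillis)
        {
            int unadjustedTTL = Integer.MAX_VALUE;
            for (int gcGrace : gcGraceSecondsPerMutation)
                unadjustedTTL = Math.min(unadjustedTTL, gcGrace);
            return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAtMillis);
        }

        public static void main(String[] args)
        {
            // e.g. smallest gc_grace is 7200s and the batch was written an hour ago -> ttl ~ 3600
            long writtenAt = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
            System.out.println(calculateHintTTL(Arrays.asList(864000, 7200), writtenAt));
        }
    }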
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384de4b8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 792c155,1100fb9..36bc470
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -2415,16 -2096,4 +2415,10 @@@ public class ColumnFamilyStore implemen
{
return getDataTracker().getDroppableTombstoneRatio();
}
+
- public long getTruncationTime()
- {
- Pair<ReplayPosition, Long> truncationRecord = SystemKeyspace.getTruncationRecords().get(metadata.cfId);
- return truncationRecord == null ? Long.MIN_VALUE : truncationRecord.right;
- }
-
+ @VisibleForTesting
+ void resetFileIndexGenerator()
+ {
+ fileIndexGenerator.set(0);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384de4b8/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index b9914a6,427bbf2..942707e
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -114,45 -110,7 +113,45 @@@ public class HintedHandOffManager imple
new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
"internal");
- private final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF);
+ private final ColumnFamilyStore hintStore = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.HINTS_CF);
+
+ /**
+ * Returns a mutation representing a Hint to be sent to <code>targetId</code>
+ * as soon as it becomes available again.
+ */
- public RowMutation hintFor(RowMutation mutation, int ttl, UUID targetId)
++ public RowMutation hintFor(RowMutation mutation, long now, int ttl, UUID targetId)
+ {
+ assert ttl > 0;
+
+ InetAddress endpoint = StorageService.instance.getTokenMetadata().getEndpointForHostId(targetId);
+ // during tests we may not have a matching endpoint, but this would be unexpected in real clusters
+ if (endpoint != null)
+ metrics.incrCreatedHints(endpoint);
+ else
+ logger.warn("Unable to find matching endpoint for target {} when storing a hint", targetId);
+
+ UUID hintId = UUIDGen.getTimeUUID();
+ // serialize the hint with id and version as a composite column name
+ ByteBuffer name = comparator.decompose(hintId, MessagingService.current_version);
+ ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version));
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.HINTS_CF));
- cf.addColumn(name, value, System.currentTimeMillis(), ttl);
++ cf.addColumn(name, value, now, ttl);
+ return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
+ }
+
+ /*
+ * determine the TTL for the hint RowMutation
+ * this is set at the smallest GCGraceSeconds for any of the CFs in the RM
+ * this ensures that deletes aren't "undone" by delivery of an old hint
+ */
+ public static int calculateHintTTL(RowMutation mutation)
+ {
+ int ttl = maxHintTTL;
+ for (ColumnFamily cf : mutation.getColumnFamilies())
+ ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
+ return ttl;
+ }
+
public void start()
{
@@@ -388,8 -337,8 +387,7 @@@
}
List<WriteResponseHandler> responseHandlers = Lists.newArrayList();
- Map<UUID, Long> truncationTimesCache = new HashMap<UUID, Long>();
-
- for (final IColumn hint : hintsPage.getSortedColumns())
+ for (final Column hint : hintsPage)
{
// check if hints delivery has been paused during the process
if (hintedHandOffPaused)
@@@ -427,20 -376,11 +425,11 @@@
throw new AssertionError(e);
}
- truncationTimesCache.clear();
- for (UUID cfId : ImmutableSet.copyOf((rm.getColumnFamilyIds())))
+ for (UUID cfId : rm.getColumnFamilyIds())
{
- Long truncatedAt = truncationTimesCache.get(cfId);
- if (truncatedAt == null)
- if (hint.maxTimestamp() <= SystemTable.getTruncatedAt(cfId))
++ if (hint.maxTimestamp() <= SystemKeyspace.getTruncatedAt(cfId))
{
- ColumnFamilyStore cfs = Keyspace.open(rm.getKeyspaceName()).getColumnFamilyStore(cfId);
- truncatedAt = cfs.getTruncationTime();
- truncationTimesCache.put(cfId, truncatedAt);
- }
-
- if (hint.maxTimestamp() < truncatedAt)
- {
- logger.debug("Skipping delivery of hint for truncated columnfamily {}" + cfId);
+ logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId);
rm = rm.without(cfId);
}
}
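The delivery-side counterpart of the batchlog change is the hint.maxTimestamp() <= SystemKeyspace.getTruncatedAt(cfId) check above. A hedged sketch of just that comparison, with the truncation lookup stubbed out as an in-memory map (nothing below is the real SystemKeyspace API):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    public class TruncationFilterSketch
    {
        // Stands in for SystemKeyspace.getTruncatedAt(cfId); values are millis.
        static final Map<UUID, Long> truncatedAtMillis = new HashMap<>();

        // A hint written at or before the CF's truncation time is dropped:
        // the data it carries was truncated away on purpose, so delivering
        // it now would undo the truncation.
        static boolean shouldDeliver(UUID cfId, long hintTimestampMillis)
        {
            Long truncatedAt = truncatedAtMillis.get(cfId);
            return truncatedAt == null || hintTimestampMillis > truncatedAt;
        }

        public static void main(String[] args)
        {
            UUID cfId = UUID.randomUUID();
            truncatedAtMillis.put(cfId, 1_000_000L);
            System.out.println(shouldDeliver(cfId, 999_999L));   // false - written before truncation
            System.out.println(shouldDeliver(cfId, 1_000_001L)); // true  - written after truncation
        }
    }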
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384de4b8/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 15d8538,0000000..fe8f179
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -1,965 -1,0 +1,987 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Function;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
-
- import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
- import org.apache.cassandra.metrics.RestorableMeter;
- import org.apache.cassandra.transport.Server;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
++import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.IEndpointSnitch;
++import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PaxosState;
+import org.apache.cassandra.thrift.cassandraConstants;
++import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.*;
+
+import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+
+public class SystemKeyspace
+{
+ private static final Logger logger = LoggerFactory.getLogger(SystemKeyspace.class);
+
+ // see CFMetaData for schema definitions
+ public static final String PEERS_CF = "peers";
+ public static final String PEER_EVENTS_CF = "peer_events";
+ public static final String LOCAL_CF = "local";
+ public static final String INDEX_CF = "IndexInfo";
+ public static final String COUNTER_ID_CF = "NodeIdInfo";
+ public static final String HINTS_CF = "hints";
+ public static final String RANGE_XFERS_CF = "range_xfers";
+ public static final String BATCHLOG_CF = "batchlog";
+ // see layout description in the DefsTables class header
+ public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
+ public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";
+ public static final String SCHEMA_COLUMNS_CF = "schema_columns";
+ public static final String SCHEMA_TRIGGERS_CF = "schema_triggers";
+ public static final String COMPACTION_LOG = "compactions_in_progress";
+ public static final String PAXOS_CF = "paxos";
+ public static final String SSTABLE_ACTIVITY_CF = "sstable_activity";
+ public static final String COMPACTION_HISTORY_CF = "compaction_history";
+
+ private static final String LOCAL_KEY = "local";
+ private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
+
++ private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
++
+ public enum BootstrapState
+ {
+ NEEDS_BOOTSTRAP,
+ COMPLETED,
+ IN_PROGRESS
+ }
+
+ private static DecoratedKey decorate(ByteBuffer key)
+ {
+ return StorageService.getPartitioner().decorateKey(key);
+ }
+
+ public static void finishStartup()
+ {
+ setupVersion();
+
+ copyAllAliasesToColumnsProper();
+
+ // add entries to system schema columnfamilies for the hardcoded system definitions
+ for (String ksname : Schema.systemKeyspaceNames)
+ {
+ KSMetaData ksmd = Schema.instance.getKSMetaData(ksname);
+
+ // delete old, possibly obsolete entries in schema columnfamilies
+ for (String cfname : Arrays.asList(SystemKeyspace.SCHEMA_KEYSPACES_CF, SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, SystemKeyspace.SCHEMA_COLUMNS_CF))
+ {
+ String req = String.format("DELETE FROM system.%s WHERE keyspace_name = '%s'", cfname, ksmd.name);
+ processInternal(req);
+ }
+
+ // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
+ ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
+ }
+ }
+
+ // Starting with 2.0 (CASSANDRA-5125) we keep all the 'aliases' in system.schema_columns together with the regular columns,
+ // but only for the newly-created tables. This migration is for the pre-2.0 created tables.
+ private static void copyAllAliasesToColumnsProper()
+ {
+ for (UntypedResultSet.Row row : processInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
+ {
+ CFMetaData table = CFMetaData.fromSchema(row);
+ String query = String.format("SELECT writetime(type) "
+ + "FROM system.%s "
+ + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SCHEMA_COLUMNFAMILIES_CF,
+ table.ksName,
+ table.cfName);
+ long timestamp = processInternal(query).one().getLong("writetime(type)");
+ try
+ {
+ table.toSchema(timestamp).apply();
+ }
+ catch (ConfigurationException e)
+ {
+ // shouldn't happen
+ }
+ }
+ }
+
+ private static void setupVersion()
+ {
+ String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')";
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ processInternal(String.format(req, LOCAL_CF,
+ LOCAL_KEY,
+ FBUtilities.getReleaseVersionString(),
+ QueryProcessor.CQL_VERSION.toString(),
+ cassandraConstants.VERSION,
+ Server.CURRENT_VERSION,
+ snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
+ snitch.getRack(FBUtilities.getBroadcastAddress()),
+ DatabaseDescriptor.getPartitioner().getClass().getName()));
+ }
+
+ /**
+ * Write compaction log, except for column families under the system keyspace.
+ *
+ * @param cfs
+ * @param toCompact sstables to compact
+ * @return compaction task id or null if cfs is under system keyspace
+ */
+ public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
+ {
+ if (Keyspace.SYSTEM_KS.equals(cfs.keyspace.getName()))
+ return null;
+
+ UUID compactionId = UUIDGen.getTimeUUID();
+ String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
+ Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
+ {
+ public Integer apply(SSTableReader sstable)
+ {
+ return sstable.descriptor.generation;
+ }
+ });
+ processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.keyspace.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
+ forceBlockingFlush(COMPACTION_LOG);
+ return compactionId;
+ }
+
+ /**
+ * Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need
+ * to complete successfully for this to be called.
+ * @param taskId what was returned from {@code startCompaction}
+ */
+ public static void finishCompaction(UUID taskId)
+ {
+ assert taskId != null;
+
+ String req = "DELETE FROM system.%s WHERE id = %s";
+ processInternal(String.format(req, COMPACTION_LOG, taskId));
+ forceBlockingFlush(COMPACTION_LOG);
+ }
+
+ /**
+ * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
+ * task ID of the compaction they were participating in.
+ */
+ public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
+ {
+ String req = "SELECT * FROM system.%s";
+ UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
+
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
+ for (UntypedResultSet.Row row : resultSet)
+ {
+ String keyspace = row.getString("keyspace_name");
+ String columnfamily = row.getString("columnfamily_name");
+ Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+ UUID taskID = row.getUUID("id");
+
+ Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
+ Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
+ if (generationToTaskID == null)
+ generationToTaskID = new HashMap<>(inputs.size());
+
+ for (Integer generation : inputs)
+ generationToTaskID.put(generation, taskID);
+
+ unfinishedCompactions.put(kscf, generationToTaskID);
+ }
+ return unfinishedCompactions;
+ }
+
+ public static void discardCompactionsInProgress()
+ {
+ ColumnFamilyStore compactionLog = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
+ compactionLog.truncateBlocking();
+ }
+
+ public static void updateCompactionHistory(String ksname,
+ String cfname,
+ long compactedAt,
+ long bytesIn,
+ long bytesOut,
+ Map<Integer, Long> rowsMerged)
+ {
+ // don't write anything when the history table itself is compacted, since that would in turn cause new compactions
+ if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY_CF))
+ return;
+ String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) "
+ + "VALUES (%s, '%s', '%s', %d, %d, %d, {%s})";
+ processInternal(String.format(req, COMPACTION_HISTORY_CF, UUIDGen.getTimeUUID().toString(), ksname, cfname, compactedAt, bytesIn, bytesOut, FBUtilities.toString(rowsMerged)));
+ }
+
+ public static TabularData getCompactionHistory() throws OpenDataException
+ {
+ UntypedResultSet queryResultSet = processInternal("SELECT * from system.compaction_history");
+ return CompactionHistoryTabularData.from(queryResultSet);
+ }
+
- public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
++ public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+ {
+ String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
+ processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
++ truncationRecords = null;
+ forceBlockingFlush(LOCAL_CF);
+ }
+
+ /**
+ * This method is used to remove information about truncation time for the specified column family
+ */
- public static void removeTruncationRecord(UUID cfId)
++ public static synchronized void removeTruncationRecord(UUID cfId)
+ {
+ String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
+ processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
++ truncationRecords = null;
+ forceBlockingFlush(LOCAL_CF);
+ }
+
+ private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+ {
+ DataOutputBuffer out = new DataOutputBuffer();
+ try
+ {
+ ReplayPosition.serializer.serialize(position, out);
+ out.writeLong(truncatedAt);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return String.format("{%s: 0x%s}",
+ cfs.metadata.cfId,
+ ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
+ }
+
- public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords()
++ public static ReplayPosition getTruncatedPosition(UUID cfId)
++ {
++ Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
++ return record == null ? null : record.left;
++ }
++
++ public static long getTruncatedAt(UUID cfId)
+ {
- String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
- UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
- if (rows.isEmpty())
- return Collections.emptyMap();
-
- UntypedResultSet.Row row = rows.one();
- Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
- if (rawMap == null)
- return Collections.emptyMap();
-
- Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>();
- for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
- positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
- return positions;
++ Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
++ return record == null ? Long.MIN_VALUE : record.right;
++ }
++
++ private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
++ {
++ if (truncationRecords == null)
++ truncationRecords = readTruncationRecords();
++ return truncationRecords.get(cfId);
++ }
++
++ private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
++ {
++ UntypedResultSet rows = processInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'",
++ LOCAL_CF,
++ LOCAL_KEY));
++
++ Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();
++
++ if (!rows.isEmpty() && rows.one().has("truncated_at"))
++ {
++ Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance);
++ for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet())
++ records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
++ }
++
++ return records;
+ }
+
+ private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
+ {
+ try
+ {
+ DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
+ return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Record tokens being used by another node
+ */
+ public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
+ {
+ if (ep.equals(FBUtilities.getBroadcastAddress()))
+ {
+ removeEndpoint(ep);
+ return;
+ }
+
+ String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
+ processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), tokensAsSet(tokens)));
+ }
+
+ public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
+ {
+ String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES ('%s', '%s')";
+ processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), preferred_ip.getHostAddress()));
+ forceBlockingFlush(PEERS_CF);
+ }
+
+ public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
+ {
+ if (ep.equals(FBUtilities.getBroadcastAddress()))
+ return;
+
+ String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', %s)";
+ processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
+ }
+
+ public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
+ {
+ // with 30 day TTL
+ String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ %s ] = %s WHERE peer = '%s'";
+ processInternal(String.format(req, PEER_EVENTS_CF, timePeriod.toString(), value, ep.getHostAddress()));
+ }
+
+ public static synchronized void updateSchemaVersion(UUID version)
+ {
+ String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', %s)";
+ processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, version.toString()));
+ }
+
+ private static String tokensAsSet(Collection<Token> tokens)
+ {
+ Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ Iterator<Token> iter = tokens.iterator();
+ while (iter.hasNext())
+ {
+ sb.append("'").append(factory.toString(iter.next())).append("'");
+ if (iter.hasNext())
+ sb.append(",");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+ private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
+ {
+ Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+ List<Token> tokens = new ArrayList<Token>(tokensStrings.size());
+ for (String tk : tokensStrings)
+ tokens.add(factory.fromString(tk));
+ return tokens;
+ }
+
+ /**
+ * Remove stored tokens being used by another node
+ */
+ public static synchronized void removeEndpoint(InetAddress ep)
+ {
+ String req = "DELETE FROM system.%s WHERE peer = '%s'";
+ processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+ }
+
+ /**
+ * This method is used to update the System Keyspace with the new tokens for this node
+ */
+ public static synchronized void updateTokens(Collection<Token> tokens)
+ {
+ assert !tokens.isEmpty() : "removeEndpoint should be used instead";
+ String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
+ processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokensAsSet(tokens)));
+ forceBlockingFlush(LOCAL_CF);
+ }
+
+ /**
+ * Convenience method to update the list of tokens in the local system keyspace.
+ *
+ * @param addTokens tokens to add
+ * @param rmTokens tokens to remove
+ * @return the collection of persisted tokens
+ */
+ public static synchronized Collection<Token> updateLocalTokens(Collection<Token> addTokens, Collection<Token> rmTokens)
+ {
+ Collection<Token> tokens = getSavedTokens();
+ tokens.removeAll(rmTokens);
+ tokens.addAll(addTokens);
+ updateTokens(tokens);
+ return tokens;
+ }
+
+ public static void forceBlockingFlush(String cfname)
+ {
+ if (!Boolean.getBoolean("cassandra.unsafesystem"))
+ FBUtilities.waitOnFuture(Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfname).forceFlush());
+ }
+
+ /**
+ * Return a map of peer IP addresses to their stored tokens
+ *
+ */
+ public static SetMultimap<InetAddress, Token> loadTokens()
+ {
+ SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
+ for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
+ {
+ InetAddress peer = row.getInetAddress("peer");
+ if (row.has("tokens"))
+ tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance)));
+ }
+
+ return tokenMap;
+ }
+
+ /**
+ * Return a map of peer IP addresses to their stored host_ids
+ *
+ */
+ public static Map<InetAddress, UUID> loadHostIds()
+ {
+ Map<InetAddress, UUID> hostIdMap = new HashMap<InetAddress, UUID>();
+ for (UntypedResultSet.Row row : processInternal("SELECT peer, host_id FROM system." + PEERS_CF))
+ {
+ InetAddress peer = row.getInetAddress("peer");
+ if (row.has("host_id"))
+ {
+ hostIdMap.put(peer, row.getUUID("host_id"));
+ }
+ }
+ return hostIdMap;
+ }
+
+ public static InetAddress getPreferredIP(InetAddress ep)
+ {
+ String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
+ UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+ if (!result.isEmpty() && result.one().has("preferred_ip"))
+ return result.one().getInetAddress("preferred_ip");
+ return null;
+ }
+
+ /**
+ * Return a map of IP addresses containing a map of dc and rack info
+ */
+ public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
+ {
+ Map<InetAddress, Map<String, String>> result = new HashMap<InetAddress, Map<String, String>>();
+ for (UntypedResultSet.Row row : processInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
+ {
+ InetAddress peer = row.getInetAddress("peer");
+ if (row.has("data_center") && row.has("rack"))
+ {
+ Map<String, String> dcRack = new HashMap<String, String>();
+ dcRack.put("data_center", row.getString("data_center"));
+ dcRack.put("rack", row.getString("rack"));
+ result.put(peer, dcRack);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * One of three things will happen if you try to read the system keyspace:
+ * 1. files are present and you can read them: great
+ * 2. no files are there: great (new node is assumed)
+ * 3. files are present but you can't read them: bad
+ * @throws ConfigurationException
+ */
+ public static void checkHealth() throws ConfigurationException
+ {
+ Keyspace keyspace;
+ try
+ {
+ keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
+ }
+ catch (AssertionError err)
+ {
+ // this happens when a user switches from OPP to RP.
+ ConfigurationException ex = new ConfigurationException("Could not read system keyspace!");
+ ex.initCause(err);
+ throw ex;
+ }
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_CF);
+
+ String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
+ UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+ if (result.isEmpty() || !result.one().has("cluster_name"))
+ {
+ // this is a brand new node
+ if (!cfs.getSSTables().isEmpty())
+ throw new ConfigurationException("Found system keyspace files, but they couldn't be loaded!");
+
+ // no system files. this is a new node.
+ req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', '%s')";
+ processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, DatabaseDescriptor.getClusterName()));
+ return;
+ }
+
+ String savedClusterName = result.one().getString("cluster_name");
+ if (!DatabaseDescriptor.getClusterName().equals(savedClusterName))
+ throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
+ }
+
+ public static Collection<Token> getSavedTokens()
+ {
+ String req = "SELECT tokens FROM system.%s WHERE key='%s'";
+ UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+ return result.isEmpty() || !result.one().has("tokens")
+ ? Collections.<Token>emptyList()
+ : deserializeTokens(result.one().<String>getSet("tokens", UTF8Type.instance));
+ }
+
+ public static int incrementAndGetGeneration()
+ {
+ String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
+ UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+ int generation;
+ if (result.isEmpty() || !result.one().has("gossip_generation"))
+ {
+ // seconds-since-epoch isn't a foolproof new generation
+ // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
+ // but it's as close as sanely possible
+ generation = (int) (System.currentTimeMillis() / 1000);
+ }
+ else
+ {
+ // Other nodes will ignore gossip messages about a node that has a lower generation than previously seen.
+ final int storedGeneration = result.one().getInt("gossip_generation") + 1;
+ final int now = (int) (System.currentTimeMillis() / 1000);
+ if (storedGeneration >= now)
+ {
+ logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}. See CASSANDRA-3654 if you experience problems",
+ storedGeneration, now);
+ generation = storedGeneration;
+ }
+ else
+ {
+ generation = now;
+ }
+ }
+
+ req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', %d)";
+ processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, generation));
+ forceBlockingFlush(LOCAL_CF);
+
+ return generation;
+ }
+
+ public static BootstrapState getBootstrapState()
+ {
+ String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
+ UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+ if (result.isEmpty() || !result.one().has("bootstrapped"))
+ return BootstrapState.NEEDS_BOOTSTRAP;
+
+ return BootstrapState.valueOf(result.one().getString("bootstrapped"));
+ }
+
+ public static boolean bootstrapComplete()
+ {
+ return getBootstrapState() == BootstrapState.COMPLETED;
+ }
+
+ public static boolean bootstrapInProgress()
+ {
+ return getBootstrapState() == BootstrapState.IN_PROGRESS;
+ }
+
+ public static void setBootstrapState(BootstrapState state)
+ {
+ String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')";
+ processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, state.name()));
+ forceBlockingFlush(LOCAL_CF);
+ }
+
+ public static boolean isIndexBuilt(String keyspaceName, String indexName)
+ {
+ ColumnFamilyStore cfs = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
+ QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
+ INDEX_CF,
+ FBUtilities.singleton(ByteBufferUtil.bytes(indexName), cfs.getComparator()),
+ System.currentTimeMillis());
+ return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
+ }
+
+ public static void setIndexBuilt(String keyspaceName, String indexName)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, INDEX_CF);
+ cf.addColumn(new Column(ByteBufferUtil.bytes(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
+ RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf);
+ rm.apply();
+ }
+
+ public static void setIndexRemoved(String keyspaceName, String indexName)
+ {
+ RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
+ rm.delete(INDEX_CF, ByteBufferUtil.bytes(indexName), FBUtilities.timestampMicros());
+ rm.apply();
+ }
+
+ /**
+ * Read the host ID from the system keyspace, creating (and storing) one if
+ * none exists.
+ */
+ public static UUID getLocalHostId()
+ {
+ UUID hostId = null;
+
+ String req = "SELECT host_id FROM system.%s WHERE key='%s'";
+ UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+ // Look up the Host UUID (return it if found)
+ if (!result.isEmpty() && result.one().has("host_id"))
+ {
+ return result.one().getUUID("host_id");
+ }
+
+ // ID not found, generate a new one, persist, and then return it.
+ hostId = UUID.randomUUID();
+ logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
+ return setLocalHostId(hostId);
+ }
+
+ /**
+ * Sets the local host ID explicitly. Should only be called outside of SystemKeyspace when replacing a node.
+ */
+ public static UUID setLocalHostId(UUID hostId)
+ {
+ String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
+ processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
+ return hostId;
+ }
+
+ /**
+ * Read the current local node id from the system keyspace or null if no
+ * such node id is recorded.
+ */
+ public static CounterId getCurrentLocalCounterId()
+ {
+ Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
+
+ // Get the last CounterId (CounterIds are timeuuids and are thus ordered from oldest to newest)
+ QueryFilter filter = QueryFilter.getSliceFilter(decorate(ALL_LOCAL_NODE_ID_KEY),
+ COUNTER_ID_CF,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ true,
+ 1,
+ System.currentTimeMillis());
+ ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
+ if (cf != null && cf.getColumnCount() != 0)
+ return CounterId.wrap(cf.iterator().next().name());
+ else
+ return null;
+ }
+
+ /**
+ * Write a new current local node id to the system keyspace.
+ *
+ * @param newCounterId the new current local node id to record
+ * @param now microsecond time stamp.
+ */
+ public static void writeCurrentLocalCounterId(CounterId newCounterId, long now)
+ {
+ ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getBroadcastAddress().getAddress());
+
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, COUNTER_ID_CF);
+ cf.addColumn(new Column(newCounterId.bytes(), ip, now));
+ RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf);
+ rm.apply();
+ forceBlockingFlush(COUNTER_ID_CF);
+ }
+
+ public static List<CounterId.CounterIdRecord> getOldLocalCounterIds()
+ {
+ List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
+
+ Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
+ QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
+ ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
+
+ CounterId previous = null;
+ for (Column c : cf)
+ {
+ if (previous != null)
+ l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
+
+ // this will ignore the last column on purpose since it is the
+ // current local node id
+ previous = CounterId.wrap(c.name());
+ }
+ return l;
+ }
+
+ /**
+ * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
+ * @return CFS responsible to hold low-level serialized schema
+ */
+ public static ColumnFamilyStore schemaCFS(String cfName)
+ {
+ return Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfName);
+ }
+
+ public static List<Row> serializedSchema()
+ {
+ List<Row> schema = new ArrayList<>();
+
+ schema.addAll(serializedSchema(SCHEMA_KEYSPACES_CF));
+ schema.addAll(serializedSchema(SCHEMA_COLUMNFAMILIES_CF));
+ schema.addAll(serializedSchema(SCHEMA_COLUMNS_CF));
+ schema.addAll(serializedSchema(SCHEMA_TRIGGERS_CF));
+
+ return schema;
+ }
+
+ /**
+ * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
+ * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily)
+ */
+ public static List<Row> serializedSchema(String schemaCfName)
+ {
+ Token minToken = StorageService.getPartitioner().getMinimumToken();
+
+ return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
+ null,
+ new IdentityQueryFilter(),
+ Integer.MAX_VALUE,
+ System.currentTimeMillis());
+ }
+
+ public static Collection<RowMutation> serializeSchema()
+ {
+ Map<DecoratedKey, RowMutation> mutationMap = new HashMap<>();
+
+ serializeSchema(mutationMap, SCHEMA_KEYSPACES_CF);
+ serializeSchema(mutationMap, SCHEMA_COLUMNFAMILIES_CF);
+ serializeSchema(mutationMap, SCHEMA_COLUMNS_CF);
+ serializeSchema(mutationMap, SCHEMA_TRIGGERS_CF);
+
+ return mutationMap.values();
+ }
+
+ private static void serializeSchema(Map<DecoratedKey, RowMutation> mutationMap, String schemaCfName)
+ {
+ for (Row schemaRow : serializedSchema(schemaCfName))
+ {
+ if (Schema.ignoredSchemaRow(schemaRow))
+ continue;
+
+ RowMutation mutation = mutationMap.get(schemaRow.key);
+ if (mutation == null)
+ {
+ mutation = new RowMutation(Keyspace.SYSTEM_KS, schemaRow.key.key);
+ mutationMap.put(schemaRow.key, mutation);
+ }
+
+ mutation.add(schemaRow.cf);
+ }
+ }
+
+ public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
+ {
+ Map<DecoratedKey, ColumnFamily> schema = new HashMap<DecoratedKey, ColumnFamily>();
+
+ for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName))
+ schema.put(schemaEntity.key, schemaEntity.cf);
+
+ return schema;
+ }
+
+ public static ByteBuffer getSchemaKSKey(String ksName)
+ {
+ return AsciiType.instance.fromString(ksName);
+ }
+
+ public static Row readSchemaRow(String ksName)
+ {
+ DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
+
+ ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SCHEMA_KEYSPACES_CF);
+ ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
+
+ return new Row(key, result);
+ }
+
+ /**
+ * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace+table pair.
+ *
+ * @param schemaCfName the schema table to get the data from (schema_columnfamilies, schema_columns or schema_triggers)
+ * @param ksName the keyspace of the table we are interested in
+ * @param cfName the table we are interested in
+ * @return a Row containing the schema data of a particular type for the table
+ */
+ public static Row readSchemaRow(String schemaCfName, String ksName, String cfName)
+ {
+ DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
+ ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName);
+ ColumnFamily cf = schemaCFS.getColumnFamily(key,
+ DefsTables.searchComposite(cfName, true),
+ DefsTables.searchComposite(cfName, false),
+ false,
+ Integer.MAX_VALUE,
+ System.currentTimeMillis());
+ return new Row(key, cf);
+ }
+
+ public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
+ {
+ String req = "SELECT * FROM system.%s WHERE row_key = 0x%s AND cf_id = %s";
+ UntypedResultSet results = processInternal(String.format(req, PAXOS_CF, ByteBufferUtil.bytesToHex(key), metadata.cfId));
+ if (results.isEmpty())
+ return new PaxosState(key, metadata);
+ UntypedResultSet.Row row = results.one();
+ Commit promised = row.has("in_progress_ballot")
+ ? new Commit(key, row.getUUID("in_progress_ballot"), EmptyColumns.factory.create(metadata))
+ : Commit.emptyCommit(key, metadata);
+ // either we have both a recently accepted ballot and update or we have neither
+ Commit accepted = row.has("proposal")
+ ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal")))
+ : Commit.emptyCommit(key, metadata);
+ // either most_recent_commit and most_recent_commit_at will both be set, or neither
+ Commit mostRecent = row.has("most_recent_commit")
+ ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
+ : Commit.emptyCommit(key, metadata);
+ return new PaxosState(promised, accepted, mostRecent);
+ }
+
+ public static void savePaxosPromise(Commit promise)
+ {
+ String req = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s WHERE row_key = 0x%s AND cf_id = %s";
+ processInternal(String.format(req,
+ PAXOS_CF,
+ UUIDGen.microsTimestamp(promise.ballot),
+ paxosTtl(promise.update.metadata),
+ promise.ballot,
+ ByteBufferUtil.bytesToHex(promise.key),
+ promise.update.id()));
+ }
+
+ public static void savePaxosProposal(Commit proposal)
+ {
+ processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
+ PAXOS_CF,
+ UUIDGen.microsTimestamp(proposal.ballot),
+ paxosTtl(proposal.update.metadata),
+ proposal.ballot,
+ ByteBufferUtil.bytesToHex(proposal.update.toBytes()),
+ ByteBufferUtil.bytesToHex(proposal.key),
+ proposal.update.id()));
+ }
+
+ private static int paxosTtl(CFMetaData metadata)
+ {
+ // keep paxos state around for at least 3h
+ return Math.max(3 * 3600, metadata.getGcGraceSeconds());
+ }
+
+ public static void savePaxosCommit(Commit commit)
+ {
+ // We always erase the last proposal (using the commit timestamp so as not to erase a more recent proposal in case the commit is old),
+ // even though that's really just an optimization, since SP.beginAndRepairPaxos will exclude accepted proposals older than the mrc.
+ String cql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = null, proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
+ processInternal(String.format(cql,
+ PAXOS_CF,
+ UUIDGen.microsTimestamp(commit.ballot),
+ paxosTtl(commit.update.metadata),
+ commit.ballot,
+ ByteBufferUtil.bytesToHex(commit.update.toBytes()),
+ ByteBufferUtil.bytesToHex(commit.key),
+ commit.update.id()));
+ }
+
+ /**
+ * Returns a RestorableMeter tracking the average read rate of a particular SSTable, restoring the last-seen rate
+ * from values in system.sstable_activity if present.
+ * @param keyspace the keyspace the sstable belongs to
+ * @param table the table the sstable belongs to
+ * @param generation the generation number for the sstable
+ */
+ public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation)
+ {
+ String cql = "SELECT * FROM %s WHERE keyspace_name='%s' and columnfamily_name='%s' and generation=%d";
+ UntypedResultSet results = processInternal(String.format(cql,
+ SSTABLE_ACTIVITY_CF,
+ keyspace,
+ table,
+ generation));
+
+ if (results.isEmpty())
+ return new RestorableMeter();
+
+ UntypedResultSet.Row row = results.one();
+ double m15rate = row.getDouble("rate_15m");
+ double m120rate = row.getDouble("rate_120m");
+ return new RestorableMeter(m15rate, m120rate);
+ }
+
+ /**
+ * Writes the current read rates for a given SSTable to system.sstable_activity
+ */
+ public static void persistSSTableReadMeter(String keyspace, String table, int generation, RestorableMeter meter)
+ {
+ // Store values with a ten-day TTL (864000 seconds) to handle corner cases where cleanup might not occur
+ String cql = "INSERT INTO %s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES ('%s', '%s', %d, %f, %f) USING TTL 864000";
+ processInternal(String.format(cql,
+ SSTABLE_ACTIVITY_CF,
+ keyspace,
+ table,
+ generation,
+ meter.fifteenMinuteRate(),
+ meter.twoHourRate()));
+ }
+
+ /**
+ * Clears persisted read rates from system.sstable_activity for SSTables that have been deleted.
+ */
+ public static void clearSSTableReadMeter(String keyspace, String table, int generation)
+ {
+ String cql = "DELETE FROM %s WHERE keyspace_name='%s' AND columnfamily_name='%s' and generation=%d";
+ processInternal(String.format(cql, SSTABLE_ACTIVITY_CF, keyspace, table, generation));
+ }
+}
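The new truncation-record accessors above follow a small cache-invalidation pattern: writers update system.local and null out the volatile map under the class lock; readers rebuild it lazily. A minimal sketch of the pattern in isolation, with an in-memory map standing in for the system.local reads and writes (names here are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    public class CachedRecordsSketch
    {
        // Stands in for the persistent truncated_at map in system.local.
        private static final Map<UUID, Long> disk = new HashMap<>();
        private static volatile Map<UUID, Long> cache;

        public static synchronized void save(UUID cfId, long truncatedAt)
        {
            disk.put(cfId, truncatedAt); // stands in for the UPDATE + flush of system.local
            cache = null;                // invalidate; the next read rebuilds
        }

        public static synchronized Long get(UUID cfId)
        {
            if (cache == null)
                cache = new HashMap<>(disk); // stands in for SELECT truncated_at FROM system.local
            return cache.get(cfId);
        }
    }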
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384de4b8/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 56cd4ff,46d18b2..579b6ee
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -80,8 -76,7 +79,7 @@@ public class CommitLogReplaye
ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
// but, if we've truncated the cf in question, then we need to start replay after the truncation
- Pair<ReplayPosition, Long> truncateRecord = truncationPositions.get(cfs.metadata.cfId);
- ReplayPosition truncatedAt = truncateRecord == null ? null : truncateRecord.left;
- ReplayPosition truncatedAt = SystemTable.getTruncatedPosition(cfs.metadata.cfId);
++ ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
if (truncatedAt != null)
rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
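In other words, replay starts from whichever position is later: the one covered by flushed sstables or the one recorded at truncation. A sketch with ReplayPosition reduced to a plain long (the real class is a segment id plus offset):

    public class ReplayStartSketch
    {
        static long effectiveReplayStart(long flushedThrough, Long truncatedAt)
        {
            // Anything at or before the truncation point must not be replayed,
            // even if no sstable covers it, or truncated data would reappear.
            return truncatedAt == null ? flushedThrough : Math.max(flushedThrough, truncatedAt);
        }

        public static void main(String[] args)
        {
            System.out.println(effectiveReplayStart(100L, 250L)); // 250
            System.out.println(effectiveReplayStart(100L, null)); // 100
        }
    }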
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384de4b8/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 5a8dc8d,7ef3d72..dce7256
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -878,9 -560,9 +878,9 @@@ public class StorageProxy implements St
if (ttl > 0)
{
logger.debug("Adding hint for {}", target);
- writeHintForMutation(mutation, ttl, target);
+ writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
// Notify the handler only for CL == ANY
- if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
+ if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
responseHandler.response(null);
}
else
@@@ -900,12 -582,20 +900,15 @@@
return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
}
- public static void writeHintForMutation(RowMutation mutation, int ttl, InetAddress target)
+ /**
+ * @param now the current time in milliseconds; hint replay uses it to skip hints written before the target CF's truncation
+ */
- public static void writeHintForMutation(RowMutation mutation, long now, int ttl, InetAddress target) throws IOException
++ public static void writeHintForMutation(RowMutation mutation, long now, int ttl, InetAddress target)
{
assert ttl > 0;
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
- if ((hostId == null) && (MessagingService.instance().getVersion(target) < MessagingService.VERSION_12))
- {
- logger.warn("Unable to store hint for host with missing ID, {} (old node?)", target.toString());
- return;
- }
assert hostId != null : "Missing host ID for " + target.getHostAddress();
- HintedHandOffManager.instance.hintFor(mutation, ttl, hostId).apply();
- mutation.toHint(now, ttl, hostId).apply();
++ HintedHandOffManager.instance.hintFor(mutation, now, ttl, hostId).apply();
StorageMetrics.totalHints.inc();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384de4b8/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 537278a,fd2812f..954c1f2
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@@ -28,6 -31,8 +31,7 @@@ import org.apache.cassandra.Util
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
+ import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.UUIDGen;
@@@ -68,10 -72,10 +72,10 @@@ public class BatchlogManagerTest extend
}
// Flush the batchlog to disk (see CASSANDRA-6822).
- Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF).forceFlush();
+ Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF).forceBlockingFlush();
- assertEquals(1000, BatchlogManager.instance.countAllBatches());
- assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+ assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+ assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
// Force batchlog replay.
BatchlogManager.instance.replayAllFailedBatches();
@@@ -99,4 -103,66 +103,66 @@@
UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
assertEquals(500, result.one().getLong("count"));
}
+
+ @Test
+ public void testTruncatedReplay() throws InterruptedException, ExecutionException
+ {
+ // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
+ // Each batchlog entry contains one mutation for Standard2 and one for Standard3.
+ // In the middle of the process, 'truncate' Standard2.
+ for (int i = 0; i < 1000; i++)
+ {
+ RowMutation mutation1 = new RowMutation("Keyspace1", bytes(i));
- mutation1.add(new QueryPath("Standard2", null, bytes(i)), bytes(i), 0);
++ mutation1.add("Standard2", bytes(i), bytes(i), 0);
+ RowMutation mutation2 = new RowMutation("Keyspace1", bytes(i));
- mutation2.add(new QueryPath("Standard3", null, bytes(i)), bytes(i), 0);
++ mutation2.add("Standard3", bytes(i), bytes(i), 0);
+ List<RowMutation> mutations = Lists.newArrayList(mutation1, mutation2);
+
+ // Adjust the timestamp so the batch is old enough to be eligible for replay.
+ long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2;
+
+ if (i == 500)
- SystemTable.saveTruncationRecord(Table.open("Keyspace1").getColumnFamilyStore("Standard2"),
- timestamp,
- ReplayPosition.NONE);
++ SystemKeyspace.saveTruncationRecord(Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2"),
++ timestamp,
++ ReplayPosition.NONE);
+
+ // Adjust the timestamp (slightly) to make the test deterministic.
+ if (i >= 500)
+ timestamp++;
+ else
+ timestamp--;
+
+ BatchlogManager.getBatchlogMutationFor(mutations, UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+ }
+
+ // Flush the batchlog to disk (see CASSANDRA-6822).
- Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF).forceFlush();
++ Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF).forceBlockingFlush();
+
+ // Force batchlog replay.
+ BatchlogManager.instance.replayAllFailedBatches();
+
+ // After the replay, only the post-truncation half of the Standard2-targeted mutations should be applied, but all of the Standard3-targeted ones.
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i));
+ if (i >= 500)
+ {
+ assertEquals(bytes(i), result.one().getBytes("key"));
+ assertEquals(bytes(i), result.one().getBytes("column1"));
+ assertEquals(bytes(i), result.one().getBytes("value"));
+ }
+ else
+ {
+ assertTrue(result.isEmpty());
+ }
+ }
+
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i));
+ assertEquals(bytes(i), result.one().getBytes("key"));
+ assertEquals(bytes(i), result.one().getBytes("column1"));
+ assertEquals(bytes(i), result.one().getBytes("value"));
+ }
+ }
}
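
The test above pins down the replay-side contract: a batched mutation is
applied only if it was written after the target CF's last truncation.
Stated as a predicate (illustrative only; the real filtering lives in the
replay path, and the truncation-time accessor below is an assumed shape):

    // Replay a mutation for cfId only when it postdates the truncation record.
    static boolean shouldReplay(java.util.UUID cfId, long writtenAtMillis)
    {
        // Assumed helper over the system keyspace truncation records.
        Long truncatedAtMillis = getTruncatedAtMillisOrNull(cfId);
        return truncatedAtMillis == null || writtenAtMillis > truncatedAtMillis;
    }
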
http://git-wip-us.apache.org/repos/asf/cassandra/blob/384de4b8/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index ab5bc23,7b4a736..9ffd702
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@@ -1,4 -1,4 +1,3 @@@
--package org.apache.cassandra.db;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@@ -19,7 -19,7 +18,7 @@@
* under the License.
*
*/
--
++package org.apache.cassandra.db;
import java.net.InetAddress;
import java.util.Map;
@@@ -28,6 -28,6 +27,8 @@@ import java.util.concurrent.TimeUnit
import org.junit.Test;
++import com.google.common.collect.Iterators;
++
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.compaction.CompactionManager;
@@@ -37,9 -38,9 +38,7 @@@ import org.apache.cassandra.db.marshal.
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
--import com.google.common.collect.Iterators;
--
-import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
public class HintedHandOffTest extends SchemaLoader
@@@ -62,10 -63,14 +61,14 @@@
hintStore.disableAutoCompaction();
// insert 1 hint
- RowMutation rm = new RowMutation(TABLE4, ByteBufferUtil.bytes(1));
- rm.add(new QueryPath(STANDARD1_CF,
- null,
- ByteBufferUtil.bytes(String.valueOf(COLUMN1))),
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- System.currentTimeMillis());
+ RowMutation rm = new RowMutation(KEYSPACE4, ByteBufferUtil.bytes(1));
+ rm.add(STANDARD1_CF, ByteBufferUtil.bytes(String.valueOf(COLUMN1)), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
- HintedHandOffManager.instance.hintFor(rm, HintedHandOffManager.calculateHintTTL(rm), UUID.randomUUID()).apply();
- rm.toHint(System.currentTimeMillis(), rm.calculateHintTTL(), UUID.randomUUID()).apply();
++ HintedHandOffManager.instance.hintFor(rm,
++ System.currentTimeMillis(),
++ HintedHandOffManager.calculateHintTTL(rm),
++ UUID.randomUUID())
++ .apply();
// flush data to disk
hintStore.forceBlockingFlush();
@@@ -92,36 -97,4 +95,40 @@@
Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
}
+
+ @Test(timeout = 5000)
+ public void testTruncateHints() throws Exception
+ {
+ Keyspace systemKeyspace = Keyspace.open("system");
+ ColumnFamilyStore hintStore = systemKeyspace.getColumnFamilyStore(SystemKeyspace.HINTS_CF);
+ hintStore.clearUnsafe();
+
+ // insert 1 hint
+ RowMutation rm = new RowMutation(KEYSPACE4, ByteBufferUtil.bytes(1));
+ rm.add(STANDARD1_CF, ByteBufferUtil.bytes(String.valueOf(COLUMN1)), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
+
- HintedHandOffManager.instance.hintFor(rm, HintedHandOffManager.calculateHintTTL(rm), UUID.randomUUID()).apply();
++ HintedHandOffManager.instance.hintFor(rm,
++ System.currentTimeMillis(),
++ HintedHandOffManager.calculateHintTTL(rm),
++ UUID.randomUUID())
++ .apply();
+
+ assert getNoOfHints() == 1;
+
+ HintedHandOffManager.instance.truncateAllHints();
+
+ while (getNoOfHints() > 0)
+ {
+ Thread.sleep(100);
+ }
+
+ assert getNoOfHints() == 0;
+ }
+
+ private int getNoOfHints()
+ {
+ String req = "SELECT * FROM system.%s";
+ UntypedResultSet resultSet = processInternal(String.format(req, SystemKeyspace.HINTS_CF));
+ return resultSet.size();
+ }
}
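
A side note on the counting helper at the end: fetching every row just to
call size() is fine for a unit test holding a single hint, but the same
check can be expressed directly in CQL, mirroring the count(*) query the
batchlog test already uses:

    String req = "SELECT count(*) FROM system.%s";
    UntypedResultSet resultSet = processInternal(String.format(req, SystemKeyspace.HINTS_CF));
    long hints = resultSet.one().getLong("count");
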