You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/06/28 13:22:02 UTC
[12/17] cassandra git commit: Merge commit
'6555a87bde4daeb8bd5d9558595a367ec6bc061d' into cassandra-3.0
Merge commit '6555a87bde4daeb8bd5d9558595a367ec6bc061d' into cassandra-3.0
* commit '6555a87bde4daeb8bd5d9558595a367ec6bc061d':
Avoid stalling Paxos when the paxos state expires
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70059726
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70059726
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70059726
Branch: refs/heads/cassandra-3.9
Commit: 70059726f08a98ea21af91ce3855bf62f6f4b652
Parents: cb4540e 6555a87
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jun 28 15:19:08 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 28 15:19:57 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/cql3/QueryProcessor.java | 17 ++++++++++++++++-
.../cql3/statements/SelectStatement.java | 6 +++++-
.../org/apache/cassandra/db/SystemKeyspace.java | 12 ++++++------
.../apache/cassandra/service/StorageProxy.java | 4 +++-
.../cassandra/service/paxos/PaxosState.java | 11 +++++++++--
.../cassandra/service/paxos/PrepareCallback.java | 18 +++++++++++++++++-
src/java/org/apache/cassandra/utils/UUIDGen.java | 11 +++++++++++
8 files changed, 68 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 314a93e,9f42d98..aaeafd6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -9,38 -2,9 +9,39 @@@ Merged from 2.2
* Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
* Validate bloom_filter_fp_chance against lowest supported
value when the table is created (CASSANDRA-11920)
- * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
* Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
* StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+Merged from 2.1:
++ * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
+ * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
+ * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
+ * Avoid marking too many sstables as repaired (CASSANDRA-11696)
+ * Prevent select statements with clustering key > 64k (CASSANDRA-11882)
+ * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991)
+ * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842)
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11934)
+ * Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
+ * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
+ * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
+
+
+3.0.7
+ * Fix legacy serialization of Thrift-generated non-compound range tombstones
+ when communicating with 2.x nodes (CASSANDRA-11930)
+ * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
+ * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
+ * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
+ * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
+ * Remove unneeded code to repair index summaries that have
+ been improperly down-sampled (CASSANDRA-11127)
+ * Avoid WriteTimeoutExceptions during commit log replay due to materialized
+ view lock contention (CASSANDRA-11891)
+ * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
+ * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
+ * Allow compaction strategies to disable early open (CASSANDRA-11754)
+ * Refactor Materialized View code (CASSANDRA-11475)
+ * Update Java Driver (CASSANDRA-11615)
+Merged from 2.2:
* Persist local metadata earlier in startup sequence (CASSANDRA-11742)
* Run CommitLog tests with different compression settings (CASSANDRA-9039)
* cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index da146ef,c702679..af94d3e
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -273,10 -288,10 +273,10 @@@ public class QueryProcessor implements
AbstractType type = prepared.boundNames.get(i).type;
boundValues.add(value instanceof ByteBuffer || value == null ? (ByteBuffer)value : type.decompose(value));
}
- return QueryOptions.forInternalCalls(boundValues);
+ return QueryOptions.forInternalCalls(cl, boundValues);
}
- private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+ public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
{
ParsedStatement.Prepared prepared = internalStatements.get(query);
if (prepared != null)
@@@ -343,19 -340,42 +343,34 @@@
return null;
}
+ /**
+ * A special version of executeInternal that takes, as an argument, the time to use as "now" for the query.
+ * Note that this only makes sense for SELECTs, so this only accepts SELECT statements and is only useful in rare
+ * cases.
+ */
- public static UntypedResultSet executeInternalWithNow(long now, String query, Object... values)
++ public static UntypedResultSet executeInternalWithNow(int nowInSec, String query, Object... values)
+ {
- try
- {
- ParsedStatement.Prepared prepared = prepareInternal(query);
- assert prepared.statement instanceof SelectStatement;
- SelectStatement select = (SelectStatement)prepared.statement;
- ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), now);
- assert result instanceof ResultMessage.Rows;
- return UntypedResultSet.create(((ResultMessage.Rows)result).result);
- }
- catch (RequestExecutionException e)
- {
- throw new RuntimeException(e);
- }
- catch (RequestValidationException e)
- {
- throw new RuntimeException("Error validating query " + query, e);
- }
++ ParsedStatement.Prepared prepared = prepareInternal(query);
++ assert prepared.statement instanceof SelectStatement;
++ SelectStatement select = (SelectStatement)prepared.statement;
++ ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec);
++ assert result instanceof ResultMessage.Rows;
++ return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+ }
+
- public static UntypedResultSet resultify(String query, Row row)
+ public static UntypedResultSet resultify(String query, RowIterator partition)
{
- return resultify(query, Collections.singletonList(row));
+ return resultify(query, PartitionIterators.singletonIterator(partition));
}
- public static UntypedResultSet resultify(String query, List<Row> rows)
+ public static UntypedResultSet resultify(String query, PartitionIterator partitions)
{
- SelectStatement ss = (SelectStatement) getStatement(query, null).statement;
- ResultSet cqlRows = ss.process(rows);
- return UntypedResultSet.create(cqlRows);
+ try (PartitionIterator iter = partitions)
+ {
+ SelectStatement ss = (SelectStatement) getStatement(query, null).statement;
+ ResultSet cqlRows = ss.process(iter, FBUtilities.nowInSeconds());
+ return UntypedResultSet.create(cqlRows);
+ }
}
public ResultMessage.Prepared prepare(String query,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 0e33475,8820ff7..aca6146
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -401,33 -280,45 +401,37 @@@ public class SelectStatement implement
return new ResultMessage.Rows(rset);
}
- static List<Row> readLocally(String keyspaceName, List<ReadCommand> cmds)
- {
- Keyspace keyspace = Keyspace.open(keyspaceName);
- List<Row> rows = new ArrayList<Row>(cmds.size());
- for (ReadCommand cmd : cmds)
- rows.add(cmd.getRow(keyspace));
- return rows;
- }
-
public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
- int nowInSec = FBUtilities.nowInSeconds();
- return executeInternal(state, options, System.currentTimeMillis());
++ return executeInternal(state, options, FBUtilities.nowInSeconds());
+ }
+
- public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException
++ public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec) throws RequestExecutionException, RequestValidationException
+ {
- int limit = getLimit(options);
- Pageable command = getPageableCommand(options, limit, now);
+ int userLimit = getLimit(options);
+ ReadQuery query = getQuery(options, nowInSec, userLimit);
int pageSize = getPageSize(options);
- if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
+ try (ReadOrderGroup orderGroup = query.startOrderGroup())
{
- List<Row> rows = command == null
- ? Collections.<Row>emptyList()
- : (command instanceof Pageable.ReadCommands
- ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands)
- : ((RangeSliceCommand)command).executeLocally());
-
- return processResults(rows, options, limit, now);
+ if (pageSize <= 0 || query.limits().count() <= pageSize)
+ {
+ try (PartitionIterator data = query.executeInternal(orderGroup))
+ {
+ return processResults(data, options, nowInSec, userLimit);
+ }
+ }
+ else
+ {
+ QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
+ return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec, userLimit);
+ }
}
-
- QueryPager pager = QueryPagers.localPager(command);
- return execute(pager, options, limit, now, pageSize);
}
- public ResultSet process(List<Row> rows) throws InvalidRequestException
+ public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
{
- QueryOptions options = QueryOptions.DEFAULT;
- return process(rows, options, getLimit(options), System.currentTimeMillis());
+ return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
}
public String keyspace()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index c5c6abe,e0d5f66..da96b38
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -1082,10 -894,10 +1082,10 @@@ public final class SystemKeyspac
return null;
}
- public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata)
- public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata, long now)
++ public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec)
{
String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
- UntypedResultSet results = executeInternal(String.format(req, PAXOS), key.getKey(), metadata.cfId);
- UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS), key, metadata.cfId);
++ UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, String.format(req, PAXOS), key.getKey(), metadata.cfId);
if (results.isEmpty())
return new PaxosState(key, metadata);
UntypedResultSet.Row row = results.one();
@@@ -1110,43 -920,41 +1110,43 @@@
String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
executeInternal(String.format(req, PAXOS),
UUIDGen.microsTimestamp(promise.ballot),
- paxosTtl(promise.update.metadata()),
- paxosTtl(promise.update.metadata),
++ paxosTtlSec(promise.update.metadata()),
promise.ballot,
- promise.key,
- promise.update.id());
+ promise.update.partitionKey().getKey(),
+ promise.update.metadata().cfId);
}
public static void savePaxosProposal(Commit proposal)
{
- executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
+ executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
UUIDGen.microsTimestamp(proposal.ballot),
- paxosTtl(proposal.update.metadata()),
- paxosTtl(proposal.update.metadata),
++ paxosTtlSec(proposal.update.metadata()),
proposal.ballot,
- proposal.update.toBytes(),
- proposal.key,
- proposal.update.id());
+ PartitionUpdate.toBytes(proposal.update, MessagingService.current_version),
+ MessagingService.current_version,
+ proposal.update.partitionKey().getKey(),
+ proposal.update.metadata().cfId);
}
- private static int paxosTtl(CFMetaData metadata)
- public static int paxosTtl(CFMetaData metadata)
++ public static int paxosTtlSec(CFMetaData metadata)
{
// keep paxos state around for at least 3h
- return Math.max(3 * 3600, metadata.getGcGraceSeconds());
+ return Math.max(3 * 3600, metadata.params.gcGraceSeconds);
}
public static void savePaxosCommit(Commit commit)
{
// We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old)
// even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc.
- String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?";
+ String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?";
executeInternal(String.format(cql, PAXOS),
UUIDGen.microsTimestamp(commit.ballot),
- paxosTtl(commit.update.metadata()),
- paxosTtl(commit.update.metadata),
++ paxosTtlSec(commit.update.metadata()),
commit.ballot,
- commit.update.toBytes(),
- commit.key,
- commit.update.id());
+ PartitionUpdate.toBytes(commit.update, MessagingService.current_version),
+ MessagingService.current_version,
+ commit.update.partitionKey().getKey(),
+ commit.update.metadata().cfId);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 34d7c40,03dd209..483da67
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -31,6 -30,6 +31,7 @@@ import javax.management.ObjectName
import com.google.common.base.Predicate;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.*;
++import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@@ -439,7 -425,7 +440,8 @@@ public class StorageProxy implements St
// https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
// Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
// mean we lost messages), we pro-actively "repair" those nodes, and retry.
- Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit();
- Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, ballotMicros);
++ int nowInSec = Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros));
++ Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec);
if (Iterables.size(missingMRC) > 0)
{
Tracing.trace("Repairing replicas that missed the most recent commit");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0b3af8f,fde881b..e01f568
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@@ -65,7 -63,13 +65,13 @@@ public class PaxosStat
lock.lock();
try
{
- PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata());
+ // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+ // is expired or not) across nodes; otherwise we may have a window where a Most Recent Commit shows up
+ // on some replicas and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no
+ // amount of re-submit will fix this (because the node on which the commit has expired will have a
+ // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
- long now = UUIDGen.unixTimestamp(toPrepare.ballot);
- PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata(), now);
++ int nowInSec = UUIDGen.unixTimestampInSec(toPrepare.ballot);
++ PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata(), nowInSec);
if (toPrepare.isAfter(state.promised))
{
Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@@ -100,7 -104,8 +106,8 @@@
lock.lock();
try
{
- PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata());
- long now = UUIDGen.unixTimestamp(proposal.ballot);
- PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata(), now);
++ int nowInSec = UUIDGen.unixTimestampInSec(proposal.ballot);
++ PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata(), nowInSec);
if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
{
Tracing.trace("Accepting proposal {}", proposal);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 9c54b01,081f457..ff81803
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@@ -87,8 -89,21 +90,21 @@@ public class PrepareCallback extends Ab
latch.countDown();
}
- public Iterable<InetAddress> replicasMissingMostRecentCommit()
- public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, long now)
++ public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, int nowInSec)
{
+ // In general, we need every replica that has answered the prepare (a quorum) to agree on the MRC (see the
+ // comment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
+ // have learned a commit before committing a new one; otherwise that previous commit is not guaranteed to have reached a
+ // quorum and further commits may proceed on incomplete information).
+ // However, if that commit is too old, it may have expired from some of the replicas' paxos tables (we don't
+ // keep the paxos state forever or it could grow unchecked), and we could end up in an infinite loop as
+ // explained in CASSANDRA-12043. To avoid that, we ignore an MRC that is too old, i.e. older than the TTL we set
+ // on paxos tables. For such old commits, we rely on hints and repair to ensure the commit has indeed been
+ // propagated to all nodes.
- long paxosTtlMicros = SystemKeyspace.paxosTtl(metadata) * 1000 * 1000;
- if (UUIDGen.microsTimestamp(mostRecentCommit.ballot) + paxosTtlMicros < now)
++ long paxosTtlSec = SystemKeyspace.paxosTtlSec(metadata);
++ if (UUIDGen.unixTimestampInSec(mostRecentCommit.ballot) + paxosTtlSec < nowInSec)
+ return Collections.emptySet();
+
return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
{
public boolean apply(InetAddress inetAddress)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/UUIDGen.java
index a673f05,78b8b57..3efcb5e
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@@ -25,9 -25,9 +25,11 @@@ import java.security.SecureRandom
import java.util.Collection;
import java.util.Random;
import java.util.UUID;
++import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
++import com.google.common.primitives.Ints;
/**
@@@ -203,6 -203,6 +205,15 @@@ public class UUIDGe
/**
* @param uuid
++ * @return seconds since Unix epoch
++ */
++ public static int unixTimestampInSec(UUID uuid)
++ {
++ return Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(unixTimestamp(uuid)));
++ }
++
++ /**
++ * @param uuid
* @return microseconds since Unix epoch
*/
public static long microsTimestamp(UUID uuid)