You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/11/29 18:17:49 UTC
cassandra git commit: In mixed 3.x/4 version clusters write tracing
and repair history information without new columns
Repository: cassandra
Updated Branches:
refs/heads/trunk d8c549b68 -> 1c8d0ad33
In mixed 3.x/4 version clusters write tracing and repair history information without new columns
Patch by Ariel Weisberg; Reviewed by Jason Brown for CASSANDRA-14897
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c8d0ad3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c8d0ad3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c8d0ad3
Branch: refs/heads/trunk
Commit: 1c8d0ad333c642405537150fed2cbb8623a8fe94
Parents: d8c549b
Author: Ariel Weisberg <aw...@apple.com>
Authored: Wed Nov 28 18:44:06 2018 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Thu Nov 29 13:16:36 2018 -0500
----------------------------------------------------------------------
NEWS.txt | 8 +--
.../repair/SystemDistributedKeyspace.java | 57 +++++++++++---------
.../apache/cassandra/tracing/TraceKeyspace.java | 27 +++++-----
.../cassandra/tracing/TraceStateImpl.java | 10 ----
4 files changed, 52 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c8d0ad3/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index af28b6e..5cd8542 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -112,11 +112,11 @@ Upgrading
64kb to 16kb. For highly compressible data this can have a noticeable impact
on space utilization. You may want to consider manually specifying this value.
- Additional columns have been added to system_distributed.repair_history,
- system_traces.sessions and system_traces.events. As a result select * queries
+ system_traces.sessions and system_traces.events. As a result select queries
against these tables will fail and generate an error in the log
- during upgrade when the cluster is mixed version. Additionally these
- tables will not be written to if repair or tracing occurs until
- the entire cluster is upgraded and there are no 3.X version nodes in Gossip.
+ during upgrade when the cluster is mixed version. The tables can be made
+ readable by following the instructions in CASSANDRA-14897 to add the
+ new columns to the system tables before upgrading.
- Timestamp ties between values resolve differently: if either value has a TTL,
this value always wins. This is to provide consistent reconciliation before
and after the value expires into a tombstone.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c8d0ad3/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index d4b7259..645694e 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -190,8 +190,7 @@ public final class SystemDistributedKeyspace
{
//Omit the new columns from repair history if an upgrade is in progress as version 3 nodes generate errors
//due to schema differences
- if (Gossiper.instance.haveMajorVersion3Nodes())
- return;
+ boolean includeNewColumns = !Gossiper.instance.haveMajorVersion3Nodes();
InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort();
Set<String> participants = Sets.newHashSet();
@@ -206,23 +205,43 @@ public final class SystemDistributedKeyspace
String query =
"INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " +
"VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', %d, { '%s' }, { '%s' }, '%s', toTimestamp(now()))";
+ String queryWithoutNewColumns =
+ "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " +
+ "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', { '%s' }, '%s', toTimestamp(now()))";
for (String cfname : cfnames)
{
for (Range<Token> range : commonRange.ranges)
{
- String fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
- keyspaceName,
- cfname,
- id.toString(),
- parent_id.toString(),
- range.left.toString(),
- range.right.toString(),
- coordinator.getHostAddress(false),
- coordinator.port,
- Joiner.on("', '").join(participants),
- Joiner.on("', '").join(participants_v2),
- RepairState.STARTED.toString());
+ String fmtQry;
+ if (includeNewColumns)
+ {
+ fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
+ keyspaceName,
+ cfname,
+ id.toString(),
+ parent_id.toString(),
+ range.left.toString(),
+ range.right.toString(),
+ coordinator.getHostAddress(false),
+ coordinator.port,
+ Joiner.on("', '").join(participants),
+ Joiner.on("', '").join(participants_v2),
+ RepairState.STARTED.toString());
+ }
+ else
+ {
+ fmtQry = format(queryWithoutNewColumns, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
+ keyspaceName,
+ cfname,
+ id.toString(),
+ parent_id.toString(),
+ range.left.toString(),
+ range.right.toString(),
+ coordinator.getHostAddress(false),
+ Joiner.on("', '").join(participants),
+ RepairState.STARTED.toString());
+ }
processSilent(fmtQry);
}
}
@@ -236,11 +255,6 @@ public final class SystemDistributedKeyspace
public static void successfulRepairJob(UUID id, String keyspaceName, String cfname)
{
- //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors
- //due to schema differences
- if (Gossiper.instance.haveMajorVersion3Nodes())
- return;
-
String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()) WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
RepairState.SUCCESS.toString(),
@@ -252,11 +266,6 @@ public final class SystemDistributedKeyspace
public static void failedRepairJob(UUID id, String keyspaceName, String cfname, Throwable t)
{
- //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors
- //due to schema differences
- if (Gossiper.instance.haveMajorVersion3Nodes())
- return;
-
String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()), exception_message=?, exception_stacktrace=? WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c8d0ad3/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 6477d8c..f3f6f42 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaConstants;
@@ -98,15 +99,16 @@ public final class TraceKeyspace
int ttl)
{
PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(Sessions, sessionId);
- builder.row()
- .ttl(ttl)
- .add("client", client)
- .add("coordinator", FBUtilities.getBroadcastAddressAndPort().address)
- .add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().port)
- .add("request", request)
- .add("started_at", new Date(startedAt))
- .add("command", command)
- .appendAll("parameters", parameters);
+ Row.SimpleBuilder rb = builder.row();
+ rb.ttl(ttl)
+ .add("client", client)
+ .add("coordinator", FBUtilities.getBroadcastAddressAndPort().address);
+ if (!Gossiper.instance.haveMajorVersion3Nodes())
+ rb.add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().port);
+ rb.add("request", request)
+ .add("started_at", new Date(startedAt))
+ .add("command", command)
+ .appendAll("parameters", parameters);
return builder.buildAsMutation();
}
@@ -127,9 +129,10 @@ public final class TraceKeyspace
.ttl(ttl);
rowBuilder.add("activity", message)
- .add("source", FBUtilities.getBroadcastAddressAndPort().address)
- .add("source_port", FBUtilities.getBroadcastAddressAndPort().port)
- .add("thread", threadName);
+ .add("source", FBUtilities.getBroadcastAddressAndPort().address);
+ if (!Gossiper.instance.haveMajorVersion3Nodes())
+ rowBuilder.add("source_port", FBUtilities.getBroadcastAddressAndPort().port);
+ rowBuilder.add("thread", threadName);
if (elapsed >= 0)
rowBuilder.add("source_elapsed", elapsed);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c8d0ad3/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index e16f778..aa9f8a8 100644
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@ -103,11 +103,6 @@ public class TraceStateImpl extends TraceState
void executeMutation(final Mutation mutation)
{
- //Don't record trace state if an upgrade is in progress as version 3 nodes generates errors
- //due to schema differences
- if (Gossiper.instance.haveMajorVersion3Nodes())
- return;
-
CompletableFuture<Void> fut = CompletableFuture.runAsync(new WrappedRunnable()
{
protected void runMayThrow()
@@ -123,11 +118,6 @@ public class TraceStateImpl extends TraceState
static void mutateWithCatch(Mutation mutation)
{
- //Don't record trace state if an upgrade is in progress as version 3 nodes generates errors
- //due to schema differences
- if (Gossiper.instance.haveMajorVersion3Nodes())
- return;
-
try
{
StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY, System.nanoTime());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org