You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/11/29 18:17:49 UTC

cassandra git commit: In mixed 3.x/4 version clusters write tracing and repair history information without new columns

Repository: cassandra
Updated Branches:
  refs/heads/trunk d8c549b68 -> 1c8d0ad33


In mixed 3.x/4 version clusters write tracing and repair history information without new columns

Patch by Ariel Weisberg; Reviewed by Jason Brown for CASSANDRA-14897


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c8d0ad3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c8d0ad3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c8d0ad3

Branch: refs/heads/trunk
Commit: 1c8d0ad333c642405537150fed2cbb8623a8fe94
Parents: d8c549b
Author: Ariel Weisberg <aw...@apple.com>
Authored: Wed Nov 28 18:44:06 2018 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Thu Nov 29 13:16:36 2018 -0500

----------------------------------------------------------------------
 NEWS.txt                                        |  8 +--
 .../repair/SystemDistributedKeyspace.java       | 57 +++++++++++---------
 .../apache/cassandra/tracing/TraceKeyspace.java | 27 +++++-----
 .../cassandra/tracing/TraceStateImpl.java       | 10 ----
 4 files changed, 52 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c8d0ad3/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index af28b6e..5cd8542 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -112,11 +112,11 @@ Upgrading
       64kb to 16kb. For highly compressible data this can have a noticeable impact
       on space utilization. You may want to consider manually specifying this value.
     - Additional columns have been added to system_distributed.repair_history,
-      system_traces.sessions and system_traces.events. As a result select * queries
+      system_traces.sessions and system_traces.events. As a result select queries
       against these tables will fail and generate an error in the log
-      during upgrade when the cluster is mixed version. Additionally these
-      tables will not be written to if repair or tracing occurs until
-      the entire cluster is upgraded and there are no 3.X version nodes in Gossip.
+      during upgrade when the cluster is mixed version. The tables can be made
+      readable by following the instructions in CASSANDRA-14897 to add the
+      new columns to the system tables before upgrading.
     - Timestamp ties between values resolve differently: if either value has a TTL,
       this value always wins. This is to provide consistent reconciliation before
       and after the value expires into a tombstone.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c8d0ad3/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index d4b7259..645694e 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -190,8 +190,7 @@ public final class SystemDistributedKeyspace
     {
         //While an upgrade is in progress, record repair history without the new columns,
         //as version 3 nodes generate errors due to schema differences
-        if (Gossiper.instance.haveMajorVersion3Nodes())
-            return;
+        boolean includeNewColumns = !Gossiper.instance.haveMajorVersion3Nodes();
 
         InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort();
         Set<String> participants = Sets.newHashSet();
@@ -206,23 +205,43 @@ public final class SystemDistributedKeyspace
         String query =
                 "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " +
                         "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',        %d,               { '%s' },     { '%s' },        '%s',   toTimestamp(now()))";
+        String queryWithoutNewColumns =
+                "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " +
+                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',               { '%s' },        '%s',   toTimestamp(now()))";
 
         for (String cfname : cfnames)
         {
             for (Range<Token> range : commonRange.ranges)
             {
-                String fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
-                                              keyspaceName,
-                                              cfname,
-                                              id.toString(),
-                                              parent_id.toString(),
-                                              range.left.toString(),
-                                              range.right.toString(),
-                                              coordinator.getHostAddress(false),
-                                              coordinator.port,
-                                              Joiner.on("', '").join(participants),
-                                              Joiner.on("', '").join(participants_v2),
-                                              RepairState.STARTED.toString());
+                String fmtQry;
+                if (includeNewColumns)
+                {
+                    fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
+                                    keyspaceName,
+                                    cfname,
+                                    id.toString(),
+                                    parent_id.toString(),
+                                    range.left.toString(),
+                                    range.right.toString(),
+                                    coordinator.getHostAddress(false),
+                                    coordinator.port,
+                                    Joiner.on("', '").join(participants),
+                                    Joiner.on("', '").join(participants_v2),
+                                    RepairState.STARTED.toString());
+                }
+                else
+                {
+                    fmtQry = format(queryWithoutNewColumns, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
+                                    keyspaceName,
+                                    cfname,
+                                    id.toString(),
+                                    parent_id.toString(),
+                                    range.left.toString(),
+                                    range.right.toString(),
+                                    coordinator.getHostAddress(false),
+                                    Joiner.on("', '").join(participants),
+                                    RepairState.STARTED.toString());
+                }
                 processSilent(fmtQry);
             }
         }
@@ -236,11 +255,6 @@ public final class SystemDistributedKeyspace
 
     public static void successfulRepairJob(UUID id, String keyspaceName, String cfname)
     {
-        //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors
-        //due to schema differences
-        if (Gossiper.instance.haveMajorVersion3Nodes())
-            return;
-
         String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()) WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
         String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
                                         RepairState.SUCCESS.toString(),
@@ -252,11 +266,6 @@ public final class SystemDistributedKeyspace
 
     public static void failedRepairJob(UUID id, String keyspaceName, String cfname, Throwable t)
     {
-        //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors
-        //due to schema differences
-        if (Gossiper.instance.haveMajorVersion3Nodes())
-            return;
-
         String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()), exception_message=?, exception_stacktrace=? WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
         StringWriter sw = new StringWriter();
         PrintWriter pw = new PrintWriter(sw);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c8d0ad3/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 6477d8c..f3f6f42 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -98,15 +99,16 @@ public final class TraceKeyspace
                                              int ttl)
     {
         PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(Sessions, sessionId);
-        builder.row()
-               .ttl(ttl)
-               .add("client", client)
-               .add("coordinator", FBUtilities.getBroadcastAddressAndPort().address)
-               .add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().port)
-               .add("request", request)
-               .add("started_at", new Date(startedAt))
-               .add("command", command)
-               .appendAll("parameters", parameters);
+        Row.SimpleBuilder rb = builder.row();
+        rb.ttl(ttl)
+          .add("client", client)
+          .add("coordinator", FBUtilities.getBroadcastAddressAndPort().address);
+        if (!Gossiper.instance.haveMajorVersion3Nodes())
+            rb.add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().port);
+        rb.add("request", request)
+          .add("started_at", new Date(startedAt))
+          .add("command", command)
+          .appendAll("parameters", parameters);
 
         return builder.buildAsMutation();
     }
@@ -127,9 +129,10 @@ public final class TraceKeyspace
                                               .ttl(ttl);
 
         rowBuilder.add("activity", message)
-                  .add("source", FBUtilities.getBroadcastAddressAndPort().address)
-                  .add("source_port", FBUtilities.getBroadcastAddressAndPort().port)
-                  .add("thread", threadName);
+                  .add("source", FBUtilities.getBroadcastAddressAndPort().address);
+        if (!Gossiper.instance.haveMajorVersion3Nodes())
+            rowBuilder.add("source_port", FBUtilities.getBroadcastAddressAndPort().port);
+        rowBuilder.add("thread", threadName);
 
         if (elapsed >= 0)
             rowBuilder.add("source_elapsed", elapsed);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c8d0ad3/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index e16f778..aa9f8a8 100644
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@ -103,11 +103,6 @@ public class TraceStateImpl extends TraceState
 
     void executeMutation(final Mutation mutation)
     {
-        //Don't record trace state if an upgrade is in progress as version 3 nodes generates errors
-        //due to schema differences
-        if (Gossiper.instance.haveMajorVersion3Nodes())
-            return;
-
         CompletableFuture<Void> fut = CompletableFuture.runAsync(new WrappedRunnable()
         {
             protected void runMayThrow()
@@ -123,11 +118,6 @@ public class TraceStateImpl extends TraceState
 
     static void mutateWithCatch(Mutation mutation)
     {
-        //Don't record trace state if an upgrade is in progress as version 3 nodes generates errors
-        //due to schema differences
-        if (Gossiper.instance.haveMajorVersion3Nodes())
-            return;
-
         try
         {
             StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY, System.nanoTime());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org