You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/08/19 07:42:23 UTC

[cassandra] branch trunk updated: FQL replay should have options to ignore DDL statements

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 00c2223  FQL replay should have options to ignore DDL statements
00c2223 is described below

commit 00c22232d96b6e592fb1d1f32a6b649b49eceb34
Author: Stefan Miklosovic <st...@instaclustr.com>
AuthorDate: Mon Aug 17 19:05:26 2020 +0200

    FQL replay should have options to ignore DDL statements
    
    Patch by Stefan Miklosovic; reviewed by David Capwell and Marcus Eriksson
    for CASSANDRA-16039
---
 CHANGES.txt                                        |   1 +
 doc/source/new/fqllogging.rst                      |  14 ++-
 .../test/FqlReplayDDLExclusionTest.java            | 107 +++++++++++++++++++++
 .../org/apache/cassandra/tools/ToolRunner.java     |  22 ++---
 .../src/org/apache/cassandra/fqltool/FQLQuery.java |  24 +++++
 .../apache/cassandra/fqltool/commands/Replay.java  |  22 ++++-
 6 files changed, 172 insertions(+), 18 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index b71b937..f429128 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta2
+ * FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
  * Remove COMPACT STORAGE internals (CASSANDRA-13994)
  * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976)
  * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425)
diff --git a/doc/source/new/fqllogging.rst b/doc/source/new/fqllogging.rst
index 881f39f..08de1c7 100644
--- a/doc/source/new/fqllogging.rst
+++ b/doc/source/new/fqllogging.rst
@@ -550,19 +550,25 @@ The main objectives of ``fqltool replay`` are:
 
 The FQL replay could also be used on the same node on which the full query log are generated to recreate a dropped database object.
 
+Please keep in mind that ``fqltool replay`` does not replay DDL statements by default. You have to enable this explicitly with the ``--replay-ddl-statements`` flag.
+
  The syntax of ``fqltool replay`` is as follows:
 
 ::
 
-  fqltool replay [--keyspace <keyspace>] [--results <results>]
- [--store-queries <store_queries>] --target <target>... [--] <path1>
- [<path2>...<pathN>]
+  fqltool replay [--keyspace <keyspace>] [--replay-ddl-statements]
+ [--results <results>] [--store-queries <store_queries>]
+ --target <target>... [--] <path1> [<path2>...<pathN>]
 
  OPTIONS
    --keyspace <keyspace>
   Only replay queries against this keyspace and queries without
   keyspace set.
 
+   --replay-ddl-statements
+   If specified, replays DDL statements as well, they are excluded from
+   replaying by default.
+
    --results <results>
   Where to store the results of the queries, this should be a
   directory. Leave this option out to avoid storing results.
@@ -593,7 +599,7 @@ Subsequently run ``fqltool replay``.   The directory to store results of queries
 
 ::
 
- [ec2-user@ip-10-0-2-238 cassandra]$ fqltool replay --keyspace AuditLogKeyspace --results
+ [ec2-user@ip-10-0-2-238 cassandra]$ fqltool replay --replay-ddl-statements --keyspace AuditLogKeyspace --results
  /cassandra/fql/logs/results/replay --store-queries /cassandra/fql/logs/queries/replay --
  target 3.91.56.164 -- /tmp/cassandrafullquerylog
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FqlReplayDDLExclusionTest.java b/test/distributed/org/apache/cassandra/distributed/test/FqlReplayDDLExclusionTest.java
new file mode 100644
index 0000000..1f53b98
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/FqlReplayDDLExclusionTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.tools.ToolRunner;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FqlReplayDDLExclusionTest extends TestBaseImpl
+{
+    @Rule
+    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void test() throws Throwable
+    {
+        try (final Cluster cluster = init(builder().withNodes(1)
+                                                   .withConfig(updater -> updater.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+                                                   .start()))
+        {
+            final IInvokableInstance node = cluster.get(1);
+
+            // going through the driver is important: the dtest API's query execution does not invoke
+            // the code in Cassandra where events are propagated to the logger
+            try (com.datastax.driver.core.Cluster c = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+                 Session s = c.connect())
+            {
+                s.execute("CREATE KEYSPACE fql_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
+
+                node.nodetool("enablefullquerylog", "--path", temporaryFolder.getRoot().getAbsolutePath());
+
+                s.execute("CREATE TABLE fql_ks.fql_table (id int primary key);");
+                s.execute("INSERT INTO fql_ks.fql_table (id) VALUES (1)");
+
+                node.nodetool("disablefullquerylog");
+
+                // drop the table here; the replay with --replay-ddl-statements below is expected to recreate it
+
+                node.executeInternal("DROP TABLE fql_ks.fql_table;");
+
+                final ToolRunner.Runners runners = new ToolRunner.Runners();
+
+                // without --replay-ddl-statements the replayed insert fails as the table is missing, yet the tool must still exit with 0
+                final ToolRunner negativeRunner = runners.invokeClassAsTool("org.apache.cassandra.fqltool.FullQueryLogTool",
+                                                                            "replay",
+                                                                            "--keyspace", "fql_ks",
+                                                                            "--target", "127.0.0.1",
+                                                                            "--", temporaryFolder.getRoot().getAbsolutePath());
+
+                assertEquals(0, negativeRunner.getExitCode());
+
+                try
+                {
+                    node.executeInternalWithResult("SELECT * from fql_ks.fql_table");
+                    fail("This query should fail because we do not expect fql_ks.fql_table to be created!");
+                }
+                catch (final Exception ex)
+                {
+                    assertTrue(ex.getMessage().contains("table fql_table does not exist"));
+                }
+
+                // here we replay with --replay-ddl-statements so table will be created and insert will succeed
+                final ToolRunner positiveRunner = runners.invokeClassAsTool("org.apache.cassandra.fqltool.FullQueryLogTool",
+                                                                            "replay",
+                                                                            "--keyspace", "fql_ks",
+                                                                            "--target", "127.0.0.1",
+                                                                            // important
+                                                                            "--replay-ddl-statements",
+                                                                            "--", temporaryFolder.getRoot().getAbsolutePath());
+
+                assertEquals(0, positiveRunner.getExitCode());
+
+                assertRows(node.executeInternalWithResult("SELECT * from fql_ks.fql_table"), QueryResults.builder().row(1).build());
+            }
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/tools/ToolRunner.java b/test/unit/org/apache/cassandra/tools/ToolRunner.java
index ab6dc0d..b289e4d 100644
--- a/test/unit/org/apache/cassandra/tools/ToolRunner.java
+++ b/test/unit/org/apache/cassandra/tools/ToolRunner.java
@@ -424,15 +424,15 @@ public class ToolRunner implements AutoCloseable
     {
         forceKill();
     }
-    
-    static class Runners
+
+    public static class Runners
     {
-        protected ToolRunner invokeNodetool(String... args)
+        public ToolRunner invokeNodetool(String... args)
         {
             return invokeNodetool(Arrays.asList(args));
         }
 
-        protected ToolRunner invokeNodetool(List<String> args)
+        public ToolRunner invokeNodetool(List<String> args)
         {
             return invokeTool(buildNodetoolArgs(args), true);
         }
@@ -441,28 +441,28 @@ public class ToolRunner implements AutoCloseable
         {
             return CQLTester.buildNodetoolArgs(args);
         }
-        
-        protected ToolRunner invokeClassAsTool(String... args)
+
+        public ToolRunner invokeClassAsTool(String... args)
         {
             return invokeClassAsTool(Arrays.asList(args));
         }
-        
-        protected ToolRunner invokeClassAsTool(List<String> args)
+
+        public ToolRunner invokeClassAsTool(List<String> args)
         {
             return invokeTool(args, false);
         }
 
-        protected ToolRunner invokeTool(String... args)
+        public ToolRunner invokeTool(String... args)
         {
             return invokeTool(Arrays.asList(args));
         }
 
-        protected ToolRunner invokeTool(List<String> args)
+        public ToolRunner invokeTool(List<String> args)
         {
             return invokeTool(args, true);
         }
 
-        protected ToolRunner invokeTool(List<String> args, boolean runOutOfProcess)
+        public ToolRunner invokeTool(List<String> args, boolean runOutOfProcess)
         {
             ToolRunner runner = new ToolRunner(args, runOutOfProcess);
             runner.start().waitFor();
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
index c3c6c89..ca32646 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
@@ -22,6 +22,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 
 import com.datastax.driver.core.BatchStatement;
@@ -98,8 +101,12 @@ public abstract class FQLQuery implements Comparable<FQLQuery>
                '}';
     }
 
+    public abstract boolean isDDLStatement();
+
     public static class Single extends FQLQuery
     {
+        private static final Set<String> DDL_STATEMENTS = Sets.newHashSet("CREATE", "ALTER", "DROP");
+
         public final String query;
         public final List<ByteBuffer> values;
 
@@ -119,6 +126,18 @@ public abstract class FQLQuery implements Comparable<FQLQuery>
                                  values.size());
         }
 
+        public boolean isDDLStatement()
+        {
+            // CQL keywords are case-insensitive and a logged query may start with whitespace,
+            // so match each DDL keyword against the trimmed text while ignoring case.
+            final String statement = query.trim();
+            for (final String ddlStmt : DDL_STATEMENTS)
+                if (statement.regionMatches(true, 0, ddlStmt, 0, ddlStmt.length()))
+                    return true;
+
+            return false;
+        }
+
         public Statement toStatement()
         {
             SimpleStatement ss = new SimpleStatement(query, values.toArray());
@@ -243,6 +262,11 @@ public abstract class FQLQuery implements Comparable<FQLQuery>
             return sb.toString();
         }
 
+        public boolean isDDLStatement()
+        {
+            return false; // never DDL here; Replay casts every DDL-flagged query to FQLQuery.Single when logging its exclusion
+        }
+
         public boolean equals(Object o)
         {
             if (this == o) return true;
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
index 9cc147a..a567101 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
@@ -26,6 +26,8 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.airlift.airline.Arguments;
 import io.airlift.airline.Command;
@@ -33,7 +35,6 @@ import io.airlift.airline.Option;
 import net.openhft.chronicle.core.io.Closeable;
 import net.openhft.chronicle.queue.ChronicleQueue;
 import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-
 import org.apache.cassandra.fqltool.FQLQuery;
 import org.apache.cassandra.fqltool.FQLQueryIterator;
 import org.apache.cassandra.fqltool.QueryReplayer;
@@ -46,6 +47,8 @@ import org.apache.cassandra.utils.MergeIterator;
 @Command(name = "replay", description = "Replay full query logs")
 public class Replay implements Runnable
 {
+    private static final Logger logger = LoggerFactory.getLogger(Replay.class);
+
     @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true)
     private List<String> arguments = new ArrayList<>();
 
@@ -61,6 +64,9 @@ public class Replay implements Runnable
     @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results")
     private String queryStorePath;
 
+    @Option(title = "replay_ddl_statements", name = { "--replay-ddl-statements" }, description = "If specified, replays DDL statements as well, they are excluded from replaying by default.")
+    private boolean replayDDLStatements;
+
     @Override
     public void run()
     {
@@ -83,7 +89,7 @@ public class Replay implements Runnable
                 System.err.println("You need to state at least one --target host to replay the query against");
                 System.exit(1);
             }
-            replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath);
+            replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, replayDDLStatements);
         }
         catch (Exception e)
         {
@@ -91,7 +97,7 @@ public class Replay implements Runnable
         }
     }
 
-    public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath)
+    public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean replayDDLStatements)
     {
         int readAhead = 200; // how many fql queries should we read in to memory to be able to sort them?
         List<ChronicleQueue> readQueues = null;
@@ -101,6 +107,16 @@ public class Replay implements Runnable
         if (keyspace != null)
             filters.add(fqlQuery -> fqlQuery.keyspace() == null || fqlQuery.keyspace().equals(keyspace));
 
+        if (!replayDDLStatements)
+            filters.add(fqlQuery -> {
+                boolean notDDLStatement = !fqlQuery.isDDLStatement();
+
+                if (!notDDLStatement)
+                    logger.info("Excluding DDL statement from replaying: {}", ((FQLQuery.Single) fqlQuery).query);
+
+                return notDDLStatement;
+            });
+
         try
         {
             readQueues = arguments.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org