You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/09/01 08:44:22 UTC
[6/7] cassandra git commit: Add fqltool compare
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java b/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
deleted file mode 100644
index 043ead8..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool.commands;
-
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import io.airlift.airline.Arguments;
-import io.airlift.airline.Command;
-import io.airlift.airline.Option;
-import net.openhft.chronicle.core.io.Closeable;
-import net.openhft.chronicle.queue.ChronicleQueue;
-import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-
-import org.apache.cassandra.tools.fqltool.FQLQuery;
-import org.apache.cassandra.tools.fqltool.FQLQueryIterator;
-import org.apache.cassandra.tools.fqltool.QueryReplayer;
-import org.apache.cassandra.utils.AbstractIterator;
-import org.apache.cassandra.utils.MergeIterator;
-
-/**
- * replay the contents of a list of paths containing full query logs
- */
-@Command(name = "replay", description = "Replay full query logs")
-public class Replay implements Runnable
-{
- @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true)
- private List<String> arguments = new ArrayList<>();
-
- @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.")
- private List<String> targetHosts;
-
- @Option(title = "results", name = { "--results"}, description = "Where to store the results of the queries, this should be a directory. Leave this option out to avoid storing results.")
- private String resultPath;
-
- @Option(title = "keyspace", name = { "--keyspace"}, description = "Only replay queries against this keyspace and queries without keyspace set.")
- private String keyspace;
-
- @Option(title = "debug", name = {"--debug"}, description = "Debug mode, print all queries executed.")
- private boolean debug;
-
- @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results")
- private String queryStorePath;
-
- @Override
- public void run()
- {
- try
- {
- List<File> resultPaths = null;
- if (resultPath != null)
- {
- File basePath = new File(resultPath);
- if (!basePath.exists() || !basePath.isDirectory())
- {
- System.err.println("The results path (" + basePath + ") should be an existing directory");
- System.exit(1);
- }
- resultPaths = targetHosts.stream().map(target -> new File(basePath, target)).collect(Collectors.toList());
- resultPaths.forEach(File::mkdir);
- }
- if (targetHosts.size() < 1)
- {
- System.err.println("You need to state at least one --target host to replay the query against");
- System.exit(1);
- }
- replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, debug);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean debug)
- {
- int readAhead = 200; // how many fql queries should we read in to memory to be able to sort them?
- List<ChronicleQueue> readQueues = null;
- List<FQLQueryIterator> iterators = null;
- List<Predicate<FQLQuery>> filters = new ArrayList<>();
-
- if (keyspace != null)
- filters.add(fqlQuery -> fqlQuery.keyspace == null || fqlQuery.keyspace.equals(keyspace));
-
- try
- {
- readQueues = arguments.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList());
- iterators = readQueues.stream().map(ChronicleQueue::createTailer).map(tailer -> new FQLQueryIterator(tailer, readAhead)).collect(Collectors.toList());
- try (MergeIterator<FQLQuery, List<FQLQuery>> iter = MergeIterator.get(iterators, FQLQuery::compareTo, new Reducer());
- QueryReplayer replayer = new QueryReplayer(iter, targetHosts, resultPaths, filters, System.out, queryStorePath, debug))
- {
- replayer.replay();
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- if (iterators != null)
- iterators.forEach(AbstractIterator::close);
- if (readQueues != null)
- readQueues.forEach(Closeable::close);
- }
- }
-
- @VisibleForTesting
- public static class Reducer extends MergeIterator.Reducer<FQLQuery, List<FQLQuery>>
- {
- List<FQLQuery> queries = new ArrayList<>();
- public void reduce(int idx, FQLQuery current)
- {
- queries.add(current);
- }
-
- protected List<FQLQuery> getReduced()
- {
- return queries;
- }
- protected void onKeyChange()
- {
- queries.clear();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java b/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
deleted file mode 100644
index a662699..0000000
--- a/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
+++ /dev/null
@@ -1,760 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.junit.Test;
-
-import net.openhft.chronicle.queue.ChronicleQueue;
-import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-import net.openhft.chronicle.queue.ExcerptAppender;
-import net.openhft.chronicle.queue.ExcerptTailer;
-import net.openhft.chronicle.wire.ValueIn;
-import org.apache.cassandra.audit.FullQueryLogger;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.cql3.statements.BatchStatement;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.tools.Util;
-import org.apache.cassandra.tools.fqltool.commands.Replay;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.Pair;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public class FQLReplayTest
-{
- public FQLReplayTest()
- {
- Util.initDatabaseDescriptor();
- }
-
- @Test
- public void testOrderedReplay() throws IOException
- {
- File f = generateQueries(100, true);
- int queryCount = 0;
- try (ChronicleQueue queue = ChronicleQueueBuilder.single(f).build();
- FQLQueryIterator iter = new FQLQueryIterator(queue.createTailer(), 101))
- {
- long last = -1;
- while (iter.hasNext())
- {
- FQLQuery q = iter.next();
- assertTrue(q.queryStartTime >= last);
- last = q.queryStartTime;
- queryCount++;
- }
- }
- assertEquals(100, queryCount);
- }
- @Test
- public void testMergingIterator() throws IOException
- {
- File f = generateQueries(100, false);
- File f2 = generateQueries(100, false);
- int queryCount = 0;
- try (ChronicleQueue queue = ChronicleQueueBuilder.single(f).build();
- ChronicleQueue queue2 = ChronicleQueueBuilder.single(f2).build();
- FQLQueryIterator iter = new FQLQueryIterator(queue.createTailer(), 101);
- FQLQueryIterator iter2 = new FQLQueryIterator(queue2.createTailer(), 101);
- MergeIterator<FQLQuery, List<FQLQuery>> merger = MergeIterator.get(Lists.newArrayList(iter, iter2), FQLQuery::compareTo, new Replay.Reducer()))
- {
- long last = -1;
-
- while (merger.hasNext())
- {
- List<FQLQuery> qs = merger.next();
- assertEquals(2, qs.size());
- assertEquals(0, qs.get(0).compareTo(qs.get(1)));
- assertTrue(qs.get(0).queryStartTime >= last);
- last = qs.get(0).queryStartTime;
- queryCount++;
- }
- }
- assertEquals(100, queryCount);
- }
-
- @Test
- public void testFQLQueryReader() throws IOException
- {
- FQLQueryReader reader = new FQLQueryReader();
-
- try (ChronicleQueue queue = ChronicleQueueBuilder.single(generateQueries(1000, true)).build())
- {
- ExcerptTailer tailer = queue.createTailer();
- int queryCount = 0;
- while (tailer.readDocument(reader))
- {
- assertNotNull(reader.getQuery());
- if (reader.getQuery() instanceof FQLQuery.Single)
- {
- assertTrue(reader.getQuery().keyspace == null || reader.getQuery().keyspace.equals("querykeyspace"));
- }
- else
- {
- assertEquals("someks", reader.getQuery().keyspace);
- }
- queryCount++;
- }
- assertEquals(1000, queryCount);
- }
- }
-
- @Test
- public void testStoringResults() throws Throwable
- {
- File tmpDir = Files.createTempDirectory("results").toFile();
- File queryDir = Files.createTempDirectory("queries").toFile();
-
- ResultHandler.ComparableResultSet res = createResultSet(10, 10, true);
- ResultStore rs = new ResultStore(Collections.singletonList(tmpDir), queryDir);
- try
- {
- FQLQuery query = new FQLQuery.Single("abc", 3, QueryOptions.DEFAULT, 12345, 11111, 22, "select * from abc", Collections.emptyList());
- rs.storeColumnDefinitions(query, Collections.singletonList(res.getColumnDefinitions()));
- Iterator<ResultHandler.ComparableRow> it = res.iterator();
- while (it.hasNext())
- {
- List<ResultHandler.ComparableRow> row = Collections.singletonList(it.next());
- rs.storeRows(row);
- }
- // this marks the end of the result set:
- rs.storeRows(Collections.singletonList(null));
- }
- finally
- {
- rs.close();
- }
-
- List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = readResultFile(tmpDir, queryDir);
- assertEquals(1, resultSets.size());
- assertEquals(res, resultSets.get(0).right);
-
- }
-
- @Test
- public void testCompareColumnDefinitions()
- {
- ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
- ResultComparator rc = new ResultComparator();
-
- List<ResultHandler.ComparableColumnDefinitions> colDefs = new ArrayList<>(100);
- List<String> targetHosts = new ArrayList<>(100);
- for (int i = 0; i < 100; i++)
- {
- targetHosts.add("host"+i);
- colDefs.add(res.getColumnDefinitions());
- }
- assertTrue(rc.compareColumnDefinitions(targetHosts, null, colDefs));
- colDefs.set(50, createResultSet(9, 9, false).getColumnDefinitions());
- assertFalse(rc.compareColumnDefinitions(targetHosts, null, colDefs));
- }
-
- @Test
- public void testCompareEqualRows()
- {
- ResultComparator rc = new ResultComparator();
-
- ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
- ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
- List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2);
- List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList());
-
- while (true)
- {
- List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters);
- assertTrue(rc.compareRows(Lists.newArrayList("eq1", "eq2"), null, rows));
- if (rows.stream().allMatch(Objects::isNull))
- break;
- }
- }
-
- @Test
- public void testCompareRowsDifferentCount()
- {
- ResultComparator rc = new ResultComparator();
- ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
- ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
- List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(10, 11, false));
- List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList());
- boolean foundMismatch = false;
- while (true)
- {
- List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters);
- if (rows.stream().allMatch(Objects::isNull))
- break;
- if (!rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows))
- {
- foundMismatch = true;
- }
- }
- assertTrue(foundMismatch);
- }
-
- @Test
- public void testCompareRowsDifferentContent()
- {
- ResultComparator rc = new ResultComparator();
- ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
- ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
- List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(10, 10, true));
- List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList());
- while (true)
- {
- List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters);
- if (rows.stream().allMatch(Objects::isNull))
- break;
- assertFalse(rows.toString(), rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows));
- }
- }
-
- @Test
- public void testCompareRowsDifferentColumnCount()
- {
- ResultComparator rc = new ResultComparator();
- ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
- ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
- List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(11, 10, false));
- List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList());
- while (true)
- {
- List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters);
- if (rows.stream().allMatch(Objects::isNull))
- break;
- assertFalse(rows.toString(), rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows));
- }
- }
-
- @Test
- public void testResultHandler() throws IOException
- {
- List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc");
- File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
- File queryDir = Files.createTempDirectory("queries").toFile();
- List<File> resultPaths = new ArrayList<>();
- targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
- ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
- ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
- ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
- ResultHandler.ComparableResultSet res3 = createResultSet(10, 10, false);
- List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, res3);
- rh.handleResults(new FQLQuery.Single("abcabc", 3, QueryOptions.DEFAULT, 1111, 2222, 3333, "select * from xyz", Collections.emptyList()), toCompare);
- List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(resultPaths.get(0), queryDir);
- List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results2 = readResultFile(resultPaths.get(1), queryDir);
- List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results3 = readResultFile(resultPaths.get(2), queryDir);
- assertEquals(results1, results2);
- assertEquals(results1, results3);
- assertEquals(Iterables.getOnlyElement(results3).right, res);
- }
-
- @Test
- public void testResultHandlerWithDifference() throws IOException
- {
- List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc");
- File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
- File queryDir = Files.createTempDirectory("queries").toFile();
- List<File> resultPaths = new ArrayList<>();
- targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
- ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
- ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
- ResultHandler.ComparableResultSet res2 = createResultSet(10, 5, false);
- ResultHandler.ComparableResultSet res3 = createResultSet(10, 10, false);
- List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, res3);
- rh.handleResults(new FQLQuery.Single("aaa", 3, QueryOptions.DEFAULT, 123123, 11111, 22222, "select * from abcabc", Collections.emptyList()), toCompare);
- List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(resultPaths.get(0), queryDir);
- List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results2 = readResultFile(resultPaths.get(1), queryDir);
- List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results3 = readResultFile(resultPaths.get(2), queryDir);
- assertEquals(results1, results3);
- assertEquals(results2.get(0).right, res2);
- }
-
- @Test
- public void testResultHandlerMultipleResultSets() throws IOException
- {
- List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc");
- File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
- File queryDir = Files.createTempDirectory("queries").toFile();
- List<File> resultPaths = new ArrayList<>();
- targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
- ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
- List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets = new ArrayList<>();
- Random random = new Random();
- for (int i = 0; i < 10; i++)
- {
- List<ResultHandler.ComparableResultSet> results = new ArrayList<>();
- List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50));
- for (int jj = 0; jj < targetHosts.size(); jj++)
- {
- results.add(createResultSet(5, 1 + random.nextInt(10), true));
- }
- FQLQuery q = new FQLQuery.Single("abc"+i,
- 3,
- QueryOptions.forInternalCalls(values),
- i * 1000,
- 12345,
- 54321,
- "select * from xyz where id = "+i,
- values);
- resultSets.add(Pair.create(q, results));
- }
- for (int i = 0; i < resultSets.size(); i++)
- rh.handleResults(resultSets.get(i).left, resultSets.get(i).right);
-
- for (int i = 0; i < targetHosts.size(); i++)
- compareWithFile(resultPaths, queryDir, resultSets, i);
- }
-
- @Test
- public void testResultHandlerFailedQuery() throws IOException
- {
- List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc", "hostd");
- File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
- File queryDir = Files.createTempDirectory("queries").toFile();
- List<File> resultPaths = new ArrayList<>();
- targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
- ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
- List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets = new ArrayList<>();
- Random random = new Random();
- for (int i = 0; i < 10; i++)
- {
- List<ResultHandler.ComparableResultSet> results = new ArrayList<>();
- List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50));
- for (int jj = 0; jj < targetHosts.size(); jj++)
- {
- results.add(createResultSet(5, 1 + random.nextInt(10), true));
- }
- results.set(0, FakeResultSet.failed(new RuntimeException("testing abc")));
- results.set(3, FakeResultSet.failed(new RuntimeException("testing abc")));
- FQLQuery q = new FQLQuery.Single("abc"+i,
- 3,
- QueryOptions.forInternalCalls(values),
- i * 1000,
- i * 12345,
- i * 54321,
- "select * from xyz where id = "+i,
- values);
- resultSets.add(Pair.create(q, results));
- }
- for (int i = 0; i < resultSets.size(); i++)
- rh.handleResults(resultSets.get(i).left, resultSets.get(i).right);
-
- for (int i = 0; i < targetHosts.size(); i++)
- compareWithFile(resultPaths, queryDir, resultSets, i);
- }
-
-
- @Test
- public void testCompare()
- {
- FQLQuery q1 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList());
- FQLQuery q2 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222,"aaaa", Collections.emptyList());
-
- assertEquals(0, q1.compareTo(q2));
- assertEquals(0, q2.compareTo(q1));
-
- FQLQuery q3 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList());
- // single queries before batch queries
- assertTrue(q1.compareTo(q3) < 0);
- assertTrue(q3.compareTo(q1) > 0);
-
- // check that smaller query time
- FQLQuery q4 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, "aaaa", Collections.emptyList());
- assertTrue(q1.compareTo(q4) < 0);
- assertTrue(q4.compareTo(q1) > 0);
-
- FQLQuery q5 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList());
- assertTrue(q1.compareTo(q5) < 0);
- assertTrue(q5.compareTo(q1) > 0);
-
- FQLQuery q6 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes(10)));
- FQLQuery q7 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList());
- assertTrue(q6.compareTo(q7) > 0);
- assertTrue(q7.compareTo(q6) < 0);
-
- FQLQuery q8 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("a")));
- FQLQuery q9 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("b")));
- assertTrue(q8.compareTo(q9) < 0);
- assertTrue(q9.compareTo(q8) > 0);
- }
-
- private File generateQueries(int count, boolean random) throws IOException
- {
- Random r = new Random();
- File dir = Files.createTempDirectory("chronicle").toFile();
- try (ChronicleQueue readQueue = ChronicleQueueBuilder.single(dir).build())
- {
- ExcerptAppender appender = readQueue.acquireAppender();
-
- for (int i = 0; i < count; i++)
- {
- long timestamp = random ? Math.abs(r.nextLong() % 10000) : i;
- if (random ? r.nextBoolean() : i % 2 == 0)
- {
- String query = "abcdefghijklm " + i;
- QueryState qs = r.nextBoolean() ? queryState() : queryState("querykeyspace");
- FullQueryLogger.Query q = new FullQueryLogger.Query(query, QueryOptions.DEFAULT, qs, timestamp);
- appender.writeDocument(q);
- q.release();
- }
- else
- {
- int batchSize = random ? r.nextInt(99) + 1 : i + 1;
- List<String> queries = new ArrayList<>(batchSize);
- List<List<ByteBuffer>> values = new ArrayList<>(batchSize);
- for (int jj = 0; jj < (random ? r.nextInt(batchSize) : 10); jj++)
- {
- queries.add("aaaaaa batch "+i+":"+jj);
- values.add(Collections.emptyList());
- }
- FullQueryLogger.Batch batch = new FullQueryLogger.Batch(BatchStatement.Type.UNLOGGED,
- queries,
- values,
- QueryOptions.DEFAULT,
- queryState("someks"),
- timestamp);
- appender.writeDocument(batch);
- batch.release();
- }
- }
- }
- return dir;
- }
-
- private QueryState queryState()
- {
- return QueryState.forInternalCalls();
- }
-
- private QueryState queryState(String keyspace)
- {
- ClientState clientState = ClientState.forInternalCalls(keyspace);
- return new QueryState(clientState);
- }
-
- private static ResultHandler.ComparableResultSet createResultSet(int columnCount, int rowCount, boolean random)
- {
- List<Pair<String, String>> columnDefs = new ArrayList<>(columnCount);
- Random r = new Random();
- for (int i = 0; i < columnCount; i++)
- {
- columnDefs.add(Pair.create("a" + i, "int"));
- }
- List<List<String>> rows = new ArrayList<>();
- for (int i = 0; i < rowCount; i++)
- {
- List<String> row = new ArrayList<>(columnCount);
- for (int jj = 0; jj < columnCount; jj++)
- row.add(i + " col " + jj + (random ? r.nextInt() : ""));
- rows.add(row);
- }
- return new FakeResultSet(columnDefs, rows);
- }
-
- private static void compareWithFile(List<File> dirs, File resultDir, List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets, int idx)
- {
- List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(dirs.get(idx), resultDir);
- for (int i = 0; i < results1.size(); i++)
- {
- assertEquals(results1.get(i).left, resultSets.get(i).left);
- assertEquals(results1.get(i).right, resultSets.get(i).right.get(idx));
- }
- }
-
- private static List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> readResultFile(File dir, File queryDir)
- {
- List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = new ArrayList<>();
- try (ChronicleQueue q = ChronicleQueueBuilder.single(dir).build();
- ChronicleQueue queryQ = ChronicleQueueBuilder.single(queryDir).build())
- {
- ExcerptTailer tailer = q.createTailer();
- ExcerptTailer queryTailer = queryQ.createTailer();
- List<Pair<String, String>> columnDefinitions = new ArrayList<>();
- List<List<String>> rowColumns = new ArrayList<>();
- AtomicBoolean allRowsRead = new AtomicBoolean(false);
- AtomicBoolean failedQuery = new AtomicBoolean(false);
- while (tailer.readDocument(wire -> {
- String type = wire.read("type").text();
- if (type.equals("column_definitions"))
- {
- int columnCount = wire.read("column_count").int32();
- for (int i = 0; i < columnCount; i++)
- {
- ValueIn vi = wire.read("column_definition");
- String name = vi.text();
- String dataType = vi.text();
- columnDefinitions.add(Pair.create(name, dataType));
- }
- }
- else if (type.equals("row"))
- {
- int rowColumnCount = wire.read("row_column_count").int32();
- List<String> r = new ArrayList<>(rowColumnCount);
- for (int i = 0; i < rowColumnCount; i++)
- {
- byte[] b = wire.read("column").bytes();
- r.add(new String(b));
- }
- rowColumns.add(r);
- }
- else if (type.equals("end_resultset"))
- {
- allRowsRead.set(true);
- }
- else if (type.equals("query_failed"))
- {
- failedQuery.set(true);
- }
- }))
- {
- if (allRowsRead.get())
- {
- FQLQueryReader reader = new FQLQueryReader();
- queryTailer.readDocument(reader);
- resultSets.add(Pair.create(reader.getQuery(), failedQuery.get() ? FakeResultSet.failed(new RuntimeException("failure"))
- : new FakeResultSet(ImmutableList.copyOf(columnDefinitions), ImmutableList.copyOf(rowColumns))));
- allRowsRead.set(false);
- failedQuery.set(false);
- columnDefinitions.clear();
- rowColumns.clear();
- }
- }
- }
- return resultSets;
- }
-
- private static class FakeResultSet implements ResultHandler.ComparableResultSet
- {
- private final List<Pair<String, String>> cdStrings;
- private final List<List<String>> rows;
- private final Throwable ex;
-
- public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows)
- {
- this(cdStrings, rows, null);
- }
-
- public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows, Throwable ex)
- {
- this.cdStrings = cdStrings;
- this.rows = rows;
- this.ex = ex;
- }
-
- public static FakeResultSet failed(Throwable ex)
- {
- return new FakeResultSet(null, null, ex);
- }
-
- public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
- {
- return new FakeComparableColumnDefinitions(cdStrings, wasFailed());
- }
-
- public boolean wasFailed()
- {
- return getFailureException() != null;
- }
-
- public Throwable getFailureException()
- {
- return ex;
- }
-
- public Iterator<ResultHandler.ComparableRow> iterator()
- {
- if (wasFailed())
- return Collections.emptyListIterator();
- return new AbstractIterator<ResultHandler.ComparableRow>()
- {
- Iterator<List<String>> iter = rows.iterator();
- protected ResultHandler.ComparableRow computeNext()
- {
- if (iter.hasNext())
- return new FakeComparableRow(iter.next(), cdStrings);
- return endOfData();
- }
- };
- }
-
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (!(o instanceof FakeResultSet)) return false;
- FakeResultSet that = (FakeResultSet) o;
- if (wasFailed() && that.wasFailed())
- return true;
- return Objects.equals(cdStrings, that.cdStrings) &&
- Objects.equals(rows, that.rows);
- }
-
- public int hashCode()
- {
- return Objects.hash(cdStrings, rows);
- }
-
- public String toString()
- {
- return "FakeResultSet{" +
- "cdStrings=" + cdStrings +
- ", rows=" + rows +
- '}';
- }
- }
-
- private static class FakeComparableRow implements ResultHandler.ComparableRow
- {
- private final List<String> row;
- private final List<Pair<String, String>> cds;
-
- public FakeComparableRow(List<String> row, List<Pair<String,String>> cds)
- {
- this.row = row;
- this.cds = cds;
- }
-
- public ByteBuffer getBytesUnsafe(int i)
- {
- return ByteBufferUtil.bytes(row.get(i));
- }
-
- public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
- {
- return new FakeComparableColumnDefinitions(cds, false);
- }
-
- public boolean equals(Object other)
- {
- if (!(other instanceof FakeComparableRow))
- return false;
- return row.equals(((FakeComparableRow)other).row);
- }
-
- public String toString()
- {
- return row.toString();
- }
- }
-
- private static class FakeComparableColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
- {
- private final List<ResultHandler.ComparableDefinition> defs;
- private final boolean failed;
- public FakeComparableColumnDefinitions(List<Pair<String, String>> cds, boolean failed)
- {
- defs = cds != null ? cds.stream().map(FakeComparableDefinition::new).collect(Collectors.toList()) : null;
- this.failed = failed;
- }
-
- public List<ResultHandler.ComparableDefinition> asList()
- {
- if (wasFailed())
- return Collections.emptyList();
- return defs;
- }
-
- public boolean wasFailed()
- {
- return failed;
- }
-
- public int size()
- {
- return defs.size();
- }
-
- public Iterator<ResultHandler.ComparableDefinition> iterator()
- {
- if (wasFailed())
- return Collections.emptyListIterator();
- return new AbstractIterator<ResultHandler.ComparableDefinition>()
- {
- Iterator<ResultHandler.ComparableDefinition> iter = defs.iterator();
- protected ResultHandler.ComparableDefinition computeNext()
- {
- if (iter.hasNext())
- return iter.next();
- return endOfData();
- }
- };
- }
- public boolean equals(Object other)
- {
- if (!(other instanceof FakeComparableColumnDefinitions))
- return false;
- return defs.equals(((FakeComparableColumnDefinitions)other).defs);
- }
-
- public String toString()
- {
- return defs.toString();
- }
- }
-
- private static class FakeComparableDefinition implements ResultHandler.ComparableDefinition
- {
- private final Pair<String, String> p;
-
- public FakeComparableDefinition(Pair<String, String> p)
- {
- this.p = p;
- }
- public String getType()
- {
- return p.right;
- }
-
- public String getName()
- {
- return p.left;
- }
-
- public boolean equals(Object other)
- {
- if (!(other instanceof FakeComparableDefinition))
- return false;
- return p.equals(((FakeComparableDefinition)other).p);
- }
-
- public String toString()
- {
- return getName() + ':' + getType();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/cassandra.in.bat
----------------------------------------------------------------------
diff --git a/tools/bin/cassandra.in.bat b/tools/bin/cassandra.in.bat
index 8804921..4d395e8 100644
--- a/tools/bin/cassandra.in.bat
+++ b/tools/bin/cassandra.in.bat
@@ -39,7 +39,7 @@ goto :eof
:okClasspath
REM Include the build\classes\main directory so it works in development
-set CASSANDRA_CLASSPATH=%CLASSPATH%;%CASSANDRA_CONF%;"%CASSANDRA_HOME%\build\classes\main";"%CASSANDRA_HOME%\build\classes\stress"
+set CASSANDRA_CLASSPATH=%CLASSPATH%;%CASSANDRA_CONF%;"%CASSANDRA_HOME%\build\classes\main";"%CASSANDRA_HOME%\build\classes\stress";"%CASSANDRA_HOME%\build\classes\fqltool"
REM Add the default storage location. Can be overridden in conf\cassandra.yaml
set CASSANDRA_PARAMS=%CASSANDRA_PARAMS% "-Dcassandra.storagedir=%CASSANDRA_HOME%\data"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/cassandra.in.sh
----------------------------------------------------------------------
diff --git a/tools/bin/cassandra.in.sh b/tools/bin/cassandra.in.sh
index 869eb22..bf1ecc4 100644
--- a/tools/bin/cassandra.in.sh
+++ b/tools/bin/cassandra.in.sh
@@ -32,7 +32,7 @@ CLASSPATH="$CASSANDRA_CONF"
if [ -d $CASSANDRA_HOME/build ] ; then
#cassandra_bin="$CASSANDRA_HOME/build/classes/main"
cassandra_bin=`ls -1 $CASSANDRA_HOME/build/apache-cassandra*.jar`
- cassandra_bin="$cassandra_bin:$CASSANDRA_HOME/build/classes/stress"
+ cassandra_bin="$cassandra_bin:$CASSANDRA_HOME/build/classes/stress:$CASSANDRA_HOME/build/classes/fqltool"
CLASSPATH="$CLASSPATH:$cassandra_bin"
fi
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/fqltool
----------------------------------------------------------------------
diff --git a/tools/bin/fqltool b/tools/bin/fqltool
new file mode 100755
index 0000000..a34128e
--- /dev/null
+++ b/tools/bin/fqltool
@@ -0,0 +1,76 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+ # Locations (in order) to use when searching for an include file.
+ for include in "`dirname "$0"`/cassandra.in.sh" \
+ "$HOME/.cassandra.in.sh" \
+ /usr/share/cassandra/cassandra.in.sh \
+ /usr/local/share/cassandra/cassandra.in.sh \
+ /opt/cassandra/cassandra.in.sh; do
+ if [ -r "$include" ]; then
+ . "$include"
+ break
+ fi
+ done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+ . "$CASSANDRA_INCLUDE"
+fi
+
+if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then
+ echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2
+ exit 1
+fi
+
+# Source cassandra-env.sh, but keep the JVM_OPTS and MAX_HEAP_SIZE that were set before it ran
+if [ -f "$CASSANDRA_CONF/cassandra-env.sh" ]; then
+ JVM_OPTS_SAVE=$JVM_OPTS
+ MAX_HEAP_SIZE_SAVE=$MAX_HEAP_SIZE
+ . "$CASSANDRA_CONF/cassandra-env.sh"
+ MAX_HEAP_SIZE=$MAX_HEAP_SIZE_SAVE
+ JVM_OPTS=$JVM_OPTS_SAVE
+fi
+
+# Split the command line: -D* options are handed to the JVM, everything else
+# is passed on to the fqltool command itself.
+ARGS=""
+JVM_ARGS=""
+# Loop on the argument count rather than on the value of $1, so that an empty
+# argument does not end the loop early and an argument containing spaces or
+# operator-like text cannot break the test expression.
+while [ $# -gt 0 ]
+do
+ case "$1" in
+ -D*)
+ JVM_ARGS="$JVM_ARGS $1"
+ ;;
+ *)
+ ARGS="$ARGS $1"
+ ;;
+ esac
+ shift
+done
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+ MAX_HEAP_SIZE="512m"
+fi
+
+"$JAVA" $JAVA_AGENT -ea -da:net.openhft... -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
+ -Dlog4j.configurationFile=log4j2-tools.xml \
+ $JVM_ARGS \
+ org.apache.cassandra.fqltool.FullQueryLogTool $ARGS
+
+# vi:ai sw=4 ts=4 tw=0 et
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/fqltool.bat
----------------------------------------------------------------------
diff --git a/tools/bin/fqltool.bat b/tools/bin/fqltool.bat
new file mode 100644
index 0000000..acb6d1c
--- /dev/null
+++ b/tools/bin/fqltool.bat
@@ -0,0 +1,36 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one or more
+@REM contributor license agreements. See the NOTICE file distributed with
+@REM this work for additional information regarding copyright ownership.
+@REM The ASF licenses this file to You under the Apache License, Version 2.0
+@REM (the "License"); you may not use this file except in compliance with
+@REM the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing, software
+@REM distributed under the License is distributed on an "AS IS" BASIS,
+@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM See the License for the specific language governing permissions and
+@REM limitations under the License.
+
+@echo off
+if "%OS%" == "Windows_NT" setlocal
+
+pushd "%~dp0"
+call cassandra.in.bat
+
+if NOT DEFINED JAVA_HOME goto :err
+
+set CASSANDRA_PARAMS=%CASSANDRA_PARAMS% -Dcassandra.logdir="%CASSANDRA_HOME%\logs"
+
+"%JAVA_HOME%\bin\java" -cp %CASSANDRA_CLASSPATH% %CASSANDRA_PARAMS% -Dlog4j.configurationFile=log4j2-tools.xml org.apache.cassandra.fqltool.FullQueryLogTool %*
+goto finally
+
+:err
+echo The JAVA_HOME environment variable must be set to run this program!
+pause
+
+:finally
+ENDLOCAL & set RC=%ERRORLEVEL%
+exit /B %RC%
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java b/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java
new file mode 100644
index 0000000..ccff370
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.AbstractIterator;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+/**
+ * Wraps a result set from the driver so that we can reuse the compare code when reading
+ * up a result set produced by ResultStore.
+ */
+public class DriverResultSet implements ResultHandler.ComparableResultSet
+{
+ private final ResultSet resultSet;
+ private final Throwable failureException;
+
+ public DriverResultSet(ResultSet resultSet)
+ {
+ this(resultSet, null);
+ }
+
+ private DriverResultSet(ResultSet res, Throwable failureException)
+ {
+ resultSet = res;
+ this.failureException = failureException;
+ }
+
+ public static DriverResultSet failed(Throwable ex)
+ {
+ return new DriverResultSet(null, ex);
+ }
+
+ public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+ {
+ if (wasFailed())
+ return new DriverColumnDefinitions(null, true, failureException);
+
+ return new DriverColumnDefinitions(resultSet.getColumnDefinitions());
+ }
+
+ public boolean wasFailed()
+ {
+ return failureException != null;
+ }
+
+ public Throwable getFailureException()
+ {
+ return failureException;
+ }
+
+ /**
+ * Iterates over the rows of the wrapped driver result set, wrapping each one in a {@link DriverRow}.
+ * A failed result set yields an empty iterator.
+ */
+ public Iterator<ResultHandler.ComparableRow> iterator()
+ {
+ if (wasFailed())
+ return Collections.emptyListIterator();
+
+ final Iterator<Row> driverRows = resultSet.iterator();
+ return new AbstractIterator<ResultHandler.ComparableRow>()
+ {
+ protected ResultHandler.ComparableRow computeNext()
+ {
+ return driverRows.hasNext() ? new DriverRow(driverRows.next()) : endOfData();
+ }
+ };
+ }
+
+ /**
+ * A driver {@link Row} exposed as a {@link ResultHandler.ComparableRow}.
+ * Rows are compared by the raw bytes of their columns.
+ */
+ public static class DriverRow implements ResultHandler.ComparableRow
+ {
+ private final Row row;
+
+ public DriverRow(Row row)
+ {
+ this.row = row;
+ }
+
+ /**
+ * @return a new wrapper around the column definitions of the underlying driver row
+ */
+ public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+ {
+ return new DriverColumnDefinitions(row.getColumnDefinitions());
+ }
+
+ public ByteBuffer getBytesUnsafe(int i)
+ {
+ return row.getBytesUnsafe(i);
+ }
+
+ /**
+ * Two rows are equal when they have the same number of columns and every column
+ * holds equal bytes, or is null in both rows.
+ */
+ @Override
+ public boolean equals(Object oo)
+ {
+ if (!(oo instanceof ResultHandler.ComparableRow))
+ return false;
+
+ ResultHandler.ComparableRow o = (ResultHandler.ComparableRow)oo;
+ // getColumnDefinitions() allocates a wrapper on every call, so ask for the size once
+ int columns = getColumnDefinitions().size();
+ if (columns != o.getColumnDefinitions().size())
+ return false;
+
+ for (int j = 0; j < columns; j++)
+ {
+ ByteBuffer b1 = getBytesUnsafe(j);
+ ByteBuffer b2 = o.getBytesUnsafe(j);
+
+ // null only matches null; ByteBuffer.equals(null) is false
+ if (b1 == null ? b2 != null : !b1.equals(b2))
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Hash over the column count and the bytes of every column, consistent with {@link #equals(Object)}
+ * for two DriverRow instances.
+ */
+ @Override
+ public int hashCode()
+ {
+ int columns = row.getColumnDefinitions().size();
+ int result = columns;
+ for (int j = 0; j < columns; j++)
+ {
+ ByteBuffer bb = getBytesUnsafe(j);
+ result = 31 * result + (bb == null ? 0 : bb.hashCode());
+ }
+ return result;
+ }
+
+ /**
+ * @return every column rendered as {@code definition:hexbytes} (or {@code NULL}), each followed by a comma
+ */
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ // build the definition list once; it has one entry per column
+ List<ResultHandler.ComparableDefinition> colDefs = getColumnDefinitions().asList();
+ for (int i = 0; i < colDefs.size(); i++)
+ {
+ ByteBuffer bb = getBytesUnsafe(i);
+ String row = bb != null ? ByteBufferUtil.bytesToHex(bb) : "NULL";
+ sb.append(colDefs.get(i)).append(':').append(row).append(",");
+ }
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Column definitions of a driver result set, or a placeholder for a failed query.
+ *
+ * A failed instance has no underlying definitions: it reports a size of 0 and an empty list,
+ * and it is only equal to other failed column definitions.
+ */
+ public static class DriverColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
+ {
+ private final ColumnDefinitions columnDefinitions;
+ private final boolean failed;
+ private final Throwable failureException;
+
+ public DriverColumnDefinitions(ColumnDefinitions columnDefinitions)
+ {
+ this(columnDefinitions, false, null);
+ }
+
+ private DriverColumnDefinitions(ColumnDefinitions columnDefinitions, boolean failed, Throwable failureException)
+ {
+ this.columnDefinitions = columnDefinitions;
+ this.failed = failed;
+ this.failureException = failureException;
+ }
+
+ /**
+ * @return the definitions wrapped as {@link DriverDefinition}s, or an empty list if the query failed
+ */
+ public List<ResultHandler.ComparableDefinition> asList()
+ {
+ if (wasFailed())
+ return Collections.emptyList();
+ return columnDefinitions.asList().stream().map(DriverDefinition::new).collect(Collectors.toList());
+ }
+
+ public boolean wasFailed()
+ {
+ return failed;
+ }
+
+ public Throwable getFailureException()
+ {
+ return failureException;
+ }
+
+ /**
+ * @return the number of columns, or 0 if the query failed
+ */
+ public int size()
+ {
+ // columnDefinitions is null for a failed query; stay consistent with asList()
+ if (wasFailed())
+ return 0;
+ return columnDefinitions.size();
+ }
+
+ public Iterator<ResultHandler.ComparableDefinition> iterator()
+ {
+ return asList().iterator();
+ }
+
+ /**
+ * Failed definitions are equal only to other failed definitions; otherwise the
+ * definition lists are compared element by element.
+ */
+ public boolean equals(Object oo)
+ {
+ if (!(oo instanceof ResultHandler.ComparableColumnDefinitions))
+ return false;
+
+ ResultHandler.ComparableColumnDefinitions o = (ResultHandler.ComparableColumnDefinitions)oo;
+ // decide on the failure flags first so a failed side is never asked for its columns
+ if (wasFailed() || o.wasFailed())
+ return wasFailed() && o.wasFailed();
+
+ if (size() != o.size())
+ return false;
+
+ return asList().equals(o.asList());
+ }
+
+ /**
+ * Hash over the name and type of every column; all failed instances hash to 0.
+ */
+ public int hashCode()
+ {
+ if (wasFailed())
+ return 0;
+
+ int result = 1;
+ for (ColumnDefinitions.Definition def : columnDefinitions.asList())
+ result = 31 * result + 31 * def.getName().hashCode() + def.getType().toString().hashCode();
+ return result;
+ }
+ }
+
+ /**
+ * A single driver column definition exposed as a {@link ResultHandler.ComparableDefinition}.
+ */
+ public static class DriverDefinition implements ResultHandler.ComparableDefinition
+ {
+ private final ColumnDefinitions.Definition def;
+
+ public DriverDefinition(ColumnDefinitions.Definition def)
+ {
+ this.def = def;
+ }
+
+ public String getType()
+ {
+ return def.getType().toString();
+ }
+
+ public String getName()
+ {
+ return def.getName();
+ }
+
+ /**
+ * Compares the wrapped driver definitions when the other object is also a DriverDefinition.
+ * Any other {@link ResultHandler.ComparableDefinition} is compared by name and type, since
+ * it has no driver definition to cast to.
+ */
+ public boolean equals(Object oo)
+ {
+ if (oo instanceof DriverDefinition)
+ return def.equals(((DriverDefinition)oo).def);
+
+ if (!(oo instanceof ResultHandler.ComparableDefinition))
+ return false;
+
+ ResultHandler.ComparableDefinition o = (ResultHandler.ComparableDefinition)oo;
+ return getName().equals(o.getName()) && getType().equals(o.getType());
+ }
+
+ /**
+ * Hash over name and type, the two properties every equal definition shares.
+ */
+ public int hashCode()
+ {
+ return 31 * getName().hashCode() + getType().hashCode();
+ }
+
+ public String toString()
+ {
+ return getName() + ':' + getType();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
new file mode 100644
index 0000000..2862e0f
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Longs;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+public abstract class FQLQuery implements Comparable<FQLQuery>
+{
+ public final long queryStartTime;
+ public final QueryOptions queryOptions;
+ public final int protocolVersion;
+ public final QueryState queryState;
+
+ public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds)
+ {
+ this.queryStartTime = queryStartTime;
+ this.queryOptions = queryOptions;
+ this.protocolVersion = protocolVersion;
+ this.queryState = queryState(keyspace, generatedTimestamp, generatedNowInSeconds);
+ }
+
+ public abstract Statement toStatement();
+
+ /**
+ * used when storing the queries executed
+ */
+ public abstract BinLog.ReleaseableWriteMarshallable toMarshallable();
+
+ public String keyspace()
+ {
+ return queryState.getClientState().getRawKeyspace();
+ }
+
+ private QueryState queryState(String keyspace, long generatedTimestamp, int generatedNowInSeconds)
+ {
+ ClientState clientState = keyspace != null ? ClientState.forInternalCalls(keyspace) : ClientState.forInternalCalls();
+ return new QueryState(clientState, generatedTimestamp, generatedNowInSeconds);
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof FQLQuery)) return false;
+ FQLQuery fqlQuery = (FQLQuery) o;
+ return queryStartTime == fqlQuery.queryStartTime &&
+ protocolVersion == fqlQuery.protocolVersion &&
+ queryState.getTimestamp() == fqlQuery.queryState.getTimestamp() &&
+ Objects.equals(queryState.getClientState().getRawKeyspace(), fqlQuery.queryState.getClientState().getRawKeyspace()) &&
+ Objects.equals(queryOptions.getValues(), fqlQuery.queryOptions.getValues());
+ }
+
+ /**
+ * Hashes the start time, the bound values, the protocol version and the keyspace.
+ * The bound values are hashed rather than the QueryOptions object itself because
+ * equals compares queryOptions.getValues(); equal queries must produce equal hashes.
+ */
+ public int hashCode()
+ {
+ return Objects.hash(queryStartTime, queryOptions.getValues(), protocolVersion, queryState.getClientState().getRawKeyspace());
+ }
+
+ public int compareTo(FQLQuery other)
+ {
+ return Longs.compare(queryStartTime, other.queryStartTime);
+ }
+
+ public String toString()
+ {
+ return "FQLQuery{" +
+ "queryStartTime=" + queryStartTime +
+ ", protocolVersion=" + protocolVersion +
+ ", queryState='" + queryState + '\'' +
+ '}';
+ }
+
+ public static class Single extends FQLQuery
+ {
+ public final String query;
+ public final List<ByteBuffer> values;
+
+ public Single(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, String queryString, List<ByteBuffer> values)
+ {
+ super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
+ this.query = queryString;
+ this.values = values;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s%nQuery = %s, Values = %s",
+ super.toString(),
+ query,
+ values.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(",")));
+ }
+
+ public Statement toStatement()
+ {
+ SimpleStatement ss = new SimpleStatement(query, values.toArray());
+ ss.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
+ ss.setDefaultTimestamp(queryOptions.getTimestamp(queryState));
+ return ss;
+ }
+
+ public BinLog.ReleaseableWriteMarshallable toMarshallable()
+ {
+
+ return new FullQueryLogger.Query(query, queryOptions, queryState, queryStartTime);
+ }
+
+ /**
+ * Orders by query start time; on a tie a single query sorts before a batch, and two
+ * single queries are ordered by query string, then number of values, then the values themselves.
+ */
+ public int compareTo(FQLQuery other)
+ {
+ int byStartTime = super.compareTo(other);
+ if (byStartTime != 0)
+ return byStartTime;
+
+ if (other instanceof Batch)
+ return -1;
+
+ Single that = (Single) other;
+ int byQuery = query.compareTo(that.query);
+ if (byQuery != 0)
+ return byQuery;
+
+ int sizeDifference = values.size() - that.values.size();
+ if (sizeDifference != 0)
+ return sizeDifference;
+
+ for (int idx = 0; idx < values.size(); idx++)
+ {
+ int byValue = values.get(idx).compareTo(that.values.get(idx));
+ if (byValue != 0)
+ return byValue;
+ }
+ return 0;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof Single)) return false;
+ if (!super.equals(o)) return false;
+ Single single = (Single) o;
+ return Objects.equals(query, single.query) &&
+ Objects.equals(values, single.values);
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), query, values);
+ }
+ }
+
+ public static class Batch extends FQLQuery
+ {
+ public final BatchStatement.Type batchType;
+ public final List<Single> queries;
+
+ public Batch(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, BatchStatement.Type batchType, List<String> queries, List<List<ByteBuffer>> values)
+ {
+ super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
+ this.batchType = batchType;
+ this.queries = new ArrayList<>(queries.size());
+ for (int i = 0; i < queries.size(); i++)
+ this.queries.add(new Single(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds, queries.get(i), values.get(i)));
+ }
+
+ public Statement toStatement()
+ {
+ BatchStatement bs = new BatchStatement(batchType);
+ for (Single query : queries)
+ bs.add(query.toStatement());
+ bs.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
+ bs.setDefaultTimestamp(queryOptions.getTimestamp(queryState));
+ return bs;
+ }
+
+ /**
+ * Orders by query start time; on a tie a batch sorts after a single query, and two
+ * batches are ordered by number of queries, then by their queries in order.
+ */
+ public int compareTo(FQLQuery other)
+ {
+ int byStartTime = super.compareTo(other);
+ if (byStartTime != 0)
+ return byStartTime;
+
+ if (other instanceof Single)
+ return 1;
+
+ Batch that = (Batch) other;
+ int sizeDifference = queries.size() - that.queries.size();
+ if (sizeDifference != 0)
+ return sizeDifference;
+
+ for (int idx = 0; idx < queries.size(); idx++)
+ {
+ int byQuery = queries.get(idx).compareTo(that.queries.get(idx));
+ if (byQuery != 0)
+ return byQuery;
+ }
+ return 0;
+ }
+
+ public BinLog.ReleaseableWriteMarshallable toMarshallable()
+ {
+ List<String> queryStrings = new ArrayList<>();
+ List<List<ByteBuffer>> values = new ArrayList<>();
+ for (Single q : queries)
+ {
+ queryStrings.add(q.query);
+ values.add(q.values);
+ }
+ return new FullQueryLogger.Batch(org.apache.cassandra.cql3.statements.BatchStatement.Type.valueOf(batchType.name()), queryStrings, values, queryOptions, queryState, queryStartTime);
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder(super.toString()).append("\nbatch: ").append(batchType).append('\n');
+ for (Single q : queries)
+ sb.append(q.toString()).append('\n');
+ sb.append("end batch");
+ return sb.toString();
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof Batch)) return false;
+ if (!super.equals(o)) return false;
+ Batch batch = (Batch) o;
+ return batchType == batch.batchType &&
+ Objects.equals(queries, batch.queries);
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), batchType, queries);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java
new file mode 100644
index 0000000..ccbb200
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.util.PriorityQueue;
+
+import net.openhft.chronicle.queue.ExcerptTailer;
+import org.apache.cassandra.utils.AbstractIterator;
+
+/**
+ * Iterates over the queries of a full query log, buffering a window of them in a
+ * priority queue so that they are returned in FQLQuery's natural order.
+ */
+public class FQLQueryIterator extends AbstractIterator<FQLQuery>
+{
+ // window of queries read ahead from the log; the smallest one is handed out next
+ private final PriorityQueue<FQLQuery> pq;
+ private final ExcerptTailer tailer;
+ private final FQLQueryReader reader;
+
+ /**
+ * Create an iterator over the FQLQueries in tailer
+ *
+ * Reads up to readAhead queries into memory to be able to sort them (the files are mostly sorted already)
+ */
+ public FQLQueryIterator(ExcerptTailer tailer, int readAhead)
+ {
+ assert readAhead > 0 : "readAhead needs to be > 0";
+ reader = new FQLQueryReader();
+ this.tailer = tailer;
+ pq = new PriorityQueue<>(readAhead);
+
+ // fill the window, stopping early if the log runs out
+ FQLQuery query;
+ while (pq.size() < readAhead && (query = readNext()) != null)
+ pq.add(query);
+ }
+
+ /**
+ * Hands out the smallest buffered query and tops the window up with the next one from the log.
+ */
+ protected FQLQuery computeNext()
+ {
+ FQLQuery head = pq.poll();
+ if (head == null)
+ return endOfData();
+
+ FQLQuery refill = readNext();
+ if (refill != null)
+ pq.add(refill);
+ return head;
+ }
+
+ /**
+ * @return the next query from the tailer, or null if no document could be read
+ */
+ private FQLQuery readNext()
+ {
+ return tailer.readDocument(reader) ? reader.getQuery() : null;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
new file mode 100644
index 0000000..fd5073c
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datastax.driver.core.BatchStatement;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.core.io.IORuntimeException;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS;
+import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP;
+import static org.apache.cassandra.audit.FullQueryLogger.KEYSPACE;
+import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
+import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
+import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERIES;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
+
+public class FQLQueryReader implements ReadMarshallable
+{
+ private FQLQuery query;
+
+ /**
+ * Reads one full query log entry from the wire and stores it as the current query,
+ * which is then available through getQuery().
+ *
+ * The common header fields are read first, in a fixed order, followed by the fields
+ * specific to the entry type (single query or batch).
+ *
+ * @param wireIn the wire positioned at the start of a log entry
+ * @throws RuntimeException if the entry type is neither SINGLE_QUERY nor BATCH
+ */
+ public void readMarshallable(WireIn wireIn) throws IORuntimeException
+ {
+ // the version is read but not used further in this method
+ int currentVersion = wireIn.read(VERSION).int16();
+ String type = wireIn.read(TYPE).text();
+ long queryStartTime = wireIn.read(QUERY_START_TIME).int64();
+ int protocolVersion = wireIn.read(PROTOCOL_VERSION).int32();
+ // the query options are stored as bytes and decoded with the protocol version read above
+ QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(QUERY_OPTIONS).bytes()), ProtocolVersion.decode(protocolVersion));
+ long generatedTimestamp = wireIn.read(GENERATED_TIMESTAMP).int64();
+ int generatedNowInSeconds = wireIn.read(GENERATED_NOW_IN_SECONDS).int32();
+ String keyspace = wireIn.read(KEYSPACE).text();
+
+ switch (type)
+ {
+ case SINGLE_QUERY:
+ // a single query takes its bound values from the decoded query options
+ String queryString = wireIn.read(QUERY).text();
+ query = new FQLQuery.Single(keyspace,
+ protocolVersion,
+ queryOptions,
+ queryStartTime,
+ generatedTimestamp,
+ generatedNowInSeconds,
+ queryString,
+ queryOptions.getValues());
+ break;
+ case BATCH:
+ BatchStatement.Type batchType = BatchStatement.Type.valueOf(wireIn.read(BATCH_TYPE).text());
+ // QUERIES holds a count followed by that many query strings
+ ValueIn in = wireIn.read(QUERIES);
+ int queryCount = in.int32();
+
+ List<String> queries = new ArrayList<>(queryCount);
+ for (int i = 0; i < queryCount; i++)
+ queries.add(in.text());
+ // VALUES holds a count of value lists; each list is its own count followed by that many byte values
+ in = wireIn.read(VALUES);
+ int valueCount = in.int32();
+ List<List<ByteBuffer>> values = new ArrayList<>(valueCount);
+ for (int ii = 0; ii < valueCount; ii++)
+ {
+ List<ByteBuffer> subValues = new ArrayList<>();
+ values.add(subValues);
+ int numSubValues = in.int32();
+ for (int zz = 0; zz < numSubValues; zz++)
+ subValues.add(ByteBuffer.wrap(in.bytes()));
+ }
+ query = new FQLQuery.Batch(keyspace,
+ protocolVersion,
+ queryOptions,
+ queryStartTime,
+ generatedTimestamp,
+ generatedNowInSeconds,
+ batchType,
+ queries,
+ values);
+ break;
+ default:
+ throw new RuntimeException("Unknown type: " + type);
+ }
+ }
+
+ public FQLQuery getQuery()
+ {
+ return query;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java
new file mode 100644
index 0000000..97e7487
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.util.List;
+
+import com.google.common.base.Throwables;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Help;
+import io.airlift.airline.ParseArgumentsMissingException;
+import io.airlift.airline.ParseArgumentsUnexpectedException;
+import io.airlift.airline.ParseCommandMissingException;
+import io.airlift.airline.ParseCommandUnrecognizedException;
+import io.airlift.airline.ParseOptionConversionException;
+import io.airlift.airline.ParseOptionMissingException;
+import io.airlift.airline.ParseOptionMissingValueException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.fqltool.commands.Compare;
+import org.apache.cassandra.fqltool.commands.Dump;
+import org.apache.cassandra.fqltool.commands.Replay;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static com.google.common.collect.Lists.newArrayList;
+
+public class FullQueryLogTool
+{
+ /**
+ * Entry point of fqltool: parses the command line, runs the selected command and exits.
+ *
+ * Exit status is 0 on success, 1 when the command line could not be used as given,
+ * and 2 when the command failed with any other error.
+ */
+ public static void main(String... args)
+ {
+ DatabaseDescriptor.clientInitialization();
+ List<Class<? extends Runnable>> commands = newArrayList(Help.class, Dump.class, Replay.class, Compare.class);
+
+ Cli.CliBuilder<Runnable> cliBuilder = Cli.builder("fqltool");
+ cliBuilder.withDescription("Manipulate the contents of full query log files")
+ .withDefaultCommand(Help.class)
+ .withCommands(commands);
+ Cli<Runnable> cli = cliBuilder.build();
+
+ int exitCode;
+ try
+ {
+ Runnable command = cli.parse(args);
+ command.run();
+ exitCode = 0;
+ }
+ catch (IllegalArgumentException |
+ IllegalStateException |
+ ParseArgumentsMissingException |
+ ParseArgumentsUnexpectedException |
+ ParseOptionConversionException |
+ ParseOptionMissingException |
+ ParseOptionMissingValueException |
+ ParseCommandMissingException |
+ ParseCommandUnrecognizedException e)
+ {
+ // the command line was wrong: point the user at the help
+ badUse(e);
+ exitCode = 1;
+ }
+ catch (Throwable failure)
+ {
+ // anything else: report the root cause with its stack trace
+ err(Throwables.getRootCause(failure));
+ exitCode = 2;
+ }
+
+ System.exit(exitCode);
+ }
+
+ private static void badUse(Exception e)
+ {
+ System.out.println("fqltool: " + e.getMessage());
+ System.out.println("See 'fqltool help' or 'fqltool help <command>'.");
+ }
+
+ private static void err(Throwable e)
+ {
+ System.err.println("error: " + e.getMessage());
+ System.err.println("-- StackTrace --");
+ System.err.println(getStackTraceAsString(e));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
new file mode 100644
index 0000000..d8653e5
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+//import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Replays queries against one or more target clusters and passes the per-cluster results of
+ * every query to a {@link ResultHandler}.
+ *
+ * One cluster connection and one session is opened per target host in the constructor; all of
+ * them, the result handler and the internal callback executor are released by {@link #close()}.
+ */
+public class QueryReplayer implements Closeable
+{
+    // progress is printed every PRINT_RATE timed queries
+    private static final int PRINT_RATE = 5000;
+    // upper bound on how long close() waits for already queued result handling to finish
+    private static final long RESULT_DRAIN_TIMEOUT_SECONDS = 60;
+
+    // single thread on which the result callbacks (and thereby the ResultHandler) are run
+    private final ExecutorService es = Executors.newFixedThreadPool(1);
+    private final Iterator<List<FQLQuery>> queryIterator;
+    private final List<Cluster> targetClusters;
+    private final List<Predicate<FQLQuery>> filters;
+    private final List<Session> sessions;
+    private final ResultHandler resultHandler;
+    private final MetricRegistry metrics = new MetricRegistry();
+    private final boolean debug;
+    private final PrintStream out;
+
+    /**
+     * @param queryIterator       source of the query batches to replay
+     * @param targetHosts         contact points; one cluster and one session is created per entry
+     * @param resultPaths         passed on to the {@link ResultHandler}
+     * @param filters             a query is replayed only if every filter accepts it
+     * @param out                 stream for progress, debug and error output
+     * @param queryFilePathString passed on to the {@link ResultHandler} as a file, may be null
+     * @param debug               if true, every executed query and keyspace switch is printed
+     */
+    public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+                         List<String> targetHosts,
+                         List<File> resultPaths,
+                         List<Predicate<FQLQuery>> filters,
+                         PrintStream out,
+                         String queryFilePathString,
+                         boolean debug)
+    {
+        this.queryIterator = queryIterator;
+        targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+        this.filters = filters;
+        sessions = targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+        File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null;
+        resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath);
+        this.debug = debug;
+        this.out = out;
+    }
+
+    /**
+     * Executes every query from the iterator that passes all filters against every session.
+     *
+     * The query is sent asynchronously to all sessions and this method waits on the combined
+     * future before moving on to the next query. The combined result is handed to the
+     * {@link ResultHandler} from a callback running on the internal executor. Failures are
+     * printed to the output stream and do not abort the replay.
+     */
+    public void replay()
+    {
+        while (queryIterator.hasNext())
+        {
+            List<FQLQuery> queries = queryIterator.next();
+            for (FQLQuery query : queries)
+            {
+                // skip the query as soon as a single filter rejects it
+                if (filters.stream().anyMatch(f -> !f.test(query)))
+                    continue;
+                try (Timer.Context ctx = metrics.timer("queries").time())
+                {
+                    List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size());
+                    Statement statement = query.toStatement();
+                    for (Session session : sessions)
+                    {
+                        maybeSetKeyspace(session, query);
+                        if (debug)
+                        {
+                            out.println("Executing query:");
+                            out.println(query);
+                        }
+                        ListenableFuture<ResultSet> future = session.executeAsync(statement);
+                        results.add(handleErrors(future));
+                    }
+
+                    ListenableFuture<List<ResultHandler.ComparableResultSet>> resultList = Futures.allAsList(results);
+
+                    Futures.addCallback(resultList, new FutureCallback<List<ResultHandler.ComparableResultSet>>()
+                    {
+                        public void onSuccess(/*@Nullable */List<ResultHandler.ComparableResultSet> resultSets)
+                        {
+                            // note that the order of resultSets is significant here - resultSets.get(x) should
+                            // be the result from a query against targetHosts.get(x)
+                            resultHandler.handleResults(query, resultSets);
+                        }
+
+                        public void onFailure(Throwable throwable)
+                        {
+                            // handleErrors converts query failures to failed result sets, so this is unexpected
+                            throw new AssertionError("Errors should be handled in FQLQuery.execute", throwable);
+                        }
+                    }, es);
+
+                    FBUtilities.waitOnFuture(resultList);
+                }
+                catch (Throwable t)
+                {
+                    // terminate the line so consecutive errors and the progress output do not run together
+                    out.printf("QUERY %s got exception: %s%n", query, t.getMessage());
+                }
+
+                Timer timer = metrics.timer("queries");
+                if (timer.getCount() % PRINT_RATE == 0)
+                    out.printf("%d queries, rate = %.2f%n", timer.getCount(), timer.getOneMinuteRate());
+            }
+        }
+    }
+
+    /**
+     * Issues a USE statement on the session if the query has a keyspace that differs from the
+     * keyspace the session is currently logged into. Failures are printed, not propagated.
+     */
+    private void maybeSetKeyspace(Session session, FQLQuery query)
+    {
+        try
+        {
+            if (query.keyspace() != null && !query.keyspace().equals(session.getLoggedKeyspace()))
+            {
+                if (debug)
+                    out.printf("Switching keyspace from %s to %s%n", session.getLoggedKeyspace(), query.keyspace());
+                session.execute("USE " + query.keyspace());
+            }
+        }
+        catch (Throwable t)
+        {
+            out.printf("USE %s failed: %s%n", query.keyspace(), t.getMessage());
+        }
+    }
+
+    /**
+     * Make sure we catch any query errors
+     *
+     * On error, this creates a failed ComparableResultSet with the exception set to be able to store
+     * this fact in the result file and handle comparison of failed result sets.
+     */
+    private static ListenableFuture<ResultHandler.ComparableResultSet> handleErrors(ListenableFuture<ResultSet> result)
+    {
+        FluentFuture<ResultHandler.ComparableResultSet> fluentFuture = FluentFuture.from(result)
+                                                                                   .transform(DriverResultSet::new, MoreExecutors.directExecutor());
+        return fluentFuture.catching(Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor());
+    }
+
+    /**
+     * Stops the callback executor, waiting a bounded time for result handling that is already
+     * queued on it, so that it does not run against a closed result handler.
+     */
+    private void drainResultExecutor()
+    {
+        es.shutdown();
+        try
+        {
+            if (!es.awaitTermination(RESULT_DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS))
+            {
+                out.printf("Timed out waiting for pending results to be handled%n");
+                es.shutdownNow();
+            }
+        }
+        catch (InterruptedException e)
+        {
+            es.shutdownNow();
+            // keep the interrupt visible to the caller
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Releases everything held by this replayer: the callback executor is drained and stopped
+     * first, then the sessions and clusters are closed, and finally the result handler, which
+     * is closed even if closing a session or cluster fails.
+     *
+     * @throws IOException if closing the result handler fails
+     */
+    public void close() throws IOException
+    {
+        drainResultExecutor();
+        try
+        {
+            sessions.forEach(Session::close);
+            targetClusters.forEach(Cluster::close);
+        }
+        finally
+        {
+            resultHandler.close();
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org