You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/09/01 08:44:18 UTC
[2/7] cassandra git commit: Add fqltool replay
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java b/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
new file mode 100644
index 0000000..043ead8
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool.commands;
+
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+
+import org.apache.cassandra.tools.fqltool.FQLQuery;
+import org.apache.cassandra.tools.fqltool.FQLQueryIterator;
+import org.apache.cassandra.tools.fqltool.QueryReplayer;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.MergeIterator;
+
+/**
+ * replay the contents of a list of paths containing full query logs
+ */
+@Command(name = "replay", description = "Replay full query logs")
+public class Replay implements Runnable
+{
+ @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true)
+ private List<String> arguments = new ArrayList<>();
+
+ @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.")
+ private List<String> targetHosts;
+
+ @Option(title = "results", name = { "--results"}, description = "Where to store the results of the queries, this should be a directory. Leave this option out to avoid storing results.")
+ private String resultPath;
+
+ @Option(title = "keyspace", name = { "--keyspace"}, description = "Only replay queries against this keyspace and queries without keyspace set.")
+ private String keyspace;
+
+ @Option(title = "debug", name = {"--debug"}, description = "Debug mode, print all queries executed.")
+ private boolean debug;
+
+ @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results")
+ private String queryStorePath;
+
+ @Override
+ public void run() // validates the command line options, then hands over to replay(...)
+ {
+ try
+ {
+ if (targetHosts == null || targetHosts.isEmpty()) // checked before first use: the field has no initializer and is dereferenced below
+ {
+ System.err.println("You need to state at least one --target host to replay the query against");
+ System.exit(1);
+ }
+ List<File> resultPaths = null; // stays null when no --results directory was given
+ if (resultPath != null)
+ {
+ File basePath = new File(resultPath);
+ if (!basePath.exists() || !basePath.isDirectory())
+ {
+ System.err.println("The results path (" + basePath + ") should be an existing directory");
+ System.exit(1);
+ }
+ resultPaths = targetHosts.stream().map(target -> new File(basePath, target)).collect(Collectors.toList()); // one sub-directory per target host
+ resultPaths.forEach(File::mkdir);
+ }
+ replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, debug);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean debug) // merges the logs found under the given paths and replays them against targetHosts
+ {
+ int readAhead = 200; // how many fql queries should we read in to memory to be able to sort them?
+ List<ChronicleQueue> readQueues = new ArrayList<>(); // filled one by one, so that a failure half-way still closes whatever was opened
+ List<FQLQueryIterator> iterators = new ArrayList<>();
+ List<Predicate<FQLQuery>> filters = new ArrayList<>();
+
+ if (keyspace != null)
+ filters.add(fqlQuery -> fqlQuery.keyspace == null || fqlQuery.keyspace.equals(keyspace));
+
+ try
+ {
+ for (String path : arguments)
+ readQueues.add(ChronicleQueueBuilder.single(path).readOnly(true).build());
+ for (ChronicleQueue queue : readQueues)
+ iterators.add(new FQLQueryIterator(queue.createTailer(), readAhead));
+ try (MergeIterator<FQLQuery, List<FQLQuery>> iter = MergeIterator.get(iterators, FQLQuery::compareTo, new Reducer());
+ QueryReplayer replayer = new QueryReplayer(iter, targetHosts, resultPaths, filters, System.out, queryStorePath, debug))
+ {
+ replayer.replay();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ iterators.forEach(AbstractIterator::close);
+ readQueues.forEach(Closeable::close);
+ }
+ }
+
+ @VisibleForTesting
+ public static class Reducer extends MergeIterator.Reducer<FQLQuery, List<FQLQuery>> // collects the queries it is handed via reduce() into a single list
+ {
+ List<FQLQuery> queries = new ArrayList<>(); // one list instance, emptied again on every key change
+ public void reduce(int idx, FQLQuery current) // idx is not used
+ {
+ queries.add(current);
+ }
+
+ protected List<FQLQuery> getReduced() // hands out the internal list itself, not a copy - onKeyChange() below empties that same list
+ {
+ return queries;
+ }
+ protected void onKeyChange()
+ {
+ queries.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java b/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
new file mode 100644
index 0000000..a662699
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptAppender;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.wire.ValueIn;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.tools.fqltool.commands.Replay;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class FQLReplayTest
+{
+ public FQLReplayTest() // calls Util.initDatabaseDescriptor() each time the test class is instantiated
+ {
+ Util.initDatabaseDescriptor();
+ }
+
+ @Test
+ public void testOrderedReplay() throws IOException
+ {
+ // 100 queries with random timestamps, read back with a read-ahead of 101
+ File logDir = generateQueries(100, true);
+ int seen = 0;
+ try (ChronicleQueue queue = ChronicleQueueBuilder.single(logDir).build();
+ FQLQueryIterator queries = new FQLQueryIterator(queue.createTailer(), 101))
+ {
+ // start times must never decrease from one query to the next
+ for (long previous = -1; queries.hasNext(); seen++)
+ {
+ FQLQuery next = queries.next();
+ assertTrue(next.queryStartTime >= previous);
+ previous = next.queryStartTime;
+ }
+ }
+ assertEquals(100, seen);
+ }
+ @Test
+ public void testMergingIterator() throws IOException
+ {
+ // two logs generated with random = false, so query i carries timestamp i in both of them
+ File firstLog = generateQueries(100, false);
+ File secondLog = generateQueries(100, false);
+ int merged = 0;
+ try (ChronicleQueue firstQueue = ChronicleQueueBuilder.single(firstLog).build();
+ ChronicleQueue secondQueue = ChronicleQueueBuilder.single(secondLog).build();
+ FQLQueryIterator first = new FQLQueryIterator(firstQueue.createTailer(), 101);
+ FQLQueryIterator second = new FQLQueryIterator(secondQueue.createTailer(), 101);
+ MergeIterator<FQLQuery, List<FQLQuery>> merger = MergeIterator.get(Lists.newArrayList(first, second), FQLQuery::compareTo, new Replay.Reducer()))
+ {
+ long previous = -1;
+ for (; merger.hasNext(); merged++)
+ {
+ List<FQLQuery> pair = merger.next();
+ assertEquals(2, pair.size());
+ FQLQuery head = pair.get(0);
+ assertEquals(0, head.compareTo(pair.get(1)));
+ assertTrue(head.queryStartTime >= previous);
+ previous = head.queryStartTime;
+ }
+ }
+ assertEquals(100, merged);
+ }
+
+ @Test
+ public void testFQLQueryReader() throws IOException
+ {
+ // write 1000 random queries to a fresh log, then read each of them back through the reader
+ FQLQueryReader reader = new FQLQueryReader();
+
+ try (ChronicleQueue queue = ChronicleQueueBuilder.single(generateQueries(1000, true)).build())
+ {
+ ExcerptTailer tailer = queue.createTailer();
+ int read = 0;
+ // keep going for as long as readDocument reports that it read something
+ for (; tailer.readDocument(reader); read++)
+ {
+ FQLQuery query = reader.getQuery();
+ assertNotNull(query);
+ // generateQueries logs single queries without keyspace or with "querykeyspace",
+ // and every batch with "someks"
+ if (query instanceof FQLQuery.Single)
+ assertTrue(query.keyspace == null || query.keyspace.equals("querykeyspace"));
+ else
+ assertEquals("someks", query.keyspace);
+ }
+ assertEquals(1000, read);
+ }
+ }
+
+ @Test
+ public void testStoringResults() throws Throwable
+ {
+ File resultDir = Files.createTempDirectory("results").toFile();
+ File queryDir = Files.createTempDirectory("queries").toFile();
+
+ // round trip for a single target: store one result set,
+ // then read it back from the two directories
+ ResultHandler.ComparableResultSet expected = createResultSet(10, 10, true);
+ ResultStore store = new ResultStore(Collections.singletonList(resultDir), queryDir);
+ try
+ {
+ FQLQuery query = new FQLQuery.Single("abc", 3, QueryOptions.DEFAULT, 12345, 11111, 22, "select * from abc", Collections.emptyList());
+ store.storeColumnDefinitions(query, Collections.singletonList(expected.getColumnDefinitions()));
+ // one storeRows call per row, each row wrapped in a single-element list
+ for (ResultHandler.ComparableRow row : expected)
+ store.storeRows(Collections.singletonList(row));
+ // this marks the end of the result set:
+ store.storeRows(Collections.singletonList(null));
+ }
+ finally
+ {
+ store.close();
+ }
+
+ List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> stored = readResultFile(resultDir, queryDir);
+ // exactly one result set must come back, equal to the one that was stored
+ assertEquals(1, stored.size());
+ assertEquals(expected, stored.get(0).right);
+
+ }
+
+ @Test
+ public void testCompareColumnDefinitions()
+ {
+ ResultHandler.ComparableResultSet template = createResultSet(10, 10, false);
+ ResultComparator comparator = new ResultComparator();
+
+ List<ResultHandler.ComparableColumnDefinitions> definitions = new ArrayList<>(100);
+ List<String> hosts = new ArrayList<>(100);
+ for (int host = 0; host < 100; host++)
+ {
+ hosts.add("host"+host);
+ definitions.add(template.getColumnDefinitions());
+ }
+ assertTrue(comparator.compareColumnDefinitions(hosts, null, definitions)); // 100 hosts, all with the same 10 columns
+ definitions.set(50, createResultSet(9, 9, false).getColumnDefinitions()); // host50 now has 9 columns instead of 10
+ assertFalse(comparator.compareColumnDefinitions(hosts, null, definitions));
+ }
+
+ @Test
+ public void testCompareEqualRows()
+ {
+ ResultComparator comparator = new ResultComparator();
+
+ // two result sets with identical, non-random content
+ List<ResultHandler.ComparableResultSet> resultSets = Lists.newArrayList(createResultSet(10, 10, false), createResultSet(10, 10, false));
+ List<Iterator<ResultHandler.ComparableRow>> rowIterators = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+
+ // the final round, in which all rows are null, is compared as well before the loop ends
+ List<ResultHandler.ComparableRow> rows;
+ do
+ {
+ rows = ResultHandler.rows(rowIterators);
+ assertTrue(comparator.compareRows(Lists.newArrayList("eq1", "eq2"), null, rows));
+ }
+ while (!rows.stream().allMatch(Objects::isNull));
+ }
+
+ @Test
+ public void testCompareRowsDifferentCount()
+ {
+ ResultComparator comparator = new ResultComparator();
+ // three result sets over the same 10 columns: the first two have 10 rows each,
+ // the third has 11
+ List<ResultHandler.ComparableResultSet> resultSets = Lists.newArrayList(createResultSet(10, 10, false),
+ createResultSet(10, 10, false),
+ createResultSet(10, 11, false));
+ List<Iterator<ResultHandler.ComparableRow>> rowIterators = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+ boolean mismatchSeen = false;
+ // stop at the first round in which every row is null
+ for (List<ResultHandler.ComparableRow> rows = ResultHandler.rows(rowIterators);
+ !rows.stream().allMatch(Objects::isNull);
+ rows = ResultHandler.rows(rowIterators))
+ {
+ // compound assignment, so compareRows is called in every round
+ mismatchSeen |= !comparator.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows);
+ }
+ assertTrue(mismatchSeen);
+ }
+
+ @Test
+ public void testCompareRowsDifferentContent()
+ {
+ ResultComparator comparator = new ResultComparator();
+ // same shape for all three result sets, but the third is built with random = true,
+ // so each of its cells carries a random suffix
+ List<ResultHandler.ComparableResultSet> resultSets = Lists.newArrayList(createResultSet(10, 10, false), createResultSet(10, 10, false), createResultSet(10, 10, true));
+ List<Iterator<ResultHandler.ComparableRow>> rowIterators = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+ // every round before the all-null one is expected to be reported as a mismatch
+ for (List<ResultHandler.ComparableRow> rows = ResultHandler.rows(rowIterators);
+ !rows.stream().allMatch(Objects::isNull);
+ rows = ResultHandler.rows(rowIterators))
+ {
+ assertFalse(rows.toString(), comparator.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows));
+ }
+ }
+
+ @Test
+ public void testCompareRowsDifferentColumnCount()
+ {
+ ResultComparator comparator = new ResultComparator();
+ // same row count for all three result sets, but the third has 11 columns
+ // where the other two have 10
+ List<ResultHandler.ComparableResultSet> resultSets = Lists.newArrayList(createResultSet(10, 10, false), createResultSet(10, 10, false), createResultSet(11, 10, false));
+ List<Iterator<ResultHandler.ComparableRow>> rowIterators = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+ // every round before the all-null one is expected to be reported as a mismatch
+ for (List<ResultHandler.ComparableRow> rows = ResultHandler.rows(rowIterators);
+ !rows.stream().allMatch(Objects::isNull);
+ rows = ResultHandler.rows(rowIterators))
+ {
+ assertFalse(rows.toString(), comparator.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows));
+ }
+ }
+
+ @Test
+ public void testResultHandler() throws IOException
+ {
+ List<String> hosts = Lists.newArrayList("hosta", "hostb", "hostc");
+ File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+ File queryDir = Files.createTempDirectory("queries").toFile();
+ List<File> resultPaths = hosts.stream().map(host -> new File(tmpDir, host)).collect(Collectors.toList());
+ resultPaths.forEach(File::mkdir);
+ ResultHandler handler = new ResultHandler(hosts, resultPaths, queryDir);
+ // all three hosts answer with the same, non-random result set
+ ResultHandler.ComparableResultSet expected = createResultSet(10, 10, false);
+ List<ResultHandler.ComparableResultSet> perHost = Lists.newArrayList(expected, createResultSet(10, 10, false), createResultSet(10, 10, false));
+ handler.handleResults(new FQLQuery.Single("abcabc", 3, QueryOptions.DEFAULT, 1111, 2222, 3333, "select * from xyz", Collections.emptyList()), perHost);
+ // what is read back must be the same for every host, and equal to what went in
+ List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostA = readResultFile(resultPaths.get(0), queryDir);
+ List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostB = readResultFile(resultPaths.get(1), queryDir);
+ List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostC = readResultFile(resultPaths.get(2), queryDir);
+ assertEquals(fromHostA, fromHostB);
+ assertEquals(fromHostA, fromHostC);
+ assertEquals(Iterables.getOnlyElement(fromHostC).right, expected);
+ }
+
+ @Test
+ public void testResultHandlerWithDifference() throws IOException
+ {
+ List<String> hosts = Lists.newArrayList("hosta", "hostb", "hostc");
+ File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+ File queryDir = Files.createTempDirectory("queries").toFile();
+ List<File> resultPaths = hosts.stream().map(host -> new File(tmpDir, host)).collect(Collectors.toList());
+ resultPaths.forEach(File::mkdir);
+ ResultHandler handler = new ResultHandler(hosts, resultPaths, queryDir);
+ ResultHandler.ComparableResultSet full = createResultSet(10, 10, false);
+ ResultHandler.ComparableResultSet shorter = createResultSet(10, 5, false); // the second host returns only 5 of the 10 rows
+ List<ResultHandler.ComparableResultSet> perHost = Lists.newArrayList(full, shorter, createResultSet(10, 10, false));
+ handler.handleResults(new FQLQuery.Single("aaa", 3, QueryOptions.DEFAULT, 123123, 11111, 22222, "select * from abcabc", Collections.emptyList()), perHost);
+ List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostA = readResultFile(resultPaths.get(0), queryDir);
+ List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostB = readResultFile(resultPaths.get(1), queryDir);
+ List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostC = readResultFile(resultPaths.get(2), queryDir);
+ // each host's results are stored as delivered, differing or not
+ assertEquals(fromHostA, fromHostC);
+ assertEquals(fromHostB.get(0).right, shorter);
+ }
+
+ @Test
+ public void testResultHandlerMultipleResultSets() throws IOException
+ {
+ List<String> hosts = Lists.newArrayList("hosta", "hostb", "hostc");
+ File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+ File queryDir = Files.createTempDirectory("queries").toFile();
+ List<File> resultPaths = hosts.stream().map(host -> new File(tmpDir, host)).collect(Collectors.toList());
+ resultPaths.forEach(File::mkdir);
+ ResultHandler handler = new ResultHandler(hosts, resultPaths, queryDir);
+ List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> expected = new ArrayList<>();
+ Random random = new Random();
+ // ten queries, each paired with one random result set per host
+ // (5 columns, between 1 and 10 rows)
+ for (int queryNo = 0; queryNo < 10; queryNo++)
+ {
+ List<ResultHandler.ComparableResultSet> perHost = new ArrayList<>();
+ List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(queryNo * 50));
+ for (int host = 0; host < hosts.size(); host++)
+ perHost.add(createResultSet(5, 1 + random.nextInt(10), true));
+ FQLQuery query = new FQLQuery.Single("abc"+queryNo,
+ 3,
+ QueryOptions.forInternalCalls(values),
+ queryNo * 1000,
+ 12345,
+ 54321,
+ "select * from xyz where id = "+queryNo,
+ values);
+ expected.add(Pair.create(query, perHost));
+ }
+ for (Pair<FQLQuery, List<ResultHandler.ComparableResultSet>> entry : expected)
+ handler.handleResults(entry.left, entry.right);
+
+ for (int host = 0; host < hosts.size(); host++)
+ compareWithFile(resultPaths, queryDir, expected, host);
+ }
+
+ @Test
+ public void testResultHandlerFailedQuery() throws IOException
+ {
+ List<String> hosts = Lists.newArrayList("hosta", "hostb", "hostc", "hostd");
+ File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+ File queryDir = Files.createTempDirectory("queries").toFile();
+ List<File> resultPaths = hosts.stream().map(host -> new File(tmpDir, host)).collect(Collectors.toList());
+ resultPaths.forEach(File::mkdir);
+ ResultHandler handler = new ResultHandler(hosts, resultPaths, queryDir);
+ List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> expected = new ArrayList<>();
+ Random random = new Random();
+ // ten queries with one random result set per host, after which the results of
+ // the first and the last host are replaced by failed ones
+ for (int queryNo = 0; queryNo < 10; queryNo++)
+ {
+ List<ResultHandler.ComparableResultSet> perHost = new ArrayList<>();
+ List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(queryNo * 50));
+ for (int host = 0; host < hosts.size(); host++)
+ perHost.add(createResultSet(5, 1 + random.nextInt(10), true));
+ perHost.set(0, FakeResultSet.failed(new RuntimeException("testing abc")));
+ perHost.set(3, FakeResultSet.failed(new RuntimeException("testing abc")));
+ FQLQuery query = new FQLQuery.Single("abc"+queryNo,
+ 3,
+ QueryOptions.forInternalCalls(values),
+ queryNo * 1000,
+ queryNo * 12345,
+ queryNo * 54321,
+ "select * from xyz where id = "+queryNo,
+ values);
+ expected.add(Pair.create(query, perHost));
+ }
+ for (Pair<FQLQuery, List<ResultHandler.ComparableResultSet>> entry : expected)
+ handler.handleResults(entry.left, entry.right);
+
+ for (int host = 0; host < hosts.size(); host++)
+ compareWithFile(resultPaths, queryDir, expected, host);
+ }
+
+
+ @Test
+ public void testCompare()
+ {
+ FQLQuery single = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList());
+ FQLQuery sameSingle = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222,"aaaa", Collections.emptyList());
+
+ assertEquals(0, single.compareTo(sameSingle)); // identical queries compare as equal, in both directions
+ assertEquals(0, sameSingle.compareTo(single));
+
+ FQLQuery batch = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList());
+ // single queries before batch queries
+ assertTrue(single.compareTo(batch) < 0);
+ assertTrue(batch.compareTo(single) > 0);
+
+ // the query with the smaller query time (123 against 124) sorts first
+ FQLQuery laterSingle = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, "aaaa", Collections.emptyList());
+ assertTrue(single.compareTo(laterSingle) < 0);
+ assertTrue(laterSingle.compareTo(single) > 0);
+
+ FQLQuery laterBatch = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList());
+ assertTrue(single.compareTo(laterBatch) < 0);
+ assertTrue(laterBatch.compareTo(single) > 0);
+
+ FQLQuery withValue = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes(10)));
+ FQLQuery withoutValue = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList());
+ assertTrue(withValue.compareTo(withoutValue) > 0); // otherwise equal: the query with a bound value sorts after the one without
+ assertTrue(withoutValue.compareTo(withValue) < 0);
+
+ FQLQuery valueA = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("a")));
+ FQLQuery valueB = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("b")));
+ assertTrue(valueA.compareTo(valueB) < 0); // otherwise equal: the bound values decide
+ assertTrue(valueB.compareTo(valueA) > 0);
+ }
+
+ private File generateQueries(int count, boolean random) throws IOException
+ {
+ // writes count queries to a queue in a fresh temporary directory and returns that directory
+ Random rnd = new Random();
+ File dir = Files.createTempDirectory("chronicle").toFile();
+ try (ChronicleQueue queue = ChronicleQueueBuilder.single(dir).build())
+ {
+ ExcerptAppender appender = queue.acquireAppender();
+
+ for (int i = 0; i < count; i++)
+ {
+ long timestamp = random ? Math.abs(rnd.nextLong() % 10000) : i;
+ // random mode flips a coin; otherwise single queries and batches alternate
+ boolean single = random ? rnd.nextBoolean() : i % 2 == 0;
+ if (single)
+ {
+ String cql = "abcdefghijklm " + i;
+ QueryState state = rnd.nextBoolean() ? queryState() : queryState("querykeyspace");
+ FullQueryLogger.Query logged = new FullQueryLogger.Query(cql, QueryOptions.DEFAULT, state, timestamp);
+ appender.writeDocument(logged);
+ logged.release();
+ }
+ else
+ {
+ int batchSize = random ? rnd.nextInt(99) + 1 : i + 1;
+ List<String> statements = new ArrayList<>(batchSize);
+ List<List<ByteBuffer>> values = new ArrayList<>(batchSize);
+ // the bound sits in the loop condition, so in random mode it is drawn anew before every iteration
+ for (int jj = 0; jj < (random ? rnd.nextInt(batchSize) : 10); jj++)
+ {
+ statements.add("aaaaaa batch "+i+":"+jj);
+ values.add(Collections.emptyList());
+ }
+ FullQueryLogger.Batch batch = new FullQueryLogger.Batch(BatchStatement.Type.UNLOGGED, statements, values,
+ QueryOptions.DEFAULT, queryState("someks"), timestamp);
+ appender.writeDocument(batch);
+ batch.release();
+ }
+ }
+ }
+ return dir;
+ }
+
+ private QueryState queryState() // variant without a keyspace argument; generateQueries uses it for some of the single queries
+ {
+ return QueryState.forInternalCalls();
+ }
+
+ private QueryState queryState(String keyspace)
+ {
+ // a query state wrapping the internal-calls client state for the given keyspace
+ return new QueryState(ClientState.forInternalCalls(keyspace));
+ }
+
+ private static ResultHandler.ComparableResultSet createResultSet(int columnCount, int rowCount, boolean random)
+ {
+ // columns are named a0, a1, ... and all of them are typed "int"
+ List<Pair<String, String>> columnDefs = new ArrayList<>(columnCount);
+ Random rnd = new Random();
+ for (int col = 0; col < columnCount; col++)
+ columnDefs.add(Pair.create("a" + col, "int"));
+ List<List<String>> rows = new ArrayList<>();
+ for (int row = 0; row < rowCount; row++)
+ {
+ List<String> cells = new ArrayList<>(columnCount);
+ // cell text is "<row> col <column>", followed by a random int when random is set
+ for (int col = 0; col < columnCount; col++)
+ cells.add(row + " col " + col + (random ? rnd.nextInt() : ""));
+ rows.add(cells);
+ }
+ return new FakeResultSet(columnDefs, rows);
+ }
+
+ // Asserts that what is read back for target number idx is exactly what was handed to the
+ // ResultHandler: comparing whole lists also fails when the file holds too few or too many entries.
+ private static void compareWithFile(List<File> dirs, File resultDir, List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets, int idx)
+ {
+ List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> expected = resultSets.stream()
+ .map(p -> Pair.create(p.left, p.right.get(idx)))
+ .collect(Collectors.toList());
+ assertEquals(expected, readResultFile(dirs.get(idx), resultDir));
+ }
+
+ private static List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> readResultFile(File dir, File queryDir) // reads the result sets stored in dir, pairing each with the next query read from queryDir
+ {
+ List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = new ArrayList<>();
+ try (ChronicleQueue q = ChronicleQueueBuilder.single(dir).build();
+ ChronicleQueue queryQ = ChronicleQueueBuilder.single(queryDir).build())
+ {
+ ExcerptTailer tailer = q.createTailer();
+ ExcerptTailer queryTailer = queryQ.createTailer();
+ List<Pair<String, String>> columnDefinitions = new ArrayList<>(); // state of the result set currently being read, reset after each "end_resultset"
+ List<List<String>> rowColumns = new ArrayList<>();
+ AtomicBoolean allRowsRead = new AtomicBoolean(false);
+ AtomicBoolean failedQuery = new AtomicBoolean(false);
+ while (tailer.readDocument(wire -> {
+ String type = wire.read("type").text();
+ if (type.equals("column_definitions"))
+ {
+ int columnCount = wire.read("column_count").int32();
+ for (int i = 0; i < columnCount; i++)
+ {
+ ValueIn vi = wire.read("column_definition");
+ String name = vi.text();
+ String dataType = vi.text();
+ columnDefinitions.add(Pair.create(name, dataType));
+ }
+ }
+ else if (type.equals("row"))
+ {
+ int rowColumnCount = wire.read("row_column_count").int32();
+ List<String> r = new ArrayList<>(rowColumnCount);
+ for (int i = 0; i < rowColumnCount; i++)
+ {
+ byte[] b = wire.read("column").bytes();
+ r.add(new String(b, java.nio.charset.StandardCharsets.UTF_8)); // explicit charset instead of the platform default; assumes the cells were written as UTF-8 - confirm against ResultStore
+ }
+ rowColumns.add(r);
+ }
+ else if (type.equals("end_resultset"))
+ {
+ allRowsRead.set(true);
+ }
+ else if (type.equals("query_failed"))
+ {
+ failedQuery.set(true);
+ }
+ }))
+ {
+ if (allRowsRead.get()) // a complete result set was read: pair it with the next stored query
+ {
+ FQLQueryReader reader = new FQLQueryReader();
+ queryTailer.readDocument(reader);
+ resultSets.add(Pair.create(reader.getQuery(), failedQuery.get() ? FakeResultSet.failed(new RuntimeException("failure"))
+ : new FakeResultSet(ImmutableList.copyOf(columnDefinitions), ImmutableList.copyOf(rowColumns))));
+ allRowsRead.set(false);
+ failedQuery.set(false);
+ columnDefinitions.clear();
+ rowColumns.clear();
+ }
+ }
+ }
+ return resultSets;
+ }
+
+ private static class FakeResultSet implements ResultHandler.ComparableResultSet
+ {
+ // In-memory result set for the tests: column definitions as (name, type) pairs and
+ // rows as lists of cell strings. An instance created through failed() carries an
+ // exception instead of data.
+ private final List<Pair<String, String>> cdStrings;
+ private final List<List<String>> rows;
+ private final Throwable ex;
+
+ public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows)
+ {
+ this(cdStrings, rows, null);
+ }
+
+ public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows, Throwable ex)
+ {
+ this.cdStrings = cdStrings;
+ this.rows = rows;
+ this.ex = ex;
+ }
+
+ // a failed result set has neither column definitions nor rows, only the exception
+ public static FakeResultSet failed(Throwable ex)
+ {
+ return new FakeResultSet(null, null, ex);
+ }
+
+ // the definitions object is told whether this result set failed
+ public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+ {
+ return new FakeComparableColumnDefinitions(cdStrings, wasFailed());
+ }
+
+ // failed means that an exception was supplied
+ public boolean wasFailed()
+ {
+ return ex != null;
+ }
+
+ public Throwable getFailureException()
+ {
+ return ex;
+ }
+
+ // empty for a failed result set; otherwise wraps the rows lazily, one FakeComparableRow
+ // per row, without support for remove()
+ public Iterator<ResultHandler.ComparableRow> iterator()
+ {
+ if (wasFailed())
+ return Collections.emptyListIterator();
+ return rows.stream().<ResultHandler.ComparableRow>map(cells -> new FakeComparableRow(cells, cdStrings)).iterator();
+ }
+
+ // two failed result sets are always equal, whatever their exceptions;
+ // otherwise column definitions and rows decide
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (!(o instanceof FakeResultSet))
+ return false;
+ FakeResultSet that = (FakeResultSet) o;
+ boolean bothFailed = wasFailed() && that.wasFailed();
+ return bothFailed || (Objects.equals(cdStrings, that.cdStrings) && Objects.equals(rows, that.rows));
+ }
+
+ // hashes column definitions and rows only, not the exception
+ public int hashCode()
+ {
+ return Objects.hash(cdStrings, rows);
+ }
+
+ // prints column definitions and rows only
+ public String toString()
+ {
+ return "FakeResultSet{" + "cdStrings=" + cdStrings + ", rows=" + rows + '}';
+ }
+ }
+
+ private static class FakeComparableRow implements ResultHandler.ComparableRow
+ {
+ // one row of a fake result set: its cell strings plus the column definitions handed in with it
+ private final List<String> cells;
+ private final List<Pair<String, String>> columns;
+
+ public FakeComparableRow(List<String> row, List<Pair<String,String>> cds)
+ {
+ this.cells = row;
+ this.columns = cds;
+ }
+
+ // the cell at index i, converted with ByteBufferUtil.bytes
+ public ByteBuffer getBytesUnsafe(int i)
+ {
+ return ByteBufferUtil.bytes(cells.get(i));
+ }
+
+ public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+ {
+ return new FakeComparableColumnDefinitions(columns, false);
+ }
+
+ public boolean equals(Object other)
+ {
+ return other instanceof FakeComparableRow && cells.equals(((FakeComparableRow)other).cells);
+ }
+
+ public String toString()
+ {
+ return cells.toString();
+ }
+ }
+
+ private static class FakeComparableColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
+ {
+ // Column definitions of a fake result set. For a failed result set no definitions exist,
+ // so every accessor below has to cope with defs being null.
+ private final List<ResultHandler.ComparableDefinition> defs; // null when no (name, type) pairs were handed in
+ private final boolean failed;
+ public FakeComparableColumnDefinitions(List<Pair<String, String>> cds, boolean failed)
+ {
+ defs = cds != null ? cds.stream().map(FakeComparableDefinition::new).collect(Collectors.toList()) : null;
+ this.failed = failed;
+ }
+
+ // never null: an empty list stands in for a failed or missing set of definitions
+ public List<ResultHandler.ComparableDefinition> asList()
+ {
+ if (wasFailed() || defs == null)
+ return Collections.emptyList();
+ return defs;
+ }
+
+ public boolean wasFailed()
+ {
+ return failed;
+ }
+
+ public int size()
+ {
+ return asList().size(); // 0 for a failed result set, where defs is null
+ }
+
+ public Iterator<ResultHandler.ComparableDefinition> iterator()
+ {
+ return Collections.unmodifiableList(asList()).iterator(); // read-only, and empty for a failed result set
+ }
+
+ // null-safe: two instances without definitions are equal to each other
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof FakeComparableColumnDefinitions))
+ return false;
+ return Objects.equals(defs, ((FakeComparableColumnDefinitions)other).defs);
+ }
+
+ // kept consistent with equals(), which looks at defs only
+ public int hashCode()
+ {
+ return Objects.hashCode(defs);
+ }
+
+ public String toString()
+ {
+ return String.valueOf(defs);
+ }
+ }
+
+ private static class FakeComparableDefinition implements ResultHandler.ComparableDefinition
+ {
+ private final Pair<String, String> nameAndType; // left is the column name, right its type
+
+ public FakeComparableDefinition(Pair<String, String> p)
+ {
+ this.nameAndType = p;
+ }
+ public String getType()
+ {
+ return nameAndType.right;
+ }
+
+ public String getName()
+ {
+ return nameAndType.left;
+ }
+
+ // equal when the underlying (name, type) pairs are equal
+ public boolean equals(Object other)
+ {
+ return other instanceof FakeComparableDefinition && nameAndType.equals(((FakeComparableDefinition)other).nameAndType);
+ }
+
+ // rendered as name:type
+ public String toString()
+ {
+ return nameAndType.left + ':' + nameAndType.right;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org