You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/09/01 08:44:18 UTC

[2/7] cassandra git commit: Add fqltool replay

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java b/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
new file mode 100644
index 0000000..043ead8
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool.commands;
+
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+
+import org.apache.cassandra.tools.fqltool.FQLQuery;
+import org.apache.cassandra.tools.fqltool.FQLQueryIterator;
+import org.apache.cassandra.tools.fqltool.QueryReplayer;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.MergeIterator;
+
+/**
+ * Replays the contents of a list of paths containing full query logs against one or more target hosts.
+ */
+@Command(name = "replay", description = "Replay full query logs")
+public class Replay implements Runnable
+{
+    @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true)
+    private List<String> arguments = new ArrayList<>();
+
+    @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.")
+    private List<String> targetHosts;
+
+    @Option(title = "results", name = { "--results"}, description = "Where to store the results of the queries, this should be a directory. Leave this option out to avoid storing results.")
+    private String resultPath;
+
+    @Option(title = "keyspace", name = { "--keyspace"}, description = "Only replay queries against this keyspace and queries without keyspace set.")
+    private String keyspace;
+
+    @Option(title = "debug", name = {"--debug"}, description = "Debug mode, print all queries executed.")
+    private boolean debug;
+
+    @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results")
+    private String queryStorePath;
+
+    /**
+     * Validates the command line options and hands over to
+     * {@link #replay(String, List, List, List, String, boolean)}.
+     *
+     * Prints a message to stderr and calls {@code System.exit(1)} when no target host was given or when the
+     * results path is not an existing directory. When a results path is given, one sub directory per target host
+     * is created below it.
+     *
+     * @throws RuntimeException wrapping any exception thrown while validating or replaying
+     */
+    @Override
+    public void run()
+    {
+        try
+        {
+            // targetHosts has no initializer, so it is still null unless the option parser assigned it;
+            // check it before anything dereferences it so the user gets the message below instead of an NPE
+            if (targetHosts == null || targetHosts.isEmpty())
+            {
+                System.err.println("You need to state at least one --target host to replay the query against");
+                System.exit(1);
+            }
+            List<File> resultPaths = null;
+            if (resultPath != null)
+            {
+                File basePath = new File(resultPath);
+                if (!basePath.exists() || !basePath.isDirectory())
+                {
+                    System.err.println("The results path (" + basePath + ") should be an existing directory");
+                    System.exit(1);
+                }
+                // one result directory per target host, named after the host
+                resultPaths = targetHosts.stream().map(target -> new File(basePath, target)).collect(Collectors.toList());
+                resultPaths.forEach(File::mkdir);
+            }
+            replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, debug);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Opens every given full query log read-only, merges the queries of all logs using
+     * {@link FQLQuery#compareTo} and passes the merged stream to a {@link QueryReplayer}.
+     *
+     * @param keyspace       if non-null, only queries with this keyspace or without a keyspace are kept
+     * @param arguments      paths of the full query logs to read
+     * @param targetHosts    hosts handed to the {@link QueryReplayer}
+     * @param resultPaths    result directories handed to the {@link QueryReplayer}, may be null
+     * @param queryStorePath query store path handed to the {@link QueryReplayer}, may be null
+     * @param debug          debug flag handed to the {@link QueryReplayer}
+     * @throws RuntimeException wrapping any exception thrown while opening the logs or replaying
+     */
+    public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean debug)
+    {
+        int readAhead = 200; // how many fql queries should we read in to memory to be able to sort them?
+        // both lists are filled one element at a time so that whatever has been opened so far is still
+        // closed in the finally block when opening a later path fails
+        List<ChronicleQueue> readQueues = new ArrayList<>();
+        List<FQLQueryIterator> iterators = new ArrayList<>();
+        List<Predicate<FQLQuery>> filters = new ArrayList<>();
+
+        if (keyspace != null)
+            filters.add(fqlQuery -> fqlQuery.keyspace == null || fqlQuery.keyspace.equals(keyspace));
+
+        try
+        {
+            for (String path : arguments)
+                readQueues.add(ChronicleQueueBuilder.single(path).readOnly(true).build());
+            for (ChronicleQueue queue : readQueues)
+                iterators.add(new FQLQueryIterator(queue.createTailer(), readAhead));
+            try (MergeIterator<FQLQuery, List<FQLQuery>> iter = MergeIterator.get(iterators, FQLQuery::compareTo, new Reducer());
+                 QueryReplayer replayer = new QueryReplayer(iter, targetHosts, resultPaths, filters, System.out, queryStorePath, debug))
+            {
+                replayer.replay();
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            iterators.forEach(AbstractIterator::close);
+            readQueues.forEach(Closeable::close);
+        }
+    }
+
+    /**
+     * Reducer that gathers every query handed to {@link #reduce(int, FQLQuery)} into one list.
+     *
+     * NOTE: {@link #getReduced()} returns the internal buffer itself, and {@link #onKeyChange()} clears
+     * that same list, so a returned list does not keep its content once the key changes.
+     */
+    @VisibleForTesting
+    public static class Reducer extends MergeIterator.Reducer<FQLQuery, List<FQLQuery>>
+    {
+        List<FQLQuery> queries = new ArrayList<>();
+
+        /** Adds {@code current} to the buffered queries; {@code idx} is not used. */
+        public void reduce(int idx, FQLQuery current)
+        {
+            queries.add(current);
+        }
+
+        /** @return the (shared, mutable) list of queries buffered since the last key change */
+        protected List<FQLQuery> getReduced()
+        {
+            return queries;
+        }
+
+        /** Empties the buffer. */
+        protected void onKeyChange()
+        {
+            queries.clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java b/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
new file mode 100644
index 0000000..a662699
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptAppender;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.wire.ValueIn;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.tools.fqltool.commands.Replay;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class FQLReplayTest
+{
+    /**
+     * Calls {@link Util#initDatabaseDescriptor()} whenever the test class is instantiated.
+     * NOTE(review): presumably required by the query/option classes used in the tests below - confirm.
+     */
+    public FQLReplayTest()
+    {
+        Util.initDatabaseDescriptor();
+    }
+
+    /**
+     * Writes 100 generated queries, reads them back through a single {@link FQLQueryIterator} and verifies
+     * that the query start times never decrease and that all 100 queries are returned.
+     */
+    @Test
+    public void testOrderedReplay() throws IOException
+    {
+        File queryLogDir = generateQueries(100, true);
+        int seen = 0;
+        try (ChronicleQueue chronicle = ChronicleQueueBuilder.single(queryLogDir).build();
+             FQLQueryIterator queries = new FQLQueryIterator(chronicle.createTailer(), 101))
+        {
+            long previousStart = -1;
+            for (; queries.hasNext(); seen++)
+            {
+                FQLQuery current = queries.next();
+                assertTrue(current.queryStartTime >= previousStart);
+                previousStart = current.queryStartTime;
+            }
+        }
+        assertEquals(100, seen);
+    }
+    /**
+     * Merges two separately generated, non-random query logs and verifies that every merged entry contains
+     * exactly two queries which compare as equal, that start times never decrease, and that 100 entries
+     * are produced in total.
+     */
+    @Test
+    public void testMergingIterator() throws IOException
+    {
+        File firstLog = generateQueries(100, false);
+        File secondLog = generateQueries(100, false);
+        int mergedCount = 0;
+        try (ChronicleQueue firstQueue = ChronicleQueueBuilder.single(firstLog).build();
+             ChronicleQueue secondQueue = ChronicleQueueBuilder.single(secondLog).build();
+             FQLQueryIterator firstIter = new FQLQueryIterator(firstQueue.createTailer(), 101);
+             FQLQueryIterator secondIter = new FQLQueryIterator(secondQueue.createTailer(), 101);
+             MergeIterator<FQLQuery, List<FQLQuery>> merged = MergeIterator.get(Lists.newArrayList(firstIter, secondIter), FQLQuery::compareTo, new Replay.Reducer()))
+        {
+            long previousStart = -1;
+
+            for (; merged.hasNext(); mergedCount++)
+            {
+                List<FQLQuery> group = merged.next();
+                assertEquals(2, group.size());
+                FQLQuery head = group.get(0);
+                assertEquals(0, head.compareTo(group.get(1)));
+                assertTrue(head.queryStartTime >= previousStart);
+                previousStart = head.queryStartTime;
+            }
+        }
+        assertEquals(100, mergedCount);
+    }
+
+    @Test
+    public void testFQLQueryReader() throws IOException
+    {
+        FQLQueryReader reader = new FQLQueryReader();
+
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(generateQueries(1000, true)).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            int queryCount = 0;
+            while (tailer.readDocument(reader))
+            {
+                assertNotNull(reader.getQuery());
+                if (reader.getQuery() instanceof FQLQuery.Single)
+                {
+                    assertTrue(reader.getQuery().keyspace == null || reader.getQuery().keyspace.equals("querykeyspace"));
+                }
+                else
+                {
+                    assertEquals("someks", reader.getQuery().keyspace);
+                }
+                queryCount++;
+            }
+            assertEquals(1000, queryCount);
+        }
+    }
+
+    /**
+     * Stores the column definitions and all rows of one generated result set through a {@link ResultStore},
+     * reads the files back with readResultFile and expects exactly one result set equal to the original.
+     */
+    @Test
+    public void testStoringResults() throws Throwable
+    {
+        File tmpDir = Files.createTempDirectory("results").toFile();
+        File queryDir = Files.createTempDirectory("queries").toFile();
+
+        ResultHandler.ComparableResultSet res = createResultSet(10, 10, true);
+        ResultStore rs = new ResultStore(Collections.singletonList(tmpDir), queryDir);
+        try
+        {
+            FQLQuery query = new FQLQuery.Single("abc", 3, QueryOptions.DEFAULT, 12345, 11111, 22, "select * from abc", Collections.emptyList());
+            rs.storeColumnDefinitions(query, Collections.singletonList(res.getColumnDefinitions()));
+            // rows are stored one at a time, each wrapped in a single-element list (only one target directory here)
+            Iterator<ResultHandler.ComparableRow> it = res.iterator();
+            while (it.hasNext())
+            {
+                List<ResultHandler.ComparableRow> row = Collections.singletonList(it.next());
+                rs.storeRows(row);
+            }
+            // this marks the end of the result set:
+            rs.storeRows(Collections.singletonList(null));
+        }
+        finally
+        {
+            // close the store before the files are read back below
+            rs.close();
+        }
+
+        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = readResultFile(tmpDir, queryDir);
+        assertEquals(1, resultSets.size());
+        assertEquals(res, resultSets.get(0).right);
+
+    }
+
+    @Test
+    public void testCompareColumnDefinitions()
+    {
+        ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
+        ResultComparator rc = new ResultComparator();
+
+        List<ResultHandler.ComparableColumnDefinitions> colDefs = new ArrayList<>(100);
+        List<String> targetHosts = new ArrayList<>(100);
+        for (int i = 0; i < 100; i++)
+        {
+            targetHosts.add("host"+i);
+            colDefs.add(res.getColumnDefinitions());
+        }
+        assertTrue(rc.compareColumnDefinitions(targetHosts, null, colDefs));
+        colDefs.set(50, createResultSet(9, 9, false).getColumnDefinitions());
+        assertFalse(rc.compareColumnDefinitions(targetHosts, null, colDefs));
+    }
+
+    /**
+     * Walks two identical, non-random result sets in lock step and expects every row comparison to succeed,
+     * including the final one where all rows are null.
+     */
+    @Test
+    public void testCompareEqualRows()
+    {
+        ResultComparator comparator = new ResultComparator();
+
+        ResultHandler.ComparableResultSet first = createResultSet(10, 10, false);
+        ResultHandler.ComparableResultSet second = createResultSet(10, 10, false);
+        List<ResultHandler.ComparableResultSet> resultSets = Lists.newArrayList(first, second);
+        List<Iterator<ResultHandler.ComparableRow>> rowIterators = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+
+        List<ResultHandler.ComparableRow> currentRows;
+        do
+        {
+            currentRows = ResultHandler.rows(rowIterators);
+            assertTrue(comparator.compareRows(Lists.newArrayList("eq1", "eq2"), null, currentRows));
+        }
+        while (!currentRows.stream().allMatch(Objects::isNull));
+    }
+
+    /**
+     * Compares two 10-row result sets with an 11-row one and expects at least one row comparison to
+     * report a mismatch before all iterators are exhausted.
+     */
+    @Test
+    public void testCompareRowsDifferentCount()
+    {
+        ResultComparator comparator = new ResultComparator();
+        ResultHandler.ComparableResultSet first = createResultSet(10, 10, false);
+        ResultHandler.ComparableResultSet second = createResultSet(10, 10, false);
+        List<ResultHandler.ComparableResultSet> resultSets = Lists.newArrayList(first, second, createResultSet(10, 11, false));
+        List<Iterator<ResultHandler.ComparableRow>> rowIterators = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+        boolean mismatchSeen = false;
+        for (List<ResultHandler.ComparableRow> currentRows = ResultHandler.rows(rowIterators);
+             !currentRows.stream().allMatch(Objects::isNull);
+             currentRows = ResultHandler.rows(rowIterators))
+        {
+            // non-short-circuit on purpose: compareRows is invoked for every row set
+            mismatchSeen |= !comparator.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, currentRows);
+        }
+        assertTrue(mismatchSeen);
+    }
+
+    /**
+     * Compares two identical result sets with a third one of the same shape but random cell content and
+     * expects every row comparison to report a mismatch.
+     */
+    @Test
+    public void testCompareRowsDifferentContent()
+    {
+        ResultComparator comparator = new ResultComparator();
+        ResultHandler.ComparableResultSet first = createResultSet(10, 10, false);
+        ResultHandler.ComparableResultSet second = createResultSet(10, 10, false);
+        List<ResultHandler.ComparableResultSet> resultSets = Lists.newArrayList(first, second, createResultSet(10, 10, true));
+        List<Iterator<ResultHandler.ComparableRow>> rowIterators = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+        for (List<ResultHandler.ComparableRow> currentRows = ResultHandler.rows(rowIterators);
+             !currentRows.stream().allMatch(Objects::isNull);
+             currentRows = ResultHandler.rows(rowIterators))
+        {
+            assertFalse(currentRows.toString(), comparator.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, currentRows));
+        }
+    }
+
+    /**
+     * Compares two 10-column result sets with an 11-column one and expects every row comparison to
+     * report a mismatch.
+     */
+    @Test
+    public void testCompareRowsDifferentColumnCount()
+    {
+        ResultComparator comparator = new ResultComparator();
+        ResultHandler.ComparableResultSet first = createResultSet(10, 10, false);
+        ResultHandler.ComparableResultSet second = createResultSet(10, 10, false);
+        List<ResultHandler.ComparableResultSet> resultSets = Lists.newArrayList(first, second, createResultSet(11, 10, false));
+        List<Iterator<ResultHandler.ComparableRow>> rowIterators = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+        for (List<ResultHandler.ComparableRow> currentRows = ResultHandler.rows(rowIterators);
+             !currentRows.stream().allMatch(Objects::isNull);
+             currentRows = ResultHandler.rows(rowIterators))
+        {
+            assertFalse(currentRows.toString(), comparator.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, currentRows));
+        }
+    }
+
+    /**
+     * Hands three identical result sets (one per host) to a {@link ResultHandler} and verifies that the
+     * files written for all three hosts read back equal, and that the single stored result set equals
+     * the one that was handed in.
+     */
+    @Test
+    public void testResultHandler() throws IOException
+    {
+        List<String> hosts = Lists.newArrayList("hosta", "hostb", "hostc");
+        File baseDir = Files.createTempDirectory("testresulthandler").toFile();
+        File queryDir = Files.createTempDirectory("queries").toFile();
+        List<File> hostDirs = new ArrayList<>();
+        for (String host : hosts)
+        {
+            // one result directory per host below the temp dir
+            File hostDir = new File(baseDir, host);
+            hostDir.mkdir();
+            hostDirs.add(hostDir);
+        }
+        ResultHandler handler = new ResultHandler(hosts, hostDirs, queryDir);
+        ResultHandler.ComparableResultSet expected = createResultSet(10, 10, false);
+        ResultHandler.ComparableResultSet forHostB = createResultSet(10, 10, false);
+        ResultHandler.ComparableResultSet forHostC = createResultSet(10, 10, false);
+        List<ResultHandler.ComparableResultSet> perHost = Lists.newArrayList(expected, forHostB, forHostC);
+        FQLQuery.Single query = new FQLQuery.Single("abcabc", 3, QueryOptions.DEFAULT, 1111, 2222, 3333, "select * from xyz", Collections.emptyList());
+        handler.handleResults(query, perHost);
+
+        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostA = readResultFile(hostDirs.get(0), queryDir);
+        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostB = readResultFile(hostDirs.get(1), queryDir);
+        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostC = readResultFile(hostDirs.get(2), queryDir);
+        assertEquals(fromHostA, fromHostB);
+        assertEquals(fromHostA, fromHostC);
+        assertEquals(Iterables.getOnlyElement(fromHostC).right, expected);
+    }
+
+    /**
+     * Hands three result sets to a {@link ResultHandler} where the second host returned only 5 rows, then
+     * verifies that the first and third host files read back equal and that the second host file holds
+     * the shorter result set.
+     */
+    @Test
+    public void testResultHandlerWithDifference() throws IOException
+    {
+        List<String> hosts = Lists.newArrayList("hosta", "hostb", "hostc");
+        File baseDir = Files.createTempDirectory("testresulthandler").toFile();
+        File queryDir = Files.createTempDirectory("queries").toFile();
+        List<File> hostDirs = new ArrayList<>();
+        for (String host : hosts)
+        {
+            // one result directory per host below the temp dir
+            File hostDir = new File(baseDir, host);
+            hostDir.mkdir();
+            hostDirs.add(hostDir);
+        }
+        ResultHandler handler = new ResultHandler(hosts, hostDirs, queryDir);
+        ResultHandler.ComparableResultSet forHostA = createResultSet(10, 10, false);
+        ResultHandler.ComparableResultSet shorter = createResultSet(10, 5, false);
+        ResultHandler.ComparableResultSet forHostC = createResultSet(10, 10, false);
+        List<ResultHandler.ComparableResultSet> perHost = Lists.newArrayList(forHostA, shorter, forHostC);
+        FQLQuery.Single query = new FQLQuery.Single("aaa", 3, QueryOptions.DEFAULT, 123123, 11111, 22222, "select * from abcabc", Collections.emptyList());
+        handler.handleResults(query, perHost);
+
+        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostA = readResultFile(hostDirs.get(0), queryDir);
+        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostB = readResultFile(hostDirs.get(1), queryDir);
+        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> fromHostC = readResultFile(hostDirs.get(2), queryDir);
+        assertEquals(fromHostA, fromHostC);
+        assertEquals(fromHostB.get(0).right, shorter);
+    }
+
+    /**
+     * Feeds 10 queries, each with one random result set per host, through a {@link ResultHandler} and then
+     * checks for every host that what was written to its result directory matches what was handed in.
+     */
+    @Test
+    public void testResultHandlerMultipleResultSets() throws IOException
+    {
+        List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc");
+        File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+        File queryDir = Files.createTempDirectory("queries").toFile();
+        List<File> resultPaths = new ArrayList<>();
+        // one result directory per host below the temp dir
+        targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
+        ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
+        List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets = new ArrayList<>();
+        Random random = new Random();
+        for (int i = 0; i < 10; i++)
+        {
+            List<ResultHandler.ComparableResultSet> results = new ArrayList<>();
+            List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50));
+            for (int jj = 0; jj < targetHosts.size(); jj++)
+            {
+                // 5 columns, between 1 and 10 rows, random cell content
+                results.add(createResultSet(5, 1 + random.nextInt(10), true));
+            }
+            FQLQuery q = new FQLQuery.Single("abc"+i,
+                                             3,
+                                             QueryOptions.forInternalCalls(values),
+                                             i * 1000,
+                                             12345,
+                                             54321,
+                                             "select * from xyz where id = "+i,
+                                             values);
+            resultSets.add(Pair.create(q, results));
+        }
+        for (int i = 0; i < resultSets.size(); i++)
+            rh.handleResults(resultSets.get(i).left, resultSets.get(i).right);
+
+        // verify each host's files against the in-memory expectations
+        for (int i = 0; i < targetHosts.size(); i++)
+            compareWithFile(resultPaths, queryDir, resultSets, i);
+    }
+
+    /**
+     * Same flow as the multiple-result-sets test but with four hosts, where the result sets of the hosts at
+     * index 0 and 3 are replaced by failed result sets for every query.
+     */
+    @Test
+    public void testResultHandlerFailedQuery() throws IOException
+    {
+        List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc", "hostd");
+        File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+        File queryDir = Files.createTempDirectory("queries").toFile();
+        List<File> resultPaths = new ArrayList<>();
+        // one result directory per host below the temp dir
+        targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
+        ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
+        List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets = new ArrayList<>();
+        Random random = new Random();
+        for (int i = 0; i < 10; i++)
+        {
+            List<ResultHandler.ComparableResultSet> results = new ArrayList<>();
+            List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50));
+            for (int jj = 0; jj < targetHosts.size(); jj++)
+            {
+                results.add(createResultSet(5, 1 + random.nextInt(10), true));
+            }
+            // overwrite the first and the last host's result with a failure
+            results.set(0, FakeResultSet.failed(new RuntimeException("testing abc")));
+            results.set(3, FakeResultSet.failed(new RuntimeException("testing abc")));
+            FQLQuery q = new FQLQuery.Single("abc"+i,
+                                             3,
+                                             QueryOptions.forInternalCalls(values),
+                                             i * 1000,
+                                             i * 12345,
+                                             i * 54321,
+                                             "select * from xyz where id = "+i,
+                                             values);
+            resultSets.add(Pair.create(q, results));
+        }
+        for (int i = 0; i < resultSets.size(); i++)
+            rh.handleResults(resultSets.get(i).left, resultSets.get(i).right);
+
+        // verify each host's files against the in-memory expectations
+        for (int i = 0; i < targetHosts.size(); i++)
+            compareWithFile(resultPaths, queryDir, resultSets, i);
+    }
+
+
+    /**
+     * Exercises {@code FQLQuery.compareTo} for equal queries, single versus batch queries, differing
+     * fourth constructor arguments and differing bound values.
+     */
+    @Test
+    public void testCompare()
+    {
+        // identical single queries compare as equal in both directions
+        FQLQuery q1 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList());
+        FQLQuery q2 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222,"aaaa", Collections.emptyList());
+
+        assertEquals(0, q1.compareTo(q2));
+        assertEquals(0, q2.compareTo(q1));
+
+        FQLQuery q3 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList());
+        // single queries before batch queries
+        assertTrue(q1.compareTo(q3) < 0);
+        assertTrue(q3.compareTo(q1) > 0);
+
+        // check that the query with the smaller fourth argument (123 vs 124) sorts first
+        // NOTE(review): the fourth constructor argument is presumably the query start time - confirm
+        FQLQuery q4 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, "aaaa", Collections.emptyList());
+        assertTrue(q1.compareTo(q4) < 0);
+        assertTrue(q4.compareTo(q1) > 0);
+
+        // same check with a batch on the larger side
+        FQLQuery q5 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList());
+        assertTrue(q1.compareTo(q5) < 0);
+        assertTrue(q5.compareTo(q1) > 0);
+
+        // a query with one bound value sorts after the otherwise identical query without values
+        FQLQuery q6 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes(10)));
+        FQLQuery q7 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList());
+        assertTrue(q6.compareTo(q7) > 0);
+        assertTrue(q7.compareTo(q6) < 0);
+
+        // with the same number of values, the value content ("a" vs "b") decides
+        FQLQuery q8 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("a")));
+        FQLQuery q9 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("b")));
+        assertTrue(q8.compareTo(q9) < 0);
+        assertTrue(q9.compareTo(q8) > 0);
+    }
+
+    /**
+     * Writes {@code count} full query log entries into a chronicle queue in a fresh temp directory.
+     *
+     * With {@code random == false} entry {@code i} gets timestamp {@code i}; even entries are single queries
+     * and odd entries are batches of 10 statements. With {@code random == true} the timestamp is a random
+     * value in [0, 9999], the entry type is chosen randomly and so is the batch length.
+     * Single queries randomly get either no keyspace or "querykeyspace" (in both modes); batches always
+     * use "someks".
+     *
+     * @param count  number of entries to write
+     * @param random whether timestamps, entry types and batch lengths are randomised
+     * @return the directory containing the chronicle queue
+     */
+    private File generateQueries(int count, boolean random) throws IOException
+    {
+        Random r = new Random();
+        File dir = Files.createTempDirectory("chronicle").toFile();
+        try (ChronicleQueue readQueue = ChronicleQueueBuilder.single(dir).build())
+        {
+            ExcerptAppender appender = readQueue.acquireAppender();
+
+            for (int i = 0; i < count; i++)
+            {
+                long timestamp = random ? Math.abs(r.nextLong() % 10000) : i;
+                if (random ? r.nextBoolean() : i % 2 == 0)
+                {
+                    // single query
+                    String query = "abcdefghijklm " + i;
+                    QueryState qs = r.nextBoolean() ? queryState() : queryState("querykeyspace");
+                    FullQueryLogger.Query  q = new FullQueryLogger.Query(query, QueryOptions.DEFAULT, qs, timestamp);
+                    appender.writeDocument(q);
+                    q.release();
+                }
+                else
+                {
+                    // batch; batchSize is only used as list capacity and as the bound for the random draw below
+                    int batchSize = random ? r.nextInt(99) + 1 : i + 1;
+                    List<String> queries = new ArrayList<>(batchSize);
+                    List<List<ByteBuffer>> values = new ArrayList<>(batchSize);
+                    // note: in random mode the loop bound is re-drawn on every condition check
+                    for (int jj = 0; jj < (random ? r.nextInt(batchSize) : 10); jj++)
+                    {
+                        queries.add("aaaaaa batch "+i+":"+jj);
+                        values.add(Collections.emptyList());
+                    }
+                    FullQueryLogger.Batch batch = new FullQueryLogger.Batch(BatchStatement.Type.UNLOGGED,
+                                                                            queries,
+                                                                            values,
+                                                                            QueryOptions.DEFAULT,
+                                                                            queryState("someks"),
+                                                                            timestamp);
+                    appender.writeDocument(batch);
+                    batch.release();
+                }
+            }
+        }
+        return dir;
+    }
+
+    /**
+     * @return the query state produced by {@link QueryState#forInternalCalls()}; no keyspace is passed in here
+     */
+    private QueryState queryState()
+    {
+        return QueryState.forInternalCalls();
+    }
+
+    /**
+     * @param keyspace keyspace passed to {@link ClientState#forInternalCalls(String)}
+     * @return a new query state wrapping the internal-calls client state created for {@code keyspace}
+     */
+    private QueryState queryState(String keyspace)
+    {
+        return new QueryState(ClientState.forInternalCalls(keyspace));
+    }
+
+    /**
+     * Builds a {@link FakeResultSet} with {@code columnCount} columns named "a0", "a1", ... of type "int"
+     * and {@code rowCount} rows whose cells read "&lt;row&gt; col &lt;column&gt;".
+     *
+     * @param columnCount number of columns per row
+     * @param rowCount    number of rows
+     * @param random      when true a random int is appended to every cell, making the content unique-ish
+     * @return the generated result set
+     */
+    private static ResultHandler.ComparableResultSet createResultSet(int columnCount, int rowCount, boolean random)
+    {
+        Random rng = new Random();
+
+        List<Pair<String, String>> definitions = new ArrayList<>(columnCount);
+        for (int colIdx = 0; colIdx < columnCount; colIdx++)
+            definitions.add(Pair.create("a" + colIdx, "int"));
+
+        List<List<String>> allRows = new ArrayList<>();
+        for (int rowIdx = 0; rowIdx < rowCount; rowIdx++)
+        {
+            List<String> cells = new ArrayList<>(columnCount);
+            for (int colIdx = 0; colIdx < columnCount; colIdx++)
+            {
+                String cell = rowIdx + " col " + colIdx;
+                if (random)
+                    cell += rng.nextInt();
+                cells.add(cell);
+            }
+            allRows.add(cells);
+        }
+        return new FakeResultSet(definitions, allRows);
+    }
+
+    /**
+     * Reads the stored results of the host at position {@code idx} and asserts that every stored
+     * query/result pair equals the expected pair at the same position.
+     *
+     * Only as many entries as were read from the file are compared; the number of stored entries itself
+     * is not asserted here.
+     *
+     * @param dirs       result directories, one per host
+     * @param resultDir  directory handed to readResultFile as the query directory
+     * @param resultSets expected queries with their per-host result sets
+     * @param idx        index of the host to check
+     */
+    private static void compareWithFile(List<File> dirs, File resultDir, List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets, int idx)
+    {
+        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> stored = readResultFile(dirs.get(idx), resultDir);
+        int position = 0;
+        for (Pair<FQLQuery, ResultHandler.ComparableResultSet> fromFile : stored)
+        {
+            Pair<FQLQuery, List<ResultHandler.ComparableResultSet>> expected = resultSets.get(position++);
+            assertEquals(fromFile.left, expected.left);
+            assertEquals(fromFile.right, expected.right.get(idx));
+        }
+    }
+
+    /**
+     * Reads the result sets stored in the chronicle queue at {@code dir}, pairing each completed result set
+     * with the next query read from the chronicle queue at {@code queryDir}.
+     *
+     * Every document in the result queue carries a "type" field, handled as follows:
+     * <ul>
+     *   <li>"column_definitions": a "column_count" followed by that many "column_definition" fields, each
+     *       yielding two text values (name, then data type)</li>
+     *   <li>"row": a "row_column_count" followed by that many "column" byte arrays</li>
+     *   <li>"query_failed": marks the result set currently being read as failed</li>
+     *   <li>"end_resultset": completes the result set currently being read</li>
+     * </ul>
+     * State accumulated from the documents is turned into a result once "end_resultset" has been seen,
+     * and is then reset for the next result set.
+     *
+     * @param dir directory of the queue holding the stored results
+     * @param queryDir directory of the queue holding the stored queries
+     * @return the (query, result set) pairs in the order they were read
+     */
+    private static List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> readResultFile(File dir, File queryDir)
+    {
+        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = new ArrayList<>();
+        try (ChronicleQueue q = ChronicleQueueBuilder.single(dir).build();
+             ChronicleQueue queryQ = ChronicleQueueBuilder.single(queryDir).build())
+        {
+            ExcerptTailer tailer = q.createTailer();
+            ExcerptTailer queryTailer = queryQ.createTailer();
+            // accumulators for the result set currently being read; reset after each "end_resultset"
+            List<Pair<String, String>> columnDefinitions = new ArrayList<>();
+            List<List<String>> rowColumns = new ArrayList<>();
+            AtomicBoolean allRowsRead = new AtomicBoolean(false);
+            AtomicBoolean failedQuery = new AtomicBoolean(false);
+            while (tailer.readDocument(wire -> {
+                String type = wire.read("type").text();
+                if (type.equals("column_definitions"))
+                {
+                    int columnCount = wire.read("column_count").int32();
+                    for (int i = 0; i < columnCount; i++)
+                    {
+                        ValueIn vi = wire.read("column_definition");
+                        String name = vi.text();
+                        String dataType = vi.text();
+                        columnDefinitions.add(Pair.create(name, dataType));
+                    }
+                }
+                else if (type.equals("row"))
+                {
+                    int rowColumnCount = wire.read("row_column_count").int32();
+                    List<String> r = new ArrayList<>(rowColumnCount);
+                    for (int i = 0; i < rowColumnCount; i++)
+                    {
+                        byte[] b = wire.read("column").bytes();
+                        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
+                        // NOTE(review): presumably these bytes originate from ByteBufferUtil.bytes(String)
+                        // (see FakeComparableRow.getBytesUnsafe), which is expected to encode as UTF-8 - confirm.
+                        r.add(new String(b, java.nio.charset.StandardCharsets.UTF_8));
+                    }
+                    rowColumns.add(r);
+                }
+                else if (type.equals("end_resultset"))
+                {
+                    allRowsRead.set(true);
+                }
+                else if (type.equals("query_failed"))
+                {
+                    failedQuery.set(true);
+                }
+            }))
+            {
+                if (allRowsRead.get())
+                {
+                    // a full result set has been read: fetch the matching query and emit the pair
+                    FQLQueryReader reader = new FQLQueryReader();
+                    queryTailer.readDocument(reader);
+                    // copy the accumulators, since they are cleared and reused for the next result set
+                    resultSets.add(Pair.create(reader.getQuery(), failedQuery.get() ? FakeResultSet.failed(new RuntimeException("failure"))
+                                                                                    : new FakeResultSet(ImmutableList.copyOf(columnDefinitions), ImmutableList.copyOf(rowColumns))));
+                    allRowsRead.set(false);
+                    failedQuery.set(false);
+                    columnDefinitions.clear();
+                    rowColumns.clear();
+                }
+            }
+        }
+        return resultSets;
+    }
+
+    private static class FakeResultSet implements ResultHandler.ComparableResultSet
+    {
+        private final List<Pair<String, String>> cdStrings;
+        private final List<List<String>> rows;
+        private final Throwable ex;
+
+        public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows)
+        {
+            this(cdStrings, rows, null);
+        }
+
+        public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows, Throwable ex)
+        {
+            this.cdStrings = cdStrings;
+            this.rows = rows;
+            this.ex = ex;
+        }
+
+        public static FakeResultSet failed(Throwable ex)
+        {
+            return new FakeResultSet(null, null, ex);
+        }
+
+        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+        {
+            return new FakeComparableColumnDefinitions(cdStrings, wasFailed());
+        }
+
+        public boolean wasFailed()
+        {
+            return getFailureException() != null;
+        }
+
+        public Throwable getFailureException()
+        {
+            return ex;
+        }
+
+        public Iterator<ResultHandler.ComparableRow> iterator()
+        {
+            if (wasFailed())
+                return Collections.emptyListIterator();
+            return new AbstractIterator<ResultHandler.ComparableRow>()
+            {
+                Iterator<List<String>> iter = rows.iterator();
+                protected ResultHandler.ComparableRow computeNext()
+                {
+                    if (iter.hasNext())
+                        return new FakeComparableRow(iter.next(), cdStrings);
+                    return endOfData();
+                }
+            };
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof FakeResultSet)) return false;
+            FakeResultSet that = (FakeResultSet) o;
+            if (wasFailed() && that.wasFailed())
+                return true;
+            return Objects.equals(cdStrings, that.cdStrings) &&
+                   Objects.equals(rows, that.rows);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(cdStrings, rows);
+        }
+
+        public String toString()
+        {
+            return "FakeResultSet{" +
+                   "cdStrings=" + cdStrings +
+                   ", rows=" + rows +
+                   '}';
+        }
+    }
+
+    /**
+     * In-memory {@link ResultHandler.ComparableRow} holding one row's cell values as strings, together
+     * with the (name, type) column definitions of the result set it belongs to.
+     *
+     * Equality and hash code are based on the cell values only.
+     */
+    private static class FakeComparableRow implements ResultHandler.ComparableRow
+    {
+        private final List<String> row;
+        private final List<Pair<String, String>> cds;
+
+        /**
+         * @param row cell values of this row, in column order
+         * @param cds (name, type) pairs describing the columns
+         */
+        public FakeComparableRow(List<String> row, List<Pair<String,String>> cds)
+        {
+            this.row = row;
+            this.cds = cds;
+        }
+
+        /** Returns the cell at index {@code i}, converted to bytes with {@code ByteBufferUtil.bytes}. */
+        public ByteBuffer getBytesUnsafe(int i)
+        {
+            return ByteBufferUtil.bytes(row.get(i));
+        }
+
+        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+        {
+            // rows are only handed out by non-failed result sets, hence failed = false
+            return new FakeComparableColumnDefinitions(cds, false);
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof FakeComparableRow))
+                return false;
+            return Objects.equals(row, ((FakeComparableRow)other).row);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            // kept in step with equals(), which compares the cell values only
+            return Objects.hashCode(row);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.valueOf(row);
+        }
+    }
+
+    /**
+     * In-memory {@link ResultHandler.ComparableColumnDefinitions} built from (name, type) pairs.
+     *
+     * When flagged as failed, it exposes no definitions: {@link #asList()} and {@link #iterator()} are
+     * empty and {@link #size()} is 0. Equality and hash code are based on the definitions only.
+     */
+    private static class FakeComparableColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
+    {
+        // null when constructed from null pairs (as happens for failed result sets)
+        private final List<ResultHandler.ComparableDefinition> defs;
+        private final boolean failed;
+
+        /**
+         * @param cds (name, type) pairs describing the columns; may be null
+         * @param failed whether these definitions belong to a failed result
+         */
+        public FakeComparableColumnDefinitions(List<Pair<String, String>> cds, boolean failed)
+        {
+            defs = cds != null ? cds.stream().map(FakeComparableDefinition::new).collect(Collectors.toList()) : null;
+            this.failed = failed;
+        }
+
+        public List<ResultHandler.ComparableDefinition> asList()
+        {
+            if (wasFailed())
+                return Collections.emptyList();
+            return defs;
+        }
+
+        public boolean wasFailed()
+        {
+            return failed;
+        }
+
+        public int size()
+        {
+            // consistent with asList()/iterator(): a failed result exposes no columns.
+            // This also avoids dereferencing defs, which is null for failed result sets.
+            return wasFailed() ? 0 : defs.size();
+        }
+
+        public Iterator<ResultHandler.ComparableDefinition> iterator()
+        {
+            if (wasFailed())
+                return Collections.emptyListIterator();
+            return new AbstractIterator<ResultHandler.ComparableDefinition>()
+            {
+                Iterator<ResultHandler.ComparableDefinition> iter = defs.iterator();
+                protected ResultHandler.ComparableDefinition computeNext()
+                {
+                    if (iter.hasNext())
+                        return iter.next();
+                    return endOfData();
+                }
+            };
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof FakeComparableColumnDefinitions))
+                return false;
+            // null-safe: defs is null for definitions built from a failed result set
+            return Objects.equals(defs, ((FakeComparableColumnDefinitions)other).defs);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            // kept in step with equals(), which compares the definitions only
+            return Objects.hashCode(defs);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.valueOf(defs);
+        }
+    }
+
+    /**
+     * In-memory {@link ResultHandler.ComparableDefinition} wrapping a (name, type) pair: the pair's left
+     * value is the column name and its right value is the column type.
+     */
+    private static class FakeComparableDefinition implements ResultHandler.ComparableDefinition
+    {
+        private final Pair<String, String> p;
+
+        /** @param p (name, type) pair describing a single column */
+        public FakeComparableDefinition(Pair<String, String> p)
+        {
+            this.p = p;
+        }
+
+        public String getType()
+        {
+            return p.right;
+        }
+
+        public String getName()
+        {
+            return p.left;
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof FakeComparableDefinition))
+                return false;
+            return p.equals(((FakeComparableDefinition)other).p);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            // kept in step with equals(), which delegates to the wrapped pair
+            return p.hashCode();
+        }
+
+        @Override
+        public String toString()
+        {
+            return getName() + ':' + getType();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org