You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/09/01 08:44:22 UTC

[6/7] cassandra git commit: Add fqltool compare

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java b/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
deleted file mode 100644
index 043ead8..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool.commands;
-
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import io.airlift.airline.Arguments;
-import io.airlift.airline.Command;
-import io.airlift.airline.Option;
-import net.openhft.chronicle.core.io.Closeable;
-import net.openhft.chronicle.queue.ChronicleQueue;
-import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-
-import org.apache.cassandra.tools.fqltool.FQLQuery;
-import org.apache.cassandra.tools.fqltool.FQLQueryIterator;
-import org.apache.cassandra.tools.fqltool.QueryReplayer;
-import org.apache.cassandra.utils.AbstractIterator;
-import org.apache.cassandra.utils.MergeIterator;
-
-/**
- * replay the contents of a list of paths containing full query logs
- */
-@Command(name = "replay", description = "Replay full query logs")
-public class Replay implements Runnable
-{
-    @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true)
-    private List<String> arguments = new ArrayList<>();
-
-    @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.")
-    private List<String> targetHosts;
-
-    @Option(title = "results", name = { "--results"}, description = "Where to store the results of the queries, this should be a directory. Leave this option out to avoid storing results.")
-    private String resultPath;
-
-    @Option(title = "keyspace", name = { "--keyspace"}, description = "Only replay queries against this keyspace and queries without keyspace set.")
-    private String keyspace;
-
-    @Option(title = "debug", name = {"--debug"}, description = "Debug mode, print all queries executed.")
-    private boolean debug;
-
-    @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results")
-    private String queryStorePath;
-
-    @Override
-    public void run()
-    {
-        try
-        {
-            List<File> resultPaths = null;
-            if (resultPath != null)
-            {
-                File basePath = new File(resultPath);
-                if (!basePath.exists() || !basePath.isDirectory())
-                {
-                    System.err.println("The results path (" + basePath + ") should be an existing directory");
-                    System.exit(1);
-                }
-                resultPaths = targetHosts.stream().map(target -> new File(basePath, target)).collect(Collectors.toList());
-                resultPaths.forEach(File::mkdir);
-            }
-            if (targetHosts.size() < 1)
-            {
-                System.err.println("You need to state at least one --target host to replay the query against");
-                System.exit(1);
-            }
-            replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, debug);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean debug)
-    {
-        int readAhead = 200; // how many fql queries should we read in to memory to be able to sort them?
-        List<ChronicleQueue> readQueues = null;
-        List<FQLQueryIterator> iterators = null;
-        List<Predicate<FQLQuery>> filters = new ArrayList<>();
-
-        if (keyspace != null)
-            filters.add(fqlQuery -> fqlQuery.keyspace == null || fqlQuery.keyspace.equals(keyspace));
-
-        try
-        {
-            readQueues = arguments.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList());
-            iterators = readQueues.stream().map(ChronicleQueue::createTailer).map(tailer -> new FQLQueryIterator(tailer, readAhead)).collect(Collectors.toList());
-            try (MergeIterator<FQLQuery, List<FQLQuery>> iter = MergeIterator.get(iterators, FQLQuery::compareTo, new Reducer());
-                 QueryReplayer replayer = new QueryReplayer(iter, targetHosts, resultPaths, filters, System.out, queryStorePath, debug))
-            {
-                replayer.replay();
-            }
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-        finally
-        {
-            if (iterators != null)
-                iterators.forEach(AbstractIterator::close);
-            if (readQueues != null)
-                readQueues.forEach(Closeable::close);
-        }
-    }
-
-    @VisibleForTesting
-    public static class Reducer extends MergeIterator.Reducer<FQLQuery, List<FQLQuery>>
-    {
-        List<FQLQuery> queries = new ArrayList<>();
-        public void reduce(int idx, FQLQuery current)
-        {
-            queries.add(current);
-        }
-
-        protected List<FQLQuery> getReduced()
-        {
-            return queries;
-        }
-        protected void onKeyChange()
-        {
-            queries.clear();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java b/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
deleted file mode 100644
index a662699..0000000
--- a/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java
+++ /dev/null
@@ -1,760 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.junit.Test;
-
-import net.openhft.chronicle.queue.ChronicleQueue;
-import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-import net.openhft.chronicle.queue.ExcerptAppender;
-import net.openhft.chronicle.queue.ExcerptTailer;
-import net.openhft.chronicle.wire.ValueIn;
-import org.apache.cassandra.audit.FullQueryLogger;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.cql3.statements.BatchStatement;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.tools.Util;
-import org.apache.cassandra.tools.fqltool.commands.Replay;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.Pair;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public class FQLReplayTest
-{
-    public FQLReplayTest()
-    {
-        Util.initDatabaseDescriptor();
-    }
-
-    @Test
-    public void testOrderedReplay() throws IOException
-    {
-        File f = generateQueries(100, true);
-        int queryCount = 0;
-        try (ChronicleQueue queue = ChronicleQueueBuilder.single(f).build();
-             FQLQueryIterator iter = new FQLQueryIterator(queue.createTailer(), 101))
-        {
-            long last = -1;
-            while (iter.hasNext())
-            {
-                FQLQuery q = iter.next();
-                assertTrue(q.queryStartTime >= last);
-                last = q.queryStartTime;
-                queryCount++;
-            }
-        }
-        assertEquals(100, queryCount);
-    }
-    @Test
-    public void testMergingIterator() throws IOException
-    {
-        File f = generateQueries(100, false);
-        File f2 = generateQueries(100, false);
-        int queryCount = 0;
-        try (ChronicleQueue queue = ChronicleQueueBuilder.single(f).build();
-             ChronicleQueue queue2 = ChronicleQueueBuilder.single(f2).build();
-             FQLQueryIterator iter = new FQLQueryIterator(queue.createTailer(), 101);
-             FQLQueryIterator iter2 = new FQLQueryIterator(queue2.createTailer(), 101);
-             MergeIterator<FQLQuery, List<FQLQuery>> merger = MergeIterator.get(Lists.newArrayList(iter, iter2), FQLQuery::compareTo, new Replay.Reducer()))
-        {
-            long last = -1;
-
-            while (merger.hasNext())
-            {
-                List<FQLQuery> qs = merger.next();
-                assertEquals(2, qs.size());
-                assertEquals(0, qs.get(0).compareTo(qs.get(1)));
-                assertTrue(qs.get(0).queryStartTime >= last);
-                last = qs.get(0).queryStartTime;
-                queryCount++;
-            }
-        }
-        assertEquals(100, queryCount);
-    }
-
-    @Test
-    public void testFQLQueryReader() throws IOException
-    {
-        FQLQueryReader reader = new FQLQueryReader();
-
-        try (ChronicleQueue queue = ChronicleQueueBuilder.single(generateQueries(1000, true)).build())
-        {
-            ExcerptTailer tailer = queue.createTailer();
-            int queryCount = 0;
-            while (tailer.readDocument(reader))
-            {
-                assertNotNull(reader.getQuery());
-                if (reader.getQuery() instanceof FQLQuery.Single)
-                {
-                    assertTrue(reader.getQuery().keyspace == null || reader.getQuery().keyspace.equals("querykeyspace"));
-                }
-                else
-                {
-                    assertEquals("someks", reader.getQuery().keyspace);
-                }
-                queryCount++;
-            }
-            assertEquals(1000, queryCount);
-        }
-    }
-
-    @Test
-    public void testStoringResults() throws Throwable
-    {
-        File tmpDir = Files.createTempDirectory("results").toFile();
-        File queryDir = Files.createTempDirectory("queries").toFile();
-
-        ResultHandler.ComparableResultSet res = createResultSet(10, 10, true);
-        ResultStore rs = new ResultStore(Collections.singletonList(tmpDir), queryDir);
-        try
-        {
-            FQLQuery query = new FQLQuery.Single("abc", 3, QueryOptions.DEFAULT, 12345, 11111, 22, "select * from abc", Collections.emptyList());
-            rs.storeColumnDefinitions(query, Collections.singletonList(res.getColumnDefinitions()));
-            Iterator<ResultHandler.ComparableRow> it = res.iterator();
-            while (it.hasNext())
-            {
-                List<ResultHandler.ComparableRow> row = Collections.singletonList(it.next());
-                rs.storeRows(row);
-            }
-            // this marks the end of the result set:
-            rs.storeRows(Collections.singletonList(null));
-        }
-        finally
-        {
-            rs.close();
-        }
-
-        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = readResultFile(tmpDir, queryDir);
-        assertEquals(1, resultSets.size());
-        assertEquals(res, resultSets.get(0).right);
-
-    }
-
-    @Test
-    public void testCompareColumnDefinitions()
-    {
-        ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
-        ResultComparator rc = new ResultComparator();
-
-        List<ResultHandler.ComparableColumnDefinitions> colDefs = new ArrayList<>(100);
-        List<String> targetHosts = new ArrayList<>(100);
-        for (int i = 0; i < 100; i++)
-        {
-            targetHosts.add("host"+i);
-            colDefs.add(res.getColumnDefinitions());
-        }
-        assertTrue(rc.compareColumnDefinitions(targetHosts, null, colDefs));
-        colDefs.set(50, createResultSet(9, 9, false).getColumnDefinitions());
-        assertFalse(rc.compareColumnDefinitions(targetHosts, null, colDefs));
-    }
-
-    @Test
-    public void testCompareEqualRows()
-    {
-        ResultComparator rc = new ResultComparator();
-
-        ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
-        ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
-        List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2);
-        List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList());
-
-        while (true)
-        {
-            List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters);
-            assertTrue(rc.compareRows(Lists.newArrayList("eq1", "eq2"), null, rows));
-            if (rows.stream().allMatch(Objects::isNull))
-                break;
-        }
-    }
-
-    @Test
-    public void testCompareRowsDifferentCount()
-    {
-        ResultComparator rc = new ResultComparator();
-        ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
-        ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
-        List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(10, 11, false));
-        List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList());
-        boolean foundMismatch = false;
-        while (true)
-        {
-            List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters);
-            if (rows.stream().allMatch(Objects::isNull))
-                break;
-            if (!rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows))
-            {
-                foundMismatch = true;
-            }
-        }
-        assertTrue(foundMismatch);
-    }
-
-    @Test
-    public void testCompareRowsDifferentContent()
-    {
-        ResultComparator rc = new ResultComparator();
-        ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
-        ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
-        List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(10, 10, true));
-        List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList());
-        while (true)
-        {
-            List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters);
-            if (rows.stream().allMatch(Objects::isNull))
-                break;
-            assertFalse(rows.toString(), rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows));
-        }
-    }
-
-    @Test
-    public void testCompareRowsDifferentColumnCount()
-    {
-        ResultComparator rc = new ResultComparator();
-        ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
-        ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
-        List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(11, 10, false));
-        List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList());
-        while (true)
-        {
-            List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters);
-            if (rows.stream().allMatch(Objects::isNull))
-                break;
-            assertFalse(rows.toString(), rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows));
-        }
-    }
-
-    @Test
-    public void testResultHandler() throws IOException
-    {
-        List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc");
-        File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
-        File queryDir = Files.createTempDirectory("queries").toFile();
-        List<File> resultPaths = new ArrayList<>();
-        targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
-        ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
-        ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
-        ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false);
-        ResultHandler.ComparableResultSet res3 = createResultSet(10, 10, false);
-        List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, res3);
-        rh.handleResults(new FQLQuery.Single("abcabc", 3, QueryOptions.DEFAULT, 1111, 2222, 3333, "select * from xyz", Collections.emptyList()), toCompare);
-        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(resultPaths.get(0), queryDir);
-        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results2 = readResultFile(resultPaths.get(1), queryDir);
-        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results3 = readResultFile(resultPaths.get(2), queryDir);
-        assertEquals(results1, results2);
-        assertEquals(results1, results3);
-        assertEquals(Iterables.getOnlyElement(results3).right, res);
-    }
-
-    @Test
-    public void testResultHandlerWithDifference() throws IOException
-    {
-        List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc");
-        File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
-        File queryDir = Files.createTempDirectory("queries").toFile();
-        List<File> resultPaths = new ArrayList<>();
-        targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
-        ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
-        ResultHandler.ComparableResultSet res = createResultSet(10, 10, false);
-        ResultHandler.ComparableResultSet res2 = createResultSet(10, 5, false);
-        ResultHandler.ComparableResultSet res3 = createResultSet(10, 10, false);
-        List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, res3);
-        rh.handleResults(new FQLQuery.Single("aaa", 3, QueryOptions.DEFAULT, 123123, 11111, 22222, "select * from abcabc", Collections.emptyList()), toCompare);
-        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(resultPaths.get(0), queryDir);
-        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results2 = readResultFile(resultPaths.get(1), queryDir);
-        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results3 = readResultFile(resultPaths.get(2), queryDir);
-        assertEquals(results1, results3);
-        assertEquals(results2.get(0).right, res2);
-    }
-
-    @Test
-    public void testResultHandlerMultipleResultSets() throws IOException
-    {
-        List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc");
-        File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
-        File queryDir = Files.createTempDirectory("queries").toFile();
-        List<File> resultPaths = new ArrayList<>();
-        targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
-        ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
-        List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets = new ArrayList<>();
-        Random random = new Random();
-        for (int i = 0; i < 10; i++)
-        {
-            List<ResultHandler.ComparableResultSet> results = new ArrayList<>();
-            List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50));
-            for (int jj = 0; jj < targetHosts.size(); jj++)
-            {
-                results.add(createResultSet(5, 1 + random.nextInt(10), true));
-            }
-            FQLQuery q = new FQLQuery.Single("abc"+i,
-                                             3,
-                                             QueryOptions.forInternalCalls(values),
-                                             i * 1000,
-                                             12345,
-                                             54321,
-                                             "select * from xyz where id = "+i,
-                                             values);
-            resultSets.add(Pair.create(q, results));
-        }
-        for (int i = 0; i < resultSets.size(); i++)
-            rh.handleResults(resultSets.get(i).left, resultSets.get(i).right);
-
-        for (int i = 0; i < targetHosts.size(); i++)
-            compareWithFile(resultPaths, queryDir, resultSets, i);
-    }
-
-    @Test
-    public void testResultHandlerFailedQuery() throws IOException
-    {
-        List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc", "hostd");
-        File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
-        File queryDir = Files.createTempDirectory("queries").toFile();
-        List<File> resultPaths = new ArrayList<>();
-        targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);});
-        ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir);
-        List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets = new ArrayList<>();
-        Random random = new Random();
-        for (int i = 0; i < 10; i++)
-        {
-            List<ResultHandler.ComparableResultSet> results = new ArrayList<>();
-            List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50));
-            for (int jj = 0; jj < targetHosts.size(); jj++)
-            {
-                results.add(createResultSet(5, 1 + random.nextInt(10), true));
-            }
-            results.set(0, FakeResultSet.failed(new RuntimeException("testing abc")));
-            results.set(3, FakeResultSet.failed(new RuntimeException("testing abc")));
-            FQLQuery q = new FQLQuery.Single("abc"+i,
-                                             3,
-                                             QueryOptions.forInternalCalls(values),
-                                             i * 1000,
-                                             i * 12345,
-                                             i * 54321,
-                                             "select * from xyz where id = "+i,
-                                             values);
-            resultSets.add(Pair.create(q, results));
-        }
-        for (int i = 0; i < resultSets.size(); i++)
-            rh.handleResults(resultSets.get(i).left, resultSets.get(i).right);
-
-        for (int i = 0; i < targetHosts.size(); i++)
-            compareWithFile(resultPaths, queryDir, resultSets, i);
-    }
-
-
-    @Test
-    public void testCompare()
-    {
-        FQLQuery q1 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList());
-        FQLQuery q2 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222,"aaaa", Collections.emptyList());
-
-        assertEquals(0, q1.compareTo(q2));
-        assertEquals(0, q2.compareTo(q1));
-
-        FQLQuery q3 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList());
-        // single queries before batch queries
-        assertTrue(q1.compareTo(q3) < 0);
-        assertTrue(q3.compareTo(q1) > 0);
-
-        // check that smaller query time
-        FQLQuery q4 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, "aaaa", Collections.emptyList());
-        assertTrue(q1.compareTo(q4) < 0);
-        assertTrue(q4.compareTo(q1) > 0);
-
-        FQLQuery q5 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList());
-        assertTrue(q1.compareTo(q5) < 0);
-        assertTrue(q5.compareTo(q1) > 0);
-
-        FQLQuery q6 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes(10)));
-        FQLQuery q7 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList());
-        assertTrue(q6.compareTo(q7) > 0);
-        assertTrue(q7.compareTo(q6) < 0);
-
-        FQLQuery q8 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("a")));
-        FQLQuery q9 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("b")));
-        assertTrue(q8.compareTo(q9) < 0);
-        assertTrue(q9.compareTo(q8) > 0);
-    }
-
-    private File generateQueries(int count, boolean random) throws IOException
-    {
-        Random r = new Random();
-        File dir = Files.createTempDirectory("chronicle").toFile();
-        try (ChronicleQueue readQueue = ChronicleQueueBuilder.single(dir).build())
-        {
-            ExcerptAppender appender = readQueue.acquireAppender();
-
-            for (int i = 0; i < count; i++)
-            {
-                long timestamp = random ? Math.abs(r.nextLong() % 10000) : i;
-                if (random ? r.nextBoolean() : i % 2 == 0)
-                {
-                    String query = "abcdefghijklm " + i;
-                    QueryState qs = r.nextBoolean() ? queryState() : queryState("querykeyspace");
-                    FullQueryLogger.Query  q = new FullQueryLogger.Query(query, QueryOptions.DEFAULT, qs, timestamp);
-                    appender.writeDocument(q);
-                    q.release();
-                }
-                else
-                {
-                    int batchSize = random ? r.nextInt(99) + 1 : i + 1;
-                    List<String> queries = new ArrayList<>(batchSize);
-                    List<List<ByteBuffer>> values = new ArrayList<>(batchSize);
-                    for (int jj = 0; jj < (random ? r.nextInt(batchSize) : 10); jj++)
-                    {
-                        queries.add("aaaaaa batch "+i+":"+jj);
-                        values.add(Collections.emptyList());
-                    }
-                    FullQueryLogger.Batch batch = new FullQueryLogger.Batch(BatchStatement.Type.UNLOGGED,
-                                                                            queries,
-                                                                            values,
-                                                                            QueryOptions.DEFAULT,
-                                                                            queryState("someks"),
-                                                                            timestamp);
-                    appender.writeDocument(batch);
-                    batch.release();
-                }
-            }
-        }
-        return dir;
-    }
-
-    private QueryState queryState()
-    {
-        return QueryState.forInternalCalls();
-    }
-
-    private QueryState queryState(String keyspace)
-    {
-        ClientState clientState = ClientState.forInternalCalls(keyspace);
-        return new QueryState(clientState);
-    }
-
-    private static ResultHandler.ComparableResultSet createResultSet(int columnCount, int rowCount, boolean random)
-    {
-        List<Pair<String, String>> columnDefs = new ArrayList<>(columnCount);
-        Random r = new Random();
-        for (int i = 0; i < columnCount; i++)
-        {
-            columnDefs.add(Pair.create("a" + i, "int"));
-        }
-        List<List<String>> rows = new ArrayList<>();
-        for (int i = 0; i < rowCount; i++)
-        {
-            List<String> row = new ArrayList<>(columnCount);
-            for (int jj = 0; jj < columnCount; jj++)
-                row.add(i + " col " + jj + (random ? r.nextInt() : ""));
-            rows.add(row);
-        }
-        return new FakeResultSet(columnDefs, rows);
-    }
-
-    private static void compareWithFile(List<File> dirs, File resultDir, List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets, int idx)
-    {
-        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(dirs.get(idx), resultDir);
-        for (int i = 0; i < results1.size(); i++)
-        {
-            assertEquals(results1.get(i).left, resultSets.get(i).left);
-            assertEquals(results1.get(i).right, resultSets.get(i).right.get(idx));
-        }
-    }
-
-    private static List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> readResultFile(File dir, File queryDir)
-    {
-        List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = new ArrayList<>();
-        try (ChronicleQueue q = ChronicleQueueBuilder.single(dir).build();
-             ChronicleQueue queryQ = ChronicleQueueBuilder.single(queryDir).build())
-        {
-            ExcerptTailer tailer = q.createTailer();
-            ExcerptTailer queryTailer = queryQ.createTailer();
-            List<Pair<String, String>> columnDefinitions = new ArrayList<>();
-            List<List<String>> rowColumns = new ArrayList<>();
-            AtomicBoolean allRowsRead = new AtomicBoolean(false);
-            AtomicBoolean failedQuery = new AtomicBoolean(false);
-            while (tailer.readDocument(wire -> {
-                String type = wire.read("type").text();
-                if (type.equals("column_definitions"))
-                {
-                    int columnCount = wire.read("column_count").int32();
-                    for (int i = 0; i < columnCount; i++)
-                    {
-                        ValueIn vi = wire.read("column_definition");
-                        String name = vi.text();
-                        String dataType = vi.text();
-                        columnDefinitions.add(Pair.create(name, dataType));
-                    }
-                }
-                else if (type.equals("row"))
-                {
-                    int rowColumnCount = wire.read("row_column_count").int32();
-                    List<String> r = new ArrayList<>(rowColumnCount);
-                    for (int i = 0; i < rowColumnCount; i++)
-                    {
-                        byte[] b = wire.read("column").bytes();
-                        r.add(new String(b));
-                    }
-                    rowColumns.add(r);
-                }
-                else if (type.equals("end_resultset"))
-                {
-                    allRowsRead.set(true);
-                }
-                else if (type.equals("query_failed"))
-                {
-                    failedQuery.set(true);
-                }
-            }))
-            {
-                if (allRowsRead.get())
-                {
-                    FQLQueryReader reader = new FQLQueryReader();
-                    queryTailer.readDocument(reader);
-                    resultSets.add(Pair.create(reader.getQuery(), failedQuery.get() ? FakeResultSet.failed(new RuntimeException("failure"))
-                                                                                    : new FakeResultSet(ImmutableList.copyOf(columnDefinitions), ImmutableList.copyOf(rowColumns))));
-                    allRowsRead.set(false);
-                    failedQuery.set(false);
-                    columnDefinitions.clear();
-                    rowColumns.clear();
-                }
-            }
-        }
-        return resultSets;
-    }
-
-    private static class FakeResultSet implements ResultHandler.ComparableResultSet
-    {
-        private final List<Pair<String, String>> cdStrings;
-        private final List<List<String>> rows;
-        private final Throwable ex;
-
-        public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows)
-        {
-            this(cdStrings, rows, null);
-        }
-
-        public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows, Throwable ex)
-        {
-            this.cdStrings = cdStrings;
-            this.rows = rows;
-            this.ex = ex;
-        }
-
-        public static FakeResultSet failed(Throwable ex)
-        {
-            return new FakeResultSet(null, null, ex);
-        }
-
-        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
-        {
-            return new FakeComparableColumnDefinitions(cdStrings, wasFailed());
-        }
-
-        public boolean wasFailed()
-        {
-            return getFailureException() != null;
-        }
-
-        public Throwable getFailureException()
-        {
-            return ex;
-        }
-
-        public Iterator<ResultHandler.ComparableRow> iterator()
-        {
-            if (wasFailed())
-                return Collections.emptyListIterator();
-            return new AbstractIterator<ResultHandler.ComparableRow>()
-            {
-                Iterator<List<String>> iter = rows.iterator();
-                protected ResultHandler.ComparableRow computeNext()
-                {
-                    if (iter.hasNext())
-                        return new FakeComparableRow(iter.next(), cdStrings);
-                    return endOfData();
-                }
-            };
-        }
-
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (!(o instanceof FakeResultSet)) return false;
-            FakeResultSet that = (FakeResultSet) o;
-            if (wasFailed() && that.wasFailed())
-                return true;
-            return Objects.equals(cdStrings, that.cdStrings) &&
-                   Objects.equals(rows, that.rows);
-        }
-
-        public int hashCode()
-        {
-            return Objects.hash(cdStrings, rows);
-        }
-
-        public String toString()
-        {
-            return "FakeResultSet{" +
-                   "cdStrings=" + cdStrings +
-                   ", rows=" + rows +
-                   '}';
-        }
-    }
-
-    private static class FakeComparableRow implements ResultHandler.ComparableRow
-    {
-        private final List<String> row;
-        private final List<Pair<String, String>> cds;
-
-        public FakeComparableRow(List<String> row, List<Pair<String,String>> cds)
-        {
-            this.row = row;
-            this.cds = cds;
-        }
-
-        public ByteBuffer getBytesUnsafe(int i)
-        {
-            return ByteBufferUtil.bytes(row.get(i));
-        }
-
-        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
-        {
-            return new FakeComparableColumnDefinitions(cds, false);
-        }
-
-        public boolean equals(Object other)
-        {
-            if (!(other instanceof FakeComparableRow))
-                return false;
-            return row.equals(((FakeComparableRow)other).row);
-        }
-
-        public String toString()
-        {
-            return row.toString();
-        }
-    }
-
-    private static class FakeComparableColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
-    {
-        private final List<ResultHandler.ComparableDefinition> defs;
-        private final boolean failed;
-        public FakeComparableColumnDefinitions(List<Pair<String, String>> cds, boolean failed)
-        {
-            defs = cds != null ? cds.stream().map(FakeComparableDefinition::new).collect(Collectors.toList()) : null;
-            this.failed = failed;
-        }
-
-        public List<ResultHandler.ComparableDefinition> asList()
-        {
-            if (wasFailed())
-                return Collections.emptyList();
-            return defs;
-        }
-
-        public boolean wasFailed()
-        {
-            return failed;
-        }
-
-        public int size()
-        {
-            return defs.size();
-        }
-
-        public Iterator<ResultHandler.ComparableDefinition> iterator()
-        {
-            if (wasFailed())
-                return Collections.emptyListIterator();
-            return new AbstractIterator<ResultHandler.ComparableDefinition>()
-            {
-                Iterator<ResultHandler.ComparableDefinition> iter = defs.iterator();
-                protected ResultHandler.ComparableDefinition computeNext()
-                {
-                    if (iter.hasNext())
-                        return iter.next();
-                    return endOfData();
-                }
-            };
-        }
-        public boolean equals(Object other)
-        {
-            if (!(other instanceof FakeComparableColumnDefinitions))
-                return false;
-            return defs.equals(((FakeComparableColumnDefinitions)other).defs);
-        }
-
-        public String toString()
-        {
-            return defs.toString();
-        }
-    }
-
-    private static class FakeComparableDefinition implements ResultHandler.ComparableDefinition
-    {
-        private final Pair<String, String> p;
-
-        public FakeComparableDefinition(Pair<String, String> p)
-        {
-            this.p = p;
-        }
-        public String getType()
-        {
-            return p.right;
-        }
-
-        public String getName()
-        {
-            return p.left;
-        }
-
-        public boolean equals(Object other)
-        {
-            if (!(other instanceof FakeComparableDefinition))
-                return false;
-            return p.equals(((FakeComparableDefinition)other).p);
-        }
-
-        public String toString()
-        {
-            return getName() + ':' + getType();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/cassandra.in.bat
----------------------------------------------------------------------
diff --git a/tools/bin/cassandra.in.bat b/tools/bin/cassandra.in.bat
index 8804921..4d395e8 100644
--- a/tools/bin/cassandra.in.bat
+++ b/tools/bin/cassandra.in.bat
@@ -39,7 +39,7 @@ goto :eof
 :okClasspath
 
 REM Include the build\classes\main directory so it works in development
-set CASSANDRA_CLASSPATH=%CLASSPATH%;%CASSANDRA_CONF%;"%CASSANDRA_HOME%\build\classes\main";"%CASSANDRA_HOME%\build\classes\stress"
+set CASSANDRA_CLASSPATH=%CLASSPATH%;%CASSANDRA_CONF%;"%CASSANDRA_HOME%\build\classes\main";"%CASSANDRA_HOME%\build\classes\stress";"%CASSANDRA_HOME%\build\classes\fqltool"
 
 REM Add the default storage location.  Can be overridden in conf\cassandra.yaml
 set CASSANDRA_PARAMS=%CASSANDRA_PARAMS% "-Dcassandra.storagedir=%CASSANDRA_HOME%\data"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/cassandra.in.sh
----------------------------------------------------------------------
diff --git a/tools/bin/cassandra.in.sh b/tools/bin/cassandra.in.sh
index 869eb22..bf1ecc4 100644
--- a/tools/bin/cassandra.in.sh
+++ b/tools/bin/cassandra.in.sh
@@ -32,7 +32,7 @@ CLASSPATH="$CASSANDRA_CONF"
 if [ -d $CASSANDRA_HOME/build ] ; then
     #cassandra_bin="$CASSANDRA_HOME/build/classes/main"
     cassandra_bin=`ls -1 $CASSANDRA_HOME/build/apache-cassandra*.jar`
-    cassandra_bin="$cassandra_bin:$CASSANDRA_HOME/build/classes/stress"
+    cassandra_bin="$cassandra_bin:$CASSANDRA_HOME/build/classes/stress:$CASSANDRA_HOME/build/classes/fqltool"
     CLASSPATH="$CLASSPATH:$cassandra_bin"
 fi
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/fqltool
----------------------------------------------------------------------
diff --git a/tools/bin/fqltool b/tools/bin/fqltool
new file mode 100755
index 0000000..a34128e
--- /dev/null
+++ b/tools/bin/fqltool
@@ -0,0 +1,76 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    # Locations (in order) to use when searching for an include file.
+    for include in "`dirname "$0"`/cassandra.in.sh" \
+                   "$HOME/.cassandra.in.sh" \
+                   /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then
+    echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2
+    exit 1
+fi
+
+# Run cassandra-env.sh to pick up JMX_PORT
+if [ -f "$CASSANDRA_CONF/cassandra-env.sh" ]; then
+    JVM_OPTS_SAVE=$JVM_OPTS
+    MAX_HEAP_SIZE_SAVE=$MAX_HEAP_SIZE
+    . "$CASSANDRA_CONF/cassandra-env.sh"
+    MAX_HEAP_SIZE=$MAX_HEAP_SIZE_SAVE
+    JVM_OPTS=$JVM_OPTS_SAVE
+fi
+
+# Split the command line: arguments starting with -D are collected for the JVM,
+# everything else is forwarded to the tool as its own arguments
+ARGS=""
+JVM_ARGS=""
+while true
+do
+  if [ $# -eq 0 ]; then break; fi
+  case $1 in
+    -D*)
+      JVM_ARGS="$JVM_ARGS $1"
+      ;;
+    *)
+      ARGS="$ARGS $1"
+      ;;
+  esac
+  shift
+done
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+    MAX_HEAP_SIZE="512m"
+fi
+
+"$JAVA" $JAVA_AGENT -ea -da:net.openhft... -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
+        -Dlog4j.configurationFile=log4j2-tools.xml \
+        $JVM_ARGS \
+        org.apache.cassandra.fqltool.FullQueryLogTool $ARGS
+
+# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/fqltool.bat
----------------------------------------------------------------------
diff --git a/tools/bin/fqltool.bat b/tools/bin/fqltool.bat
new file mode 100644
index 0000000..acb6d1c
--- /dev/null
+++ b/tools/bin/fqltool.bat
@@ -0,0 +1,36 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one or more
+@REM contributor license agreements. See the NOTICE file distributed with
+@REM this work for additional information regarding copyright ownership.
+@REM The ASF licenses this file to You under the Apache License, Version 2.0
+@REM (the "License"); you may not use this file except in compliance with
+@REM the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing, software
+@REM distributed under the License is distributed on an "AS IS" BASIS,
+@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM See the License for the specific language governing permissions and
+@REM limitations under the License.
+
+@echo off
+if "%OS%" == "Windows_NT" setlocal
+
+pushd "%~dp0"
+call cassandra.in.bat
+
+if NOT DEFINED JAVA_HOME goto :err
+
+set CASSANDRA_PARAMS=%CASSANDRA_PARAMS% -Dcassandra.logdir="%CASSANDRA_HOME%\logs"
+
+"%JAVA_HOME%\bin\java" -cp %CASSANDRA_CLASSPATH% %CASSANDRA_PARAMS% -Dlog4j.configurationFile=log4j2-tools.xml org.apache.cassandra.fqltool.FullQueryLogTool %*
+goto finally
+
+:err
+echo The JAVA_HOME environment variable must be set to run this program!
+pause
+
+:finally
+ENDLOCAL & set RC=%ERRORLEVEL%
+exit /B %RC%

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java b/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java
new file mode 100644
index 0000000..ccff370
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.AbstractIterator;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+/**
+ * Wraps a result set from the driver so that we can reuse the compare code when reading
+ * up a result set produced by ResultStore.
+ */
+public class DriverResultSet implements ResultHandler.ComparableResultSet
+{
+    private final ResultSet resultSet;
+    private final Throwable failureException;
+
+    public DriverResultSet(ResultSet resultSet)
+    {
+        this(resultSet, null);
+    }
+
+    private DriverResultSet(ResultSet res, Throwable failureException)
+    {
+        resultSet = res;
+        this.failureException = failureException;
+    }
+
+    public static DriverResultSet failed(Throwable ex)
+    {
+        return new DriverResultSet(null, ex);
+    }
+
+    public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+    {
+        if (wasFailed())
+            return new DriverColumnDefinitions(null, true, failureException);
+
+        return new DriverColumnDefinitions(resultSet.getColumnDefinitions());
+    }
+
+    public boolean wasFailed()
+    {
+        return failureException != null;
+    }
+
+    public Throwable getFailureException()
+    {
+        return failureException;
+    }
+
+    public Iterator<ResultHandler.ComparableRow> iterator()
+    {
+        if (wasFailed())
+            return Collections.emptyListIterator();
+        return new AbstractIterator<ResultHandler.ComparableRow>()
+        {
+            Iterator<Row> iter = resultSet.iterator();
+            protected ResultHandler.ComparableRow computeNext()
+            {
+                if (iter.hasNext())
+                    return new DriverRow(iter.next());
+                return endOfData();
+            }
+        };
+    }
+
+    public static class DriverRow implements ResultHandler.ComparableRow
+    {
+        private final Row row;
+
+        public DriverRow(Row row)
+        {
+            this.row = row;
+        }
+
+        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+        {
+            return new DriverColumnDefinitions(row.getColumnDefinitions());
+        }
+
+        public ByteBuffer getBytesUnsafe(int i)
+        {
+            return row.getBytesUnsafe(i);
+        }
+
+        @Override
+        public boolean equals(Object oo)
+        {
+            if (!(oo instanceof ResultHandler.ComparableRow))
+                return false;
+
+            ResultHandler.ComparableRow o = (ResultHandler.ComparableRow)oo;
+            if (getColumnDefinitions().size() != o.getColumnDefinitions().size())
+                return false;
+
+            for (int j = 0; j < getColumnDefinitions().size(); j++)
+            {
+                ByteBuffer b1 = getBytesUnsafe(j);
+                ByteBuffer b2 = o.getBytesUnsafe(j);
+
+                if (b1 != null && b2 != null && !b1.equals(b2))
+                {
+                    return false;
+                }
+                if (b1 == null && b2 != null || b2 == null && b1 != null)
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            List<ResultHandler.ComparableDefinition> colDefs = getColumnDefinitions().asList();
+            for (int i = 0; i < getColumnDefinitions().size(); i++)
+            {
+                ByteBuffer bb = getBytesUnsafe(i);
+                String row = bb != null ? ByteBufferUtil.bytesToHex(bb) : "NULL";
+                sb.append(colDefs.get(i)).append(':').append(row).append(",");
+            }
+            return sb.toString();
+        }
+    }
+
+    public static class DriverColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
+    {
+        private final ColumnDefinitions columnDefinitions;
+        private final boolean failed;
+        private final Throwable failureException;
+
+        public DriverColumnDefinitions(ColumnDefinitions columnDefinitions)
+        {
+            this(columnDefinitions, false, null);
+        }
+
+        private DriverColumnDefinitions(ColumnDefinitions columnDefinitions, boolean failed, Throwable failureException)
+        {
+            this.columnDefinitions = columnDefinitions;
+            this.failed = failed;
+            this.failureException = failureException;
+        }
+
+        public List<ResultHandler.ComparableDefinition> asList()
+        {
+            if (wasFailed())
+                return Collections.emptyList();
+            return columnDefinitions.asList().stream().map(DriverDefinition::new).collect(Collectors.toList());
+        }
+
+        public boolean wasFailed()
+        {
+            return failed;
+        }
+
+        public Throwable getFailureException()
+        {
+            return failureException;
+        }
+
+        public int size()
+        {
+            return wasFailed() ? 0 : columnDefinitions.size(); // failed results are built with null definitions; mirror asList() and report none
+        }
+
+        public Iterator<ResultHandler.ComparableDefinition> iterator()
+        {
+            return asList().iterator();
+        }
+
+        public boolean equals(Object oo)
+        {
+            if (!(oo instanceof ResultHandler.ComparableColumnDefinitions))
+                return false;
+
+            ResultHandler.ComparableColumnDefinitions o = (ResultHandler.ComparableColumnDefinitions)oo;
+            if (wasFailed() && o.wasFailed())
+                return true;
+
+            if (size() != o.size())
+                return false;
+
+            return asList().equals(o.asList());
+        }
+    }
+
+    public static class DriverDefinition implements ResultHandler.ComparableDefinition
+    {
+        private final ColumnDefinitions.Definition def;
+
+        public DriverDefinition(ColumnDefinitions.Definition def)
+        {
+            this.def = def;
+        }
+
+        public String getType()
+        {
+            return def.getType().toString();
+        }
+
+        public String getName()
+        {
+            return def.getName();
+        }
+
+        public boolean equals(Object oo)
+        {
+            // only another DriverDefinition can be unwrapped below; any other type is simply not equal
+            if (!(oo instanceof DriverDefinition))
+                return false;
+            return def.equals(((DriverDefinition)oo).def);
+        }
+
+        public String toString()
+        {
+            return getName() + ':' + getType();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
new file mode 100644
index 0000000..2862e0f
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Longs;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+public abstract class FQLQuery implements Comparable<FQLQuery>
+{
+    public final long queryStartTime;
+    public final QueryOptions queryOptions;
+    public final int protocolVersion;
+    public final QueryState queryState;
+
+    public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds)
+    {
+        this.queryStartTime = queryStartTime;
+        this.queryOptions = queryOptions;
+        this.protocolVersion = protocolVersion;
+        this.queryState = queryState(keyspace, generatedTimestamp, generatedNowInSeconds);
+    }
+
+    public abstract Statement toStatement();
+
+    /**
+     * used when storing the queries executed
+     */
+    public abstract BinLog.ReleaseableWriteMarshallable toMarshallable();
+
+    public String keyspace()
+    {
+        return queryState.getClientState().getRawKeyspace();
+    }
+
+    private QueryState queryState(String keyspace, long generatedTimestamp, int generatedNowInSeconds)
+    {
+        ClientState clientState = keyspace != null ? ClientState.forInternalCalls(keyspace) : ClientState.forInternalCalls();
+        return new QueryState(clientState, generatedTimestamp, generatedNowInSeconds);
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof FQLQuery)) return false;
+        FQLQuery fqlQuery = (FQLQuery) o;
+        return queryStartTime == fqlQuery.queryStartTime &&
+               protocolVersion == fqlQuery.protocolVersion &&
+               queryState.getTimestamp() == fqlQuery.queryState.getTimestamp() &&
+               Objects.equals(queryState.getClientState().getRawKeyspace(), fqlQuery.queryState.getClientState().getRawKeyspace()) &&
+               Objects.equals(queryOptions.getValues(), fqlQuery.queryOptions.getValues());
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(queryStartTime, queryOptions.getValues(), protocolVersion, queryState.getClientState().getRawKeyspace()); // hash only what equals() compares
+    }
+
+    public int compareTo(FQLQuery other)
+    {
+        return Longs.compare(queryStartTime, other.queryStartTime);
+    }
+
+    public String toString()
+    {
+        return "FQLQuery{" +
+               "queryStartTime=" + queryStartTime +
+               ", protocolVersion=" + protocolVersion +
+               ", queryState='" + queryState + '\'' +
+               '}';
+    }
+
+    public static class Single extends FQLQuery
+    {
+        public final String query;
+        public final List<ByteBuffer> values;
+
+        public Single(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, String queryString, List<ByteBuffer> values)
+        {
+            super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
+            this.query = queryString;
+            this.values = values;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s%nQuery = %s, Values = %s",
+                                 super.toString(),
+                                 query,
+                                 values.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(",")));
+        }
+
+        public Statement toStatement()
+        {
+            SimpleStatement ss = new SimpleStatement(query, values.toArray());
+            ss.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
+            ss.setDefaultTimestamp(queryOptions.getTimestamp(queryState));
+            return ss;
+        }
+
+        public BinLog.ReleaseableWriteMarshallable toMarshallable()
+        {
+
+            return new FullQueryLogger.Query(query, queryOptions, queryState, queryStartTime);
+        }
+
+        public int compareTo(FQLQuery other)
+        {
+            int cmp = super.compareTo(other);
+
+            if (cmp == 0)
+            {
+                if (other instanceof Batch)
+                    return -1;
+
+                Single singleQuery = (Single) other;
+
+                cmp = query.compareTo(singleQuery.query);
+                if (cmp == 0)
+                {
+                    if (values.size() != singleQuery.values.size())
+                        return values.size() - singleQuery.values.size();
+                    for (int i = 0; i < values.size(); i++)
+                    {
+                        cmp = values.get(i).compareTo(singleQuery.values.get(i));
+                        if (cmp != 0)
+                            return cmp;
+                    }
+                }
+            }
+            return cmp;
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof Single)) return false;
+            if (!super.equals(o)) return false;
+            Single single = (Single) o;
+            return Objects.equals(query, single.query) &&
+                   Objects.equals(values, single.values);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(super.hashCode(), query, values);
+        }
+    }
+
+    public static class Batch extends FQLQuery
+    {
+        public final BatchStatement.Type batchType;
+        public final List<Single> queries;
+
+        public Batch(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, BatchStatement.Type batchType, List<String> queries, List<List<ByteBuffer>> values)
+        {
+            super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
+            this.batchType = batchType;
+            this.queries = new ArrayList<>(queries.size());
+            for (int i = 0; i < queries.size(); i++)
+                this.queries.add(new Single(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds, queries.get(i), values.get(i)));
+        }
+
+        public Statement toStatement()
+        {
+            BatchStatement bs = new BatchStatement(batchType);
+            for (Single query : queries)
+                bs.add(query.toStatement());
+            bs.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
+            bs.setDefaultTimestamp(queryOptions.getTimestamp(queryState));
+            return bs;
+        }
+
+        public int compareTo(FQLQuery other)
+        {
+            int cmp = super.compareTo(other);
+
+            if (cmp == 0)
+            {
+                if (other instanceof Single)
+                    return 1;
+
+                Batch otherBatch = (Batch) other;
+                if (queries.size() != otherBatch.queries.size())
+                    return queries.size() - otherBatch.queries.size();
+                for (int i = 0; i < queries.size(); i++)
+                {
+                    cmp = queries.get(i).compareTo(otherBatch.queries.get(i));
+                    if (cmp != 0)
+                        return cmp;
+                }
+            }
+            return cmp;
+        }
+
+        public BinLog.ReleaseableWriteMarshallable toMarshallable()
+        {
+            List<String> queryStrings = new ArrayList<>();
+            List<List<ByteBuffer>> values = new ArrayList<>();
+            for (Single q : queries)
+            {
+                queryStrings.add(q.query);
+                values.add(q.values);
+            }
+            return new FullQueryLogger.Batch(org.apache.cassandra.cql3.statements.BatchStatement.Type.valueOf(batchType.name()), queryStrings, values, queryOptions, queryState, queryStartTime);
+        }
+
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder(super.toString()).append("\nbatch: ").append(batchType).append('\n');
+            for (Single q : queries)
+                sb.append(q.toString()).append('\n');
+            sb.append("end batch");
+            return sb.toString();
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof Batch)) return false;
+            if (!super.equals(o)) return false;
+            Batch batch = (Batch) o;
+            return batchType == batch.batchType &&
+                   Objects.equals(queries, batch.queries);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(super.hashCode(), batchType, queries);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java
new file mode 100644
index 0000000..ccbb200
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.util.PriorityQueue;
+
+import net.openhft.chronicle.queue.ExcerptTailer;
+import org.apache.cassandra.utils.AbstractIterator;
+
+public class FQLQueryIterator extends AbstractIterator<FQLQuery>
+{
+    // use a priority queue to be able to sort the head of the query logs in memory
+    private final PriorityQueue<FQLQuery> pq;
+    private final ExcerptTailer tailer;
+    private final FQLQueryReader reader;
+
+    /**
+     * Create an iterator over the FQLQueries in tailer
+     *
+     * Reads up to readAhead queries in to memory to be able to sort them (the files are mostly sorted already)
+     */
+    public FQLQueryIterator(ExcerptTailer tailer, int readAhead)
+    {
+        assert readAhead > 0 : "readAhead needs to be > 0";
+        reader = new FQLQueryReader();
+        this.tailer = tailer;
+        pq = new PriorityQueue<>(readAhead);
+        for (int i = 0; i < readAhead; i++)
+        {
+            FQLQuery next = readNext();
+            if (next != null)
+                pq.add(next);
+            else
+                break;
+        }
+    }
+
+    protected FQLQuery computeNext()
+    {
+        FQLQuery q = pq.poll();
+        if (q == null)
+            return endOfData();
+        FQLQuery next = readNext();
+        if (next != null)
+            pq.add(next);
+        return q;
+    }
+
+    private FQLQuery readNext()
+    {
+        if (tailer.readDocument(reader))
+            return reader.getQuery();
+        return null;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
new file mode 100644
index 0000000..fd5073c
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datastax.driver.core.BatchStatement;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.core.io.IORuntimeException;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS;
+import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP;
+import static org.apache.cassandra.audit.FullQueryLogger.KEYSPACE;
+import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
+import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
+import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERIES;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
+
+public class FQLQueryReader implements ReadMarshallable
+{
+    private FQLQuery query;
+
+    public void readMarshallable(WireIn wireIn) throws IORuntimeException
+    {
+        int currentVersion = wireIn.read(VERSION).int16();
+        String type = wireIn.read(TYPE).text();
+        long queryStartTime = wireIn.read(QUERY_START_TIME).int64();
+        int protocolVersion = wireIn.read(PROTOCOL_VERSION).int32();
+        QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(QUERY_OPTIONS).bytes()), ProtocolVersion.decode(protocolVersion));
+        long generatedTimestamp = wireIn.read(GENERATED_TIMESTAMP).int64();
+        int generatedNowInSeconds = wireIn.read(GENERATED_NOW_IN_SECONDS).int32();
+        String keyspace = wireIn.read(KEYSPACE).text();
+
+        switch (type)
+        {
+            case SINGLE_QUERY:
+                String queryString = wireIn.read(QUERY).text();
+                query = new FQLQuery.Single(keyspace,
+                                            protocolVersion,
+                                            queryOptions,
+                                            queryStartTime,
+                                            generatedTimestamp,
+                                            generatedNowInSeconds,
+                                            queryString,
+                                            queryOptions.getValues());
+                break;
+            case BATCH:
+                BatchStatement.Type batchType = BatchStatement.Type.valueOf(wireIn.read(BATCH_TYPE).text());
+                ValueIn in = wireIn.read(QUERIES);
+                int queryCount = in.int32();
+
+                List<String> queries = new ArrayList<>(queryCount);
+                for (int i = 0; i < queryCount; i++)
+                    queries.add(in.text());
+                in = wireIn.read(VALUES);
+                int valueCount = in.int32();
+                List<List<ByteBuffer>> values = new ArrayList<>(valueCount);
+                for (int ii = 0; ii < valueCount; ii++)
+                {
+                    List<ByteBuffer> subValues = new ArrayList<>();
+                    values.add(subValues);
+                    int numSubValues = in.int32();
+                    for (int zz = 0; zz < numSubValues; zz++)
+                        subValues.add(ByteBuffer.wrap(in.bytes()));
+                }
+                query = new FQLQuery.Batch(keyspace,
+                                           protocolVersion,
+                                           queryOptions,
+                                           queryStartTime,
+                                           generatedTimestamp,
+                                           generatedNowInSeconds,
+                                           batchType,
+                                           queries,
+                                           values);
+                break;
+            default:
+                throw new RuntimeException("Unknown type: " + type);
+        }
+    }
+
+    public FQLQuery getQuery()
+    {
+        return query;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java
new file mode 100644
index 0000000..97e7487
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.util.List;
+
+import com.google.common.base.Throwables;
+
+import io.airlift.airline.Cli;
+import io.airlift.airline.Help;
+import io.airlift.airline.ParseArgumentsMissingException;
+import io.airlift.airline.ParseArgumentsUnexpectedException;
+import io.airlift.airline.ParseCommandMissingException;
+import io.airlift.airline.ParseCommandUnrecognizedException;
+import io.airlift.airline.ParseOptionConversionException;
+import io.airlift.airline.ParseOptionMissingException;
+import io.airlift.airline.ParseOptionMissingValueException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.fqltool.commands.Compare;
+import org.apache.cassandra.fqltool.commands.Dump;
+import org.apache.cassandra.fqltool.commands.Replay;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static com.google.common.collect.Lists.newArrayList;
+
+/**
+ * Command line entry point for fqltool.
+ *
+ * Registers the help, dump, replay and compare commands, runs the command selected by the
+ * arguments and exits with status 0 on success, 1 on bad usage and 2 on any other error.
+ */
+public class FullQueryLogTool
+{
+    public static void main(String... args)
+    {
+        DatabaseDescriptor.clientInitialization();
+
+        List<Class<? extends Runnable>> commands = newArrayList(
+                Help.class,
+                Dump.class,
+                Replay.class,
+                Compare.class
+        );
+
+        Cli.CliBuilder<Runnable> builder = Cli.builder("fqltool");
+        builder.withDescription("Manipulate the contents of full query log files")
+               .withDefaultCommand(Help.class)
+               .withCommands(commands);
+
+        System.exit(execute(builder.build(), args));
+    }
+
+    /**
+     * Parses the arguments, runs the selected command and reports any failure.
+     *
+     * @return the process exit status: 0 on success, 1 on bad usage, 2 on any other error
+     */
+    private static int execute(Cli<Runnable> parser, String... args)
+    {
+        try
+        {
+            parser.parse(args).run();
+            return 0;
+        }
+        catch (IllegalArgumentException |
+               IllegalStateException |
+               ParseArgumentsMissingException |
+               ParseArgumentsUnexpectedException |
+               ParseOptionConversionException |
+               ParseOptionMissingException |
+               ParseOptionMissingValueException |
+               ParseCommandMissingException |
+               ParseCommandUnrecognizedException e)
+        {
+            badUse(e);
+            return 1;
+        }
+        catch (Throwable throwable)
+        {
+            // only the root cause is reported
+            err(Throwables.getRootCause(throwable));
+            return 2;
+        }
+    }
+
+    /**
+     * Prints the usage problem and a pointer to the help command on standard out.
+     */
+    private static void badUse(Exception e)
+    {
+        System.out.println("fqltool: " + e.getMessage());
+        System.out.println("See 'fqltool help' or 'fqltool help <command>'.");
+    }
+
+    /**
+     * Prints the error message and the full stack trace on standard error.
+     */
+    private static void err(Throwable e)
+    {
+        System.err.println("error: " + e.getMessage());
+        System.err.println("-- StackTrace --");
+        System.err.println(getStackTraceAsString(e));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
new file mode 100644
index 0000000..d8653e5
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+//import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Replays queries from full query logs against one or more target clusters.
+ *
+ * Every query that passes all filters is executed against each target and the results are
+ * handed, in target order, to a {@link ResultHandler}.
+ */
+public class QueryReplayer implements Closeable
+{
+    private static final int PRINT_RATE = 5000;
+    // upper bound on how long close() waits for outstanding result callbacks
+    private static final long CALLBACK_SHUTDOWN_TIMEOUT_SECONDS = 60;
+    // result callbacks are executed on this single thread
+    private final ExecutorService es = Executors.newFixedThreadPool(1);
+    private final Iterator<List<FQLQuery>> queryIterator;
+    private final List<Cluster> targetClusters;
+    private final List<Predicate<FQLQuery>> filters;
+    private final List<Session> sessions;
+    private final ResultHandler resultHandler;
+    private final MetricRegistry metrics = new MetricRegistry();
+    private final boolean debug;
+    private final PrintStream out;
+
+    /**
+     * Builds one cluster per target host and connects a session to each of them.
+     *
+     * @param queryIterator       source of the query lists to replay
+     * @param targetHosts         contact points, one cluster and session is created per entry
+     * @param resultPaths         passed on to the {@link ResultHandler}
+     * @param filters             a query is only replayed if every filter accepts it
+     * @param out                 stream for progress, debug and error output
+     * @param queryFilePathString passed on to the {@link ResultHandler} as a file, may be null
+     * @param debug               whether to print every executed query and keyspace switch
+     */
+    public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+                         List<String> targetHosts,
+                         List<File> resultPaths,
+                         List<Predicate<FQLQuery>> filters,
+                         PrintStream out,
+                         String queryFilePathString,
+                         boolean debug)
+    {
+        this.queryIterator = queryIterator;
+        targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+        this.filters = filters;
+        sessions = targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+        File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null;
+        resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath);
+        this.debug = debug;
+        this.out = out;
+    }
+
+    /**
+     * Replays all queries from the iterator.
+     *
+     * Each accepted query is executed asynchronously against every session, then the combined
+     * future is waited on before the next query is started. A failure while replaying a query
+     * is printed and does not stop the replay. Progress is printed every PRINT_RATE queries.
+     */
+    public void replay()
+    {
+        // the registry hands back the same timer for the same name, so look it up only once
+        Timer timer = metrics.timer("queries");
+        while (queryIterator.hasNext())
+        {
+            List<FQLQuery> queries = queryIterator.next();
+            for (FQLQuery query : queries)
+            {
+                if (filters.stream().anyMatch(f -> !f.test(query)))
+                    continue;
+                try (Timer.Context ctx = timer.time())
+                {
+                    List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size());
+                    Statement statement = query.toStatement();
+                    for (Session session : sessions)
+                    {
+                        maybeSetKeyspace(session, query);
+                        if (debug)
+                        {
+                            out.println("Executing query:");
+                            out.println(query);
+                        }
+                        ListenableFuture<ResultSet> future = session.executeAsync(statement);
+                        results.add(handleErrors(future));
+                    }
+
+                    ListenableFuture<List<ResultHandler.ComparableResultSet>> resultList = Futures.allAsList(results);
+
+                    Futures.addCallback(resultList, new FutureCallback<List<ResultHandler.ComparableResultSet>>()
+                    {
+                        public void onSuccess(/*@Nullable */List<ResultHandler.ComparableResultSet> resultSets)
+                        {
+                            // note that the order of resultSets is significant here - resultSets.get(x) should
+                            // be the result from a query against targetHosts.get(x)
+                            resultHandler.handleResults(query, resultSets);
+                        }
+
+                        public void onFailure(Throwable throwable)
+                        {
+                            throw new AssertionError("Errors should be handled in FQLQuery.execute", throwable);
+                        }
+                    }, es);
+
+                    FBUtilities.waitOnFuture(resultList);
+                }
+                catch (Throwable t)
+                {
+                    // terminate the line so that consecutive errors do not run together
+                    out.printf("QUERY %s got exception: %s%n", query, t.getMessage());
+                }
+
+                if (timer.getCount() % PRINT_RATE == 0)
+                    out.printf("%d queries, rate = %.2f%n", timer.getCount(), timer.getOneMinuteRate());
+            }
+        }
+    }
+
+    /**
+     * Issues a USE statement if the query has a keyspace that differs from the one the session
+     * is currently logged in to. Failures are printed and otherwise ignored.
+     */
+    private void maybeSetKeyspace(Session session, FQLQuery query)
+    {
+        try
+        {
+            if (query.keyspace() != null && !query.keyspace().equals(session.getLoggedKeyspace()))
+            {
+                if (debug)
+                    out.printf("Switching keyspace from %s to %s%n", session.getLoggedKeyspace(), query.keyspace());
+                session.execute("USE " + query.keyspace());
+            }
+        }
+        catch (Throwable t)
+        {
+            out.printf("USE %s failed: %s%n", query.keyspace(), t.getMessage());
+        }
+    }
+
+    /**
+     * Make sure we catch any query errors
+     *
+     * On error, this creates a failed ComparableResultSet with the exception set to be able to store
+     * this fact in the result file and handle comparison of failed result sets.
+     */
+    private static ListenableFuture<ResultHandler.ComparableResultSet> handleErrors(ListenableFuture<ResultSet> result)
+    {
+        FluentFuture<ResultHandler.ComparableResultSet> fluentFuture = FluentFuture.from(result)
+                                                                                   .transform(DriverResultSet::new, MoreExecutors.directExecutor());
+        return fluentFuture.catching(Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor());
+    }
+
+    /**
+     * Closes all sessions and clusters, stops the callback executor and closes the result handler.
+     *
+     * The result handler is closed last, and also when closing a session or cluster fails, so
+     * that result callbacks which are still outstanding get a chance to finish first.
+     */
+    public void close() throws IOException
+    {
+        try
+        {
+            sessions.forEach(Session::close);
+            targetClusters.forEach(Cluster::close);
+        }
+        finally
+        {
+            shutdownCallbackExecutor();
+            resultHandler.close();
+        }
+    }
+
+    /**
+     * Stops the callback executor, waiting a bounded time for already submitted callbacks.
+     *
+     * Without this the executor's thread is never released. replay() waits on the result futures
+     * only, not on the callbacks, so a callback may still be outstanding when close() is called.
+     */
+    private void shutdownCallbackExecutor()
+    {
+        es.shutdown();
+        try
+        {
+            if (!es.awaitTermination(CALLBACK_SHUTDOWN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS))
+                out.printf("Timed out waiting for result callbacks to finish%n");
+        }
+        catch (InterruptedException e)
+        {
+            // keep the interrupt visible to the caller and carry on closing
+            Thread.currentThread().interrupt();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org