Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/09/01 08:44:21 UTC

[5/7] cassandra git commit: Add fqltool compare

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java
new file mode 100644
index 0000000..d8d419a
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Streams;
+
+public class ResultComparator
+{
+    /**
+     * Compares the rows in rows.
+     * The row at position x in rows will have come from the host at position x in targetHosts.
+     */
+    public boolean compareRows(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+    {
+        if (rows.size() < 2 || rows.stream().allMatch(Objects::isNull))
+            return true;
+
+        if (rows.stream().anyMatch(Objects::isNull))
+        {
+            handleMismatch(targetHosts, query, rows);
+            return false;
+        }
+
+        ResultHandler.ComparableRow ref = rows.get(0);
+        boolean equal = true;
+        for (int i = 1; i < rows.size(); i++)
+        {
+            ResultHandler.ComparableRow compare = rows.get(i);
+            if (!ref.equals(compare))
+                equal = false;
+        }
+        if (!equal)
+            handleMismatch(targetHosts, query, rows);
+        return equal;
+    }
+
+    /**
+     * Compares the column definitions
+     *
+     * the column definitions at position x in cds will have come from host at position x in targetHosts
+     */
+    public boolean compareColumnDefinitions(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+    {
+        if (cds.size() < 2)
+            return true;
+
+        boolean equal = true;
+        List<ResultHandler.ComparableDefinition> refDefs = cds.get(0).asList();
+        for (int i = 1; i < cds.size(); i++)
+        {
+            List<ResultHandler.ComparableDefinition> toCompare = cds.get(i).asList();
+            if (!refDefs.equals(toCompare))
+                equal = false;
+        }
+        if (!equal)
+            handleColumnDefMismatch(targetHosts, query, cds);
+        return equal;
+    }
+
+    private void handleMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+    {
+        System.out.println("MISMATCH:");
+        System.out.println("Query = " + query);
+        System.out.println("Results:");
+        System.out.println(Streams.zip(rows.stream(), targetHosts.stream(), (r, host) -> String.format("%s: %s%n", host, r == null ? "null" : r)).collect(Collectors.joining()));
+    }
+
+    private void handleColumnDefMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+    {
+        System.out.println("COLUMN DEFINITION MISMATCH:");
+        System.out.println("Query = " + query);
+        System.out.println("Results: ");
+        System.out.println(Streams.zip(cds.stream(), targetHosts.stream(), (cd, host) -> String.format("%s: %s%n", host, columnDefinitionsString(cd))).collect(Collectors.joining()));
+    }
+
+    private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd)
+    {
+        StringBuilder sb = new StringBuilder();
+        if (cd == null)
+            sb.append("NULL");
+        else if (cd.wasFailed())
+            sb.append("FAILED");
+        else
+        {
+            for (ResultHandler.ComparableDefinition def : cd)
+            {
+                sb.append(def.toString());
+            }
+        }
+        return sb.toString();
+    }
+
+
+
+}
\ No newline at end of file
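
A minimal sketch, not part of this patch, of the comparison semantics in ResultComparator above: every non-null row must equal the row from the first host, and a null row (an exhausted result set) mixed with non-null ones is reported as a mismatch. It assumes the same org.apache.cassandra.fqltool package and uses a hypothetical FakeRow stub.

package org.apache.cassandra.fqltool;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class ResultComparatorSketch
{
    // hypothetical stand-in for a driver- or disk-backed ComparableRow
    static class FakeRow implements ResultHandler.ComparableRow
    {
        private final ByteBuffer value;
        FakeRow(String v) { value = ByteBuffer.wrap(v.getBytes()); }
        public ByteBuffer getBytesUnsafe(int i) { return value; }
        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() { return null; }
        public boolean equals(Object o) { return o instanceof FakeRow && value.equals(((FakeRow) o).value); }
        public int hashCode() { return value.hashCode(); }
        public String toString() { return new String(value.array()); }
    }

    public static void main(String[] args)
    {
        ResultComparator comparator = new ResultComparator();
        List<String> hosts = Arrays.asList("hosta", "hostb");
        FQLQuery query = null; // a real FQLQuery is passed through from the replay; null keeps the sketch small

        // identical rows from both hosts -> returns true, nothing is printed
        boolean same = comparator.compareRows(hosts, query,
                Arrays.<ResultHandler.ComparableRow>asList(new FakeRow("a"), new FakeRow("a")));

        // hostb has run out of rows (null) while hosta still has one -> a MISMATCH is printed, returns false
        boolean differs = comparator.compareRows(hosts, query,
                Arrays.<ResultHandler.ComparableRow>asList(new FakeRow("a"), null));

        System.out.println(same + " / " + differs); // true / false
    }
}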

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java
new file mode 100644
index 0000000..8c4c018
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ResultHandler implements Closeable
+{
+    private final ResultStore resultStore;
+    private final ResultComparator resultComparator;
+    private final List<String> targetHosts;
+
+    public ResultHandler(List<String> targetHosts, List<File> resultPaths, File queryFilePath)
+    {
+        this.targetHosts = targetHosts;
+        resultStore = resultPaths != null ? new ResultStore(resultPaths, queryFilePath) : null;
+        resultComparator = new ResultComparator();
+    }
+
+    /**
+     * Since we can't iterate a ResultSet more than once, and we don't want to keep the entire result set in memory,
+     * we feed the rows one-by-one to resultComparator and resultStore.
+     *
+     * results.get(x) should be the results from executing query against targetHosts.get(x)
+     */
+    public void handleResults(FQLQuery query, List<ComparableResultSet> results)
+    {
+        for (int i = 0; i < targetHosts.size(); i++)
+        {
+            if (results.get(i).wasFailed())
+            {
+                System.out.println("Query against "+targetHosts.get(i)+" failure:");
+                System.out.println(query);
+                System.out.println("Message: "+results.get(i).getFailureException().getMessage());
+            }
+        }
+
+        List<ComparableColumnDefinitions> columnDefinitions = results.stream().map(ComparableResultSet::getColumnDefinitions).collect(Collectors.toList());
+        resultComparator.compareColumnDefinitions(targetHosts, query, columnDefinitions);
+        if (resultStore != null)
+            resultStore.storeColumnDefinitions(query, columnDefinitions);
+        List<Iterator<ComparableRow>> iters = results.stream().map(Iterable::iterator).collect(Collectors.toList());
+
+        while (true)
+        {
+            List<ComparableRow> rows = rows(iters);
+            resultComparator.compareRows(targetHosts, query, rows);
+            if (resultStore != null)
+                resultStore.storeRows(rows);
+            // all rows being null marks end of all resultsets, we need to call compareRows
+            // and storeRows once with everything null to mark that fact
+            if (rows.stream().allMatch(Objects::isNull))
+                return;
+        }
+    }
+
+    /**
+     * Get the first row from each of the iterators, if the iterator has run out, null will mark that in the list
+     */
+    @VisibleForTesting
+    public static List<ComparableRow> rows(List<Iterator<ComparableRow>> iters)
+    {
+        List<ComparableRow> rows = new ArrayList<>(iters.size());
+        for (Iterator<ComparableRow> iter : iters)
+        {
+            if (iter.hasNext())
+                rows.add(iter.next());
+            else
+                rows.add(null);
+        }
+        return rows;
+    }
+
+    public void close() throws IOException
+    {
+        if (resultStore != null)
+            resultStore.close();
+    }
+
+    public interface ComparableResultSet extends Iterable<ComparableRow>
+    {
+        public ComparableColumnDefinitions getColumnDefinitions();
+        public boolean wasFailed();
+        public Throwable getFailureException();
+    }
+
+    public interface ComparableColumnDefinitions extends Iterable<ComparableDefinition>
+    {
+        public List<ComparableDefinition> asList();
+        public boolean wasFailed();
+        public Throwable getFailureException();
+        public int size();
+    }
+
+    public interface ComparableDefinition
+    {
+        public String getType();
+        public String getName();
+    }
+
+    public interface ComparableRow
+    {
+        public ByteBuffer getBytesUnsafe(int i);
+        public ComparableColumnDefinitions getColumnDefinitions();
+    }
+
+}
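
A small sketch, not part of this patch, of the null-padding contract of ResultHandler.rows(): an exhausted iterator contributes null for its slot, and handleResults stops once every slot is null. It assumes the same package and a trivial stub row.

package org.apache.cassandra.fqltool;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RowsPaddingSketch
{
    // trivial stub; a real ComparableRow wraps a driver row or stored bytes
    static ResultHandler.ComparableRow stub()
    {
        return new ResultHandler.ComparableRow()
        {
            public ByteBuffer getBytesUnsafe(int i) { return ByteBuffer.allocate(0); }
            public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() { return null; }
        };
    }

    public static void main(String[] args)
    {
        List<Iterator<ResultHandler.ComparableRow>> iters = Arrays.asList(
            Arrays.asList(stub(), stub()).iterator(), // host 1 returned two rows
            Arrays.asList(stub()).iterator());        // host 2 returned one row

        System.out.println(ResultHandler.rows(iters)); // [row, row]
        System.out.println(ResultHandler.rows(iters)); // [row, null]  - host 2 is exhausted
        System.out.println(ResultHandler.rows(iters)); // [null, null] - marks the end of all result sets
    }
}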

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java
new file mode 100644
index 0000000..d128717
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.core.io.IORuntimeException;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptAppender;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.ValueOut;
+import net.openhft.chronicle.wire.WireIn;
+import net.openhft.chronicle.wire.WireOut;
+import net.openhft.chronicle.wire.WriteMarshallable;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+/**
+ * note that we store each row as a separate chronicle document so that we can
+ * avoid reading the entire result set into memory when comparing
+ *
+ * document formats:
+ * to mark the start of a new result set:
+ * -------------------
+ * version: int16
+ * type: column_definitions
+ * column_count: int32;
+ * column_definition: text, text
+ * column_definition: text, text
+ * ....
+ * --------------------
+ *
+ * to mark a failed query:
+ * ---------------------
+ * version: int16
+ * type: query_failed
+ * message: text
+ * ---------------------
+ *
+ * row:
+ * --------------------
+ * version: int16
+ * type: row
+ * row_column_count: int32
+ * column: bytes
+ * ---------------------
+ *
+ * to mark the end of a result set:
+ * -------------------
+ * version: int16
+ * type: end_resultset
+ * -------------------
+ *
+ */
+public class ResultStore
+{
+    private static final String VERSION = "version";
+    private static final String TYPE = "type";
+    // types:
+    private static final String ROW = "row";
+    private static final String END = "end_resultset";
+    private static final String FAILURE = "query_failed";
+    private static final String COLUMN_DEFINITIONS = "column_definitions";
+    // fields:
+    private static final String COLUMN_DEFINITION = "column_definition";
+    private static final String COLUMN_COUNT = "column_count";
+    private static final String MESSAGE = "message";
+    private static final String ROW_COLUMN_COUNT = "row_column_count";
+    private static final String COLUMN = "column";
+
+    private static final int CURRENT_VERSION = 0;
+
+    private final List<ChronicleQueue> queues;
+    private final List<ExcerptAppender> appenders;
+    private final ChronicleQueue queryStoreQueue;
+    private final ExcerptAppender queryStoreAppender;
+    private final Set<Integer> finishedHosts = new HashSet<>();
+
+    public ResultStore(List<File> resultPaths, File queryFilePath)
+    {
+        queues = resultPaths.stream().map(path -> ChronicleQueueBuilder.single(path).build()).collect(Collectors.toList());
+        appenders = queues.stream().map(ChronicleQueue::acquireAppender).collect(Collectors.toList());
+        queryStoreQueue = queryFilePath != null ? ChronicleQueueBuilder.single(queryFilePath).build() : null;
+        queryStoreAppender = queryStoreQueue != null ? queryStoreQueue.acquireAppender() : null;
+    }
+
+    /**
+     * Store the column definitions in cds
+     *
+     * the ColumnDefinitions at position x will get stored by the appender at position x
+     *
+     * Calling this method indicates that we are starting a new result set from a query, it must be called before
+     * calling storeRows.
+     *
+     */
+    public void storeColumnDefinitions(FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+    {
+        finishedHosts.clear();
+        if (queryStoreAppender != null)
+        {
+            BinLog.ReleaseableWriteMarshallable writeMarshallableQuery = query.toMarshallable();
+            queryStoreAppender.writeDocument(writeMarshallableQuery);
+            writeMarshallableQuery.release();
+        }
+        for (int i = 0; i < cds.size(); i++)
+        {
+            ResultHandler.ComparableColumnDefinitions cd = cds.get(i);
+            appenders.get(i).writeDocument(new ColumnDefsWriter(cd));
+        }
+    }
+
+    /**
+     * Store rows
+     *
+     * the row at position x will get stored by appender at position x
+     *
+     * Before calling this for a new result set, storeColumnDefinitions must be called.
+     */
+    public void storeRows(List<ResultHandler.ComparableRow> rows)
+    {
+        for (int i = 0; i < rows.size(); i++)
+        {
+            ResultHandler.ComparableRow row = rows.get(i);
+            if (row == null && !finishedHosts.contains(i))
+            {
+                appenders.get(i).writeDocument(wire -> {
+                    wire.write(VERSION).int16(CURRENT_VERSION);
+                    wire.write(TYPE).text(END);
+                });
+                finishedHosts.add(i);
+            }
+            else if (row != null)
+            {
+                appenders.get(i).writeDocument(new RowWriter(row));
+            }
+        }
+    }
+
+    public void close()
+    {
+        queues.forEach(Closeable::close);
+        if (queryStoreQueue != null)
+            queryStoreQueue.close();
+    }
+
+    static class ColumnDefsWriter implements WriteMarshallable
+    {
+        private final ResultHandler.ComparableColumnDefinitions defs;
+
+        ColumnDefsWriter(ResultHandler.ComparableColumnDefinitions defs)
+        {
+            this.defs = defs;
+        }
+
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write(VERSION).int16(CURRENT_VERSION);
+            if (!defs.wasFailed())
+            {
+                wire.write(TYPE).text(COLUMN_DEFINITIONS);
+                wire.write(COLUMN_COUNT).int32(defs.size());
+                for (ResultHandler.ComparableDefinition d : defs.asList())
+                {
+                    ValueOut vo = wire.write(COLUMN_DEFINITION);
+                    vo.text(d.getName());
+                    vo.text(d.getType());
+                }
+            }
+            else
+            {
+                wire.write(TYPE).text(FAILURE);
+                wire.write(MESSAGE).text(defs.getFailureException().getMessage());
+            }
+        }
+    }
+
+    static class ColumnDefsReader implements ReadMarshallable
+    {
+        boolean wasFailed;
+        String failureMessage;
+        List<Pair<String, String>> columnDefinitions = new ArrayList<>();
+
+        public void readMarshallable(WireIn wire) throws IORuntimeException
+        {
+            int version = wire.read(VERSION).int16();
+            String type = wire.read(TYPE).text();
+            if (type.equals(FAILURE))
+            {
+                wasFailed = true;
+                failureMessage = wire.read(MESSAGE).text();
+            }
+            else if (type.equals(COLUMN_DEFINITIONS))
+            {
+                int columnCount = wire.read(COLUMN_COUNT).int32();
+                for (int i = 0; i < columnCount; i++)
+                {
+                    ValueIn vi = wire.read(COLUMN_DEFINITION);
+                    String name = vi.text();
+                    String dataType = vi.text();
+                    columnDefinitions.add(Pair.create(name, dataType));
+                }
+            }
+        }
+    }
+
+    /**
+     * read a single row from the wire, or, marks itself finished if we read "end_resultset"
+     */
+    static class RowReader implements ReadMarshallable
+    {
+        boolean isFinished;
+        List<ByteBuffer> rows = new ArrayList<>();
+
+        public void readMarshallable(WireIn wire) throws IORuntimeException
+        {
+            int version = wire.read(VERSION).int16();
+            String type = wire.read(TYPE).text();
+            if (!type.equals(END))
+            {
+                isFinished = false;
+                int rowColumnCount = wire.read(ROW_COLUMN_COUNT).int32();
+
+                for (int i = 0; i < rowColumnCount; i++)
+                {
+                    byte[] b = wire.read(COLUMN).bytes();
+                    rows.add(ByteBuffer.wrap(b));
+                }
+            }
+            else
+            {
+                isFinished = true;
+            }
+        }
+    }
+
+    /**
+     * Writes a single row to the given wire
+     */
+    static class RowWriter implements WriteMarshallable
+    {
+        private final ResultHandler.ComparableRow row;
+
+        RowWriter(ResultHandler.ComparableRow row)
+        {
+            this.row = row;
+        }
+
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write(VERSION).int16(CURRENT_VERSION);
+            wire.write(TYPE).text(ROW);
+            wire.write(ROW_COLUMN_COUNT).int32(row.getColumnDefinitions().size());
+            for (int jj = 0; jj < row.getColumnDefinitions().size(); jj++)
+            {
+                ByteBuffer bb = row.getBytesUnsafe(jj);
+                if (bb != null)
+                    wire.write(COLUMN).bytes(BytesStore.wrap(bb));
+                else
+                    wire.write(COLUMN).bytes("NULL".getBytes());
+            }
+        }
+    }
+
+}
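
To make the document layout described in the ResultStore javadoc concrete, here is a hedged sketch, not part of this patch, that reads one stored result set back with the package-private ColumnDefsReader and RowReader: one column_definitions (or query_failed) document, then row documents until an end_resultset document. The path is a placeholder and the class is assumed to sit in the same package.

package org.apache.cassandra.fqltool;

import java.io.File;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptTailer;

public class ResultStoreReadSketch
{
    public static void main(String[] args)
    {
        File resultDir = new File("/tmp/fqltool-results/hosta"); // placeholder, written by fqltool replay --results
        try (ChronicleQueue queue = ChronicleQueueBuilder.single(resultDir).readOnly(true).build())
        {
            ExcerptTailer tailer = queue.createTailer();

            ResultStore.ColumnDefsReader defs = new ResultStore.ColumnDefsReader();
            if (!tailer.readDocument(defs))
                return; // nothing stored
            if (defs.wasFailed)
            {
                System.out.println("query failed: " + defs.failureMessage);
                return;
            }
            System.out.println("columns: " + defs.columnDefinitions);

            while (true)
            {
                ResultStore.RowReader row = new ResultStore.RowReader();
                if (!tailer.readDocument(row) || row.isFinished)
                    break; // end_resultset (or no more documents)
                System.out.println(row.rows.size() + " columns in row");
            }
        }
    }
}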

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java b/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java
new file mode 100644
index 0000000..b08861d
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.AbstractIterator;
+
+import net.openhft.chronicle.queue.ExcerptTailer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * represents a resultset defined by the format in ResultStore on disk
+ *
+ * todo: Currently all iterators need to be consumed fully while iterating over result sets
+ *       if this is created from a tailer. This can probably be improved, but for all current uses it is fine.
+ */
+public class StoredResultSet implements ResultHandler.ComparableResultSet
+{
+    private final ResultHandler.ComparableColumnDefinitions defs;
+    public final boolean hasMoreResultSets;
+    private final Supplier<Iterator<ResultHandler.ComparableRow>> rowIteratorSupplier;
+    private final boolean wasFailed;
+    private final Throwable failureException;
+
+    /**
+     * create a new StoredResultSet
+     *
+     * note that we use an iteratorSupplier to be able to iterate over the same in-memory rows several times *in tests*
+     */
+    public StoredResultSet(ResultHandler.ComparableColumnDefinitions defs,
+                           boolean hasMoreResultSets,
+                           boolean wasFailed,
+                           Throwable failure,
+                           Supplier<Iterator<ResultHandler.ComparableRow>> iteratorSupplier)
+    {
+        this.defs = defs;
+        this.hasMoreResultSets = hasMoreResultSets;
+        this.wasFailed = wasFailed;
+        this.failureException = failure;
+        this.rowIteratorSupplier = iteratorSupplier;
+    }
+
+    /**
+     * creates a ComparableResultSet based on the data in tailer
+     */
+    public static StoredResultSet fromTailer(ExcerptTailer tailer)
+    {
+        ResultStore.ColumnDefsReader reader = new ResultStore.ColumnDefsReader();
+        boolean hasMoreResultSets = tailer.readDocument(reader);
+        ResultHandler.ComparableColumnDefinitions defs = new StoredComparableColumnDefinitions(reader.columnDefinitions,
+                                                                                               reader.wasFailed,
+                                                                                               new RuntimeException(reader.failureMessage));
+
+
+        Iterator<ResultHandler.ComparableRow> rowIterator = new AbstractIterator<ResultHandler.ComparableRow>()
+        {
+            protected ResultHandler.ComparableRow computeNext()
+            {
+                ResultStore.RowReader rowReader = new ResultStore.RowReader();
+                tailer.readDocument(rowReader);
+                if (rowReader.isFinished)
+                    return endOfData();
+                return new StoredComparableRow(rowReader.rows, defs);
+            }
+        };
+
+        return new StoredResultSet(defs,
+                                   hasMoreResultSets,
+                                   reader.wasFailed,
+                                   new RuntimeException(reader.failureMessage),
+                                   () -> rowIterator);
+    }
+
+    public static ResultHandler.ComparableResultSet failed(String failureMessage)
+    {
+        return new FailedComparableResultSet(new RuntimeException(failureMessage));
+    }
+
+    public Iterator<ResultHandler.ComparableRow> iterator()
+    {
+        return rowIteratorSupplier.get();
+    }
+
+    public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+    {
+        return defs;
+    }
+
+    public boolean wasFailed()
+    {
+        return wasFailed;
+    }
+
+    public Throwable getFailureException()
+    {
+        return failureException;
+    }
+
+    static class StoredComparableRow implements ResultHandler.ComparableRow
+    {
+        private final List<ByteBuffer> row;
+        private final ResultHandler.ComparableColumnDefinitions cds;
+
+        public StoredComparableRow(List<ByteBuffer> row, ResultHandler.ComparableColumnDefinitions cds)
+        {
+            this.row = row;
+            this.cds = cds;
+        }
+
+        public ByteBuffer getBytesUnsafe(int i)
+        {
+            return row.get(i);
+        }
+
+        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+        {
+            return cds;
+        }
+
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof StoredComparableRow))
+                return false;
+            return row.equals(((StoredComparableRow)other).row);
+        }
+
+        public String toString()
+        {
+            return row.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(","));
+        }
+    }
+
+    static class StoredComparableColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
+    {
+        private final List<ResultHandler.ComparableDefinition> defs;
+        private final boolean wasFailed;
+        private final Throwable failureException;
+
+        public StoredComparableColumnDefinitions(List<Pair<String, String>> cds, boolean wasFailed, Throwable failureException)
+        {
+            defs = cds != null ? cds.stream().map(StoredComparableDefinition::new).collect(Collectors.toList()) : Collections.emptyList();
+            this.wasFailed = wasFailed;
+            this.failureException = failureException;
+        }
+        public List<ResultHandler.ComparableDefinition> asList()
+        {
+            return wasFailed() ? Collections.emptyList() : defs;
+        }
+
+        public boolean wasFailed()
+        {
+            return wasFailed;
+        }
+
+        public Throwable getFailureException()
+        {
+            return failureException;
+        }
+
+        public int size()
+        {
+            return asList().size();
+        }
+
+        public Iterator<ResultHandler.ComparableDefinition> iterator()
+        {
+            return defs.iterator();
+        }
+
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof StoredComparableColumnDefinitions))
+                return false;
+            return defs.equals(((StoredComparableColumnDefinitions)other).defs);
+        }
+
+        public String toString()
+        {
+            return defs.toString();
+        }
+    }
+
+    private static class StoredComparableDefinition implements ResultHandler.ComparableDefinition
+    {
+        private final Pair<String, String> p;
+
+        public StoredComparableDefinition(Pair<String, String> p)
+        {
+            this.p = p;
+        }
+        public String getType()
+        {
+            return p.right;
+        }
+
+        public String getName()
+        {
+            return p.left;
+        }
+
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof StoredComparableDefinition))
+                return false;
+            return p.equals(((StoredComparableDefinition)other).p);
+        }
+
+        public String toString()
+        {
+            return getName() + ':' + getType();
+        }
+    }
+
+    private static class FailedComparableResultSet implements ResultHandler.ComparableResultSet
+    {
+        private final Throwable exception;
+
+        public FailedComparableResultSet(Throwable exception)
+        {
+            this.exception = exception;
+        }
+        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+        {
+            return new ResultHandler.ComparableColumnDefinitions()
+            {
+                public List<ResultHandler.ComparableDefinition> asList()
+                {
+                    return Collections.emptyList();
+                }
+
+                public boolean wasFailed()
+                {
+                    return true;
+                }
+
+                public Throwable getFailureException()
+                {
+                    return exception;
+                }
+
+                public int size()
+                {
+                    return 0;
+                }
+
+                public Iterator<ResultHandler.ComparableDefinition> iterator()
+                {
+                    return asList().iterator();
+                }
+            };
+        }
+
+        public boolean wasFailed()
+        {
+            return true;
+        }
+
+        public Throwable getFailureException()
+        {
+            return exception;
+        }
+
+        public Iterator<ResultHandler.ComparableRow> iterator()
+        {
+            return Collections.emptyListIterator();
+        }
+    }
+}
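
The same result file can be consumed at a higher level through StoredResultSet.fromTailer(); a hedged sketch, not part of this patch, with a placeholder path. As the todo above notes, each returned result set has to be iterated fully before the next fromTailer() call.

package org.apache.cassandra.fqltool;

import java.io.File;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptTailer;

public class StoredResultSetSketch
{
    public static void main(String[] args)
    {
        File resultDir = new File("/tmp/fqltool-results/hosta"); // placeholder
        try (ChronicleQueue queue = ChronicleQueueBuilder.single(resultDir).readOnly(true).build())
        {
            ExcerptTailer tailer = queue.createTailer();
            while (true)
            {
                StoredResultSet rs = StoredResultSet.fromTailer(tailer);
                if (!rs.hasMoreResultSets)
                    break; // no more column_definitions documents in the queue
                System.out.println("columns: " + rs.getColumnDefinitions());
                for (ResultHandler.ComparableRow row : rs) // must be consumed fully, see the todo above
                    System.out.println(row);
            }
        }
    }
}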

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java
new file mode 100644
index 0000000..2375296
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool.commands;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import org.apache.cassandra.fqltool.FQLQueryIterator;
+import org.apache.cassandra.fqltool.ResultHandler;
+import org.apache.cassandra.fqltool.StoredResultSet;
+
+/**
+ * Compares result files produced by fqltool replay against each other, flagging queries whose results differ between the target hosts.
+ */
+@Command(name = "compare", description = "Compare result files generated by fqltool replay")
+public class Compare implements Runnable
+{
+    @Arguments(usage = "<path1> [<path2>...<pathN>]",
+               description = "Directories containing result files to compare.",
+               required = true)
+    private List<String> arguments = new ArrayList<>();
+
+    @Option(title = "queries",
+            name = { "--queries"},
+            description = "Directory to read the queries from. It is produced by the fqltool replay --store-queries option. ",
+            required = true)
+    private String querylog;
+
+    @Override
+    public void run()
+    {
+        compare(querylog, arguments);
+    }
+
+    public static void compare(String querylog, List<String> arguments)
+    {
+        List<ChronicleQueue> readQueues = null;
+        try (ResultHandler rh = new ResultHandler(arguments, null, null);
+             ChronicleQueue queryQ = ChronicleQueueBuilder.single(querylog).readOnly(true).build();
+             FQLQueryIterator queries = new FQLQueryIterator(queryQ.createTailer(), 1))
+        {
+            readQueues = arguments.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList());
+            List<Iterator<ResultHandler.ComparableResultSet>> its = readQueues.stream().map(q -> new StoredResultSetIterator(q.createTailer())).collect(Collectors.toList());
+            while (queries.hasNext())
+                rh.handleResults(queries.next(), resultSets(its));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            if (readQueues != null)
+                readQueues.forEach(Closeable::close);
+        }
+    }
+
+    @VisibleForTesting
+    public static List<ResultHandler.ComparableResultSet> resultSets(List<Iterator<ResultHandler.ComparableResultSet>> its)
+    {
+        List<ResultHandler.ComparableResultSet> resultSets = new ArrayList<>(its.size());
+        for (Iterator<ResultHandler.ComparableResultSet> it : its)
+        {
+            if (it.hasNext())
+                resultSets.add(it.next());
+            else
+                resultSets.add(null);
+        }
+        return resultSets;
+    }
+
+    @VisibleForTesting
+    public static class StoredResultSetIterator extends AbstractIterator<ResultHandler.ComparableResultSet>
+    {
+        private final ExcerptTailer tailer;
+
+        public StoredResultSetIterator(ExcerptTailer tailer)
+        {
+            this.tailer = tailer;
+        }
+
+        protected ResultHandler.ComparableResultSet computeNext()
+        {
+            StoredResultSet srs = StoredResultSet.fromTailer(tailer);
+            if (srs.hasMoreResultSets)
+                return srs;
+            return endOfData();
+        }
+    }
+
+
+}
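
For completeness, a hedged sketch of driving the comparison programmatically rather than through the command line (fqltool compare --queries <querydir> <resultdir1> <resultdir2>). Paths are placeholders, and client-side config is initialised the same way FQLCompareTest below does.

package org.apache.cassandra.fqltool;

import java.util.Arrays;

import org.apache.cassandra.fqltool.commands.Compare;
import org.apache.cassandra.tools.Util;

public class CompareSketch
{
    public static void main(String[] args)
    {
        Util.initDatabaseDescriptor(); // client-side config, as FQLCompareTest does

        Compare.compare("/tmp/fqltool-queries",                       // produced by replay --store-queries
                        Arrays.asList("/tmp/fqltool-results/hosta",   // produced by replay --results,
                                      "/tmp/fqltool-results/hostb")); // one directory per target host
    }
}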

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
new file mode 100644
index 0000000..1263a11
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool.commands;
+
+import java.io.File;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.bytes.Bytes;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.threads.Pauser;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * Dump the contents of a list of paths containing full query logs
+ */
+@Command(name = "dump", description = "Dump the contents of a full query log")
+public class Dump implements Runnable
+{
+    static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray();
+
+    @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true)
+    private List<String> arguments = new ArrayList<>();
+
+    @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.")
+    private String rollCycle = "HOURLY";
+
+    @Option(title = "follow", name = {"--follow"}, description = "Upon reacahing the end of the log continue indefinitely waiting for more records")
+    private boolean follow = false;
+
+    @Override
+    public void run()
+    {
+        dump(arguments, rollCycle, follow);
+    }
+
+    public static void dump(List<String> arguments, String rollCycle, boolean follow)
+    {
+        StringBuilder sb = new StringBuilder();
+        ReadMarshallable reader = wireIn ->
+        {
+            sb.setLength(0);
+
+            int version = wireIn.read(FullQueryLogger.VERSION).int16();
+            if (version != FullQueryLogger.CURRENT_VERSION)
+                throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered");
+
+            String type = wireIn.read(FullQueryLogger.TYPE).text();
+            sb.append("Type: ")
+              .append(type)
+              .append(System.lineSeparator());
+
+            long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64();
+            sb.append("Query start time: ")
+              .append(queryStartTime)
+              .append(System.lineSeparator());
+
+            int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32();
+            sb.append("Protocol version: ")
+              .append(protocolVersion)
+              .append(System.lineSeparator());
+
+            QueryOptions options =
+                QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()),
+                                          ProtocolVersion.decode(protocolVersion));
+
+            long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64();
+            sb.append("Generated timestamp:")
+              .append(generatedTimestamp)
+              .append(System.lineSeparator());
+
+            int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32();
+            sb.append("Generated nowInSeconds:")
+              .append(generatedNowInSeconds)
+              .append(System.lineSeparator());
+
+            switch (type)
+            {
+                case (FullQueryLogger.SINGLE_QUERY):
+                    dumpQuery(options, wireIn, sb);
+                    break;
+
+                case (FullQueryLogger.BATCH):
+                    dumpBatch(options, wireIn, sb);
+                    break;
+
+                default:
+                    throw new UnsupportedOperationException("Log entry of unsupported type " + type);
+            }
+
+            System.out.print(sb.toString());
+            System.out.flush();
+        };
+
+        //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency
+        Pauser pauser = Pauser.millis(100);
+        List<ChronicleQueue> queues = arguments.stream().distinct().map(path -> ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(RollCycles.valueOf(rollCycle)).build()).collect(Collectors.toList());
+        List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList());
+        boolean hadWork = true;
+        while (hadWork)
+        {
+            hadWork = false;
+            for (ExcerptTailer tailer : tailers)
+            {
+                while (tailer.readDocument(reader))
+                {
+                    hadWork = true;
+                }
+            }
+
+            if (follow)
+            {
+                if (!hadWork)
+                {
+                    //Chronicle queue doesn't support blocking so use this backoff strategy
+                    pauser.pause();
+                }
+                //Don't terminate the loop even if there wasn't work
+                hadWork = true;
+            }
+        }
+    }
+
+    private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb)
+    {
+        sb.append("Query: ")
+          .append(wireIn.read(FullQueryLogger.QUERY).text())
+          .append(System.lineSeparator());
+
+        List<ByteBuffer> values = options.getValues() != null
+                                ? options.getValues()
+                                : Collections.emptyList();
+
+        sb.append("Values: ")
+          .append(System.lineSeparator());
+        appendValuesToStringBuilder(values, sb);
+        sb.append(System.lineSeparator());
+    }
+
+    private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb)
+    {
+        sb.append("Batch type: ")
+          .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text())
+          .append(System.lineSeparator());
+
+        ValueIn in = wireIn.read(FullQueryLogger.QUERIES);
+        int numQueries = in.int32();
+        List<String> queries = new ArrayList<>(numQueries);
+        for (int i = 0; i < numQueries; i++)
+            queries.add(in.text());
+
+        in = wireIn.read(FullQueryLogger.VALUES);
+        int numValues = in.int32();
+
+        for (int i = 0; i < numValues; i++)
+        {
+            int numSubValues = in.int32();
+            List<ByteBuffer> subValues = new ArrayList<>(numSubValues);
+            for (int j = 0; j < numSubValues; j++)
+                subValues.add(ByteBuffer.wrap(in.bytes()));
+
+            sb.append("Query: ")
+              .append(queries.get(i))
+              .append(System.lineSeparator());
+
+            sb.append("Values: ")
+              .append(System.lineSeparator());
+            appendValuesToStringBuilder(subValues, sb);
+        }
+
+        sb.append(System.lineSeparator());
+    }
+
+    private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb)
+    {
+        boolean first = true;
+        for (ByteBuffer value : values)
+        {
+            if (first)
+            {
+                first = false;
+            }
+            else
+            {
+                sb.append("-----").append(System.lineSeparator());
+            }
+
+            Bytes bytes = Bytes.wrapForRead(value);
+            long maxLength2 = Math.min(1024, bytes.readLimit() - bytes.readPosition());
+            toHexString(bytes, bytes.readPosition(), maxLength2, sb);
+            if (maxLength2 < bytes.readLimit() - bytes.readPosition())
+            {
+                sb.append("... truncated").append(System.lineSeparator());
+            }
+        }
+    }
+
+    // This is copied from net.openhft.chronicle.bytes because we need to pass in the StringBuilder
+    /*
+     * Copyright 2016 higherfrequencytrading.com
+     *
+     * Licensed under the Apache License, Version 2.0 (the "License");
+     * you may not use this file except in compliance with the License.
+     * You may obtain a copy of the License at
+     *
+     *     http://www.apache.org/licenses/LICENSE-2.0
+     *
+     * Unless required by applicable law or agreed to in writing, software
+     * distributed under the License is distributed on an "AS IS" BASIS,
+     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     * See the License for the specific language governing permissions and
+     * limitations under the License.
+     */
+    /**
+     * display the hex data of {@link Bytes} from the position() to the limit()
+     *
+     * @param bytes the buffer you wish to toString()
+     * @return hex representation of the buffer, for example [0D, 0A, FF]
+     */
+    public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder)
+    throws BufferUnderflowException
+    {
+        if (len == 0)
+            return "";
+
+        int width = 16;
+        int[] lastLine = new int[width];
+        String sep = "";
+        long position = bytes.readPosition();
+        long limit = bytes.readLimit();
+
+        try {
+            bytes.readPositionRemaining(offset, len);
+
+            long start = offset / width * width;
+            long end = (offset + len + width - 1) / width * width;
+            for (long i = start; i < end; i += width) {
+                // check for duplicate rows
+                if (i + width < end) {
+                    boolean same = true;
+
+                    for (int j = 0; j < width && i + j < offset + len; j++) {
+                        int ch = bytes.readUnsignedByte(i + j);
+                        same &= (ch == lastLine[j]);
+                        lastLine[j] = ch;
+                    }
+                    if (i > start && same) {
+                        sep = "........\n";
+                        continue;
+                    }
+                }
+                builder.append(sep);
+                sep = "";
+                String str = Long.toHexString(i);
+                for (int j = str.length(); j < 8; j++)
+                    builder.append('0');
+                builder.append(str);
+                for (int j = 0; j < width; j++) {
+                    if (j == width / 2)
+                        builder.append(' ');
+                    if (i + j < offset || i + j >= offset + len) {
+                        builder.append("   ");
+
+                    } else {
+                        builder.append(' ');
+                        int ch = bytes.readUnsignedByte(i + j);
+                        builder.append(HEXI_DECIMAL[ch >> 4]);
+                        builder.append(HEXI_DECIMAL[ch & 15]);
+                    }
+                }
+                builder.append(' ');
+                for (int j = 0; j < width; j++) {
+                    if (j == width / 2)
+                        builder.append(' ');
+                    if (i + j < offset || i + j >= offset + len) {
+                        builder.append(' ');
+
+                    } else {
+                        int ch = bytes.readUnsignedByte(i + j);
+                        if (ch < ' ' || ch > 126)
+                            ch = '\u00B7';
+                        builder.append((char) ch);
+                    }
+                }
+                builder.append("\n");
+            }
+            return builder.toString();
+        } finally {
+            bytes.readLimit(limit);
+            bytes.readPosition(position);
+        }
+    }
+}
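
A hedged sketch, not part of this patch, of invoking the dump programmatically; the command-line equivalent is fqltool dump --roll-cycle HOURLY <path>, with --follow to keep tailing the log. The path is a placeholder.

package org.apache.cassandra.fqltool;

import java.util.Arrays;

import org.apache.cassandra.fqltool.commands.Dump;
import org.apache.cassandra.tools.Util;

public class DumpSketch
{
    public static void main(String[] args)
    {
        Util.initDatabaseDescriptor(); // client-side config, as the bundled fqltool tests do

        // placeholder path: the directory the full query log was written to
        Dump.dump(Arrays.asList("/var/log/cassandra/fql"), "HOURLY", false);
    }
}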

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
new file mode 100644
index 0000000..adea742
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool.commands;
+
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+
+import org.apache.cassandra.fqltool.FQLQuery;
+import org.apache.cassandra.fqltool.FQLQueryIterator;
+import org.apache.cassandra.fqltool.QueryReplayer;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.MergeIterator;
+
+/**
+ * replay the contents of a list of paths containing full query logs
+ */
+@Command(name = "replay", description = "Replay full query logs")
+public class Replay implements Runnable
+{
+    @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true)
+    private List<String> arguments = new ArrayList<>();
+
+    @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.")
+    private List<String> targetHosts;
+
+    @Option(title = "results", name = { "--results"}, description = "Where to store the results of the queries, this should be a directory. Leave this option out to avoid storing results.")
+    private String resultPath;
+
+    @Option(title = "keyspace", name = { "--keyspace"}, description = "Only replay queries against this keyspace and queries without keyspace set.")
+    private String keyspace;
+
+    @Option(title = "debug", name = {"--debug"}, description = "Debug mode, print all queries executed.")
+    private boolean debug;
+
+    @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results")
+    private String queryStorePath;
+
+    @Override
+    public void run()
+    {
+        try
+        {
+            List<File> resultPaths = null;
+            if (resultPath != null)
+            {
+                File basePath = new File(resultPath);
+                if (!basePath.exists() || !basePath.isDirectory())
+                {
+                    System.err.println("The results path (" + basePath + ") should be an existing directory");
+                    System.exit(1);
+                }
+                resultPaths = targetHosts.stream().map(target -> new File(basePath, target)).collect(Collectors.toList());
+                resultPaths.forEach(File::mkdir);
+            }
+            if (targetHosts.size() < 1)
+            {
+                System.err.println("You need to state at least one --target host to replay the query against");
+                System.exit(1);
+            }
+            replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, debug);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean debug)
+    {
+        int readAhead = 200; // how many fql queries we should read into memory to be able to sort them
+        List<ChronicleQueue> readQueues = null;
+        List<FQLQueryIterator> iterators = null;
+        List<Predicate<FQLQuery>> filters = new ArrayList<>();
+
+        if (keyspace != null)
+            filters.add(fqlQuery -> fqlQuery.keyspace() == null || fqlQuery.keyspace().equals(keyspace));
+
+        try
+        {
+            readQueues = arguments.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList());
+            iterators = readQueues.stream().map(ChronicleQueue::createTailer).map(tailer -> new FQLQueryIterator(tailer, readAhead)).collect(Collectors.toList());
+            try (MergeIterator<FQLQuery, List<FQLQuery>> iter = MergeIterator.get(iterators, FQLQuery::compareTo, new Reducer());
+                 QueryReplayer replayer = new QueryReplayer(iter, targetHosts, resultPaths, filters, System.out, queryStorePath, debug))
+            {
+                replayer.replay();
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            if (iterators != null)
+                iterators.forEach(AbstractIterator::close);
+            if (readQueues != null)
+                readQueues.forEach(Closeable::close);
+        }
+    }
+
+    @VisibleForTesting
+    public static class Reducer extends MergeIterator.Reducer<FQLQuery, List<FQLQuery>>
+    {
+        List<FQLQuery> queries = new ArrayList<>();
+        public void reduce(int idx, FQLQuery current)
+        {
+            queries.add(current);
+        }
+
+        protected List<FQLQuery> getReduced()
+        {
+            return queries;
+        }
+        protected void onKeyChange()
+        {
+            queries.clear();
+        }
+    }
+}
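
A hedged sketch, not part of this patch, of the programmatic equivalent of fqltool replay --target ... --results ... --store-queries .... Hosts and paths are placeholders; the per-host result directories are created up front, mirroring what run() does above.

package org.apache.cassandra.fqltool;

import java.io.File;
import java.util.Arrays;
import java.util.List;

import org.apache.cassandra.fqltool.commands.Replay;
import org.apache.cassandra.tools.Util;

public class ReplaySketch
{
    public static void main(String[] args)
    {
        Util.initDatabaseDescriptor(); // client-side config, as the bundled fqltool tests do

        List<String> targets = Arrays.asList("127.0.0.1", "127.0.0.2"); // placeholder hosts
        List<File> resultDirs = Arrays.asList(new File("/tmp/fqltool-results/127.0.0.1"),
                                              new File("/tmp/fqltool-results/127.0.0.2"));
        resultDirs.forEach(File::mkdirs);

        Replay.replay(null,                                      // no keyspace filter
                      Arrays.asList("/var/log/cassandra/fql"),   // recorded full query log directories
                      targets,
                      resultDirs,
                      "/tmp/fqltool-queries",                    // --store-queries directory
                      false);                                    // debug off
    }
}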

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java
new file mode 100644
index 0000000..7990b7e
--- /dev/null
+++ b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.fqltool.commands.Compare;
+import org.apache.cassandra.tools.Util;
+
+
+import static org.psjava.util.AssertStatus.assertTrue;
+
+public class FQLCompareTest
+{
+    public FQLCompareTest()
+    {
+        Util.initDatabaseDescriptor();
+    }
+
+    @Test
+    public void endToEnd() throws IOException
+    {
+        List<String> targetHosts = Lists.newArrayList("hosta", "hostb");
+        File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+        File queryDir = Files.createTempDirectory("queries").toFile();
+        List<File> resultPaths = generateResultSets(targetHosts, tmpDir, queryDir, true, false);
+        Compare.compare(queryDir.toString(), resultPaths.stream().map(File::toString).collect(Collectors.toList()));
+    }
+
+    @Test
+    public void endToEndQueryFailures() throws IOException
+    {
+        List<String> targetHosts = Lists.newArrayList("hosta", "hostb");
+        File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+        File queryDir = Files.createTempDirectory("queries").toFile();
+        List<File> resultPaths = generateResultSets(targetHosts, tmpDir, queryDir, true,true);
+        Compare.compare(queryDir.toString(), resultPaths.stream().map(File::toString).collect(Collectors.toList()));
+    }
+
+    @Test
+    public void compareEqual() throws IOException
+    {
+        List<String> targetHosts = Lists.newArrayList("hosta", "hostb");
+        File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+        File queryDir = Files.createTempDirectory("queries").toFile();
+        List<File> resultPaths = generateResultSets(targetHosts, tmpDir, queryDir, false,false);
+
+        ResultComparator comparator = new ResultComparator();
+        List<ChronicleQueue> readQueues = null;
+        try
+        {
+            readQueues = resultPaths.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList());
+            List<Iterator<ResultHandler.ComparableResultSet>> its = readQueues.stream().map(q -> new Compare.StoredResultSetIterator(q.createTailer())).collect(Collectors.toList());
+            List<ResultHandler.ComparableResultSet> resultSets = Compare.resultSets(its);
+            while(resultSets.stream().allMatch(Objects::nonNull))
+            {
+                assertTrue(comparator.compareColumnDefinitions(targetHosts, query(), resultSets.stream().map(ResultHandler.ComparableResultSet::getColumnDefinitions).collect(Collectors.toList())));
+                List<Iterator<ResultHandler.ComparableRow>> rows = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+
+                List<ResultHandler.ComparableRow> toCompare = ResultHandler.rows(rows);
+
+                while (toCompare.stream().allMatch(Objects::nonNull))
+                {
+                    assertTrue(comparator.compareRows(targetHosts, query(), ResultHandler.rows(rows)));
+                    toCompare = ResultHandler.rows(rows);
+                }
+                resultSets = Compare.resultSets(its);
+            }
+        }
+        finally
+        {
+            if (readQueues != null)
+                readQueues.forEach(Closeable::close);
+        }
+    }
+
+    private List<File> generateResultSets(List<String> targetHosts, File resultDir, File queryDir, boolean random, boolean includeFailures) throws IOException
+    {
+        List<File> resultPaths = new ArrayList<>();
+        targetHosts.forEach(host -> { File f = new File(resultDir, host); f.mkdir(); resultPaths.add(f);});
+
+        try (ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir))
+        {
+            for (int i = 0; i < 100; i++)
+            {
+                ResultHandler.ComparableResultSet resultSet1 = includeFailures && (i % 10 == 0)
+                                                               ? StoredResultSet.failed("test failure!")
+                                                               : FQLReplayTest.createResultSet(10, 10, random);
+                ResultHandler.ComparableResultSet resultSet2 = FQLReplayTest.createResultSet(10, 10, random);
+                rh.handleResults(query(), Lists.newArrayList(resultSet1, resultSet2));
+            }
+        }
+        return resultPaths;
+    }
+
+    private FQLQuery.Single query()
+    {
+        return new FQLQuery.Single("abc", QueryOptions.DEFAULT.getProtocolVersion().asInt(), QueryOptions.DEFAULT, 12345, 5555, 6666, "select * from xyz", Collections.emptyList());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org