You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/09/01 08:44:19 UTC

[3/7] cassandra git commit: Add fqltool replay

Add fqltool replay

Patch by marcuse; reviewed by Jason Brown and Dinesh Joshi for CASSANDRA-14618


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62ffb772
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62ffb772
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62ffb772

Branch: refs/heads/trunk
Commit: 62ffb7723917768c38c9e012710c6dce509191c1
Parents: 46c33f3
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 6 16:32:27 2018 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Sat Sep 1 08:35:54 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/audit/FullQueryLogger.java |   5 +-
 .../apache/cassandra/service/QueryState.java    |   8 +
 .../cassandra/tools/FullQueryLogTool.java       |   6 +-
 .../tools/fqltool/DriverResultSet.java          | 241 ++++++
 .../apache/cassandra/tools/fqltool/Dump.java    | 325 --------
 .../cassandra/tools/fqltool/FQLQuery.java       | 278 +++++++
 .../tools/fqltool/FQLQueryIterator.java         |  72 ++
 .../cassandra/tools/fqltool/FQLQueryReader.java | 116 +++
 .../cassandra/tools/fqltool/QueryReplayer.java  | 167 ++++
 .../tools/fqltool/ResultComparator.java         | 116 +++
 .../cassandra/tools/fqltool/ResultHandler.java  | 124 +++
 .../cassandra/tools/fqltool/ResultStore.java    | 142 ++++
 .../cassandra/tools/fqltool/commands/Dump.java  | 325 ++++++++
 .../tools/fqltool/commands/Replay.java          | 148 ++++
 .../cassandra/tools/fqltool/FQLReplayTest.java  | 760 +++++++++++++++++++
 16 files changed, 2505 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd2a14a..1227337 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add fqltool replay (CASSANDRA-14618)
  * Log keyspace in full query log (CASSANDRA-14656)
  * Transient Replication and Cheap Quorums (CASSANDRA-14404)
  * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/audit/FullQueryLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
index c9f8447..9c1f472 100644
--- a/src/java/org/apache/cassandra/audit/FullQueryLogger.java
+++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 
@@ -151,7 +152,7 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
         logRecord(wrappedQuery, binLog);
     }
 
-    static class Query extends AbstractLogEntry
+    public static class Query extends AbstractLogEntry
     {
         private final String query;
 
@@ -181,7 +182,7 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
         }
     }
 
-    static class Batch extends AbstractLogEntry
+    public static class Batch extends AbstractLogEntry
     {
         private final int weight;
         private final BatchStatement.Type batchType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 2bd07ab..26f58bf 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 
+import org.apache.cassandra.transport.ClientStat;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -39,6 +40,13 @@ public class QueryState
         this.clientState = clientState;
     }
 
+    public QueryState(ClientState clientState, long timestamp, int nowInSeconds)
+    {
+        this(clientState);                // same setup as the single-argument constructor
+        this.timestamp = timestamp;       // caller-supplied value, stored as given
+        this.nowInSeconds = nowInSeconds; // caller-supplied value, stored as given
+    }
+
     /**
      * @return a QueryState object for internal C* calls (not limited by any kind of auth).
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/FullQueryLogTool.java b/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
index 0d170d9..c1d4713 100644
--- a/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
+++ b/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
@@ -31,7 +31,8 @@ import io.airlift.airline.ParseCommandUnrecognizedException;
 import io.airlift.airline.ParseOptionConversionException;
 import io.airlift.airline.ParseOptionMissingException;
 import io.airlift.airline.ParseOptionMissingValueException;
-import org.apache.cassandra.tools.fqltool.Dump;
+import org.apache.cassandra.tools.fqltool.commands.Dump;
+import org.apache.cassandra.tools.fqltool.commands.Replay;
 
 import static com.google.common.base.Throwables.getStackTraceAsString;
 import static com.google.common.collect.Lists.newArrayList;
@@ -42,7 +43,8 @@ public class FullQueryLogTool
     {
         List<Class<? extends Runnable>> commands = newArrayList(
                 Help.class,
-                Dump.class
+                Dump.class,
+                Replay.class
         );
 
         Cli.CliBuilder<Runnable> builder = Cli.builder("fqltool");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java b/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java
new file mode 100644
index 0000000..6c4ee45
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.AbstractIterator;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+/**
+ * Adapts a driver {@link ResultSet}, or the exception of a failed query, to {@link ResultHandler.ComparableResultSet}
+ * so the compare code can be reused when reading up a result set produced by ResultStore.
+ */
+public class DriverResultSet implements ResultHandler.ComparableResultSet
+{
+    private final ResultSet resultSet;
+    private final Throwable failureException;
+
+    public DriverResultSet(ResultSet resultSet)
+    {
+        this(resultSet, null);
+    }
+
+    private DriverResultSet(ResultSet res, Throwable failureException)
+    {
+        resultSet = res;
+        this.failureException = failureException;
+    }
+
+    /** Creates the result of a query that failed with {@code ex}; it exposes no rows and no columns. */
+    public static DriverResultSet failed(Throwable ex)
+    {
+        return new DriverResultSet(null, ex);
+    }
+
+    public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+    {
+        if (wasFailed())
+            return new DriverColumnDefinitions(null, true);
+
+        return new DriverColumnDefinitions(resultSet.getColumnDefinitions());
+    }
+
+    public boolean wasFailed()
+    {
+        return failureException != null;
+    }
+
+    public Throwable getFailureException()
+    {
+        return failureException;
+    }
+
+    /** Wraps each driver row in a {@link DriverRow} as it is consumed; a failed result set yields nothing. */
+    public Iterator<ResultHandler.ComparableRow> iterator()
+    {
+        if (wasFailed())
+            return Collections.emptyListIterator();
+        return new AbstractIterator<ResultHandler.ComparableRow>()
+        {
+            Iterator<Row> iter = resultSet.iterator();
+            protected ResultHandler.ComparableRow computeNext()
+            {
+                if (iter.hasNext())
+                    return new DriverRow(iter.next());
+                return endOfData();
+            }
+        };
+    }
+
+    /** A driver {@link Row} that is compared on the raw bytes of each column. */
+    public static class DriverRow implements ResultHandler.ComparableRow
+    {
+        private final Row row;
+
+        public DriverRow(Row row)
+        {
+            this.row = row;
+        }
+
+        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+        {
+            return new DriverColumnDefinitions(row.getColumnDefinitions());
+        }
+
+        public ByteBuffer getBytesUnsafe(int i)
+        {
+            return row.getBytesUnsafe(i);
+        }
+
+        /** Equal when both rows have the same number of columns and every column holds the same bytes (or is null in both). */
+        @Override
+        public boolean equals(Object oo)
+        {
+            if (!(oo instanceof ResultHandler.ComparableRow))
+                return false;
+
+            ResultHandler.ComparableRow o = (ResultHandler.ComparableRow)oo;
+            int columns = getColumnDefinitions().size();
+            if (columns != o.getColumnDefinitions().size())
+                return false;
+
+            for (int j = 0; j < columns; j++)
+            {
+                ByteBuffer b1 = getBytesUnsafe(j);
+                ByteBuffer b2 = o.getBytesUnsafe(j);
+                if (b1 == null ? b2 != null : !b1.equals(b2))
+                    return false;
+            }
+            return true;
+        }
+
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            List<ResultHandler.ComparableDefinition> colDefs = getColumnDefinitions().asList();
+            for (int i = 0; i < colDefs.size(); i++)
+            {
+                ByteBuffer bb = getBytesUnsafe(i);
+                String value = bb != null ? ByteBufferUtil.bytesToHex(bb) : "NULL";
+                sb.append(colDefs.get(i)).append(':').append(value).append(",");
+            }
+            return sb.toString();
+        }
+    }
+
+    /** Column metadata of a driver result; for a failed query there is none and the wrapped definitions are null. */
+    public static class DriverColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
+    {
+        private final ColumnDefinitions columnDefinitions;
+        private final boolean failed;
+
+        public DriverColumnDefinitions(ColumnDefinitions columnDefinitions)
+        {
+            this(columnDefinitions, false);
+        }
+
+        private DriverColumnDefinitions(ColumnDefinitions columnDefinitions, boolean failed)
+        {
+            this.columnDefinitions = columnDefinitions;
+            this.failed = failed;
+        }
+
+        public List<ResultHandler.ComparableDefinition> asList()
+        {
+            if (wasFailed())
+                return Collections.emptyList();
+            return columnDefinitions.asList().stream().map(DriverDefinition::new).collect(Collectors.toList());
+        }
+
+        public boolean wasFailed()
+        {
+            return failed;
+        }
+
+        /** Number of columns; 0 for a failed query, consistent with the empty {@link #asList()}. */
+        public int size()
+        {
+            return wasFailed() ? 0 : columnDefinitions.size();
+        }
+
+        public Iterator<ResultHandler.ComparableDefinition> iterator()
+        {
+            return asList().iterator();
+        }
+
+        /** Failed definitions only equal other failed definitions; otherwise the column lists must be equal. */
+        @Override
+        public boolean equals(Object oo)
+        {
+            if (!(oo instanceof ResultHandler.ComparableColumnDefinitions))
+                return false;
+
+            ResultHandler.ComparableColumnDefinitions o = (ResultHandler.ComparableColumnDefinitions)oo;
+            if (wasFailed() || o.wasFailed())
+                return wasFailed() && o.wasFailed();
+            return size() == o.size() && asList().equals(o.asList());
+        }
+    }
+
+    /** Name and type of one column, backed by the driver's {@link ColumnDefinitions.Definition}. */
+    public static class DriverDefinition implements ResultHandler.ComparableDefinition
+    {
+        private final ColumnDefinitions.Definition def;
+
+        public DriverDefinition(ColumnDefinitions.Definition def)
+        {
+            this.def = def;
+        }
+
+        public String getType()
+        {
+            return def.getType().toString();
+        }
+
+        public String getName()
+        {
+            return def.getName();
+        }
+
+        /** Only another DriverDefinition can be equal; any other implementation compares unequal rather than failing the cast. */
+        @Override
+        public boolean equals(Object oo)
+        {
+            if (!(oo instanceof DriverDefinition))
+                return false;
+
+            return def.equals(((DriverDefinition)oo).def);
+        }
+
+        public String toString()
+        {
+            return getName() + ':' + getType();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/Dump.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/Dump.java
deleted file mode 100644
index a8e7592..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/Dump.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-import java.io.File;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import io.airlift.airline.Arguments;
-import io.airlift.airline.Command;
-import io.airlift.airline.Option;
-import io.netty.buffer.Unpooled;
-import net.openhft.chronicle.bytes.Bytes;
-import net.openhft.chronicle.queue.ChronicleQueue;
-import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-import net.openhft.chronicle.queue.ExcerptTailer;
-import net.openhft.chronicle.queue.RollCycles;
-import net.openhft.chronicle.threads.Pauser;
-import net.openhft.chronicle.wire.ReadMarshallable;
-import net.openhft.chronicle.wire.ValueIn;
-import net.openhft.chronicle.wire.WireIn;
-import org.apache.cassandra.audit.FullQueryLogger;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.transport.ProtocolVersion;
-
-/**
- * Dump the contents of a list of paths containing full query logs
- */
-@Command(name = "dump", description = "Dump the contents of a full query log")
-public class Dump implements Runnable
-{
-    static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray();
-
-    @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true)
-    private List<String> arguments = new ArrayList<>();
-
-    @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.")
-    private String rollCycle = "HOURLY";
-
-    @Option(title = "follow", name = {"--follow"}, description = "Upon reacahing the end of the log continue indefinitely waiting for more records")
-    private boolean follow = false;
-
-    @Override
-    public void run()
-    {
-        dump(arguments, rollCycle, follow);
-    }
-
-    public static void dump(List<String> arguments, String rollCycle, boolean follow)
-    {
-        StringBuilder sb = new StringBuilder();
-        ReadMarshallable reader = wireIn ->
-        {
-            sb.setLength(0);
-
-            int version = wireIn.read(FullQueryLogger.VERSION).int16();
-            if (version != FullQueryLogger.CURRENT_VERSION)
-                throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered");
-
-            String type = wireIn.read(FullQueryLogger.TYPE).text();
-            sb.append("Type: ")
-              .append(type)
-              .append(System.lineSeparator());
-
-            long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64();
-            sb.append("Query start time: ")
-              .append(queryStartTime)
-              .append(System.lineSeparator());
-
-            int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32();
-            sb.append("Protocol version: ")
-              .append(protocolVersion)
-              .append(System.lineSeparator());
-
-            QueryOptions options =
-                QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()),
-                                          ProtocolVersion.decode(protocolVersion));
-
-            long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64();
-            sb.append("Generated timestamp:")
-              .append(generatedTimestamp)
-              .append(System.lineSeparator());
-
-            int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32();
-            sb.append("Generated nowInSeconds:")
-              .append(generatedNowInSeconds)
-              .append(System.lineSeparator());
-
-            switch (type)
-            {
-                case (FullQueryLogger.SINGLE_QUERY):
-                    dumpQuery(options, wireIn, sb);
-                    break;
-
-                case (FullQueryLogger.BATCH):
-                    dumpBatch(options, wireIn, sb);
-                    break;
-
-                default:
-                    throw new UnsupportedOperationException("Log entry of unsupported type " + type);
-            }
-
-            System.out.print(sb.toString());
-            System.out.flush();
-        };
-
-        //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency
-        Pauser pauser = Pauser.millis(100);
-        List<ChronicleQueue> queues = arguments.stream().distinct().map(path -> ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(RollCycles.valueOf(rollCycle)).build()).collect(Collectors.toList());
-        List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList());
-        boolean hadWork = true;
-        while (hadWork)
-        {
-            hadWork = false;
-            for (ExcerptTailer tailer : tailers)
-            {
-                while (tailer.readDocument(reader))
-                {
-                    hadWork = true;
-                }
-            }
-
-            if (follow)
-            {
-                if (!hadWork)
-                {
-                    //Chronicle queue doesn't support blocking so use this backoff strategy
-                    pauser.pause();
-                }
-                //Don't terminate the loop even if there wasn't work
-                hadWork = true;
-            }
-        }
-    }
-
-    private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb)
-    {
-        sb.append("Query: ")
-          .append(wireIn.read(FullQueryLogger.QUERY).text())
-          .append(System.lineSeparator());
-
-        List<ByteBuffer> values = options.getValues() != null
-                                ? options.getValues()
-                                : Collections.emptyList();
-
-        sb.append("Values: ")
-          .append(System.lineSeparator());
-        appendValuesToStringBuilder(values, sb);
-        sb.append(System.lineSeparator());
-    }
-
-    private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb)
-    {
-        sb.append("Batch type: ")
-          .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text())
-          .append(System.lineSeparator());
-
-        ValueIn in = wireIn.read(FullQueryLogger.QUERIES);
-        int numQueries = in.int32();
-        List<String> queries = new ArrayList<>(numQueries);
-        for (int i = 0; i < numQueries; i++)
-            queries.add(in.text());
-
-        in = wireIn.read(FullQueryLogger.VALUES);
-        int numValues = in.int32();
-
-        for (int i = 0; i < numValues; i++)
-        {
-            int numSubValues = in.int32();
-            List<ByteBuffer> subValues = new ArrayList<>(numSubValues);
-            for (int j = 0; j < numSubValues; j++)
-                subValues.add(ByteBuffer.wrap(in.bytes()));
-
-            sb.append("Query: ")
-              .append(queries.get(i))
-              .append(System.lineSeparator());
-
-            sb.append("Values: ")
-              .append(System.lineSeparator());
-            appendValuesToStringBuilder(subValues, sb);
-        }
-
-        sb.append(System.lineSeparator());
-    }
-
-    private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb)
-    {
-        boolean first = true;
-        for (ByteBuffer value : values)
-        {
-            Bytes bytes = Bytes.wrapForRead(value);
-            long maxLength2 = Math.min(1024, bytes.readLimit() - bytes.readPosition());
-            toHexString(bytes, bytes.readPosition(), maxLength2, sb);
-            if (maxLength2 < bytes.readLimit() - bytes.readPosition())
-            {
-                sb.append("... truncated").append(System.lineSeparator());
-            }
-
-            if (first)
-            {
-                first = false;
-            }
-            else
-            {
-                sb.append("-----").append(System.lineSeparator());
-            }
-        }
-    }
-
-    //This is from net.openhft.chronicle.bytes, need to pass in the StringBuilder so had to copy
-    /*
-     * Copyright 2016 higherfrequencytrading.com
-     *
-     * Licensed under the Apache License, Version 2.0 (the "License");
-     * you may not use this file except in compliance with the License.
-     * You may obtain a copy of the License at
-     *
-     *     http://www.apache.org/licenses/LICENSE-2.0
-     *
-     * Unless required by applicable law or agreed to in writing, software
-     * distributed under the License is distributed on an "AS IS" BASIS,
-     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-     * See the License for the specific language governing permissions and
-     * limitations under the License.
-     */
-    /**
-     * display the hex data of {@link Bytes} from the position() to the limit()
-     *
-     * @param bytes the buffer you wish to toString()
-     * @return hex representation of the buffer, from example [0D ,OA, FF]
-     */
-    public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder)
-    throws BufferUnderflowException
-    {
-        if (len == 0)
-            return "";
-
-        int width = 16;
-        int[] lastLine = new int[width];
-        String sep = "";
-        long position = bytes.readPosition();
-        long limit = bytes.readLimit();
-
-        try {
-            bytes.readPositionRemaining(offset, len);
-
-            long start = offset / width * width;
-            long end = (offset + len + width - 1) / width * width;
-            for (long i = start; i < end; i += width) {
-                // check for duplicate rows
-                if (i + width < end) {
-                    boolean same = true;
-
-                    for (int j = 0; j < width && i + j < offset + len; j++) {
-                        int ch = bytes.readUnsignedByte(i + j);
-                        same &= (ch == lastLine[j]);
-                        lastLine[j] = ch;
-                    }
-                    if (i > start && same) {
-                        sep = "........\n";
-                        continue;
-                    }
-                }
-                builder.append(sep);
-                sep = "";
-                String str = Long.toHexString(i);
-                for (int j = str.length(); j < 8; j++)
-                    builder.append('0');
-                builder.append(str);
-                for (int j = 0; j < width; j++) {
-                    if (j == width / 2)
-                        builder.append(' ');
-                    if (i + j < offset || i + j >= offset + len) {
-                        builder.append("   ");
-
-                    } else {
-                        builder.append(' ');
-                        int ch = bytes.readUnsignedByte(i + j);
-                        builder.append(HEXI_DECIMAL[ch >> 4]);
-                        builder.append(HEXI_DECIMAL[ch & 15]);
-                    }
-                }
-                builder.append(' ');
-                for (int j = 0; j < width; j++) {
-                    if (j == width / 2)
-                        builder.append(' ');
-                    if (i + j < offset || i + j >= offset + len) {
-                        builder.append(' ');
-
-                    } else {
-                        int ch = bytes.readUnsignedByte(i + j);
-                        if (ch < ' ' || ch > 126)
-                            ch = '\u00B7';
-                        builder.append((char) ch);
-                    }
-                }
-                builder.append("\n");
-            }
-            return builder.toString();
-        } finally {
-            bytes.readLimit(limit);
-            bytes.readPosition(position);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java
new file mode 100644
index 0000000..6c0a6b9
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Longs;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+public abstract class FQLQuery implements Comparable<FQLQuery>
+{
+    public final long queryStartTime;
+    public final QueryOptions queryOptions;
+    public final int protocolVersion;
+    public final String keyspace;
+    public final long generatedTimestamp;
+    private final int generatedNowInSeconds;
+
+    public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds)
+    {
+        this.queryStartTime = queryStartTime;
+        this.queryOptions = queryOptions;
+        this.protocolVersion = protocolVersion;
+        this.keyspace = keyspace;
+        this.generatedTimestamp = generatedTimestamp;
+        this.generatedNowInSeconds = generatedNowInSeconds;
+    }
+
+    public abstract Statement toStatement();
+
+    /**
+     * used when storing the queries executed
+     */
+    public abstract BinLog.ReleaseableWriteMarshallable toMarshallable();
+
+    public QueryState queryState()
+    {
+        ClientState clientState = keyspace != null ? ClientState.forInternalCalls(keyspace) : ClientState.forInternalCalls();
+
+        return new QueryState(clientState, generatedTimestamp, generatedNowInSeconds);
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof FQLQuery)) return false;
+        FQLQuery fqlQuery = (FQLQuery) o;
+        return queryStartTime == fqlQuery.queryStartTime &&
+               protocolVersion == fqlQuery.protocolVersion &&
+               generatedTimestamp == fqlQuery.generatedTimestamp &&
+               generatedNowInSeconds == fqlQuery.generatedNowInSeconds &&
+               Objects.equals(queryOptions.getValues(), fqlQuery.queryOptions.getValues()) &&
+               Objects.equals(keyspace, fqlQuery.keyspace);
+    }
+
+    public int hashCode()
+    {   // hash the bound values rather than the QueryOptions instance: equals() compares queryOptions.getValues()
+        return Objects.hash(queryStartTime, queryOptions.getValues(), protocolVersion, keyspace, generatedTimestamp, generatedNowInSeconds);
+    }
+
+    public int compareTo(FQLQuery other)
+    {
+        int cmp = Longs.compare(queryStartTime, other.queryStartTime);
+        if (cmp != 0)
+            return cmp;
+        cmp = Longs.compare(generatedTimestamp, other.generatedTimestamp);
+        if (cmp != 0)
+            return cmp;
+
+        return Longs.compare(generatedNowInSeconds, other.generatedNowInSeconds);
+    }
+
+    public String toString()
+    {
+        return "FQLQuery{" +
+               "queryStartTime=" + queryStartTime +
+               ", protocolVersion=" + protocolVersion +
+               ", keyspace='" + keyspace + '\'' +
+               ", generatedTimestamp=" + generatedTimestamp +
+               ", generatedNowInSeconds=" + generatedNowInSeconds +
+               '}';
+    }
+
+    public static class Single extends FQLQuery
+    {
+        public final String query;
+        public final List<ByteBuffer> values;
+
+        public Single(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, String queryString, List<ByteBuffer> values)
+        {
+            super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
+            this.query = queryString;
+            this.values = values;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s%nQuery = %s, Values = %s",
+                                 super.toString(),
+                                 query,
+                                 values.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(",")));
+        }
+
+        public Statement toStatement()
+        {
+            SimpleStatement ss = new SimpleStatement(query, values.toArray());
+            ss.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
+            ss.setDefaultTimestamp(generatedTimestamp);
+            return ss;
+        }
+
+        public BinLog.ReleaseableWriteMarshallable toMarshallable()
+        {
+
+            return new FullQueryLogger.Query(query, queryOptions, queryState(), queryStartTime);
+        }
+
+        public int compareTo(FQLQuery other)
+        {
+            int cmp = super.compareTo(other);
+
+            if (cmp == 0)
+            {
+                if (other instanceof Batch)
+                    return -1;
+
+                Single singleQuery = (Single) other;
+
+                cmp = query.compareTo(singleQuery.query);
+                if (cmp == 0)
+                {
+                    if (values.size() != singleQuery.values.size())
+                        return values.size() - singleQuery.values.size();
+                    for (int i = 0; i < values.size(); i++)
+                    {
+                        cmp = values.get(i).compareTo(singleQuery.values.get(i));
+                        if (cmp != 0)
+                            return cmp;
+                    }
+                }
+            }
+            return cmp;
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof Single)) return false;
+            if (!super.equals(o)) return false;
+            Single single = (Single) o;
+            return Objects.equals(query, single.query) &&
+                   Objects.equals(values, single.values);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(super.hashCode(), query, values);
+        }
+    }
+
+    public static class Batch extends FQLQuery
+    {
+        public final BatchStatement.Type batchType;
+        public final List<Single> queries;
+
+        public Batch(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, BatchStatement.Type batchType, List<String> queries, List<List<ByteBuffer>> values)
+        {
+            super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
+            this.batchType = batchType;
+            this.queries = new ArrayList<>(queries.size());
+            for (int i = 0; i < queries.size(); i++)
+                this.queries.add(new Single(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds, queries.get(i), values.get(i)));
+        }
+
+        public Statement toStatement()
+        {
+            BatchStatement bs = new BatchStatement(batchType);
+
+            for (Single query : queries)
+            {
+                bs.add(new SimpleStatement(query.query, query.values.toArray()));
+            }
+            bs.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
+            bs.setDefaultTimestamp(generatedTimestamp); // todo: set actual server side generated time
+            return bs;
+        }
+
+        public int compareTo(FQLQuery other)
+        {
+            int cmp = super.compareTo(other);
+
+            if (cmp == 0)
+            {
+                if (other instanceof Single)
+                    return 1;
+
+                Batch otherBatch = (Batch) other;
+                if (queries.size() != otherBatch.queries.size())
+                    return queries.size() - otherBatch.queries.size();
+                for (int i = 0; i < queries.size(); i++)
+                {
+                    cmp = queries.get(i).compareTo(otherBatch.queries.get(i));
+                    if (cmp != 0)
+                        return cmp;
+                }
+            }
+            return cmp;
+        }
+
+        public BinLog.ReleaseableWriteMarshallable toMarshallable()
+        {
+            List<String> queryStrings = new ArrayList<>();
+            List<List<ByteBuffer>> values = new ArrayList<>();
+            for (Single q : queries)
+            {
+                queryStrings.add(q.query);
+                values.add(q.values);
+            }
+            return new FullQueryLogger.Batch(org.apache.cassandra.cql3.statements.BatchStatement.Type.valueOf(batchType.name()), queryStrings, values, queryOptions, queryState(), queryStartTime);
+        }
+
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder(super.toString()).append("\nbatch: ").append(batchType).append('\n');
+            for (Single q : queries)
+                sb.append(q.toString()).append('\n');
+            sb.append("end batch");
+            return sb.toString();
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof Batch)) return false;
+            if (!super.equals(o)) return false;
+            Batch batch = (Batch) o;
+            return batchType == batch.batchType &&
+                   Objects.equals(queries, batch.queries);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(super.hashCode(), batchType, queries);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java
new file mode 100644
index 0000000..390a52e
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.util.PriorityQueue;
+
+import net.openhft.chronicle.queue.ExcerptTailer;
+import org.apache.cassandra.utils.AbstractIterator;
+
+public class FQLQueryIterator extends AbstractIterator<FQLQuery>
+{
+    // use a priority queue to be able to sort the head of the query logs in memory
+    private final PriorityQueue<FQLQuery> pq;
+    private final ExcerptTailer tailer;
+    private final FQLQueryReader reader;
+
+    /**
+     * Create an iterator over the FQLQueries in tailer
+     *
+     * Reads up to readAhead queries into memory to be able to sort them (the files are mostly sorted already)
+     */
+    public FQLQueryIterator(ExcerptTailer tailer, int readAhead)
+    {
+        assert readAhead > 0 : "readAhead needs to be > 0";
+        reader = new FQLQueryReader();
+        this.tailer = tailer;
+        pq = new PriorityQueue<>(readAhead);
+        for (int i = 0; i < readAhead; i++)
+        {
+            FQLQuery next = readNext();
+            if (next != null)
+                pq.add(next);
+            else
+                break;
+        }
+    }
+
+    protected FQLQuery computeNext()
+    {
+        FQLQuery q = pq.poll();
+        if (q == null)
+            return endOfData();
+        FQLQuery next = readNext();
+        if (next != null)
+            pq.add(next);
+        return q;
+    }
+
+    private FQLQuery readNext()
+    {
+        if (tailer.readDocument(reader))
+            return reader.getQuery();
+        return null;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java
new file mode 100644
index 0000000..af77c59
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datastax.driver.core.BatchStatement;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.core.io.IORuntimeException;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS;
+import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP;
+import static org.apache.cassandra.audit.FullQueryLogger.KEYSPACE;
+import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
+import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
+import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERIES;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
+
+public class FQLQueryReader implements ReadMarshallable
+{
+    private FQLQuery query;
+
+    public void readMarshallable(WireIn wireIn) throws IORuntimeException
+    {
+        int currentVersion = wireIn.read(VERSION).int16();
+        String type = wireIn.read(TYPE).text();
+        long queryStartTime = wireIn.read(QUERY_START_TIME).int64();
+        int protocolVersion = wireIn.read(PROTOCOL_VERSION).int32();
+        QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(QUERY_OPTIONS).bytes()), ProtocolVersion.decode(protocolVersion));
+        long generatedTimestamp = wireIn.read(GENERATED_TIMESTAMP).int64();
+        int generatedNowInSeconds = wireIn.read(GENERATED_NOW_IN_SECONDS).int32();
+        String keyspace = wireIn.read(KEYSPACE).text();
+
+        switch (type)
+        {
+            case SINGLE_QUERY:
+                String queryString = wireIn.read(QUERY).text();
+                query = new FQLQuery.Single(keyspace,
+                                            protocolVersion,
+                                            queryOptions,
+                                            queryStartTime,
+                                            generatedTimestamp,
+                                            generatedNowInSeconds,
+                                            queryString,
+                                            queryOptions.getValues());
+                break;
+            case BATCH:
+                BatchStatement.Type batchType = BatchStatement.Type.valueOf(wireIn.read(BATCH_TYPE).text());
+                ValueIn in = wireIn.read(QUERIES);
+                int queryCount = in.int32();
+
+                List<String> queries = new ArrayList<>(queryCount);
+                for (int i = 0; i < queryCount; i++)
+                    queries.add(in.text());
+                in = wireIn.read(VALUES);
+                int valueCount = in.int32();
+                List<List<ByteBuffer>> values = new ArrayList<>(valueCount);
+                for (int ii = 0; ii < valueCount; ii++)
+                {
+                    List<ByteBuffer> subValues = new ArrayList<>();
+                    values.add(subValues);
+                    int numSubValues = in.int32();
+                    for (int zz = 0; zz < numSubValues; zz++)
+                        subValues.add(ByteBuffer.wrap(in.bytes()));
+                }
+                query = new FQLQuery.Batch(keyspace,
+                                           protocolVersion,
+                                           queryOptions,
+                                           queryStartTime,
+                                           generatedTimestamp,
+                                           generatedNowInSeconds,
+                                           batchType,
+                                           queries,
+                                           values);
+                break;
+            default:
+                throw new RuntimeException("Unknown type: " + type);
+        }
+    }
+
+    public FQLQuery getQuery()
+    {
+        return query;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java b/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java
new file mode 100644
index 0000000..0c8382f
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class QueryReplayer implements Closeable
+{
+    private static final int PRINT_RATE = 5000;
+    private final ExecutorService es = Executors.newFixedThreadPool(1);
+    private final Iterator<List<FQLQuery>> queryIterator;
+    private final List<Cluster> targetClusters;
+    private final List<Predicate<FQLQuery>> filters;
+    private final List<Session> sessions;
+    private final ResultHandler resultHandler;
+    private final MetricRegistry metrics = new MetricRegistry();
+    private final boolean debug;
+    private final PrintStream out;
+
+    public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+                         List<String> targetHosts,
+                         List<File> resultPaths,
+                         List<Predicate<FQLQuery>> filters,
+                         PrintStream out,
+                         String queryFilePathString,
+                         boolean debug)
+    {
+        this.queryIterator = queryIterator;
+        targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+        this.filters = filters;
+        sessions = targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+        File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null;
+        resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath);
+        this.debug = debug;
+        this.out = out;
+    }
+
+    public void replay()
+    {
+        while (queryIterator.hasNext())
+        {
+            List<FQLQuery> queries = queryIterator.next();
+            for (FQLQuery query : queries)
+            {
+                if (filters.stream().anyMatch(f -> !f.test(query)))
+                    continue;
+                try (Timer.Context ctx = metrics.timer("queries").time())
+                {
+                    List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size());
+                    Statement statement = query.toStatement();
+                    for (Session session : sessions)
+                    {
+                        try
+                        {
+                            if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace()))
+                            {
+                                if (debug)
+                                    out.printf("Switching keyspace from %s to %s%n", session.getLoggedKeyspace(), query.keyspace);
+                                session.execute("USE " + query.keyspace);
+                            }
+                        }
+                        catch (Throwable t)
+                        {
+                            out.printf("USE %s failed: %s%n", query.keyspace, t.getMessage());
+                        }
+                        if (debug)
+                        {
+                            out.println("Executing query:");
+                            out.println(query);
+                        }
+                        ListenableFuture<ResultSet> future = session.executeAsync(statement);
+                        results.add(handleErrors(future));
+                    }
+
+                    ListenableFuture<List<ResultHandler.ComparableResultSet>> resultList = Futures.allAsList(results);
+
+                    Futures.addCallback(resultList, new FutureCallback<List<ResultHandler.ComparableResultSet>>()
+                    {
+                        public void onSuccess(@Nullable List<ResultHandler.ComparableResultSet> resultSets)
+                        {
+                            // note that the order of resultSets is significant here - resultSets.get(x) should
+                            // be the result from a query against targetHosts.get(x)
+                            resultHandler.handleResults(query, resultSets);
+                        }
+
+                        public void onFailure(Throwable throwable)
+                        {
+                            throw new AssertionError("Errors should be handled in QueryReplayer.handleErrors", throwable);
+                        }
+                    }, es);
+                    // block until all sessions have answered this query before moving on to the next one
+                    FBUtilities.waitOnFuture(resultList);
+                }
+                catch (Throwable t)
+                {
+                    out.printf("QUERY %s got exception: %s%n", query, t.getMessage());
+                }
+                // print a progress line whenever the timed-query count is a multiple of PRINT_RATE
+                Timer timer = metrics.timer("queries");
+                if (timer.getCount() % PRINT_RATE == 0)
+                    out.printf("%d queries, rate = %.2f%n", timer.getCount(), timer.getOneMinuteRate());
+            }
+        }
+    }
+
+    /**
+     * Make sure we catch any query errors
+     *
+     * On error, this creates a failed ComparableResultSet with the exception set to be able to store
+     * this fact in the result file and handle comparison of failed result sets.
+     */
+    private static ListenableFuture<ResultHandler.ComparableResultSet> handleErrors(ListenableFuture<ResultSet> result)
+    {
+        FluentFuture<ResultHandler.ComparableResultSet> fluentFuture = FluentFuture.from(result)
+                                                                                   .transform(DriverResultSet::new, MoreExecutors.directExecutor());
+        return fluentFuture.catching(Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor());
+    }
+
+    public void close()
+    {
+        sessions.forEach(Session::close);
+        targetClusters.forEach(Cluster::close);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java b/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java
new file mode 100644
index 0000000..4bbaf7a
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Streams;
+
+public class ResultComparator
+{
+    /**
+     * Compares the rows in rows
+     * the row at position x in rows will have come from host at position x in targetHosts
+     */
+    public boolean compareRows(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+    {
+        if (rows.size() < 2 || rows.stream().allMatch(Objects::isNull))
+            return true;
+
+        if (rows.stream().anyMatch(Objects::isNull))
+        {
+            handleMismatch(targetHosts, query, rows);
+            return false;
+        }
+
+        ResultHandler.ComparableRow ref = rows.get(0);
+        boolean equal = true;
+        for (int i = 1; i < rows.size(); i++)
+        {
+            ResultHandler.ComparableRow compare = rows.get(i);
+            if (!ref.equals(compare))
+                equal = false;
+        }
+        if (!equal)
+            handleMismatch(targetHosts, query, rows);
+        return equal;
+    }
+
+    /**
+     * Compares the column definitions
+     *
+     * the column definitions at position x in cds will have come from host at position x in targetHosts
+     */
+    public boolean compareColumnDefinitions(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+    {
+        if (cds.size() < 2)
+            return true;
+
+        boolean equal = true;
+        List<ResultHandler.ComparableDefinition> refDefs = cds.get(0).asList();
+        for (int i = 1; i < cds.size(); i++)
+        {
+            List<ResultHandler.ComparableDefinition> toCompare = cds.get(i).asList();
+            if (!refDefs.equals(toCompare))
+                equal = false;
+        }
+        if (!equal)
+            handleColumnDefMismatch(targetHosts, query, cds);
+        return equal;
+    }
+
+    private void handleMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+    {
+        System.out.println("MISMATCH:");
+        System.out.println("Query = " + query);
+        System.out.println("Results:");
+        System.out.println(Streams.zip(rows.stream(), targetHosts.stream(), (r, host) -> String.format("%s: %s%n", host, r == null ? "null" : r)).collect(Collectors.joining()));
+    }
+
+    private void handleColumnDefMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+    {
+        System.out.println("COLUMN DEFINITION MISMATCH:");
+        System.out.println("Query = " + query);
+        System.out.println("Results: ");
+        System.out.println(Streams.zip(cds.stream(), targetHosts.stream(), (cd, host) -> String.format("%s: %s%n", host, columnDefinitionsString(cd))).collect(Collectors.joining()));
+    }
+
+    private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd)
+    {
+        StringBuilder sb = new StringBuilder();
+        if (cd == null)
+            sb.append("NULL");
+        else if (cd.wasFailed())
+            sb.append("FAILED");
+        else
+        {
+            for (ResultHandler.ComparableDefinition def : cd)
+            {
+                sb.append(def.toString());
+            }
+        }
+        return sb.toString();
+    }
+
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java b/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java
new file mode 100644
index 0000000..c769231
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Takes the result sets produced by executing one query against each target host, reports failed
+ * queries on stdout, hands column definitions and rows to a {@link ResultComparator} and, when result
+ * paths were given, to a {@link ResultStore}.
+ *
+ * The handler owns the ResultStore it creates, call {@link #close()} when done to release it.
+ */
+public class ResultHandler implements AutoCloseable
+{
+    private final ResultStore resultStore;
+    private final ResultComparator resultComparator;
+    private final List<String> targetHosts;
+
+    /**
+     * @param targetHosts   the hosts queries are executed against
+     * @param resultPaths   where to store results, null disables storing
+     * @param queryFilePath passed on to the ResultStore, only used if resultPaths is non-null
+     */
+    public ResultHandler(List<String> targetHosts, List<File> resultPaths, File queryFilePath)
+    {
+        this.targetHosts = targetHosts;
+        resultStore = resultPaths != null ? new ResultStore(resultPaths, queryFilePath) : null;
+        resultComparator = new ResultComparator();
+    }
+
+    /**
+     * Since we can't iterate a ResultSet more than once, and we don't want to keep the entire result set in memory
+     * we feed the rows one-by-one to resultComparator and resultStore.
+     *
+     * results.get(x) should be the results from executing query against targetHosts.get(x)
+     *
+     * @param query   the query that was executed
+     * @param results one result set per target host, in the same order as the target hosts
+     */
+    public void handleResults(FQLQuery query, List<ComparableResultSet> results)
+    {
+        // report every failed query before comparing anything
+        for (int i = 0; i < targetHosts.size(); i++)
+        {
+            if (results.get(i).wasFailed())
+            {
+                System.out.println("Query against "+targetHosts.get(i)+" failure:");
+                System.out.println(query);
+                System.out.println("Message: "+results.get(i).getFailureException().getMessage());
+            }
+        }
+
+        List<ComparableColumnDefinitions> columnDefinitions = results.stream().map(ComparableResultSet::getColumnDefinitions).collect(Collectors.toList());
+        resultComparator.compareColumnDefinitions(targetHosts, query, columnDefinitions);
+        if (resultStore != null)
+            resultStore.storeColumnDefinitions(query, columnDefinitions);
+        List<Iterator<ComparableRow>> iters = results.stream().map(Iterable::iterator).collect(Collectors.toList());
+
+        while (true)
+        {
+            List<ComparableRow> rows = rows(iters);
+            resultComparator.compareRows(targetHosts, query, rows);
+            if (resultStore != null)
+                resultStore.storeRows(rows);
+            // all rows being null marks end of all resultsets, we need to call compareRows
+            // and storeRows once with everything null to mark that fact
+            if (rows.stream().allMatch(Objects::isNull))
+                return;
+        }
+    }
+
+    /**
+     * Get the next row from each of the iterators, if the iterator has run out, null will mark that in the list
+     *
+     * @param iters one row iterator per result set
+     * @return a list with one entry per iterator, in the same order as iters
+     */
+    @VisibleForTesting
+    public static List<ComparableRow> rows(List<Iterator<ComparableRow>> iters)
+    {
+        List<ComparableRow> rows = new ArrayList<>(iters.size());
+        for (Iterator<ComparableRow> iter : iters)
+        {
+            if (iter.hasNext())
+                rows.add(iter.next());
+            else
+                rows.add(null);
+        }
+        return rows;
+    }
+
+    /**
+     * Closes the result store, if results are being stored. Does nothing otherwise.
+     */
+    @Override
+    public void close()
+    {
+        if (resultStore != null)
+            resultStore.close();
+    }
+
+    /**
+     * The rows returned by executing a query against a single host, or the failure if the query failed.
+     */
+    public interface ComparableResultSet extends Iterable<ComparableRow>
+    {
+        public ComparableColumnDefinitions getColumnDefinitions();
+        public boolean wasFailed();
+        public Throwable getFailureException();
+    }
+
+    /**
+     * The column definitions of a result set.
+     */
+    public interface ComparableColumnDefinitions extends Iterable<ComparableDefinition>
+    {
+        public List<ComparableDefinition> asList();
+        public boolean wasFailed();
+        public int size();
+    }
+
+    /**
+     * A single column definition, type and name.
+     */
+    public interface ComparableDefinition
+    {
+        public String getType();
+        public String getName();
+    }
+
+    /**
+     * A single row in a result set.
+     */
+    public interface ComparableRow
+    {
+        public ByteBuffer getBytesUnsafe(int i);
+        public ComparableColumnDefinitions getColumnDefinitions();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java b/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java
new file mode 100644
index 0000000..6d6aaac
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptAppender;
+import net.openhft.chronicle.wire.ValueOut;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+/**
+ * Stores the results of replayed queries in chronicle queues, one queue per result path, and optionally
+ * the queries themselves in a separate queue.
+ *
+ * see FQLReplayTest#readResultFile for how to read files produced by this class
+ */
+public class ResultStore
+{
+    private final List<ChronicleQueue> queues;
+    private final List<ExcerptAppender> appenders;
+    private final ChronicleQueue queryStoreQueue;
+    private final ExcerptAppender queryStoreAppender;
+    // positions of the result sets for which end_resultset has been written for the current query
+    private final Set<Integer> finishedHosts = new HashSet<>();
+
+    /**
+     * @param resultPaths   one path per result set position, a queue is created for each of them
+     * @param queryFilePath where to store the queries, null means the queries are not stored
+     */
+    public ResultStore(List<File> resultPaths, File queryFilePath)
+    {
+        queues = resultPaths.stream().map(path -> ChronicleQueueBuilder.single(path).build()).collect(Collectors.toList());
+        appenders = queues.stream().map(ChronicleQueue::acquireAppender).collect(Collectors.toList());
+        queryStoreQueue = queryFilePath != null ? ChronicleQueueBuilder.single(queryFilePath).build() : null;
+        queryStoreAppender = queryStoreQueue != null ? queryStoreQueue.acquireAppender() : null;
+    }
+
+    /**
+     * Store the column definitions in cds
+     *
+     * the ColumnDefinitions at position x will get stored by the appender at position x
+     *
+     * Calling this method indicates that we are starting a new result set from a query, it must be called before
+     * calling storeRows.
+     *
+     * @param query the query that produced the results, written to the query store if there is one
+     * @param cds   the column definitions, a failed one is stored as a "query_failed" document
+     */
+    public void storeColumnDefinitions(FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+    {
+        finishedHosts.clear();
+        if (queryStoreAppender != null)
+        {
+            BinLog.ReleaseableWriteMarshallable writeMarshallableQuery = query.toMarshallable();
+            try
+            {
+                queryStoreAppender.writeDocument(writeMarshallableQuery);
+            }
+            finally
+            {
+                // release even if the write fails
+                writeMarshallableQuery.release();
+            }
+        }
+        for (int i = 0; i < cds.size(); i++)
+        {
+            ResultHandler.ComparableColumnDefinitions cd = cds.get(i);
+            appenders.get(i).writeDocument(wire ->
+                                           {
+                                               if (!cd.wasFailed())
+                                               {
+                                                   wire.write("type").text("column_definitions");
+                                                   wire.write("column_count").int32(cd.size());
+                                                   for (ResultHandler.ComparableDefinition d : cd.asList())
+                                                   {
+                                                       ValueOut vo = wire.write("column_definition");
+                                                       vo.text(d.getName());
+                                                       vo.text(d.getType());
+                                                   }
+                                               }
+                                               else
+                                               {
+                                                   wire.write("type").text("query_failed");
+                                               }
+                                           });
+        }
+    }
+
+    /**
+     * Store rows
+     *
+     * the row at position x will get stored by appender at position x
+     *
+     * Before calling this for a new result set, storeColumnDefinitions must be called.
+     *
+     * @param rows one row per result set position, null meaning that the result set at that position has no
+     *             more rows. "end_resultset" is written the first time a position is null, later nulls at the
+     *             same position are ignored.
+     */
+    public void storeRows(List<ResultHandler.ComparableRow> rows)
+    {
+        for (int i = 0; i < rows.size(); i++)
+        {
+            ResultHandler.ComparableRow row = rows.get(i);
+            if (row == null && !finishedHosts.contains(i))
+            {
+                appenders.get(i).writeDocument(wire -> wire.write("type").text("end_resultset"));
+                finishedHosts.add(i);
+            }
+            else if (row != null)
+            {
+                appenders.get(i).writeDocument(wire ->
+                                               {
+                                                   int columnCount = row.getColumnDefinitions().size();
+                                                   wire.write("type").text("row");
+                                                   wire.write("row_column_count").int32(columnCount);
+                                                   for (int jj = 0; jj < columnCount; jj++)
+                                                   {
+                                                       ByteBuffer bb = row.getBytesUnsafe(jj);
+                                                       if (bb != null)
+                                                           wire.write("column").bytes(BytesStore.wrap(bb));
+                                                       else // explicit charset so the marker bytes do not depend on the platform default
+                                                           wire.write("column").bytes("NULL".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+                                                   }
+                                               });
+            }
+        }
+    }
+
+    /**
+     * Closes the result queues and, if there is one, the query store queue.
+     */
+    public void close()
+    {
+        queues.forEach(Closeable::close);
+        if (queryStoreQueue != null)
+            queryStoreQueue.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java
new file mode 100644
index 0000000..5c23d3e
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool.commands;
+
+import java.io.File;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.bytes.Bytes;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.threads.Pauser;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * Dump the contents of a list of paths containing full query logs
+ */
+@Command(name = "dump", description = "Dump the contents of a full query log")
+public class Dump implements Runnable
+{
+    static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray();
+
+    @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true)
+    private List<String> arguments = new ArrayList<>();
+
+    @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.")
+    private String rollCycle = "HOURLY";
+
+    @Option(title = "follow", name = {"--follow"}, description = "Upon reaching the end of the log continue indefinitely waiting for more records")
+    private boolean follow = false;
+
+    @Override
+    public void run()
+    {
+        dump(arguments, rollCycle, follow);
+    }
+
+    /**
+     * Reads every record from the full query logs at the given paths and prints it to stdout.
+     *
+     * @param arguments paths containing the full query logs, duplicate paths are only read once
+     * @param rollCycle name of the {@link RollCycles} constant the logs were written with
+     * @param follow    if true, keep waiting for new records instead of returning at the end of the logs
+     * @throws UnsupportedOperationException if a record has an unexpected version or an unsupported type
+     */
+    public static void dump(List<String> arguments, String rollCycle, boolean follow)
+    {
+        // a single builder is reused for all records, it is reset at the start of each one
+        StringBuilder sb = new StringBuilder();
+        ReadMarshallable reader = wireIn ->
+        {
+            sb.setLength(0);
+
+            int version = wireIn.read(FullQueryLogger.VERSION).int16();
+            if (version != FullQueryLogger.CURRENT_VERSION)
+                throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered");
+
+            String type = wireIn.read(FullQueryLogger.TYPE).text();
+            sb.append("Type: ")
+              .append(type)
+              .append(System.lineSeparator());
+
+            long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64();
+            sb.append("Query start time: ")
+              .append(queryStartTime)
+              .append(System.lineSeparator());
+
+            int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32();
+            sb.append("Protocol version: ")
+              .append(protocolVersion)
+              .append(System.lineSeparator());
+
+            QueryOptions options =
+                QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()),
+                                          ProtocolVersion.decode(protocolVersion));
+
+            long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64();
+            sb.append("Generated timestamp:")
+              .append(generatedTimestamp)
+              .append(System.lineSeparator());
+
+            int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32();
+            sb.append("Generated nowInSeconds:")
+              .append(generatedNowInSeconds)
+              .append(System.lineSeparator());
+
+            switch (type)
+            {
+                case (FullQueryLogger.SINGLE_QUERY):
+                    dumpQuery(options, wireIn, sb);
+                    break;
+
+                case (FullQueryLogger.BATCH):
+                    dumpBatch(options, wireIn, sb);
+                    break;
+
+                default:
+                    throw new UnsupportedOperationException("Log entry of unsupported type " + type);
+            }
+
+            System.out.print(sb.toString());
+            System.out.flush();
+        };
+
+        //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency
+        Pauser pauser = Pauser.millis(100);
+        List<ChronicleQueue> queues = arguments.stream().distinct().map(path -> ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(RollCycles.valueOf(rollCycle)).build()).collect(Collectors.toList());
+        try
+        {
+            List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList());
+            boolean hadWork = true;
+            while (hadWork)
+            {
+                hadWork = false;
+                for (ExcerptTailer tailer : tailers)
+                {
+                    while (tailer.readDocument(reader))
+                    {
+                        hadWork = true;
+                    }
+                }
+
+                if (follow)
+                {
+                    if (!hadWork)
+                    {
+                        //Chronicle queue doesn't support blocking so use this backoff strategy
+                        pauser.pause();
+                    }
+                    //Don't terminate the loop even if there wasn't work
+                    hadWork = true;
+                }
+            }
+        }
+        finally
+        {
+            //Close the queues whether dumping finished normally or a record could not be read
+            queues.forEach(ChronicleQueue::close);
+        }
+    }
+
+    /**
+     * Appends the query string read from wireIn and the bound values found in options to sb.
+     */
+    private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb)
+    {
+        sb.append("Query: ")
+          .append(wireIn.read(FullQueryLogger.QUERY).text())
+          .append(System.lineSeparator());
+
+        List<ByteBuffer> values = options.getValues() != null
+                                ? options.getValues()
+                                : Collections.emptyList();
+
+        sb.append("Values: ")
+          .append(System.lineSeparator());
+        appendValuesToStringBuilder(values, sb);
+        sb.append(System.lineSeparator());
+    }
+
+    /**
+     * Appends the batch type, and then each query of the batch together with its values, to sb.
+     * The options parameter is not used here.
+     */
+    private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb)
+    {
+        sb.append("Batch type: ")
+          .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text())
+          .append(System.lineSeparator());
+
+        ValueIn in = wireIn.read(FullQueryLogger.QUERIES);
+        int numQueries = in.int32();
+        List<String> queries = new ArrayList<>(numQueries);
+        for (int i = 0; i < numQueries; i++)
+            queries.add(in.text());
+
+        in = wireIn.read(FullQueryLogger.VALUES);
+        int numValues = in.int32();
+
+        // NOTE(review): queries is indexed by the value list index, this assumes the log never contains
+        // more value lists than queries - confirm against the writer
+        for (int i = 0; i < numValues; i++)
+        {
+            int numSubValues = in.int32();
+            List<ByteBuffer> subValues = new ArrayList<>(numSubValues);
+            for (int j = 0; j < numSubValues; j++)
+                subValues.add(ByteBuffer.wrap(in.bytes()));
+
+            sb.append("Query: ")
+              .append(queries.get(i))
+              .append(System.lineSeparator());
+
+            sb.append("Values: ")
+              .append(System.lineSeparator());
+            appendValuesToStringBuilder(subValues, sb);
+        }
+
+        sb.append(System.lineSeparator());
+    }
+
+    /**
+     * Appends a hex dump of each value to sb, at most the first 1024 bytes of a value are dumped and a
+     * truncation marker is added for longer values.
+     */
+    private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb)
+    {
+        boolean first = true;
+        for (ByteBuffer value : values)
+        {
+            Bytes bytes = Bytes.wrapForRead(value);
+            long maxLength2 = Math.min(1024, bytes.readLimit() - bytes.readPosition());
+            toHexString(bytes, bytes.readPosition(), maxLength2, sb);
+            if (maxLength2 < bytes.readLimit() - bytes.readPosition())
+            {
+                sb.append("... truncated").append(System.lineSeparator());
+            }
+
+            // NOTE(review): the separator is written after every value except the first one, not between
+            // values - left as is since it changes the output format, confirm whether this is intended
+            if (first)
+            {
+                first = false;
+            }
+            else
+            {
+                sb.append("-----").append(System.lineSeparator());
+            }
+        }
+    }
+
+    //This is from net.openhft.chronicle.bytes, need to pass in the StringBuilder so had to copy
+    /*
+     * Copyright 2016 higherfrequencytrading.com
+     *
+     * Licensed under the Apache License, Version 2.0 (the "License");
+     * you may not use this file except in compliance with the License.
+     * You may obtain a copy of the License at
+     *
+     *     http://www.apache.org/licenses/LICENSE-2.0
+     *
+     * Unless required by applicable law or agreed to in writing, software
+     * distributed under the License is distributed on an "AS IS" BASIS,
+     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     * See the License for the specific language governing permissions and
+     * limitations under the License.
+     */
+    /**
+     * Appends a hex dump of len bytes of {@link Bytes}, starting at offset, to builder. The read position
+     * and read limit of bytes are restored before returning.
+     *
+     * @param bytes   the buffer you wish to toString()
+     * @param offset  position of the first byte to dump
+     * @param len     number of bytes to dump
+     * @param builder the builder the dump is appended to
+     * @return the empty string if len is 0, otherwise the entire contents of builder, which includes
+     *         whatever it held before the call followed by the hex representation, for example [0D, 0A, FF]
+     */
+    public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder)
+    throws BufferUnderflowException
+    {
+        if (len == 0)
+            return "";
+
+        int width = 16;
+        int[] lastLine = new int[width];
+        String sep = "";
+        long position = bytes.readPosition();
+        long limit = bytes.readLimit();
+
+        try {
+            bytes.readPositionRemaining(offset, len);
+
+            long start = offset / width * width;
+            long end = (offset + len + width - 1) / width * width;
+            for (long i = start; i < end; i += width) {
+                // check for duplicate rows
+                if (i + width < end) {
+                    boolean same = true;
+
+                    for (int j = 0; j < width && i + j < offset + len; j++) {
+                        int ch = bytes.readUnsignedByte(i + j);
+                        same &= (ch == lastLine[j]);
+                        lastLine[j] = ch;
+                    }
+                    if (i > start && same) {
+                        sep = "........\n";
+                        continue;
+                    }
+                }
+                builder.append(sep);
+                sep = "";
+                String str = Long.toHexString(i);
+                for (int j = str.length(); j < 8; j++)
+                    builder.append('0');
+                builder.append(str);
+                for (int j = 0; j < width; j++) {
+                    if (j == width / 2)
+                        builder.append(' ');
+                    if (i + j < offset || i + j >= offset + len) {
+                        builder.append("   ");
+
+                    } else {
+                        builder.append(' ');
+                        int ch = bytes.readUnsignedByte(i + j);
+                        builder.append(HEXI_DECIMAL[ch >> 4]);
+                        builder.append(HEXI_DECIMAL[ch & 15]);
+                    }
+                }
+                builder.append(' ');
+                for (int j = 0; j < width; j++) {
+                    if (j == width / 2)
+                        builder.append(' ');
+                    if (i + j < offset || i + j >= offset + len) {
+                        builder.append(' ');
+
+                    } else {
+                        int ch = bytes.readUnsignedByte(i + j);
+                        if (ch < ' ' || ch > 126)
+                            ch = '\u00B7';
+                        builder.append((char) ch);
+                    }
+                }
+                builder.append("\n");
+            }
+            return builder.toString();
+        } finally {
+            bytes.readLimit(limit);
+            bytes.readPosition(position);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org