You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/09/01 08:44:19 UTC
[3/7] cassandra git commit: Add fqltool replay
Add fqltool replay
Patch by marcuse; reviewed by Jason Brown and Dinesh Joshi for CASSANDRA-14618
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62ffb772
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62ffb772
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62ffb772
Branch: refs/heads/trunk
Commit: 62ffb7723917768c38c9e012710c6dce509191c1
Parents: 46c33f3
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 6 16:32:27 2018 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Sat Sep 1 08:35:54 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/audit/FullQueryLogger.java | 5 +-
.../apache/cassandra/service/QueryState.java | 8 +
.../cassandra/tools/FullQueryLogTool.java | 6 +-
.../tools/fqltool/DriverResultSet.java | 241 ++++++
.../apache/cassandra/tools/fqltool/Dump.java | 325 --------
.../cassandra/tools/fqltool/FQLQuery.java | 278 +++++++
.../tools/fqltool/FQLQueryIterator.java | 72 ++
.../cassandra/tools/fqltool/FQLQueryReader.java | 116 +++
.../cassandra/tools/fqltool/QueryReplayer.java | 167 ++++
.../tools/fqltool/ResultComparator.java | 116 +++
.../cassandra/tools/fqltool/ResultHandler.java | 124 +++
.../cassandra/tools/fqltool/ResultStore.java | 142 ++++
.../cassandra/tools/fqltool/commands/Dump.java | 325 ++++++++
.../tools/fqltool/commands/Replay.java | 148 ++++
.../cassandra/tools/fqltool/FQLReplayTest.java | 760 +++++++++++++++++++
16 files changed, 2505 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd2a14a..1227337 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Add fqltool replay (CASSANDRA-14618)
* Log keyspace in full query log (CASSANDRA-14656)
* Transient Replication and Cheap Quorums (CASSANDRA-14404)
* Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/audit/FullQueryLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
index c9f8447..9c1f472 100644
--- a/src/java/org/apache/cassandra/audit/FullQueryLogger.java
+++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
@@ -23,6 +23,7 @@ import java.util.List;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
@@ -151,7 +152,7 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
logRecord(wrappedQuery, binLog);
}
- static class Query extends AbstractLogEntry
+ public static class Query extends AbstractLogEntry
{
private final String query;
@@ -181,7 +182,7 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
}
}
- static class Batch extends AbstractLogEntry
+ public static class Batch extends AbstractLogEntry
{
private final int weight;
private final BatchStatement.Type batchType;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 2bd07ab..26f58bf 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
+import org.apache.cassandra.transport.ClientStat;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -39,6 +40,13 @@ public class QueryState
this.clientState = clientState;
}
+ public QueryState(ClientState clientState, long timestamp, int nowInSeconds)
+ {
+ this(clientState);
+ this.timestamp = timestamp;
+ this.nowInSeconds = nowInSeconds;
+ }
+
/**
* @return a QueryState object for internal C* calls (not limited by any kind of auth).
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/FullQueryLogTool.java b/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
index 0d170d9..c1d4713 100644
--- a/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
+++ b/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
@@ -31,7 +31,8 @@ import io.airlift.airline.ParseCommandUnrecognizedException;
import io.airlift.airline.ParseOptionConversionException;
import io.airlift.airline.ParseOptionMissingException;
import io.airlift.airline.ParseOptionMissingValueException;
-import org.apache.cassandra.tools.fqltool.Dump;
+import org.apache.cassandra.tools.fqltool.commands.Dump;
+import org.apache.cassandra.tools.fqltool.commands.Replay;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static com.google.common.collect.Lists.newArrayList;
@@ -42,7 +43,8 @@ public class FullQueryLogTool
{
List<Class<? extends Runnable>> commands = newArrayList(
Help.class,
- Dump.class
+ Dump.class,
+ Replay.class
);
Cli.CliBuilder<Runnable> builder = Cli.builder("fqltool");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java b/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java
new file mode 100644
index 0000000..6c4ee45
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.AbstractIterator;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+/**
+ * Wraps a result set from the driver so that we can reuse the compare code when reading
+ * up a result set produced by ResultStore.
+ */
+public class DriverResultSet implements ResultHandler.ComparableResultSet
+{
+ private final ResultSet resultSet;
+ private final Throwable failureException;
+
+ public DriverResultSet(ResultSet resultSet)
+ {
+ this(resultSet, null);
+ }
+
+ private DriverResultSet(ResultSet res, Throwable failureException)
+ {
+ resultSet = res;
+ this.failureException = failureException;
+ }
+
+ public static DriverResultSet failed(Throwable ex)
+ {
+ return new DriverResultSet(null, ex);
+ }
+
+ public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+ {
+ if (wasFailed())
+ return new DriverColumnDefinitions(null, true);
+
+ return new DriverColumnDefinitions(resultSet.getColumnDefinitions());
+ }
+
+ public boolean wasFailed()
+ {
+ return failureException != null;
+ }
+
+ public Throwable getFailureException()
+ {
+ return failureException;
+ }
+
+ public Iterator<ResultHandler.ComparableRow> iterator()
+ {
+ if (wasFailed())
+ return Collections.emptyListIterator();
+ return new AbstractIterator<ResultHandler.ComparableRow>()
+ {
+ Iterator<Row> iter = resultSet.iterator();
+ protected ResultHandler.ComparableRow computeNext()
+ {
+ if (iter.hasNext())
+ return new DriverRow(iter.next());
+ return endOfData();
+ }
+ };
+ }
+
+ public static class DriverRow implements ResultHandler.ComparableRow
+ {
+ private final Row row;
+
+ public DriverRow(Row row)
+ {
+ this.row = row;
+ }
+
+ public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+ {
+ return new DriverColumnDefinitions(row.getColumnDefinitions());
+ }
+
+ public ByteBuffer getBytesUnsafe(int i)
+ {
+ return row.getBytesUnsafe(i);
+ }
+
+ @Override
+ public boolean equals(Object oo)
+ {
+ if (!(oo instanceof ResultHandler.ComparableRow))
+ return false;
+
+ ResultHandler.ComparableRow o = (ResultHandler.ComparableRow)oo;
+ if (getColumnDefinitions().size() != o.getColumnDefinitions().size())
+ return false;
+
+ for (int j = 0; j < getColumnDefinitions().size(); j++)
+ {
+ ByteBuffer b1 = getBytesUnsafe(j);
+ ByteBuffer b2 = o.getBytesUnsafe(j);
+
+ if (b1 != null && b2 != null && !b1.equals(b2))
+ {
+ return false;
+ }
+ if (b1 == null && b2 != null || b2 == null && b1 != null)
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ List<ResultHandler.ComparableDefinition> colDefs = getColumnDefinitions().asList();
+ for (int i = 0; i < getColumnDefinitions().size(); i++)
+ {
+ ByteBuffer bb = getBytesUnsafe(i);
+ String row = bb != null ? ByteBufferUtil.bytesToHex(bb) : "NULL";
+ sb.append(colDefs.get(i)).append(':').append(row).append(",");
+ }
+ return sb.toString();
+ }
+ }
+
+ public static class DriverColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
+ {
+ private final ColumnDefinitions columnDefinitions;
+ private final boolean failed;
+
+ public DriverColumnDefinitions(ColumnDefinitions columnDefinitions)
+ {
+ this(columnDefinitions, false);
+ }
+
+ private DriverColumnDefinitions(ColumnDefinitions columnDefinitions, boolean failed)
+ {
+ this.columnDefinitions = columnDefinitions;
+ this.failed = failed;
+ }
+
+ public List<ResultHandler.ComparableDefinition> asList()
+ {
+ if (wasFailed())
+ return Collections.emptyList();
+ return columnDefinitions.asList().stream().map(DriverDefinition::new).collect(Collectors.toList());
+ }
+
+ public boolean wasFailed()
+ {
+ return failed;
+ }
+
+ public int size()
+ {
+ return columnDefinitions.size();
+ }
+
+ public Iterator<ResultHandler.ComparableDefinition> iterator()
+ {
+ return asList().iterator();
+ }
+
+ public boolean equals(Object oo)
+ {
+ if (!(oo instanceof ResultHandler.ComparableColumnDefinitions))
+ return false;
+
+ ResultHandler.ComparableColumnDefinitions o = (ResultHandler.ComparableColumnDefinitions)oo;
+ if (wasFailed() && o.wasFailed())
+ return true;
+
+ if (size() != o.size())
+ return false;
+
+ return asList().equals(o.asList());
+ }
+ }
+
+ public static class DriverDefinition implements ResultHandler.ComparableDefinition
+ {
+ private final ColumnDefinitions.Definition def;
+
+ public DriverDefinition(ColumnDefinitions.Definition def)
+ {
+ this.def = def;
+ }
+
+ public String getType()
+ {
+ return def.getType().toString();
+ }
+
+ public String getName()
+ {
+ return def.getName();
+ }
+
+ public boolean equals(Object oo)
+ {
+ if (!(oo instanceof ResultHandler.ComparableDefinition))
+ return false;
+
+ return def.equals(((DriverDefinition)oo).def);
+ }
+
+ public String toString()
+ {
+ return getName() + ':' + getType();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/Dump.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/Dump.java
deleted file mode 100644
index a8e7592..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/Dump.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-import java.io.File;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import io.airlift.airline.Arguments;
-import io.airlift.airline.Command;
-import io.airlift.airline.Option;
-import io.netty.buffer.Unpooled;
-import net.openhft.chronicle.bytes.Bytes;
-import net.openhft.chronicle.queue.ChronicleQueue;
-import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-import net.openhft.chronicle.queue.ExcerptTailer;
-import net.openhft.chronicle.queue.RollCycles;
-import net.openhft.chronicle.threads.Pauser;
-import net.openhft.chronicle.wire.ReadMarshallable;
-import net.openhft.chronicle.wire.ValueIn;
-import net.openhft.chronicle.wire.WireIn;
-import org.apache.cassandra.audit.FullQueryLogger;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.transport.ProtocolVersion;
-
-/**
- * Dump the contents of a list of paths containing full query logs
- */
-@Command(name = "dump", description = "Dump the contents of a full query log")
-public class Dump implements Runnable
-{
- static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray();
-
- @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true)
- private List<String> arguments = new ArrayList<>();
-
- @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.")
- private String rollCycle = "HOURLY";
-
- @Option(title = "follow", name = {"--follow"}, description = "Upon reacahing the end of the log continue indefinitely waiting for more records")
- private boolean follow = false;
-
- @Override
- public void run()
- {
- dump(arguments, rollCycle, follow);
- }
-
- public static void dump(List<String> arguments, String rollCycle, boolean follow)
- {
- StringBuilder sb = new StringBuilder();
- ReadMarshallable reader = wireIn ->
- {
- sb.setLength(0);
-
- int version = wireIn.read(FullQueryLogger.VERSION).int16();
- if (version != FullQueryLogger.CURRENT_VERSION)
- throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered");
-
- String type = wireIn.read(FullQueryLogger.TYPE).text();
- sb.append("Type: ")
- .append(type)
- .append(System.lineSeparator());
-
- long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64();
- sb.append("Query start time: ")
- .append(queryStartTime)
- .append(System.lineSeparator());
-
- int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32();
- sb.append("Protocol version: ")
- .append(protocolVersion)
- .append(System.lineSeparator());
-
- QueryOptions options =
- QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()),
- ProtocolVersion.decode(protocolVersion));
-
- long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64();
- sb.append("Generated timestamp:")
- .append(generatedTimestamp)
- .append(System.lineSeparator());
-
- int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32();
- sb.append("Generated nowInSeconds:")
- .append(generatedNowInSeconds)
- .append(System.lineSeparator());
-
- switch (type)
- {
- case (FullQueryLogger.SINGLE_QUERY):
- dumpQuery(options, wireIn, sb);
- break;
-
- case (FullQueryLogger.BATCH):
- dumpBatch(options, wireIn, sb);
- break;
-
- default:
- throw new UnsupportedOperationException("Log entry of unsupported type " + type);
- }
-
- System.out.print(sb.toString());
- System.out.flush();
- };
-
- //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency
- Pauser pauser = Pauser.millis(100);
- List<ChronicleQueue> queues = arguments.stream().distinct().map(path -> ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(RollCycles.valueOf(rollCycle)).build()).collect(Collectors.toList());
- List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList());
- boolean hadWork = true;
- while (hadWork)
- {
- hadWork = false;
- for (ExcerptTailer tailer : tailers)
- {
- while (tailer.readDocument(reader))
- {
- hadWork = true;
- }
- }
-
- if (follow)
- {
- if (!hadWork)
- {
- //Chronicle queue doesn't support blocking so use this backoff strategy
- pauser.pause();
- }
- //Don't terminate the loop even if there wasn't work
- hadWork = true;
- }
- }
- }
-
- private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb)
- {
- sb.append("Query: ")
- .append(wireIn.read(FullQueryLogger.QUERY).text())
- .append(System.lineSeparator());
-
- List<ByteBuffer> values = options.getValues() != null
- ? options.getValues()
- : Collections.emptyList();
-
- sb.append("Values: ")
- .append(System.lineSeparator());
- appendValuesToStringBuilder(values, sb);
- sb.append(System.lineSeparator());
- }
-
- private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb)
- {
- sb.append("Batch type: ")
- .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text())
- .append(System.lineSeparator());
-
- ValueIn in = wireIn.read(FullQueryLogger.QUERIES);
- int numQueries = in.int32();
- List<String> queries = new ArrayList<>(numQueries);
- for (int i = 0; i < numQueries; i++)
- queries.add(in.text());
-
- in = wireIn.read(FullQueryLogger.VALUES);
- int numValues = in.int32();
-
- for (int i = 0; i < numValues; i++)
- {
- int numSubValues = in.int32();
- List<ByteBuffer> subValues = new ArrayList<>(numSubValues);
- for (int j = 0; j < numSubValues; j++)
- subValues.add(ByteBuffer.wrap(in.bytes()));
-
- sb.append("Query: ")
- .append(queries.get(i))
- .append(System.lineSeparator());
-
- sb.append("Values: ")
- .append(System.lineSeparator());
- appendValuesToStringBuilder(subValues, sb);
- }
-
- sb.append(System.lineSeparator());
- }
-
- private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb)
- {
- boolean first = true;
- for (ByteBuffer value : values)
- {
- Bytes bytes = Bytes.wrapForRead(value);
- long maxLength2 = Math.min(1024, bytes.readLimit() - bytes.readPosition());
- toHexString(bytes, bytes.readPosition(), maxLength2, sb);
- if (maxLength2 < bytes.readLimit() - bytes.readPosition())
- {
- sb.append("... truncated").append(System.lineSeparator());
- }
-
- if (first)
- {
- first = false;
- }
- else
- {
- sb.append("-----").append(System.lineSeparator());
- }
- }
- }
-
- //This is from net.openhft.chronicle.bytes, need to pass in the StringBuilder so had to copy
- /*
- * Copyright 2016 higherfrequencytrading.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- /**
- * display the hex data of {@link Bytes} from the position() to the limit()
- *
- * @param bytes the buffer you wish to toString()
- * @return hex representation of the buffer, from example [0D ,OA, FF]
- */
- public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder)
- throws BufferUnderflowException
- {
- if (len == 0)
- return "";
-
- int width = 16;
- int[] lastLine = new int[width];
- String sep = "";
- long position = bytes.readPosition();
- long limit = bytes.readLimit();
-
- try {
- bytes.readPositionRemaining(offset, len);
-
- long start = offset / width * width;
- long end = (offset + len + width - 1) / width * width;
- for (long i = start; i < end; i += width) {
- // check for duplicate rows
- if (i + width < end) {
- boolean same = true;
-
- for (int j = 0; j < width && i + j < offset + len; j++) {
- int ch = bytes.readUnsignedByte(i + j);
- same &= (ch == lastLine[j]);
- lastLine[j] = ch;
- }
- if (i > start && same) {
- sep = "........\n";
- continue;
- }
- }
- builder.append(sep);
- sep = "";
- String str = Long.toHexString(i);
- for (int j = str.length(); j < 8; j++)
- builder.append('0');
- builder.append(str);
- for (int j = 0; j < width; j++) {
- if (j == width / 2)
- builder.append(' ');
- if (i + j < offset || i + j >= offset + len) {
- builder.append(" ");
-
- } else {
- builder.append(' ');
- int ch = bytes.readUnsignedByte(i + j);
- builder.append(HEXI_DECIMAL[ch >> 4]);
- builder.append(HEXI_DECIMAL[ch & 15]);
- }
- }
- builder.append(' ');
- for (int j = 0; j < width; j++) {
- if (j == width / 2)
- builder.append(' ');
- if (i + j < offset || i + j >= offset + len) {
- builder.append(' ');
-
- } else {
- int ch = bytes.readUnsignedByte(i + j);
- if (ch < ' ' || ch > 126)
- ch = '\u00B7';
- builder.append((char) ch);
- }
- }
- builder.append("\n");
- }
- return builder.toString();
- } finally {
- bytes.readLimit(limit);
- bytes.readPosition(position);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java
new file mode 100644
index 0000000..6c0a6b9
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Longs;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+public abstract class FQLQuery implements Comparable<FQLQuery>
+{
+ public final long queryStartTime;
+ public final QueryOptions queryOptions;
+ public final int protocolVersion;
+ public final String keyspace;
+ public final long generatedTimestamp;
+ private final int generatedNowInSeconds;
+
+ public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds)
+ {
+ this.queryStartTime = queryStartTime;
+ this.queryOptions = queryOptions;
+ this.protocolVersion = protocolVersion;
+ this.keyspace = keyspace;
+ this.generatedTimestamp = generatedTimestamp;
+ this.generatedNowInSeconds = generatedNowInSeconds;
+ }
+
+ public abstract Statement toStatement();
+
+ /**
+ * used when storing the queries executed
+ */
+ public abstract BinLog.ReleaseableWriteMarshallable toMarshallable();
+
+ public QueryState queryState()
+ {
+ ClientState clientState = keyspace != null ? ClientState.forInternalCalls(keyspace) : ClientState.forInternalCalls();
+
+ return new QueryState(clientState, generatedTimestamp, generatedNowInSeconds);
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof FQLQuery)) return false;
+ FQLQuery fqlQuery = (FQLQuery) o;
+ return queryStartTime == fqlQuery.queryStartTime &&
+ protocolVersion == fqlQuery.protocolVersion &&
+ generatedTimestamp == fqlQuery.generatedTimestamp &&
+ generatedNowInSeconds == fqlQuery.generatedNowInSeconds &&
+ Objects.equals(queryOptions.getValues(), fqlQuery.queryOptions.getValues()) &&
+ Objects.equals(keyspace, fqlQuery.keyspace);
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(queryStartTime, queryOptions, protocolVersion, keyspace, generatedTimestamp, generatedNowInSeconds);
+ }
+
+ public int compareTo(FQLQuery other)
+ {
+ int cmp = Longs.compare(queryStartTime, other.queryStartTime);
+ if (cmp != 0)
+ return cmp;
+ cmp = Longs.compare(generatedTimestamp, other.generatedTimestamp);
+ if (cmp != 0)
+ return cmp;
+
+ return Longs.compare(generatedNowInSeconds, other.generatedNowInSeconds);
+ }
+
+ public String toString()
+ {
+ return "FQLQuery{" +
+ "queryStartTime=" + queryStartTime +
+ ", protocolVersion=" + protocolVersion +
+ ", keyspace='" + keyspace + '\'' +
+ ", generatedTimestamp=" + generatedTimestamp +
+ ", generatedNowInSeconds=" + generatedNowInSeconds +
+ '}';
+ }
+
+ public static class Single extends FQLQuery
+ {
+ public final String query;
+ public final List<ByteBuffer> values;
+
+ public Single(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, String queryString, List<ByteBuffer> values)
+ {
+ super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
+ this.query = queryString;
+ this.values = values;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s%nQuery = %s, Values = %s",
+ super.toString(),
+ query,
+ values.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(",")));
+ }
+
+ public Statement toStatement()
+ {
+ SimpleStatement ss = new SimpleStatement(query, values.toArray());
+ ss.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
+ ss.setDefaultTimestamp(generatedTimestamp);
+ return ss;
+ }
+
+ public BinLog.ReleaseableWriteMarshallable toMarshallable()
+ {
+
+ return new FullQueryLogger.Query(query, queryOptions, queryState(), queryStartTime);
+ }
+
+ public int compareTo(FQLQuery other)
+ {
+ int cmp = super.compareTo(other);
+
+ if (cmp == 0)
+ {
+ if (other instanceof Batch)
+ return -1;
+
+ Single singleQuery = (Single) other;
+
+ cmp = query.compareTo(singleQuery.query);
+ if (cmp == 0)
+ {
+ if (values.size() != singleQuery.values.size())
+ return values.size() - singleQuery.values.size();
+ for (int i = 0; i < values.size(); i++)
+ {
+ cmp = values.get(i).compareTo(singleQuery.values.get(i));
+ if (cmp != 0)
+ return cmp;
+ }
+ }
+ }
+ return cmp;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof Single)) return false;
+ if (!super.equals(o)) return false;
+ Single single = (Single) o;
+ return Objects.equals(query, single.query) &&
+ Objects.equals(values, single.values);
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), query, values);
+ }
+ }
+
+ public static class Batch extends FQLQuery
+ {
+ public final BatchStatement.Type batchType;
+ public final List<Single> queries;
+
+ public Batch(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, BatchStatement.Type batchType, List<String> queries, List<List<ByteBuffer>> values)
+ {
+ super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
+ this.batchType = batchType;
+ this.queries = new ArrayList<>(queries.size());
+ for (int i = 0; i < queries.size(); i++)
+ this.queries.add(new Single(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds, queries.get(i), values.get(i)));
+ }
+
+ public Statement toStatement()
+ {
+ BatchStatement bs = new BatchStatement(batchType);
+
+ for (Single query : queries)
+ {
+ bs.add(new SimpleStatement(query.query, query.values.toArray()));
+ }
+ bs.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
+ bs.setDefaultTimestamp(generatedTimestamp); // todo: set actual server side generated time
+ return bs;
+ }
+
+ public int compareTo(FQLQuery other)
+ {
+ int cmp = super.compareTo(other);
+
+ if (cmp == 0)
+ {
+ if (other instanceof Single)
+ return 1;
+
+ Batch otherBatch = (Batch) other;
+ if (queries.size() != otherBatch.queries.size())
+ return queries.size() - otherBatch.queries.size();
+ for (int i = 0; i < queries.size(); i++)
+ {
+ cmp = queries.get(i).compareTo(otherBatch.queries.get(i));
+ if (cmp != 0)
+ return cmp;
+ }
+ }
+ return cmp;
+ }
+
+ public BinLog.ReleaseableWriteMarshallable toMarshallable()
+ {
+ List<String> queryStrings = new ArrayList<>();
+ List<List<ByteBuffer>> values = new ArrayList<>();
+ for (Single q : queries)
+ {
+ queryStrings.add(q.query);
+ values.add(q.values);
+ }
+ return new FullQueryLogger.Batch(org.apache.cassandra.cql3.statements.BatchStatement.Type.valueOf(batchType.name()), queryStrings, values, queryOptions, queryState(), queryStartTime);
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder(super.toString()).append("\nbatch: ").append(batchType).append('\n');
+ for (Single q : queries)
+ sb.append(q.toString()).append('\n');
+ sb.append("end batch");
+ return sb.toString();
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof Batch)) return false;
+ if (!super.equals(o)) return false;
+ Batch batch = (Batch) o;
+ return batchType == batch.batchType &&
+ Objects.equals(queries, batch.queries);
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), batchType, queries);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java
new file mode 100644
index 0000000..390a52e
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.util.PriorityQueue;
+
+import net.openhft.chronicle.queue.ExcerptTailer;
+import org.apache.cassandra.utils.AbstractIterator;
+
+public class FQLQueryIterator extends AbstractIterator<FQLQuery>
+{
+ // use a priority queue to be able to sort the head of the query logs in memory
+ private final PriorityQueue<FQLQuery> pq;
+ private final ExcerptTailer tailer;
+ private final FQLQueryReader reader;
+
+ /**
+ * Create an iterator over the FQLQueries in tailer
+ *
+ * Reads up to readAhead queries in to memory to be able to sort them (the files are mostly sorted already)
+ */
+ public FQLQueryIterator(ExcerptTailer tailer, int readAhead)
+ {
+ assert readAhead > 0 : "readAhead needs to be > 0";
+ reader = new FQLQueryReader();
+ this.tailer = tailer;
+ pq = new PriorityQueue<>(readAhead);
+ for (int i = 0; i < readAhead; i++)
+ {
+ FQLQuery next = readNext();
+ if (next != null)
+ pq.add(next);
+ else
+ break;
+ }
+ }
+
+ protected FQLQuery computeNext()
+ {
+ FQLQuery q = pq.poll();
+ if (q == null)
+ return endOfData();
+ FQLQuery next = readNext();
+ if (next != null)
+ pq.add(next);
+ return q;
+ }
+
+ private FQLQuery readNext()
+ {
+ if (tailer.readDocument(reader))
+ return reader.getQuery();
+ return null;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java
new file mode 100644
index 0000000..af77c59
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datastax.driver.core.BatchStatement;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.core.io.IORuntimeException;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS;
+import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP;
+import static org.apache.cassandra.audit.FullQueryLogger.KEYSPACE;
+import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
+import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
+import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERIES;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
+
/**
 * Deserializes FQLQuery instances from a full query log.
 *
 * NOTE: this mirrors the serialization format written by FullQueryLogger - the
 * fields must be read in exactly the order they were written there. After a
 * successful ExcerptTailer#readDocument(reader) call, the decoded query is
 * available via {@link #getQuery()}.
 */
public class FQLQueryReader implements ReadMarshallable
{
    // Most recently decoded query; overwritten on every readMarshallable call.
    private FQLQuery query;

    /**
     * Reads one logged query (single or batch) from the wire.
     *
     * @throws IORuntimeException on chronicle read errors
     * @throws RuntimeException if the logged type field is unrecognized
     */
    public void readMarshallable(WireIn wireIn) throws IORuntimeException
    {
        // Header fields common to both single queries and batches.
        // currentVersion is read to advance the wire position; it is not used (yet).
        int currentVersion = wireIn.read(VERSION).int16();
        String type = wireIn.read(TYPE).text();
        long queryStartTime = wireIn.read(QUERY_START_TIME).int64();
        int protocolVersion = wireIn.read(PROTOCOL_VERSION).int32();
        // Query options were serialized with the native-protocol codec for the logged protocol version.
        QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(QUERY_OPTIONS).bytes()), ProtocolVersion.decode(protocolVersion));
        long generatedTimestamp = wireIn.read(GENERATED_TIMESTAMP).int64();
        int generatedNowInSeconds = wireIn.read(GENERATED_NOW_IN_SECONDS).int32();
        String keyspace = wireIn.read(KEYSPACE).text();

        switch (type)
        {
            case SINGLE_QUERY:
                String queryString = wireIn.read(QUERY).text();
                query = new FQLQuery.Single(keyspace,
                                            protocolVersion,
                                            queryOptions,
                                            queryStartTime,
                                            generatedTimestamp,
                                            generatedNowInSeconds,
                                            queryString,
                                            queryOptions.getValues());
                break;
            case BATCH:
                BatchStatement.Type batchType = BatchStatement.Type.valueOf(wireIn.read(BATCH_TYPE).text());
                // Queries: a count followed by that many query strings.
                ValueIn in = wireIn.read(QUERIES);
                int queryCount = in.int32();

                List<String> queries = new ArrayList<>(queryCount);
                for (int i = 0; i < queryCount; i++)
                    queries.add(in.text());
                // Values: one list of byte buffers per query, each prefixed with its own count.
                in = wireIn.read(VALUES);
                int valueCount = in.int32();
                List<List<ByteBuffer>> values = new ArrayList<>(valueCount);
                for (int ii = 0; ii < valueCount; ii++)
                {
                    List<ByteBuffer> subValues = new ArrayList<>();
                    values.add(subValues);
                    int numSubValues = in.int32();
                    for (int zz = 0; zz < numSubValues; zz++)
                        subValues.add(ByteBuffer.wrap(in.bytes()));
                }
                query = new FQLQuery.Batch(keyspace,
                                           protocolVersion,
                                           queryOptions,
                                           queryStartTime,
                                           generatedTimestamp,
                                           generatedNowInSeconds,
                                           batchType,
                                           queries,
                                           values);
                break;
            default:
                throw new RuntimeException("Unknown type: " + type);
        }
    }

    /** @return the query decoded by the most recent readMarshallable call */
    public FQLQuery getQuery()
    {
        return query;
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java b/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java
new file mode 100644
index 0000000..0c8382f
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class QueryReplayer implements Closeable
+{
+ private static final int PRINT_RATE = 5000;
+ private final ExecutorService es = Executors.newFixedThreadPool(1);
+ private final Iterator<List<FQLQuery>> queryIterator;
+ private final List<Cluster> targetClusters;
+ private final List<Predicate<FQLQuery>> filters;
+ private final List<Session> sessions;
+ private final ResultHandler resultHandler;
+ private final MetricRegistry metrics = new MetricRegistry();
+ private final boolean debug;
+ private final PrintStream out;
+
+ public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+ List<String> targetHosts,
+ List<File> resultPaths,
+ List<Predicate<FQLQuery>> filters,
+ PrintStream out,
+ String queryFilePathString,
+ boolean debug)
+ {
+ this.queryIterator = queryIterator;
+ targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+ this.filters = filters;
+ sessions = targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+ File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null;
+ resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath);
+ this.debug = debug;
+ this.out = out;
+ }
+
+ public void replay()
+ {
+ while (queryIterator.hasNext())
+ {
+ List<FQLQuery> queries = queryIterator.next();
+ for (FQLQuery query : queries)
+ {
+ if (filters.stream().anyMatch(f -> !f.test(query)))
+ continue;
+ try (Timer.Context ctx = metrics.timer("queries").time())
+ {
+ List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size());
+ Statement statement = query.toStatement();
+ for (Session session : sessions)
+ {
+ try
+ {
+ if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace()))
+ {
+ if (debug)
+ out.printf("Switching keyspace from %s to %s%n", session.getLoggedKeyspace(), query.keyspace);
+ session.execute("USE " + query.keyspace);
+ }
+ }
+ catch (Throwable t)
+ {
+ out.printf("USE %s failed: %s%n", query.keyspace, t.getMessage());
+ }
+ if (debug)
+ {
+ out.println("Executing query:");
+ out.println(query);
+ }
+ ListenableFuture<ResultSet> future = session.executeAsync(statement);
+ results.add(handleErrors(future));
+ }
+
+ ListenableFuture<List<ResultHandler.ComparableResultSet>> resultList = Futures.allAsList(results);
+
+ Futures.addCallback(resultList, new FutureCallback<List<ResultHandler.ComparableResultSet>>()
+ {
+ public void onSuccess(@Nullable List<ResultHandler.ComparableResultSet> resultSets)
+ {
+ // note that the order of resultSets is signifcant here - resultSets.get(x) should
+ // be the result from a query against targetHosts.get(x)
+ resultHandler.handleResults(query, resultSets);
+ }
+
+ public void onFailure(Throwable throwable)
+ {
+ throw new AssertionError("Errors should be handled in FQLQuery.execute", throwable);
+ }
+ }, es);
+
+ FBUtilities.waitOnFuture(resultList);
+ }
+ catch (Throwable t)
+ {
+ out.printf("QUERY %s got exception: %s", query, t.getMessage());
+ }
+
+ Timer timer = metrics.timer("queries");
+ if (timer.getCount() % PRINT_RATE == 0)
+ out.printf("%d queries, rate = %.2f%n", timer.getCount(), timer.getOneMinuteRate());
+ }
+ }
+ }
+
+ /**
+ * Make sure we catch any query errors
+ *
+ * On error, this creates a failed ComparableResultSet with the exception set to be able to store
+ * this fact in the result file and handle comparison of failed result sets.
+ */
+ private static ListenableFuture<ResultHandler.ComparableResultSet> handleErrors(ListenableFuture<ResultSet> result)
+ {
+ FluentFuture<ResultHandler.ComparableResultSet> fluentFuture = FluentFuture.from(result)
+ .transform(DriverResultSet::new, MoreExecutors.directExecutor());
+ return fluentFuture.catching(Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor());
+ }
+
+ public void close()
+ {
+ sessions.forEach(Session::close);
+ targetClusters.forEach(Cluster::close);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java b/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java
new file mode 100644
index 0000000..4bbaf7a
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Streams;
+
+public class ResultComparator
+{
+ /**
+ * Compares the rows in rows
+ * the row at position x in rows will have come from host at position x in targetHosts
+ */
+ public boolean compareRows(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+ {
+ if (rows.size() < 2 || rows.stream().allMatch(Objects::isNull))
+ return true;
+
+ if (rows.stream().anyMatch(Objects::isNull))
+ {
+ handleMismatch(targetHosts, query, rows);
+ return false;
+ }
+
+ ResultHandler.ComparableRow ref = rows.get(0);
+ boolean equal = true;
+ for (int i = 1; i < rows.size(); i++)
+ {
+ ResultHandler.ComparableRow compare = rows.get(i);
+ if (!ref.equals(compare))
+ equal = false;
+ }
+ if (!equal)
+ handleMismatch(targetHosts, query, rows);
+ return equal;
+ }
+
+ /**
+ * Compares the column definitions
+ *
+ * the column definitions at position x in cds will have come from host at position x in targetHosts
+ */
+ public boolean compareColumnDefinitions(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+ {
+ if (cds.size() < 2)
+ return true;
+
+ boolean equal = true;
+ List<ResultHandler.ComparableDefinition> refDefs = cds.get(0).asList();
+ for (int i = 1; i < cds.size(); i++)
+ {
+ List<ResultHandler.ComparableDefinition> toCompare = cds.get(i).asList();
+ if (!refDefs.equals(toCompare))
+ equal = false;
+ }
+ if (!equal)
+ handleColumnDefMismatch(targetHosts, query, cds);
+ return equal;
+ }
+
+ private void handleMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+ {
+ System.out.println("MISMATCH:");
+ System.out.println("Query = " + query);
+ System.out.println("Results:");
+ System.out.println(Streams.zip(rows.stream(), targetHosts.stream(), (r, host) -> String.format("%s: %s%n", host, r == null ? "null" : r)).collect(Collectors.joining()));
+ }
+
+ private void handleColumnDefMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+ {
+ System.out.println("COLUMN DEFINITION MISMATCH:");
+ System.out.println("Query = " + query);
+ System.out.println("Results: ");
+ System.out.println(Streams.zip(cds.stream(), targetHosts.stream(), (cd, host) -> String.format("%s: %s%n", host, columnDefinitionsString(cd))).collect(Collectors.joining()));
+ }
+
+ private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd)
+ {
+ StringBuilder sb = new StringBuilder();
+ if (cd == null)
+ sb.append("NULL");
+ else if (cd.wasFailed())
+ sb.append("FAILED");
+ else
+ {
+ for (ResultHandler.ComparableDefinition def : cd)
+ {
+ sb.append(def.toString());
+ }
+ }
+ return sb.toString();
+ }
+
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java b/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java
new file mode 100644
index 0000000..c769231
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ResultHandler
+{
+ private final ResultStore resultStore;
+ private final ResultComparator resultComparator;
+ private final List<String> targetHosts;
+
+ public ResultHandler(List<String> targetHosts, List<File> resultPaths, File queryFilePath)
+ {
+ this.targetHosts = targetHosts;
+ resultStore = resultPaths != null ? new ResultStore(resultPaths, queryFilePath) : null;
+ resultComparator = new ResultComparator();
+ }
+
+ /**
+ * Since we can't iterate a ResultSet more than once, and we don't want to keep the entire result set in memory
+ * we feed the rows one-by-one to resultComparator and resultStore.
+ *
+ * results.get(x) should be the results from executing query against targetHosts.get(x)
+ */
+ public void handleResults(FQLQuery query, List<ComparableResultSet> results)
+ {
+ for (int i = 0; i < targetHosts.size(); i++)
+ {
+ if (results.get(i).wasFailed())
+ {
+ System.out.println("Query against "+targetHosts.get(i)+" failure:");
+ System.out.println(query);
+ System.out.println("Message: "+results.get(i).getFailureException().getMessage());
+ }
+ }
+
+ List<ComparableColumnDefinitions> columnDefinitions = results.stream().map(ComparableResultSet::getColumnDefinitions).collect(Collectors.toList());
+ resultComparator.compareColumnDefinitions(targetHosts, query, columnDefinitions);
+ if (resultStore != null)
+ resultStore.storeColumnDefinitions(query, columnDefinitions);
+ List<Iterator<ComparableRow>> iters = results.stream().map(Iterable::iterator).collect(Collectors.toList());
+
+ while (true)
+ {
+ List<ComparableRow> rows = rows(iters);
+ resultComparator.compareRows(targetHosts, query, rows);
+ if (resultStore != null)
+ resultStore.storeRows(rows);
+ // all rows being null marks end of all resultsets, we need to call compareRows
+ // and storeRows once with everything null to mark that fact
+ if (rows.stream().allMatch(Objects::isNull))
+ return;
+ }
+ }
+
+ /**
+ * Get the first row from each of the iterators, if the iterator has run out, null will mark that in the list
+ */
+ @VisibleForTesting
+ public static List<ComparableRow> rows(List<Iterator<ComparableRow>> iters)
+ {
+ List<ComparableRow> rows = new ArrayList<>(iters.size());
+ for (Iterator<ComparableRow> iter : iters)
+ {
+ if (iter.hasNext())
+ rows.add(iter.next());
+ else
+ rows.add(null);
+ }
+ return rows;
+ }
+
+ public interface ComparableResultSet extends Iterable<ComparableRow>
+ {
+ public ComparableColumnDefinitions getColumnDefinitions();
+ public boolean wasFailed();
+ public Throwable getFailureException();
+ }
+
+ public interface ComparableColumnDefinitions extends Iterable<ComparableDefinition>
+ {
+ public List<ComparableDefinition> asList();
+ public boolean wasFailed();
+ public int size();
+ }
+
+ public interface ComparableDefinition
+ {
+ public String getType();
+ public String getName();
+ }
+
+ public interface ComparableRow
+ {
+ public ByteBuffer getBytesUnsafe(int i);
+ public ComparableColumnDefinitions getColumnDefinitions();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java b/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java
new file mode 100644
index 0000000..6d6aaac
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.wire.ValueOut;
import org.apache.cassandra.utils.binlog.BinLog;
+
+/**
+ * see FQLReplayTest#readResultFile for how to read files produced by this class
+ */
+public class ResultStore
+{
+ private final List<ChronicleQueue> queues;
+ private final List<ExcerptAppender> appenders;
+ private final ChronicleQueue queryStoreQueue;
+ private final ExcerptAppender queryStoreAppender;
+ private final Set<Integer> finishedHosts = new HashSet<>();
+
+ public ResultStore(List<File> resultPaths, File queryFilePath)
+ {
+ queues = resultPaths.stream().map(path -> ChronicleQueueBuilder.single(path).build()).collect(Collectors.toList());
+ appenders = queues.stream().map(ChronicleQueue::acquireAppender).collect(Collectors.toList());
+ queryStoreQueue = queryFilePath != null ? ChronicleQueueBuilder.single(queryFilePath).build() : null;
+ queryStoreAppender = queryStoreQueue != null ? queryStoreQueue.acquireAppender() : null;
+ }
+
+ /**
+ * Store the column definitions in cds
+ *
+ * the ColumnDefinitions at position x will get stored by the appender at position x
+ *
+ * Calling this method indicates that we are starting a new result set from a query, it must be called before
+ * calling storeRows.
+ *
+ */
+ public void storeColumnDefinitions(FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+ {
+ finishedHosts.clear();
+ if (queryStoreAppender != null)
+ {
+ BinLog.ReleaseableWriteMarshallable writeMarshallableQuery = query.toMarshallable();
+ queryStoreAppender.writeDocument(writeMarshallableQuery);
+ writeMarshallableQuery.release();
+ }
+ for (int i = 0; i < cds.size(); i++)
+ {
+ ResultHandler.ComparableColumnDefinitions cd = cds.get(i);
+ appenders.get(i).writeDocument(wire ->
+ {
+ if (!cd.wasFailed())
+ {
+ wire.write("type").text("column_definitions");
+ wire.write("column_count").int32(cd.size());
+ for (ResultHandler.ComparableDefinition d : cd.asList())
+ {
+ ValueOut vo = wire.write("column_definition");
+ vo.text(d.getName());
+ vo.text(d.getType());
+ }
+ }
+ else
+ {
+ wire.write("type").text("query_failed");
+ }
+ });
+ }
+ }
+
+ /**
+ * Store rows
+ *
+ * the row at position x will get stored by appender at position x
+ *
+ * Before calling this for a new result set, storeColumnDefinitions must be called.
+ */
+ public void storeRows(List<ResultHandler.ComparableRow> rows)
+ {
+ for (int i = 0; i < rows.size(); i++)
+ {
+ ResultHandler.ComparableRow row = rows.get(i);
+ if (row == null && !finishedHosts.contains(i))
+ {
+ appenders.get(i).writeDocument(wire -> wire.write("type").text("end_resultset"));
+ finishedHosts.add(i);
+ }
+ else if (row != null)
+ {
+ appenders.get(i).writeDocument(wire ->
+ {
+ {
+ wire.write("type").text("row");
+ wire.write("row_column_count").int32(row.getColumnDefinitions().size());
+ for (int jj = 0; jj < row.getColumnDefinitions().size(); jj++)
+ {
+ ByteBuffer bb = row.getBytesUnsafe(jj);
+ if (bb != null)
+ wire.write("column").bytes(BytesStore.wrap(bb));
+ else
+ wire.write("column").bytes("NULL".getBytes());
+ }
+ }
+ });
+ }
+ }
+ }
+
+ public void close()
+ {
+ queues.forEach(Closeable::close);
+ if (queryStoreQueue != null)
+ queryStoreQueue.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java
new file mode 100644
index 0000000..5c23d3e
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool.commands;
+
+import java.io.File;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.bytes.Bytes;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.threads.Pauser;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * Dump the contents of a list of paths containing full query logs
+ */
+@Command(name = "dump", description = "Dump the contents of a full query log")
+public class Dump implements Runnable
+{
+ static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray();
+
+ @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true)
+ private List<String> arguments = new ArrayList<>();
+
+ @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.")
+ private String rollCycle = "HOURLY";
+
+ @Option(title = "follow", name = {"--follow"}, description = "Upon reacahing the end of the log continue indefinitely waiting for more records")
+ private boolean follow = false;
+
+ @Override
+ public void run()
+ {
+ dump(arguments, rollCycle, follow);
+ }
+
+ public static void dump(List<String> arguments, String rollCycle, boolean follow)
+ {
+ StringBuilder sb = new StringBuilder();
+ ReadMarshallable reader = wireIn ->
+ {
+ sb.setLength(0);
+
+ int version = wireIn.read(FullQueryLogger.VERSION).int16();
+ if (version != FullQueryLogger.CURRENT_VERSION)
+ throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered");
+
+ String type = wireIn.read(FullQueryLogger.TYPE).text();
+ sb.append("Type: ")
+ .append(type)
+ .append(System.lineSeparator());
+
+ long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64();
+ sb.append("Query start time: ")
+ .append(queryStartTime)
+ .append(System.lineSeparator());
+
+ int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32();
+ sb.append("Protocol version: ")
+ .append(protocolVersion)
+ .append(System.lineSeparator());
+
+ QueryOptions options =
+ QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()),
+ ProtocolVersion.decode(protocolVersion));
+
+ long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64();
+ sb.append("Generated timestamp:")
+ .append(generatedTimestamp)
+ .append(System.lineSeparator());
+
+ int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32();
+ sb.append("Generated nowInSeconds:")
+ .append(generatedNowInSeconds)
+ .append(System.lineSeparator());
+
+ switch (type)
+ {
+ case (FullQueryLogger.SINGLE_QUERY):
+ dumpQuery(options, wireIn, sb);
+ break;
+
+ case (FullQueryLogger.BATCH):
+ dumpBatch(options, wireIn, sb);
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Log entry of unsupported type " + type);
+ }
+
+ System.out.print(sb.toString());
+ System.out.flush();
+ };
+
+ //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency
+ Pauser pauser = Pauser.millis(100);
+ List<ChronicleQueue> queues = arguments.stream().distinct().map(path -> ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(RollCycles.valueOf(rollCycle)).build()).collect(Collectors.toList());
+ List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList());
+ boolean hadWork = true;
+ while (hadWork)
+ {
+ hadWork = false;
+ for (ExcerptTailer tailer : tailers)
+ {
+ while (tailer.readDocument(reader))
+ {
+ hadWork = true;
+ }
+ }
+
+ if (follow)
+ {
+ if (!hadWork)
+ {
+ //Chronicle queue doesn't support blocking so use this backoff strategy
+ pauser.pause();
+ }
+ //Don't terminate the loop even if there wasn't work
+ hadWork = true;
+ }
+ }
+ }
+
+ private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb)
+ {
+ sb.append("Query: ")
+ .append(wireIn.read(FullQueryLogger.QUERY).text())
+ .append(System.lineSeparator());
+
+ List<ByteBuffer> values = options.getValues() != null
+ ? options.getValues()
+ : Collections.emptyList();
+
+ sb.append("Values: ")
+ .append(System.lineSeparator());
+ appendValuesToStringBuilder(values, sb);
+ sb.append(System.lineSeparator());
+ }
+
+ private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb)
+ {
+ sb.append("Batch type: ")
+ .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text())
+ .append(System.lineSeparator());
+
+ ValueIn in = wireIn.read(FullQueryLogger.QUERIES);
+ int numQueries = in.int32();
+ List<String> queries = new ArrayList<>(numQueries);
+ for (int i = 0; i < numQueries; i++)
+ queries.add(in.text());
+
+ in = wireIn.read(FullQueryLogger.VALUES);
+ int numValues = in.int32();
+
+ for (int i = 0; i < numValues; i++)
+ {
+ int numSubValues = in.int32();
+ List<ByteBuffer> subValues = new ArrayList<>(numSubValues);
+ for (int j = 0; j < numSubValues; j++)
+ subValues.add(ByteBuffer.wrap(in.bytes()));
+
+ sb.append("Query: ")
+ .append(queries.get(i))
+ .append(System.lineSeparator());
+
+ sb.append("Values: ")
+ .append(System.lineSeparator());
+ appendValuesToStringBuilder(subValues, sb);
+ }
+
+ sb.append(System.lineSeparator());
+ }
+
+ private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb)
+ {
+ boolean first = true;
+ for (ByteBuffer value : values)
+ {
+ Bytes bytes = Bytes.wrapForRead(value);
+ long maxLength2 = Math.min(1024, bytes.readLimit() - bytes.readPosition());
+ toHexString(bytes, bytes.readPosition(), maxLength2, sb);
+ if (maxLength2 < bytes.readLimit() - bytes.readPosition())
+ {
+ sb.append("... truncated").append(System.lineSeparator());
+ }
+
+ if (first)
+ {
+ first = false;
+ }
+ else
+ {
+ sb.append("-----").append(System.lineSeparator());
+ }
+ }
+ }
+
+ //This is from net.openhft.chronicle.bytes, need to pass in the StringBuilder so had to copy
+ /*
+ * Copyright 2016 higherfrequencytrading.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ /**
+ * display the hex data of {@link Bytes} from the position() to the limit()
+ *
+ * @param bytes the buffer you wish to toString()
+ * @return hex representation of the buffer, from example [0D ,OA, FF]
+ */
+ public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder)
+ throws BufferUnderflowException
+ {
+ if (len == 0)
+ return "";
+
+ int width = 16;
+ int[] lastLine = new int[width];
+ String sep = "";
+ long position = bytes.readPosition();
+ long limit = bytes.readLimit();
+
+ try {
+ bytes.readPositionRemaining(offset, len);
+
+ long start = offset / width * width;
+ long end = (offset + len + width - 1) / width * width;
+ for (long i = start; i < end; i += width) {
+ // check for duplicate rows
+ if (i + width < end) {
+ boolean same = true;
+
+ for (int j = 0; j < width && i + j < offset + len; j++) {
+ int ch = bytes.readUnsignedByte(i + j);
+ same &= (ch == lastLine[j]);
+ lastLine[j] = ch;
+ }
+ if (i > start && same) {
+ sep = "........\n";
+ continue;
+ }
+ }
+ builder.append(sep);
+ sep = "";
+ String str = Long.toHexString(i);
+ for (int j = str.length(); j < 8; j++)
+ builder.append('0');
+ builder.append(str);
+ for (int j = 0; j < width; j++) {
+ if (j == width / 2)
+ builder.append(' ');
+ if (i + j < offset || i + j >= offset + len) {
+ builder.append(" ");
+
+ } else {
+ builder.append(' ');
+ int ch = bytes.readUnsignedByte(i + j);
+ builder.append(HEXI_DECIMAL[ch >> 4]);
+ builder.append(HEXI_DECIMAL[ch & 15]);
+ }
+ }
+ builder.append(' ');
+ for (int j = 0; j < width; j++) {
+ if (j == width / 2)
+ builder.append(' ');
+ if (i + j < offset || i + j >= offset + len) {
+ builder.append(' ');
+
+ } else {
+ int ch = bytes.readUnsignedByte(i + j);
+ if (ch < ' ' || ch > 126)
+ ch = '\u00B7';
+ builder.append((char) ch);
+ }
+ }
+ builder.append("\n");
+ }
+ return builder.toString();
+ } finally {
+ bytes.readLimit(limit);
+ bytes.readPosition(position);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org