You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/09/01 08:44:21 UTC
[5/7] cassandra git commit: Add fqltool compare
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java
new file mode 100644
index 0000000..d8d419a
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Streams;
+
+public class ResultComparator
+{
+    /**
+     * Checks that all hosts produced the same row.
+     *
+     * rows.get(x) is the row that came from targetHosts.get(x); a null entry means that host had no row
+     * at this position.
+     *
+     * @return true when fewer than two rows are given, when every entry is null, or when every row equals
+     *         the first one; false otherwise, after the mismatch has been printed
+     */
+    public boolean compareRows(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+    {
+        if (rows.size() < 2)
+            return true;
+
+        long missing = rows.stream().filter(Objects::isNull).count();
+        if (missing == rows.size())
+            return true;
+
+        // a mix of null and non-null rows is always a mismatch
+        boolean matches = missing == 0;
+        if (matches)
+        {
+            ResultHandler.ComparableRow first = rows.get(0);
+            // deliberately no short-circuit: every row is compared against the first one
+            for (ResultHandler.ComparableRow candidate : rows.subList(1, rows.size()))
+                matches &= first.equals(candidate);
+        }
+
+        if (!matches)
+            handleMismatch(targetHosts, query, rows);
+        return matches;
+    }
+
+    /**
+     * Checks that all hosts returned the same column definitions.
+     *
+     * cds.get(x) holds the column definitions that came from targetHosts.get(x)
+     *
+     * @return true when fewer than two definitions are given or all of them equal the first one;
+     *         false otherwise, after the mismatch has been printed
+     */
+    public boolean compareColumnDefinitions(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+    {
+        if (cds.size() < 2)
+            return true;
+
+        List<ResultHandler.ComparableDefinition> expected = cds.get(0).asList();
+        boolean matches = true;
+        // deliberately no short-circuit: every definition list is compared against the first one
+        for (ResultHandler.ComparableColumnDefinitions other : cds.subList(1, cds.size()))
+            matches &= expected.equals(other.asList());
+
+        if (!matches)
+            handleColumnDefMismatch(targetHosts, query, cds);
+        return matches;
+    }
+
+    /**
+     * Prints the query and, for each host, the row it returned ("null" when the host had no row).
+     */
+    private void handleMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+    {
+        System.out.println("MISMATCH:");
+        System.out.println("Query = " + query);
+        System.out.println("Results:");
+        String perHost = Streams.zip(rows.stream(),
+                                     targetHosts.stream(),
+                                     (row, host) -> String.format("%s: %s%n", host, row == null ? "null" : row))
+                                .collect(Collectors.joining());
+        System.out.println(perHost);
+    }
+
+    /**
+     * Prints the query and, for each host, a rendering of the column definitions it returned.
+     */
+    private void handleColumnDefMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+    {
+        System.out.println("COLUMN DEFINITION MISMATCH:");
+        System.out.println("Query = " + query);
+        System.out.println("Results: ");
+        String perHost = Streams.zip(cds.stream(),
+                                     targetHosts.stream(),
+                                     (cd, host) -> String.format("%s: %s%n", host, columnDefinitionsString(cd)))
+                                .collect(Collectors.joining());
+        System.out.println(perHost);
+    }
+
+    /**
+     * Renders column definitions for the mismatch report: "NULL" for a missing entry, "FAILED" for a
+     * failed query, otherwise the concatenated string form of every definition.
+     */
+    private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd)
+    {
+        if (cd == null)
+            return "NULL";
+        if (cd.wasFailed())
+            return "FAILED";
+
+        StringBuilder rendered = new StringBuilder();
+        for (ResultHandler.ComparableDefinition definition : cd)
+            rendered.append(definition.toString());
+        return rendered.toString();
+    }
+
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java
new file mode 100644
index 0000000..8c4c018
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ResultHandler implements Closeable
+{
+ private final ResultStore resultStore;
+ private final ResultComparator resultComparator;
+ private final List<String> targetHosts;
+
+    /**
+     * @param targetHosts   the hosts the results come from, in the same order as the results passed to handleResults
+     * @param resultPaths   where to store the results; null disables storing
+     * @param queryFilePath passed on to the result store; only used when resultPaths is not null
+     */
+    public ResultHandler(List<String> targetHosts, List<File> resultPaths, File queryFilePath)
+    {
+        this.targetHosts = targetHosts;
+        if (resultPaths == null)
+            this.resultStore = null;
+        else
+            this.resultStore = new ResultStore(resultPaths, queryFilePath);
+        this.resultComparator = new ResultComparator();
+    }
+
+    /**
+     * Reports failed queries, then compares (and, when a result store is configured, stores) the column
+     * definitions and the rows of the given result sets.
+     *
+     * A ResultSet can not be iterated more than once and we don't want to keep it in memory, so the rows
+     * are pulled from all result sets in lockstep and fed one position at a time to resultComparator
+     * and resultStore.
+     *
+     * results.get(x) should be the results from executing query against targetHosts.get(x)
+     */
+    public void handleResults(FQLQuery query, List<ComparableResultSet> results)
+    {
+        for (int host = 0; host < targetHosts.size(); host++)
+        {
+            ComparableResultSet result = results.get(host);
+            if (!result.wasFailed())
+                continue;
+
+            System.out.println("Query against "+targetHosts.get(host)+" failure:");
+            System.out.println(query);
+            System.out.println("Message: "+result.getFailureException().getMessage());
+        }
+
+        List<ComparableColumnDefinitions> columnDefinitions = results.stream()
+                                                                     .map(ComparableResultSet::getColumnDefinitions)
+                                                                     .collect(Collectors.toList());
+        resultComparator.compareColumnDefinitions(targetHosts, query, columnDefinitions);
+        if (resultStore != null)
+            resultStore.storeColumnDefinitions(query, columnDefinitions);
+
+        List<Iterator<ComparableRow>> iters = results.stream().map(Iterable::iterator).collect(Collectors.toList());
+        List<ComparableRow> current;
+        do
+        {
+            current = rows(iters);
+            resultComparator.compareRows(targetHosts, query, current);
+            if (resultStore != null)
+                resultStore.storeRows(current);
+            // the loop body runs once more with only nulls: that final call to compareRows and storeRows
+            // is what marks the end of all result sets
+        }
+        while (!current.stream().allMatch(Objects::isNull));
+    }
+
+ /**
+ * Get the first row from each of the iterators, if the iterator has run out, null will mark that in the list
+ */
+ @VisibleForTesting
+ public static List<ComparableRow> rows(List<Iterator<ComparableRow>> iters)
+ {
+ List<ComparableRow> rows = new ArrayList<>(iters.size());
+ for (Iterator<ComparableRow> iter : iters)
+ {
+ if (iter.hasNext())
+ rows.add(iter.next());
+ else
+ rows.add(null);
+ }
+ return rows;
+ }
+
+ /**
+ * Closes the result store, if one was configured.
+ */
+ public void close() throws IOException
+ {
+ if (resultStore != null)
+ resultStore.close();
+ }
+
+ /**
+ * The outcome of running one query against one host: iterating yields its rows.
+ * wasFailed() tells whether the query failed, in which case getFailureException() carries the cause.
+ */
+ public interface ComparableResultSet extends Iterable<ComparableRow>
+ {
+ public ComparableColumnDefinitions getColumnDefinitions();
+ public boolean wasFailed();
+ public Throwable getFailureException();
+ }
+
+ /**
+ * The column definitions of a result set, available both as an Iterable and as a List.
+ * wasFailed() tells whether the query failed, in which case getFailureException() carries the cause.
+ */
+ public interface ComparableColumnDefinitions extends Iterable<ComparableDefinition>
+ {
+ public List<ComparableDefinition> asList();
+ public boolean wasFailed();
+ public Throwable getFailureException();
+ public int size();
+ }
+
+ /**
+ * A single column definition: the column's name and its type, both as strings.
+ */
+ public interface ComparableDefinition
+ {
+ public String getType();
+ public String getName();
+ }
+
+ /**
+ * A single row of a result set: getBytesUnsafe(i) gives the raw bytes of the column at index i,
+ * getColumnDefinitions() the definitions of the row's columns.
+ */
+ public interface ComparableRow
+ {
+ public ByteBuffer getBytesUnsafe(int i);
+ public ComparableColumnDefinitions getColumnDefinitions();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java
new file mode 100644
index 0000000..d128717
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.core.io.IORuntimeException;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptAppender;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.ValueOut;
+import net.openhft.chronicle.wire.WireIn;
+import net.openhft.chronicle.wire.WireOut;
+import net.openhft.chronicle.wire.WriteMarshallable;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+/**
+ * note that we store each row as a separate chronicle document to be able to
+ * avoid reading up the entire result set in memory when comparing
+ *
+ * document formats:
+ * to mark the start of a new result set:
+ * -------------------
+ * version: int16
+ * type: column_definitions
+ * column_count: int32;
+ * column_definition: text, text
+ * column_definition: text, text
+ * ....
+ * --------------------
+ *
+ * to mark a failed query:
+ * ---------------------
+ * version: int16
+ * type: query_failed
+ * message: text
+ * ---------------------
+ *
+ * row:
+ * --------------------
+ * version: int16
+ * type: row
+ * row_column_count: int32
+ * column: bytes
+ * ---------------------
+ *
+ * to mark the end of a result set:
+ * -------------------
+ * version: int16
+ * type: end_resultset
+ * -------------------
+ *
+ */
+public class ResultStore
+{
+ private static final String VERSION = "version";
+ private static final String TYPE = "type";
+ // types:
+ private static final String ROW = "row";
+ private static final String END = "end_resultset";
+ private static final String FAILURE = "query_failed";
+ private static final String COLUMN_DEFINITIONS = "column_definitions";
+ // fields:
+ private static final String COLUMN_DEFINITION = "column_definition";
+ private static final String COLUMN_COUNT = "column_count";
+ private static final String MESSAGE = "message";
+ private static final String ROW_COLUMN_COUNT = "row_column_count";
+ private static final String COLUMN = "column";
+
+ private static final int CURRENT_VERSION = 0;
+
+ private final List<ChronicleQueue> queues;
+ private final List<ExcerptAppender> appenders;
+ private final ChronicleQueue queryStoreQueue;
+ private final ExcerptAppender queryStoreAppender;
+ private final Set<Integer> finishedHosts = new HashSet<>();
+
+    /**
+     * Opens one chronicle queue (and appender) per result path and, when queryFilePath is not null,
+     * one more queue for storing the queries themselves.
+     */
+    public ResultStore(List<File> resultPaths, File queryFilePath)
+    {
+        queues = resultPaths.stream()
+                            .map(path -> ChronicleQueueBuilder.single(path).build())
+                            .collect(Collectors.toList());
+        appenders = queues.stream()
+                          .map(ChronicleQueue::acquireAppender)
+                          .collect(Collectors.toList());
+
+        if (queryFilePath == null)
+        {
+            queryStoreQueue = null;
+            queryStoreAppender = null;
+        }
+        else
+        {
+            queryStoreQueue = ChronicleQueueBuilder.single(queryFilePath).build();
+            queryStoreAppender = queryStoreQueue.acquireAppender();
+        }
+    }
+
+    /**
+     * Store the column definitions in cds
+     *
+     * the ColumnDefinitions at position x will get stored by the appender at position x
+     *
+     * Calling this method indicates that we are starting a new result set from a query, it must be called before
+     * calling storeRows. When a query store is configured the query itself is written to it first.
+     */
+    public void storeColumnDefinitions(FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+    {
+        // a new result set starts: no host has reached its end marker yet
+        finishedHosts.clear();
+        if (queryStoreAppender != null)
+        {
+            BinLog.ReleaseableWriteMarshallable writeMarshallableQuery = query.toMarshallable();
+            try
+            {
+                queryStoreAppender.writeDocument(writeMarshallableQuery);
+            }
+            finally
+            {
+                // release even when the write throws, so whatever release() frees is not leaked
+                writeMarshallableQuery.release();
+            }
+        }
+        for (int i = 0; i < cds.size(); i++)
+        {
+            ResultHandler.ComparableColumnDefinitions cd = cds.get(i);
+            appenders.get(i).writeDocument(new ColumnDefsWriter(cd));
+        }
+    }
+
+ /**
+ * Store rows
+ *
+ * the row at position x will get stored by appender at position x
+ *
+ * Before calling this for a new result set, storeColumnDefinitions must be called.
+ */
+ public void storeRows(List<ResultHandler.ComparableRow> rows)
+ {
+ for (int i = 0; i < rows.size(); i++)
+ {
+ ResultHandler.ComparableRow row = rows.get(i);
+ if (row == null && !finishedHosts.contains(i))
+ {
+ appenders.get(i).writeDocument(wire -> {
+ wire.write(VERSION).int16(CURRENT_VERSION);
+ wire.write(TYPE).text(END);
+ });
+ finishedHosts.add(i);
+ }
+ else if (row != null)
+ {
+ appenders.get(i).writeDocument(new RowWriter(row));
+ }
+ }
+ }
+
+ /**
+ * Closes every result queue and, if present, the query store queue.
+ */
+ public void close()
+ {
+ queues.forEach(Closeable::close);
+ if (queryStoreQueue != null)
+ queryStoreQueue.close();
+ }
+
+    /**
+     * Writes the document that starts a result set: the column definitions or, when the query failed,
+     * a query_failed document carrying the failure message.
+     */
+    static class ColumnDefsWriter implements WriteMarshallable
+    {
+        private final ResultHandler.ComparableColumnDefinitions defs;
+
+        ColumnDefsWriter(ResultHandler.ComparableColumnDefinitions defs)
+        {
+            this.defs = defs;
+        }
+
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write(VERSION).int16(CURRENT_VERSION);
+            if (defs.wasFailed())
+            {
+                wire.write(TYPE).text(FAILURE);
+                wire.write(MESSAGE).text(defs.getFailureException().getMessage());
+                return;
+            }
+
+            wire.write(TYPE).text(COLUMN_DEFINITIONS);
+            wire.write(COLUMN_COUNT).int32(defs.size());
+            for (ResultHandler.ComparableDefinition definition : defs.asList())
+            {
+                // each definition is one field holding the name followed by the type
+                ValueOut out = wire.write(COLUMN_DEFINITION);
+                out.text(definition.getName());
+                out.text(definition.getType());
+            }
+        }
+    }
+
+    /**
+     * Reads a document written by ColumnDefsWriter: either the column definitions that start a result set
+     * (collected as name/type pairs in columnDefinitions) or a query failure (wasFailed and failureMessage).
+     */
+    static class ColumnDefsReader implements ReadMarshallable
+    {
+        boolean wasFailed;
+        String failureMessage;
+        List<Pair<String, String>> columnDefinitions = new ArrayList<>();
+
+        public void readMarshallable(WireIn wire) throws IORuntimeException
+        {
+            // the version field is consumed but not acted on
+            wire.read(VERSION).int16();
+            String type = wire.read(TYPE).text();
+            if (type.equals(FAILURE))
+            {
+                wasFailed = true;
+                failureMessage = wire.read(MESSAGE).text();
+            }
+            // must match the document type ColumnDefsWriter emits (COLUMN_DEFINITIONS), not the name of
+            // the per-column field (COLUMN_DEFINITION); otherwise the definitions are never read back
+            else if (type.equals(COLUMN_DEFINITIONS))
+            {
+                int columnCount = wire.read(COLUMN_COUNT).int32();
+                for (int i = 0; i < columnCount; i++)
+                {
+                    ValueIn vi = wire.read(COLUMN_DEFINITION);
+                    String name = vi.text();
+                    String dataType = vi.text();
+                    columnDefinitions.add(Pair.create(name, dataType));
+                }
+            }
+        }
+    }
+
+    /**
+     * read a single row from the wire, or, marks itself finished if we read "end_resultset"
+     *
+     * The column values of the row are collected, in order, in rows.
+     */
+    static class RowReader implements ReadMarshallable
+    {
+        boolean isFinished;
+        List<ByteBuffer> rows = new ArrayList<>();
+
+        public void readMarshallable(WireIn wire) throws IORuntimeException
+        {
+            // the version is written as int16 by the writers, so read it back with the same width;
+            // it is consumed but not acted on
+            wire.read(VERSION).int16();
+            String type = wire.read(TYPE).text();
+            if (!type.equals(END))
+            {
+                isFinished = false;
+                int rowColumnCount = wire.read(ROW_COLUMN_COUNT).int32();
+
+                for (int i = 0; i < rowColumnCount; i++)
+                {
+                    byte[] b = wire.read(COLUMN).bytes();
+                    rows.add(ByteBuffer.wrap(b));
+                }
+            }
+            else
+            {
+                isFinished = true;
+            }
+        }
+    }
+
+    /**
+     * Writes one row document: version, type, the number of columns and then the bytes of every column.
+     * A column whose value is null is written as the bytes of the string "NULL".
+     */
+    static class RowWriter implements WriteMarshallable
+    {
+        private final ResultHandler.ComparableRow row;
+
+        RowWriter(ResultHandler.ComparableRow row)
+        {
+            this.row = row;
+        }
+
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write(VERSION).int16(CURRENT_VERSION);
+            wire.write(TYPE).text(ROW);
+            wire.write(ROW_COLUMN_COUNT).int32(row.getColumnDefinitions().size());
+            for (int column = 0; column < row.getColumnDefinitions().size(); column++)
+            {
+                ByteBuffer value = row.getBytesUnsafe(column);
+                if (value == null)
+                    // NOTE(review): getBytes() uses the platform default charset
+                    wire.write(COLUMN).bytes("NULL".getBytes());
+                else
+                    wire.write(COLUMN).bytes(BytesStore.wrap(value));
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java b/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java
new file mode 100644
index 0000000..b08861d
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.AbstractIterator;
+
+import net.openhft.chronicle.queue.ExcerptTailer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * represents a resultset defined by the format in ResultStore on disk
+ *
+ * todo: Currently all iterators need to be consumed fully while iterating over result sets
+ * if this is created from a tailer. This can probably be improved, but for all current uses it is fine.
+ */
+public class StoredResultSet implements ResultHandler.ComparableResultSet
+{
+ private final ResultHandler.ComparableColumnDefinitions defs;
+ public final boolean hasMoreResultSets;
+ private final Supplier<Iterator<ResultHandler.ComparableRow>> rowIteratorSupplier;
+ private final boolean wasFailed;
+ private final Throwable failureException;
+
+ /**
+ * create a new StoredResultSet
+ *
+ * note that we use an iteratorSupplier to be able to iterate over the same in-memory rows several times *in tests*
+ *
+ * @param defs the column definitions returned by getColumnDefinitions()
+ * @param hasMoreResultSets stored as-is in the public hasMoreResultSets field
+ * @param wasFailed the value returned by wasFailed()
+ * @param failure the value returned by getFailureException()
+ * @param iteratorSupplier called on every iterator() call to provide the row iterator
+ */
+ public StoredResultSet(ResultHandler.ComparableColumnDefinitions defs,
+ boolean hasMoreResultSets,
+ boolean wasFailed,
+ Throwable failure,
+ Supplier<Iterator<ResultHandler.ComparableRow>> iteratorSupplier)
+ {
+ this.defs = defs;
+ this.hasMoreResultSets = hasMoreResultSets;
+ this.wasFailed = wasFailed;
+ this.failureException = failure;
+ this.rowIteratorSupplier = iteratorSupplier;
+ }
+
+    /**
+     * creates a ComparableResultSet based on the data in tailer
+     *
+     * The column definitions (or failure) document is read immediately; the rows are read lazily from the
+     * tailer as the returned result set is iterated. hasMoreResultSets is false when no document could be
+     * read from the tailer.
+     */
+    public static StoredResultSet fromTailer(ExcerptTailer tailer)
+    {
+        ResultStore.ColumnDefsReader reader = new ResultStore.ColumnDefsReader();
+        boolean hasMoreResultSets = tailer.readDocument(reader);
+        Throwable failure = new RuntimeException(reader.failureMessage);
+        ResultHandler.ComparableColumnDefinitions defs = new StoredComparableColumnDefinitions(reader.columnDefinitions,
+                                                                                                reader.wasFailed,
+                                                                                                failure);
+
+        Iterator<ResultHandler.ComparableRow> rowIterator = new AbstractIterator<ResultHandler.ComparableRow>()
+        {
+            protected ResultHandler.ComparableRow computeNext()
+            {
+                ResultStore.RowReader rowReader = new ResultStore.RowReader();
+                // when nothing could be read (e.g. the stored results end without an end_resultset document)
+                // the reader is left untouched; without this check isFinished would stay false and this
+                // iterator would keep producing empty rows forever
+                if (!tailer.readDocument(rowReader) || rowReader.isFinished)
+                    return endOfData();
+                return new StoredComparableRow(rowReader.rows, defs);
+            }
+        };
+
+        // the same iterator instance is handed out on every call, so the rows can only be consumed once
+        return new StoredResultSet(defs,
+                                   hasMoreResultSets,
+                                   reader.wasFailed,
+                                   failure,
+                                   () -> rowIterator);
+    }
+
+ /**
+ * Creates a result set representing a failed query, with failureMessage wrapped in a RuntimeException.
+ */
+ public static ResultHandler.ComparableResultSet failed(String failureMessage)
+ {
+ return new FailedComparableResultSet(new RuntimeException(failureMessage));
+ }
+
+ /**
+ * Returns whatever row iterator the supplier given at construction provides.
+ */
+ public Iterator<ResultHandler.ComparableRow> iterator()
+ {
+ return rowIteratorSupplier.get();
+ }
+
+ /**
+ * Returns the column definitions given at construction.
+ */
+ public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+ {
+ return defs;
+ }
+
+ /**
+ * Returns the failed flag given at construction.
+ */
+ public boolean wasFailed()
+ {
+ return wasFailed;
+ }
+
+ /**
+ * Returns the failure given at construction.
+ */
+ public Throwable getFailureException()
+ {
+ return failureException;
+ }
+
+    /**
+     * A row backed by a list of column values. Two rows are equal when their column values are equal;
+     * the column definitions do not take part in the comparison.
+     */
+    static class StoredComparableRow implements ResultHandler.ComparableRow
+    {
+        private final List<ByteBuffer> row;
+        private final ResultHandler.ComparableColumnDefinitions cds;
+
+        public StoredComparableRow(List<ByteBuffer> row, ResultHandler.ComparableColumnDefinitions cds)
+        {
+            this.row = row;
+            this.cds = cds;
+        }
+
+        public ByteBuffer getBytesUnsafe(int i)
+        {
+            return row.get(i);
+        }
+
+        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+        {
+            return cds;
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof StoredComparableRow))
+                return false;
+            return row.equals(((StoredComparableRow)other).row);
+        }
+
+        // kept consistent with equals: both are based on the column values only
+        @Override
+        public int hashCode()
+        {
+            return row.hashCode();
+        }
+
+        /**
+         * @return the column values as hex strings, separated by commas
+         */
+        @Override
+        public String toString()
+        {
+            return row.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(","));
+        }
+    }
+
+    /**
+     * Column definitions built from (name, type) pairs. When wasFailed is set, asList(), size() and
+     * iterator() all present an empty set of definitions.
+     */
+    static class StoredComparableColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
+    {
+        private final List<ResultHandler.ComparableDefinition> defs;
+        private final boolean wasFailed;
+        private final Throwable failureException;
+
+        /**
+         * @param cds (name, type) pairs, may be null which is treated as no definitions
+         */
+        public StoredComparableColumnDefinitions(List<Pair<String, String>> cds, boolean wasFailed, Throwable failureException)
+        {
+            defs = cds != null ? cds.stream().map(StoredComparableDefinition::new).collect(Collectors.toList()) : Collections.emptyList();
+            this.wasFailed = wasFailed;
+            this.failureException = failureException;
+        }
+
+        public List<ResultHandler.ComparableDefinition> asList()
+        {
+            return wasFailed() ? Collections.emptyList() : defs;
+        }
+
+        public boolean wasFailed()
+        {
+            return wasFailed;
+        }
+
+        public Throwable getFailureException()
+        {
+            return failureException;
+        }
+
+        public int size()
+        {
+            return asList().size();
+        }
+
+        public Iterator<ResultHandler.ComparableDefinition> iterator()
+        {
+            // go through asList() so that iterating agrees with asList() and size() for failed queries
+            return asList().iterator();
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof StoredComparableColumnDefinitions))
+                return false;
+            return defs.equals(((StoredComparableColumnDefinitions)other).defs);
+        }
+
+        // kept consistent with equals: both are based on the definitions only
+        @Override
+        public int hashCode()
+        {
+            return defs.hashCode();
+        }
+
+        @Override
+        public String toString()
+        {
+            return defs.toString();
+        }
+    }
+
+    /**
+     * A column definition backed by a (name, type) pair: left is the name, right is the type.
+     */
+    private static class StoredComparableDefinition implements ResultHandler.ComparableDefinition
+    {
+        private final Pair<String, String> p;
+
+        public StoredComparableDefinition(Pair<String, String> p)
+        {
+            this.p = p;
+        }
+
+        public String getType()
+        {
+            return p.right;
+        }
+
+        public String getName()
+        {
+            return p.left;
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof StoredComparableDefinition))
+                return false;
+            return p.equals(((StoredComparableDefinition)other).p);
+        }
+
+        // kept consistent with equals: both delegate to the underlying pair
+        @Override
+        public int hashCode()
+        {
+            return p.hashCode();
+        }
+
+        /**
+         * @return the definition rendered as name:type
+         */
+        @Override
+        public String toString()
+        {
+            return getName() + ':' + getType();
+        }
+    }
+
+    /**
+     * A result set for a failed query: it has no rows and no column definitions, and both the result set
+     * and its column definitions report the exception given at construction.
+     */
+    private static class FailedComparableResultSet implements ResultHandler.ComparableResultSet
+    {
+        private final Throwable exception;
+
+        public FailedComparableResultSet(Throwable exception)
+        {
+            this.exception = exception;
+        }
+
+        public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+        {
+            return new ResultHandler.ComparableColumnDefinitions()
+            {
+                public List<ResultHandler.ComparableDefinition> asList()
+                {
+                    return Collections.emptyList();
+                }
+
+                public boolean wasFailed()
+                {
+                    return true;
+                }
+
+                public Throwable getFailureException()
+                {
+                    return exception;
+                }
+
+                public int size()
+                {
+                    return 0;
+                }
+
+                public Iterator<ResultHandler.ComparableDefinition> iterator()
+                {
+                    return asList().iterator();
+                }
+            };
+        }
+
+        public boolean wasFailed()
+        {
+            return true;
+        }
+
+        public Throwable getFailureException()
+        {
+            // hand back the stored failure rather than a fresh, message-less exception, so that callers
+            // printing getFailureException().getMessage() see the real failure message
+            return exception;
+        }
+
+        public Iterator<ResultHandler.ComparableRow> iterator()
+        {
+            return Collections.emptyListIterator();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java
new file mode 100644
index 0000000..2375296
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool.commands;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import org.apache.cassandra.fqltool.FQLQueryIterator;
+import org.apache.cassandra.fqltool.ResultHandler;
+import org.apache.cassandra.fqltool.StoredResultSet;
+
+/**
+ * Compares the result files that fqltool replay stored for each target, query by query.
+ */
+@Command(name = "compare", description = "Compare result files generated by fqltool replay")
+public class Compare implements Runnable
+{
+ @Arguments(usage = "<path1> [<path2>...<pathN>]",
+ description = "Directories containing result files to compare.",
+ required = true)
+ private List<String> arguments = new ArrayList<>();
+
+ @Option(title = "queries",
+ name = { "--queries"},
+ description = "Directory to read the queries from. It is produced by the fqltool replay --store-queries option. ",
+ required = true)
+ private String querylog;
+
+ /**
+ * Command entry point: compares the result directories given as arguments, using the queries in querylog.
+ */
+ @Override
+ public void run()
+ {
+ compare(querylog, arguments);
+ }
+
+    /**
+     * Reads the queries from querylog and, for each of them, takes the next stored result set from every
+     * directory in arguments and hands them to a ResultHandler for comparison.
+     *
+     * @param querylog  directory containing the stored queries
+     * @param arguments directories containing the stored results; they are also used as the host names
+     *                  passed to the ResultHandler
+     * @throws RuntimeException wrapping any exception thrown while reading or comparing
+     */
+    public static void compare(String querylog, List<String> arguments)
+    {
+        // filled one queue at a time so that the queues already opened are still closed in the finally
+        // block when opening a later one fails
+        List<ChronicleQueue> readQueues = new ArrayList<>();
+        try (ResultHandler rh = new ResultHandler(arguments, null, null);
+             ChronicleQueue queryQ = ChronicleQueueBuilder.single(querylog).readOnly(true).build();
+             FQLQueryIterator queries = new FQLQueryIterator(queryQ.createTailer(), 1))
+        {
+            List<Iterator<ResultHandler.ComparableResultSet>> its = new ArrayList<>();
+            for (String path : arguments)
+            {
+                ChronicleQueue queue = ChronicleQueueBuilder.single(path).readOnly(true).build();
+                readQueues.add(queue);
+                its.add(new StoredResultSetIterator(queue.createTailer()));
+            }
+            while (queries.hasNext())
+                rh.handleResults(queries.next(), resultSets(its));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            readQueues.forEach(Closeable::close);
+        }
+    }
+
+ @VisibleForTesting
+ public static List<ResultHandler.ComparableResultSet> resultSets(List<Iterator<ResultHandler.ComparableResultSet>> its)
+ {
+ List<ResultHandler.ComparableResultSet> resultSets = new ArrayList<>(its.size());
+ for (Iterator<ResultHandler.ComparableResultSet> it : its)
+ {
+ if (it.hasNext())
+ resultSets.add(it.next());
+ else
+ resultSets.add(null);
+ }
+ return resultSets;
+ }
+
+    /**
+     * Iterates over the result sets stored in a chronicle queue, reading them from the given tailer.
+     * The iteration ends when StoredResultSet.fromTailer reports that there are no more result sets.
+     */
+    @VisibleForTesting
+    public static class StoredResultSetIterator extends AbstractIterator<ResultHandler.ComparableResultSet>
+    {
+        private final ExcerptTailer tailer;
+
+        public StoredResultSetIterator(ExcerptTailer tailer)
+        {
+            this.tailer = tailer;
+        }
+
+        protected ResultHandler.ComparableResultSet computeNext()
+        {
+            StoredResultSet next = StoredResultSet.fromTailer(tailer);
+            if (!next.hasMoreResultSets)
+                return endOfData();
+            return next;
+        }
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
new file mode 100644
index 0000000..1263a11
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool.commands;
+
+import java.io.File;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.bytes.Bytes;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.threads.Pauser;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * Dump the contents of a list of paths containing full query logs
+ */
+@Command(name = "dump", description = "Dump the contents of a full query log")
+public class Dump implements Runnable
+{
+ // Lookup table mapping a nibble (0-15) to its upper case hex digit
+ static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray();
+
+ // Paths of the full query log directories to dump
+ @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true)
+ private List<String> arguments = new ArrayList<>();
+
+ // Name of the RollCycles constant the logs were written with
+ @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.")
+ private String rollCycle = "HOURLY";
+
+ // When set, dump() keeps polling for new records instead of returning at the end of the logs
+ @Option(title = "follow", name = {"--follow"}, description = "Upon reaching the end of the log continue indefinitely waiting for more records")
+ private boolean follow = false;
+
+ /**
+  * Command entry point: dumps the configured paths using the parsed command line options.
+  */
+ @Override
+ public void run()
+ {
+     Dump.dump(this.arguments, this.rollCycle, this.follow);
+ }
+
+ /**
+  * Prints every record found in the given full query log directories to standard out.
+  *
+  * @param arguments paths of the full query log directories; duplicate paths are only read once
+  * @param rollCycle name of the {@link RollCycles} constant the logs were written with
+  * @param follow    when true, keep polling for new records after reaching the end of the logs
+  *                  instead of returning
+  * @throws IllegalArgumentException if {@code rollCycle} is not the name of a {@link RollCycles} constant
+  * @throws UnsupportedOperationException raised by the record reader for a record of unexpected
+  *                                       version or type
+  */
+ public static void dump(List<String> arguments, String rollCycle, boolean follow)
+ {
+     StringBuilder sb = new StringBuilder();
+     ReadMarshallable reader = wireIn ->
+     {
+         // The builder is shared between records, start every record from scratch
+         sb.setLength(0);
+
+         int version = wireIn.read(FullQueryLogger.VERSION).int16();
+         if (version != FullQueryLogger.CURRENT_VERSION)
+             throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered");
+
+         String type = wireIn.read(FullQueryLogger.TYPE).text();
+         sb.append("Type: ")
+           .append(type)
+           .append(System.lineSeparator());
+
+         long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64();
+         sb.append("Query start time: ")
+           .append(queryStartTime)
+           .append(System.lineSeparator());
+
+         int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32();
+         sb.append("Protocol version: ")
+           .append(protocolVersion)
+           .append(System.lineSeparator());
+
+         QueryOptions options =
+             QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()),
+                                       ProtocolVersion.decode(protocolVersion));
+
+         long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64();
+         sb.append("Generated timestamp:")
+           .append(generatedTimestamp)
+           .append(System.lineSeparator());
+
+         int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32();
+         sb.append("Generated nowInSeconds:")
+           .append(generatedNowInSeconds)
+           .append(System.lineSeparator());
+
+         switch (type)
+         {
+             case (FullQueryLogger.SINGLE_QUERY):
+                 dumpQuery(options, wireIn, sb);
+                 break;
+
+             case (FullQueryLogger.BATCH):
+                 dumpBatch(options, wireIn, sb);
+                 break;
+
+             default:
+                 throw new UnsupportedOperationException("Log entry of unsupported type " + type);
+         }
+
+         System.out.print(sb.toString());
+         System.out.flush();
+     };
+
+     //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency
+     Pauser pauser = Pauser.millis(100);
+     // Resolve the roll cycle once, before any queue is opened, instead of once per path
+     RollCycles cycle = RollCycles.valueOf(rollCycle);
+     List<ChronicleQueue> queues = new ArrayList<>();
+     try
+     {
+         // Open the queues one by one so that those already opened are still closed below
+         // if a later path fails to open
+         for (String path : arguments.stream().distinct().collect(Collectors.toList()))
+             queues.add(ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(cycle).build());
+
+         List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList());
+         boolean hadWork = true;
+         while (hadWork)
+         {
+             hadWork = false;
+             for (ExcerptTailer tailer : tailers)
+             {
+                 while (tailer.readDocument(reader))
+                 {
+                     hadWork = true;
+                 }
+             }
+
+             if (follow)
+             {
+                 if (!hadWork)
+                 {
+                     //Chronicle queue doesn't support blocking so use this backoff strategy
+                     pauser.pause();
+                 }
+                 //Don't terminate the loop even if there wasn't work
+                 hadWork = true;
+             }
+         }
+     }
+     finally
+     {
+         // Release the queues once dumping ends, whether normally or through an exception
+         queues.forEach(ChronicleQueue::close);
+     }
+ }
+
+ /**
+  * Appends the query string of a single-query record, followed by its bound values, to {@code sb}.
+  *
+  * @param options decoded query options; their values are dumped, a null value list is treated as empty
+  * @param wireIn  the record being read, positioned at the query field
+  * @param sb      the builder the output is appended to
+  */
+ private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb)
+ {
+     sb.append("Query: ");
+     sb.append(wireIn.read(FullQueryLogger.QUERY).text());
+     sb.append(System.lineSeparator());
+
+     List<ByteBuffer> values;
+     if (options.getValues() != null)
+         values = options.getValues();
+     else
+         values = Collections.emptyList();
+
+     sb.append("Values: ");
+     sb.append(System.lineSeparator());
+     appendValuesToStringBuilder(values, sb);
+     sb.append(System.lineSeparator());
+ }
+
+ /**
+  * Appends the batch type of a batch record, then every query of the batch together with its
+  * bound values, to {@code sb}.
+  *
+  * @param options decoded query options of the record (not used here)
+  * @param wireIn  the record being read, positioned at the batch type field
+  * @param sb      the builder the output is appended to
+  */
+ private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb)
+ {
+     sb.append("Batch type: ");
+     sb.append(wireIn.read(FullQueryLogger.BATCH_TYPE).text());
+     sb.append(System.lineSeparator());
+
+     // All query strings are stored first, as a count followed by the strings
+     ValueIn queriesIn = wireIn.read(FullQueryLogger.QUERIES);
+     int queryCount = queriesIn.int32();
+     List<String> queries = new ArrayList<>(queryCount);
+     for (int q = 0; q < queryCount; q++)
+         queries.add(queriesIn.text());
+
+     // Then one list of bound values per query, each prefixed with its length
+     ValueIn valuesIn = wireIn.read(FullQueryLogger.VALUES);
+     int valueListCount = valuesIn.int32();
+     for (int q = 0; q < valueListCount; q++)
+     {
+         int boundCount = valuesIn.int32();
+         List<ByteBuffer> bound = new ArrayList<>(boundCount);
+         for (int v = 0; v < boundCount; v++)
+             bound.add(ByteBuffer.wrap(valuesIn.bytes()));
+
+         sb.append("Query: ").append(queries.get(q)).append(System.lineSeparator());
+         sb.append("Values: ").append(System.lineSeparator());
+         appendValuesToStringBuilder(bound, sb);
+     }
+
+     sb.append(System.lineSeparator());
+ }
+
+ /**
+  * Appends a hex dump of every value to {@code sb}, with a "-----" line between consecutive values.
+  * At most the first 1024 readable bytes of a value are dumped; longer values are marked as truncated.
+  *
+  * @param values the values to dump, may be empty
+  * @param sb     the builder the output is appended to
+  */
+ private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb)
+ {
+     boolean first = true;
+     for (ByteBuffer value : values)
+     {
+         // Write the separator in front of every value but the first, so that it ends up
+         // between values rather than trailing after the second and later ones
+         if (first)
+         {
+             first = false;
+         }
+         else
+         {
+             sb.append("-----").append(System.lineSeparator());
+         }
+
+         Bytes bytes = Bytes.wrapForRead(value);
+         long readable = bytes.readLimit() - bytes.readPosition();
+         long maxLength2 = Math.min(1024, readable);
+         toHexString(bytes, bytes.readPosition(), maxLength2, sb);
+         if (maxLength2 < readable)
+         {
+             sb.append("... truncated").append(System.lineSeparator());
+         }
+     }
+ }
+
+ //This is copied from net.openhft.chronicle.bytes; a copy was needed so that the StringBuilder to append to can be passed in
+ /*
+ * Copyright 2016 higherfrequencytrading.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ /**
+ * Appends a hex dump of {@code len} bytes of {@code bytes}, starting at absolute offset
+ * {@code offset}, to {@code builder}.
+ *
+ * Rows are 16 bytes wide and aligned to multiples of 16. Every row is written as the row offset in
+ * hex (zero padded to at least 8 digits), the bytes as two-digit upper case hex values in two groups
+ * of 8, and the same bytes as characters, where anything below ' ' or above 126 is shown as a middle
+ * dot (U+00B7). Positions of a row that fall outside the requested range are padded with blanks.
+ * Rows other than the first and the last that are identical to the row before them are not written;
+ * each such run is replaced by a single "........" line. Lines end with "\n".
+ *
+ * The read position and read limit of {@code bytes} are restored before returning.
+ *
+ * @param bytes the buffer to dump
+ * @param offset absolute offset of the first byte to dump
+ * @param len number of bytes to dump
+ * @param builder the builder the dump is appended to
+ * @return the complete contents of {@code builder} after appending (not only this dump), or the
+ * empty string when {@code len} is 0, in which case nothing is appended
+ * @throws BufferUnderflowException NOTE(review): presumably raised by the {@link Bytes} accessors
+ * when the requested range lies outside the buffer - confirm against the Chronicle Bytes API
+ */
+ public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder)
+ throws BufferUnderflowException
+ {
+ if (len == 0)
+ return "";
+
+ int width = 16;
+ int[] lastLine = new int[width]; // bytes of the most recently inspected row, for duplicate detection
+ String sep = ""; // pending "........" marker, written in front of the next printed row
+ long position = bytes.readPosition(); // remembered so the finally block can restore it
+ long limit = bytes.readLimit(); // remembered so the finally block can restore it
+
+ try {
+ bytes.readPositionRemaining(offset, len); // NOTE(review): presumably narrows the readable window to [offset, offset + len) - confirm
+
+ long start = offset / width * width; // offset rounded down to a multiple of the row width
+ long end = (offset + len + width - 1) / width * width; // end of the range rounded up to a multiple of the row width
+ for (long i = start; i < end; i += width) {
+ // check for duplicate rows
+ if (i + width < end) { // the last row is never inspected, so it is always printed
+ boolean same = true;
+
+ for (int j = 0; j < width && i + j < offset + len; j++) {
+ int ch = bytes.readUnsignedByte(i + j);
+ same &= (ch == lastLine[j]);
+ lastLine[j] = ch;
+ }
+ if (i > start && same) { // the first row has no predecessor to repeat
+ sep = "........\n";
+ continue;
+ }
+ }
+ builder.append(sep);
+ sep = "";
+ String str = Long.toHexString(i);
+ for (int j = str.length(); j < 8; j++) // left pad the row offset with zeros to 8 digits
+ builder.append('0');
+ builder.append(str);
+ for (int j = 0; j < width; j++) { // hex column
+ if (j == width / 2) // extra gap between the two groups of 8
+ builder.append(' ');
+ if (i + j < offset || i + j >= offset + len) {
+ builder.append(" "); // position outside the requested range: pad
+
+ } else {
+ builder.append(' ');
+ int ch = bytes.readUnsignedByte(i + j);
+ builder.append(HEXI_DECIMAL[ch >> 4]); // high nibble
+ builder.append(HEXI_DECIMAL[ch & 15]); // low nibble
+ }
+ }
+ builder.append(' ');
+ for (int j = 0; j < width; j++) { // character column
+ if (j == width / 2)
+ builder.append(' ');
+ if (i + j < offset || i + j >= offset + len) {
+ builder.append(' ');
+
+ } else {
+ int ch = bytes.readUnsignedByte(i + j);
+ if (ch < ' ' || ch > 126) // not printable ASCII: substitute a middle dot
+ ch = '\u00B7';
+ builder.append((char) ch);
+ }
+ }
+ builder.append("\n");
+ }
+ return builder.toString();
+ } finally {
+ bytes.readLimit(limit);
+ bytes.readPosition(position);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
new file mode 100644
index 0000000..adea742
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool.commands;
+
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+
+import org.apache.cassandra.fqltool.FQLQuery;
+import org.apache.cassandra.fqltool.FQLQueryIterator;
+import org.apache.cassandra.fqltool.QueryReplayer;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.MergeIterator;
+
+/**
+ * Replay the contents of a list of paths containing full query logs
+ */
+@Command(name = "replay", description = "Replay full query logs")
+public class Replay implements Runnable
+{
+ @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true)
+ private List<String> arguments = new ArrayList<>(); // full query log paths, each opened as a read-only queue by replay()
+
+ @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.")
+ private List<String> targetHosts; // NOTE(review): no initializer - presumably stays null when --target is never given, confirm
+
+ @Option(title = "results", name = { "--results"}, description = "Where to store the results of the queries, this should be a directory. Leave this option out to avoid storing results.")
+ private String resultPath; // base directory; run() derives one sub directory per target host from it
+
+ @Option(title = "keyspace", name = { "--keyspace"}, description = "Only replay queries against this keyspace and queries without keyspace set.")
+ private String keyspace; // null disables the keyspace filter in replay()
+
+ @Option(title = "debug", name = {"--debug"}, description = "Debug mode, print all queries executed.")
+ private boolean debug; // handed to QueryReplayer unchanged
+
+ @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results")
+ private String queryStorePath; // handed to QueryReplayer unchanged
+
+ /**
+  * Command entry point: validates the options, prepares one result directory per target host when
+  * results are to be stored, and replays the logs.
+  *
+  * Prints a message to standard error and exits with status 1 when no target host was given or
+  * when the results path is not an existing directory.
+  *
+  * @throws RuntimeException wrapping any exception raised while preparing or replaying
+  */
+ @Override
+ public void run()
+ {
+     try
+     {
+         // Validate the targets before anything else: the result directories below are derived
+         // from them, and the field has no initializer so it may still be null when no
+         // --target option was given
+         if (targetHosts == null || targetHosts.isEmpty())
+         {
+             System.err.println("You need to state at least one --target host to replay the query against");
+             System.exit(1);
+         }
+         List<File> resultPaths = null;
+         if (resultPath != null)
+         {
+             File basePath = new File(resultPath);
+             // isDirectory() is false for a path that does not exist
+             if (!basePath.isDirectory())
+             {
+                 System.err.println("The results path (" + basePath + ") should be an existing directory");
+                 System.exit(1);
+             }
+             resultPaths = targetHosts.stream().map(target -> new File(basePath, target)).collect(Collectors.toList());
+             resultPaths.forEach(File::mkdir);
+         }
+         replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, debug);
+     }
+     catch (Exception e)
+     {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Replays the queries stored in the given full query logs against the target hosts.
+  *
+  * @param keyspace       when not null, only queries for this keyspace or without a keyspace are replayed
+  * @param arguments      paths of the full query logs, each opened as a read-only queue
+  * @param targetHosts    hosts handed to the {@link QueryReplayer}
+  * @param resultPaths    result directories handed to the {@link QueryReplayer}
+  * @param queryStorePath query store path handed to the {@link QueryReplayer}
+  * @param debug          debug flag handed to the {@link QueryReplayer}
+  * @throws RuntimeException wrapping any exception raised while opening the logs or replaying
+  */
+ public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean debug)
+ {
+     final int readAhead = 200; // how many fql queries should we read in to memory to be able to sort them?
+
+     List<Predicate<FQLQuery>> filters = new ArrayList<>();
+     if (keyspace != null)
+         filters.add(fqlQuery -> fqlQuery.keyspace() == null || fqlQuery.keyspace().equals(keyspace));
+
+     // Declared outside the try block so that the finally block can close whatever got opened
+     List<ChronicleQueue> readQueues = null;
+     List<FQLQueryIterator> iterators = null;
+     try
+     {
+         readQueues = arguments.stream()
+                               .map(path -> ChronicleQueueBuilder.single(path).readOnly(true).build())
+                               .collect(Collectors.toList());
+         iterators = readQueues.stream()
+                               .map(queue -> new FQLQueryIterator(queue.createTailer(), readAhead))
+                               .collect(Collectors.toList());
+
+         try (MergeIterator<FQLQuery, List<FQLQuery>> merged = MergeIterator.get(iterators, FQLQuery::compareTo, new Reducer());
+              QueryReplayer replayer = new QueryReplayer(merged, targetHosts, resultPaths, filters, System.out, queryStorePath, debug))
+         {
+             replayer.replay();
+         }
+     }
+     catch (Exception e)
+     {
+         throw new RuntimeException(e);
+     }
+     finally
+     {
+         if (iterators != null)
+             iterators.forEach(AbstractIterator::close);
+         if (readQueues != null)
+             readQueues.forEach(Closeable::close);
+     }
+ }
+
+ /**
+  * Reducer that collects every query reduced for the current key into one list.
+  * {@link #getReduced()} hands out the internal list itself, which is emptied on the next key change.
+  */
+ @VisibleForTesting
+ public static class Reducer extends MergeIterator.Reducer<FQLQuery, List<FQLQuery>>
+ {
+     List<FQLQuery> queries = new ArrayList<>();
+
+     /** Starts a new group by dropping the queries collected for the previous key. */
+     @Override
+     protected void onKeyChange()
+     {
+         queries.clear();
+     }
+
+     /** Adds {@code current} to the group being collected; {@code idx} is not used. */
+     @Override
+     public void reduce(int idx, FQLQuery current)
+     {
+         queries.add(current);
+     }
+
+     /** Returns the queries collected since the last key change. */
+     @Override
+     protected List<FQLQuery> getReduced()
+     {
+         return queries;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java
----------------------------------------------------------------------
diff --git a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java
new file mode 100644
index 0000000..7990b7e
--- /dev/null
+++ b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.fqltool.commands.Compare;
+import org.apache.cassandra.tools.Util;
+
+
+import static org.psjava.util.AssertStatus.assertTrue;
+
+public class FQLCompareTest
+{
+ /**
+  * Initializes the database descriptor through {@link Util} before any test of this class runs.
+  */
+ public FQLCompareTest()
+ {
+     Util.initDatabaseDescriptor();
+ }
+
+ /**
+  * Generates random result sets for two hosts and runs the compare command over them.
+  * There are no assertions; the test passes as long as nothing throws.
+  */
+ @Test
+ public void endToEnd() throws IOException
+ {
+     List<String> targetHosts = Lists.newArrayList("hosta", "hostb");
+     File resultDir = Files.createTempDirectory("testresulthandler").toFile();
+     File queryDir = Files.createTempDirectory("queries").toFile();
+
+     List<String> resultDirNames = new ArrayList<>();
+     for (File resultPath : generateResultSets(targetHosts, resultDir, queryDir, true, false))
+         resultDirNames.add(resultPath.toString());
+
+     Compare.compare(queryDir.toString(), resultDirNames);
+ }
+
+ /**
+  * Same as the plain end to end run, but every tenth result set of the first host is a failed one.
+  * There are no assertions; the test passes as long as nothing throws.
+  */
+ @Test
+ public void endToEndQueryFailures() throws IOException
+ {
+     List<String> targetHosts = Lists.newArrayList("hosta", "hostb");
+     File resultDir = Files.createTempDirectory("testresulthandler").toFile();
+     File queryDir = Files.createTempDirectory("queries").toFile();
+
+     List<String> resultDirNames = new ArrayList<>();
+     for (File resultPath : generateResultSets(targetHosts, resultDir, queryDir, true, true))
+         resultDirNames.add(resultPath.toString());
+
+     Compare.compare(queryDir.toString(), resultDirNames);
+ }
+
+ /**
+  * Stores non-random result sets for two hosts, reads them back and checks that the column
+  * definitions and every row compare as equal between the hosts.
+  */
+ @Test
+ public void compareEqual() throws IOException
+ {
+     List<String> targetHosts = Lists.newArrayList("hosta", "hostb");
+     File tmpDir = Files.createTempDirectory("testresulthandler").toFile();
+     File queryDir = Files.createTempDirectory("queries").toFile();
+     List<File> resultPaths = generateResultSets(targetHosts, tmpDir, queryDir, false, false);
+
+     ResultComparator comparator = new ResultComparator();
+     List<ChronicleQueue> readQueues = null;
+     try
+     {
+         readQueues = resultPaths.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList());
+         List<Iterator<ResultHandler.ComparableResultSet>> its = readQueues.stream().map(q -> new Compare.StoredResultSetIterator(q.createTailer())).collect(Collectors.toList());
+         List<ResultHandler.ComparableResultSet> resultSets = Compare.resultSets(its);
+         while (resultSets.stream().allMatch(Objects::nonNull))
+         {
+             assertTrue(comparator.compareColumnDefinitions(targetHosts, query(), resultSets.stream().map(ResultHandler.ComparableResultSet::getColumnDefinitions).collect(Collectors.toList())));
+             List<Iterator<ResultHandler.ComparableRow>> rows = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList());
+
+             List<ResultHandler.ComparableRow> toCompare = ResultHandler.rows(rows);
+
+             while (toCompare.stream().allMatch(Objects::nonNull))
+             {
+                 // Compare the rows that were just fetched and null-checked. Fetching again here
+                 // would advance the iterators a second time and leave every other row unchecked.
+                 assertTrue(comparator.compareRows(targetHosts, query(), toCompare));
+                 toCompare = ResultHandler.rows(rows);
+             }
+             resultSets = Compare.resultSets(its);
+         }
+     }
+     finally
+     {
+         if (readQueues != null)
+             readQueues.forEach(Closeable::close);
+     }
+ }
+
+ /**
+  * Stores 100 pairs of result sets (10 by 10 each) through a {@link ResultHandler}, using one
+  * directory per target host below {@code resultDir}.
+  *
+  * @param targetHosts     host names, also used as directory names
+  * @param resultDir       parent directory of the per-host result directories
+  * @param queryDir        query directory handed to the {@link ResultHandler}
+  * @param random          passed on to {@code FQLReplayTest.createResultSet}
+  * @param includeFailures when true, every tenth result set of the first host is a failed one
+  * @return the per-host result directories, in the order of {@code targetHosts}
+  */
+ private List<File> generateResultSets(List<String> targetHosts, File resultDir, File queryDir, boolean random, boolean includeFailures) throws IOException
+ {
+     List<File> resultPaths = new ArrayList<>();
+     for (String host : targetHosts)
+     {
+         File hostDir = new File(resultDir, host);
+         hostDir.mkdir();
+         resultPaths.add(hostDir);
+     }
+
+     try (ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir))
+     {
+         for (int i = 0; i < 100; i++)
+         {
+             ResultHandler.ComparableResultSet resultSet1;
+             if (includeFailures && (i % 10 == 0))
+                 resultSet1 = StoredResultSet.failed("test failure!");
+             else
+                 resultSet1 = FQLReplayTest.createResultSet(10, 10, random);
+
+             ResultHandler.ComparableResultSet resultSet2 = FQLReplayTest.createResultSet(10, 10, random);
+             rh.handleResults(query(), Lists.newArrayList(resultSet1, resultSet2));
+         }
+     }
+     return resultPaths;
+ }
+
+ /**
+  * Builds the fixed single query that all tests in this class hand to the code under test.
+  */
+ private FQLQuery.Single query()
+ {
+     int protocolVersion = QueryOptions.DEFAULT.getProtocolVersion().asInt();
+     return new FQLQuery.Single("abc", protocolVersion, QueryOptions.DEFAULT, 12345, 5555, 6666, "select * from xyz", Collections.emptyList());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org