You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/09/01 08:44:23 UTC
[7/7] cassandra git commit: Add fqltool compare
Add fqltool compare
Patch by marcuse; reviewed by Jason Brown and Dinesh Joshi for CASSANDRA-14619
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f83bd5ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f83bd5ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f83bd5ac
Branch: refs/heads/trunk
Commit: f83bd5ac2bbc6755213a6ad0675e7e5400c79670
Parents: 62ffb77
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Aug 24 14:41:09 2018 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Sat Sep 1 09:59:21 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
bin/fqltool | 76 --
bin/fqltool.bat | 36 -
build.xml | 67 +-
ide/idea-iml-file.xml | 2 +
ide/idea/workspace.xml | 1 +
.../cassandra/tools/FullQueryLogTool.java | 95 ---
.../tools/fqltool/DriverResultSet.java | 241 ------
.../cassandra/tools/fqltool/FQLQuery.java | 278 -------
.../tools/fqltool/FQLQueryIterator.java | 72 --
.../cassandra/tools/fqltool/FQLQueryReader.java | 116 ---
.../cassandra/tools/fqltool/QueryReplayer.java | 167 ----
.../tools/fqltool/ResultComparator.java | 116 ---
.../cassandra/tools/fqltool/ResultHandler.java | 124 ---
.../cassandra/tools/fqltool/ResultStore.java | 142 ----
.../cassandra/tools/fqltool/commands/Dump.java | 325 --------
.../tools/fqltool/commands/Replay.java | 148 ----
.../cassandra/tools/fqltool/FQLReplayTest.java | 760 -------------------
tools/bin/cassandra.in.bat | 2 +-
tools/bin/cassandra.in.sh | 2 +-
tools/bin/fqltool | 76 ++
tools/bin/fqltool.bat | 36 +
.../cassandra/fqltool/DriverResultSet.java | 248 ++++++
.../org/apache/cassandra/fqltool/FQLQuery.java | 265 +++++++
.../cassandra/fqltool/FQLQueryIterator.java | 72 ++
.../cassandra/fqltool/FQLQueryReader.java | 116 +++
.../cassandra/fqltool/FullQueryLogTool.java | 99 +++
.../apache/cassandra/fqltool/QueryReplayer.java | 172 +++++
.../cassandra/fqltool/ResultComparator.java | 116 +++
.../apache/cassandra/fqltool/ResultHandler.java | 133 ++++
.../apache/cassandra/fqltool/ResultStore.java | 291 +++++++
.../cassandra/fqltool/StoredResultSet.java | 292 +++++++
.../cassandra/fqltool/commands/Compare.java | 120 +++
.../apache/cassandra/fqltool/commands/Dump.java | 325 ++++++++
.../cassandra/fqltool/commands/Replay.java | 148 ++++
.../cassandra/fqltool/FQLCompareTest.java | 131 ++++
.../apache/cassandra/fqltool/FQLReplayTest.java | 675 ++++++++++++++++
37 files changed, 3384 insertions(+), 2702 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1227337..1ba9975 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Add fqltool compare (CASSANDRA-14619)
* Add fqltool replay (CASSANDRA-14618)
* Log keyspace in full query log (CASSANDRA-14656)
* Transient Replication and Cheap Quorums (CASSANDRA-14404)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/bin/fqltool
----------------------------------------------------------------------
diff --git a/bin/fqltool b/bin/fqltool
deleted file mode 100755
index 15a0b20..0000000
--- a/bin/fqltool
+++ /dev/null
@@ -1,76 +0,0 @@
-#!/bin/sh
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
- # Locations (in order) to use when searching for an include file.
- for include in "`dirname "$0"`/cassandra.in.sh" \
- "$HOME/.cassandra.in.sh" \
- /usr/share/cassandra/cassandra.in.sh \
- /usr/local/share/cassandra/cassandra.in.sh \
- /opt/cassandra/cassandra.in.sh; do
- if [ -r "$include" ]; then
- . "$include"
- break
- fi
- done
-elif [ -r "$CASSANDRA_INCLUDE" ]; then
- . "$CASSANDRA_INCLUDE"
-fi
-
-if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then
- echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2
- exit 1
-fi
-
-# Run cassandra-env.sh to pick up JMX_PORT
-if [ -f "$CASSANDRA_CONF/cassandra-env.sh" ]; then
- JVM_OPTS_SAVE=$JVM_OPTS
- MAX_HEAP_SIZE_SAVE=$MAX_HEAP_SIZE
- . "$CASSANDRA_CONF/cassandra-env.sh"
- MAX_HEAP_SIZE=$MAX_HEAP_SIZE_SAVE
- JVM_OPTS=$JVM_OPTS_SAVE
-fi
-
-# JMX Port passed via cmd line args (-p 9999 / --port 9999 / --port=9999)
-# should override the value from cassandra-env.sh
-ARGS=""
-JVM_ARGS=""
-while true
-do
- if [ ! $1 ]; then break; fi
- case $1 in
- -D*)
- JVM_ARGS="$JVM_ARGS $1"
- ;;
- *)
- ARGS="$ARGS $1"
- ;;
- esac
- shift
-done
-
-if [ "x$MAX_HEAP_SIZE" = "x" ]; then
- MAX_HEAP_SIZE="512m"
-fi
-
-"$JAVA" $JAVA_AGENT -ea -da:net.openhft... -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
- -Dlog4j.configurationFile=log4j2-tools.xml \
- $JVM_ARGS \
- org.apache.cassandra.tools.FullQueryLogTool $ARGS
-
-# vi:ai sw=4 ts=4 tw=0 et
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/bin/fqltool.bat
----------------------------------------------------------------------
diff --git a/bin/fqltool.bat b/bin/fqltool.bat
deleted file mode 100644
index f3103d8..0000000
--- a/bin/fqltool.bat
+++ /dev/null
@@ -1,36 +0,0 @@
-@REM
-@REM Licensed to the Apache Software Foundation (ASF) under one or more
-@REM contributor license agreements. See the NOTICE file distributed with
-@REM this work for additional information regarding copyright ownership.
-@REM The ASF licenses this file to You under the Apache License, Version 2.0
-@REM (the "License"); you may not use this file except in compliance with
-@REM the License. You may obtain a copy of the License at
-@REM
-@REM http://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM Unless required by applicable law or agreed to in writing, software
-@REM distributed under the License is distributed on an "AS IS" BASIS,
-@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-@REM See the License for the specific language governing permissions and
-@REM limitations under the License.
-
-@echo off
-if "%OS%" == "Windows_NT" setlocal
-
-pushd "%~dp0"
-call cassandra.in.bat
-
-if NOT DEFINED JAVA_HOME goto :err
-
-set CASSANDRA_PARAMS=%CASSANDRA_PARAMS% -Dcassandra.logdir="%CASSANDRA_HOME%\logs"
-
-"%JAVA_HOME%\bin\java" -cp %CASSANDRA_CLASSPATH% %CASSANDRA_PARAMS% -Dlog4j.configurationFile=log4j2-tools.xml org.apache.cassandra.tools.FullQueryLogTool %*
-goto finally
-
-:err
-echo The JAVA_HOME environment variable must be set to run this program!
-pause
-
-:finally
-ENDLOCAL & set RC=%ERRORLEVEL%
-exit /B %RC%
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index b53c47b..1c957d3 100644
--- a/build.xml
+++ b/build.xml
@@ -233,6 +233,7 @@
<mkdir dir="${test.lib}"/>
<mkdir dir="${test.classes}"/>
<mkdir dir="${stress.test.classes}"/>
+ <mkdir dir="${fqltool.test.classes}"/>
<mkdir dir="${build.src.gen-java}"/>
<mkdir dir="${build.dir.lib}"/>
<mkdir dir="${jacoco.export.dir}"/>
@@ -914,6 +915,50 @@
</testmacro>
</target>
+ <!--
+ fqltool build file
+ -->
+ <property name="fqltool.build.src" value="${basedir}/tools/fqltool/src" />
+ <property name="fqltool.test.src" value="${basedir}/tools/fqltool/test/unit" />
+ <property name="fqltool.build.classes" value="${build.classes}/fqltool" />
+ <property name="fqltool.test.classes" value="${build.dir}/test/fqltool-classes" />
+ <property name="fqltool.manifest" value="${fqltool.build.classes}/MANIFEST.MF" />
+
+ <target name="fqltool-build-test" depends="fqltool-build" description="Compile fqltool tests">
+ <javac debug="true" debuglevel="${debuglevel}" destdir="${fqltool.test.classes}"
+ source="${source.version}" target="${target.version}"
+ includeantruntime="false" encoding="utf-8">
+ <classpath>
+ <path refid="cassandra.classpath"/>
+ <pathelement location="${fqltool.build.classes}" />
+ </classpath>
+ <src path="${fqltool.test.src}"/>
+ </javac>
+ </target>
+
+ <target name="fqltool-build" depends="build" description="build fqltool">
+ <mkdir dir="${fqltool.build.classes}" />
+ <javac compiler="modern" debug="true" debuglevel="${debuglevel}"
+ source="${source.version}" target="${target.version}"
+ encoding="utf-8" destdir="${fqltool.build.classes}" includeantruntime="true">
+ <src path="${fqltool.build.src}" />
+ <classpath>
+ <path refid="cassandra.classes" />
+ <path>
+ <fileset dir="${build.lib}">
+ <include name="**/*.jar" />
+ </fileset>
+ </path>
+ </classpath>
+ </javac>
+ </target>
+
+ <target name="fqltool-test" depends="fqltool-build-test, build-test" description="Runs fqltool tests">
+ <testmacro inputdir="${fqltool.test.src}"
+ timeout="${test.timeout}">
+ </testmacro>
+ </target>
+
<target name="_write-poms" depends="maven-declare-dependencies">
<artifact:writepom pomRefId="parent-pom" file="${build.dir}/${final.name}-parent.pom"/>
<artifact:writepom pomRefId="all-pom" file="${build.dir}/${final.name}.pom"/>
@@ -950,7 +995,7 @@
</jar>
</target>
<target name="jar"
- depends="_main-jar, build-test, stress-build, write-poms"
+ depends="_main-jar, build-test, stress-build, fqltool-build, write-poms"
description="Assemble Cassandra JAR files">
<!-- Stress jar -->
<manifest file="${stress.manifest}">
@@ -962,6 +1007,16 @@
<jar destfile="${build.dir}/tools/lib/stress.jar" manifest="${stress.manifest}">
<fileset dir="${stress.build.classes}"/>
</jar>
+ <!-- fqltool jar: packaged with its own generated manifest (Main-Class = FullQueryLogTool), not the stress one -->
+ <manifest file="${fqltool.manifest}">
+ <attribute name="Built-By" value="Marcus Eriksson"/>
+ <attribute name="Main-Class" value="org.apache.cassandra.fqltool.FullQueryLogTool"/>
+ </manifest>
+ <mkdir dir="${fqltool.build.classes}/META-INF" />
+ <mkdir dir="${build.dir}/tools/lib/" />
+ <jar destfile="${build.dir}/tools/lib/fqltool.jar" manifest="${fqltool.manifest}">
+ <fileset dir="${fqltool.build.classes}"/>
+ </jar>
</target>
<!--
@@ -1178,7 +1233,7 @@
<!-- use https://github.com/krummas/jstackjunit to get thread dumps when unit tests time out -->
<taskdef name="junit" classname="org.krummas.junit.JStackJUnitTask" classpath="lib/jstackjunit-0.0.1.jar"/>
- <target name="build-test" depends="_main-jar, stress-build, write-poms" description="Compile test classes">
+ <target name="build-test" depends="_main-jar, stress-build, fqltool-build, write-poms" description="Compile test classes">
<javac
compiler="modern"
debug="true"
@@ -1257,9 +1312,11 @@
<classpath>
<pathelement path="${java.class.path}"/>
<pathelement location="${stress.build.classes}"/>
+ <pathelement location="${fqltool.build.classes}"/>
<path refid="cassandra.classpath.test" />
<pathelement location="${test.classes}"/>
<pathelement location="${stress.test.classes}"/>
+ <pathelement location="${fqltool.test.classes}"/>
<pathelement location="${test.conf}"/>
<fileset dir="${test.lib}">
<include name="**/*.jar" />
@@ -1392,7 +1449,7 @@
</testmacro>
</target>
- <target name="test-compression" depends="build-test, stress-build" description="Execute unit tests with sstable compression enabled">
+ <target name="test-compression" depends="build-test, stress-build, fqltool-build" description="Execute unit tests with sstable compression enabled">
<property name="compressed_yaml" value="${build.test.dir}/cassandra.compressed.yaml"/>
<concat destfile="${compressed_yaml}">
<fileset file="${test.conf}/cassandra.yaml"/>
@@ -1529,7 +1586,7 @@
</target>
<target name="test-all"
- depends="eclipse-warnings,test,long-test,test-compression,stress-test"
+ depends="eclipse-warnings,test,long-test,test-compression,stress-test,fqltool-test"
description="Run all tests except for those under test-burn" />
<!-- Use JaCoCo ant extension without needing externally saved lib -->
@@ -1848,7 +1905,9 @@
<classpathentry kind="src" output="build/test/classes" path="test/long"/>
<classpathentry kind="src" output="build/test/classes" path="test/resources" />
<classpathentry kind="src" path="tools/stress/src"/>
+ <classpathentry kind="src" path="tools/fqltool/src"/>
<classpathentry kind="src" output="build/test/stress-classes" path="tools/stress/test/unit" />
+ <classpathentry kind="src" output="build/test/fqltool-classes" path="tools/fqltool/test/unit" />
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="output" path="build/classes/eclipse"/>
<classpathentry kind="lib" path="test/conf"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/ide/idea-iml-file.xml
----------------------------------------------------------------------
diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml
index 2512b1d..b83abfa 100644
--- a/ide/idea-iml-file.xml
+++ b/ide/idea-iml-file.xml
@@ -29,6 +29,8 @@
<sourceFolder url="file://$MODULE_DIR$/src/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/tools/stress/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tools/stress/test/unit" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/tools/fqltool/src" isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/tools/fqltool/test/unit" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/unit" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" />
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/ide/idea/workspace.xml
----------------------------------------------------------------------
diff --git a/ide/idea/workspace.xml b/ide/idea/workspace.xml
index 876ed58..a2dea2a 100644
--- a/ide/idea/workspace.xml
+++ b/ide/idea/workspace.xml
@@ -297,6 +297,7 @@
<filter targetName="codecoverage" isVisible="true" />
<filter targetName="build-project" isVisible="false" />
<filter targetName="stress-build" isVisible="true" />
+ <filter targetName="fqltool-build" isVisible="true" />
<filter targetName="_write-poms" isVisible="false" />
<filter targetName="write-poms" isVisible="false" />
<filter targetName="jar" isVisible="true" />
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/FullQueryLogTool.java b/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
deleted file mode 100644
index c1d4713..0000000
--- a/src/java/org/apache/cassandra/tools/FullQueryLogTool.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools;
-
-import java.util.List;
-
-import com.google.common.base.Throwables;
-
-import io.airlift.airline.Cli;
-import io.airlift.airline.Help;
-import io.airlift.airline.ParseArgumentsMissingException;
-import io.airlift.airline.ParseArgumentsUnexpectedException;
-import io.airlift.airline.ParseCommandMissingException;
-import io.airlift.airline.ParseCommandUnrecognizedException;
-import io.airlift.airline.ParseOptionConversionException;
-import io.airlift.airline.ParseOptionMissingException;
-import io.airlift.airline.ParseOptionMissingValueException;
-import org.apache.cassandra.tools.fqltool.commands.Dump;
-import org.apache.cassandra.tools.fqltool.commands.Replay;
-
-import static com.google.common.base.Throwables.getStackTraceAsString;
-import static com.google.common.collect.Lists.newArrayList;
-
-public class FullQueryLogTool
-{
- public static void main(String... args)
- {
- List<Class<? extends Runnable>> commands = newArrayList(
- Help.class,
- Dump.class,
- Replay.class
- );
-
- Cli.CliBuilder<Runnable> builder = Cli.builder("fqltool");
-
- builder.withDescription("Manipulate the contents of full query log files")
- .withDefaultCommand(Help.class)
- .withCommands(commands);
-
- Cli<Runnable> parser = builder.build();
-
- int status = 0;
- try
- {
- parser.parse(args).run();
- } catch (IllegalArgumentException |
- IllegalStateException |
- ParseArgumentsMissingException |
- ParseArgumentsUnexpectedException |
- ParseOptionConversionException |
- ParseOptionMissingException |
- ParseOptionMissingValueException |
- ParseCommandMissingException |
- ParseCommandUnrecognizedException e)
- {
- badUse(e);
- status = 1;
- } catch (Throwable throwable)
- {
- err(Throwables.getRootCause(throwable));
- status = 2;
- }
-
- System.exit(status);
- }
-
- private static void badUse(Exception e)
- {
- System.out.println("fqltool: " + e.getMessage());
- System.out.println("See 'fqltool help' or 'fqltool help <command>'.");
- }
-
- private static void err(Throwable e)
- {
- System.err.println("error: " + e.getMessage());
- System.err.println("-- StackTrace --");
- System.err.println(getStackTraceAsString(e));
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java b/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java
deleted file mode 100644
index 6c4ee45..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.AbstractIterator;
-
-import com.datastax.driver.core.ColumnDefinitions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-
-/**
- * Wraps a result set from the driver so that we can reuse the compare code when reading
- * up a result set produced by ResultStore.
- */
-public class DriverResultSet implements ResultHandler.ComparableResultSet
-{
- private final ResultSet resultSet;
- private final Throwable failureException;
-
- public DriverResultSet(ResultSet resultSet)
- {
- this(resultSet, null);
- }
-
- private DriverResultSet(ResultSet res, Throwable failureException)
- {
- resultSet = res;
- this.failureException = failureException;
- }
-
- public static DriverResultSet failed(Throwable ex)
- {
- return new DriverResultSet(null, ex);
- }
-
- public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
- {
- if (wasFailed())
- return new DriverColumnDefinitions(null, true);
-
- return new DriverColumnDefinitions(resultSet.getColumnDefinitions());
- }
-
- public boolean wasFailed()
- {
- return failureException != null;
- }
-
- public Throwable getFailureException()
- {
- return failureException;
- }
-
- public Iterator<ResultHandler.ComparableRow> iterator()
- {
- if (wasFailed())
- return Collections.emptyListIterator();
- return new AbstractIterator<ResultHandler.ComparableRow>()
- {
- Iterator<Row> iter = resultSet.iterator();
- protected ResultHandler.ComparableRow computeNext()
- {
- if (iter.hasNext())
- return new DriverRow(iter.next());
- return endOfData();
- }
- };
- }
-
- public static class DriverRow implements ResultHandler.ComparableRow
- {
- private final Row row;
-
- public DriverRow(Row row)
- {
- this.row = row;
- }
-
- public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
- {
- return new DriverColumnDefinitions(row.getColumnDefinitions());
- }
-
- public ByteBuffer getBytesUnsafe(int i)
- {
- return row.getBytesUnsafe(i);
- }
-
- @Override
- public boolean equals(Object oo)
- {
- if (!(oo instanceof ResultHandler.ComparableRow))
- return false;
-
- ResultHandler.ComparableRow o = (ResultHandler.ComparableRow)oo;
- if (getColumnDefinitions().size() != o.getColumnDefinitions().size())
- return false;
-
- for (int j = 0; j < getColumnDefinitions().size(); j++)
- {
- ByteBuffer b1 = getBytesUnsafe(j);
- ByteBuffer b2 = o.getBytesUnsafe(j);
-
- if (b1 != null && b2 != null && !b1.equals(b2))
- {
- return false;
- }
- if (b1 == null && b2 != null || b2 == null && b1 != null)
- {
- return false;
- }
- }
- return true;
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- List<ResultHandler.ComparableDefinition> colDefs = getColumnDefinitions().asList();
- for (int i = 0; i < getColumnDefinitions().size(); i++)
- {
- ByteBuffer bb = getBytesUnsafe(i);
- String row = bb != null ? ByteBufferUtil.bytesToHex(bb) : "NULL";
- sb.append(colDefs.get(i)).append(':').append(row).append(",");
- }
- return sb.toString();
- }
- }
-
- public static class DriverColumnDefinitions implements ResultHandler.ComparableColumnDefinitions
- {
- private final ColumnDefinitions columnDefinitions;
- private final boolean failed;
-
- public DriverColumnDefinitions(ColumnDefinitions columnDefinitions)
- {
- this(columnDefinitions, false);
- }
-
- private DriverColumnDefinitions(ColumnDefinitions columnDefinitions, boolean failed)
- {
- this.columnDefinitions = columnDefinitions;
- this.failed = failed;
- }
-
- public List<ResultHandler.ComparableDefinition> asList()
- {
- if (wasFailed())
- return Collections.emptyList();
- return columnDefinitions.asList().stream().map(DriverDefinition::new).collect(Collectors.toList());
- }
-
- public boolean wasFailed()
- {
- return failed;
- }
-
- public int size()
- {
- return columnDefinitions.size();
- }
-
- public Iterator<ResultHandler.ComparableDefinition> iterator()
- {
- return asList().iterator();
- }
-
- public boolean equals(Object oo)
- {
- if (!(oo instanceof ResultHandler.ComparableColumnDefinitions))
- return false;
-
- ResultHandler.ComparableColumnDefinitions o = (ResultHandler.ComparableColumnDefinitions)oo;
- if (wasFailed() && o.wasFailed())
- return true;
-
- if (size() != o.size())
- return false;
-
- return asList().equals(o.asList());
- }
- }
-
- public static class DriverDefinition implements ResultHandler.ComparableDefinition
- {
- private final ColumnDefinitions.Definition def;
-
- public DriverDefinition(ColumnDefinitions.Definition def)
- {
- this.def = def;
- }
-
- public String getType()
- {
- return def.getType().toString();
- }
-
- public String getName()
- {
- return def.getName();
- }
-
- public boolean equals(Object oo)
- {
- if (!(oo instanceof ResultHandler.ComparableDefinition))
- return false;
-
- return def.equals(((DriverDefinition)oo).def);
- }
-
- public String toString()
- {
- return getName() + ':' + getType();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java
deleted file mode 100644
index 6c0a6b9..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import com.google.common.primitives.Longs;
-
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-import org.apache.cassandra.audit.FullQueryLogger;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.binlog.BinLog;
-
-public abstract class FQLQuery implements Comparable<FQLQuery>
-{
- public final long queryStartTime;
- public final QueryOptions queryOptions;
- public final int protocolVersion;
- public final String keyspace;
- public final long generatedTimestamp;
- private final int generatedNowInSeconds;
-
- public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds)
- {
- this.queryStartTime = queryStartTime;
- this.queryOptions = queryOptions;
- this.protocolVersion = protocolVersion;
- this.keyspace = keyspace;
- this.generatedTimestamp = generatedTimestamp;
- this.generatedNowInSeconds = generatedNowInSeconds;
- }
-
- public abstract Statement toStatement();
-
- /**
- * used when storing the queries executed
- */
- public abstract BinLog.ReleaseableWriteMarshallable toMarshallable();
-
- public QueryState queryState()
- {
- ClientState clientState = keyspace != null ? ClientState.forInternalCalls(keyspace) : ClientState.forInternalCalls();
-
- return new QueryState(clientState, generatedTimestamp, generatedNowInSeconds);
- }
-
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (!(o instanceof FQLQuery)) return false;
- FQLQuery fqlQuery = (FQLQuery) o;
- return queryStartTime == fqlQuery.queryStartTime &&
- protocolVersion == fqlQuery.protocolVersion &&
- generatedTimestamp == fqlQuery.generatedTimestamp &&
- generatedNowInSeconds == fqlQuery.generatedNowInSeconds &&
- Objects.equals(queryOptions.getValues(), fqlQuery.queryOptions.getValues()) &&
- Objects.equals(keyspace, fqlQuery.keyspace);
- }
-
- public int hashCode()
- {
- return Objects.hash(queryStartTime, queryOptions, protocolVersion, keyspace, generatedTimestamp, generatedNowInSeconds);
- }
-
- public int compareTo(FQLQuery other)
- {
- int cmp = Longs.compare(queryStartTime, other.queryStartTime);
- if (cmp != 0)
- return cmp;
- cmp = Longs.compare(generatedTimestamp, other.generatedTimestamp);
- if (cmp != 0)
- return cmp;
-
- return Longs.compare(generatedNowInSeconds, other.generatedNowInSeconds);
- }
-
- public String toString()
- {
- return "FQLQuery{" +
- "queryStartTime=" + queryStartTime +
- ", protocolVersion=" + protocolVersion +
- ", keyspace='" + keyspace + '\'' +
- ", generatedTimestamp=" + generatedTimestamp +
- ", generatedNowInSeconds=" + generatedNowInSeconds +
- '}';
- }
-
- public static class Single extends FQLQuery
- {
- public final String query;
- public final List<ByteBuffer> values;
-
- public Single(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, String queryString, List<ByteBuffer> values)
- {
- super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
- this.query = queryString;
- this.values = values;
- }
-
- @Override
- public String toString()
- {
- return String.format("%s%nQuery = %s, Values = %s",
- super.toString(),
- query,
- values.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(",")));
- }
-
- public Statement toStatement()
- {
- SimpleStatement ss = new SimpleStatement(query, values.toArray());
- ss.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
- ss.setDefaultTimestamp(generatedTimestamp);
- return ss;
- }
-
- public BinLog.ReleaseableWriteMarshallable toMarshallable()
- {
-
- return new FullQueryLogger.Query(query, queryOptions, queryState(), queryStartTime);
- }
-
- public int compareTo(FQLQuery other)
- {
- int cmp = super.compareTo(other);
-
- if (cmp == 0)
- {
- if (other instanceof Batch)
- return -1;
-
- Single singleQuery = (Single) other;
-
- cmp = query.compareTo(singleQuery.query);
- if (cmp == 0)
- {
- if (values.size() != singleQuery.values.size())
- return values.size() - singleQuery.values.size();
- for (int i = 0; i < values.size(); i++)
- {
- cmp = values.get(i).compareTo(singleQuery.values.get(i));
- if (cmp != 0)
- return cmp;
- }
- }
- }
- return cmp;
- }
-
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (!(o instanceof Single)) return false;
- if (!super.equals(o)) return false;
- Single single = (Single) o;
- return Objects.equals(query, single.query) &&
- Objects.equals(values, single.values);
- }
-
- public int hashCode()
- {
- return Objects.hash(super.hashCode(), query, values);
- }
- }
-
- public static class Batch extends FQLQuery
- {
- public final BatchStatement.Type batchType;
- public final List<Single> queries;
-
- public Batch(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, BatchStatement.Type batchType, List<String> queries, List<List<ByteBuffer>> values)
- {
- super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds);
- this.batchType = batchType;
- this.queries = new ArrayList<>(queries.size());
- for (int i = 0; i < queries.size(); i++)
- this.queries.add(new Single(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds, queries.get(i), values.get(i)));
- }
-
- public Statement toStatement()
- {
- BatchStatement bs = new BatchStatement(batchType);
-
- for (Single query : queries)
- {
- bs.add(new SimpleStatement(query.query, query.values.toArray()));
- }
- bs.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
- bs.setDefaultTimestamp(generatedTimestamp); // todo: set actual server side generated time
- return bs;
- }
-
- public int compareTo(FQLQuery other)
- {
- int cmp = super.compareTo(other);
-
- if (cmp == 0)
- {
- if (other instanceof Single)
- return 1;
-
- Batch otherBatch = (Batch) other;
- if (queries.size() != otherBatch.queries.size())
- return queries.size() - otherBatch.queries.size();
- for (int i = 0; i < queries.size(); i++)
- {
- cmp = queries.get(i).compareTo(otherBatch.queries.get(i));
- if (cmp != 0)
- return cmp;
- }
- }
- return cmp;
- }
-
- public BinLog.ReleaseableWriteMarshallable toMarshallable()
- {
- List<String> queryStrings = new ArrayList<>();
- List<List<ByteBuffer>> values = new ArrayList<>();
- for (Single q : queries)
- {
- queryStrings.add(q.query);
- values.add(q.values);
- }
- return new FullQueryLogger.Batch(org.apache.cassandra.cql3.statements.BatchStatement.Type.valueOf(batchType.name()), queryStrings, values, queryOptions, queryState(), queryStartTime);
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder(super.toString()).append("\nbatch: ").append(batchType).append('\n');
- for (Single q : queries)
- sb.append(q.toString()).append('\n');
- sb.append("end batch");
- return sb.toString();
- }
-
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (!(o instanceof Batch)) return false;
- if (!super.equals(o)) return false;
- Batch batch = (Batch) o;
- return batchType == batch.batchType &&
- Objects.equals(queries, batch.queries);
- }
-
- public int hashCode()
- {
- return Objects.hash(super.hashCode(), batchType, queries);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java
deleted file mode 100644
index 390a52e..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-import java.util.PriorityQueue;
-
-import net.openhft.chronicle.queue.ExcerptTailer;
-import org.apache.cassandra.utils.AbstractIterator;
-
-public class FQLQueryIterator extends AbstractIterator<FQLQuery>
-{
- // use a priority queue to be able to sort the head of the query logs in memory
- private final PriorityQueue<FQLQuery> pq;
- private final ExcerptTailer tailer;
- private final FQLQueryReader reader;
-
- /**
- * Create an iterator over the FQLQueries in tailer
- *
- * Reads up to readAhead queries in to memory to be able to sort them (the files are mostly sorted already)
- */
- public FQLQueryIterator(ExcerptTailer tailer, int readAhead)
- {
- assert readAhead > 0 : "readAhead needs to be > 0";
- reader = new FQLQueryReader();
- this.tailer = tailer;
- pq = new PriorityQueue<>(readAhead);
- for (int i = 0; i < readAhead; i++)
- {
- FQLQuery next = readNext();
- if (next != null)
- pq.add(next);
- else
- break;
- }
- }
-
- protected FQLQuery computeNext()
- {
- FQLQuery q = pq.poll();
- if (q == null)
- return endOfData();
- FQLQuery next = readNext();
- if (next != null)
- pq.add(next);
- return q;
- }
-
- private FQLQuery readNext()
- {
- if (tailer.readDocument(reader))
- return reader.getQuery();
- return null;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java
deleted file mode 100644
index af77c59..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.datastax.driver.core.BatchStatement;
-import io.netty.buffer.Unpooled;
-import net.openhft.chronicle.core.io.IORuntimeException;
-import net.openhft.chronicle.wire.ReadMarshallable;
-import net.openhft.chronicle.wire.ValueIn;
-import net.openhft.chronicle.wire.WireIn;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.transport.ProtocolVersion;
-
-import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS;
-import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP;
-import static org.apache.cassandra.audit.FullQueryLogger.KEYSPACE;
-import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION;
-import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
-import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
-import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
-import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
-import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
-import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE;
-import static org.apache.cassandra.audit.FullQueryLogger.QUERIES;
-import static org.apache.cassandra.audit.FullQueryLogger.QUERY;
-import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
-import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
-
-public class FQLQueryReader implements ReadMarshallable
-{
- private FQLQuery query;
-
- public void readMarshallable(WireIn wireIn) throws IORuntimeException
- {
- int currentVersion = wireIn.read(VERSION).int16();
- String type = wireIn.read(TYPE).text();
- long queryStartTime = wireIn.read(QUERY_START_TIME).int64();
- int protocolVersion = wireIn.read(PROTOCOL_VERSION).int32();
- QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(QUERY_OPTIONS).bytes()), ProtocolVersion.decode(protocolVersion));
- long generatedTimestamp = wireIn.read(GENERATED_TIMESTAMP).int64();
- int generatedNowInSeconds = wireIn.read(GENERATED_NOW_IN_SECONDS).int32();
- String keyspace = wireIn.read(KEYSPACE).text();
-
- switch (type)
- {
- case SINGLE_QUERY:
- String queryString = wireIn.read(QUERY).text();
- query = new FQLQuery.Single(keyspace,
- protocolVersion,
- queryOptions,
- queryStartTime,
- generatedTimestamp,
- generatedNowInSeconds,
- queryString,
- queryOptions.getValues());
- break;
- case BATCH:
- BatchStatement.Type batchType = BatchStatement.Type.valueOf(wireIn.read(BATCH_TYPE).text());
- ValueIn in = wireIn.read(QUERIES);
- int queryCount = in.int32();
-
- List<String> queries = new ArrayList<>(queryCount);
- for (int i = 0; i < queryCount; i++)
- queries.add(in.text());
- in = wireIn.read(VALUES);
- int valueCount = in.int32();
- List<List<ByteBuffer>> values = new ArrayList<>(valueCount);
- for (int ii = 0; ii < valueCount; ii++)
- {
- List<ByteBuffer> subValues = new ArrayList<>();
- values.add(subValues);
- int numSubValues = in.int32();
- for (int zz = 0; zz < numSubValues; zz++)
- subValues.add(ByteBuffer.wrap(in.bytes()));
- }
- query = new FQLQuery.Batch(keyspace,
- protocolVersion,
- queryOptions,
- queryStartTime,
- generatedTimestamp,
- generatedNowInSeconds,
- batchType,
- queries,
- values);
- break;
- default:
- throw new RuntimeException("Unknown type: " + type);
- }
- }
-
- public FQLQuery getQuery()
- {
- return query;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java b/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java
deleted file mode 100644
index 0c8382f..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import javax.annotation.Nullable;
-
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class QueryReplayer implements Closeable
-{
- private static final int PRINT_RATE = 5000;
- private final ExecutorService es = Executors.newFixedThreadPool(1);
- private final Iterator<List<FQLQuery>> queryIterator;
- private final List<Cluster> targetClusters;
- private final List<Predicate<FQLQuery>> filters;
- private final List<Session> sessions;
- private final ResultHandler resultHandler;
- private final MetricRegistry metrics = new MetricRegistry();
- private final boolean debug;
- private final PrintStream out;
-
- public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
- List<String> targetHosts,
- List<File> resultPaths,
- List<Predicate<FQLQuery>> filters,
- PrintStream out,
- String queryFilePathString,
- boolean debug)
- {
- this.queryIterator = queryIterator;
- targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
- this.filters = filters;
- sessions = targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
- File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null;
- resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath);
- this.debug = debug;
- this.out = out;
- }
-
- public void replay()
- {
- while (queryIterator.hasNext())
- {
- List<FQLQuery> queries = queryIterator.next();
- for (FQLQuery query : queries)
- {
- if (filters.stream().anyMatch(f -> !f.test(query)))
- continue;
- try (Timer.Context ctx = metrics.timer("queries").time())
- {
- List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size());
- Statement statement = query.toStatement();
- for (Session session : sessions)
- {
- try
- {
- if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace()))
- {
- if (debug)
- out.printf("Switching keyspace from %s to %s%n", session.getLoggedKeyspace(), query.keyspace);
- session.execute("USE " + query.keyspace);
- }
- }
- catch (Throwable t)
- {
- out.printf("USE %s failed: %s%n", query.keyspace, t.getMessage());
- }
- if (debug)
- {
- out.println("Executing query:");
- out.println(query);
- }
- ListenableFuture<ResultSet> future = session.executeAsync(statement);
- results.add(handleErrors(future));
- }
-
- ListenableFuture<List<ResultHandler.ComparableResultSet>> resultList = Futures.allAsList(results);
-
- Futures.addCallback(resultList, new FutureCallback<List<ResultHandler.ComparableResultSet>>()
- {
- public void onSuccess(@Nullable List<ResultHandler.ComparableResultSet> resultSets)
- {
- // note that the order of resultSets is signifcant here - resultSets.get(x) should
- // be the result from a query against targetHosts.get(x)
- resultHandler.handleResults(query, resultSets);
- }
-
- public void onFailure(Throwable throwable)
- {
- throw new AssertionError("Errors should be handled in FQLQuery.execute", throwable);
- }
- }, es);
-
- FBUtilities.waitOnFuture(resultList);
- }
- catch (Throwable t)
- {
- out.printf("QUERY %s got exception: %s", query, t.getMessage());
- }
-
- Timer timer = metrics.timer("queries");
- if (timer.getCount() % PRINT_RATE == 0)
- out.printf("%d queries, rate = %.2f%n", timer.getCount(), timer.getOneMinuteRate());
- }
- }
- }
-
- /**
- * Make sure we catch any query errors
- *
- * On error, this creates a failed ComparableResultSet with the exception set to be able to store
- * this fact in the result file and handle comparison of failed result sets.
- */
- private static ListenableFuture<ResultHandler.ComparableResultSet> handleErrors(ListenableFuture<ResultSet> result)
- {
- FluentFuture<ResultHandler.ComparableResultSet> fluentFuture = FluentFuture.from(result)
- .transform(DriverResultSet::new, MoreExecutors.directExecutor());
- return fluentFuture.catching(Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor());
- }
-
- public void close()
- {
- sessions.forEach(Session::close);
- targetClusters.forEach(Cluster::close);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java b/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java
deleted file mode 100644
index 4bbaf7a..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.Streams;
-
-public class ResultComparator
-{
- /**
- * Compares the rows in rows
- * the row at position x in rows will have come from host at position x in targetHosts
- */
- public boolean compareRows(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
- {
- if (rows.size() < 2 || rows.stream().allMatch(Objects::isNull))
- return true;
-
- if (rows.stream().anyMatch(Objects::isNull))
- {
- handleMismatch(targetHosts, query, rows);
- return false;
- }
-
- ResultHandler.ComparableRow ref = rows.get(0);
- boolean equal = true;
- for (int i = 1; i < rows.size(); i++)
- {
- ResultHandler.ComparableRow compare = rows.get(i);
- if (!ref.equals(compare))
- equal = false;
- }
- if (!equal)
- handleMismatch(targetHosts, query, rows);
- return equal;
- }
-
- /**
- * Compares the column definitions
- *
- * the column definitions at position x in cds will have come from host at position x in targetHosts
- */
- public boolean compareColumnDefinitions(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
- {
- if (cds.size() < 2)
- return true;
-
- boolean equal = true;
- List<ResultHandler.ComparableDefinition> refDefs = cds.get(0).asList();
- for (int i = 1; i < cds.size(); i++)
- {
- List<ResultHandler.ComparableDefinition> toCompare = cds.get(i).asList();
- if (!refDefs.equals(toCompare))
- equal = false;
- }
- if (!equal)
- handleColumnDefMismatch(targetHosts, query, cds);
- return equal;
- }
-
- private void handleMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
- {
- System.out.println("MISMATCH:");
- System.out.println("Query = " + query);
- System.out.println("Results:");
- System.out.println(Streams.zip(rows.stream(), targetHosts.stream(), (r, host) -> String.format("%s: %s%n", host, r == null ? "null" : r)).collect(Collectors.joining()));
- }
-
- private void handleColumnDefMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
- {
- System.out.println("COLUMN DEFINITION MISMATCH:");
- System.out.println("Query = " + query);
- System.out.println("Results: ");
- System.out.println(Streams.zip(cds.stream(), targetHosts.stream(), (cd, host) -> String.format("%s: %s%n", host, columnDefinitionsString(cd))).collect(Collectors.joining()));
- }
-
- private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd)
- {
- StringBuilder sb = new StringBuilder();
- if (cd == null)
- sb.append("NULL");
- else if (cd.wasFailed())
- sb.append("FAILED");
- else
- {
- for (ResultHandler.ComparableDefinition def : cd)
- {
- sb.append(def.toString());
- }
- }
- return sb.toString();
- }
-
-
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java b/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java
deleted file mode 100644
index c769231..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class ResultHandler
-{
- private final ResultStore resultStore;
- private final ResultComparator resultComparator;
- private final List<String> targetHosts;
-
- public ResultHandler(List<String> targetHosts, List<File> resultPaths, File queryFilePath)
- {
- this.targetHosts = targetHosts;
- resultStore = resultPaths != null ? new ResultStore(resultPaths, queryFilePath) : null;
- resultComparator = new ResultComparator();
- }
-
- /**
- * Since we can't iterate a ResultSet more than once, and we don't want to keep the entire result set in memory
- * we feed the rows one-by-one to resultComparator and resultStore.
- *
- * results.get(x) should be the results from executing query against targetHosts.get(x)
- */
- public void handleResults(FQLQuery query, List<ComparableResultSet> results)
- {
- for (int i = 0; i < targetHosts.size(); i++)
- {
- if (results.get(i).wasFailed())
- {
- System.out.println("Query against "+targetHosts.get(i)+" failure:");
- System.out.println(query);
- System.out.println("Message: "+results.get(i).getFailureException().getMessage());
- }
- }
-
- List<ComparableColumnDefinitions> columnDefinitions = results.stream().map(ComparableResultSet::getColumnDefinitions).collect(Collectors.toList());
- resultComparator.compareColumnDefinitions(targetHosts, query, columnDefinitions);
- if (resultStore != null)
- resultStore.storeColumnDefinitions(query, columnDefinitions);
- List<Iterator<ComparableRow>> iters = results.stream().map(Iterable::iterator).collect(Collectors.toList());
-
- while (true)
- {
- List<ComparableRow> rows = rows(iters);
- resultComparator.compareRows(targetHosts, query, rows);
- if (resultStore != null)
- resultStore.storeRows(rows);
- // all rows being null marks end of all resultsets, we need to call compareRows
- // and storeRows once with everything null to mark that fact
- if (rows.stream().allMatch(Objects::isNull))
- return;
- }
- }
-
- /**
- * Get the first row from each of the iterators, if the iterator has run out, null will mark that in the list
- */
- @VisibleForTesting
- public static List<ComparableRow> rows(List<Iterator<ComparableRow>> iters)
- {
- List<ComparableRow> rows = new ArrayList<>(iters.size());
- for (Iterator<ComparableRow> iter : iters)
- {
- if (iter.hasNext())
- rows.add(iter.next());
- else
- rows.add(null);
- }
- return rows;
- }
-
- public interface ComparableResultSet extends Iterable<ComparableRow>
- {
- public ComparableColumnDefinitions getColumnDefinitions();
- public boolean wasFailed();
- public Throwable getFailureException();
- }
-
- public interface ComparableColumnDefinitions extends Iterable<ComparableDefinition>
- {
- public List<ComparableDefinition> asList();
- public boolean wasFailed();
- public int size();
- }
-
- public interface ComparableDefinition
- {
- public String getType();
- public String getName();
- }
-
- public interface ComparableRow
- {
- public ByteBuffer getBytesUnsafe(int i);
- public ComparableColumnDefinitions getColumnDefinitions();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java b/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java
deleted file mode 100644
index 6d6aaac..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool;
-
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import net.openhft.chronicle.bytes.BytesStore;
-import net.openhft.chronicle.core.io.Closeable;
-import net.openhft.chronicle.queue.ChronicleQueue;
-import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-import net.openhft.chronicle.queue.ExcerptAppender;
-import net.openhft.chronicle.wire.ValueOut;
-import org.apache.cassandra.utils.binlog.BinLog;
-
-/**
- * see FQLReplayTest#readResultFile for how to read files produced by this class
- */
-public class ResultStore
-{
- private final List<ChronicleQueue> queues;
- private final List<ExcerptAppender> appenders;
- private final ChronicleQueue queryStoreQueue;
- private final ExcerptAppender queryStoreAppender;
- private final Set<Integer> finishedHosts = new HashSet<>();
-
- public ResultStore(List<File> resultPaths, File queryFilePath)
- {
- queues = resultPaths.stream().map(path -> ChronicleQueueBuilder.single(path).build()).collect(Collectors.toList());
- appenders = queues.stream().map(ChronicleQueue::acquireAppender).collect(Collectors.toList());
- queryStoreQueue = queryFilePath != null ? ChronicleQueueBuilder.single(queryFilePath).build() : null;
- queryStoreAppender = queryStoreQueue != null ? queryStoreQueue.acquireAppender() : null;
- }
-
- /**
- * Store the column definitions in cds
- *
- * the ColumnDefinitions at position x will get stored by the appender at position x
- *
- * Calling this method indicates that we are starting a new result set from a query, it must be called before
- * calling storeRows.
- *
- */
- public void storeColumnDefinitions(FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
- {
- finishedHosts.clear();
- if (queryStoreAppender != null)
- {
- BinLog.ReleaseableWriteMarshallable writeMarshallableQuery = query.toMarshallable();
- queryStoreAppender.writeDocument(writeMarshallableQuery);
- writeMarshallableQuery.release();
- }
- for (int i = 0; i < cds.size(); i++)
- {
- ResultHandler.ComparableColumnDefinitions cd = cds.get(i);
- appenders.get(i).writeDocument(wire ->
- {
- if (!cd.wasFailed())
- {
- wire.write("type").text("column_definitions");
- wire.write("column_count").int32(cd.size());
- for (ResultHandler.ComparableDefinition d : cd.asList())
- {
- ValueOut vo = wire.write("column_definition");
- vo.text(d.getName());
- vo.text(d.getType());
- }
- }
- else
- {
- wire.write("type").text("query_failed");
- }
- });
- }
- }
-
- /**
- * Store rows
- *
- * the row at position x will get stored by appender at position x
- *
- * Before calling this for a new result set, storeColumnDefinitions must be called.
- */
- public void storeRows(List<ResultHandler.ComparableRow> rows)
- {
- for (int i = 0; i < rows.size(); i++)
- {
- ResultHandler.ComparableRow row = rows.get(i);
- if (row == null && !finishedHosts.contains(i))
- {
- appenders.get(i).writeDocument(wire -> wire.write("type").text("end_resultset"));
- finishedHosts.add(i);
- }
- else if (row != null)
- {
- appenders.get(i).writeDocument(wire ->
- {
- {
- wire.write("type").text("row");
- wire.write("row_column_count").int32(row.getColumnDefinitions().size());
- for (int jj = 0; jj < row.getColumnDefinitions().size(); jj++)
- {
- ByteBuffer bb = row.getBytesUnsafe(jj);
- if (bb != null)
- wire.write("column").bytes(BytesStore.wrap(bb));
- else
- wire.write("column").bytes("NULL".getBytes());
- }
- }
- });
- }
- }
- }
-
- public void close()
- {
- queues.forEach(Closeable::close);
- if (queryStoreQueue != null)
- queryStoreQueue.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java
deleted file mode 100644
index 5c23d3e..0000000
--- a/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools.fqltool.commands;
-
-import java.io.File;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import io.airlift.airline.Arguments;
-import io.airlift.airline.Command;
-import io.airlift.airline.Option;
-import io.netty.buffer.Unpooled;
-import net.openhft.chronicle.bytes.Bytes;
-import net.openhft.chronicle.queue.ChronicleQueue;
-import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-import net.openhft.chronicle.queue.ExcerptTailer;
-import net.openhft.chronicle.queue.RollCycles;
-import net.openhft.chronicle.threads.Pauser;
-import net.openhft.chronicle.wire.ReadMarshallable;
-import net.openhft.chronicle.wire.ValueIn;
-import net.openhft.chronicle.wire.WireIn;
-import org.apache.cassandra.audit.FullQueryLogger;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.transport.ProtocolVersion;
-
-/**
- * Dump the contents of a list of paths containing full query logs
- *
- * Every record is rendered as text (header fields, query string(s) and a hex dump of the bound
- * values) and printed to standard out.
- */
-@Command(name = "dump", description = "Dump the contents of a full query log")
-public class Dump implements Runnable
-{
-    static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray();
-
-    /** Upper bound on the number of bytes of a single bound value that get hex dumped. */
-    private static final int MAX_DUMPED_VALUE_BYTES = 1024;
-
-    @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true)
-    private List<String> arguments = new ArrayList<>();
-
-    @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.")
-    private String rollCycle = "HOURLY";
-
-    @Option(title = "follow", name = {"--follow"}, description = "Upon reaching the end of the log continue indefinitely waiting for more records")
-    private boolean follow = false;
-
-    @Override
-    public void run()
-    {
-        dump(arguments, rollCycle, follow);
-    }
-
-    /**
-     * Print every record found in the given full query log directories to standard out.
-     *
-     * @param arguments paths of the full query log directories, duplicates are only read once
-     * @param rollCycle name of the {@link RollCycles} constant the logs were written with
-     * @param follow if true, never return: keep polling for new records once the end of the logs is reached
-     * @throws IllegalArgumentException if rollCycle does not name a {@link RollCycles} constant
-     * @throws UnsupportedOperationException if a record has an unexpected version or type
-     */
-    public static void dump(List<String> arguments, String rollCycle, boolean follow)
-    {
-        StringBuilder sb = new StringBuilder();
-        ReadMarshallable reader = wireIn ->
-        {
-            // the builder is shared between records, start from scratch for each one
-            sb.setLength(0);
-
-            int version = wireIn.read(FullQueryLogger.VERSION).int16();
-            if (version != FullQueryLogger.CURRENT_VERSION)
-                throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered");
-
-            String type = wireIn.read(FullQueryLogger.TYPE).text();
-            sb.append("Type: ")
-              .append(type)
-              .append(System.lineSeparator());
-
-            long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64();
-            sb.append("Query start time: ")
-              .append(queryStartTime)
-              .append(System.lineSeparator());
-
-            int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32();
-            sb.append("Protocol version: ")
-              .append(protocolVersion)
-              .append(System.lineSeparator());
-
-            QueryOptions options =
-                QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()),
-                                          ProtocolVersion.decode(protocolVersion));
-
-            long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64();
-            sb.append("Generated timestamp:")
-              .append(generatedTimestamp)
-              .append(System.lineSeparator());
-
-            int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32();
-            sb.append("Generated nowInSeconds:")
-              .append(generatedNowInSeconds)
-              .append(System.lineSeparator());
-
-            switch (type)
-            {
-                case (FullQueryLogger.SINGLE_QUERY):
-                    dumpQuery(options, wireIn, sb);
-                    break;
-
-                case (FullQueryLogger.BATCH):
-                    dumpBatch(options, wireIn, sb);
-                    break;
-
-                default:
-                    throw new UnsupportedOperationException("Log entry of unsupported type " + type);
-            }
-
-            System.out.print(sb.toString());
-            System.out.flush();
-        };
-
-        //Resolve the roll cycle once, an invalid name fails here before any queue has been opened
-        RollCycles cycle = RollCycles.valueOf(rollCycle);
-        //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency
-        Pauser pauser = Pauser.millis(100);
-        List<ChronicleQueue> queues = arguments.stream()
-                                               .distinct()
-                                               .map(path -> ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(cycle).build())
-                                               .collect(Collectors.toList());
-        try
-        {
-            List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList());
-            boolean hadWork = true;
-            while (hadWork)
-            {
-                hadWork = false;
-                for (ExcerptTailer tailer : tailers)
-                {
-                    while (tailer.readDocument(reader))
-                    {
-                        hadWork = true;
-                    }
-                }
-
-                if (follow)
-                {
-                    if (!hadWork)
-                    {
-                        //Chronicle queue doesn't support blocking so use this backoff strategy
-                        pauser.pause();
-                    }
-                    //Don't terminate the loop even if there wasn't work
-                    hadWork = true;
-                }
-            }
-        }
-        finally
-        {
-            //Release the queues both on normal completion and when a record could not be read
-            queues.forEach(ChronicleQueue::close);
-        }
-    }
-
-    /**
-     * Append the query string and the bound values of a single query record to sb.
-     *
-     * @param options decoded query options of the record, the source of the bound values
-     * @param wireIn record being read, positioned at the query field
-     * @param sb builder the text is appended to
-     */
-    private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb)
-    {
-        sb.append("Query: ")
-          .append(wireIn.read(FullQueryLogger.QUERY).text())
-          .append(System.lineSeparator());
-
-        List<ByteBuffer> values = options.getValues() != null
-                                ? options.getValues()
-                                : Collections.emptyList();
-
-        sb.append("Values: ")
-          .append(System.lineSeparator());
-        appendValuesToStringBuilder(values, sb);
-        sb.append(System.lineSeparator());
-    }
-
-    /**
-     * Append the batch type and, for each entry of the values field, the matching query string and
-     * its bound values to sb.
-     *
-     * @param options decoded query options of the record (not used by this method)
-     * @param wireIn record being read, positioned at the batch type field
-     * @param sb builder the text is appended to
-     */
-    private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb)
-    {
-        sb.append("Batch type: ")
-          .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text())
-          .append(System.lineSeparator());
-
-        ValueIn in = wireIn.read(FullQueryLogger.QUERIES);
-        int numQueries = in.int32();
-        List<String> queries = new ArrayList<>(numQueries);
-        for (int i = 0; i < numQueries; i++)
-            queries.add(in.text());
-
-        in = wireIn.read(FullQueryLogger.VALUES);
-        int numValues = in.int32();
-
-        for (int i = 0; i < numValues; i++)
-        {
-            int numSubValues = in.int32();
-            List<ByteBuffer> subValues = new ArrayList<>(numSubValues);
-            for (int j = 0; j < numSubValues; j++)
-                subValues.add(ByteBuffer.wrap(in.bytes()));
-
-            //the i:th list of values belongs to the i:th query of the batch
-            sb.append("Query: ")
-              .append(queries.get(i))
-              .append(System.lineSeparator());
-
-            sb.append("Values: ")
-              .append(System.lineSeparator());
-            appendValuesToStringBuilder(subValues, sb);
-        }
-
-        sb.append(System.lineSeparator());
-    }
-
-    /**
-     * Append a hex dump of each value to sb, with a "-----" line between consecutive values.
-     *
-     * At most MAX_DUMPED_VALUE_BYTES bytes of a value are dumped, longer values are followed by a
-     * "... truncated" line.
-     *
-     * @param values the values to dump, may be empty
-     * @param sb builder the text is appended to
-     */
-    private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb)
-    {
-        boolean first = true;
-        for (ByteBuffer value : values)
-        {
-            //the separator goes between two values, never after the last one
-            if (first)
-            {
-                first = false;
-            }
-            else
-            {
-                sb.append("-----").append(System.lineSeparator());
-            }
-
-            Bytes bytes = Bytes.wrapForRead(value);
-            long remaining = bytes.readLimit() - bytes.readPosition();
-            long maxLength2 = Math.min(MAX_DUMPED_VALUE_BYTES, remaining);
-            toHexString(bytes, bytes.readPosition(), maxLength2, sb);
-            if (maxLength2 < remaining)
-            {
-                sb.append("... truncated").append(System.lineSeparator());
-            }
-        }
-    }
-
-    //This is from net.openhft.chronicle.bytes, need to pass in the StringBuilder so had to copy
-    /*
-     * Copyright 2016 higherfrequencytrading.com
-     *
-     * Licensed under the Apache License, Version 2.0 (the "License");
-     * you may not use this file except in compliance with the License.
-     * You may obtain a copy of the License at
-     *
-     * http://www.apache.org/licenses/LICENSE-2.0
-     *
-     * Unless required by applicable law or agreed to in writing, software
-     * distributed under the License is distributed on an "AS IS" BASIS,
-     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-     * See the License for the specific language governing permissions and
-     * limitations under the License.
-     */
-    /**
-     * Append a hex dump of len bytes of {@link Bytes}, starting at offset, to builder.
-     *
-     * Each output row covers 16 bytes: the row offset as 8 hex digits, the bytes in hex and the
-     * same bytes as printable characters (anything outside ' '..'~' is shown as a middle dot).
-     * Rows that repeat the row before them are replaced by a single "........" line; the last row
-     * is always printed. The read position and read limit of bytes are restored before returning.
-     *
-     * @param bytes the buffer to dump
-     * @param offset index of the first byte to dump
-     * @param len number of bytes to dump
-     * @param builder the builder the dump is appended to
-     * @return an empty string if len is 0, otherwise the complete content of builder (including
-     *         anything it held before this call)
-     */
-    public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder)
-    throws BufferUnderflowException
-    {
-        if (len == 0)
-            return "";
-
-        int width = 16;
-        int[] lastLine = new int[width];
-        String sep = "";
-        long position = bytes.readPosition();
-        long limit = bytes.readLimit();
-
-        try
-        {
-            bytes.readPositionRemaining(offset, len);
-
-            //rows are aligned to multiples of width, so the first and last row may be partially blank
-            long start = offset / width * width;
-            long end = (offset + len + width - 1) / width * width;
-            for (long i = start; i < end; i += width)
-            {
-                // check for duplicate rows
-                if (i + width < end)
-                {
-                    boolean same = true;
-
-                    for (int j = 0; j < width && i + j < offset + len; j++)
-                    {
-                        int ch = bytes.readUnsignedByte(i + j);
-                        same &= (ch == lastLine[j]);
-                        lastLine[j] = ch;
-                    }
-                    if (i > start && same)
-                    {
-                        //remember that rows were skipped, the marker is emitted before the next printed row
-                        sep = "........\n";
-                        continue;
-                    }
-                }
-                builder.append(sep);
-                sep = "";
-                String str = Long.toHexString(i);
-                for (int j = str.length(); j < 8; j++)
-                    builder.append('0');
-                builder.append(str);
-                //hex column
-                for (int j = 0; j < width; j++)
-                {
-                    if (j == width / 2)
-                        builder.append(' ');
-                    if (i + j < offset || i + j >= offset + len)
-                    {
-                        builder.append(" ");
-                    }
-                    else
-                    {
-                        builder.append(' ');
-                        int ch = bytes.readUnsignedByte(i + j);
-                        builder.append(HEXI_DECIMAL[ch >> 4]);
-                        builder.append(HEXI_DECIMAL[ch & 15]);
-                    }
-                }
-                builder.append(' ');
-                //character column
-                for (int j = 0; j < width; j++)
-                {
-                    if (j == width / 2)
-                        builder.append(' ');
-                    if (i + j < offset || i + j >= offset + len)
-                    {
-                        builder.append(' ');
-                    }
-                    else
-                    {
-                        int ch = bytes.readUnsignedByte(i + j);
-                        if (ch < ' ' || ch > 126)
-                            ch = '\u00B7';
-                        builder.append((char) ch);
-                    }
-                }
-                builder.append("\n");
-            }
-            return builder.toString();
-        }
-        finally
-        {
-            bytes.readLimit(limit);
-            bytes.readPosition(position);
-        }
-    }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org