You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2019/02/25 14:19:17 UTC

[cassandra] branch trunk updated: FQLTool replay improvements

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 73ff199  FQLTool replay improvements
73ff199 is described below

commit 73ff199df34805355db0870b923c94f6b11563e0
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Oct 15 17:00:31 2018 +0200

    FQLTool replay improvements
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-14850
---
 .../src/org/apache/cassandra/fqltool/FQLQuery.java |  12 +-
 .../apache/cassandra/fqltool/MismatchListener.java |  28 ++++
 .../apache/cassandra/fqltool/QueryReplayer.java    | 162 +++++++++++++++++----
 .../apache/cassandra/fqltool/ResultComparator.java |  64 ++++++--
 .../apache/cassandra/fqltool/ResultHandler.java    |  16 +-
 .../apache/cassandra/fqltool/commands/Replay.java  |  11 +-
 .../apache/cassandra/fqltool/FQLReplayTest.java    |  42 ++++++
 7 files changed, 271 insertions(+), 64 deletions(-)

diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
index 2862e0f..8683d15 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
@@ -22,9 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
 import com.google.common.primitives.Longs;
 
 import com.datastax.driver.core.BatchStatement;
@@ -35,7 +32,6 @@ import org.apache.cassandra.audit.FullQueryLogger;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.binlog.BinLog;
 
 public abstract class FQLQuery implements Comparable<FQLQuery>
@@ -117,10 +113,10 @@ public abstract class FQLQuery implements Comparable<FQLQuery>
         @Override
         public String toString()
         {
-            return String.format("%s%nQuery = %s, Values = %s",
+            return String.format("%s: Query: [%s], valuecount : %d",
                                  super.toString(),
                                  query,
-                                 values.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(",")));
+                                 values.size());
         }
 
         public Statement toStatement()
@@ -240,9 +236,9 @@ public abstract class FQLQuery implements Comparable<FQLQuery>
 
         public String toString()
         {
-            StringBuilder sb = new StringBuilder(super.toString()).append("\nbatch: ").append(batchType).append('\n');
+            StringBuilder sb = new StringBuilder(super.toString()).append(" batch: ").append(batchType).append(':');
             for (Single q : queries)
-                sb.append(q.toString()).append('\n');
+                sb.append(q.toString()).append(',');
             sb.append("end batch");
             return sb.toString();
         }
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/MismatchListener.java b/tools/fqltool/src/org/apache/cassandra/fqltool/MismatchListener.java
new file mode 100644
index 0000000..70a4b11
--- /dev/null
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/MismatchListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fqltool;
+
+import java.util.List;
+import java.util.UUID;
+
+public interface MismatchListener
+{
+    void mismatch(UUID mismatchUUID, List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows);
+    void columnDefMismatch(UUID mismatchUUID, List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds);
+}
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
index d8653e5..b17ec53 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
@@ -21,23 +21,24 @@ package org.apache.cassandra.fqltool;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-//import javax.annotation.Nullable;
-
-import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.datastax.driver.core.Cluster;
@@ -48,33 +49,51 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class QueryReplayer implements Closeable
 {
+    private static final Logger logger = LoggerFactory.getLogger(QueryReplayer.class);
     private static final int PRINT_RATE = 5000;
     private final ExecutorService es = Executors.newFixedThreadPool(1);
     private final Iterator<List<FQLQuery>> queryIterator;
-    private final List<Cluster> targetClusters;
     private final List<Predicate<FQLQuery>> filters;
     private final List<Session> sessions;
     private final ResultHandler resultHandler;
     private final MetricRegistry metrics = new MetricRegistry();
-    private final boolean debug;
-    private final PrintStream out;
+    private final SessionProvider sessionProvider;
 
+    /**
+     * @param queryIterator the queries to be replayed
+     * @param targetHosts hosts to connect to, in the format "<user>:<password>@<host>:<port>" where only <host> is mandatory, port defaults to 9042
+     * @param resultPaths where to write the results of the queries, for later comparisons, size should be the same as the number of iterators
+     * @param filters query filters
+     * @param queryFilePathString where to store the queries executed
+     */
+    public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+                         List<String> targetHosts,
+                         List<File> resultPaths,
+                         List<Predicate<FQLQuery>> filters,
+                         String queryFilePathString)
+    {
+        this(queryIterator, targetHosts, resultPaths, filters, queryFilePathString, new DefaultSessionProvider(), null);
+    }
+
+    /**
+     * Constructor public to allow external users to build their own session provider
+     *
+     * sessionProvider takes the hosts in targetHosts and creates one session per entry
+     */
     public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
                          List<String> targetHosts,
                          List<File> resultPaths,
                          List<Predicate<FQLQuery>> filters,
-                         PrintStream out,
                          String queryFilePathString,
-                         boolean debug)
+                         SessionProvider sessionProvider,
+                         MismatchListener mismatchListener)
     {
+        this.sessionProvider = sessionProvider;
         this.queryIterator = queryIterator;
-        targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
         this.filters = filters;
-        sessions = targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+        sessions = targetHosts.stream().map(sessionProvider::connect).collect(Collectors.toList());
         File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null;
-        resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath);
-        this.debug = debug;
-        this.out = out;
+        resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath, mismatchListener);
     }
 
     public void replay()
@@ -93,11 +112,8 @@ public class QueryReplayer implements Closeable
                     for (Session session : sessions)
                     {
                         maybeSetKeyspace(session, query);
-                        if (debug)
-                        {
-                            out.println("Executing query:");
-                            out.println(query);
-                        }
+                        if (logger.isDebugEnabled())
+                            logger.debug("Executing query: {}", query);
                         ListenableFuture<ResultSet> future = session.executeAsync(statement);
                         results.add(handleErrors(future));
                     }
@@ -106,7 +122,7 @@ public class QueryReplayer implements Closeable
 
                     Futures.addCallback(resultList, new FutureCallback<List<ResultHandler.ComparableResultSet>>()
                     {
-                        public void onSuccess(/*@Nullable */List<ResultHandler.ComparableResultSet> resultSets)
+                        public void onSuccess(List<ResultHandler.ComparableResultSet> resultSets)
                         {
                             // note that the order of resultSets is signifcant here - resultSets.get(x) should
                             // be the result from a query against targetHosts.get(x)
@@ -123,12 +139,12 @@ public class QueryReplayer implements Closeable
                 }
                 catch (Throwable t)
                 {
-                    out.printf("QUERY %s got exception: %s", query, t.getMessage());
+                    logger.error("QUERY %s got exception: %s", query, t.getMessage());
                 }
 
                 Timer timer = metrics.timer("queries");
                 if (timer.getCount() % PRINT_RATE == 0)
-                    out.printf("%d queries, rate = %.2f%n", timer.getCount(), timer.getOneMinuteRate());
+                    logger.info(String.format("%d queries, rate = %.2f", timer.getCount(), timer.getOneMinuteRate()));
             }
         }
     }
@@ -139,14 +155,14 @@ public class QueryReplayer implements Closeable
         {
             if (query.keyspace() != null && !query.keyspace().equals(session.getLoggedKeyspace()))
             {
-                if (debug)
-                    out.printf("Switching keyspace from %s to %s%n", session.getLoggedKeyspace(), query.keyspace());
+                if (logger.isDebugEnabled())
+                    logger.debug("Switching keyspace from {} to {}", session.getLoggedKeyspace(), query.keyspace());
                 session.execute("USE " + query.keyspace());
             }
         }
         catch (Throwable t)
         {
-            out.printf("USE %s failed: %s%n", query.keyspace(), t.getMessage());
+            logger.error("USE {} failed: {}", query.keyspace(), t.getMessage());
         }
     }
 
@@ -158,15 +174,101 @@ public class QueryReplayer implements Closeable
      */
     private static ListenableFuture<ResultHandler.ComparableResultSet> handleErrors(ListenableFuture<ResultSet> result)
     {
-        FluentFuture<ResultHandler.ComparableResultSet> fluentFuture = FluentFuture.from(result)
-                                                                                   .transform(DriverResultSet::new, MoreExecutors.directExecutor());
-        return fluentFuture.catching(Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor());
+        ListenableFuture<ResultHandler.ComparableResultSet> res = Futures.transform(result, DriverResultSet::new, MoreExecutors.directExecutor());
+        return Futures.catching(res, Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor());
     }
 
     public void close() throws IOException
     {
-        sessions.forEach(Session::close);
-        targetClusters.forEach(Cluster::close);
+        es.shutdown();
+        sessionProvider.close();
         resultHandler.close();
     }
+
+    static class ParsedTargetHost
+    {
+        final int port;
+        final String user;
+        final String password;
+        final String host;
+
+        ParsedTargetHost(String host, int port, String user, String password)
+        {
+            this.host = host;
+            this.port = port;
+            this.user = user;
+            this.password = password;
+        }
+
+        static ParsedTargetHost fromString(String s)
+        {
+            String [] userInfoHostPort = s.split("@");
+
+            String hostPort = null;
+            String user = null;
+            String password = null;
+            if (userInfoHostPort.length == 2)
+            {
+                String [] userPassword = userInfoHostPort[0].split(":");
+                if (userPassword.length != 2)
+                    throw new RuntimeException("Username provided but no password");
+                hostPort = userInfoHostPort[1];
+                user = userPassword[0];
+                password = userPassword[1];
+            }
+            else if (userInfoHostPort.length == 1)
+                hostPort = userInfoHostPort[0];
+            else
+                throw new RuntimeException("Malformed target host: "+s);
+
+            String[] splitHostPort = hostPort.split(":");
+            int port = 9042;
+            if (splitHostPort.length == 2)
+                port = Integer.parseInt(splitHostPort[1]);
+
+            return new ParsedTargetHost(splitHostPort[0], port, user, password);
+        }
+    }
+
+    public static interface SessionProvider extends Closeable
+    {
+        Session connect(String connectionString);
+        void close();
+    }
+
+    private static final class DefaultSessionProvider implements SessionProvider
+    {
+        private final static Map<String, Session> sessionCache = new HashMap<>();
+
+        public synchronized Session connect(String connectionString)
+        {
+            if (sessionCache.containsKey(connectionString))
+                return sessionCache.get(connectionString);
+            Cluster.Builder builder = Cluster.builder();
+            ParsedTargetHost pth = ParsedTargetHost.fromString(connectionString);
+            builder.addContactPoint(pth.host);
+            builder.withPort(pth.port);
+            if (pth.user != null)
+                builder.withCredentials(pth.user, pth.password);
+            Cluster c = builder.build();
+            sessionCache.put(connectionString, c.connect());
+            return sessionCache.get(connectionString);
+        }
+
+        public void close()
+        {
+            for (Session s : sessionCache.values())
+            {
+                try
+                {
+                    s.close();
+                    s.getCluster().close();
+                }
+                catch (Throwable t)
+                {
+                    logger.error("Could not close connection", t);
+                }
+            }
+        }
+    }
 }
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java
index d8d419a..eeebe20 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java
@@ -21,12 +21,25 @@ package org.apache.cassandra.fqltool;
 
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
+import java.util.UUID;
 
-import com.google.common.collect.Streams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ResultComparator
 {
+    private static final Logger logger = LoggerFactory.getLogger(ResultComparator.class);
+    private final MismatchListener mismatchListener;
+
+    public ResultComparator()
+    {
+        this(null);
+    }
+
+    public ResultComparator(MismatchListener mismatchListener)
+    {
+        this.mismatchListener = mismatchListener;
+    }
     /**
      * Compares the rows in rows
      * the row at position x in rows will have come from host at position x in targetHosts
@@ -80,18 +93,46 @@ public class ResultComparator
 
     private void handleMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
     {
-        System.out.println("MISMATCH:");
-        System.out.println("Query = " + query);
-        System.out.println("Results:");
-        System.out.println(Streams.zip(rows.stream(), targetHosts.stream(), (r, host) -> String.format("%s: %s%n", host, r == null ? "null" : r)).collect(Collectors.joining()));
+        UUID mismatchUUID = UUID.randomUUID();
+        StringBuilder sb = new StringBuilder("{} - MISMATCH Query = {} ");
+        for (int i = 0; i < targetHosts.size(); i++)
+            sb.append("mismatch").append(i)
+              .append('=')
+              .append('"').append(targetHosts.get(i)).append(':').append(rows.get(i)).append('"')
+              .append(',');
+
+        logger.warn(sb.toString(), mismatchUUID, query);
+        try
+        {
+            if (mismatchListener != null)
+                mismatchListener.mismatch(mismatchUUID, targetHosts, query, rows);
+        }
+        catch (Throwable t)
+        {
+            logger.error("ERROR notifying listener", t);
+        }
     }
 
     private void handleColumnDefMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
     {
-        System.out.println("COLUMN DEFINITION MISMATCH:");
-        System.out.println("Query = " + query);
-        System.out.println("Results: ");
-        System.out.println(Streams.zip(cds.stream(), targetHosts.stream(), (cd, host) -> String.format("%s: %s%n", host, columnDefinitionsString(cd))).collect(Collectors.joining()));
+        UUID mismatchUUID = UUID.randomUUID();
+        StringBuilder sb = new StringBuilder("{} - COLUMN DEFINITION MISMATCH Query = {} ");
+        for (int i = 0; i < targetHosts.size(); i++)
+            sb.append("mismatch").append(i)
+              .append('=')
+              .append('"').append(targetHosts.get(i)).append(':').append(columnDefinitionsString(cds.get(i))).append('"')
+              .append(',');
+
+        logger.warn(sb.toString(), mismatchUUID, query);
+        try
+        {
+            if (mismatchListener != null)
+                mismatchListener.columnDefMismatch(mismatchUUID, targetHosts, query, cds);
+        }
+        catch (Throwable t)
+        {
+            logger.error("ERROR notifying listener", t);
+        }
     }
 
     private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd)
@@ -110,7 +151,4 @@ public class ResultComparator
         }
         return sb.toString();
     }
-
-
-
 }
\ No newline at end of file
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java
index 8c4c018..d88c6f7 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java
@@ -29,18 +29,26 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ResultHandler implements Closeable
 {
+    private static final Logger logger = LoggerFactory.getLogger(ResultHandler.class);
     private final ResultStore resultStore;
     private final ResultComparator resultComparator;
     private final List<String> targetHosts;
 
     public ResultHandler(List<String> targetHosts, List<File> resultPaths, File queryFilePath)
     {
+        this(targetHosts, resultPaths, queryFilePath, null);
+    }
+
+    public ResultHandler(List<String> targetHosts, List<File> resultPaths, File queryFilePath, MismatchListener mismatchListener)
+    {
         this.targetHosts = targetHosts;
         resultStore = resultPaths != null ? new ResultStore(resultPaths, queryFilePath) : null;
-        resultComparator = new ResultComparator();
+        resultComparator = new ResultComparator(mismatchListener);
     }
 
     /**
@@ -54,11 +62,7 @@ public class ResultHandler implements Closeable
         for (int i = 0; i < targetHosts.size(); i++)
         {
             if (results.get(i).wasFailed())
-            {
-                System.out.println("Query against "+targetHosts.get(i)+" failure:");
-                System.out.println(query);
-                System.out.println("Message: "+results.get(i).getFailureException().getMessage());
-            }
+                logger.error("Query {} against {} failure: {}", query, targetHosts.get(i), results.get(i).getFailureException().getMessage());
         }
 
         List<ComparableColumnDefinitions> columnDefinitions = results.stream().map(ComparableResultSet::getColumnDefinitions).collect(Collectors.toList());
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
index adea742..9cc147a 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java
@@ -49,7 +49,7 @@ public class Replay implements Runnable
     @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true)
     private List<String> arguments = new ArrayList<>();
 
-    @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.")
+    @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.", required = true)
     private List<String> targetHosts;
 
     @Option(title = "results", name = { "--results"}, description = "Where to store the results of the queries, this should be a directory. Leave this option out to avoid storing results.")
@@ -58,9 +58,6 @@ public class Replay implements Runnable
     @Option(title = "keyspace", name = { "--keyspace"}, description = "Only replay queries against this keyspace and queries without keyspace set.")
     private String keyspace;
 
-    @Option(title = "debug", name = {"--debug"}, description = "Debug mode, print all queries executed.")
-    private boolean debug;
-
     @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results")
     private String queryStorePath;
 
@@ -86,7 +83,7 @@ public class Replay implements Runnable
                 System.err.println("You need to state at least one --target host to replay the query against");
                 System.exit(1);
             }
-            replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, debug);
+            replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath);
         }
         catch (Exception e)
         {
@@ -94,7 +91,7 @@ public class Replay implements Runnable
         }
     }
 
-    public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean debug)
+    public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath)
     {
         int readAhead = 200; // how many fql queries should we read in to memory to be able to sort them?
         List<ChronicleQueue> readQueues = null;
@@ -109,7 +106,7 @@ public class Replay implements Runnable
             readQueues = arguments.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList());
             iterators = readQueues.stream().map(ChronicleQueue::createTailer).map(tailer -> new FQLQueryIterator(tailer, readAhead)).collect(Collectors.toList());
             try (MergeIterator<FQLQuery, List<FQLQuery>> iter = MergeIterator.get(iterators, FQLQuery::compareTo, new Reducer());
-                 QueryReplayer replayer = new QueryReplayer(iter, targetHosts, resultPaths, filters, System.out, queryStorePath, debug))
+                 QueryReplayer replayer = new QueryReplayer(iter, targetHosts, resultPaths, filters, queryStorePath))
             {
                 replayer.replay();
             }
diff --git a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
index 61c8aa0..8c573f4 100644
--- a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
+++ b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
@@ -52,10 +52,12 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.fqltool.QueryReplayer.ParsedTargetHost.fromString;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class FQLReplayTest
@@ -544,6 +546,46 @@ public class FQLReplayTest
             compareStatements(statements.get(i), fromFQLQueries.get(i));
     }
 
+    @Test
+    public void testParser() {
+        QueryReplayer.ParsedTargetHost pth;
+        pth = fromString("127.0.0.1");
+        assertEquals("127.0.0.1", pth.host);
+        assertEquals(9042, pth.port );
+        assertNull(pth.user);
+        assertNull(pth.password);
+
+        pth = fromString("127.0.0.1:3333");
+        assertEquals("127.0.0.1", pth.host);
+        assertEquals(3333, pth.port );
+        assertNull(pth.user);
+        assertNull(pth.password);
+
+        pth = fromString("aaa:bbb@127.0.0.1:3333");
+        assertEquals("127.0.0.1", pth.host);
+        assertEquals(3333, pth.port );
+        assertEquals("aaa", pth.user);
+        assertEquals("bbb", pth.password);
+
+        pth = fromString("aaa:bbb@127.0.0.1");
+        assertEquals("127.0.0.1", pth.host);
+        assertEquals(9042, pth.port );
+        assertEquals("aaa", pth.user);
+        assertEquals("bbb", pth.password);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testNoPass()
+    {
+        fromString("blabla@abc.com:1234");
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testBadPort()
+    {
+        fromString("aaa:bbb@abc.com:xyz");
+    }
+
     private void compareStatements(Statement statement1, Statement statement2)
     {
         assertTrue(statement1 instanceof SimpleStatement && statement2 instanceof SimpleStatement);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org