You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/09/07 16:38:46 UTC

[03/15] Improve stress workload realism

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java
new file mode 100644
index 0000000..da4c282
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java
@@ -0,0 +1,176 @@
+package org.apache.cassandra.stress.settings;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+
+public class SettingsPopulation implements Serializable
+{
+
+    public final DistributionFactory distribution; // seed distribution taken from dist=; null when a sequential range is used
+    public final DistributionFactory readlookback; // taken from read-lookback= in sequential mode; null in distribution mode
+    public final PartitionGenerator.Order order; // ARBITRARY unless contents= was set by the user
+    public final boolean wrap; // sequential mode: true unless no-wrap was supplied; always false in distribution mode
+    public final long[] sequence; // the two bounds parsed from seq=, in the order given; null in distribution mode
+
+    public static enum GenerateOrder // NOTE(review): not referenced in the visible code; the constructor parses contents= into PartitionGenerator.Order instead - confirm this is still needed
+    {
+        ARBITRARY, SHUFFLED, SORTED
+    }
+
+    private SettingsPopulation(GenerateOptions options, DistributionOptions dist, SequentialOptions pop) // shared initialiser; the public constructors pass exactly one of dist / pop as non-null
+    {
+        this.order = !options.contents.setByUser() ? PartitionGenerator.Order.ARBITRARY : PartitionGenerator.Order.valueOf(options.contents.value().toUpperCase()); // ARBITRARY when contents= was not given, else the enum constant named by the upper-cased value
+        if (dist != null)
+        { // distribution mode: only the seed distribution is populated
+            this.distribution = dist.seed.get();
+            this.sequence = null;
+            this.readlookback = null;
+            this.wrap = false;
+        }
+        else
+        { // sequential mode: pop is dereferenced below, so it must be non-null on this path
+            this.distribution = null;
+            String[] bounds = pop.populate.value().split("\\.\\.+"); // split "lower..upper" on a run of two or more dots
+            this.sequence = new long[] { OptionDistribution.parseLong(bounds[0]), OptionDistribution.parseLong(bounds[1]) }; // NOTE(review): nothing here checks that the first bound is <= the second
+            this.readlookback = pop.lookback.get();
+            this.wrap = !pop.nowrap.setByUser(); // wrapping stays on unless no-wrap was supplied
+        }
+    }
+
+    public SettingsPopulation(DistributionOptions options) // distribution mode: seeds drawn from options.seed
+    {
+        this(options, options, null); // same object supplies the shared contents= option and the dist= option
+    }
+
+    public SettingsPopulation(SequentialOptions options) // sequential mode: seeds taken from the seq= range
+    {
+        this(options, null, options); // same object supplies the shared contents= option and the seq=/no-wrap/read-lookback= options
+    }
+
+    // Option Declarations
+
+    private static class GenerateOptions extends GroupedOptions // base group holding the option shared by both -pop variants
+    {
+        final OptionSimple contents = new OptionSimple("contents=", "(sorted|shuffled)", null, "SORTED or SHUFFLED (intra-)partition order; if not specified, will be consistent but arbitrary order", false); // read by the SettingsPopulation constructor to choose the PartitionGenerator.Order
+
+        @Override
+        public List<? extends Option> options() // this group's options: just contents=
+        {
+            return Arrays.asList(contents);
+        }
+    }
+
+    private static final class DistributionOptions extends GenerateOptions // the dist= form of -pop
+    {
+        final OptionDistribution seed; // its get() result becomes SettingsPopulation.distribution
+
+        public DistributionOptions(String defaultLimit) // defaultLimit is spliced into the spec string gaussian(1..defaultLimit) handed to OptionDistribution (presumably its default - confirm)
+        {
+            seed = new OptionDistribution("dist=", "gaussian(1.." + defaultLimit + ")", "Seeds are selected from this distribution");
+        }
+
+        @Override
+        public List<? extends Option> options() // seed first, followed by the inherited contents= option
+        {
+            return ImmutableList.<Option>builder().add(seed).addAll(super.options()).build();
+        }
+    }
+
+    private static final class SequentialOptions extends GenerateOptions // the seq= form of -pop
+    {
+        final OptionSimple populate; // holds the "lower..upper" range text that the SettingsPopulation constructor splits and parses
+        final OptionDistribution lookback = new OptionDistribution("read-lookback=", "fixed(1)", "Select read seeds from the recently visited write seeds");
+        final OptionSimple nowrap = new OptionSimple("no-wrap", "", null, "Terminate the stress test once all seeds in the range have been visited", false); // only its setByUser() state is consulted by the constructor
+
+        public SequentialOptions(String defaultLimit) // defaultLimit is spliced into the string "1..defaultLimit" handed to OptionSimple (presumably the default range - confirm)
+        {
+            populate = new OptionSimple("seq=", "[0-9]+\\.\\.+[0-9]+[MBK]?",
+                    "1.." + defaultLimit,
+                    "Generate all seeds in sequence", true);
+        }
+
+        @Override
+        public List<? extends Option> options() // populate, nowrap and lookback, followed by the inherited contents= option
+        {
+            return ImmutableList.<Option>builder().add(populate, nowrap, lookback).addAll(super.options()).build();
+        }
+    }
+
+    // CLI Utility Methods
+
+    public static SettingsPopulation get(Map<String, String[]> clArgs, SettingsCommand command) // builds the population settings from the "-pop" entry of clArgs, removing that entry from the map
+    {
+        // set default size to number of commands requested, unless set to err convergence, then use 1M
+        String defaultLimit = command.count <= 0 ? "1000000" : Long.toString(command.count);
+
+        String[] params = clArgs.remove("-pop");
+        if (params == null)
+        {
+            // return defaults:
+            switch(command.type) // the two write commands default to a sequential range, every other command to a distribution
+            {
+                case WRITE:
+                case COUNTER_WRITE:
+                    return new SettingsPopulation(new SequentialOptions(defaultLimit));
+                default:
+                    return new SettingsPopulation(new DistributionOptions(defaultLimit));
+            }
+        }
+        GroupedOptions options = GroupedOptions.select(params, new SequentialOptions(defaultLimit), new DistributionOptions(defaultLimit)); // a null result is handled below as invalid -pop options
+        if (options == null)
+        {
+            printHelp();
+            System.out.println("Invalid -pop options provided, see output for valid options");
+            System.exit(1); // invalid input terminates the process with status 1
+        }
+        return options instanceof SequentialOptions ?
+                new SettingsPopulation((SequentialOptions) options) :
+                new SettingsPopulation((DistributionOptions) options);
+    }
+
+    public static void printHelp() // prints both -pop option groups to System.out
+    {
+        GroupedOptions.printOptions(System.out, "-pop", new SequentialOptions("N"), new DistributionOptions("N")); // "N" stands in for the default limit in the help text
+    }
+
+    public static Runnable helpPrinter() // returns a Runnable whose run() calls printHelp()
+    {
+        return new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                printHelp();
+            }
+        };
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
index 5fb2bb2..6e3a02e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
@@ -44,12 +44,7 @@ public class SettingsSchema implements Serializable
     public SettingsSchema(Options options, SettingsCommand command)
     {
         if (command instanceof SettingsCommandUser)
-        {
-            if (options.compaction.setByUser() || options.keyspace.setByUser() || options.compression.setByUser() || options.replication.setByUser())
-                throw new IllegalArgumentException("Cannot provide command line schema settings if a user profile is provided");
-
             keyspace = ((SettingsCommandUser) command).profile.keyspaceName;
-        }
         else
             keyspace = options.keyspace.value();
 
@@ -62,14 +57,7 @@ public class SettingsSchema implements Serializable
 
     public void createKeySpaces(StressSettings settings)
     {
-        if (!(settings.command instanceof SettingsCommandUser))
-        {
-            createKeySpacesThrift(settings);
-        }
-        else
-        {
-            ((SettingsCommandUser) settings.command).profile.maybeCreateSchema(settings);
-        }
+        createKeySpacesThrift(settings);
     }
 
 
@@ -189,6 +177,9 @@ public class SettingsSchema implements Serializable
         if (params == null)
             return new SettingsSchema(new Options(), command);
 
+        if (command instanceof SettingsCommandUser)
+            throw new IllegalArgumentException("-schema can only be provided with predefined operations insert, read, etc.; the 'user' command requires a schema yaml instead");
+
         GroupedOptions options = GroupedOptions.select(params, new Options());
         if (options == null)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index ab57289..bdd10e5 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -40,8 +40,10 @@ public class StressSettings implements Serializable
 {
     public final SettingsCommand command;
     public final SettingsRate rate;
-    public final SettingsKey keys;
+    public final SettingsPopulation generate;
+    public final SettingsInsert insert;
     public final SettingsColumn columns;
+    public final SettingsErrors errors;
     public final SettingsLog log;
     public final SettingsMode mode;
     public final SettingsNode node;
@@ -50,12 +52,14 @@ public class StressSettings implements Serializable
     public final SettingsPort port;
     public final String sendToDaemon;
 
-    public StressSettings(SettingsCommand command, SettingsRate rate, SettingsKey keys, SettingsColumn columns, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon)
+    public StressSettings(SettingsCommand command, SettingsRate rate, SettingsPopulation generate, SettingsInsert insert, SettingsColumn columns, SettingsErrors errors, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon)
     {
         this.command = command;
         this.rate = rate;
-        this.keys = keys;
+        this.insert = insert;
+        this.generate = generate;
         this.columns = columns;
+        this.errors = errors;
         this.log = log;
         this.mode = mode;
         this.node = node;
@@ -129,7 +133,7 @@ public class StressSettings implements Serializable
         }
         catch (Exception e)
         {
-            throw new RuntimeException(e.getMessage());
+            throw new RuntimeException(e);
         }
 
         return client;
@@ -189,9 +193,10 @@ public class StressSettings implements Serializable
 
     public void maybeCreateKeyspaces()
     {
-        if (command.type == Command.WRITE || command.type == Command.COUNTER_WRITE || command.type == Command.USER)
+        if (command.type == Command.WRITE || command.type == Command.COUNTER_WRITE)
             schema.createKeySpaces(this);
-
+        else if (command.type == Command.USER)
+            ((SettingsCommandUser) command).profile.maybeCreateSchema(this);
     }
 
     public static StressSettings parse(String[] args)
@@ -221,8 +226,10 @@ public class StressSettings implements Serializable
         String sendToDaemon = SettingsMisc.getSendToDaemon(clArgs);
         SettingsPort port = SettingsPort.get(clArgs);
         SettingsRate rate = SettingsRate.get(clArgs, command);
-        SettingsKey keys = SettingsKey.get(clArgs, command);
+        SettingsPopulation generate = SettingsPopulation.get(clArgs, command);
+        SettingsInsert insert = SettingsInsert.get(clArgs);
         SettingsColumn columns = SettingsColumn.get(clArgs);
+        SettingsErrors errors = SettingsErrors.get(clArgs);
         SettingsLog log = SettingsLog.get(clArgs);
         SettingsMode mode = SettingsMode.get(clArgs);
         SettingsNode node = SettingsNode.get(clArgs);
@@ -244,7 +251,7 @@ public class StressSettings implements Serializable
             }
             System.exit(1);
         }
-        return new StressSettings(command, rate, keys, columns, log, mode, node, schema, transport, port, sendToDaemon);
+        return new StressSettings(command, rate, generate, insert, columns, errors, log, mode, node, schema, transport, port, sendToDaemon);
     }
 
     private static Map<String, String[]> parseMap(String[] args)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java
new file mode 100644
index 0000000..2a38e7d
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java
@@ -0,0 +1,259 @@
+package org.apache.cassandra.stress.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.stress.generate.FasterRandom;
+
+// simple skip-list that permits lookup by position, removal via the Node handle returned by append, and insertion at the end
+// (easily extended to insertion at any position, but that is not necessary here); append/remove/get each take a read-write lock
+// we use it for sampling items by position when visiting writes in the pool of pending writes
+public class DynamicList<E>
+{
+
+    // represents a value and an index simultaneously; each node maintains a list
+    // of next pointers for each height in the skip-list this node participates in
+    // (a contiguous range from [0..height))
+    public static class Node<E>
+    {
+        // size[i] is how many list positions this node's level-i link covers (the step get() takes when it follows next(i))
+        private final int[] size;
+        // TODO: alternate links to save space; layout is links[2 * i] = next at level i, links[2 * i + 1] = prev at level i
+        private final Node<E>[] links;
+        private final E value;
+
+        private Node(int height, E value)
+        {
+            this.value = value;
+            links = new Node[height * 2]; // Java cannot create a generic array, hence the raw element type here
+            size = new int[height];
+            Arrays.fill(size, 1); // a fresh node covers only itself at every level
+        }
+
+        private int height() // number of levels this node participates in
+        {
+            return size.length;
+        }
+
+        private Node<E> next(int i) // successor at level i, or null at the end of that level
+        {
+            return links[i * 2];
+        }
+
+        private Node<E> prev(int i) // predecessor at level i; null for a node that has not been linked at that level
+        {
+            return links[1 + i * 2];
+        }
+
+        private void setNext(int i, Node<E> next)
+        {
+            links[i * 2] = next;
+        }
+
+        private void setPrev(int i, Node<E> prev)
+        {
+            links[1 + i * 2] = prev;
+        }
+
+        private Node<E> parent(int parentHeight) // this node, or the nearest node reached by walking back, whose height exceeds parentHeight (typed Node<E> rather than raw Node)
+        {
+            Node<E> prev = this;
+            while (true)
+            {
+                int height = prev.height();
+                if (parentHeight < height)
+                    return prev;
+                prev = prev.prev(height - 1); // step back along this node's top level; callers must pass a level some earlier node has, or this runs off the front
+            }
+        }
+    }
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock(); // write lock is taken by append/remove, read lock by get
+    private final int maxHeight; // height of the head node, fixed in the constructor
+    private final Node<E> head; // sentinel created with a null value; every traversal starts here
+    private int size; // number of appended elements currently in the list (head not counted)
+
+    public DynamicList(int maxExpectedSize) // maxExpectedSize only determines the index height; values below 1 are treated as 1
+    {
+        this.maxHeight = Math.min(32, 3 + (int) Math.ceil(Math.log(Math.max(1, maxExpectedSize)) / Math.log(2))); // clamped: log(0) would yield a negative height, and randomLevel()'s 1 << (maxHeight - 1) wraps once maxHeight exceeds 32
+        head = new Node<>(maxHeight, null); // sentinel head participates in every level and carries no value
+    }
+
+    private int randomLevel() // picks the height for a newly appended node; always at least 1
+    {
+        return 1 + Integer.bitCount(ThreadLocalRandom.current().nextInt() & ((1 << (maxHeight - 1)) - 1)); // 1 plus the number of set bits among the low (maxHeight - 1) bits of a random int (the shift count wraps if maxHeight - 1 reaches 32)
+    }
+
+    // add the value to the end of the list, and return the associated Node that permits efficient removal
+    // regardless of its future position in the list from other modifications
+    public Node<E> append(E value)
+    {
+        Node<E> newTail = new Node<>(randomLevel(), value); // built before the lock is taken
+
+        lock.writeLock().lock();
+        try
+        {
+            size++;
+
+            Node<E> tail = head;
+            for (int i = maxHeight - 1 ; i >= newTail.height() ; i--) // levels above the new node: it is not linked here, only counted
+            {
+                Node<E> next;
+                while ((next = tail.next(i)) != null)
+                    tail = next;
+                tail.size[i]++; // the last node at this level now covers one more element
+            }
+
+            for (int i = newTail.height() - 1 ; i >= 0 ; i--) // levels the new node participates in: link it after the current last node
+            {
+                Node<E> next;
+                while ((next = tail.next(i)) != null)
+                    tail = next;
+                tail.setNext(i, newTail);
+                newTail.setPrev(i, tail);
+            }
+
+            return newTail;
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // remove the provided node and its associated value from the list; NOTE(review): nothing here detects a node that was already removed
+    public void remove(Node<E> node)
+    {
+        lock.writeLock().lock();
+        try
+        {
+            size--;
+
+            // go up through each level in the skip list, unlinking this node; this entails
+            // simply linking each neighbour to each other, and appending the size of the
+            // current level owned by this node's index to the preceding neighbour (since
+            // ownership is defined as any node that you must visit through the index,
+            // removal of ourselves from a level means the preceding index entry is the
+            // entry point to all of the removed node's descendants)
+            for (int i = 0 ; i < node.height() ; i++)
+            {
+                Node<E> prev = node.prev(i);
+                Node<E> next = node.next(i);
+                assert prev != null;
+                prev.setNext(i, next);
+                if (next != null)
+                    next.setPrev(i, prev);
+                prev.size[i] += node.size[i] - 1; // predecessor inherits this node's span, minus the removed element itself
+            }
+
+            // then go up the levels, removing 1 from the size at each height above ours
+            for (int i = node.height() ; i < maxHeight ; i++)
+            {
+                // if we're at our height limit, we backtrack at our top level until we
+                // hit a neighbour with a greater height
+                while (i == node.height())
+                    node = node.prev(i - 1); // the parameter is reused as a cursor from here on
+                node.size[i]--;
+            }
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // retrieve the item at the provided index, or return null if the index is past the end of the list
+    public E get(int index) // NOTE(review): a negative index is not rejected up front
+    {
+        lock.readLock().lock();
+        try
+        {
+            if (index >= size)
+                return null;
+
+            index++; // shift by one: the head sentinel occupies position 0 of the size totals (isWellFormed expects size + 1 at the top level)
+            int c = 0; // position of finger, counting head as 0
+            Node<E> finger = head;
+            for (int i = maxHeight - 1 ; i >= 0 ; i--) // descend from the sparsest level to level 0
+            {
+                while (c + finger.size[i] <= index) // advance while the next node at this level is not past the target
+                {
+                    c += finger.size[i];
+                    finger = finger.next(i);
+                }
+            }
+
+            assert c == index;
+            return finger.value;
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
+    // some quick and dirty tests to confirm the skiplist works as intended
+    // don't create a separate unit test - tools tree doesn't currently warrant them
+
+    private boolean isWellFormed() // structural self-check used by main(); takes no lock
+    {
+        for (int i = 0 ; i < maxHeight ; i++) // validate each level's chain independently
+        {
+            int c = 0; // sizes accumulated at level i since the last parent boundary
+            for (Node node = head ; node != null ; node = node.next(i))
+            {
+                if (node.prev(i) != null && node.prev(i).next(i) != node) // prev/next links must mirror each other
+                    return false;
+                if (node.next(i) != null && node.next(i).prev(i) != node)
+                    return false;
+                c += node.size[i];
+                if (i + 1 < maxHeight && node.parent(i + 1).next(i + 1) == node.next(i)) // the next node at this level is also the parent's next one level up
+                {
+                    if (node.parent(i + 1).size[i + 1] != c) // so the parent's span must equal the sum of the spans beneath it
+                        return false;
+                    c = 0;
+                }
+            }
+            if (i == maxHeight - 1 && c != size + 1) // top level must total every element plus the head
+                return false;
+        }
+        return true;
+    }
+
+    public static void main(String[] args) // ad-hoc self test; its checks are plain asserts, so they only run when assertions are enabled (java -ea)
+    {
+        DynamicList<Integer> list = new DynamicList<>(20);
+        TreeSet<Integer> canon = new TreeSet<>(); // reference copy of the values currently in the list
+        HashMap<Integer, Node> nodes = new HashMap<>(); // value -> node handle, needed to call remove()
+        int c = 0; // next value to append; never reused
+        for (int i = 0 ; i < 100000 ; i++)
+        {
+            nodes.put(c, list.append(c));
+            canon.add(c);
+            c++;
+        }
+        FasterRandom rand = new FasterRandom();
+        assert list.isWellFormed();
+        for (int loop = 0 ; loop < 100 ; loop++)
+        {
+            System.out.println(loop);
+            for (int i = 0 ; i < 100000 ; i++) // each step removes one element at a random position and appends a new one, so the size stays at 100000
+            {
+                int index = rand.nextInt(100000);
+                Integer seed = list.get(index);
+//                assert canon.headSet(seed, false).size() == index;
+                list.remove(nodes.remove(seed));
+                canon.remove(seed);
+                nodes.put(c, list.append(c));
+                canon.add(c);
+                c++;
+            }
+            assert list.isWellFormed();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
index 45e1ba7..4e2b0a3 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
@@ -30,7 +30,7 @@ import java.util.concurrent.CountDownLatch;
 public final class Timer
 {
 
-    private static final int SAMPLE_SIZE_SHIFT = 10;
+    private static final int SAMPLE_SIZE_SHIFT = 14;
     private static final int SAMPLE_SIZE_MASK = (1 << SAMPLE_SIZE_SHIFT) - 1;
 
     private final Random rnd = new Random();
@@ -66,6 +66,11 @@ public final class Timer
         return 1 + (index >>> SAMPLE_SIZE_SHIFT);
     }
 
+    public boolean running() // true for as long as finalReport is still null
+    {
+        return finalReport == null;
+    }
+
     public void stop(long partitionCount, long rowCount)
     {
         maybeReport();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
index 2bdca82..b6d4e52 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
@@ -40,6 +40,7 @@ public class Timing
     private final CopyOnWriteArrayList<Timer> timers = new CopyOnWriteArrayList<>();
     private volatile TimingInterval history;
     private final Random rnd = new Random();
+    private boolean done;
 
     // TIMING
 
@@ -57,11 +58,16 @@ public class Timing
         if (!ready.await(2L, TimeUnit.MINUTES))
             throw new RuntimeException("Timed out waiting for a timer thread - seems one got stuck");
 
+        boolean done = true;
         // reports have been filled in by timer threadCount, so merge
         List<TimingInterval> intervals = new ArrayList<>();
         for (Timer timer : timers)
+        {
             intervals.add(timer.report);
+            done &= !timer.running();
+        }
 
+        this.done = done;
         return TimingInterval.merge(rnd, intervals, Integer.MAX_VALUE, history.endNanos());
     }
 
@@ -78,10 +84,15 @@ public class Timing
         history = new TimingInterval(System.nanoTime());
     }
 
+    public boolean done() // value recorded when the timers' reports were last merged: true only if no timer was still running(); NOTE(review): done is a plain field - confirm readers run on the writing thread
+    {
+        return done;
+    }
+
     public TimingInterval snapInterval() throws InterruptedException
     {
         final TimingInterval interval = snapInterval(rnd);
-        history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 50000, history.startNanos());
+        history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 200000, history.startNanos());
         return interval;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
index db3fef1..50ab608 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
@@ -97,14 +97,14 @@ public final class TimingInterval
 
     }
 
-    public double realOpRate()
+    public double opRate()
     {
         return operationCount / ((end - start) * 0.000000001d);
     }
 
-    public double adjustedOpRate()
+    public double adjustedRowRate()
     {
-        return operationCount / ((end - (start + pauseLength)) * 0.000000001d);
+        return rowCount / ((end - (start + pauseLength)) * 0.000000001d);
     }
 
     public double partitionRate()