You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2013/12/24 03:08:44 UTC
[5/6] Improve Stress Tool patch by Benedict;
reviewed by Pavel Yaskevich for CASSANDRA-6199
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
new file mode 100644
index 0000000..b9f1a47
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -0,0 +1,178 @@
+package org.apache.cassandra.stress;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.stress.util.Timing;
+import org.apache.cassandra.stress.util.TimingInterval;
+import org.apache.cassandra.stress.util.Uncertainty;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+
+/**
+ * Prints periodic timing rows for a stress run to a {@link PrintStream} and, on request, a final summary.
+ * A background thread snapshots the shared {@link Timing} about once per log interval and feeds the
+ * adjusted op rate of each interval into an {@link Uncertainty} tracker.
+ */
+public class StressMetrics
+{
+
+    private static final ThreadFactory tf = new NamedThreadFactory("StressMetrics");
+
+    private final PrintStream output;
+    private final Thread thread;
+    // set by stop() and cancel(); polled by the reporting loop
+    private volatile boolean stop = false;
+    // set only by cancel(); exposed through wasCancelled()
+    private volatile boolean cancelled = false;
+    private final Uncertainty opRateUncertainty = new Uncertainty();
+    // counted down when the reporting thread exits, whatever the reason
+    private final CountDownLatch stopped = new CountDownLatch(1);
+    private final Timing timing = new Timing();
+
+    /**
+     * Prints the column header immediately and prepares (but does not start) the reporting thread.
+     *
+     * @param output            destination for the header, the periodic rows and the summary
+     * @param logIntervalMillis target number of milliseconds between two printed rows
+     */
+    public StressMetrics(PrintStream output, final long logIntervalMillis)
+    {
+        this.output = output;
+        printHeader("", output);
+        thread = tf.newThread(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                timing.start();
+                try
+                {
+                    while (!stop)
+                    {
+                        try
+                        {
+                            // aim for the next row exactly one interval after the end of the recorded history
+                            long sleep = timing.getHistory().endMillis() + logIntervalMillis - System.currentTimeMillis();
+                            if (sleep < logIntervalMillis >>> 3)
+                                // if had a major hiccup, sleep full interval
+                                Thread.sleep(logIntervalMillis);
+                            else
+                                Thread.sleep(sleep);
+                            update();
+                        }
+                        catch (InterruptedException e)
+                        {
+                            // stop() and cancel() interrupt this thread to end the loop promptly
+                            break;
+                        }
+                    }
+
+                    // one last row covering whatever happened since the previous snapshot
+                    update();
+                }
+                catch (InterruptedException ignored)
+                {
+                    // interrupted during the final update; nothing left to report
+                }
+                catch (Exception e)
+                {
+                    cancel();
+                    e.printStackTrace(StressMetrics.this.output);
+                }
+                finally
+                {
+                    stopped.countDown();
+                }
+            }
+        });
+    }
+
+    /** Starts the background reporting thread. */
+    public void start()
+    {
+        thread.start();
+    }
+
+    /**
+     * Delegates to {@link Uncertainty#await} with the given arguments.
+     *
+     * @throws InterruptedException if the underlying wait is interrupted
+     */
+    public void waitUntilConverges(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException
+    {
+        opRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements);
+    }
+
+    /** Marks the run as cancelled, asks the reporting thread to stop and wakes any convergence waiters. */
+    public void cancel()
+    {
+        cancelled = true;
+        stop = true;
+        thread.interrupt();
+        opRateUncertainty.wakeAll();
+    }
+
+    /**
+     * Asks the reporting thread to stop and blocks until it has exited.
+     *
+     * @throws InterruptedException if the caller is interrupted while waiting
+     */
+    public void stop() throws InterruptedException
+    {
+        stop = true;
+        thread.interrupt();
+        stopped.await();
+    }
+
+    /** Snapshots the current interval, prints one row for it and records its adjusted op rate. */
+    private void update() throws InterruptedException
+    {
+        TimingInterval interval = timing.snapInterval();
+        printRow("", interval, timing.getHistory(), opRateUncertainty, output);
+        opRateUncertainty.update(interval.adjustedOpRate());
+    }
+
+
+    // PRINT FORMATTING
+
+    // twelve comma-separated columns; HEADFORMAT and ROWFORMAT must stay column-for-column aligned
+    public static final String HEADFORMAT = "%-10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s";
+    public static final String ROWFORMAT = "%-10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f";
+
+    private static void printHeader(String prefix, PrintStream output)
+    {
+        output.println(prefix + String.format(HEADFORMAT, "ops","op/s", "adj op/s","key/s","mean","med",".95",".99",".999","max","time","stderr"));
+    }
+
+    /**
+     * Prints one row: the operation count and run time come from {@code total}, every rate and latency
+     * column from {@code interval}, and the last column from {@code opRateUncertainty}.
+     */
+    private static void printRow(String prefix, TimingInterval interval, TimingInterval total, Uncertainty opRateUncertainty, PrintStream output)
+    {
+        output.println(prefix + String.format(ROWFORMAT,
+                total.operationCount,
+                interval.realOpRate(),
+                interval.adjustedOpRate(),
+                interval.keyRate(),
+                interval.meanLatency(),
+                interval.medianLatency(),
+                interval.rankLatency(0.95f),
+                interval.rankLatency(0.99f),
+                interval.rankLatency(0.999f),
+                interval.maxLatency(),
+                total.runTime() / 1000f,
+                opRateUncertainty.getUncertainty()));
+    }
+
+    /** Prints a human-readable summary of the whole recorded history to this instance's output. */
+    public void summarise()
+    {
+        output.println("\n");
+        output.println("Results:");
+        TimingInterval history = timing.getHistory();
+        output.println(String.format("real op rate : %.0f", history.realOpRate()));
+        output.println(String.format("adjusted op rate : %.0f", history.adjustedOpRate()));
+        // same precision as the "stderr" column of ROWFORMAT; with %.0f a fractional value would always print as 0
+        output.println(String.format("adjusted op rate stderr : %.5f", opRateUncertainty.getUncertainty()));
+        output.println(String.format("key rate : %.0f", history.keyRate()));
+        output.println(String.format("latency mean : %.1f", history.meanLatency()));
+        output.println(String.format("latency median : %.1f", history.medianLatency()));
+        output.println(String.format("latency 95th percentile : %.1f", history.rankLatency(.95f)));
+        output.println(String.format("latency 99th percentile : %.1f", history.rankLatency(0.99f)));
+        output.println(String.format("latency 99.9th percentile : %.1f", history.rankLatency(0.999f)));
+        output.println(String.format("latency max : %.1f", history.maxLatency()));
+        output.println("Total operation time : " + DurationFormatUtils.formatDuration(
+                history.runTime(), "HH:mm:ss", true));
+    }
+
+    /**
+     * Prints a header followed by one whole-history row per metrics object, each prefixed by its id
+     * right-aligned to the width of the longest id.
+     *
+     * @param ids       row labels; {@code ids.get(i)} labels {@code summarise.get(i)}
+     * @param summarise metrics objects to report, in the same order as {@code ids}
+     * @param out       destination stream
+     */
+    public static final void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out)
+    {
+        int idLen = 0;
+        for (String id : ids)
+            idLen = Math.max(id.length(), idLen);
+        String formatstr = "%" + idLen + "s, ";
+        printHeader(String.format(formatstr, "id"), out);
+        for (int i = 0 ; i < ids.size() ; i++)
+            printRow(String.format(formatstr, ids.get(i)),
+                    summarise.get(i).timing.getHistory(),
+                    summarise.get(i).timing.getHistory(),
+                    summarise.get(i).opRateUncertainty,
+                    out
+            );
+    }
+
+    public Timing getTiming()
+    {
+        return timing;
+    }
+
+    /** @return {@code true} once {@link #cancel()} has been called */
+    public boolean wasCancelled()
+    {
+        return cancelled;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/StressServer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressServer.java b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
index 6600dfd..3c9e2a6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressServer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
@@ -1,27 +1,30 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.cassandra.stress;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.PrintStream;
import java.net.InetAddress;
import java.net.ServerSocket;
+import java.net.Socket;
-import org.apache.cassandra.stress.server.StressThread;
+import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.commons.cli.*;
public class StressServer
@@ -68,4 +71,57 @@ public class StressServer
for (;;)
new StressThread(serverSocket.accept()).start();
}
+
+    /**
+     * Serves one client connection: reads a {@link StressSettings} object from the socket, runs the
+     * corresponding {@link StressAction} on its own thread with the socket as its output, and interrupts
+     * that action if the client sends the int value 1.
+     */
+    public static class StressThread extends Thread
+    {
+        private final Socket socket;
+
+        public StressThread(Socket client)
+        {
+            this.socket = client;
+        }
+
+        @Override
+        public void run()
+        {
+            // NOTE(review): the settings object is Java-deserialized straight off the socket; only expose
+            // this port to trusted clients, or install an ObjectInputFilter on newer JDKs.
+            // try-with-resources closes out, in and the socket (in that order) on every exit path.
+            try (Socket client = socket;
+                 ObjectInputStream in = new ObjectInputStream(client.getInputStream());
+                 PrintStream out = new PrintStream(client.getOutputStream()))
+            {
+                StressAction action = new StressAction((StressSettings) in.readObject(), out);
+                Thread actionThread = new Thread(action);
+                actionThread.start();
+
+                while (actionThread.isAlive())
+                {
+                    int signal;
+                    try
+                    {
+                        signal = in.readInt();
+                    }
+                    catch (IOException e)
+                    {
+                        // The control channel is gone (e.g. the client disconnected), so every further read
+                        // would fail at once. Let the action run to completion without spinning on the stream.
+                        actionThread.join();
+                        break;
+                    }
+
+                    if (signal == 1)
+                    {
+                        actionThread.interrupt();
+                        break;
+                    }
+                }
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+            catch (InterruptedException e)
+            {
+                // preserve the interrupt for whoever owns this thread
+                Thread.currentThread().interrupt();
+            }
+            catch (Exception e)
+            {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/StressStatistics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressStatistics.java b/tools/stress/src/org/apache/cassandra/stress/StressStatistics.java
deleted file mode 100644
index b739c8e..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/StressStatistics.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.stress;
-
-import java.io.PrintStream;
-import org.apache.commons.lang3.time.DurationFormatUtils;
-
-import com.yammer.metrics.stats.Snapshot;
-
-
-/**
- * Gathers and aggregates statistics for an operation
- */
-public class StressStatistics
-{
-
- private Session client;
- private PrintStream output;
-
- private long durationInSeconds;
- /** The sum of the interval_op_rate values collected by tallyAverages */
- private int tallyOpRateSum;
- /** The number of interval_op_rate values collected by tallyAverages */
- private int tallyOpRateCount;
- /** The sum of the interval_key_rate values collected by tallyAverages */
- private int tallyKeyRateSum;
- /** The number of interval_key_rate values collected by tallyAverages */
- private int tallyKeyRateCount;
-
- /** The sum of the latency values collected by tallyAverages */
- private double tallyLatencySum;
- /** The number of latency values collected by tallyAverages */
- private int tallyLatencyCount;
- /** The sum of the 95%tile latency values collected by tallyAverages */
- private double tally95thLatencySum;
- /** The number of 95%tile latency values collected by tallyAverages */
- private int tally95thLatencyCount;
- /** The sum of the 99.9%tile latency values collected by tallyAverages */
- private double tally999thLatencySum;
- /** The number of 99.9%tile latency values collected by tallyAverages */
- private int tally999thLatencyCount;
-
-
- public StressStatistics(Session client, PrintStream out)
- {
- this.client = client;
- this.output = out;
-
- tallyOpRateSum = 0;
- tallyOpRateCount = 0;
- }
-
- /**
- * Collect statistics per-interval
- */
- public void addIntervalStats(int totalOperations, int intervalOpRate,
- int intervalKeyRate, Snapshot latency,
- long currentTimeInSeconds)
- {
- this.tallyAverages(totalOperations, intervalKeyRate, intervalKeyRate,
- latency, currentTimeInSeconds);
- }
-
- /**
- * Collect interval_op_rate and interval_key_rate averages
- */
- private void tallyAverages(int totalOperations, int intervalOpRate,
- int intervalKeyRate, Snapshot latency,
- long currentTimeInSeconds)
- {
- //Skip the first and last 10% of values.
- //The middle values of the operation are the ones worthwhile
- //to collect and average:
- if (totalOperations > (0.10 * client.getNumKeys()) &&
- totalOperations < (0.90 * client.getNumKeys())) {
- tallyOpRateSum += intervalOpRate;
- tallyOpRateCount += 1;
- tallyKeyRateSum += intervalKeyRate;
- tallyKeyRateCount += 1;
- tallyLatencySum += latency.getMedian();
- tallyLatencyCount += 1;
- tally95thLatencySum += latency.get95thPercentile();
- tally95thLatencyCount += 1;
- tally999thLatencySum += latency.get999thPercentile();
- tally999thLatencyCount += 1;
- }
- durationInSeconds = currentTimeInSeconds;
- }
-
- public void printStats()
- {
- output.println("\n");
- if (tallyOpRateCount > 0) {
- output.println("Averages from the middle 80% of values:");
- output.println(String.format("interval_op_rate : %d",
- (tallyOpRateSum / tallyOpRateCount)));
- output.println(String.format("interval_key_rate : %d",
- (tallyKeyRateSum / tallyKeyRateCount)));
- output.println(String.format("latency median : %.1f",
- (tallyLatencySum / tallyLatencyCount)));
- output.println(String.format("latency 95th percentile : %.1f",
- (tally95thLatencySum / tally95thLatencyCount)));
- output.println(String.format("latency 99.9th percentile : %.1f",
- (tally999thLatencySum / tally999thLatencyCount)));
- }
- output.println("Total operation time : " + DurationFormatUtils.formatDuration(
- durationInSeconds*1000, "HH:mm:ss", true));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
new file mode 100644
index 0000000..4c22005
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
@@ -0,0 +1,18 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Base class for generators that write data into caller-supplied buffers.
+ */
+public abstract class DataGen
+{
+
+    /** Writes generated data into {@code fill} for the given offset. */
+    public abstract void generate(ByteBuffer fill, long offset);
+
+    /** Reports whether this generator describes itself as deterministic. */
+    public abstract boolean isDeterministic();
+
+    /**
+     * Fills every buffer in list order; the first buffer is generated with {@code offset}, the second
+     * with {@code offset + 1}, and so on.
+     */
+    public void generate(List<ByteBuffer> fills, long offset)
+    {
+        final int count = fills.size();
+        for (int i = 0; i < count; i++)
+        {
+            generate(fills.get(i), offset + i);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
new file mode 100644
index 0000000..3906f93
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
@@ -0,0 +1,24 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+/**
+ * Generator that overwrites a buffer's backing array with random bytes; the offset is ignored.
+ */
+public class DataGenBytesRandom extends DataGen
+{
+
+    private final Random rnd = new Random();
+
+    /** Clears {@code fill}, then randomises its entire backing array. */
+    @Override
+    public void generate(ByteBuffer fill, long offset)
+    {
+        fill.clear();
+        final byte[] backing = fill.array();
+        rnd.nextBytes(backing);
+    }
+
+    /** Always {@code false}. */
+    @Override
+    public boolean isDeterministic()
+    {
+        return false;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenFactory.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenFactory.java
new file mode 100644
index 0000000..c5738cc
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenFactory.java
@@ -0,0 +1,9 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.io.Serializable;
+
+/**
+ * Serializable supplier of {@link DataGen} instances.
+ */
+public interface DataGenFactory extends Serializable
+{
+    /** @return a data generator */
+    DataGen get();
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
new file mode 100644
index 0000000..50d49dd
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
@@ -0,0 +1,39 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Generator that renders a long value as upper-case hexadecimal, left-padded with '0' to the full
+ * length of the target buffer's backing array.
+ */
+public abstract class DataGenHex extends DataGen
+{
+
+    /** Supplies the value to render for the given operation index. */
+    abstract long next(long operationIndex);
+
+    @Override
+    public final void generate(ByteBuffer fill, long operationIndex)
+    {
+        fill.clear();
+        final long value = next(operationIndex);
+        fillKeyStringBytes(value, fill.array());
+    }
+
+    /**
+     * Writes {@code key} into {@code fill} as hex digits, least significant digit in the last position,
+     * and pads any remaining leading positions with '0'. The key is treated as unsigned.
+     */
+    public static void fillKeyStringBytes(long key, byte[] fill)
+    {
+        int pos = fill.length - 1;
+        // emit one nibble per step, walking from the right-hand end towards the front
+        for (long rest = key; rest != 0; rest >>>= 4)
+        {
+            fill[pos--] = digit(((int) rest) & 15);
+        }
+        // zero-pad whatever is left at the front
+        while (pos >= 0)
+        {
+            fill[pos--] = '0';
+        }
+    }
+
+    // needs to be UTF-8, but for these chars there is no difference
+    private static byte digit(int num)
+    {
+        return num < 10 ? (byte) ('0' + num) : (byte) ('A' + (num - 10));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromDistribution.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromDistribution.java
new file mode 100644
index 0000000..3391fce
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromDistribution.java
@@ -0,0 +1,45 @@
+package org.apache.cassandra.stress.generatedata;
+
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.distribution.UniformRealDistribution;
+
+/**
+ * Hex generator whose values are drawn from a {@link Distribution}; the operation index is ignored.
+ */
+public class DataGenHexFromDistribution extends DataGenHex
+{
+
+    final Distribution distribution;
+
+    public DataGenHexFromDistribution(Distribution distribution)
+    {
+        this.distribution = distribution;
+    }
+
+    /** Always {@code false}. */
+    @Override
+    public boolean isDeterministic()
+    {
+        return false;
+    }
+
+    @Override
+    long next(long operationIndex)
+    {
+        return distribution.next();
+    }
+
+    /**
+     * Builds a generator over a normal distribution centred on the midpoint of {@code [minKey, maxKey]},
+     * with a standard deviation of half the range divided by {@code stdevsToLimit}; samples are bound to
+     * the range.
+     */
+    public static DataGenHex buildGaussian(long minKey, long maxKey, double stdevsToLimit)
+    {
+        final double midRange = (maxKey + minKey) / 2d;
+        final double halfRange = (maxKey - minKey) / 2d;
+        final NormalDistribution normal = new NormalDistribution(midRange, halfRange / stdevsToLimit);
+        final Distribution bounded = new DistributionBoundApache(normal, minKey, maxKey);
+        return new DataGenHexFromDistribution(bounded);
+    }
+
+    /** Builds a generator over a normal distribution with the given mean and stdev, bound to {@code [minKey, maxKey]}. */
+    public static DataGenHex buildGaussian(long minKey, long maxKey, double mean, double stdev)
+    {
+        final NormalDistribution normal = new NormalDistribution(mean, stdev);
+        final Distribution bounded = new DistributionBoundApache(normal, minKey, maxKey);
+        return new DataGenHexFromDistribution(bounded);
+    }
+
+    /** Builds a generator over a uniform real distribution on {@code minKey}..{@code maxKey}, bound to the same range. */
+    public static DataGenHex buildUniform(long minKey, long maxKey)
+    {
+        final UniformRealDistribution uniform = new UniformRealDistribution(minKey, maxKey);
+        final Distribution bounded = new DistributionBoundApache(uniform, minKey, maxKey);
+        return new DataGenHexFromDistribution(bounded);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromOpIndex.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromOpIndex.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromOpIndex.java
new file mode 100644
index 0000000..5d499d5
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHexFromOpIndex.java
@@ -0,0 +1,27 @@
+package org.apache.cassandra.stress.generatedata;
+
+/**
+ * Hex generator whose value is derived purely from the operation index, cycling through the inclusive
+ * range {@code [minKey, maxKey]}.
+ */
+public class DataGenHexFromOpIndex extends DataGenHex
+{
+
+    final long minKey;
+    final long maxKey;
+
+    public DataGenHexFromOpIndex(long minKey, long maxKey)
+    {
+        this.minKey = minKey;
+        this.maxKey = maxKey;
+    }
+
+    /** Always {@code true}. */
+    @Override
+    public boolean isDeterministic()
+    {
+        return true;
+    }
+
+    /** Wraps the index into the key range, shifts it by {@code minKey} and returns the absolute value. */
+    @Override
+    long next(long operationIndex)
+    {
+        final long span = maxKey + 1 - minKey;
+        final long wrapped = operationIndex % span;
+        return Math.abs(wrapped + minKey);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
new file mode 100644
index 0000000..68c8034
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
@@ -0,0 +1,84 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * Generator that fills buffers with space-separated words sampled from a weighted dictionary.
+ */
+public class DataGenStringDictionary extends DataGen
+{
+
+    private final byte space = ' ';
+    private final EnumeratedDistribution<byte[]> words;
+
+    public DataGenStringDictionary(EnumeratedDistribution<byte[]> wordDistribution)
+    {
+        words = wordDistribution;
+    }
+
+    @Override
+    public void generate(ByteBuffer fill, long index)
+    {
+        fill(fill, 0);
+    }
+
+    /** Fills every buffer in {@code fills}; {@code index} is not used. */
+    @Override
+    public void generate(List<ByteBuffer> fills, long index)
+    {
+        // fill the i-th buffer: indexing with 0 rewrote the first buffer repeatedly and left the rest untouched
+        for (int i = 0 ; i < fills.size() ; i++)
+            fill(fills.get(i), i);
+    }
+
+    /**
+     * Clears {@code fill} and overwrites its whole backing array with sampled words separated by single
+     * spaces; the last word is truncated if it does not fit. {@code column} is currently unused.
+     */
+    private void fill(ByteBuffer fill, int column)
+    {
+        fill.clear();
+        byte[] trg = fill.array();
+        int i = 0;
+        while (i < trg.length)
+        {
+            if (i > 0)
+                trg[i++] = space;
+            byte[] src = words.sample();
+            // copy at most what still fits; the length is 0 when the separator took the final slot
+            System.arraycopy(src, 0, trg, i, Math.min(src.length, trg.length - i));
+            i += src.length;
+        }
+    }
+
+    // NOTE(review): this reports true although the content comes from words.sample(); confirm that callers
+    // relying on reproducible output are not affected before depending on this flag.
+    @Override
+    public boolean isDeterministic()
+    {
+        return true;
+    }
+
+    /**
+     * Reads a dictionary file and returns a factory of generators sharing one word distribution.
+     * Each line must hold exactly two fields separated by one or more spaces: a weight parsed with
+     * {@link Double#parseDouble} followed by the word.
+     *
+     * @param file dictionary file to read
+     * @return factory producing {@link DataGenStringDictionary} instances over the parsed words
+     * @throws IOException              if the file cannot be read
+     * @throws IllegalArgumentException if a line does not consist of exactly two fields
+     */
+    public static DataGenFactory getFactory(File file) throws IOException
+    {
+        final List<Pair<byte[], Double>> words = new ArrayList<>();
+        // try-with-resources so the file handle is released even when a record is malformed
+        try (BufferedReader reader = new BufferedReader(new FileReader(file)))
+        {
+            String line;
+            while ( null != (line = reader.readLine()) )
+            {
+                String[] pair = line.split(" +");
+                if (pair.length != 2)
+                    throw new IllegalArgumentException("Invalid record in dictionary: \"" + line + "\"");
+                words.add(new Pair<>(pair[1].getBytes(UTF_8), Double.parseDouble(pair[0])));
+            }
+        }
+        final EnumeratedDistribution<byte[]> dist = new EnumeratedDistribution<byte[]>(words);
+        return new DataGenFactory()
+        {
+            @Override
+            public DataGen get()
+            {
+                return new DataGenStringDictionary(dist);
+            }
+        };
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
new file mode 100644
index 0000000..47091f7
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
@@ -0,0 +1,69 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * Generator that fills buffers by repeating an MD5 digest derived from the column number and the
+ * operation index modulo {@code repeatFrequency}. Digests are cached in a map shared by all instances
+ * created with the same repeat frequency.
+ */
+public class DataGenStringRepeats extends DataGen
+{
+
+    private static final ConcurrentHashMap<Integer, ConcurrentHashMap<Long, byte[]>> CACHE_LOOKUP = new ConcurrentHashMap<>();
+
+    private final ConcurrentHashMap<Long, byte[]> cache;
+    private final int repeatFrequency;
+
+    public DataGenStringRepeats(int repeatFrequency)
+    {
+        if (!CACHE_LOOKUP.containsKey(repeatFrequency))
+            CACHE_LOOKUP.putIfAbsent(repeatFrequency, new ConcurrentHashMap<Long, byte[]>());
+        cache = CACHE_LOOKUP.get(repeatFrequency);
+        this.repeatFrequency = repeatFrequency;
+    }
+
+    @Override
+    public void generate(ByteBuffer fill, long index)
+    {
+        fill(fill, index, 0);
+    }
+
+    /** Fills the i-th buffer using column number i, all with the same {@code index}. */
+    @Override
+    public void generate(List<ByteBuffer> fills, long index)
+    {
+        for (int i = 0 ; i < fills.size() ; i++)
+        {
+            fill(fills.get(i), index, i);
+        }
+    }
+
+    /** Clears {@code fill} and tiles its backing array with the digest, truncating the final copy. */
+    private void fill(ByteBuffer fill, long index, int column)
+    {
+        fill.clear();
+        byte[] trg = fill.array();
+        byte[] src = getData(index, column);
+        for (int j = 0 ; j < trg.length ; j += src.length)
+            System.arraycopy(src, 0, trg, j, Math.min(src.length, trg.length - j));
+    }
+
+    /** Returns the cached digest for the (column, index) pair, computing and caching it on a miss. */
+    private byte[] getData(long index, int column)
+    {
+        // widen before multiplying: column * repeatFrequency evaluated in int can overflow and alias
+        // the keys of unrelated columns
+        final long key = ((long) column * repeatFrequency) + (index % repeatFrequency);
+        byte[] r = cache.get(key);
+        if (r != null)
+            return r;
+        MessageDigest md = FBUtilities.threadLocalMD5Digest();
+        r = md.digest(Long.toString(key).getBytes(UTF_8));
+        // a concurrent caller may have stored the same digest first; both arrays hold identical bytes
+        cache.putIfAbsent(key, r);
+        return r;
+    }
+
+    /** Always {@code true}. */
+    @Override
+    public boolean isDeterministic()
+    {
+        return true;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/Distribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/Distribution.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/Distribution.java
new file mode 100644
index 0000000..5236eab
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/Distribution.java
@@ -0,0 +1,19 @@
+package org.apache.cassandra.stress.generatedata;
+
+/**
+ * A source of long values that can also be queried through its inverse cumulative probability.
+ */
+public abstract class Distribution
+{
+
+    /** @return the next value from this distribution */
+    public abstract long next();
+
+    /** @return the value at the given cumulative probability */
+    public abstract long inverseCumProb(double cumProb);
+
+    /** @return the value at cumulative probability 1 */
+    public long maxValue()
+    {
+        final double top = 1d;
+        return inverseCumProb(top);
+    }
+
+    /** @return the value at cumulative probability 0 */
+    public long minValue()
+    {
+        final double bottom = 0d;
+        return inverseCumProb(bottom);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionBoundApache.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionBoundApache.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionBoundApache.java
new file mode 100644
index 0000000..9f59dbd
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionBoundApache.java
@@ -0,0 +1,42 @@
+package org.apache.cassandra.stress.generatedata;
+
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+
+/**
+ * Adapts a commons-math real distribution to {@link Distribution}: values are truncated to long and
+ * pulled back to {@code min} or {@code max} when they fall outside that range.
+ */
+public class DistributionBoundApache extends Distribution
+{
+
+    final AbstractRealDistribution delegate;
+    final long min;
+    final long max;
+
+    public DistributionBoundApache(AbstractRealDistribution delegate, long min, long max)
+    {
+        this.delegate = delegate;
+        this.min = min;
+        this.max = max;
+    }
+
+    @Override
+    public long next()
+    {
+        final double sample = delegate.sample();
+        return bound(min, max, sample);
+    }
+
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        final double raw = delegate.inverseCumulativeProbability(cumProb);
+        return bound(min, max, raw);
+    }
+
+    /** Truncates {@code val} to a long and returns it, or the nearer limit when it lies outside the range. */
+    private static long bound(long min, long max, double val)
+    {
+        final long truncated = (long) val;
+        if (truncated < min)
+            return min;
+        if (truncated > max)
+            return max;
+        return truncated;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFactory.java
new file mode 100644
index 0000000..ac2b7ba
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFactory.java
@@ -0,0 +1,10 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.io.Serializable;
+
+/**
+ * Serializable supplier of {@link Distribution} instances.
+ */
+public interface DistributionFactory extends Serializable
+{
+
+    /** @return a distribution */
+    Distribution get();
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFixed.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFixed.java
new file mode 100644
index 0000000..6873b1c
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionFixed.java
@@ -0,0 +1,25 @@
+package org.apache.cassandra.stress.generatedata;
+
+/**
+ * Degenerate distribution that yields the same key for every sample and every cumulative probability.
+ */
+public class DistributionFixed extends Distribution
+{
+
+    final long key;
+
+    public DistributionFixed(long key)
+    {
+        this.key = key;
+    }
+
+    /** @return the fixed key */
+    @Override
+    public long next()
+    {
+        return this.key;
+    }
+
+    /** @return the fixed key, whatever {@code cumProb} is */
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        return this.key;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java
new file mode 100644
index 0000000..c7a5aca
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java
@@ -0,0 +1,40 @@
+package org.apache.cassandra.stress.generatedata;
+
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+
+/**
+ * Adapts a commons-math real distribution to {@link Distribution}: values are truncated to long,
+ * limited to the span {@code max - min}, and then shifted up by {@code min}.
+ */
+public class DistributionOffsetApache extends Distribution
+{
+
+    final AbstractRealDistribution delegate;
+    final long min;
+    final long delta;
+
+    public DistributionOffsetApache(AbstractRealDistribution delegate, long min, long max)
+    {
+        this.delegate = delegate;
+        this.min = min;
+        this.delta = max - min;
+    }
+
+    @Override
+    public long next()
+    {
+        final double sample = delegate.sample();
+        return offset(min, delta, sample);
+    }
+
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        final double raw = delegate.inverseCumulativeProbability(cumProb);
+        return offset(min, delta, raw);
+    }
+
+    /** Truncates {@code val}, raises it to at least 0, caps it at {@code delta}, then adds {@code min}. */
+    private long offset(long min, long delta, double val)
+    {
+        final long nonNegative = Math.max(0L, (long) val);
+        final long capped = Math.min(nonNegative, delta);
+        return min + capped;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java
new file mode 100644
index 0000000..a1a51bb
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java
@@ -0,0 +1,47 @@
+package org.apache.cassandra.stress.generatedata;
+
+public class DistributionSeqBatch extends DataGenHex
+{
+
+ final Distribution delegate;
+ final int batchSize;
+ final long maxKey;
+
+ private int batchIndex;
+ private long batchKey;
+
+ // object must be published safely if passed between threads, due to batchIndex not being volatile. various
+ // hacks are possible, but none is ideal. we don't want to use volatile as the object is intended for single-threaded use.
+ public DistributionSeqBatch(int batchSize, long maxKey, Distribution delegate)
+ {
+ this.batchIndex = batchSize;
+ this.batchSize = batchSize;
+ this.maxKey = maxKey;
+ this.delegate = delegate;
+ }
+
+ @Override
+ long next(long operationIndex)
+ {
+ if (batchIndex >= batchSize)
+ {
+ batchKey = delegate.next();
+ batchIndex = 0;
+ }
+ long r = batchKey + batchIndex++;
+ if (r > maxKey)
+ {
+ batchKey = delegate.next();
+ batchIndex = 1;
+ r = batchKey;
+ }
+ return r;
+ }
+
+ @Override
+ public boolean isDeterministic()
+ {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
new file mode 100644
index 0000000..cdd6d39
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
@@ -0,0 +1,33 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class KeyGen
+{
+
+ final DataGen dataGen;
+ final int keySize;
+ final List<ByteBuffer> keyBuffers = new ArrayList<>();
+
+ public KeyGen(DataGen dataGen, int keySize)
+ {
+ this.dataGen = dataGen;
+ this.keySize = keySize;
+ }
+
+ public List<ByteBuffer> getKeys(int n, long index)
+ {
+ while (keyBuffers.size() < n)
+ keyBuffers.add(ByteBuffer.wrap(new byte[keySize]));
+ dataGen.generate(keyBuffers, index);
+ return keyBuffers;
+ }
+
+ public boolean isDeterministic()
+ {
+ return dataGen.isDeterministic();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
new file mode 100644
index 0000000..869fbc7
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
@@ -0,0 +1,31 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Generates a row of data by constructing one byte buffer per column according to some algorithm,
+ * and delegating the work of populating the values of those byte buffers to the provided data generator
+ */
+public abstract class RowGen
+{
+
+ final DataGen dataGen;
+ protected RowGen(DataGen dataGenerator)
+ {
+ this.dataGen = dataGenerator;
+ }
+
+ public List<ByteBuffer> generate(long operationIndex)
+ {
+ List<ByteBuffer> fill = getColumns(operationIndex);
+ dataGen.generate(fill, operationIndex);
+ return fill;
+ }
+
+ // the returned buffers (and their backing byte[]) may be re-used
+ abstract List<ByteBuffer> getColumns(long operationIndex);
+
+ abstract public boolean isDeterministic();
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
new file mode 100644
index 0000000..b68ab3c
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
@@ -0,0 +1,84 @@
+package org.apache.cassandra.stress.generatedata;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class RowGenDistributedSize extends RowGen
+{
+
+ // TODO - make configurable
+ static final int MAX_SINGLE_CACHE_SIZE = 16 * 1024;
+
+ final Distribution countDistribution;
+ final Distribution sizeDistribution;
+
+ final TreeMap<Integer, ByteBuffer> cache = new TreeMap<>();
+
+ // array re-used for returning columns
+ final ByteBuffer[] ret;
+ final int[] sizes;
+
+ public RowGenDistributedSize(DataGen dataGenerator, Distribution countDistribution, Distribution sizeDistribution)
+ {
+ super(dataGenerator);
+ this.countDistribution = countDistribution;
+ this.sizeDistribution = sizeDistribution;
+ ret = new ByteBuffer[(int) countDistribution.maxValue()];
+ sizes = new int[ret.length];
+ }
+
+ ByteBuffer getBuffer(int size)
+ {
+ if (size >= MAX_SINGLE_CACHE_SIZE)
+ return ByteBuffer.allocate(size);
+ Map.Entry<Integer, ByteBuffer> found = cache.ceilingEntry(size);
+ if (found == null)
+ {
+ // remove the next entry down, and replace it with a cache of this size
+ Integer del = cache.lowerKey(size);
+ if (del != null)
+ cache.remove(del);
+ return ByteBuffer.allocate(size);
+ }
+ ByteBuffer r = found.getValue();
+ cache.remove(found.getKey());
+ return r;
+ }
+
+ @Override
+ List<ByteBuffer> getColumns(long operationIndex)
+ {
+ int i = 0;
+ int count = (int) countDistribution.next();
+ while (i < count)
+ {
+ int columnSize = (int) sizeDistribution.next();
+ sizes[i] = columnSize;
+ ret[i] = getBuffer(columnSize);
+ i++;
+ }
+ while (i < ret.length && ret[i] != null)
+ ret[i] = null;
+ i = 0;
+ while (i < count)
+ {
+ ByteBuffer b = ret[i];
+ cache.put(b.capacity(), b);
+ b.position(b.capacity() - sizes[i]);
+ ret[i] = b.slice();
+ b.position(0);
+ i++;
+ }
+ return Arrays.asList(ret).subList(0, count);
+ }
+
+ @Override
+ public boolean isDeterministic()
+ {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
deleted file mode 100644
index 54737a4..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.nio.ByteBuffer;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.ThriftConversion;
-
-public abstract class CQLOperation extends Operation
-{
- public CQLOperation(Session client, int idx)
- {
- super(client, idx);
- }
-
- protected abstract void run(CQLQueryExecutor executor) throws IOException;
-
- protected abstract boolean validateThriftResult(CqlResult result);
-
- protected abstract boolean validateNativeResult(ResultMessage result);
-
- public void run(final CassandraClient client) throws IOException
- {
- run(new CQLQueryExecutor()
- {
- public boolean execute(String cqlQuery, List<String> queryParams) throws Exception
- {
- CqlResult result = null;
- if (session.usePreparedStatements())
- {
- Integer stmntId = getPreparedStatement(client, cqlQuery);
- if (session.cqlVersion.startsWith("3"))
- result = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParams), session.getConsistencyLevel());
- else
- result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams));
- }
- else
- {
- String formattedQuery = formatCqlQuery(cqlQuery, queryParams);
- if (session.cqlVersion.startsWith("3"))
- result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel());
- else
- result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE);
- }
- return validateThriftResult(result);
- }
- });
- }
-
- public void run(final SimpleClient client) throws IOException
- {
- run(new CQLQueryExecutor()
- {
- public boolean execute(String cqlQuery, List<String> queryParams) throws Exception
- {
- ResultMessage result = null;
- if (session.usePreparedStatements())
- {
- byte[] stmntId = getPreparedStatement(client, cqlQuery);
- result = client.executePrepared(stmntId, queryParamsAsByteBuffer(queryParams), ThriftConversion.fromThrift(session.getConsistencyLevel()));
- }
- else
- {
- String formattedQuery = formatCqlQuery(cqlQuery, queryParams);
- result = client.execute(formattedQuery, ThriftConversion.fromThrift(session.getConsistencyLevel()));
- }
- return validateNativeResult(result);
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
deleted file mode 100644
index ab6ae9d..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class CounterAdder extends Operation
-{
- public CounterAdder(Session client, int index)
- {
- super(client, index);
- }
-
- public void run(CassandraClient client) throws IOException
- {
- List<CounterColumn> columns = new ArrayList<CounterColumn>();
- List<CounterSuperColumn> superColumns = new ArrayList<CounterSuperColumn>();
-
- // format used for keys
- String format = "%0" + session.getTotalKeysLength() + "d";
-
- for (int i = 0; i < session.getColumnsPerKey(); i++)
- {
- String columnName = ("C" + Integer.toString(i));
-
- columns.add(new CounterColumn(ByteBufferUtil.bytes(columnName), 1L));
- }
-
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- {
- // supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
- for (int i = 0; i < session.getSuperColumns(); i++)
- {
- String superColumnName = "S" + Integer.toString(i);
- superColumns.add(new CounterSuperColumn(ByteBuffer.wrap(superColumnName.getBytes()), columns));
- }
- }
-
- String rawKey = String.format(format, index);
- Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-
- record.put(ByteBufferUtil.bytes(rawKey), session.getColumnFamilyType() == ColumnFamilyType.Super
- ? getSuperColumnsMutationMap(superColumns)
- : getColumnsMutationMap(columns));
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- client.batch_mutate(record, session.getConsistencyLevel());
- success = true;
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error incrementing key %s %s%n",
- index,
- session.getRetryTimes(),
- rawKey,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
- }
-
- private Map<String, List<Mutation>> getSuperColumnsMutationMap(List<CounterSuperColumn> superColumns)
- {
- List<Mutation> mutations = new ArrayList<Mutation>();
- Map<String, List<Mutation>> mutationMap = new HashMap<String, List<Mutation>>();
-
- for (CounterSuperColumn s : superColumns)
- {
- ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_super_column(s);
- mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
- }
-
- mutationMap.put("SuperCounter1", mutations);
-
- return mutationMap;
- }
-
- private Map<String, List<Mutation>> getColumnsMutationMap(List<CounterColumn> columns)
- {
- List<Mutation> mutations = new ArrayList<Mutation>();
- Map<String, List<Mutation>> mutationMap = new HashMap<String, List<Mutation>>();
-
- for (CounterColumn c : columns)
- {
- ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setCounter_column(c);
- mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
- }
-
- mutationMap.put("Counter1", mutations);
-
- return mutationMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
deleted file mode 100644
index 56ef243..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.thrift.*;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class CounterGetter extends Operation
-{
- public CounterGetter(Session client, int index)
- {
- super(client, index);
- }
-
- public void run(CassandraClient client) throws IOException
- {
- SliceRange sliceRange = new SliceRange();
-
- // start/finish
- sliceRange.setStart(new byte[] {}).setFinish(new byte[] {});
-
- // reversed/count
- sliceRange.setReversed(false).setCount(session.getColumnsPerKey());
-
- // initialize SlicePredicate with existing SliceRange
- SlicePredicate predicate = new SlicePredicate().setSlice_range(sliceRange);
-
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- {
- runSuperCounterGetter(predicate, client);
- }
- else
- {
- runCounterGetter(predicate, client);
- }
- }
-
- private void runSuperCounterGetter(SlicePredicate predicate, Cassandra.Client client) throws IOException
- {
- byte[] rawKey = generateKey();
- ByteBuffer key = ByteBuffer.wrap(rawKey);
-
- for (int j = 0; j < session.getSuperColumns(); j++)
- {
- String superColumn = 'S' + Integer.toString(j);
- ColumnParent parent = new ColumnParent("SuperCounter1").setSuper_column(superColumn.getBytes());
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- List<ColumnOrSuperColumn> counters;
- counters = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
- success = (counters.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error reading counter key %s %s%n",
- index,
- session.getRetryTimes(),
- new String(rawKey),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
- }
- }
-
- private void runCounterGetter(SlicePredicate predicate, Cassandra.Client client) throws IOException
- {
- ColumnParent parent = new ColumnParent("Counter1");
-
- byte[] key = generateKey();
- ByteBuffer keyBuffer = ByteBuffer.wrap(key);
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- List<ColumnOrSuperColumn> counters;
- counters = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
- success = (counters.size() != 0);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error reading counter key %s %s%n",
- index,
- session.getRetryTimes(),
- new String(key),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
index 31e8371..8e1f137 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
@@ -21,102 +21,50 @@ package org.apache.cassandra.stress.operations;
*/
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class CqlCounterAdder extends CQLOperation
+public class CqlCounterAdder extends CqlOperation<Integer>
{
- private static String cqlQuery = null;
-
- public CqlCounterAdder(Session client, int idx)
+ public CqlCounterAdder(State state, long idx)
{
- super(client, idx);
+ super(state, idx);
}
- protected void run(CQLQueryExecutor executor) throws IOException
+ @Override
+ protected String buildQuery()
{
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- throw new RuntimeException("Super columns are not implemented for CQL");
-
- if (cqlQuery == null)
- {
- String counterCF = session.cqlVersion.startsWith("2") ? "Counter1" : "Counter3";
-
- StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(counterCF));
-
- if (session.cqlVersion.startsWith("2"))
- query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel());
-
- query.append(" SET ");
+ String counterCF = state.isCql2() ? "Counter1" : "Counter3";
- for (int i = 0; i < session.getColumnsPerKey(); i++)
- {
- if (i > 0)
- query.append(",");
+ StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(counterCF));
- query.append('C').append(i).append("=C").append(i).append("+1");
- }
- query.append(" WHERE KEY=?");
- cqlQuery = query.toString();
- }
-
- String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
- List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
+ if (state.isCql2())
+ query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
- TimerContext context = session.latency.time();
+ query.append(" SET ");
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
+ // TODO : increment distribution subset of columns
+ for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++)
{
- if (success)
- break;
+ if (i > 0)
+ query.append(",");
- try
- {
- success = executor.execute(cqlQuery, queryParams);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
+ query.append('C').append(i).append("=C").append(i).append("+1");
}
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error incrementing key %s %s%n",
- index,
- session.getRetryTimes(),
- key,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
+ query.append(" WHERE KEY=?");
+ return query.toString();
}
- protected boolean validateThriftResult(CqlResult result)
+ @Override
+ protected List<ByteBuffer> getQueryParameters(byte[] key)
{
- return true;
+ return Collections.singletonList(ByteBuffer.wrap(key));
}
- protected boolean validateNativeResult(ResultMessage result)
+ @Override
+ protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
{
- return true;
+ return new CqlRunOpAlwaysSucceed(client, query, queryId, params, keyid, key, 1);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
index a4d037a..0a0b05b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
@@ -21,100 +21,48 @@ package org.apache.cassandra.stress.operations;
*/
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlResultType;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class CqlCounterGetter extends CQLOperation
+public class CqlCounterGetter extends CqlOperation<Integer>
{
- private static String cqlQuery = null;
- public CqlCounterGetter(Session client, int idx)
+ public CqlCounterGetter(State state, long idx)
{
- super(client, idx);
+ super(state, idx);
}
- protected void run(CQLQueryExecutor executor) throws IOException
+ @Override
+ protected List<ByteBuffer> getQueryParameters(byte[] key)
{
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- throw new RuntimeException("Super columns are not implemented for CQL");
-
- if (cqlQuery == null)
- {
- StringBuilder query = new StringBuilder("SELECT ");
-
- if (session.cqlVersion.startsWith("2"))
- query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''");
- else
- query.append("*");
-
- String counterCF = session.cqlVersion.startsWith("2") ? "Counter1" : "Counter3";
-
- query.append(" FROM ").append(wrapInQuotesIfRequired(counterCF));
-
- if (session.cqlVersion.startsWith("2"))
- query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
-
- cqlQuery = query.append(" WHERE KEY=?").toString();
- }
-
- byte[] key = generateKey();
- List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
+ return Collections.singletonList(ByteBuffer.wrap(key));
+ }
- TimerContext context = session.latency.time();
+ @Override
+ protected String buildQuery()
+ {
+ StringBuilder query = new StringBuilder("SELECT ");
- boolean success = false;
- String exceptionMessage = null;
+ if (state.isCql2())
+ query.append("FIRST ").append(state.settings.columns.maxColumnsPerKey).append(" ''..''");
+ else
+ query.append("*");
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
+ String counterCF = state.isCql2() ? "Counter1" : "Counter3";
- try
- {
- success = executor.execute(cqlQuery, queryParams);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
+ query.append(" FROM ").append(wrapInQuotesIfRequired(counterCF));
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error reading counter key %s %s%n",
- index,
- session.getRetryTimes(),
- new String(key),
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
+ if (state.isCql2())
+ query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
+ return query.append(" WHERE KEY=?").toString();
}
- protected boolean validateThriftResult(CqlResult result)
+ @Override
+ protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
{
- return result.rows.get(0).columns.size() != 0;
+ return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
}
- protected boolean validateNativeResult(ResultMessage result)
- {
- return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
index bf416cc..748bf30 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
@@ -1,179 +1,123 @@
package org.apache.cassandra.stress.operations;
/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.cql3.ResultSet;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlRow;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class CqlIndexedRangeSlicer extends CQLOperation
+import org.apache.cassandra.stress.settings.SettingsCommandMulti;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]>
{
- private static List<ByteBuffer> values = null;
- private static String cqlQuery = null;
- private int lastQueryResultSize;
- private int lastMaxKey;
+ volatile boolean acceptNoResults = false;
- public CqlIndexedRangeSlicer(Session client, int idx)
+ public CqlIndexedRangeSlicer(State state, long idx)
{
- super(client, idx);
+ super(state, idx);
}
- protected void run(CQLQueryExecutor executor) throws IOException
+ @Override
+ protected List<ByteBuffer> getQueryParameters(byte[] key)
{
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- throw new RuntimeException("Super columns are not implemented for CQL");
-
- if (values == null)
- values = generateValues();
-
- if (cqlQuery == null)
- {
- StringBuilder query = new StringBuilder("SELECT ");
-
- if (session.cqlVersion.startsWith("2"))
- query.append(session.getColumnsPerKey()).append(" ''..''");
- else
- query.append("*");
-
- query.append(" FROM Standard1");
-
- if (session.cqlVersion.startsWith("2"))
- query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel());
+ throw new UnsupportedOperationException();
+ }
- query.append(" WHERE C1=").append(getUnQuotedCqlBlob(values.get(1).array(), session.cqlVersion.startsWith("3")))
- .append(" AND KEY > ? LIMIT ").append(session.getKeysPerCall());
+ @Override
+ protected String buildQuery()
+ {
+ StringBuilder query = new StringBuilder("SELECT ");
- cqlQuery = query.toString();
- }
+ if (state.isCql2())
+ query.append(state.settings.columns.maxColumnsPerKey).append(" ''..''");
+ else
+ query.append("*");
- String format = "%0" + session.getTotalKeysLength() + "d";
- String startOffset = String.format(format, 0);
+ query.append(" FROM Standard1");
- int expectedPerValue = session.getNumKeys() / values.size(), received = 0;
+ if (state.isCql2())
+ query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
- while (received < expectedPerValue)
- {
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
- String formattedQuery = null;
- List<String> queryParms = Collections.singletonList(getUnQuotedCqlBlob(startOffset, session.cqlVersion.startsWith("3")));
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
-
- try
- {
- success = executor.execute(cqlQuery, queryParms);
- }
- catch (Exception e)
- {
- exceptionMessage = getExceptionMessage(e);
- success = false;
- }
- }
-
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error executing indexed range query with offset %s %s%n",
- index,
- session.getRetryTimes(),
- startOffset,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
- }
-
- received += lastQueryResultSize;
-
- // convert max key found back to an integer, and increment it
- startOffset = String.format(format, (1 + lastMaxKey));
-
- session.operations.getAndIncrement();
- session.keys.getAndAdd(lastQueryResultSize);
- context.stop();
- }
+ final String columnName = getColumnName(1);
+ query.append(" WHERE ").append(columnName).append("=?")
+ .append(" AND KEY > ? LIMIT ").append(((SettingsCommandMulti)state.settings.command).keysAtOnce);
+ return query.toString();
}
- /**
- * Get maximum key from CqlRow list
- * @param rows list of the CqlRow objects
- * @return maximum key value of the list
- */
- private int getMaxKey(List<CqlRow> rows)
+ @Override
+ protected void run(CqlOperation.ClientWrapper client) throws IOException
{
- int maxKey = ByteBufferUtil.toInt(rows.get(0).key);
-
- for (CqlRow row : rows)
+ acceptNoResults = false;
+ final List<ByteBuffer> columns = generateColumnValues();
+ final ByteBuffer value = columns.get(1); // only C1 column is indexed
+ byte[] minKey = new byte[0];
+ int rowCount;
+ do
{
- int currentKey = ByteBufferUtil.toInt(row.key);
- if (currentKey > maxKey)
- maxKey = currentKey;
- }
-
- return maxKey;
+ List<ByteBuffer> params = Arrays.asList(value, ByteBuffer.wrap(minKey));
+ CqlRunOp<byte[][]> op = run(client, params, value, new String(value.array()));
+ byte[][] keys = op.result;
+ rowCount = keys.length;
+ minKey = getNextMinKey(minKey, keys);
+ acceptNoResults = true;
+ } while (rowCount > 0);
}
- private int getMaxKey(ResultSet rs)
+ private final class IndexedRangeSliceRunOp extends CqlRunOpFetchKeys
{
- int maxKey = ByteBufferUtil.toInt(rs.rows.get(0).get(0));
- for (List<ByteBuffer> row : rs.rows)
+ protected IndexedRangeSliceRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
{
- int currentKey = ByteBufferUtil.toInt(row.get(0));
- if (currentKey > maxKey)
- maxKey = currentKey;
+ super(client, query, queryId, params, keyid, key);
}
- return maxKey;
+ @Override
+ public boolean validate(byte[][] result)
+ {
+ return acceptNoResults || result.length > 0;
+ }
}
- protected boolean validateThriftResult(CqlResult result)
+ @Override
+ protected CqlRunOp<byte[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
{
- lastQueryResultSize = result.rows.size();
- lastMaxKey = getMaxKey(result.rows);
- return lastQueryResultSize != 0;
+ return new IndexedRangeSliceRunOp(client, query, queryId, params, keyid, key);
}
- protected boolean validateNativeResult(ResultMessage result)
+ private static byte[] getNextMinKey(byte[] cur, byte[][] keys)
{
- assert result instanceof ResultMessage.Rows;
- lastQueryResultSize = ((ResultMessage.Rows)result).result.size();
- lastMaxKey = getMaxKey(((ResultMessage.Rows)result).result);
- return lastQueryResultSize != 0;
+ // find max
+ for (byte[] key : keys)
+ if (FBUtilities.compareUnsigned(cur, key) < 0)
+ cur = key;
+
+ // increment
+ for (int i = 0 ; i < cur.length ; i++)
+ if (++cur[i] != 0)
+ break;
+ return cur;
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
index d593e57..6b1577c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
@@ -21,126 +21,66 @@ package org.apache.cassandra.stress.operations;
*/
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import com.yammer.metrics.core.TimerContext;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.utils.UUIDGen;
-public class CqlInserter extends CQLOperation
+public class CqlInserter extends CqlOperation<Integer>
{
- private static List<ByteBuffer> values;
- private static String cqlQuery = null;
- public CqlInserter(Session client, int idx)
+ public CqlInserter(State state, long idx)
{
- super(client, idx);
+ super(state, idx);
}
- protected void run(CQLQueryExecutor executor) throws IOException
+ @Override
+ protected String buildQuery()
{
- if (session.getColumnFamilyType() == ColumnFamilyType.Super)
- throw new RuntimeException("Super columns are not implemented for CQL");
+ StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(state.settings.schema.columnFamily));
- if (values == null)
- values = generateValues();
+ if (state.isCql2())
+ query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
- // Construct a query string once.
- if (cqlQuery == null)
- {
- StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired("Standard1"));
-
- if (session.cqlVersion.startsWith("2"))
- query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel().toString());
-
- query.append(" SET ");
-
- for (int i = 0; i < session.getColumnsPerKey(); i++)
- {
- if (i > 0)
- query.append(',');
-
- if (session.timeUUIDComparator)
- {
- if (session.cqlVersion.startsWith("3"))
- throw new UnsupportedOperationException("Cannot use UUIDs in column names with CQL3");
-
- query.append(wrapInQuotesIfRequired(UUIDGen.getTimeUUID().toString()))
- .append(" = ?");
- }
- else
- {
- query.append(wrapInQuotesIfRequired("C" + i)).append(" = ?");
- }
- }
-
- query.append(" WHERE KEY=?");
- cqlQuery = query.toString();
- }
+ query.append(" SET ");
- List<String> queryParms = new ArrayList<String>();
- for (int i = 0; i < session.getColumnsPerKey(); i++)
+ for (int i = 0 ; i < state.settings.columns.maxColumnsPerKey; i++)
{
- // Cell value
- queryParms.add(getUnQuotedCqlBlob(values.get(i % values.size()).array(), session.cqlVersion.startsWith("3")));
- }
-
- String key = String.format("%0" + session.getTotalKeysLength() + "d", index);
- queryParms.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")));
-
- TimerContext context = session.latency.time();
-
- boolean success = false;
- String exceptionMessage = null;
-
- for (int t = 0; t < session.getRetryTimes(); t++)
- {
- if (success)
- break;
+ if (i > 0)
+ query.append(',');
- try
+ if (state.settings.columns.useTimeUUIDComparator)
{
- success = executor.execute(cqlQuery, queryParms);
+ if (state.isCql3())
+ throw new UnsupportedOperationException("Cannot use UUIDs in column names with CQL3");
+
+ query.append(wrapInQuotesIfRequired(UUIDGen.getTimeUUID().toString()))
+ .append(" = ?");
}
- catch (Exception e)
+ else
{
- exceptionMessage = getExceptionMessage(e);
- success = false;
+ query.append(wrapInQuotesIfRequired("C" + i)).append(" = ?");
}
}
- if (!success)
- {
- error(String.format("Operation [%d] retried %d times - error inserting key %s %s%n with query %s",
- index,
- session.getRetryTimes(),
- key,
- (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")",
- cqlQuery));
- }
-
- session.operations.getAndIncrement();
- session.keys.getAndIncrement();
- context.stop();
+ query.append(" WHERE KEY=?");
+ return query.toString();
}
- protected boolean validateThriftResult(CqlResult result)
+ @Override
+ protected List<ByteBuffer> getQueryParameters(byte[] key)
{
- return true;
+ final ArrayList<ByteBuffer> queryParams = new ArrayList<>();
+ final List<ByteBuffer> values = generateColumnValues();
+ queryParams.addAll(values);
+ queryParams.add(ByteBuffer.wrap(key));
+ return queryParams;
}
- protected boolean validateNativeResult(ResultMessage result)
+ @Override
+ protected CqlRunOp buildRunOp(ClientWrapper client, String query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
{
- return true;
+ return new CqlRunOpAlwaysSucceed(client, query, queryId, params, keyid, key, 1);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
index ec645d4..80a7118 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
@@ -23,25 +23,20 @@ package org.apache.cassandra.stress.operations;
import java.io.IOException;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.util.ThriftClient;
public class CqlMultiGetter extends Operation
{
- public CqlMultiGetter(Session client, int idx)
- {
- super(client, idx);
- }
-
- public void run(CassandraClient client) throws IOException
+ public CqlMultiGetter(State state, long idx)
{
+ super(state, idx);
throw new RuntimeException("Multiget is not implemented for CQL");
}
- public void run(SimpleClient client) throws IOException
+ @Override
+ public void run(ThriftClient client) throws IOException
{
- throw new RuntimeException("Multiget is not implemented for CQL");
}
+
}