You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/07/07 19:34:41 UTC
[18/23] Introduce CQL support for stress tool
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java
deleted file mode 100644
index cfbc385..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionOffsetApache.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.apache.cassandra.stress.generatedata;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import org.apache.commons.math3.distribution.AbstractRealDistribution;
-
-/**
- * A {@link Distribution} backed by a commons-math real distribution. Each raw sample is
- * truncated to a long, clamped to the range [0, max - min] and then shifted up by min.
- */
-public class DistributionOffsetApache extends Distribution
-{
-
-    final AbstractRealDistribution delegate;
-    final long min;
-    final long delta;
-
-    /**
-     * @param delegate source of the raw samples
-     * @param min      value added to every clamped sample
-     * @param max      only its distance from min is kept, as the upper clamp bound
-     */
-    public DistributionOffsetApache(AbstractRealDistribution delegate, long min, long max)
-    {
-        this.delegate = delegate;
-        this.min = min;
-        this.delta = max - min;
-    }
-
-    /** Draws one sample from the delegate and maps it through {@link #clamp(double)}. */
-    @Override
-    public long next()
-    {
-        final double raw = delegate.sample();
-        return clamp(raw);
-    }
-
-    /** Maps the delegate's inverse cumulative probability for cumProb through {@link #clamp(double)}. */
-    @Override
-    public long inverseCumProb(double cumProb)
-    {
-        final double raw = delegate.inverseCumulativeProbability(cumProb);
-        return clamp(raw);
-    }
-
-    /** Truncates raw to a long, bounds it below by zero and above by delta, then adds min. */
-    private long clamp(double raw)
-    {
-        final long truncated = (long) raw;
-        return min + Math.min(delta, Math.max(0L, truncated));
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java
deleted file mode 100644
index 8e1a5d5..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/DistributionSeqBatch.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.cassandra.stress.generatedata;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-/**
- * Produces keys in runs of consecutive values. Each run starts at a key drawn from the delegate
- * distribution and yields at most batchSize keys; a run that would step past maxKey is dropped
- * and a fresh run is started immediately.
- */
-public class DistributionSeqBatch extends DataGenHex
-{
-
-    final Distribution delegate;
-    final int batchSize;
-    final long maxKey;
-
-    // position inside the current run, and the key the run started from
-    private int batchIndex;
-    private long batchKey;
-
-    // batchIndex is deliberately not volatile because instances are meant for single threaded use;
-    // an instance handed from one thread to another must therefore be published safely.
-    public DistributionSeqBatch(int batchSize, long maxKey, Distribution delegate)
-    {
-        this.delegate = delegate;
-        this.maxKey = maxKey;
-        this.batchSize = batchSize;
-        // start "exhausted" so that the first call to next() draws a run start
-        this.batchIndex = batchSize;
-    }
-
-    @Override
-    long next(long operationIndex)
-    {
-        final boolean runExhausted = batchIndex >= batchSize;
-        if (runExhausted)
-        {
-            batchIndex = 0;
-            batchKey = delegate.next();
-        }
-
-        final long candidate = batchKey + batchIndex;
-        batchIndex++;
-        if (candidate <= maxKey)
-            return candidate;
-
-        // the run walked past maxKey: begin a new run and hand out its first key
-        batchKey = delegate.next();
-        batchIndex = 1;
-        return batchKey;
-    }
-
-    /** Always false: the output depends on internal run state and on the delegate's draws. */
-    @Override
-    public boolean isDeterministic()
-    {
-        return false;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
deleted file mode 100644
index dad5918..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.cassandra.stress.generatedata;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Produces batches of fixed-size key buffers, filling them through the supplied {@link DataGen}.
- * The buffers are pooled and re-used between calls, so the contents of a returned list are only
- * valid until the next call to {@link #getKeys(int, long)}.
- */
-public class KeyGen
-{
-
-    final DataGen dataGen;
-    final int keySize;
-    // pool of re-usable key buffers; only ever grows
-    final List<ByteBuffer> keyBuffers = new ArrayList<>();
-
-    /**
-     * @param dataGen generator used to populate the key buffers
-     * @param keySize size in bytes of every key buffer
-     */
-    public KeyGen(DataGen dataGen, int keySize)
-    {
-        this.dataGen = dataGen;
-        this.keySize = keySize;
-    }
-
-    /**
-     * Populates and returns exactly n key buffers.
-     *
-     * @param n     number of keys wanted
-     * @param index passed through to the data generator
-     * @return a list of n re-used buffers; when the pool holds exactly n buffers this is the pool itself
-     * @throws IllegalArgumentException if n is negative
-     */
-    public List<ByteBuffer> getKeys(int n, long index)
-    {
-        if (n < 0)
-            throw new IllegalArgumentException("n must be non-negative: " + n);
-        while (keyBuffers.size() < n)
-            keyBuffers.add(ByteBuffer.wrap(new byte[keySize]));
-        // an earlier call with a larger n leaves the pool bigger than this request; only fill and
-        // return the first n buffers so the caller never receives more keys than it asked for
-        final List<ByteBuffer> keys = keyBuffers.size() == n ? keyBuffers : keyBuffers.subList(0, n);
-        dataGen.generate(keys, index, null);
-        return keys;
-    }
-
-    /** @return whatever the underlying data generator reports for its own determinism */
-    public boolean isDeterministic()
-    {
-        return dataGen.isDeterministic();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
deleted file mode 100644
index 9c6ca43..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.cassandra.stress.generatedata;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * Generates a row of data by constructing one byte buffer per column according to some algorithm,
- * and delegating the work of populating those byte buffers to the provided data generator.
- */
-public abstract class RowGen
-{
-
-    final DataGen dataGen;
-
-    /**
-     * @param dataGenerator generator that fills the column buffers supplied by {@link #getColumns(long)}
-     */
-    protected RowGen(DataGen dataGenerator)
-    {
-        dataGen = dataGenerator;
-    }
-
-    /**
-     * Obtains the column buffers for this operation, has the data generator fill them, and
-     * returns that same list.
-     *
-     * @param operationIndex forwarded to both getColumns and the data generator
-     * @param key            forwarded to the data generator
-     */
-    public List<ByteBuffer> generate(long operationIndex, ByteBuffer key)
-    {
-        final List<ByteBuffer> columns = getColumns(operationIndex);
-        dataGen.generate(columns, operationIndex, key);
-        return columns;
-    }
-
-    // the buffers handed back here may be re-used between calls
-    abstract List<ByteBuffer> getColumns(long operationIndex);
-
-    public abstract int count(long operationIndex);
-
-    public abstract boolean isDeterministic();
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
deleted file mode 100644
index fffad2f..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGenDistributedSize.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.cassandra.stress.generatedata;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * A {@link RowGen} whose column count and per-column size are each drawn from a
- * {@link Distribution}. Backing buffers below MAX_SINGLE_CACHE_SIZE are kept in a cache keyed by
- * capacity and re-used between rows.
- */
-public class RowGenDistributedSize extends RowGen
-{
-
-    // TODO - make configurable
-    static final int MAX_SINGLE_CACHE_SIZE = 16 * 1024;
-
-    final Distribution countDistribution;
-    final Distribution sizeDistribution;
-
-    // re-usable backing buffers, keyed by their capacity
-    final TreeMap<Integer, ByteBuffer> cache = new TreeMap<>();
-
-    // array re-used for returning columns
-    final ByteBuffer[] ret;
-    // requested size of each column of the row currently being built
-    final int[] sizes;
-
-    final boolean isDeterministic;
-
-    /**
-     * @param dataGenerator     generator that fills the column buffers
-     * @param countDistribution distribution of the number of columns per row; its maximum sizes the re-used arrays
-     * @param sizeDistribution  distribution of the size of each column
-     */
-    public RowGenDistributedSize(DataGen dataGenerator, Distribution countDistribution, Distribution sizeDistribution)
-    {
-        super(dataGenerator);
-        this.countDistribution = countDistribution;
-        this.sizeDistribution = sizeDistribution;
-        ret = new ByteBuffer[(int) countDistribution.maxValue()];
-        sizes = new int[ret.length];
-        // TODO: should keep it deterministic in event that count distribution is not, but size and dataGen are, so that
-        // we simply need to generate the correct selection of columns
-        this.isDeterministic = dataGen.isDeterministic() && countDistribution.maxValue() == countDistribution.minValue()
-                               && sizeDistribution.minValue() == sizeDistribution.maxValue();
-    }
-
-    /**
-     * Returns a buffer with capacity of at least size bytes. Requests of MAX_SINGLE_CACHE_SIZE or
-     * more always get a fresh allocation. Smaller requests take the smallest cached buffer that is
-     * large enough (removing it from the cache); if none exists, the nearest smaller cached buffer
-     * is evicted and a fresh buffer of exactly size bytes is allocated.
-     */
-    ByteBuffer getBuffer(int size)
-    {
-        if (size >= MAX_SINGLE_CACHE_SIZE)
-            return ByteBuffer.allocate(size);
-        Map.Entry<Integer, ByteBuffer> found = cache.ceilingEntry(size);
-        if (found == null)
-        {
-            // remove the next entry down, and replace it with a cache of this size
-            Integer del = cache.lowerKey(size);
-            if (del != null)
-                cache.remove(del);
-            return ByteBuffer.allocate(size);
-        }
-        ByteBuffer r = found.getValue();
-        cache.remove(found.getKey());
-        return r;
-    }
-
-    /**
-     * Builds the column buffers for one row: draws a column count, draws a size for each column,
-     * and returns a view over the first count slots of the re-used array, each holding a slice of
-     * exactly the drawn size.
-     */
-    @Override
-    List<ByteBuffer> getColumns(long operationIndex)
-    {
-        int i = 0;
-        int count = (int) countDistribution.next();
-        while (i < count)
-        {
-            int columnSize = (int) sizeDistribution.next();
-            sizes[i] = columnSize;
-            ret[i] = getBuffer(columnSize);
-            i++;
-        }
-        // drop slices left over from an earlier, longer row; the index must advance here,
-        // otherwise only the first stale slot is ever cleared
-        while (i < ret.length && ret[i] != null)
-            ret[i++] = null;
-        i = 0;
-        while (i < count)
-        {
-            ByteBuffer b = ret[i];
-            // hand the backing buffer back to the cache, then expose only its last sizes[i] bytes
-            cache.put(b.capacity(), b);
-            b.position(b.capacity() - sizes[i]);
-            ret[i] = b.slice();
-            b.position(0);
-            i++;
-        }
-        return Arrays.asList(ret).subList(0, count);
-    }
-
-    /**
-     * Draws a fresh value from the count distribution; the operation index is not consulted.
-     * NOTE(review): this is a separate draw from the one made inside getColumns - confirm callers expect that.
-     */
-    @Override
-    public int count(long operationIndex)
-    {
-        return (int) countDistribution.next();
-    }
-
-    @Override
-    public boolean isDeterministic()
-    {
-        return isDeterministic;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
deleted file mode 100644
index 9a8c37d..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package org.apache.cassandra.stress.operations;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class CqlCounterAdder extends CqlOperation<Integer>
-{
- public CqlCounterAdder(State state, long idx)
- {
- super(state, idx);
- }
-
- @Override
- protected String buildQuery()
- {
- String counterCF = state.isCql2() ? state.type.table : "Counter3";
-
- StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotesIfRequired(counterCF));
-
- if (state.isCql2())
- query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
-
- query.append(" SET ");
-
- // TODO : increment distribution subset of columns
- for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++)
- {
- if (i > 0)
- query.append(",");
-
- query.append('C').append(i).append("=C").append(i).append("+?");
- }
- query.append(" WHERE KEY=?");
- return query.toString();
- }
-
- @Override
- protected List<Object> getQueryParameters(byte[] key)
- {
- final List<Object> list = new ArrayList<>();
- for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++)
- list.add(state.counteradd.next());
- list.add(ByteBuffer.wrap(key));
- return list;
- }
-
- @Override
- protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
- {
- return new CqlRunOpAlwaysSucceed(client, query, queryId, params, keyid, key, 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
deleted file mode 100644
index 88d622e..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.cassandra.stress.operations;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * CQL operation that reads the counter columns of a single row; the run op it builds is a
- * CqlRunOpTestNonEmpty.
- */
-public class CqlCounterGetter extends CqlOperation<Integer>
-{
-
-    public CqlCounterGetter(State state, long idx)
-    {
-        super(state, idx);
-    }
-
-    /** The only bind parameter is the row key, wrapped in a ByteBuffer. */
-    @Override
-    protected List<Object> getQueryParameters(byte[] key)
-    {
-        final Object boundKey = ByteBuffer.wrap(key);
-        return Collections.singletonList(boundKey);
-    }
-
-    /**
-     * Builds the SELECT statement: CQL2 slices the first maxColumnsPerKey columns from the state's
-     * table with an explicit consistency level, anything else selects * from "Counter3".
-     */
-    @Override
-    protected String buildQuery()
-    {
-        final StringBuilder cql = new StringBuilder("SELECT ");
-        final String table;
-
-        // TODO: obey slice/noslice option (instead of always slicing)
-        if (state.isCql2())
-        {
-            cql.append("FIRST ");
-            cql.append(state.settings.columns.maxColumnsPerKey);
-            cql.append(" ''..''");
-        }
-        else
-        {
-            cql.append("*");
-        }
-
-        if (state.isCql2())
-            table = state.type.table;
-        else
-            table = "Counter3";
-
-        cql.append(" FROM ");
-        cql.append(wrapInQuotesIfRequired(table));
-
-        if (state.isCql2())
-        {
-            cql.append(" USING CONSISTENCY ");
-            cql.append(state.settings.command.consistencyLevel);
-        }
-
-        cql.append(" WHERE KEY=?");
-        return cql.toString();
-    }
-
-    @Override
-    protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
-    {
-        final CqlRunOp<Integer> op = new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
-        return op;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
deleted file mode 100644
index 046381e..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package org.apache.cassandra.stress.operations;
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * CQL operation that pages through the keys of all rows whose second column (index 1) equals a
- * generated value, fetching keysAtOnce keys per query until a query returns no rows.
- */
-public class CqlIndexedRangeSlicer extends CqlOperation<byte[][]>
-{
-
-    // false for the first page of a run (which must return rows), true for every later page
-    volatile boolean acceptNoResults = false;
-
-    public CqlIndexedRangeSlicer(State state, long idx)
-    {
-        super(state, idx);
-    }
-
-    /** Not supported: this operation assembles its own parameters inside run(). */
-    @Override
-    protected List<Object> getQueryParameters(byte[] key)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Builds "SELECT key FROM table [USING CONSISTENCY cl] WHERE column=? AND KEY > ? LIMIT n".
-     */
-    @Override
-    protected String buildQuery()
-    {
-        // the trailing space matters: without it the keyword is fused to the column name
-        // whenever wrapInQuotesIfRequired (defined elsewhere) returns the name unquoted
-        StringBuilder query = new StringBuilder("SELECT ");
-        query.append(wrapInQuotesIfRequired("key"));
-        query.append(" FROM ");
-        query.append(wrapInQuotesIfRequired(state.type.table));
-
-        if (state.isCql2())
-            query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
-
-        final String columnName = (state.settings.columns.namestrs.get(1));
-        query.append(" WHERE ").append(columnName).append("=?")
-             .append(" AND KEY > ? LIMIT ").append(state.settings.command.keysAtOnce);
-        return query.toString();
-    }
-
-    /**
-     * Runs the query repeatedly, each time binding the indexed value and the current lower key
-     * bound, until a page comes back empty. Only the first page is required to be non-empty.
-     */
-    @Override
-    protected void run(CqlOperation.ClientWrapper client) throws IOException
-    {
-        acceptNoResults = false;
-        final List<ByteBuffer> columns = generateColumnValues(getKey());
-        final ByteBuffer value = columns.get(1); // only C1 column is indexed
-        byte[] minKey = new byte[0];
-        int rowCount;
-        do
-        {
-            List<Object> params = Arrays.<Object>asList(value, ByteBuffer.wrap(minKey));
-            // NOTE(review): new String(byte[]) decodes with the platform default charset
-            CqlRunOp<byte[][]> op = run(client, params, value, new String(value.array()));
-            byte[][] keys = op.result;
-            rowCount = keys.length;
-            minKey = getNextMinKey(minKey, keys);
-            acceptNoResults = true;
-        } while (rowCount > 0);
-    }
-
-    /** Run op whose result is valid when non-empty, or unconditionally once acceptNoResults is set. */
-    private final class IndexedRangeSliceRunOp extends CqlRunOpFetchKeys
-    {
-
-        protected IndexedRangeSliceRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
-        {
-            super(client, query, queryId, params, keyid, key);
-        }
-
-        @Override
-        public boolean validate(byte[][] result)
-        {
-            return acceptNoResults || result.length > 0;
-        }
-    }
-
-    @Override
-    protected CqlRunOp<byte[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
-    {
-        return new IndexedRangeSliceRunOp(client, query, queryId, params, keyid, key);
-    }
-
-    /**
-     * Picks the greatest of cur and keys under FBUtilities.compareUnsigned, then increments that
-     * array in place, starting at index 0 and carrying into the next byte while a byte wraps to zero.
-     * The array returned (and mutated) may be one of the entries of keys.
-     * NOTE(review): the increment starts at the first byte - confirm this matches the intended key ordering.
-     */
-    private static byte[] getNextMinKey(byte[] cur, byte[][] keys)
-    {
-        // find max
-        for (byte[] key : keys)
-            if (FBUtilities.compareUnsigned(cur, key) < 0)
-                cur = key;
-
-        // increment
-        for (int i = 0 ; i < cur.length ; i++)
-            if (++cur[i] != 0)
-                break;
-        return cur;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
deleted file mode 100644
index 71cdadf..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.cassandra.stress.operations;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.utils.UUIDGen;
-
-/**
- * CQL operation that writes one row by issuing an UPDATE that sets every column up to
- * maxColumnsPerKey.
- */
-public class CqlInserter extends CqlOperation<Integer>
-{
-
-    /**
-     * @throws IllegalStateException if the column settings request the TimeUUID comparator
-     */
-    public CqlInserter(State state, long idx)
-    {
-        super(state, idx);
-        if (state.settings.columns.useTimeUUIDComparator)
-            throw new IllegalStateException("Cannot use TimeUUID Comparator with CQL");
-    }
-
-    /**
-     * Builds "UPDATE table [USING CONSISTENCY cl] SET name = ?,name = ?,... WHERE KEY=?", where
-     * each name is either "C" plus the column index or, with the TimeUUID comparator, a new time UUID.
-     */
-    @Override
-    protected String buildQuery()
-    {
-        final StringBuilder cql = new StringBuilder("UPDATE ");
-        cql.append(wrapInQuotesIfRequired(state.type.table));
-
-        if (state.isCql2())
-        {
-            cql.append(" USING CONSISTENCY ");
-            cql.append(state.settings.command.consistencyLevel);
-        }
-
-        cql.append(" SET ");
-
-        for (int col = 0; col < state.settings.columns.maxColumnsPerKey; col++)
-        {
-            if (col > 0)
-                cql.append(',');
-
-            final String columnName;
-            if (!state.settings.columns.useTimeUUIDComparator)
-            {
-                columnName = "C" + col;
-            }
-            else
-            {
-                if (state.isCql3())
-                    throw new UnsupportedOperationException("Cannot use UUIDs in column names with CQL3");
-                columnName = UUIDGen.getTimeUUID().toString();
-            }
-            cql.append(wrapInQuotesIfRequired(columnName));
-            cql.append(" = ?");
-        }
-
-        return cql.append(" WHERE KEY=?").toString();
-    }
-
-    /** The generated column values, in order, followed by the row key wrapped in a ByteBuffer. */
-    @Override
-    protected List<Object> getQueryParameters(byte[] key)
-    {
-        final List<ByteBuffer> columnValues = generateColumnValues(ByteBuffer.wrap(key));
-        final ArrayList<Object> params = new ArrayList<Object>(columnValues);
-        params.add(ByteBuffer.wrap(key));
-        return params;
-    }
-
-    @Override
-    protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
-    {
-        final CqlRunOp<Integer> op = new CqlRunOpAlwaysSucceed(client, query, queryId, params, keyid, key, 1);
-        return op;
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
deleted file mode 100644
index 80a7118..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.cassandra.stress.operations;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.IOException;
-
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-
-/**
- * Placeholder for the CQL variant of multiget, which does not exist: constructing an instance
- * always fails.
- */
-public class CqlMultiGetter extends Operation
-{
-    /**
-     * @throws RuntimeException always, after the superclass constructor has run
-     */
-    public CqlMultiGetter(State state, long idx)
-    {
-        super(state, idx);
-        throw new RuntimeException("Multiget is not implemented for CQL");
-    }
-
-    @Override
-    public void run(ThriftClient client) throws IOException
-    {
-        // nothing to do: the constructor never completes, so no instance can reach this method
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
deleted file mode 100644
index 1c59e2d..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
+++ /dev/null
@@ -1,698 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.base.Function;
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.settings.ConnectionStyle;
-import org.apache.cassandra.stress.settings.CqlVersion;
-import org.apache.cassandra.stress.util.JavaDriverClient;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlRow;
-import org.apache.cassandra.thrift.ThriftConversion;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.thrift.TException;
-
-public abstract class CqlOperation<V> extends Operation
-{
-
- protected abstract List<Object> getQueryParameters(byte[] key);
- protected abstract String buildQuery();
- protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key);
-
- public CqlOperation(State state, long idx)
- {
- super(state, idx);
- if (state.settings.columns.useSuperColumns)
- throw new IllegalStateException("Super columns are not implemented for CQL");
- if (state.settings.columns.variableColumnCount)
- throw new IllegalStateException("Variable column counts are not implemented for CQL");
- if (state.settings.columns.useTimeUUIDComparator)
- throw new IllegalStateException("Cannot use TimeUUID Comparator with CQL");
- }
-
- protected CqlRunOp<V> run(final ClientWrapper client, final List<Object> queryParams, final ByteBuffer key, final String keyid) throws IOException
- {
- final CqlRunOp<V> op;
- if (state.settings.mode.style == ConnectionStyle.CQL_PREPARED)
- {
- final Object id;
- Object idobj = state.getCqlCache();
- if (idobj == null)
- {
- try
- {
- id = client.createPreparedStatement(buildQuery());
- } catch (TException e)
- {
- throw new RuntimeException(e);
- }
- state.storeCqlCache(id);
- }
- else
- id = idobj;
-
- op = buildRunOp(client, null, id, queryParams, keyid, key);
- }
- else
- {
- final String query;
- Object qobj = state.getCqlCache();
- if (qobj == null)
- state.storeCqlCache(query = buildQuery());
- else
- query = qobj.toString();
-
- op = buildRunOp(client, query, null, queryParams, keyid, key);
- }
-
- timeWithRetry(op);
- return op;
- }
-
- protected void run(final ClientWrapper client) throws IOException
- {
- final byte[] key = getKey().array();
- final List<Object> queryParams = getQueryParameters(key);
- run(client, queryParams, ByteBuffer.wrap(key), new String(key));
- }
-
- // Classes to process Cql results
-
- // Always succeeds so long as the query executes without error; provides a keyCount to increment on instantiation
- protected final class CqlRunOpAlwaysSucceed extends CqlRunOp<Integer>
- {
-
- final int keyCount;
-
- protected CqlRunOpAlwaysSucceed(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key, int keyCount)
- {
- super(client, query, queryId, RowCountHandler.INSTANCE, params, id, key);
- this.keyCount = keyCount;
- }
-
- @Override
- public boolean validate(Integer result)
- {
- return true;
- }
-
- @Override
- public int keyCount()
- {
- return keyCount;
- }
- }
-
- // Succeeds so long as the result set is nonempty, and the query executes without error
- protected final class CqlRunOpTestNonEmpty extends CqlRunOp<Integer>
- {
-
- protected CqlRunOpTestNonEmpty(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key)
- {
- super(client, query, queryId, RowCountHandler.INSTANCE, params, id, key);
- }
-
- @Override
- public boolean validate(Integer result)
- {
- return result > 0;
- }
-
- @Override
- public int keyCount()
- {
- return result;
- }
- }
-
- // Requires a custom validate() method, but fetches and stores the keys from the result set for further processing
- protected abstract class CqlRunOpFetchKeys extends CqlRunOp<byte[][]>
- {
-
- protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key)
- {
- super(client, query, queryId, KeysHandler.INSTANCE, params, id, key);
- }
-
- @Override
- public int keyCount()
- {
- return result.length;
- }
-
- }
-
- protected final class CqlRunOpMatchResults extends CqlRunOp<ByteBuffer[][]>
- {
-
- final List<List<ByteBuffer>> expect;
-
- // a null value for an item in expect means we just check the row is present
- protected CqlRunOpMatchResults(ClientWrapper client, String query, Object queryId, List<Object> params, String id, ByteBuffer key, List<List<ByteBuffer>> expect)
- {
- super(client, query, queryId, RowsHandler.INSTANCE, params, id, key);
- this.expect = expect;
- }
-
- @Override
- public int keyCount()
- {
- return result == null ? 0 : result.length;
- }
-
- public boolean validate(ByteBuffer[][] result)
- {
- if (result.length != expect.size())
- return false;
- for (int i = 0 ; i < result.length ; i++)
- if (expect.get(i) != null && !expect.get(i).equals(Arrays.asList(result[i])))
- return false;
- return true;
- }
- }
-
- // Cql
- protected abstract class CqlRunOp<V> implements RunOp
- {
-
- final ClientWrapper client;
- final String query;
- final Object queryId;
- final List<Object> params;
- final String id;
- final ByteBuffer key;
- final ResultHandler<V> handler;
- V result;
-
- private CqlRunOp(ClientWrapper client, String query, Object queryId, ResultHandler<V> handler, List<Object> params, String id, ByteBuffer key)
- {
- this.client = client;
- this.query = query;
- this.queryId = queryId;
- this.handler = handler;
- this.params = params;
- this.id = id;
- this.key = key;
- }
-
- @Override
- public boolean run() throws Exception
- {
- return queryId != null
- ? validate(result = client.execute(queryId, key, params, handler))
- : validate(result = client.execute(query, key, params, handler));
- }
-
- @Override
- public String key()
- {
- return id;
- }
-
- public abstract boolean validate(V result);
-
- }
-
-
- /// LOTS OF WRAPPING/UNWRAPPING NONSENSE
-
-
- @Override
- public void run(final ThriftClient client) throws IOException
- {
- run(wrap(client));
- }
-
- @Override
- public void run(SimpleClient client) throws IOException
- {
- run(wrap(client));
- }
-
- @Override
- public void run(JavaDriverClient client) throws IOException
- {
- run(wrap(client));
- }
-
- public ClientWrapper wrap(ThriftClient client)
- {
- return state.isCql3()
- ? new Cql3CassandraClientWrapper(client)
- : new Cql2CassandraClientWrapper(client);
-
- }
-
- public ClientWrapper wrap(JavaDriverClient client)
- {
- return new JavaDriverWrapper(client);
- }
-
- public ClientWrapper wrap(SimpleClient client)
- {
- return new SimpleClientWrapper(client);
- }
-
- protected interface ClientWrapper
- {
- Object createPreparedStatement(String cqlQuery) throws TException;
- <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException;
- <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException;
- }
-
- private final class JavaDriverWrapper implements ClientWrapper
- {
- final JavaDriverClient client;
- private JavaDriverWrapper(JavaDriverClient client)
- {
- this.client = client;
- }
-
- @Override
- public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
- {
- String formattedQuery = formatCqlQuery(query, queryParams, state.isCql3());
- return handler.javaDriverHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
- }
-
- @Override
- public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
- {
- return handler.javaDriverHandler().apply(
- client.executePrepared(
- (PreparedStatement) preparedStatementId,
- queryParams,
- ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
- }
-
- @Override
- public Object createPreparedStatement(String cqlQuery)
- {
- return client.prepare(cqlQuery);
- }
- }
-
- private final class SimpleClientWrapper implements ClientWrapper
- {
- final SimpleClient client;
- private SimpleClientWrapper(SimpleClient client)
- {
- this.client = client;
- }
-
- @Override
- public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
- {
- String formattedQuery = formatCqlQuery(query, queryParams, state.isCql3());
- return handler.thriftHandler().apply(client.execute(formattedQuery, ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
- }
-
- @Override
- public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler)
- {
- return handler.thriftHandler().apply(
- client.executePrepared(
- (byte[]) preparedStatementId,
- toByteBufferParams(queryParams),
- ThriftConversion.fromThrift(state.settings.command.consistencyLevel)));
- }
-
- @Override
- public Object createPreparedStatement(String cqlQuery)
- {
- return client.prepare(cqlQuery).statementId.bytes;
- }
- }
-
- // client wrapper for Cql3
- private final class Cql3CassandraClientWrapper implements ClientWrapper
- {
- final ThriftClient client;
- private Cql3CassandraClientWrapper(ThriftClient client)
- {
- this.client = client;
- }
-
- @Override
- public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
- {
- String formattedQuery = formatCqlQuery(query, queryParams, true);
- return handler.simpleNativeHandler().apply(
- client.execute_cql3_query(formattedQuery, key, Compression.NONE, state.settings.command.consistencyLevel)
- );
- }
-
- @Override
- public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
- {
- Integer id = (Integer) preparedStatementId;
- return handler.simpleNativeHandler().apply(
- client.execute_prepared_cql3_query(id, key, toByteBufferParams(queryParams), state.settings.command.consistencyLevel)
- );
- }
-
- @Override
- public Object createPreparedStatement(String cqlQuery) throws TException
- {
- return client.prepare_cql3_query(cqlQuery, Compression.NONE);
- }
- }
-
- // client wrapper for Cql2
- private final class Cql2CassandraClientWrapper implements ClientWrapper
- {
- final ThriftClient client;
- private Cql2CassandraClientWrapper(ThriftClient client)
- {
- this.client = client;
- }
-
- @Override
- public <V> V execute(String query, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
- {
- String formattedQuery = formatCqlQuery(query, queryParams, false);
- return handler.simpleNativeHandler().apply(
- client.execute_cql_query(formattedQuery, key, Compression.NONE)
- );
- }
-
- @Override
- public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) throws TException
- {
- Integer id = (Integer) preparedStatementId;
- return handler.simpleNativeHandler().apply(
- client.execute_prepared_cql_query(id, key, toByteBufferParams(queryParams))
- );
- }
-
- @Override
- public Object createPreparedStatement(String cqlQuery) throws TException
- {
- return client.prepare_cql_query(cqlQuery, Compression.NONE);
- }
- }
-
- // interface for building functions to standardise results from each client
- protected static interface ResultHandler<V>
- {
- Function<ResultSet, V> javaDriverHandler();
- Function<ResultMessage, V> thriftHandler();
- Function<CqlResult, V> simpleNativeHandler();
- }
-
- protected static class RowCountHandler implements ResultHandler<Integer>
- {
- static final RowCountHandler INSTANCE = new RowCountHandler();
-
- @Override
- public Function<ResultSet, Integer> javaDriverHandler()
- {
- return new Function<ResultSet, Integer>()
- {
- @Override
- public Integer apply(ResultSet rows)
- {
- if (rows == null)
- return 0;
- return rows.all().size();
- }
- };
- }
-
- @Override
- public Function<ResultMessage, Integer> thriftHandler()
- {
- return new Function<ResultMessage, Integer>()
- {
- @Override
- public Integer apply(ResultMessage result)
- {
- return result instanceof ResultMessage.Rows ? ((ResultMessage.Rows) result).result.size() : 0;
- }
- };
- }
-
- @Override
- public Function<CqlResult, Integer> simpleNativeHandler()
- {
- return new Function<CqlResult, Integer>()
- {
-
- @Override
- public Integer apply(CqlResult result)
- {
- switch (result.getType())
- {
- case ROWS:
- return result.getRows().size();
- default:
- return 1;
- }
- }
- };
- }
-
- }
-
- // Processes results from each client into an array of all key bytes returned
- protected static final class RowsHandler implements ResultHandler<ByteBuffer[][]>
- {
- static final RowsHandler INSTANCE = new RowsHandler();
-
- @Override
- public Function<ResultSet, ByteBuffer[][]> javaDriverHandler()
- {
- return new Function<ResultSet, ByteBuffer[][]>()
- {
-
- @Override
- public ByteBuffer[][] apply(ResultSet result)
- {
- if (result == null)
- return new ByteBuffer[0][];
- List<Row> rows = result.all();
-
- ByteBuffer[][] r = new ByteBuffer[rows.size()][];
- for (int i = 0 ; i < r.length ; i++)
- {
- Row row = rows.get(i);
- r[i] = new ByteBuffer[row.getColumnDefinitions().size()];
- for (int j = 0 ; j < row.getColumnDefinitions().size() ; j++)
- r[i][j] = row.getBytes(j);
- }
- return r;
- }
- };
- }
-
- @Override
- public Function<ResultMessage, ByteBuffer[][]> thriftHandler()
- {
- return new Function<ResultMessage, ByteBuffer[][]>()
- {
-
- @Override
- public ByteBuffer[][] apply(ResultMessage result)
- {
- if (!(result instanceof ResultMessage.Rows))
- return new ByteBuffer[0][];
-
- ResultMessage.Rows rows = ((ResultMessage.Rows) result);
- ByteBuffer[][] r = new ByteBuffer[rows.result.size()][];
- for (int i = 0 ; i < r.length ; i++)
- {
- List<ByteBuffer> row = rows.result.rows.get(i);
- r[i] = new ByteBuffer[row.size()];
- for (int j = 0 ; j < row.size() ; j++)
- r[i][j] = row.get(j);
- }
- return r;
- }
- };
- }
-
- @Override
- public Function<CqlResult, ByteBuffer[][]> simpleNativeHandler()
- {
- return new Function<CqlResult, ByteBuffer[][]>()
- {
-
- @Override
- public ByteBuffer[][] apply(CqlResult result)
- {
- ByteBuffer[][] r = new ByteBuffer[result.getRows().size()][];
- for (int i = 0 ; i < r.length ; i++)
- {
- CqlRow row = result.getRows().get(i);
- r[i] = new ByteBuffer[row.getColumns().size()];
- for (int j = 0 ; j < r[i].length ; j++)
- r[i][j] = ByteBuffer.wrap(row.getColumns().get(j).getValue());
- }
- return r;
- }
- };
- }
-
- }
- // Processes results from each client into an array of all key bytes returned
- protected static final class KeysHandler implements ResultHandler<byte[][]>
- {
- static final KeysHandler INSTANCE = new KeysHandler();
-
- @Override
- public Function<ResultSet, byte[][]> javaDriverHandler()
- {
- return new Function<ResultSet, byte[][]>()
- {
-
- @Override
- public byte[][] apply(ResultSet result)
- {
-
- if (result == null)
- return new byte[0][];
- List<Row> rows = result.all();
- byte[][] r = new byte[rows.size()][];
- for (int i = 0 ; i < r.length ; i++)
- r[i] = rows.get(i).getBytes(0).array();
- return r;
- }
- };
- }
-
- @Override
- public Function<ResultMessage, byte[][]> thriftHandler()
- {
- return new Function<ResultMessage, byte[][]>()
- {
-
- @Override
- public byte[][] apply(ResultMessage result)
- {
- if (result instanceof ResultMessage.Rows)
- {
- ResultMessage.Rows rows = ((ResultMessage.Rows) result);
- byte[][] r = new byte[rows.result.size()][];
- for (int i = 0 ; i < r.length ; i++)
- r[i] = rows.result.rows.get(i).get(0).array();
- return r;
- }
- return null;
- }
- };
- }
-
- @Override
- public Function<CqlResult, byte[][]> simpleNativeHandler()
- {
- return new Function<CqlResult, byte[][]>()
- {
-
- @Override
- public byte[][] apply(CqlResult result)
- {
- byte[][] r = new byte[result.getRows().size()][];
- for (int i = 0 ; i < r.length ; i++)
- r[i] = result.getRows().get(i).getKey();
- return r;
- }
- };
- }
-
- }
-
- private static String getUnQuotedCqlBlob(ByteBuffer term, boolean isCQL3)
- {
- return isCQL3
- ? "0x" + ByteBufferUtil.bytesToHex(term)
- : ByteBufferUtil.bytesToHex(term);
- }
-
- /**
- * Constructs a CQL query string by replacing instances of the character
- * '?', with the corresponding parameter.
- *
- * @param query base query string to format
- * @param parms sequence of string query parameters
- * @return formatted CQL query string
- */
- private static String formatCqlQuery(String query, List<Object> parms, boolean isCql3)
- {
- int marker, position = 0;
- StringBuilder result = new StringBuilder();
-
- if (-1 == (marker = query.indexOf('?')) || parms.size() == 0)
- return query;
-
- for (Object parm : parms)
- {
- result.append(query.substring(position, marker));
- if (parm instanceof ByteBuffer)
- result.append(getUnQuotedCqlBlob((ByteBuffer) parm, isCql3));
- else if (parm instanceof Long)
- result.append(parm.toString());
- else throw new AssertionError();
-
- position = marker + 1;
- if (-1 == (marker = query.indexOf('?', position + 1)))
- break;
- }
-
- if (position < query.length())
- result.append(query.substring(position));
-
- return result.toString();
- }
-
- private static List<ByteBuffer> toByteBufferParams(List<Object> params)
- {
- List<ByteBuffer> r = new ArrayList<>();
- for (Object param : params)
- {
- if (param instanceof ByteBuffer)
- r.add((ByteBuffer) param);
- else if (param instanceof Long)
- r.add(ByteBufferUtil.bytes((Long) param));
- else throw new AssertionError();
- }
- return r;
- }
-
- protected String wrapInQuotesIfRequired(String string)
- {
- return state.settings.mode.cqlVersion == CqlVersion.CQL3
- ? "\"" + string + "\""
- : string;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
deleted file mode 100644
index 16cdff3..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.cassandra.stress.operations;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-public class CqlRangeSlicer extends CqlOperation<Integer>
-{
- public CqlRangeSlicer(State state, long idx)
- {
- super(state, idx);
- }
-
- @Override
- protected List<Object> getQueryParameters(byte[] key)
- {
- return Collections.<Object>singletonList(ByteBuffer.wrap(key));
- }
-
- @Override
- protected String buildQuery()
- {
- StringBuilder query = new StringBuilder("SELECT FIRST ").append(state.settings.columns.maxColumnsPerKey)
- .append(" ''..'' FROM ").append(wrapInQuotesIfRequired(state.type.table));
-
- if (state.isCql2())
- query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
-
- return query.append(" WHERE KEY > ?").toString();
- }
-
- @Override
- protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
- {
- return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, key);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
deleted file mode 100644
index fb07edc..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.cassandra.stress.operations;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class CqlReader extends CqlOperation<ByteBuffer[][]>
-{
-
- public CqlReader(State state, long idx)
- {
- super(state, idx);
- }
-
- @Override
- protected String buildQuery()
- {
- StringBuilder query = new StringBuilder("SELECT ");
-
- if (state.settings.columns.slice)
- {
- if (state.isCql2())
- query.append("FIRST ").append(state.settings.columns.maxColumnsPerKey).append(" ''..''");
- else
- query.append("*");
- }
- else
- {
- try
- {
- for (int i = 0; i < state.settings.columns.names.size() ; i++)
- {
- if (i > 0)
- query.append(",");
- query.append(wrapInQuotesIfRequired(ByteBufferUtil.string(state.settings.columns.names.get(i))));
- }
- }
- catch (CharacterCodingException e)
- {
- throw new IllegalStateException(e);
- }
- }
-
- query.append(" FROM ").append(wrapInQuotesIfRequired(state.type.table));
-
- if (state.isCql2())
- query.append(" USING CONSISTENCY ").append(state.settings.command.consistencyLevel);
- query.append(" WHERE KEY=?");
- return query.toString();
- }
-
- @Override
- protected List<Object> getQueryParameters(byte[] key)
- {
- return Collections.<Object>singletonList(ByteBuffer.wrap(key));
- }
-
- @Override
- protected CqlRunOp<ByteBuffer[][]> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, String keyid, ByteBuffer key)
- {
- List<ByteBuffer> expectRow = state.rowGen.isDeterministic() ? generateColumnValues(key) : null;
- return new CqlRunOpMatchResults(client, query, queryId, params, keyid, key, Arrays.asList(expectRow));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
new file mode 100644
index 0000000..914d212
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import org.apache.cassandra.stress.Operation;
+
+/**
+ * An {@link OpDistribution} that hands out the same {@link Operation} on every call.
+ */
+public class FixedOpDistribution implements OpDistribution
+{
+
+    final Operation operation;
+
+    /**
+     * @param operation the operation returned by every call to {@link #next()}
+     */
+    public FixedOpDistribution(Operation operation)
+    {
+        this.operation = operation;
+    }
+
+    /**
+     * @return the single operation this distribution was constructed with
+     */
+    @Override
+    public Operation next()
+    {
+        return operation;
+    }
+
+    /**
+     * @return the operation's {@code partitionCount.maxValue()}, narrowed to an int
+     */
+    @Override
+    public int maxBatchSize()
+    {
+        // NOTE(review): narrowing cast; assumes the maximum partition count fits in an int
+        return (int) operation.partitionCount.maxValue();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
new file mode 100644
index 0000000..a744f18
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import org.apache.cassandra.stress.Operation;
+
+/**
+ * A source of {@link Operation}s.
+ */
+public interface OpDistribution
+{
+
+    /**
+     * @return the next operation drawn from this distribution
+     */
+    Operation next();
+
+    /**
+     * @return an upper bound on the batch size; FixedOpDistribution and SampledOpDistribution
+     *         derive it from their operations' {@code partitionCount.maxValue()}
+     */
+    int maxBatchSize();
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
new file mode 100644
index 0000000..08d5f56
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import org.apache.cassandra.stress.util.Timer;
+
+/**
+ * Creates {@link OpDistribution} instances.
+ */
+public interface OpDistributionFactory
+{
+
+    /**
+     * @param timer the timer passed on to the operations of the returned distribution
+     * @return a distribution of operations (SampledOpDistributionFactory builds a new one per call)
+     */
+    OpDistribution get(Timer timer);
+
+    /**
+     * @return a textual description of what this factory produces
+     */
+    String desc();
+
+    /**
+     * @return factories for the individual operations behind this factory; a factory for a
+     *         single operation returns just itself
+     */
+    Iterable<OpDistributionFactory> each();
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
new file mode 100644
index 0000000..8bd2806
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.Distribution;
+
+/**
+ * An {@link OpDistribution} that samples operations from a weighted set, repeating each
+ * sampled operation for a run whose length is drawn from a clustering distribution.
+ */
+public class SampledOpDistribution implements OpDistribution
+{
+
+    final EnumeratedDistribution<Operation> operations;
+    final Distribution clustering;
+    // the operation currently being repeated, and how many more times next() will return it
+    private Operation cur;
+    private long remaining;
+
+    /**
+     * @param operations the weighted operations to sample from
+     * @param clustering supplies the number of consecutive calls that return one sampled operation
+     */
+    public SampledOpDistribution(EnumeratedDistribution<Operation> operations, Distribution clustering)
+    {
+        this.operations = operations;
+        this.clustering = clustering;
+    }
+
+    /**
+     * @return the largest {@code partitionCount.maxValue()} of any operation, narrowed to an int, and at least 1
+     */
+    @Override
+    public int maxBatchSize()
+    {
+        int max = 1;
+        for (Pair<Operation, Double> pair : operations.getPmf())
+            max = Math.max(max, (int) pair.getFirst().partitionCount.maxValue());
+        return max;
+    }
+
+    /**
+     * @return the current operation, sampling a new one once the previous run is used up
+     */
+    @Override
+    public Operation next()
+    {
+        // a drawn run length of zero yields no calls at all, so keep drawing until it is non-zero
+        while (remaining == 0)
+        {
+            remaining = clustering.next();
+            cur = operations.sample();
+        }
+        remaining--;
+        return cur;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
new file mode 100644
index 0000000..575da12
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.stress.operations;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.util.Timer;
+
+/**
+ * Base class for factories that build a {@link SampledOpDistribution} from weighted keys.
+ * Subclasses map each key of type {@code T} to a concrete {@link Operation}.
+ *
+ * @param <T> the key type identifying an operation
+ */
+public abstract class SampledOpDistributionFactory<T> implements OpDistributionFactory
+{
+
+    final List<Pair<T, Double>> ratios;
+    final DistributionFactory clustering;
+
+    /**
+     * @param ratios     each key paired with the weight passed to the {@link EnumeratedDistribution}
+     * @param clustering factory for the clustering distribution given to each {@link SampledOpDistribution}
+     */
+    protected SampledOpDistributionFactory(List<Pair<T, Double>> ratios, DistributionFactory clustering)
+    {
+        this.ratios = ratios;
+        this.clustering = clustering;
+    }
+
+    /** Builds the operation identified by {@code key}, using the given timer and generator. */
+    protected abstract Operation get(Timer timer, PartitionGenerator generator, T key);
+
+    /** Supplies the generator handed to {@code get(timer, generator, key)}. */
+    protected abstract PartitionGenerator newGenerator();
+
+    /**
+     * Builds one operation per key, all sharing a single generator, and wraps them in a
+     * {@link SampledOpDistribution} weighted by the configured ratios.
+     */
+    @Override
+    public OpDistribution get(Timer timer)
+    {
+        PartitionGenerator generator = newGenerator();
+        List<Pair<Operation, Double>> operations = new ArrayList<>();
+        for (Pair<T, Double> ratio : ratios)
+            operations.add(new Pair<>(get(timer, generator, ratio.getFirst()), ratio.getSecond()));
+        return new SampledOpDistribution(new EnumeratedDistribution<>(operations), clustering.get());
+    }
+
+    /**
+     * @return the string form of the list of keys
+     */
+    @Override
+    public String desc()
+    {
+        List<T> keys = new ArrayList<>();
+        for (Pair<T, Double> p : ratios)
+            keys.add(p.getFirst());
+        return keys.toString();
+    }
+
+    /**
+     * @return one factory per key, each producing a {@link FixedOpDistribution} for that key's
+     *         operation with its own newly created generator
+     */
+    @Override
+    public Iterable<OpDistributionFactory> each()
+    {
+        List<OpDistributionFactory> out = new ArrayList<>();
+        for (final Pair<T, Double> ratio : ratios)
+        {
+            out.add(new OpDistributionFactory()
+            {
+                @Override
+                public OpDistribution get(Timer timer)
+                {
+                    return new FixedOpDistribution(SampledOpDistributionFactory.this.get(timer, newGenerator(), ratio.getFirst()));
+                }
+
+                @Override
+                public String desc()
+                {
+                    return ratio.getFirst().toString();
+                }
+
+                @Override
+                public Iterable<OpDistributionFactory> each()
+                {
+                    return Collections.<OpDistributionFactory>singleton(this);
+                }
+            });
+        }
+        return out;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
deleted file mode 100644
index 9bfe440..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.*;
-
-/**
- * Stress operation that adds to the counter columns of one key with a single Thrift batch_mutate call.
- */
-public class ThriftCounterAdder extends Operation
-{
-    public ThriftCounterAdder(State state, long index)
-    {
-        super(state, index);
-    }
-
-    /**
-     * Builds one counter mutation set for a single key and submits it through timeWithRetry.
-     *
-     * @param client the Thrift client the batch is sent through
-     * @throws IOException declared by the operation contract
-     */
-    public void run(final ThriftClient client) throws IOException
-    {
-        // one counter column per generated name, each taking the next increment from the state
-        final List<CounterColumn> counters = new ArrayList<>();
-        for (ByteBuffer columnName : randomNames())
-        {
-            counters.add(new CounterColumn(columnName, state.counteradd.next()));
-        }
-
-        final List<Mutation> mutations;
-        final String table;
-        if (!state.settings.columns.useSuperColumns)
-        {
-            // standard table: one mutation per counter column
-            mutations = new ArrayList<>(counters.size());
-            for (CounterColumn counter : counters)
-            {
-                ColumnOrSuperColumn wrapped = new ColumnOrSuperColumn().setCounter_column(counter);
-                mutations.add(new Mutation().setColumn_or_supercolumn(wrapped));
-            }
-            table = state.type.table;
-        }
-        else
-        {
-            // super table: every column parent receives the full list of counter columns
-            mutations = new ArrayList<>();
-            for (ColumnParent parent : state.columnParents)
-            {
-                ByteBuffer superName = ByteBuffer.wrap(parent.getSuper_column());
-                CounterSuperColumn superColumn = new CounterSuperColumn(superName, counters);
-                ColumnOrSuperColumn wrapped = new ColumnOrSuperColumn().setCounter_super_column(superColumn);
-                mutations.add(new Mutation().setColumn_or_supercolumn(wrapped));
-            }
-            table = state.type.supertable;
-        }
-        final Map<String, List<Mutation>> row = Collections.singletonMap(table, mutations);
-
-        // the key is drawn only after the column names and increments have been generated
-        final ByteBuffer key = getKey();
-        final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
-
-        timeWithRetry(new RunOp()
-        {
-            @Override
-            public String key()
-            {
-                return new String(key.array());
-            }
-
-            @Override
-            public int keyCount()
-            {
-                return 1;
-            }
-
-            @Override
-            public boolean run() throws Exception
-            {
-                client.batch_mutate(record, state.settings.command.consistencyLevel);
-                return true;
-            }
-        });
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
deleted file mode 100644
index 6e36a28..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterGetter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
-
-/**
- * Stress operation that reads the counters of one key, issuing one Thrift get_slice per column parent.
- */
-public class ThriftCounterGetter extends Operation
-{
-    public ThriftCounterGetter(State state, long index)
-    {
-        super(state, index);
-    }
-
-    /**
-     * Reads the same key and predicate from every configured column parent, timing each read separately.
-     *
-     * @param client the Thrift client the reads are sent through
-     * @throws IOException declared by the operation contract
-     */
-    public void run(final ThriftClient client) throws IOException
-    {
-        final SlicePredicate slice = slicePredicate();
-        final ByteBuffer rowKey = getKey();
-        for (ColumnParent parent : state.columnParents)
-        {
-            timeWithRetry(sliceRead(client, rowKey, parent, slice));
-        }
-    }
-
-    /** Wraps a single get_slice call; the read reports true only when it returns at least one column. */
-    private RunOp sliceRead(final ThriftClient client, final ByteBuffer rowKey, final ColumnParent parent, final SlicePredicate slice)
-    {
-        return new RunOp()
-        {
-            @Override
-            public boolean run() throws Exception
-            {
-                List<?> found = client.get_slice(rowKey, parent, slice, state.settings.command.consistencyLevel);
-                if (found == null)
-                    return false;
-                return found.size() > 0;
-            }
-
-            @Override
-            public String key()
-            {
-                return new String(rowKey.array());
-            }
-
-            @Override
-            public int keyCount()
-            {
-                return 1;
-            }
-        };
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
deleted file mode 100644
index 8c8ec31..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * Stress operation that pages through get_indexed_slices results for an equality
- * match on one generated column value, timing every page separately.
- */
-public class ThriftIndexedRangeSlicer extends Operation
-{
- /**
-  * @throws IllegalStateException if the row or key generator is not deterministic, if super
-  *         columns are enabled or there is not exactly one column parent, or if TimeUUID
-  *         column names are enabled
-  */
- public ThriftIndexedRangeSlicer(State state, long index)
- {
- super(state, index);
- if (!state.rowGen.isDeterministic() || !state.keyGen.isDeterministic())
- throw new IllegalStateException("Only run with a isDeterministic row/key generator");
- if (state.settings.columns.useSuperColumns || state.columnParents.size() != 1)
- throw new IllegalStateException("Does not support super columns");
- if (state.settings.columns.useTimeUUIDComparator)
- throw new IllegalStateException("Does not support TimeUUID column names");
- }
-
- /**
-  * Repeatedly queries the index for rows whose second column (names.get(1)) equals the value
-  * generated for this operation's key, advancing the start key after each page until a page
-  * comes back empty.
-  */
- public void run(final ThriftClient client) throws IOException
- {
-
- // empty start/finish, not reversed, at most maxColumnsPerKey columns per row
- final SlicePredicate predicate = new SlicePredicate()
- .setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false, state.settings.columns.maxColumnsPerKey));
- final List<ByteBuffer> columns = generateColumnValues(getKey());
- final ColumnParent parent = state.columnParents.get(0);
-
- final ByteBuffer columnName = state.settings.columns.names.get(1);
- final ByteBuffer value = columns.get(1); // only C1 column is indexed
-
- IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
- byte[] minKey = new byte[0];
- // one-element array so the anonymous RunOp below can store the page it fetched
- final List<KeySlice>[] results = new List[1];
- do
- {
-
- // minKey is still empty only on the first page
- final boolean first = minKey.length == 0;
- final IndexClause clause = new IndexClause(Arrays.asList(expression),
- ByteBuffer.wrap(minKey),
- state.settings.command.keysAtOnce);
-
- timeWithRetry(new RunOp()
- {
- @Override
- public boolean run() throws Exception
- {
- results[0] = client.get_indexed_slices(parent, clause, predicate, state.settings.command.consistencyLevel);
- // only the first page is required to contain rows; later pages may be empty
- return !first || results[0].size() > 0;
- }
-
- @Override
- public String key()
- {
- return new String(value.array());
- }
-
- @Override
- public int keyCount()
- {
- return results[0].size();
- }
- });
-
- minKey = getNextMinKey(minKey, results[0]);
-
- } while (results[0].size() > 0);
- }
-
- /**
- * Computes the start key for the next page: takes the largest key (unsigned comparison)
- * among {@code cur} and the keys in {@code slices}, then increments it in place.
- * The increment starts at index 0 and carries to the next index whenever a byte wraps to zero.
- * NOTE(review): when a slice key is selected, the array returned by slice.getKey() is the one
- * that gets incremented; whether that is the slice's own backing array is not visible here - confirm.
- * @param cur the start key used for the page just fetched
- * @param slices list of the KeySlice objects returned for that page
- * @return the incremented maximum key; {@code cur} itself (also incremented) if no slice key is larger
- */
- private static byte[] getNextMinKey(byte[] cur, List<KeySlice> slices)
- {
- // find max
- for (KeySlice slice : slices)
- if (FBUtilities.compareUnsigned(cur, slice.getKey()) < 0)
- cur = slice.getKey();
-
- // increment
- for (int i = 0 ; i < cur.length ; i++)
- if (++cur[i] != 0)
- break;
- return cur;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
deleted file mode 100644
index 7077a95..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
-/**
- * Stress operation that writes one generated row for a single key with a Thrift batch_mutate call.
- */
-public final class ThriftInserter extends Operation
-{
-
-    public ThriftInserter(State state, long index)
-    {
-        super(state, index);
-    }
-
-    /**
-     * Generates the columns for this operation's key, wraps them in mutations (one per column
-     * for a standard table, one super column per column parent otherwise) and submits the
-     * batch through timeWithRetry.
-     *
-     * @param client the Thrift client the batch is sent through
-     * @throws IOException declared by the operation contract
-     */
-    public void run(final ThriftClient client) throws IOException
-    {
-        final ByteBuffer key = getKey();
-        final List<Column> columns = generateColumns(key);
-
-        Map<String, List<Mutation>> row;
-        if (!state.settings.columns.useSuperColumns)
-        {
-            List<Mutation> mutations = new ArrayList<>(columns.size());
-            for (Column c : columns)
-            {
-                ColumnOrSuperColumn column = new ColumnOrSuperColumn().setColumn(c);
-                mutations.add(new Mutation().setColumn_or_supercolumn(column));
-            }
-            row = Collections.singletonMap(state.type.table, mutations);
-        }
-        else
-        {
-            // every column parent receives the same full list of columns
-            List<Mutation> mutations = new ArrayList<>(state.columnParents.size());
-            for (ColumnParent parent : state.columnParents)
-            {
-                final SuperColumn s = new SuperColumn(parent.bufferForSuper_column(), columns);
-                final ColumnOrSuperColumn cosc = new ColumnOrSuperColumn().setSuper_column(s);
-                mutations.add(new Mutation().setColumn_or_supercolumn(cosc));
-            }
-            row = Collections.singletonMap(state.settings.command.type.supertable, mutations);
-        }
-
-        final Map<ByteBuffer, Map<String, List<Mutation>>> record = Collections.singletonMap(key, row);
-
-        timeWithRetry(new RunOp()
-        {
-            @Override
-            public boolean run() throws Exception
-            {
-                client.batch_mutate(record, state.settings.command.consistencyLevel);
-                return true;
-            }
-
-            @Override
-            public String key()
-            {
-                return new String(key.array());
-            }
-
-            @Override
-            public int keyCount()
-            {
-                return 1;
-            }
-        });
-    }
-
-    /**
-     * Builds the columns written for {@code key}: one column per generated value, named either
-     * with a fresh TimeUUID or with the configured column name at the same index, and stamped
-     * with the current microsecond timestamp.
-     *
-     * @param key the row key the values are generated for
-     * @return the populated columns, in the same order as the generated values
-     */
-    protected List<Column> generateColumns(ByteBuffer key)
-    {
-        final List<ByteBuffer> values = generateColumnValues(key);
-        final List<Column> columns = new ArrayList<>(values.size());
-
-        if (state.settings.columns.useTimeUUIDComparator)
-        {
-            // The new column must be added to the list: previously it was created and discarded,
-            // leaving the list empty so the value/timestamp loop below failed on columns.get(i).
-            for (int i = 0 ; i < values.size() ; i++)
-                columns.add(new Column(TimeUUIDType.instance.decompose(UUIDGen.getTimeUUID())));
-        }
-        else
-        {
-            // TODO : consider randomly allocating column names in case where have fewer than max columns
-            // but need to think about implications for indexes / indexed range slicer / other knock on effects
-            for (int i = 0 ; i < values.size() ; i++)
-                columns.add(new Column(state.settings.columns.names.get(i)));
-        }
-
-        for (int i = 0 ; i < values.size() ; i++)
-            columns.get(i)
-                   .setValue(values.get(i))
-                   .setTimestamp(FBUtilities.timestampMicros());
-
-        return columns;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75364296/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
deleted file mode 100644
index d8e0117..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftMultiGetter.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.operations;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-
-/**
- * Stress operation that reads several keys at once, issuing one Thrift multiget_slice per column parent.
- */
-public final class ThriftMultiGetter extends Operation
-{
-
-    public ThriftMultiGetter(State state, long index)
-    {
-        super(state, index);
-    }
-
-    /**
-     * Fetches keysAtOnce keys from every configured column parent, timing each multiget separately.
-     *
-     * @param client the Thrift client the reads are sent through
-     * @throws IOException declared by the operation contract
-     */
-    public void run(final ThriftClient client) throws IOException
-    {
-        // empty start/finish, forward order, at most maxColumnsPerKey columns per row
-        final SliceRange range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                false,
-                                                state.settings.columns.maxColumnsPerKey);
-        final SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
-
-        final List<ByteBuffer> keys = getKeys(state.settings.command.keysAtOnce);
-
-        for (final ColumnParent parent : state.columnParents)
-        {
-            timeWithRetry(new RunOp()
-            {
-                // number of rows returned by the most recent attempt
-                int count;
-
-                @Override
-                public boolean run() throws Exception
-                {
-                    count = client.multiget_slice(keys, parent, predicate, state.settings.command.consistencyLevel).size();
-                    return count != 0;
-                }
-
-                @Override
-                public int keyCount()
-                {
-                    return count;
-                }
-
-                @Override
-                public String key()
-                {
-                    return keys.toString();
-                }
-            });
-        }
-    }
-
-}