You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/06/28 12:40:29 UTC
[1/6] cassandra git commit: backport burn test refactor
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 8a56868bc -> bd4a9d18e
refs/heads/cassandra-2.2 14d7a63b8 -> 02a7c3429
refs/heads/trunk 6739434c6 -> 3671082b0
backport burn test refactor
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd4a9d18
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd4a9d18
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd4a9d18
Branch: refs/heads/cassandra-2.1
Commit: bd4a9d18e1317dcb8542bd4adc5a9f99b108d6c6
Parents: 8a56868
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun Jun 28 11:38:22 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun Jun 28 11:38:22 2015 +0100
----------------------------------------------------------------------
build.xml | 7 +
.../cassandra/concurrent/LongOpOrderTest.java | 240 +++++++++
.../concurrent/LongSharedExecutorPoolTest.java | 226 +++++++++
.../apache/cassandra/utils/LongBTreeTest.java | 502 +++++++++++++++++++
.../cassandra/concurrent/LongOpOrderTest.java | 240 ---------
.../concurrent/LongSharedExecutorPoolTest.java | 228 ---------
.../apache/cassandra/utils/LongBTreeTest.java | 401 ---------------
7 files changed, 975 insertions(+), 869 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 73e76e5..18ad49f 100644
--- a/build.xml
+++ b/build.xml
@@ -93,6 +93,7 @@
<property name="test.timeout" value="60000" />
<property name="test.long.timeout" value="600000" />
+ <property name="test.burn.timeout" value="600000" />
<!-- default for cql tests. Can be override by -Dcassandra.test.use_prepared=false -->
<property name="cassandra.test.use_prepared" value="true" />
@@ -1258,6 +1259,12 @@
</testmacro>
</target>
+ <!-- Runs the "burn" test suite found under ${test.burn.src}, using ${test.burn.timeout} as the timeout.
+ NOTE(review): test.burn.src is not defined in the hunks of this patch; confirm it is declared elsewhere in build.xml. -->
+ <target name="test-burn" depends="build-test" description="Execute burn tests">
+ <testmacro suitename="burn" inputdir="${test.burn.src}"
+ timeout="${test.burn.timeout}">
+ </testmacro>
+ </target>
+
<target name="long-test" depends="build-test" description="Execute functional tests">
<testmacro suitename="long" inputdir="${test.long.src}"
timeout="${test.long.timeout}">
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
new file mode 100644
index 0000000..d7105df
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
@@ -0,0 +1,240 @@
+package org.apache.cassandra.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.junit.Assert.assertTrue;
+
+// TODO: we don't currently test SAFE functionality at all!
+// TODO: should also test markBlocking and SyncOrdered
+public class LongOpOrderTest
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
+
+ static final int CONSUMERS = 4;
+ static final int PRODUCERS = 32;
+
+ static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
+ static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
+
+ static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
+ {
+ @Override
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ System.err.println(t.getName() + ": " + e.getMessage());
+ e.printStackTrace();
+ }
+ };
+
+ final OpOrder order = new OpOrder();
+ final AtomicInteger errors = new AtomicInteger();
+
+ class TestOrdering implements Runnable
+ {
+
+ final int[] waitNanos = new int[1 << 16];
+ volatile State state = new State();
+ final ScheduledExecutorService sched;
+
+ /**
+ * Builds one consumer and starts its share of the work on {@code exec}.
+ *
+ * @param exec executor that runs this consumer and its producers
+ * @param sched scheduler used by {@link #run()} to re-check each completed State one second later
+ */
+ TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
+ {
+ this.sched = sched;
+ final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ // pre-compute the pauses (0..4999ns) that run() sleeps for between barriers
+ for (int i = 0 ; i < waitNanos.length ; i++)
+ waitNanos[i] = rnd.nextInt(5000);
+ // each consumer owns PRODUCERS / CONSUMERS producer tasks
+ for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
+ exec.execute(new Producer());
+ // NOTE(review): 'this' is handed to other threads before the constructor returns
+ exec.execute(this);
+ }
+
+ /**
+ * Consumer loop: for RUNTIME milliseconds, repeatedly issues a barrier against the current State,
+ * waits for it, validates the State, then swaps in the replacement State for the producers.
+ * Progress is logged every REPORT_INTERVAL milliseconds.
+ */
+ @Override
+ public void run()
+ {
+ final long until = System.currentTimeMillis() + RUNTIME;
+ long lastReport = System.currentTimeMillis();
+ long count = 0;
+ long opCount = 0;
+ while (true)
+ {
+ long now = System.currentTimeMillis();
+ if (now > until)
+ break;
+ if (now > lastReport + REPORT_INTERVAL)
+ {
+ lastReport = now;
+ logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
+ Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
+ }
+ try
+ {
+ // waitNanos.length is a power of two (1 << 16), so the mask cycles through the table
+ Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
+ } catch (InterruptedException e)
+ {
+ // NOTE(review): the interrupt is only printed; the loop carries on and the flag is not restored
+ e.printStackTrace();
+ }
+
+ final State s = state;
+ // the replacement must be in place before the barrier is issued: once accept() starts
+ // rejecting groups, Producer.run() follows s.replacement
+ s.barrier = order.newBarrier();
+ s.replacement = new State();
+ s.barrier.issue();
+ s.barrier.await();
+ // first check records the total seen once the barrier has been awaited
+ s.check();
+ opCount += s.totalCount();
+ state = s.replacement;
+ // second check, one second later, verifies the total did not change after the barrier completed
+ sched.schedule(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ s.check();
+ }
+ }, 1, TimeUnit.SECONDS);
+ count++;
+ }
+ }
+
+ /**
+ * The operations accepted between two barriers of one consumer, counted per OpOrder.Group.
+ * Once a barrier has been set, groups it rejects are redirected to {@code replacement} by the producers.
+ */
+ class State
+ {
+
+ volatile OpOrder.Barrier barrier;
+ volatile State replacement;
+ final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+ // -1 until the first check() call, afterwards the total recorded by that first call
+ int checkCount = -1;
+
+ /**
+ * Counts one operation for {@code opGroup} in this State.
+ *
+ * @return false, without counting, if a barrier is set and {@code barrier.isAfter(opGroup)} is false;
+ * true otherwise
+ */
+ boolean accept(OpOrder.Group opGroup)
+ {
+ if (barrier != null && !barrier.isAfter(opGroup))
+ return false;
+ AtomicInteger c;
+ if (null == (c = count.get(opGroup)))
+ {
+ // putIfAbsent followed by get: whichever thread wins the race, all of them share one counter
+ count.putIfAbsent(opGroup, new AtomicInteger());
+ c = count.get(opGroup);
+ }
+ c.incrementAndGet();
+ return true;
+ }
+
+ /** @return the sum of the per-group counters currently held by this State */
+ int totalCount()
+ {
+ int c = 0;
+ for (AtomicInteger v : count.values())
+ c += v.intValue();
+ return c;
+ }
+
+ /**
+ * Validates this State; every violation increments {@code errors} and is logged.
+ * The first call records the total. A later call verifies the total is unchanged and removes
+ * this State's groups from the enclosing TestOrdering's {@code count} map.
+ */
+ void check()
+ {
+ boolean delete;
+ if (checkCount >= 0)
+ {
+ if (checkCount != totalCount())
+ {
+ errors.incrementAndGet();
+ logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
+ }
+ delete = true;
+ }
+ else
+ {
+ checkCount = totalCount();
+ delete = false;
+ }
+ for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
+ {
+ // no accepted group may order after the barrier's sync point
+ if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
+ {
+ errors.incrementAndGet();
+ logger.error("Received an operation that was created after the barrier was issued.");
+ }
+ // the count registered by the producers for this group must equal the count accepted here
+ if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
+ {
+ errors.incrementAndGet();
+ logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
+ }
+ if (delete)
+ TestOrdering.this.count.remove(e.getKey());
+ }
+ }
+
+ }
+
+ final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+
+ /**
+ * Endlessly starts operations against {@code order}, registers each one in the enclosing
+ * TestOrdering's {@code count} map and hands it to the first State that accepts it.
+ * The loop has no exit condition.
+ */
+ class Producer implements Runnable
+ {
+ public void run()
+ {
+ while (true)
+ {
+ AtomicInteger c;
+ // try-with-resources closes the group once the operation has been recorded
+ try (OpOrder.Group opGroup = order.start())
+ {
+ if (null == (c = count.get(opGroup)))
+ {
+ count.putIfAbsent(opGroup, new AtomicInteger());
+ c = count.get(opGroup);
+ }
+ c.incrementAndGet();
+ State s = state;
+ // a State whose barrier rejects this group has its replacement set already (see run()),
+ // so walk the chain until one accepts
+ while (!s.accept(opGroup))
+ s = s.replacement;
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Runs CONSUMERS TestOrdering instances (each with its producers) and fails if any of them
+ * recorded an ordering error.
+ *
+ * The producers loop forever, so {@code exec} is not expected to terminate; awaitTermination
+ * serves as a bounded wait of 1.1 x RUNTIME, which covers the consumers' run time.
+ *
+ * @throws InterruptedException if the test thread is interrupted while waiting
+ */
+ @Test
+ public void testOrdering() throws InterruptedException
+ {
+ errors.set(0);
+ Thread.setDefaultUncaughtExceptionHandler(handler);
+ final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
+ final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
+ try
+ {
+ for (int i = 0 ; i < CONSUMERS ; i++)
+ new TestOrdering(exec, checker);
+ exec.shutdown();
+ exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
+ assertTrue(exec.isShutdown());
+ assertTrue(errors.get() + " ordering errors were detected; see the log for details", errors.get() == 0);
+ }
+ finally
+ {
+ // release the scheduler thread whether or not the assertions passed
+ checker.shutdown();
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
new file mode 100644
index 0000000..fe464c7
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.BitSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.math3.distribution.WeibullDistribution;
+import org.junit.Test;
+
+public class LongSharedExecutorPoolTest
+{
+
+ /** A task that simulates work by parking the executing thread for a fixed number of nanoseconds. */
+ private static final class WaitTask implements Runnable
+ {
+ final long nanos;
+
+ private WaitTask(long nanos)
+ {
+ this.nanos = nanos;
+ }
+
+ public void run()
+ {
+ LockSupport.parkNanos(nanos);
+ }
+ }
+
+ /** A submitted task's future paired with the System.nanoTime() value at which it is forecast to complete. */
+ private static final class Result implements Comparable<Result>
+ {
+ final Future<?> future;
+ final long forecastedCompletion;
+
+ private Result(Future<?> future, long forecastedCompletion)
+ {
+ this.future = future;
+ this.forecastedCompletion = forecastedCompletion;
+ }
+
+ /**
+ * Orders by forecast completion time, then by the hash codes of the Result and of its future.
+ * The hash tie-breaks keep distinct results with equal forecasts apart in a sorted set.
+ * NOTE(review): two results with equal forecasts and colliding hash codes would still compare as equal.
+ */
+ public int compareTo(Result that)
+ {
+ int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion);
+ if (c != 0)
+ return c;
+ c = Integer.compare(this.hashCode(), that.hashCode());
+ if (c != 0)
+ return c;
+ return Integer.compare(this.future.hashCode(), that.future.hashCode());
+ }
+ }
+
+ /**
+ * The results of one batch of tasks submitted to a single executor, together with the
+ * System.nanoTime() deadline by which the whole batch is expected to have finished.
+ */
+ private static final class Batch implements Comparable<Batch>
+ {
+ final TreeSet<Result> results;
+ final long timeout;
+ final int executorIndex;
+
+ private Batch(TreeSet<Result> results, long timeout, int executorIndex)
+ {
+ this.results = results;
+ this.timeout = timeout;
+ this.executorIndex = executorIndex;
+ }
+
+ /** Orders by deadline, then by number of results, then by the batch's hash code as a tie-break. */
+ public int compareTo(Batch that)
+ {
+ int c = Long.compare(this.timeout, that.timeout);
+ if (c != 0)
+ return c;
+ c = Integer.compare(this.results.size(), that.results.size());
+ if (c != 0)
+ return c;
+ return Integer.compare(this.hashCode(), that.hashCode());
+ }
+ }
+
+ /** JUnit entry point: two-minute intervals, raising the load multiplier by 0.5 per interval. */
+ @Test
+ public void testPromptnessOfExecution() throws InterruptedException, ExecutionException
+ {
+ testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f);
+ }
+
+ /**
+ * Submits random batches of WaitTasks to four executors created from SharedExecutorPool.SHARED and
+ * throws AssertionError if a batch has unfinished tasks after its deadline, or if submitting
+ * a batch takes longer than the batch's own time budget.
+ *
+ * @param intervalNanos how long to run at each load multiplier
+ * @param loadIncrement amount added to the load multiplier after each interval; the test stops once
+ * the multiplier reaches 2.01
+ */
+ private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException
+ {
+ final int executorCount = 4;
+ int threadCount = 8;
+ int maxQueued = 1024;
+ final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
+ final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
+ final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
+
+ final int[] threadCounts = new int[executorCount];
+ final WeibullDistribution[] workCount = new WeibullDistribution[executorCount];
+ final ExecutorService[] executors = new ExecutorService[executorCount];
+ // each successive executor gets twice the threads and twice the queue bound of the previous one
+ for (int i = 0 ; i < executors.length ; i++)
+ {
+ executors[i] = SharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
+ threadCounts[i] = threadCount;
+ workCount[i] = new WeibullDistribution(2, maxQueued);
+ threadCount *= 2;
+ maxQueued *= 2;
+ }
+
+ long runs = 0;
+ long events = 0;
+ final TreeSet<Batch> pending = new TreeSet<>();
+ final BitSet executorsWithWork = new BitSet(executorCount);
+ long until = 0;
+ // basic idea is to go through different levels of load on the executor service; initially is all small batches
+ // (mostly within max queue size) of very short operations, moving to progressively larger batches
+ // (beyond max queued size), and longer operations
+ for (float multiplier = 0f ; multiplier < 2.01f ; )
+ {
+ // 'until' starts at 0, so the first pass enters this branch and bumps the multiplier to its first level
+ if (System.nanoTime() > until)
+ {
+ System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f));
+ events = 0;
+ until = System.nanoTime() + intervalNanos;
+ multiplier += loadIncrement;
+ System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
+ }
+
+ // wait a random amount of time so we submit new tasks while the pending batches are in
+ // various stages of completion; Long.MAX_VALUE (2% of the time) drains every pending batch
+ long timeout;
+ if (pending.isEmpty()) timeout = 0;
+ else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
+ else if (pending.size() == executorCount) timeout = pending.first().timeout;
+ else timeout = (long) (Math.random() * pending.last().timeout);
+
+ while (!pending.isEmpty() && timeout > System.nanoTime())
+ {
+ Batch first = pending.first();
+ boolean complete = false;
+ try
+ {
+ // wait on the results with the latest forecast completion first
+ for (Result result : first.results.descendingSet())
+ result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
+ complete = true;
+ }
+ catch (TimeoutException e)
+ {
+ // ran out of waiting time; the deadline check below decides whether that is a failure
+ }
+ if (!complete && System.nanoTime() > first.timeout)
+ {
+ // past the batch's deadline: every task must have finished by now
+ for (Result result : first.results)
+ if (!result.future.isDone())
+ throw new AssertionError();
+ complete = true;
+ }
+ if (complete)
+ {
+ pending.pollFirst();
+ executorsWithWork.clear(first.executorIndex);
+ }
+ }
+
+ // if we've emptied the executors, give all our threads an opportunity to spin down
+ if (timeout == Long.MAX_VALUE)
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+
+ // submit a random batch to the first free executor service
+ int executorIndex = executorsWithWork.nextClearBit(0);
+ if (executorIndex >= executorCount)
+ continue;
+ executorsWithWork.set(executorIndex);
+ ExecutorService executor = executors[executorIndex];
+ TreeSet<Result> results = new TreeSet<>();
+ int count = (int) (workCount[executorIndex].sample() * multiplier);
+ long targetTotalElapsed = 0;
+ long start = System.nanoTime();
+ // half of the batches draw every task time uniformly below one shared base time;
+ // the others sample each task time independently
+ long baseTime;
+ if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier);
+ else baseTime = 0;
+ for (int j = 0 ; j < count ; j++)
+ {
+ long time;
+ if (baseTime == 0) time = (long) (workTime.sample() * multiplier);
+ else time = (long) (baseTime * Math.random());
+ // clamp each task to [minWorkTime, maxWorkTime]
+ if (time < minWorkTime)
+ time = minWorkTime;
+ if (time > maxWorkTime)
+ time = maxWorkTime;
+ targetTotalElapsed += time;
+ Future<?> future = executor.submit(new WaitTask(time));
+ results.add(new Result(future, System.nanoTime() + time));
+ }
+ // deadline: total work spread evenly over the executor's threads, plus 100ms of slack
+ long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex])
+ + TimeUnit.MILLISECONDS.toNanos(100L);
+ long now = System.nanoTime();
+ // after the first few runs, submission alone must not take longer than the batch's budget
+ if (runs++ > executorCount && now > end)
+ throw new AssertionError();
+ events += results.size();
+ pending.add(new Batch(results, end, executorIndex));
+// System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start)));
+ }
+ }
+
+ /** Command-line entry point: ten-minute intervals with the load multiplier rising by 0.1 per interval. */
+ public static void main(String[] args) throws InterruptedException, ExecutionException
+ {
+ // do longer test
+ new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
new file mode 100644
index 0000000..9641930
--- /dev/null
+++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+// TODO : should probably lower fan-factor for tests to make them more intensive
+public class LongBTreeTest
+{
+
+ private static final MetricRegistry metrics = new MetricRegistry();
+ private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
+ private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
+ private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
+ private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
+ private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
+
+ static
+ {
+ System.setProperty("cassandra.btree.fanfactor", "4");
+ }
+
+ /**
+ * Builds a two-element tree holding Integer.MIN_VALUE and Integer.MAX_VALUE, then inserts ten million
+ * keys that all sort between them in a single update, and checks the result against a TreeSet.
+ */
+ @Test
+ public void testOversizedMiddleInsert()
+ {
+ TreeSet<Integer> canon = new TreeSet<>();
+ for (int i = 0 ; i < 10000000 ; i++)
+ canon.add(i);
+ Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
+ btree = BTree.update(btree, ICMP, canon, true);
+ // add the two boundary keys to the reference set only after the update, so the update held just the middle keys
+ canon.add(Integer.MIN_VALUE);
+ canon.add(Integer.MAX_VALUE);
+ Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
+ testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator());
+ }
+
+ // The tests below all delegate to testInsertions(totalCount, perTestCount, testKeyRatio,
+ // modificationBatchSize, quickEquality); they differ only in the arguments they pass.
+
+ // 50 operations per tree, key range equal to the operation count, batches of ~1
+ @Test
+ public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 50, 1, 1, true);
+ }
+
+ // as above, but in batches of ~5
+ @Test
+ public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 50, 1, 5, true);
+ }
+
+ // 500 operations per tree over a key range ten times larger, batches of ~1
+ @Test
+ public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 500, 10, 1, true);
+ }
+
+ // as above, but in batches of ~10
+ @Test
+ public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 500, 10, 10, true);
+ }
+
+ // 5000 operations per tree over a key range three times larger, batches of ~100
+ @Test
+ public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(100000000, 5000, 3, 100, true);
+ }
+
+ // quickEquality is false here, so every slice of every intermediate tree is compared (see testAllSlices)
+ @Test
+ public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000, 50, 10, 10, false);
+ }
+
+ /**
+ * On one worker per available processor, builds random trees and walks a BTreeSearchIterator over
+ * (a) a random ascending selection of keys that are in the tree and (b) a random ascending mix of
+ * present and derived keys, counting an error whenever the iterator disagrees with the tree.
+ *
+ * A worker that dies with an exception is counted as an error and still releases the latch, so the
+ * test fails promptly instead of waiting forever.
+ *
+ * @throws InterruptedException if the test thread is interrupted while waiting for the workers
+ */
+ @Test
+ public void testSearchIterator() throws InterruptedException
+ {
+ int threads = Runtime.getRuntime().availableProcessors();
+ final CountDownLatch latch = new CountDownLatch(threads);
+ final AtomicLong errors = new AtomicLong();
+ final AtomicLong count = new AtomicLong();
+ final int perThreadTrees = 100;
+ final int perTreeSelections = 100;
+ // widen before multiplying so the product cannot overflow an int
+ final long totalCount = (long) threads * perThreadTrees * perTreeSelections;
+ for (int t = 0 ; t < threads ; t++)
+ {
+ MODIFY.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int i = 0 ; i < perThreadTrees ; i++)
+ {
+ Object[] tree = randomTree(10000, random);
+ for (int j = 0 ; j < perTreeSelections ; j++)
+ {
+ // keys taken from the tree itself must be returned as the very same object
+ BTreeSearchIterator<Integer, Integer, Integer> searchIterator = new BTreeSearchIterator<>(tree, ICMP);
+ for (Integer key : randomSelection(tree, random))
+ if (key != searchIterator.next(key))
+ errors.incrementAndGet();
+ // for the mixed keys a miss is only an error if BTree.find locates that same object
+ searchIterator = new BTreeSearchIterator<Integer, Integer, Integer>(tree, ICMP);
+ for (Integer key : randomMix(tree, random))
+ if (key != searchIterator.next(key))
+ if (BTree.find(tree, ICMP, key) == key)
+ errors.incrementAndGet();
+ count.incrementAndGet();
+ }
+ }
+ }
+ catch (RuntimeException | Error e)
+ {
+ // a crashed worker must fail the test; rethrow so the failure is still reported by the thread
+ errors.incrementAndGet();
+ throw e;
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ }
+ });
+ }
+ while (latch.getCount() > 0)
+ {
+ latch.await(10L, TimeUnit.SECONDS);
+ System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""));
+ // JUnit assertion rather than the assert keyword, so the check does not depend on -ea
+ Assert.assertEquals("search iterator errors", 0, errors.get());
+ }
+ }
+
+ /**
+ * Runs {@code totalCount / perTestCount} independent insertion tests on the MODIFY executor, waits
+ * for all of them (and for any slice comparisons they spawned), then prints timing percentiles.
+ *
+ * @param totalCount total number of operations across all tests
+ * @param perTestCount number of operations applied to each tree
+ * @param testKeyRatio key range of each test, as a multiple of perTestCount
+ * @param modificationBatchSize approximate number of keys per update call
+ * @param quickEquality if true compare whole trees only; if false compare every slice (far more work)
+ * @throws ExecutionException if any test or comparison failed
+ * @throws InterruptedException if interrupted while waiting for results
+ */
+ private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
+ {
+ int batchesPerTest = perTestCount / modificationBatchSize;
+ int maximumRunLength = 100;
+ int testKeyRange = perTestCount * testKeyRatio;
+ int tests = totalCount / perTestCount;
+ System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
+ tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
+
+ // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
+ // (never let the chunk size reach zero, or the loop below could not advance)
+ int chunkSize = Math.max(1, quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2)));
+ for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
+ {
+ // the final chunk may be shorter, so that exactly 'tests' tests are run in total
+ int testsInChunk = Math.min(chunkSize, tests - chunk);
+ final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
+ for (int i = 0 ; i < testsInChunk ; i++)
+ {
+ outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
+ }
+
+ final List<ListenableFuture<?>> inner = new ArrayList<>();
+ int complete = 0;
+ int reportInterval = totalCount / 100;
+ int lastReportAt = 0;
+ for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
+ {
+ // each outer task returns the comparison futures it spawned on the COMPARE executor
+ inner.addAll(f.get());
+ complete += perTestCount;
+ if (complete - lastReportAt >= reportInterval)
+ {
+ System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
+ lastReportAt = complete;
+ }
+ }
+ Futures.allAsList(inner).get();
+ }
+ Snapshot snap = BTREE_TIMER.getSnapshot();
+ System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+ snap = TREE_TIMER.getSnapshot();
+ System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+ System.out.println("Done");
+ }
+
+ /**
+ * Schedules one insertion test on the MODIFY executor: a BTree and a reference TreeMap receive the
+ * same random batches of keys and are compared after every batch.
+ *
+ * @param upperBound exclusive upper bound for the random start key of each run
+ * @param maxRunLength bound used when choosing the length of a run of consecutive keys
+ * @param averageModsPerIteration approximate number of keys per batch
+ * @param iterations number of batches to apply
+ * @param quickEquality if true compare the whole tree inline; if false spawn comparisons of every slice
+ * @return the scheduled task; its result is the list of comparison futures it spawned
+ * (empty when quickEquality is true)
+ */
+ private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
+ {
+ ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
+ {
+ @Override
+ public List<ListenableFuture<?>> call()
+ {
+ final List<ListenableFuture<?>> r = new ArrayList<>();
+ NavigableMap<Integer, Integer> canon = new TreeMap<>();
+ Object[] btree = BTree.empty();
+ final TreeMap<Integer, Integer> buffer = new TreeMap<>();
+ final Random rnd = new Random();
+ for (int i = 0 ; i < iterations ; i++)
+ {
+ buffer.clear();
+ // batch size varies around averageModsPerIteration: (avg / 2) + 1 + [0, avg)
+ int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
+ while (mods > 0)
+ {
+ // fill the buffer with runs of consecutive keys starting at a random key
+ int v = rnd.nextInt(upperBound);
+ int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
+ int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
+ for (int j = 0 ; j < c ; j++)
+ {
+ buffer.put(v, v);
+ v++;
+ }
+ mods -= c;
+ }
+ Timer.Context ctxt;
+ ctxt = TREE_TIMER.time();
+ canon.putAll(buffer);
+ ctxt.stop();
+ ctxt = BTREE_TIMER.time();
+ Object[] next = null;
+ // retry while update returns null; SPORADIC_ABORT occasionally reports abortEarly() == true
+ while (next == null)
+ next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
+ btree = next;
+ ctxt.stop();
+
+ if (!BTree.isWellFormed(btree, ICMP))
+ {
+ System.out.println("ERROR: Not well formed");
+ throw new AssertionError("Not well formed!");
+ }
+ if (quickEquality)
+ testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
+ else
+ // the reference keys are copied because canon keeps changing while the comparisons run
+ r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
+ }
+ return r;
+ }
+ });
+ MODIFY.execute(f);
+ return f;
+ }
+
+ /**
+ * Grows a tree one key at a time from empty to 128 keys and, before each insertion, compares every
+ * slice of the current tree with the same slice of a reference TreeSet.
+ */
+ @Test
+ public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
+ {
+ Object[] cur = BTree.empty();
+ TreeSet<Integer> canon = new TreeSet<>();
+ // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
+ for (int i = 0 ; i < 128 ; i++)
+ {
+ String id = String.format("[0..%d)", canon.size());
+ System.out.println("Testing " + id);
+ // wait for all comparisons of this tree before growing it
+ Futures.allAsList(testAllSlices(id, cur, canon)).get();
+ Object[] next = null;
+ // retry while update returns null; SPORADIC_ABORT occasionally reports abortEarly() == true
+ while (next == null)
+ next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT);
+ cur = next;
+ canon.add(i);
+ }
+ }
+
+ static final Comparator<Integer> ICMP = new Comparator<Integer>()
+ {
+ @Override
+ public int compare(Integer o1, Integer o2)
+ {
+ return Integer.compare(o1, o2);
+ }
+ };
+
+ /**
+ * Schedules comparisons of every slice of {@code btree} against {@code canon}, in ascending and in
+ * descending order.
+ *
+ * @return the futures of all scheduled comparisons
+ */
+ private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
+ {
+ List<ListenableFuture<?>> waitFor = new ArrayList<>();
+ testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
+ testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
+ return waitFor;
+ }
+
+ /**
+ * Schedules a comparison of the full set and of every head set, tail set and sub set whose bounds
+ * are produced by {@link #range}, appending the resulting futures to {@code results}.
+ *
+ * @param id label prefix used in failure output
+ * @param btree the set under test
+ * @param canon the reference set, in the same direction as btree
+ * @param ascending direction in which the bounds are enumerated
+ * @param results receives the future of each scheduled comparison
+ */
+ private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
+ {
+ testOneSlice(id, btree, canon, results);
+ for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
+ {
+ // test head/tail sets; each label shows the interval actually taken: head sets end at lb, tail sets start at lb
+ testOneSlice(String.format("%s->(..%d]", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
+ testOneSlice(String.format("%s->(..%d)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
+ testOneSlice(String.format("%s->[%d..)", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
+ testOneSlice(String.format("%s->(%d..)", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
+ for (Integer ub : range(canon.size(), lb, ascending))
+ {
+ // test subsets
+ testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
+ testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
+ testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
+ testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
+ }
+ }
+ }
+
+ /**
+ * Schedules, on the COMPARE executor, a comparison of one slice with its reference: size, ascending
+ * and descending iteration, and iteration of the descending view in both directions.
+ * The task's future is appended to {@code results}.
+ */
+ private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
+ {
+ ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ test(id + " Count", test.size(), canon.size());
+ testEqual(id, test.iterator(), canon.iterator());
+ testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
+ testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
+ testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
+ }
+ }, null);
+ results.add(f);
+ COMPARE.execute(f);
+ }
+
+ /**
+ * Checks that two int values are equal; a mismatch is printed and then fails the caller, in the
+ * same way testEqual does.
+ *
+ * @param id label identifying what is being compared
+ * @param test the value under test
+ * @param expect the expected value
+ * @throws AssertionError if the values differ
+ */
+ private static void test(String id, int test, int expect)
+ {
+ if (test != expect)
+ {
+ String message = String.format("%s: Expected %d, Got %d", id, expect, test);
+ System.out.println(message);
+ throw new AssertionError(message);
+ }
+ }
+
+ /**
+ * Compares two iterators element by element, printing every difference, including surplus
+ * elements on either side.
+ *
+ * Elements are formatted with %s so that the method works for any element type; for integers the
+ * output is the same as with %d.
+ *
+ * @param id label identifying what is being compared
+ * @param btree iterator over the collection under test
+ * @param canon iterator over the reference collection
+ * @throws AssertionError if the two iterators do not yield equal sequences
+ */
+ private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
+ {
+ boolean equal = true;
+ while (btree.hasNext() && canon.hasNext())
+ {
+ Object i = btree.next();
+ Object j = canon.next();
+ if (!i.equals(j))
+ {
+ System.out.println(String.format("%s: Expected %s, Got %s", id, j, i));
+ equal = false;
+ }
+ }
+ // whatever is left on either side is unmatched
+ while (btree.hasNext())
+ {
+ System.out.println(String.format("%s: Expected <Nil>, Got %s", id, btree.next()));
+ equal = false;
+ }
+ while (canon.hasNext())
+ {
+ System.out.println(String.format("%s: Expected %s, Got Nil", id, canon.next()));
+ equal = false;
+ }
+ if (!equal)
+ throw new AssertionError("Not equal");
+ }
+
+ /**
+ * Enumerates candidate slice bounds for a set holding the keys 0..size-1. It should only be called
+ * on sets that range from 0->N or N->0.
+ *
+ * Ascending, the values run from {@code from} (or -1 when from is Integer.MIN_VALUE) up to and
+ * including {@code size}; descending, from {@code from} (or size) down to and including -1. Both
+ * directions therefore step one position outside the set at either end.
+ *
+ * Every call to iterator() starts a fresh traversal.
+ *
+ * @param size number of keys in the set
+ * @param from first value to produce, or Integer.MIN_VALUE to start before the first key
+ * @param ascending direction of enumeration
+ */
+ private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
+ {
+ final int start;
+ final int end;
+ final int delta;
+ if (ascending)
+ {
+ end = size + 1;
+ start = from == Integer.MIN_VALUE ? -1 : from;
+ delta = 1;
+ }
+ else
+ {
+ end = -2;
+ start = from == Integer.MIN_VALUE ? size : from;
+ delta = -1;
+ }
+ return new Iterable<Integer>()
+ {
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ // the cursor belongs to the iterator, not to the Iterable
+ int cur = start;
+
+ @Override
+ public boolean hasNext()
+ {
+ return cur != end;
+ }
+
+ @Override
+ public Integer next()
+ {
+ if (!hasNext())
+ throw new java.util.NoSuchElementException();
+ Integer r = cur;
+ cur += delta;
+ return r;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Builds a tree from random ints.
+ *
+ * @param maxSize exclusive upper bound on the number of values drawn; duplicate draws collapse, so
+ * the tree may hold fewer keys
+ * @param random source of randomness
+ * @return the tree built from the sorted, de-duplicated values
+ */
+ private static Object[] randomTree(int maxSize, Random random)
+ {
+ TreeSet<Integer> build = new TreeSet<>();
+ int size = random.nextInt(maxSize);
+ for (int i = 0 ; i < size ; i++)
+ {
+ build.add(random.nextInt());
+ }
+ return BTree.build(build, ICMP, true, UpdateFunction.NoOp.<Integer>instance());
+ }
+
+ /**
+ * Returns a lazy view of the tree's keys that keeps each key with a probability chosen once per
+ * call. The filter draws fresh random numbers on every traversal, so two traversals of the returned
+ * Iterable may yield different keys.
+ */
+ private static Iterable<Integer> randomSelection(Object[] iter, final Random rnd)
+ {
+ final float proportion = rnd.nextFloat();
+ return Iterables.filter(new BTreeSet<>(iter, ICMP), new Predicate<Integer>()
+ {
+ public boolean apply(Integer integer)
+ {
+ return rnd.nextFloat() < proportion;
+ }
+ });
+ }
+
+ /**
+ * Returns a lazy view of the tree's keys in which each key is either passed through unchanged (with
+ * a probability chosen once per call) or replaced by a value derived from it and the previous key.
+ * NOTE(review): the derived value is half the gap to the previous key, (v - last) / 2, not a
+ * midpoint between the two; presumably the intent is to produce keys that are mostly absent from
+ * the tree - confirm.
+ */
+ private static Iterable<Integer> randomMix(Object[] iter, final Random rnd)
+ {
+ final float proportion = rnd.nextFloat();
+ return Iterables.transform(new BTreeSet<>(iter, ICMP), new Function<Integer, Integer>()
+ {
+ // previous key seen by apply(); held as a long so that v - last cannot overflow
+ long last = Integer.MIN_VALUE;
+
+ public Integer apply(Integer v)
+ {
+ long last = this.last;
+ this.last = v;
+ if (rnd.nextFloat() < proportion)
+ return v;
+ return (int)((v - last) / 2);
+ }
+ });
+ }
+
+ /**
+ * An UpdateFunction that passes values through unchanged and reports abortEarly() == true with a
+ * fixed probability. The callers in this file retry their update while it returns null.
+ */
+ private static final class RandomAbort<V> implements UpdateFunction<V>
+ {
+ final Random rnd;
+ final float chance;
+ private RandomAbort(Random rnd, float chance)
+ {
+ this.rnd = rnd;
+ this.chance = chance;
+ }
+
+ // keeps the updating value
+ public V apply(V replacing, V update)
+ {
+ return update;
+ }
+
+ // true with probability 'chance'
+ public boolean abortEarly()
+ {
+ return rnd.nextFloat() < chance;
+ }
+
+ // allocation reports are ignored
+ public void allocated(long heapSize)
+ {
+
+ }
+
+ // identity
+ public V apply(V v)
+ {
+ return v;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
deleted file mode 100644
index d7105df..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.cassandra.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-import static org.junit.Assert.assertTrue;
-
-// TODO: we don't currently test SAFE functionality at all!
-// TODO: should also test markBlocking and SyncOrdered
-public class LongOpOrderTest
-{
-
- private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
-
- static final int CONSUMERS = 4;
- static final int PRODUCERS = 32;
-
- static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
- static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
-
- static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
- {
- @Override
- public void uncaughtException(Thread t, Throwable e)
- {
- System.err.println(t.getName() + ": " + e.getMessage());
- e.printStackTrace();
- }
- };
-
- final OpOrder order = new OpOrder();
- final AtomicInteger errors = new AtomicInteger();
-
- class TestOrdering implements Runnable
- {
-
- final int[] waitNanos = new int[1 << 16];
- volatile State state = new State();
- final ScheduledExecutorService sched;
-
- TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
- {
- this.sched = sched;
- final ThreadLocalRandom rnd = ThreadLocalRandom.current();
- for (int i = 0 ; i < waitNanos.length ; i++)
- waitNanos[i] = rnd.nextInt(5000);
- for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
- exec.execute(new Producer());
- exec.execute(this);
- }
-
- @Override
- public void run()
- {
- final long until = System.currentTimeMillis() + RUNTIME;
- long lastReport = System.currentTimeMillis();
- long count = 0;
- long opCount = 0;
- while (true)
- {
- long now = System.currentTimeMillis();
- if (now > until)
- break;
- if (now > lastReport + REPORT_INTERVAL)
- {
- lastReport = now;
- logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
- Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
- }
- try
- {
- Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- final State s = state;
- s.barrier = order.newBarrier();
- s.replacement = new State();
- s.barrier.issue();
- s.barrier.await();
- s.check();
- opCount += s.totalCount();
- state = s.replacement;
- sched.schedule(new Runnable()
- {
- @Override
- public void run()
- {
- s.check();
- }
- }, 1, TimeUnit.SECONDS);
- count++;
- }
- }
-
- class State
- {
-
- volatile OpOrder.Barrier barrier;
- volatile State replacement;
- final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
- int checkCount = -1;
-
- boolean accept(OpOrder.Group opGroup)
- {
- if (barrier != null && !barrier.isAfter(opGroup))
- return false;
- AtomicInteger c;
- if (null == (c = count.get(opGroup)))
- {
- count.putIfAbsent(opGroup, new AtomicInteger());
- c = count.get(opGroup);
- }
- c.incrementAndGet();
- return true;
- }
-
- int totalCount()
- {
- int c = 0;
- for (AtomicInteger v : count.values())
- c += v.intValue();
- return c;
- }
-
- void check()
- {
- boolean delete;
- if (checkCount >= 0)
- {
- if (checkCount != totalCount())
- {
- errors.incrementAndGet();
- logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
- }
- delete = true;
- }
- else
- {
- checkCount = totalCount();
- delete = false;
- }
- for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
- {
- if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
- {
- errors.incrementAndGet();
- logger.error("Received an operation that was created after the barrier was issued.");
- }
- if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
- {
- errors.incrementAndGet();
- logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
- }
- if (delete)
- TestOrdering.this.count.remove(e.getKey());
- }
- }
-
- }
-
- final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
-
- class Producer implements Runnable
- {
- public void run()
- {
- while (true)
- {
- AtomicInteger c;
- try (OpOrder.Group opGroup = order.start())
- {
- if (null == (c = count.get(opGroup)))
- {
- count.putIfAbsent(opGroup, new AtomicInteger());
- c = count.get(opGroup);
- }
- c.incrementAndGet();
- State s = state;
- while (!s.accept(opGroup))
- s = s.replacement;
- }
- }
- }
- }
-
- }
-
- @Test
- public void testOrdering() throws InterruptedException
- {
- errors.set(0);
- Thread.setDefaultUncaughtExceptionHandler(handler);
- final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
- final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
- for (int i = 0 ; i < CONSUMERS ; i++)
- new TestOrdering(exec, checker);
- exec.shutdown();
- exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
- assertTrue(exec.isShutdown());
- assertTrue(errors.get() == 0);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
deleted file mode 100644
index 0fd53bb..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.concurrent;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.LockSupport;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.math3.distribution.WeibullDistribution;
-import org.junit.Test;
-
-public class LongSharedExecutorPoolTest
-{
-
- private static final class WaitTask implements Runnable
- {
- final long nanos;
-
- private WaitTask(long nanos)
- {
- this.nanos = nanos;
- }
-
- public void run()
- {
- LockSupport.parkNanos(nanos);
- }
- }
-
- private static final class Result implements Comparable<Result>
- {
- final Future<?> future;
- final long forecastedCompletion;
-
- private Result(Future<?> future, long forecastedCompletion)
- {
- this.future = future;
- this.forecastedCompletion = forecastedCompletion;
- }
-
- public int compareTo(Result that)
- {
- int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion);
- if (c != 0)
- return c;
- c = Integer.compare(this.hashCode(), that.hashCode());
- if (c != 0)
- return c;
- return Integer.compare(this.future.hashCode(), that.future.hashCode());
- }
- }
-
- private static final class Batch implements Comparable<Batch>
- {
- final TreeSet<Result> results;
- final long timeout;
- final int executorIndex;
-
- private Batch(TreeSet<Result> results, long timeout, int executorIndex)
- {
- this.results = results;
- this.timeout = timeout;
- this.executorIndex = executorIndex;
- }
-
- public int compareTo(Batch that)
- {
- int c = Long.compare(this.timeout, that.timeout);
- if (c != 0)
- return c;
- c = Integer.compare(this.results.size(), that.results.size());
- if (c != 0)
- return c;
- return Integer.compare(this.hashCode(), that.hashCode());
- }
- }
-
- @Test
- public void testPromptnessOfExecution() throws InterruptedException, ExecutionException, TimeoutException
- {
- testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f);
- }
-
- private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException, TimeoutException
- {
- final int executorCount = 4;
- int threadCount = 8;
- int maxQueued = 1024;
- final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
- final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
- final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
-
- final int[] threadCounts = new int[executorCount];
- final WeibullDistribution[] workCount = new WeibullDistribution[executorCount];
- final ExecutorService[] executors = new ExecutorService[executorCount];
- for (int i = 0 ; i < executors.length ; i++)
- {
- executors[i] = JMXEnabledSharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
- threadCounts[i] = threadCount;
- workCount[i] = new WeibullDistribution(2, maxQueued);
- threadCount *= 2;
- maxQueued *= 2;
- }
-
- long runs = 0;
- long events = 0;
- final TreeSet<Batch> pending = new TreeSet<>();
- final BitSet executorsWithWork = new BitSet(executorCount);
- long until = 0;
- // basic idea is to go through different levels of load on the executor service; initially is all small batches
- // (mostly within max queue size) of very short operations, moving to progressively larger batches
- // (beyond max queued size), and longer operations
- for (float multiplier = 0f ; multiplier < 2.01f ; )
- {
- if (System.nanoTime() > until)
- {
- System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f));
- events = 0;
- until = System.nanoTime() + intervalNanos;
- multiplier += loadIncrement;
- System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
- }
-
- // wait a random amount of time so we submit new tasks in various stages of
- long timeout;
- if (pending.isEmpty()) timeout = 0;
- else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
- else if (pending.size() == executorCount) timeout = pending.first().timeout;
- else timeout = (long) (Math.random() * pending.last().timeout);
-
- while (!pending.isEmpty() && timeout > System.nanoTime())
- {
- Batch first = pending.first();
- boolean complete = false;
- try
- {
- for (Result result : first.results.descendingSet())
- result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
- complete = true;
- }
- catch (TimeoutException e)
- {
- }
- if (!complete && System.nanoTime() > first.timeout)
- {
- for (Result result : first.results)
- if (!result.future.isDone())
- throw new AssertionError();
- complete = true;
- }
- if (complete)
- {
- pending.pollFirst();
- executorsWithWork.clear(first.executorIndex);
- }
- }
-
- // if we've emptied the executors, give all our threads an opportunity to spin down
- if (timeout == Long.MAX_VALUE)
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-
- // submit a random batch to the first free executor service
- int executorIndex = executorsWithWork.nextClearBit(0);
- if (executorIndex >= executorCount)
- continue;
- executorsWithWork.set(executorIndex);
- ExecutorService executor = executors[executorIndex];
- TreeSet<Result> results = new TreeSet<>();
- int count = (int) (workCount[executorIndex].sample() * multiplier);
- long targetTotalElapsed = 0;
- long start = System.nanoTime();
- long baseTime;
- if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier);
- else baseTime = 0;
- for (int j = 0 ; j < count ; j++)
- {
- long time;
- if (baseTime == 0) time = (long) (workTime.sample() * multiplier);
- else time = (long) (baseTime * Math.random());
- if (time < minWorkTime)
- time = minWorkTime;
- if (time > maxWorkTime)
- time = maxWorkTime;
- targetTotalElapsed += time;
- Future<?> future = executor.submit(new WaitTask(time));
- results.add(new Result(future, System.nanoTime() + time));
- }
- long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex])
- + TimeUnit.MILLISECONDS.toNanos(100L);
- long now = System.nanoTime();
- if (runs++ > executorCount && now > end)
- throw new AssertionError();
- events += results.size();
- pending.add(new Batch(results, end, executorIndex));
-// System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start)));
- }
- }
-
- public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException
- {
- // do longer test
- new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBTreeTest.java b/test/long/org/apache/cassandra/utils/LongBTreeTest.java
deleted file mode 100644
index 76ff2bf..0000000
--- a/test/long/org/apache/cassandra/utils/LongBTreeTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
-import com.yammer.metrics.stats.Snapshot;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.btree.UpdateFunction;
-
-// TODO : should probably lower fan-factor for tests to make them more intensive
-public class LongBTreeTest
-{
-
- private static final Timer BTREE_TIMER = Metrics.newTimer(BTree.class, "BTREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS);
- private static final Timer TREE_TIMER = Metrics.newTimer(BTree.class, "TREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS);
- private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
- private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
- private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
-
- static
- {
- System.setProperty("cassandra.btree.fanfactor", "4");
- }
-
- @Test
- public void testOversizedMiddleInsert()
- {
- TreeSet<Integer> canon = new TreeSet<>();
- for (int i = 0 ; i < 10000000 ; i++)
- canon.add(i);
- Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
- btree = BTree.update(btree, ICMP, canon, true);
- canon.add(Integer.MIN_VALUE);
- canon.add(Integer.MAX_VALUE);
- Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
- testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator());
- }
-
- @Test
- public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 1, true);
- }
-
- @Test
- public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 5, true);
- }
-
- @Test
- public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 1, true);
- }
-
- @Test
- public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 10, true);
- }
-
- @Test
- public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
- {
- testInsertions(100000000, 5000, 3, 100, true);
- }
-
- @Test
- public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
- {
- testInsertions(10000, 50, 10, 10, false);
- }
-
- private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
- {
- int batchesPerTest = perTestCount / modificationBatchSize;
- int maximumRunLength = 100;
- int testKeyRange = perTestCount * testKeyRatio;
- int tests = totalCount / perTestCount;
- System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
- tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
-
- // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
- int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
- for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
- {
- final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
- for (int i = 0 ; i < chunkSize ; i++)
- {
- outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
- }
-
- final List<ListenableFuture<?>> inner = new ArrayList<>();
- int complete = 0;
- int reportInterval = totalCount / 100;
- int lastReportAt = 0;
- for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
- {
- inner.addAll(f.get());
- complete += perTestCount;
- if (complete - lastReportAt >= reportInterval)
- {
- System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
- lastReportAt = complete;
- }
- }
- Futures.allAsList(inner).get();
- }
- Snapshot snap = BTREE_TIMER.getSnapshot();
- System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- snap = TREE_TIMER.getSnapshot();
- System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- System.out.println("Done");
- }
-
- private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
- {
- ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
- {
- @Override
- public List<ListenableFuture<?>> call()
- {
- final List<ListenableFuture<?>> r = new ArrayList<>();
- NavigableMap<Integer, Integer> canon = new TreeMap<>();
- Object[] btree = BTree.empty();
- final TreeMap<Integer, Integer> buffer = new TreeMap<>();
- final Random rnd = new Random();
- for (int i = 0 ; i < iterations ; i++)
- {
- buffer.clear();
- int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
- while (mods > 0)
- {
- int v = rnd.nextInt(upperBound);
- int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
- int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
- for (int j = 0 ; j < c ; j++)
- {
- buffer.put(v, v);
- v++;
- }
- mods -= c;
- }
- TimerContext ctxt;
- ctxt = TREE_TIMER.time();
- canon.putAll(buffer);
- ctxt.stop();
- ctxt = BTREE_TIMER.time();
- Object[] next = null;
- while (next == null)
- next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
- btree = next;
- ctxt.stop();
-
- if (!BTree.isWellFormed(btree, ICMP))
- {
- System.out.println("ERROR: Not well formed");
- throw new AssertionError("Not well formed!");
- }
- if (quickEquality)
- testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
- else
- r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
- }
- return r;
- }
- });
- MODIFY.execute(f);
- return f;
- }
-
- @Test
- public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
- {
- Object[] cur = BTree.empty();
- TreeSet<Integer> canon = new TreeSet<>();
- // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
- for (int i = 0 ; i < 128 ; i++)
- {
- String id = String.format("[0..%d)", canon.size());
- System.out.println("Testing " + id);
- Futures.allAsList(testAllSlices(id, cur, canon)).get();
- Object[] next = null;
- while (next == null)
- next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT);
- cur = next;
- canon.add(i);
- }
- }
-
- static final Comparator<Integer> ICMP = new Comparator<Integer>()
- {
- @Override
- public int compare(Integer o1, Integer o2)
- {
- return Integer.compare(o1, o2);
- }
- };
-
- private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
- {
- List<ListenableFuture<?>> waitFor = new ArrayList<>();
- testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
- testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
- return waitFor;
- }
-
- private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
- {
- testOneSlice(id, btree, canon, results);
- for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
- {
- // test head/tail sets
- testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
- testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
- for (Integer ub : range(canon.size(), lb, ascending))
- {
- // test subsets
- testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
- testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
- testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
- testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
- }
- }
- }
-
- private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
- {
- ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
- {
-
- @Override
- public void run()
- {
- test(id + " Count", test.size(), canon.size());
- testEqual(id, test.iterator(), canon.iterator());
- testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
- testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
- testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
- }
- }, null);
- results.add(f);
- COMPARE.execute(f);
- }
-
- private static void test(String id, int test, int expect)
- {
- if (test != expect)
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test));
- }
- }
-
- private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
- {
- boolean equal = true;
- while (btree.hasNext() && canon.hasNext())
- {
- Object i = btree.next();
- Object j = canon.next();
- if (!i.equals(j))
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, j, i));
- equal = false;
- }
- }
- while (btree.hasNext())
- {
- System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next()));
- equal = false;
- }
- while (canon.hasNext())
- {
- System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next()));
- equal = false;
- }
- if (!equal)
- throw new AssertionError("Not equal");
- }
-
- // should only be called on sets that range from 0->N or N->0
- private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
- {
- return new Iterable<Integer>()
- {
- int cur;
- int delta;
- int end;
- {
- if (ascending)
- {
- end = size + 1;
- cur = from == Integer.MIN_VALUE ? -1 : from;
- delta = 1;
- }
- else
- {
- end = -2;
- cur = from == Integer.MIN_VALUE ? size : from;
- delta = -1;
- }
- }
- @Override
- public Iterator<Integer> iterator()
- {
- return new Iterator<Integer>()
- {
- @Override
- public boolean hasNext()
- {
- return cur != end;
- }
-
- @Override
- public Integer next()
- {
- Integer r = cur;
- cur += delta;
- return r;
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
- private static final class RandomAbort<V> implements UpdateFunction<V>
- {
- final Random rnd;
- final float chance;
- private RandomAbort(Random rnd, float chance)
- {
- this.rnd = rnd;
- this.chance = chance;
- }
-
- public V apply(V replacing, V update)
- {
- return update;
- }
-
- public boolean abortEarly()
- {
- return rnd.nextFloat() < chance;
- }
-
- public void allocated(long heapSize)
- {
-
- }
-
- public V apply(V v)
- {
- return v;
- }
- }
-
-}
[6/6] cassandra git commit: Merge branch 'cassandra-2.2' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.2' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3671082b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3671082b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3671082b
Branch: refs/heads/trunk
Commit: 3671082b037c05979740c9bc5a4ee3a4a4425bf7
Parents: 6739434 02a7c34
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun Jun 28 11:40:00 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun Jun 28 11:40:00 2015 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------
[3/6] cassandra git commit: backport burn test refactor
Posted by be...@apache.org.
backport burn test refactor
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd4a9d18
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd4a9d18
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd4a9d18
Branch: refs/heads/trunk
Commit: bd4a9d18e1317dcb8542bd4adc5a9f99b108d6c6
Parents: 8a56868
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun Jun 28 11:38:22 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun Jun 28 11:38:22 2015 +0100
----------------------------------------------------------------------
build.xml | 7 +
.../cassandra/concurrent/LongOpOrderTest.java | 240 +++++++++
.../concurrent/LongSharedExecutorPoolTest.java | 226 +++++++++
.../apache/cassandra/utils/LongBTreeTest.java | 502 +++++++++++++++++++
.../cassandra/concurrent/LongOpOrderTest.java | 240 ---------
.../concurrent/LongSharedExecutorPoolTest.java | 228 ---------
.../apache/cassandra/utils/LongBTreeTest.java | 401 ---------------
7 files changed, 975 insertions(+), 869 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 73e76e5..18ad49f 100644
--- a/build.xml
+++ b/build.xml
@@ -93,6 +93,7 @@
<property name="test.timeout" value="60000" />
<property name="test.long.timeout" value="600000" />
+ <property name="test.burn.timeout" value="600000" />
<!-- default for cql tests. Can be override by -Dcassandra.test.use_prepared=false -->
<property name="cassandra.test.use_prepared" value="true" />
@@ -1258,6 +1259,12 @@
</testmacro>
</target>
+ <!-- Runs the "burn" suite from ${test.burn.src} with its own timeout (test.burn.timeout).
+      NOTE(review): test.burn.src is not defined in the hunks shown here; confirm it is declared elsewhere in build.xml. -->
+ <target name="test-burn" depends="build-test" description="Execute burn tests">
+ <testmacro suitename="burn" inputdir="${test.burn.src}"
+ timeout="${test.burn.timeout}">
+ </testmacro>
+ </target>
+
<target name="long-test" depends="build-test" description="Execute functional tests">
<testmacro suitename="long" inputdir="${test.long.src}"
timeout="${test.long.timeout}">
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
new file mode 100644
index 0000000..d7105df
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
@@ -0,0 +1,240 @@
+package org.apache.cassandra.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.junit.Assert.assertTrue;
+
+// TODO: we don't currently test SAFE functionality at all!
+// TODO: should also test markBlocking and SyncOrdered
+public class LongOpOrderTest
+{
+
+ // used for the periodic progress report and for every ordering violation found by State.check()
+ private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
+
+ // number of TestOrdering instances (barrier issuers) created by testOrdering()
+ static final int CONSUMERS = 4;
+ // total number of Producer tasks; each TestOrdering starts PRODUCERS / CONSUMERS of them
+ static final int PRODUCERS = 32;
+
+ // how long each TestOrdering.run() keeps issuing barriers, in milliseconds
+ static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
+ // minimum time between two progress log lines of one TestOrdering, in milliseconds
+ static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
+
+ // installed as the JVM-wide default handler by testOrdering(); it only reports to stderr
+ // and does not touch the errors counter
+ static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
+ {
+ @Override
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ System.err.println(t.getName() + ": " + e.getMessage());
+ e.printStackTrace();
+ }
+ };
+
+ // the single OpOrder shared by every producer and every barrier issuer
+ final OpOrder order = new OpOrder();
+ // number of violations detected by State.check(); asserted to be zero at the end of testOrdering()
+ final AtomicInteger errors = new AtomicInteger();
+
+ // One barrier issuer plus its group of producers. Producers start operations against the shared
+ // OpOrder and register them with the current State; run() repeatedly issues a barrier, waits for
+ // it, and verifies that the State it closed off saw exactly the operations the producers recorded.
+ class TestOrdering implements Runnable
+ {
+
+ // pre-computed random sleep lengths (0..4999ns) used between barriers; length is a power of two
+ // so that run() can index it with a bit mask
+ final int[] waitNanos = new int[1 << 16];
+ // the State currently accepting operations; replaced by run() after every barrier
+ volatile State state = new State();
+ // executor used to re-run State.check() one second after a barrier completed
+ final ScheduledExecutorService sched;
+
+ // Fills waitNanos, then submits PRODUCERS / CONSUMERS producers followed by this issuer to exec.
+ TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
+ {
+ this.sched = sched;
+ final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ for (int i = 0 ; i < waitNanos.length ; i++)
+ waitNanos[i] = rnd.nextInt(5000);
+ for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
+ exec.execute(new Producer());
+ exec.execute(this);
+ }
+
+ // Issues barriers for RUNTIME milliseconds, checking each closed-off State immediately and
+ // scheduling a second check one second later.
+ @Override
+ public void run()
+ {
+ final long until = System.currentTimeMillis() + RUNTIME;
+ long lastReport = System.currentTimeMillis();
+ long count = 0;
+ long opCount = 0;
+ while (true)
+ {
+ long now = System.currentTimeMillis();
+ if (now > until)
+ break;
+ if (now > lastReport + REPORT_INTERVAL)
+ {
+ lastReport = now;
+ logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
+ Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
+ }
+ try
+ {
+ // count & (length - 1) cycles through the pre-computed sleep lengths
+ Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
+ } catch (InterruptedException e)
+ {
+ // NOTE(review): the interrupt is only printed; the interrupt flag is not restored
+ e.printStackTrace();
+ }
+
+ // barrier and replacement are both set before the barrier is issued, so a producer that
+ // is rejected by accept() can follow s.replacement
+ final State s = state;
+ s.barrier = order.newBarrier();
+ s.replacement = new State();
+ s.barrier.issue();
+ s.barrier.await();
+ // first check: records the total seen by this State once the barrier has been awaited
+ s.check();
+ opCount += s.totalCount();
+ state = s.replacement;
+ // second check: the total must not have changed a second later
+ sched.schedule(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ s.check();
+ }
+ }, 1, TimeUnit.SECONDS);
+ count++;
+ }
+ }
+
+ // Per-barrier bookkeeping: counts, per OpOrder.Group, the operations that registered with it.
+ class State
+ {
+
+ // set by run() just before the barrier is issued; null while this State is still open
+ volatile OpOrder.Barrier barrier;
+ // the State that takes over once this one's barrier rejects an operation
+ volatile State replacement;
+ // operations accepted by this State, keyed by the group they were started in
+ final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+ // total recorded by the first check(); -1 until then
+ int checkCount = -1;
+
+ // Registers one operation of opGroup with this State. Returns false, without counting, when
+ // a barrier has been set and barrier.isAfter(opGroup) is false; the caller then moves on to
+ // replacement.
+ boolean accept(OpOrder.Group opGroup)
+ {
+ if (barrier != null && !barrier.isAfter(opGroup))
+ return false;
+ AtomicInteger c;
+ if (null == (c = count.get(opGroup)))
+ {
+ // putIfAbsent followed by get, so racing threads end up sharing one counter
+ count.putIfAbsent(opGroup, new AtomicInteger());
+ c = count.get(opGroup);
+ }
+ c.incrementAndGet();
+ return true;
+ }
+
+ // Sum of all per-group counters of this State.
+ int totalCount()
+ {
+ int c = 0;
+ for (AtomicInteger v : count.values())
+ c += v.intValue();
+ return c;
+ }
+
+ // Verifies this State against the producer-side counters in TestOrdering.this.count.
+ // First call: remembers the total. Later call: reports an error if the total changed and
+ // removes the verified groups from the producer-side map. Both calls report groups that
+ // compare greater than the barrier's sync point and groups whose counts disagree.
+ void check()
+ {
+ boolean delete;
+ if (checkCount >= 0)
+ {
+ if (checkCount != totalCount())
+ {
+ errors.incrementAndGet();
+ logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
+ }
+ delete = true;
+ }
+ else
+ {
+ checkCount = totalCount();
+ delete = false;
+ }
+ for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
+ {
+ if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
+ {
+ errors.incrementAndGet();
+ logger.error("Received an operation that was created after the barrier was issued.");
+ }
+ if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
+ {
+ errors.incrementAndGet();
+ logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
+ }
+ if (delete)
+ TestOrdering.this.count.remove(e.getKey());
+ }
+ }
+
+ }
+
+ // producer-side record of how many operations were started in each group
+ final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+
+ // Starts operations in an endless loop: each one is counted in TestOrdering.this.count and then
+ // registered with the first State (current or a replacement) that accepts it.
+ // NOTE(review): the loop has no exit condition, so these tasks never finish on their own.
+ class Producer implements Runnable
+ {
+ public void run()
+ {
+ while (true)
+ {
+ AtomicInteger c;
+ try (OpOrder.Group opGroup = order.start())
+ {
+ if (null == (c = count.get(opGroup)))
+ {
+ count.putIfAbsent(opGroup, new AtomicInteger());
+ c = count.get(opGroup);
+ }
+ c.incrementAndGet();
+ State s = state;
+ while (!s.accept(opGroup))
+ s = s.replacement;
+ }
+ }
+ }
+ }
+
+ }
+
+ // Starts CONSUMERS TestOrdering instances (each with its own producers) on a cached thread pool,
+ // waits up to 1.1 * RUNTIME for the pool, and then requires that no ordering violation was counted.
+ @Test
+ public void testOrdering() throws InterruptedException
+ {
+ errors.set(0);
+ Thread.setDefaultUncaughtExceptionHandler(handler);
+ // NOTE(review): both pools use the thread-name prefix "checker", so the progress log lines
+ // written by TestOrdering.run() carry that name as well
+ final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
+ final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
+ for (int i = 0 ; i < CONSUMERS ; i++)
+ new TestOrdering(exec, checker);
+ exec.shutdown();
+ // the Producer tasks loop forever, so this call is expected to wait for the full timeout
+ exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
+ // NOTE(review): isShutdown() is true as soon as shutdown() was called, so this assertion cannot fail
+ assertTrue(exec.isShutdown());
+ assertTrue(errors.get() == 0);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
new file mode 100644
index 0000000..fe464c7
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.BitSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.math3.distribution.WeibullDistribution;
+import org.junit.Test;
+
+public class LongSharedExecutorPoolTest
+{
+
+    /** Unit of work for the executors under test: parks the running thread for a fixed time. */
+    private static final class WaitTask implements Runnable
+    {
+        /** requested park duration in nanoseconds */
+        final long nanos;
+
+        private WaitTask(long parkNanos)
+        {
+            nanos = parkNanos;
+        }
+
+        @Override
+        public void run()
+        {
+            LockSupport.parkNanos(nanos);
+        }
+    }
+
+    /**
+     * A submitted task together with the time at which it is forecast to have completed.
+     * Ordered by forecast time; ties are broken by the hash codes of the result and of its future.
+     */
+    private static final class Result implements Comparable<Result>
+    {
+        final Future<?> future;
+        final long forecastedCompletion;
+
+        private Result(Future<?> submitted, long forecast)
+        {
+            future = submitted;
+            forecastedCompletion = forecast;
+        }
+
+        @Override
+        public int compareTo(Result that)
+        {
+            final int byForecast = Long.compare(forecastedCompletion, that.forecastedCompletion);
+            if (byForecast != 0)
+                return byForecast;
+            final int byResultHash = Integer.compare(hashCode(), that.hashCode());
+            return byResultHash != 0
+                 ? byResultHash
+                 : Integer.compare(future.hashCode(), that.future.hashCode());
+        }
+    }
+
+    /**
+     * All results submitted to one executor in one go, with the deadline by which they must be done.
+     * Ordered by deadline, then by number of results, then by hash code.
+     */
+    private static final class Batch implements Comparable<Batch>
+    {
+        final TreeSet<Result> results;
+        final long timeout;
+        final int executorIndex;
+
+        private Batch(TreeSet<Result> submitted, long deadline, int executor)
+        {
+            results = submitted;
+            timeout = deadline;
+            executorIndex = executor;
+        }
+
+        @Override
+        public int compareTo(Batch that)
+        {
+            final int byDeadline = Long.compare(timeout, that.timeout);
+            if (byDeadline != 0)
+                return byDeadline;
+            final int bySize = Integer.compare(results.size(), that.results.size());
+            return bySize != 0
+                 ? bySize
+                 : Integer.compare(hashCode(), that.hashCode());
+        }
+    }
+
+    /** JUnit entry point: two minutes per load level, raising the load multiplier by 0.5 each time. */
+    @Test
+    public void testPromptnessOfExecution() throws InterruptedException, ExecutionException
+    {
+        final long intervalNanos = TimeUnit.MINUTES.toNanos(2L);
+        final float loadIncrement = 0.5f;
+        testPromptnessOfExecution(intervalNanos, loadIncrement);
+    }
+
+ // Drives four executors of the shared pool with random batches of WaitTasks and throws an
+ // AssertionError when a batch is not finished by its deadline.
+ // intervalNanos: how long each load level runs. loadIncrement: amount added to the load
+ // multiplier when an interval ends; the loop stops once the multiplier reaches 2.01.
+ private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException
+ {
+ final int executorCount = 4;
+ int threadCount = 8;
+ int maxQueued = 1024;
+ // per-task work time is sampled from this distribution and then clamped to [1us, 1ms] below
+ final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
+ final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
+ final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
+
+ final int[] threadCounts = new int[executorCount];
+ final WeibullDistribution[] workCount = new WeibullDistribution[executorCount];
+ final ExecutorService[] executors = new ExecutorService[executorCount];
+ // every executor gets twice the threads and twice the queue length of the previous one
+ for (int i = 0 ; i < executors.length ; i++)
+ {
+ executors[i] = SharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
+ threadCounts[i] = threadCount;
+ workCount[i] = new WeibullDistribution(2, maxQueued);
+ threadCount *= 2;
+ maxQueued *= 2;
+ }
+
+ long runs = 0;
+ long events = 0;
+ // batches still waiting to be verified, ordered by deadline
+ final TreeSet<Batch> pending = new TreeSet<>();
+ // bit i is set while executor i has a batch in pending
+ final BitSet executorsWithWork = new BitSet(executorCount);
+ // end of the current load level (System.nanoTime() based); 0 makes the first iteration start one
+ long until = 0;
+ // basic idea is to go through different levels of load on the executor service; initially is all small batches
+ // (mostly within max queue size) of very short operations, moving to progressively larger batches
+ // (beyond max queued size), and longer operations
+ for (float multiplier = 0f ; multiplier < 2.01f ; )
+ {
+ if (System.nanoTime() > until)
+ {
+ System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f));
+ events = 0;
+ until = System.nanoTime() + intervalNanos;
+ multiplier += loadIncrement;
+ System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
+ }
+
+ // wait a random amount of time so we submit new tasks while the pending batches are in various
+ // stages of completion (presumably; the original sentence was cut short)
+ // 0 = do not wait, Long.MAX_VALUE = wait for every pending batch
+ long timeout;
+ if (pending.isEmpty()) timeout = 0;
+ else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
+ else if (pending.size() == executorCount) timeout = pending.first().timeout;
+ else timeout = (long) (Math.random() * pending.last().timeout);
+
+ while (!pending.isEmpty() && timeout > System.nanoTime())
+ {
+ Batch first = pending.first();
+ boolean complete = false;
+ try
+ {
+ // wait on the results with the latest forecast completion first
+ for (Result result : first.results.descendingSet())
+ result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
+ complete = true;
+ }
+ catch (TimeoutException e)
+ {
+ // expected: the wait budget ran out before the batch finished; handled below
+ }
+ // once the batch deadline has passed, every task in it must already be done
+ if (!complete && System.nanoTime() > first.timeout)
+ {
+ for (Result result : first.results)
+ if (!result.future.isDone())
+ throw new AssertionError();
+ complete = true;
+ }
+ if (complete)
+ {
+ pending.pollFirst();
+ executorsWithWork.clear(first.executorIndex);
+ }
+ }
+
+ // if we've emptied the executors, give all our threads an opportunity to spin down
+ if (timeout == Long.MAX_VALUE)
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+
+ // submit a random batch to the first free executor service
+ int executorIndex = executorsWithWork.nextClearBit(0);
+ if (executorIndex >= executorCount)
+ continue;
+ executorsWithWork.set(executorIndex);
+ ExecutorService executor = executors[executorIndex];
+ TreeSet<Result> results = new TreeSet<>();
+ int count = (int) (workCount[executorIndex].sample() * multiplier);
+ long targetTotalElapsed = 0;
+ long start = System.nanoTime();
+ // half of the batches draw each task time uniformly from [0, baseTime); the others sample
+ // workTime for every task
+ long baseTime;
+ if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier);
+ else baseTime = 0;
+ for (int j = 0 ; j < count ; j++)
+ {
+ long time;
+ if (baseTime == 0) time = (long) (workTime.sample() * multiplier);
+ else time = (long) (baseTime * Math.random());
+ if (time < minWorkTime)
+ time = minWorkTime;
+ if (time > maxWorkTime)
+ time = maxWorkTime;
+ targetTotalElapsed += time;
+ Future<?> future = executor.submit(new WaitTask(time));
+ results.add(new Result(future, System.nanoTime() + time));
+ }
+ // deadline: total work spread evenly over the executor's threads, plus 100ms of slack
+ long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex])
+ + TimeUnit.MILLISECONDS.toNanos(100L);
+ long now = System.nanoTime();
+ // after the first few batches, merely submitting a batch must not take longer than its deadline
+ if (runs++ > executorCount && now > end)
+ throw new AssertionError();
+ events += results.size();
+ pending.add(new Batch(results, end, executorIndex));
+// System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start)));
+ }
+ }
+
+    /** Stand-alone entry point: ten minutes per load level, raising the multiplier by 0.1 each time. */
+    public static void main(String[] args) throws InterruptedException, ExecutionException
+    {
+        // do longer test
+        final LongSharedExecutorPoolTest test = new LongSharedExecutorPoolTest();
+        test.testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
new file mode 100644
index 0000000..9641930
--- /dev/null
+++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+// TODO : should probably lower fan-factor for tests to make them more intensive
+public class LongBTreeTest
+{
+
+ private static final MetricRegistry metrics = new MetricRegistry();
+ // times the BTree.update calls made by doOneTestInsertions
+ private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
+ // times the equivalent java.util.TreeMap.putAll calls on the reference map
+ private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
+ // runs the tree-building tasks; one thread per available processor
+ private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
+ // runs the slice comparisons submitted by testOneSlice; one thread per available processor
+ private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
+ // update function whose abortEarly() returns true with probability 0.0001
+ private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
+
+ // presumably read by BTree when it is initialised, to shrink the node size and so deepen the
+ // trees built by these tests - TODO confirm against BTree
+ static
+ {
+ System.setProperty("cassandra.btree.fanfactor", "4");
+ }
+
+    /**
+     * Builds a tree holding only Integer.MIN_VALUE and Integer.MAX_VALUE, inserts ten million
+     * consecutive keys between them in a single update, and checks the result is well formed
+     * and iterates exactly like the reference TreeSet.
+     */
+    @Test
+    public void testOversizedMiddleInsert()
+    {
+        final TreeSet<Integer> expected = new TreeSet<>();
+        int key = 0;
+        while (key < 10000000)
+            expected.add(key++);
+        final Object[] bounds = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
+        final Object[] merged = BTree.update(bounds, ICMP, expected, true);
+        expected.add(Integer.MIN_VALUE);
+        expected.add(Integer.MAX_VALUE);
+        Assert.assertTrue(BTree.isWellFormed(merged, ICMP));
+        testEqual("Oversize", BTree.<Integer>slice(merged, true), expected.iterator());
+    }
+
+ // The scenarios below all delegate to
+ // testInsertions(totalCount, perTestCount, testKeyRatio, modificationBatchSize, quickEquality).
+
+ // 10M operations, 50 per tree, keys drawn from [0, 50), roughly one modification per update
+ @Test
+ public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 50, 1, 1, true);
+ }
+
+ // 10M operations, 50 per tree, keys drawn from [0, 50), batches of about 5 modifications
+ @Test
+ public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 50, 1, 5, true);
+ }
+
+ // 10M operations, 500 per tree, keys drawn from [0, 5000), roughly one modification per update
+ @Test
+ public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 500, 10, 1, true);
+ }
+
+ // 10M operations, 500 per tree, keys drawn from [0, 5000), batches of about 10 modifications
+ @Test
+ public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 500, 10, 10, true);
+ }
+
+ // 100M operations, 5000 per tree, keys drawn from [0, 15000), batches of about 100 modifications
+ @Test
+ public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(100000000, 5000, 3, 100, true);
+ }
+
+ // 10K operations, 50 per tree, keys drawn from [0, 500); quickEquality is false, so every
+ // intermediate tree is checked with testAllSlices instead of a single full iteration
+ @Test
+ public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000, 50, 10, 10, false);
+ }
+
+    /**
+     * Exercises BTreeSearchIterator from one worker per available processor. Every worker builds
+     * 100 random trees and, per tree, 100 times looks up (a) a random selection of the keys the
+     * tree contains and (b) a mix of contained and derived keys, counting every lookup that
+     * disagrees with the tree.
+     *
+     * A worker that throws is counted as an error and still releases the latch, so the test
+     * fails instead of hanging; the error count is verified once more after all workers finish.
+     */
+    @Test
+    public void testSearchIterator() throws InterruptedException
+    {
+        int threads = Runtime.getRuntime().availableProcessors();
+        final CountDownLatch latch = new CountDownLatch(threads);
+        final AtomicLong errors = new AtomicLong();
+        final AtomicLong count = new AtomicLong();
+        final int perThreadTrees = 100;
+        final int perTreeSelections = 100;
+        // widen before multiplying so the product cannot overflow an int
+        final long totalCount = (long) threads * perThreadTrees * perTreeSelections;
+        for (int t = 0 ; t < threads ; t++)
+        {
+            MODIFY.execute(new Runnable()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        ThreadLocalRandom random = ThreadLocalRandom.current();
+                        for (int i = 0 ; i < perThreadTrees ; i++)
+                        {
+                            Object[] tree = randomTree(10000, random);
+                            for (int j = 0 ; j < perTreeSelections ; j++)
+                            {
+                                // keys come straight out of the tree, so the iterator must hand back
+                                // the very same instances (hence the identity comparison)
+                                BTreeSearchIterator<Integer, Integer, Integer> searchIterator = new BTreeSearchIterator<>(tree, ICMP);
+                                for (Integer key : randomSelection(tree, random))
+                                    if (key != searchIterator.next(key))
+                                        errors.incrementAndGet();
+                                // keys may be absent here; only a miss for a key that BTree.find
+                                // locates in the tree counts as an error
+                                searchIterator = new BTreeSearchIterator<Integer, Integer, Integer>(tree, ICMP);
+                                for (Integer key : randomMix(tree, random))
+                                    if (key != searchIterator.next(key))
+                                        if (BTree.find(tree, ICMP, key) == key)
+                                            errors.incrementAndGet();
+                                count.incrementAndGet();
+                            }
+                        }
+                    }
+                    catch (Throwable failure)
+                    {
+                        // a worker that dies must fail the test rather than disappear silently
+                        errors.incrementAndGet();
+                        failure.printStackTrace();
+                    }
+                    finally
+                    {
+                        // always release the latch, otherwise the reporting loop below never ends
+                        latch.countDown();
+                    }
+                }
+            });
+        }
+        while (latch.getCount() > 0)
+        {
+            latch.await(10L, TimeUnit.SECONDS);
+            System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""));
+            // fail fast; a JUnit assertion works even when the JVM runs without -ea
+            Assert.assertEquals(0, errors.get());
+        }
+        // covers errors recorded after the last in-loop check, and the case where the loop never ran
+        Assert.assertEquals(0, errors.get());
+    }
+
+    /**
+     * Runs totalCount / perTestCount independent insertion tests on the MODIFY pool and waits for
+     * all of them, then prints the timer percentiles collected for the btree and for the
+     * reference TreeMap.
+     *
+     * @param totalCount            total number of operations across all tests
+     * @param perTestCount          operations applied to each individual tree
+     * @param testKeyRatio          keys are drawn from [0, perTestCount * testKeyRatio)
+     * @param modificationBatchSize approximate number of modifications per BTree.update call
+     * @param quickEquality         true: compare one full iteration after each update;
+     *                              false: compare every slice (far more work, so tests are
+     *                              submitted in chunks)
+     */
+    private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
+    {
+        int batchesPerTest = perTestCount / modificationBatchSize;
+        int maximumRunLength = 100;
+        int testKeyRange = perTestCount * testKeyRatio;
+        int tests = totalCount / perTestCount;
+        System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
+                                         tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
+
+        // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
+        // a chunk holds at least one test: a chunk size of zero would never advance the loop below
+        int chunkSize = Math.max(1, quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2)));
+        for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
+        {
+            // the last chunk may be shorter, so that exactly 'tests' tests are run in total
+            final int testsInChunk = Math.min(chunkSize, tests - chunk);
+            final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
+            for (int i = 0 ; i < testsInChunk ; i++)
+            {
+                outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
+            }
+
+            // each outer task yields the comparison futures it spawned; collect them and wait for all
+            final List<ListenableFuture<?>> inner = new ArrayList<>();
+            int complete = 0;
+            int reportInterval = totalCount / 100;
+            int lastReportAt = 0;
+            for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
+            {
+                inner.addAll(f.get());
+                complete += perTestCount;
+                if (complete - lastReportAt >= reportInterval)
+                {
+                    System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
+                    lastReportAt = complete;
+                }
+            }
+            Futures.allAsList(inner).get();
+        }
+        Snapshot snap = BTREE_TIMER.getSnapshot();
+        System.out.println(String.format("btree   : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+        snap = TREE_TIMER.getSnapshot();
+        System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+        System.out.println("Done");
+    }
+
+ // Creates one insertion test and submits it to the MODIFY pool. The task grows a btree and a
+ // reference TreeMap side by side over 'iterations' updates and verifies the btree after each one.
+ // upperBound: exclusive bound for the first key of every run of keys.
+ // maxRunLength: bounds the length of a run of consecutive keys.
+ // averageModsPerIteration: each update carries between avg/2 + 1 and avg/2 + avg modifications.
+ // quickEquality: true compares one full iteration inline; false schedules a comparison of every slice.
+ // The returned task yields the futures of the slice comparisons it scheduled (empty when quickEquality).
+ private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
+ {
+ ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
+ {
+ @Override
+ public List<ListenableFuture<?>> call()
+ {
+ final List<ListenableFuture<?>> r = new ArrayList<>();
+ NavigableMap<Integer, Integer> canon = new TreeMap<>();
+ Object[] btree = BTree.empty();
+ final TreeMap<Integer, Integer> buffer = new TreeMap<>();
+ final Random rnd = new Random();
+ for (int i = 0 ; i < iterations ; i++)
+ {
+ buffer.clear();
+ int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
+ // fill the buffer with runs of consecutive keys until the modification budget is spent
+ while (mods > 0)
+ {
+ int v = rnd.nextInt(upperBound);
+ int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
+ int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
+ for (int j = 0 ; j < c ; j++)
+ {
+ buffer.put(v, v);
+ v++;
+ }
+ mods -= c;
+ }
+ // apply the same buffer to the reference map and to the btree, timing each
+ Timer.Context ctxt;
+ ctxt = TREE_TIMER.time();
+ canon.putAll(buffer);
+ ctxt.stop();
+ ctxt = BTREE_TIMER.time();
+ Object[] next = null;
+ // retried until non-null; presumably update returns null when SPORADIC_ABORT asks to
+ // abort early - TODO confirm against BTree.update
+ while (next == null)
+ next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
+ btree = next;
+ ctxt.stop();
+
+ if (!BTree.isWellFormed(btree, ICMP))
+ {
+ System.out.println("ERROR: Not well formed");
+ throw new AssertionError("Not well formed!");
+ }
+ if (quickEquality)
+ testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
+ else
+ // the comparisons run later on another pool, so they get their own copy of the keys
+ r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
+ }
+ return r;
+ }
+ });
+ MODIFY.execute(f);
+ return f;
+ }
+
+    /**
+     * Grows a tree one key at a time from empty to 128 keys and, before every insertion, checks
+     * every slice of the current tree against the reference TreeSet.
+     */
+    @Test
+    public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
+    {
+        Object[] tree = BTree.empty();
+        final TreeSet<Integer> expected = new TreeSet<>();
+        // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
+        for (int value = 0 ; value < 128 ; value++)
+        {
+            final String label = String.format("[0..%d)", expected.size());
+            System.out.println("Testing " + label);
+            // wait for all comparisons before the reference set is modified below
+            Futures.allAsList(testAllSlices(label, tree, expected)).get();
+            Object[] updated;
+            do
+            {
+                // a null result means the update did not go through; try again
+                updated = BTree.update(tree, ICMP, Arrays.asList(value), true, SPORADIC_ABORT);
+            }
+            while (updated == null);
+            tree = updated;
+            expected.add(value);
+        }
+    }
+
+ // natural-order comparator for Integer keys; passed to every BTree and BTreeSet call in this class
+ static final Comparator<Integer> ICMP = new Comparator<Integer>()
+ {
+ @Override
+ public int compare(Integer o1, Integer o2)
+ {
+ return Integer.compare(o1, o2);
+ }
+ };
+
+ private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
+ {
+ List<ListenableFuture<?>> waitFor = new ArrayList<>();
+ testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
+ testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
+ return waitFor;
+ }
+
+    /**
+     * Schedules a comparison of the whole set and of every head set, tail set and sub set of
+     * btree against the same slice of canon. Bounds run from just outside one end of the key
+     * range to just outside the other, in the direction given by ascending.
+     *
+     * The id attached to each comparison uses interval notation in the set's own iteration
+     * order: "(..b]" / "(..b)" for head sets, "[b..)" / "(b..)" for tail sets and
+     * "[a..b]" etc. for sub sets, with a square bracket marking an inclusive bound.
+     *
+     * @param results receives the future of every scheduled comparison
+     */
+    private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
+    {
+        testOneSlice(id, btree, canon, results);
+        for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
+        {
+            // test head/tail sets
+            testOneSlice(String.format("%s->(..%d]", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
+            testOneSlice(String.format("%s->(..%d)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
+            testOneSlice(String.format("%s->[%d..)", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
+            testOneSlice(String.format("%s->(%d..)", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
+            for (Integer ub : range(canon.size(), lb, ascending))
+            {
+                // test subsets
+                testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
+                testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
+                testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
+                testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
+            }
+        }
+    }
+
+ // Schedules, on the COMPARE pool, a comparison of one slice against its reference: the sizes,
+ // then forward iteration, descending iteration, and both iterations of the descending view.
+ // The task's future is appended to results so that the caller can wait for it.
+ private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
+ {
+ ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ test(id + " Count", test.size(), canon.size());
+ testEqual(id, test.iterator(), canon.iterator());
+ testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
+ testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
+ testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
+ }
+ }, null);
+ results.add(f);
+ COMPARE.execute(f);
+ }
+
+    /**
+     * Checks that an observed int equals the expected one. A mismatch is printed and then raised
+     * as an AssertionError, in the same way testEqual reports differing iterators, so that it
+     * fails the test instead of only showing up in the output.
+     *
+     * @param id     label identifying what is being compared
+     * @param test   the observed value
+     * @param expect the expected value
+     */
+    private static void test(String id, int test, int expect)
+    {
+        if (test != expect)
+        {
+            String message = String.format("%s: Expected %d, Got %d", id, expect, test);
+            System.out.println(message);
+            throw new AssertionError(message);
+        }
+    }
+
+    /**
+     * Walks both iterators in lock step and checks that they yield equal elements and end at the
+     * same time. Every difference is printed; if there was any, an AssertionError is thrown once
+     * both iterators are exhausted.
+     *
+     * Elements are formatted with %s rather than %d, so the helper works for any element type;
+     * the output for integers is unchanged.
+     *
+     * @param id    label prefixed to every reported difference
+     * @param btree iterator under test
+     * @param canon iterator over the reference collection
+     */
+    private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
+    {
+        boolean equal = true;
+        while (btree.hasNext() && canon.hasNext())
+        {
+            Object i = btree.next();
+            Object j = canon.next();
+            if (!i.equals(j))
+            {
+                System.out.println(String.format("%s: Expected %s, Got %s", id, j, i));
+                equal = false;
+            }
+        }
+        // whatever is left on either side has no counterpart on the other
+        while (btree.hasNext())
+        {
+            System.out.println(String.format("%s: Expected <Nil>, Got %s", id, btree.next()));
+            equal = false;
+        }
+        while (canon.hasNext())
+        {
+            System.out.println(String.format("%s: Expected %s, Got Nil", id, canon.next()));
+            equal = false;
+        }
+        if (!equal)
+            throw new AssertionError("Not equal");
+    }
+
+    /**
+     * Produces the sequence of bounds used to slice a set holding the keys 0..size-1.
+     * Ascending: from (or -1 when from is Integer.MIN_VALUE) up to and including size.
+     * Descending: from (or size when from is Integer.MIN_VALUE) down to and including -1.
+     *
+     * The cursor is kept in each Iterator rather than in the Iterable, so the returned Iterable
+     * can be iterated more than once.
+     */
+    // should only be called on sets that range from 0->N or N->0
+    private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
+    {
+        final int start;
+        final int end;
+        final int delta;
+        if (ascending)
+        {
+            end = size + 1;
+            start = from == Integer.MIN_VALUE ? -1 : from;
+            delta = 1;
+        }
+        else
+        {
+            end = -2;
+            start = from == Integer.MIN_VALUE ? size : from;
+            delta = -1;
+        }
+        return new Iterable<Integer>()
+        {
+            @Override
+            public Iterator<Integer> iterator()
+            {
+                return new Iterator<Integer>()
+                {
+                    // every iterator starts afresh at the first bound
+                    int cur = start;
+
+                    @Override
+                    public boolean hasNext()
+                    {
+                        return cur != end;
+                    }
+
+                    @Override
+                    public Integer next()
+                    {
+                        Integer r = cur;
+                        cur += delta;
+                        return r;
+                    }
+
+                    @Override
+                    public void remove()
+                    {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+
+ private static Object[] randomTree(int maxSize, Random random)
+ {
+ TreeSet<Integer> build = new TreeSet<>();
+ int size = random.nextInt(maxSize);
+ for (int i = 0 ; i < size ; i++)
+ {
+ build.add(random.nextInt());
+ }
+ return BTree.build(build, ICMP, true, UpdateFunction.NoOp.<Integer>instance());
+ }
+
+ // Returns a lazily filtered view of the tree's keys, in tree order, that keeps each key with a
+ // probability chosen once per call. The filter draws from rnd on every evaluation, so iterating
+ // the view twice yields different selections.
+ private static Iterable<Integer> randomSelection(Object[] iter, final Random rnd)
+ {
+ final float proportion = rnd.nextFloat();
+ return Iterables.filter(new BTreeSet<>(iter, ICMP), new Predicate<Integer>()
+ {
+ public boolean apply(Integer integer)
+ {
+ return rnd.nextFloat() < proportion;
+ }
+ });
+ }
+
+ // Returns a lazily transformed view of the tree's keys: with a probability chosen once per call
+ // a key is passed through unchanged, otherwise it is replaced by a value derived from the key
+ // and its predecessor.
+ private static Iterable<Integer> randomMix(Object[] iter, final Random rnd)
+ {
+ final float proportion = rnd.nextFloat();
+ return Iterables.transform(new BTreeSet<>(iter, ICMP), new Function<Integer, Integer>()
+ {
+ // previous key seen by apply(); kept as a long so the subtraction below cannot overflow
+ long last = Integer.MIN_VALUE;
+
+ public Integer apply(Integer v)
+ {
+ long last = this.last;
+ this.last = v;
+ if (rnd.nextFloat() < proportion)
+ return v;
+ // NOTE(review): this is half the gap between the two keys, not the midpoint between them
+ // (that would be last + (v - last) / 2), so the values are not in tree order - confirm
+ // which of the two is intended
+ return (int)((v - last) / 2);
+ }
+ });
+ }
+
+    /**
+     * UpdateFunction that keeps the incoming value unchanged and asks for an early abort with a
+     * fixed probability each time abortEarly() is consulted.
+     */
+    private static final class RandomAbort<V> implements UpdateFunction<V>
+    {
+        final Random rnd;
+        final float chance;
+
+        private RandomAbort(Random source, float abortChance)
+        {
+            rnd = source;
+            chance = abortChance;
+        }
+
+        /** True with probability 'chance'; draws one float from rnd per call. */
+        public boolean abortEarly()
+        {
+            final float draw = rnd.nextFloat();
+            return draw < chance;
+        }
+
+        /** The update always wins over the value it replaces. */
+        public V apply(V replacing, V update)
+        {
+            return update;
+        }
+
+        /** Inserted values are stored as they are. */
+        public V apply(V v)
+        {
+            return v;
+        }
+
+        /** Heap accounting is ignored by this test. */
+        public void allocated(long heapSize)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
deleted file mode 100644
index d7105df..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.cassandra.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-import static org.junit.Assert.assertTrue;
-
-// TODO: we don't currently test SAFE functionality at all!
-// TODO: should also test markBlocking and SyncOrdered
-public class LongOpOrderTest
-{
-
- private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
-
- static final int CONSUMERS = 4;
- static final int PRODUCERS = 32;
-
- static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
- static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
-
- static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
- {
- @Override
- public void uncaughtException(Thread t, Throwable e)
- {
- System.err.println(t.getName() + ": " + e.getMessage());
- e.printStackTrace();
- }
- };
-
- final OpOrder order = new OpOrder();
- final AtomicInteger errors = new AtomicInteger();
-
- class TestOrdering implements Runnable
- {
-
- final int[] waitNanos = new int[1 << 16];
- volatile State state = new State();
- final ScheduledExecutorService sched;
-
- TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
- {
- this.sched = sched;
- final ThreadLocalRandom rnd = ThreadLocalRandom.current();
- for (int i = 0 ; i < waitNanos.length ; i++)
- waitNanos[i] = rnd.nextInt(5000);
- for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
- exec.execute(new Producer());
- exec.execute(this);
- }
-
- @Override
- public void run()
- {
- final long until = System.currentTimeMillis() + RUNTIME;
- long lastReport = System.currentTimeMillis();
- long count = 0;
- long opCount = 0;
- while (true)
- {
- long now = System.currentTimeMillis();
- if (now > until)
- break;
- if (now > lastReport + REPORT_INTERVAL)
- {
- lastReport = now;
- logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
- Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
- }
- try
- {
- Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- final State s = state;
- s.barrier = order.newBarrier();
- s.replacement = new State();
- s.barrier.issue();
- s.barrier.await();
- s.check();
- opCount += s.totalCount();
- state = s.replacement;
- sched.schedule(new Runnable()
- {
- @Override
- public void run()
- {
- s.check();
- }
- }, 1, TimeUnit.SECONDS);
- count++;
- }
- }
-
- class State
- {
-
- volatile OpOrder.Barrier barrier;
- volatile State replacement;
- final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
- int checkCount = -1;
-
- boolean accept(OpOrder.Group opGroup)
- {
- if (barrier != null && !barrier.isAfter(opGroup))
- return false;
- AtomicInteger c;
- if (null == (c = count.get(opGroup)))
- {
- count.putIfAbsent(opGroup, new AtomicInteger());
- c = count.get(opGroup);
- }
- c.incrementAndGet();
- return true;
- }
-
- int totalCount()
- {
- int c = 0;
- for (AtomicInteger v : count.values())
- c += v.intValue();
- return c;
- }
-
- void check()
- {
- boolean delete;
- if (checkCount >= 0)
- {
- if (checkCount != totalCount())
- {
- errors.incrementAndGet();
- logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
- }
- delete = true;
- }
- else
- {
- checkCount = totalCount();
- delete = false;
- }
- for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
- {
- if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
- {
- errors.incrementAndGet();
- logger.error("Received an operation that was created after the barrier was issued.");
- }
- if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
- {
- errors.incrementAndGet();
- logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
- }
- if (delete)
- TestOrdering.this.count.remove(e.getKey());
- }
- }
-
- }
-
- final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
-
- class Producer implements Runnable
- {
- public void run()
- {
- while (true)
- {
- AtomicInteger c;
- try (OpOrder.Group opGroup = order.start())
- {
- if (null == (c = count.get(opGroup)))
- {
- count.putIfAbsent(opGroup, new AtomicInteger());
- c = count.get(opGroup);
- }
- c.incrementAndGet();
- State s = state;
- while (!s.accept(opGroup))
- s = s.replacement;
- }
- }
- }
- }
-
- }
-
- @Test
- public void testOrdering() throws InterruptedException
- {
- errors.set(0);
- Thread.setDefaultUncaughtExceptionHandler(handler);
- final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
- final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
- for (int i = 0 ; i < CONSUMERS ; i++)
- new TestOrdering(exec, checker);
- exec.shutdown();
- exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
- assertTrue(exec.isShutdown());
- assertTrue(errors.get() == 0);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
deleted file mode 100644
index 0fd53bb..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.concurrent;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.LockSupport;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.math3.distribution.WeibullDistribution;
-import org.junit.Test;
-
-public class LongSharedExecutorPoolTest
-{
-
- private static final class WaitTask implements Runnable
- {
- final long nanos;
-
- private WaitTask(long nanos)
- {
- this.nanos = nanos;
- }
-
- public void run()
- {
- LockSupport.parkNanos(nanos);
- }
- }
-
- private static final class Result implements Comparable<Result>
- {
- final Future<?> future;
- final long forecastedCompletion;
-
- private Result(Future<?> future, long forecastedCompletion)
- {
- this.future = future;
- this.forecastedCompletion = forecastedCompletion;
- }
-
- public int compareTo(Result that)
- {
- int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion);
- if (c != 0)
- return c;
- c = Integer.compare(this.hashCode(), that.hashCode());
- if (c != 0)
- return c;
- return Integer.compare(this.future.hashCode(), that.future.hashCode());
- }
- }
-
- private static final class Batch implements Comparable<Batch>
- {
- final TreeSet<Result> results;
- final long timeout;
- final int executorIndex;
-
- private Batch(TreeSet<Result> results, long timeout, int executorIndex)
- {
- this.results = results;
- this.timeout = timeout;
- this.executorIndex = executorIndex;
- }
-
- public int compareTo(Batch that)
- {
- int c = Long.compare(this.timeout, that.timeout);
- if (c != 0)
- return c;
- c = Integer.compare(this.results.size(), that.results.size());
- if (c != 0)
- return c;
- return Integer.compare(this.hashCode(), that.hashCode());
- }
- }
-
- @Test
- public void testPromptnessOfExecution() throws InterruptedException, ExecutionException, TimeoutException
- {
- testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f);
- }
-
- private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException, TimeoutException
- {
- final int executorCount = 4;
- int threadCount = 8;
- int maxQueued = 1024;
- final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
- final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
- final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
-
- final int[] threadCounts = new int[executorCount];
- final WeibullDistribution[] workCount = new WeibullDistribution[executorCount];
- final ExecutorService[] executors = new ExecutorService[executorCount];
- for (int i = 0 ; i < executors.length ; i++)
- {
- executors[i] = JMXEnabledSharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
- threadCounts[i] = threadCount;
- workCount[i] = new WeibullDistribution(2, maxQueued);
- threadCount *= 2;
- maxQueued *= 2;
- }
-
- long runs = 0;
- long events = 0;
- final TreeSet<Batch> pending = new TreeSet<>();
- final BitSet executorsWithWork = new BitSet(executorCount);
- long until = 0;
- // basic idea is to go through different levels of load on the executor service; initially is all small batches
- // (mostly within max queue size) of very short operations, moving to progressively larger batches
- // (beyond max queued size), and longer operations
- for (float multiplier = 0f ; multiplier < 2.01f ; )
- {
- if (System.nanoTime() > until)
- {
- System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f));
- events = 0;
- until = System.nanoTime() + intervalNanos;
- multiplier += loadIncrement;
- System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
- }
-
- // wait a random amount of time so we submit new tasks in various stages of
- long timeout;
- if (pending.isEmpty()) timeout = 0;
- else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
- else if (pending.size() == executorCount) timeout = pending.first().timeout;
- else timeout = (long) (Math.random() * pending.last().timeout);
-
- while (!pending.isEmpty() && timeout > System.nanoTime())
- {
- Batch first = pending.first();
- boolean complete = false;
- try
- {
- for (Result result : first.results.descendingSet())
- result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
- complete = true;
- }
- catch (TimeoutException e)
- {
- }
- if (!complete && System.nanoTime() > first.timeout)
- {
- for (Result result : first.results)
- if (!result.future.isDone())
- throw new AssertionError();
- complete = true;
- }
- if (complete)
- {
- pending.pollFirst();
- executorsWithWork.clear(first.executorIndex);
- }
- }
-
- // if we've emptied the executors, give all our threads an opportunity to spin down
- if (timeout == Long.MAX_VALUE)
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-
- // submit a random batch to the first free executor service
- int executorIndex = executorsWithWork.nextClearBit(0);
- if (executorIndex >= executorCount)
- continue;
- executorsWithWork.set(executorIndex);
- ExecutorService executor = executors[executorIndex];
- TreeSet<Result> results = new TreeSet<>();
- int count = (int) (workCount[executorIndex].sample() * multiplier);
- long targetTotalElapsed = 0;
- long start = System.nanoTime();
- long baseTime;
- if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier);
- else baseTime = 0;
- for (int j = 0 ; j < count ; j++)
- {
- long time;
- if (baseTime == 0) time = (long) (workTime.sample() * multiplier);
- else time = (long) (baseTime * Math.random());
- if (time < minWorkTime)
- time = minWorkTime;
- if (time > maxWorkTime)
- time = maxWorkTime;
- targetTotalElapsed += time;
- Future<?> future = executor.submit(new WaitTask(time));
- results.add(new Result(future, System.nanoTime() + time));
- }
- long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex])
- + TimeUnit.MILLISECONDS.toNanos(100L);
- long now = System.nanoTime();
- if (runs++ > executorCount && now > end)
- throw new AssertionError();
- events += results.size();
- pending.add(new Batch(results, end, executorIndex));
-// System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start)));
- }
- }
-
- public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException
- {
- // do longer test
- new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBTreeTest.java b/test/long/org/apache/cassandra/utils/LongBTreeTest.java
deleted file mode 100644
index 76ff2bf..0000000
--- a/test/long/org/apache/cassandra/utils/LongBTreeTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
-import com.yammer.metrics.stats.Snapshot;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.btree.UpdateFunction;
-
-// TODO : should probably lower fan-factor for tests to make them more intensive
-public class LongBTreeTest
-{
-
- private static final Timer BTREE_TIMER = Metrics.newTimer(BTree.class, "BTREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS);
- private static final Timer TREE_TIMER = Metrics.newTimer(BTree.class, "TREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS);
- private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
- private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
- private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
-
- static
- {
- System.setProperty("cassandra.btree.fanfactor", "4");
- }
-
- @Test
- public void testOversizedMiddleInsert()
- {
- TreeSet<Integer> canon = new TreeSet<>();
- for (int i = 0 ; i < 10000000 ; i++)
- canon.add(i);
- Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
- btree = BTree.update(btree, ICMP, canon, true);
- canon.add(Integer.MIN_VALUE);
- canon.add(Integer.MAX_VALUE);
- Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
- testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator());
- }
-
- @Test
- public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 1, true);
- }
-
- @Test
- public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 5, true);
- }
-
- @Test
- public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 1, true);
- }
-
- @Test
- public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 10, true);
- }
-
- @Test
- public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
- {
- testInsertions(100000000, 5000, 3, 100, true);
- }
-
- @Test
- public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
- {
- testInsertions(10000, 50, 10, 10, false);
- }
-
- private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
- {
- int batchesPerTest = perTestCount / modificationBatchSize;
- int maximumRunLength = 100;
- int testKeyRange = perTestCount * testKeyRatio;
- int tests = totalCount / perTestCount;
- System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
- tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
-
- // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
- int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
- for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
- {
- final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
- for (int i = 0 ; i < chunkSize ; i++)
- {
- outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
- }
-
- final List<ListenableFuture<?>> inner = new ArrayList<>();
- int complete = 0;
- int reportInterval = totalCount / 100;
- int lastReportAt = 0;
- for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
- {
- inner.addAll(f.get());
- complete += perTestCount;
- if (complete - lastReportAt >= reportInterval)
- {
- System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
- lastReportAt = complete;
- }
- }
- Futures.allAsList(inner).get();
- }
- Snapshot snap = BTREE_TIMER.getSnapshot();
- System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- snap = TREE_TIMER.getSnapshot();
- System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- System.out.println("Done");
- }
-
- private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
- {
- ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
- {
- @Override
- public List<ListenableFuture<?>> call()
- {
- final List<ListenableFuture<?>> r = new ArrayList<>();
- NavigableMap<Integer, Integer> canon = new TreeMap<>();
- Object[] btree = BTree.empty();
- final TreeMap<Integer, Integer> buffer = new TreeMap<>();
- final Random rnd = new Random();
- for (int i = 0 ; i < iterations ; i++)
- {
- buffer.clear();
- int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
- while (mods > 0)
- {
- int v = rnd.nextInt(upperBound);
- int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
- int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
- for (int j = 0 ; j < c ; j++)
- {
- buffer.put(v, v);
- v++;
- }
- mods -= c;
- }
- TimerContext ctxt;
- ctxt = TREE_TIMER.time();
- canon.putAll(buffer);
- ctxt.stop();
- ctxt = BTREE_TIMER.time();
- Object[] next = null;
- while (next == null)
- next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
- btree = next;
- ctxt.stop();
-
- if (!BTree.isWellFormed(btree, ICMP))
- {
- System.out.println("ERROR: Not well formed");
- throw new AssertionError("Not well formed!");
- }
- if (quickEquality)
- testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
- else
- r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
- }
- return r;
- }
- });
- MODIFY.execute(f);
- return f;
- }
-
- @Test
- public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
- {
- Object[] cur = BTree.empty();
- TreeSet<Integer> canon = new TreeSet<>();
- // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
- for (int i = 0 ; i < 128 ; i++)
- {
- String id = String.format("[0..%d)", canon.size());
- System.out.println("Testing " + id);
- Futures.allAsList(testAllSlices(id, cur, canon)).get();
- Object[] next = null;
- while (next == null)
- next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT);
- cur = next;
- canon.add(i);
- }
- }
-
- static final Comparator<Integer> ICMP = new Comparator<Integer>()
- {
- @Override
- public int compare(Integer o1, Integer o2)
- {
- return Integer.compare(o1, o2);
- }
- };
-
- private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
- {
- List<ListenableFuture<?>> waitFor = new ArrayList<>();
- testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
- testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
- return waitFor;
- }
-
- private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
- {
- testOneSlice(id, btree, canon, results);
- for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
- {
- // test head/tail sets
- testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
- testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
- for (Integer ub : range(canon.size(), lb, ascending))
- {
- // test subsets
- testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
- testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
- testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
- testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
- }
- }
- }
-
- private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
- {
- ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
- {
-
- @Override
- public void run()
- {
- test(id + " Count", test.size(), canon.size());
- testEqual(id, test.iterator(), canon.iterator());
- testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
- testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
- testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
- }
- }, null);
- results.add(f);
- COMPARE.execute(f);
- }
-
- private static void test(String id, int test, int expect)
- {
- if (test != expect)
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test));
- }
- }
-
- private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
- {
- boolean equal = true;
- while (btree.hasNext() && canon.hasNext())
- {
- Object i = btree.next();
- Object j = canon.next();
- if (!i.equals(j))
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, j, i));
- equal = false;
- }
- }
- while (btree.hasNext())
- {
- System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next()));
- equal = false;
- }
- while (canon.hasNext())
- {
- System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next()));
- equal = false;
- }
- if (!equal)
- throw new AssertionError("Not equal");
- }
-
- // should only be called on sets that range from 0->N or N->0
- private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
- {
- return new Iterable<Integer>()
- {
- int cur;
- int delta;
- int end;
- {
- if (ascending)
- {
- end = size + 1;
- cur = from == Integer.MIN_VALUE ? -1 : from;
- delta = 1;
- }
- else
- {
- end = -2;
- cur = from == Integer.MIN_VALUE ? size : from;
- delta = -1;
- }
- }
- @Override
- public Iterator<Integer> iterator()
- {
- return new Iterator<Integer>()
- {
- @Override
- public boolean hasNext()
- {
- return cur != end;
- }
-
- @Override
- public Integer next()
- {
- Integer r = cur;
- cur += delta;
- return r;
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
- private static final class RandomAbort<V> implements UpdateFunction<V>
- {
- final Random rnd;
- final float chance;
- private RandomAbort(Random rnd, float chance)
- {
- this.rnd = rnd;
- this.chance = chance;
- }
-
- public V apply(V replacing, V update)
- {
- return update;
- }
-
- public boolean abortEarly()
- {
- return rnd.nextFloat() < chance;
- }
-
- public void allocated(long heapSize)
- {
-
- }
-
- public V apply(V v)
- {
- return v;
- }
- }
-
-}
[4/6] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Conflicts:
build.xml
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/02a7c342
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/02a7c342
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/02a7c342
Branch: refs/heads/trunk
Commit: 02a7c342922a209ac7374f2f425c783a5faf8538
Parents: 14d7a63 bd4a9d1
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun Jun 28 11:39:53 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun Jun 28 11:39:53 2015 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------
[2/6] cassandra git commit: backport burn test refactor
Posted by be...@apache.org.
backport burn test refactor
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd4a9d18
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd4a9d18
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd4a9d18
Branch: refs/heads/cassandra-2.2
Commit: bd4a9d18e1317dcb8542bd4adc5a9f99b108d6c6
Parents: 8a56868
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun Jun 28 11:38:22 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun Jun 28 11:38:22 2015 +0100
----------------------------------------------------------------------
build.xml | 7 +
.../cassandra/concurrent/LongOpOrderTest.java | 240 +++++++++
.../concurrent/LongSharedExecutorPoolTest.java | 226 +++++++++
.../apache/cassandra/utils/LongBTreeTest.java | 502 +++++++++++++++++++
.../cassandra/concurrent/LongOpOrderTest.java | 240 ---------
.../concurrent/LongSharedExecutorPoolTest.java | 228 ---------
.../apache/cassandra/utils/LongBTreeTest.java | 401 ---------------
7 files changed, 975 insertions(+), 869 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 73e76e5..18ad49f 100644
--- a/build.xml
+++ b/build.xml
@@ -93,6 +93,7 @@
<property name="test.timeout" value="60000" />
<property name="test.long.timeout" value="600000" />
+ <property name="test.burn.timeout" value="600000" />
<!-- default for cql tests. Can be override by -Dcassandra.test.use_prepared=false -->
<property name="cassandra.test.use_prepared" value="true" />
@@ -1258,6 +1259,12 @@
</testmacro>
</target>
+ <target name="test-burn" depends="build-test" description="Execute functional tests">
+ <testmacro suitename="burn" inputdir="${test.burn.src}"
+ timeout="${test.burn.timeout}">
+ </testmacro>
+ </target>
+
<target name="long-test" depends="build-test" description="Execute functional tests">
<testmacro suitename="long" inputdir="${test.long.src}"
timeout="${test.long.timeout}">
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
new file mode 100644
index 0000000..d7105df
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
@@ -0,0 +1,240 @@
+package org.apache.cassandra.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.junit.Assert.assertTrue;
+
+// TODO: we don't currently test SAFE functionality at all!
+// TODO: should also test markBlocking and SyncOrdered
+public class LongOpOrderTest
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
+
+ static final int CONSUMERS = 4;
+ static final int PRODUCERS = 32;
+
+ static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
+ static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
+
+ static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
+ {
+ @Override
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ System.err.println(t.getName() + ": " + e.getMessage());
+ e.printStackTrace();
+ }
+ };
+
+ final OpOrder order = new OpOrder();
+ final AtomicInteger errors = new AtomicInteger();
+
+ class TestOrdering implements Runnable
+ {
+
+ final int[] waitNanos = new int[1 << 16];
+ volatile State state = new State();
+ final ScheduledExecutorService sched;
+
+ TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
+ {
+ this.sched = sched;
+ final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ for (int i = 0 ; i < waitNanos.length ; i++)
+ waitNanos[i] = rnd.nextInt(5000);
+ for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
+ exec.execute(new Producer());
+ exec.execute(this);
+ }
+
+ @Override
+ public void run()
+ {
+ final long until = System.currentTimeMillis() + RUNTIME;
+ long lastReport = System.currentTimeMillis();
+ long count = 0;
+ long opCount = 0;
+ while (true)
+ {
+ long now = System.currentTimeMillis();
+ if (now > until)
+ break;
+ if (now > lastReport + REPORT_INTERVAL)
+ {
+ lastReport = now;
+ logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
+ Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
+ }
+ try
+ {
+ Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+
+ final State s = state;
+ s.barrier = order.newBarrier();
+ s.replacement = new State();
+ s.barrier.issue();
+ s.barrier.await();
+ s.check();
+ opCount += s.totalCount();
+ state = s.replacement;
+ sched.schedule(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ s.check();
+ }
+ }, 1, TimeUnit.SECONDS);
+ count++;
+ }
+ }
+
+ class State
+ {
+
+ volatile OpOrder.Barrier barrier;
+ volatile State replacement;
+ final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+ int checkCount = -1;
+
+ boolean accept(OpOrder.Group opGroup)
+ {
+ if (barrier != null && !barrier.isAfter(opGroup))
+ return false;
+ AtomicInteger c;
+ if (null == (c = count.get(opGroup)))
+ {
+ count.putIfAbsent(opGroup, new AtomicInteger());
+ c = count.get(opGroup);
+ }
+ c.incrementAndGet();
+ return true;
+ }
+
+ int totalCount()
+ {
+ int c = 0;
+ for (AtomicInteger v : count.values())
+ c += v.intValue();
+ return c;
+ }
+
+ void check()
+ {
+ boolean delete;
+ if (checkCount >= 0)
+ {
+ if (checkCount != totalCount())
+ {
+ errors.incrementAndGet();
+ logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
+ }
+ delete = true;
+ }
+ else
+ {
+ checkCount = totalCount();
+ delete = false;
+ }
+ for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
+ {
+ if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
+ {
+ errors.incrementAndGet();
+ logger.error("Received an operation that was created after the barrier was issued.");
+ }
+ if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
+ {
+ errors.incrementAndGet();
+ logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
+ }
+ if (delete)
+ TestOrdering.this.count.remove(e.getKey());
+ }
+ }
+
+ }
+
+ final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+
+ class Producer implements Runnable
+ {
+ public void run()
+ {
+ while (true)
+ {
+ AtomicInteger c;
+ try (OpOrder.Group opGroup = order.start())
+ {
+ if (null == (c = count.get(opGroup)))
+ {
+ count.putIfAbsent(opGroup, new AtomicInteger());
+ c = count.get(opGroup);
+ }
+ c.incrementAndGet();
+ State s = state;
+ while (!s.accept(opGroup))
+ s = s.replacement;
+ }
+ }
+ }
+ }
+
+ }
+
+ @Test
+ public void testOrdering() throws InterruptedException
+ {
+ errors.set(0);
+ Thread.setDefaultUncaughtExceptionHandler(handler);
+ final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
+ final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
+ for (int i = 0 ; i < CONSUMERS ; i++)
+ new TestOrdering(exec, checker);
+ exec.shutdown();
+ exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
+ assertTrue(exec.isShutdown());
+ assertTrue(errors.get() == 0);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
new file mode 100644
index 0000000..fe464c7
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.BitSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.math3.distribution.WeibullDistribution;
+import org.junit.Test;
+
+public class LongSharedExecutorPoolTest
+{
+
+ private static final class WaitTask implements Runnable
+ {
+ final long nanos;
+
+ private WaitTask(long nanos)
+ {
+ this.nanos = nanos;
+ }
+
+ public void run()
+ {
+ LockSupport.parkNanos(nanos);
+ }
+ }
+
+ private static final class Result implements Comparable<Result>
+ {
+ final Future<?> future;
+ final long forecastedCompletion;
+
+ private Result(Future<?> future, long forecastedCompletion)
+ {
+ this.future = future;
+ this.forecastedCompletion = forecastedCompletion;
+ }
+
+ public int compareTo(Result that)
+ {
+ int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion);
+ if (c != 0)
+ return c;
+ c = Integer.compare(this.hashCode(), that.hashCode());
+ if (c != 0)
+ return c;
+ return Integer.compare(this.future.hashCode(), that.future.hashCode());
+ }
+ }
+
+ private static final class Batch implements Comparable<Batch>
+ {
+ final TreeSet<Result> results;
+ final long timeout;
+ final int executorIndex;
+
+ private Batch(TreeSet<Result> results, long timeout, int executorIndex)
+ {
+ this.results = results;
+ this.timeout = timeout;
+ this.executorIndex = executorIndex;
+ }
+
+ public int compareTo(Batch that)
+ {
+ int c = Long.compare(this.timeout, that.timeout);
+ if (c != 0)
+ return c;
+ c = Integer.compare(this.results.size(), that.results.size());
+ if (c != 0)
+ return c;
+ return Integer.compare(this.hashCode(), that.hashCode());
+ }
+ }
+
+ @Test
+ public void testPromptnessOfExecution() throws InterruptedException, ExecutionException
+ {
+ testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f);
+ }
+
+ private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException
+ {
+ final int executorCount = 4;
+ int threadCount = 8;
+ int maxQueued = 1024;
+ final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
+ final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
+ final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
+
+ final int[] threadCounts = new int[executorCount];
+ final WeibullDistribution[] workCount = new WeibullDistribution[executorCount];
+ final ExecutorService[] executors = new ExecutorService[executorCount];
+ for (int i = 0 ; i < executors.length ; i++)
+ {
+ executors[i] = SharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
+ threadCounts[i] = threadCount;
+ workCount[i] = new WeibullDistribution(2, maxQueued);
+ threadCount *= 2;
+ maxQueued *= 2;
+ }
+
+ long runs = 0;
+ long events = 0;
+ final TreeSet<Batch> pending = new TreeSet<>();
+ final BitSet executorsWithWork = new BitSet(executorCount);
+ long until = 0;
+ // basic idea is to go through different levels of load on the executor service; initially it is all small batches
+ // (mostly within max queue size) of very short operations, moving to progressively larger batches
+ // (beyond max queue size), and longer operations
+ for (float multiplier = 0f ; multiplier < 2.01f ; )
+ {
+ if (System.nanoTime() > until)
+ {
+ System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f));
+ events = 0;
+ until = System.nanoTime() + intervalNanos;
+ multiplier += loadIncrement;
+ System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
+ }
+
+ // wait a random amount of time so we submit new tasks while earlier batches are in various stages of completion
+ long timeout;
+ if (pending.isEmpty()) timeout = 0;
+ else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
+ else if (pending.size() == executorCount) timeout = pending.first().timeout;
+ else timeout = (long) (Math.random() * pending.last().timeout);
+
+ while (!pending.isEmpty() && timeout > System.nanoTime())
+ {
+ Batch first = pending.first();
+ boolean complete = false;
+ try
+ {
+ for (Result result : first.results.descendingSet())
+ result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
+ complete = true;
+ }
+ catch (TimeoutException e)
+ {
+ }
+ if (!complete && System.nanoTime() > first.timeout)
+ {
+ for (Result result : first.results)
+ if (!result.future.isDone())
+ throw new AssertionError();
+ complete = true;
+ }
+ if (complete)
+ {
+ pending.pollFirst();
+ executorsWithWork.clear(first.executorIndex);
+ }
+ }
+
+ // if we've emptied the executors, give all our threads an opportunity to spin down
+ if (timeout == Long.MAX_VALUE)
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+
+ // submit a random batch to the first free executor service
+ int executorIndex = executorsWithWork.nextClearBit(0);
+ if (executorIndex >= executorCount)
+ continue;
+ executorsWithWork.set(executorIndex);
+ ExecutorService executor = executors[executorIndex];
+ TreeSet<Result> results = new TreeSet<>();
+ int count = (int) (workCount[executorIndex].sample() * multiplier);
+ long targetTotalElapsed = 0;
+ long start = System.nanoTime();
+ long baseTime;
+ if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier);
+ else baseTime = 0;
+ for (int j = 0 ; j < count ; j++)
+ {
+ long time;
+ if (baseTime == 0) time = (long) (workTime.sample() * multiplier);
+ else time = (long) (baseTime * Math.random());
+ if (time < minWorkTime)
+ time = minWorkTime;
+ if (time > maxWorkTime)
+ time = maxWorkTime;
+ targetTotalElapsed += time;
+ Future<?> future = executor.submit(new WaitTask(time));
+ results.add(new Result(future, System.nanoTime() + time));
+ }
+ long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex])
+ + TimeUnit.MILLISECONDS.toNanos(100L);
+ long now = System.nanoTime();
+ if (runs++ > executorCount && now > end)
+ throw new AssertionError();
+ events += results.size();
+ pending.add(new Batch(results, end, executorIndex));
+// System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start)));
+ }
+ }
+
+ public static void main(String[] args) throws InterruptedException, ExecutionException
+ {
+ // do longer test
+ new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
new file mode 100644
index 0000000..9641930
--- /dev/null
+++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+// TODO : should probably lower fan-factor for tests to make them more intensive
+public class LongBTreeTest
+{
+
+ private static final MetricRegistry metrics = new MetricRegistry();
+ private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
+ private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
+ private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
+ private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
+ private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
+
+ static
+ {
+ System.setProperty("cassandra.btree.fanfactor", "4");
+ }
+
+ @Test
+ public void testOversizedMiddleInsert()
+ {
+ TreeSet<Integer> canon = new TreeSet<>();
+ for (int i = 0 ; i < 10000000 ; i++)
+ canon.add(i);
+ Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
+ btree = BTree.update(btree, ICMP, canon, true);
+ canon.add(Integer.MIN_VALUE);
+ canon.add(Integer.MAX_VALUE);
+ Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
+ testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator());
+ }
+
+ @Test
+ public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 50, 1, 1, true);
+ }
+
+ @Test
+ public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 50, 1, 5, true);
+ }
+
+ @Test
+ public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 500, 10, 1, true);
+ }
+
+ @Test
+ public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000000, 500, 10, 10, true);
+ }
+
+ @Test
+ public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
+ {
+ testInsertions(100000000, 5000, 3, 100, true);
+ }
+
+ @Test
+ public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
+ {
+ testInsertions(10000, 50, 10, 10, false);
+ }
+
+ @Test
+ public void testSearchIterator() throws InterruptedException
+ {
+ int threads = Runtime.getRuntime().availableProcessors();
+ final CountDownLatch latch = new CountDownLatch(threads);
+ final AtomicLong errors = new AtomicLong();
+ final AtomicLong count = new AtomicLong();
+ final int perThreadTrees = 100;
+ final int perTreeSelections = 100;
+ final long totalCount = threads * perThreadTrees * perTreeSelections;
+ for (int t = 0 ; t < threads ; t++)
+ {
+ MODIFY.execute(new Runnable()
+ {
+ public void run()
+ {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int i = 0 ; i < perThreadTrees ; i++)
+ {
+ Object[] tree = randomTree(10000, random);
+ for (int j = 0 ; j < perTreeSelections ; j++)
+ {
+ BTreeSearchIterator<Integer, Integer, Integer> searchIterator = new BTreeSearchIterator<>(tree, ICMP);
+ for (Integer key : randomSelection(tree, random))
+ if (key != searchIterator.next(key))
+ errors.incrementAndGet();
+ searchIterator = new BTreeSearchIterator<Integer, Integer, Integer>(tree, ICMP);
+ for (Integer key : randomMix(tree, random))
+ if (key != searchIterator.next(key))
+ if (BTree.find(tree, ICMP, key) == key)
+ errors.incrementAndGet();
+ count.incrementAndGet();
+ }
+ }
+ latch.countDown();
+ }
+ });
+ }
+ while (latch.getCount() > 0)
+ {
+ latch.await(10L, TimeUnit.SECONDS);
+ System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""));
+ assert errors.get() == 0;
+ }
+ }
+
+ private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
+ {
+ int batchesPerTest = perTestCount / modificationBatchSize;
+ int maximumRunLength = 100;
+ int testKeyRange = perTestCount * testKeyRatio;
+ int tests = totalCount / perTestCount;
+ System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
+ tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
+
+ // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
+ int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
+ for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
+ {
+ final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
+ for (int i = 0 ; i < chunkSize ; i++)
+ {
+ outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
+ }
+
+ final List<ListenableFuture<?>> inner = new ArrayList<>();
+ int complete = 0;
+ int reportInterval = totalCount / 100;
+ int lastReportAt = 0;
+ for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
+ {
+ inner.addAll(f.get());
+ complete += perTestCount;
+ if (complete - lastReportAt >= reportInterval)
+ {
+ System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
+ lastReportAt = complete;
+ }
+ }
+ Futures.allAsList(inner).get();
+ }
+ Snapshot snap = BTREE_TIMER.getSnapshot();
+ System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+ snap = TREE_TIMER.getSnapshot();
+ System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+ System.out.println("Done");
+ }
+
+ private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
+ {
+ ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
+ {
+ @Override
+ public List<ListenableFuture<?>> call()
+ {
+ final List<ListenableFuture<?>> r = new ArrayList<>();
+ NavigableMap<Integer, Integer> canon = new TreeMap<>();
+ Object[] btree = BTree.empty();
+ final TreeMap<Integer, Integer> buffer = new TreeMap<>();
+ final Random rnd = new Random();
+ for (int i = 0 ; i < iterations ; i++)
+ {
+ buffer.clear();
+ int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
+ while (mods > 0)
+ {
+ int v = rnd.nextInt(upperBound);
+ int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
+ int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
+ for (int j = 0 ; j < c ; j++)
+ {
+ buffer.put(v, v);
+ v++;
+ }
+ mods -= c;
+ }
+ Timer.Context ctxt;
+ ctxt = TREE_TIMER.time();
+ canon.putAll(buffer);
+ ctxt.stop();
+ ctxt = BTREE_TIMER.time();
+ Object[] next = null;
+ while (next == null)
+ next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
+ btree = next;
+ ctxt.stop();
+
+ if (!BTree.isWellFormed(btree, ICMP))
+ {
+ System.out.println("ERROR: Not well formed");
+ throw new AssertionError("Not well formed!");
+ }
+ if (quickEquality)
+ testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
+ else
+ r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
+ }
+ return r;
+ }
+ });
+ MODIFY.execute(f);
+ return f;
+ }
+
+ @Test
+ public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
+ {
+ Object[] cur = BTree.empty();
+ TreeSet<Integer> canon = new TreeSet<>();
+ // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
+ for (int i = 0 ; i < 128 ; i++)
+ {
+ String id = String.format("[0..%d)", canon.size());
+ System.out.println("Testing " + id);
+ Futures.allAsList(testAllSlices(id, cur, canon)).get();
+ Object[] next = null;
+ while (next == null)
+ next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT);
+ cur = next;
+ canon.add(i);
+ }
+ }
+
+ static final Comparator<Integer> ICMP = new Comparator<Integer>()
+ {
+ @Override
+ public int compare(Integer o1, Integer o2)
+ {
+ return Integer.compare(o1, o2);
+ }
+ };
+
+ private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
+ {
+ List<ListenableFuture<?>> waitFor = new ArrayList<>();
+ testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
+ testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
+ return waitFor;
+ }
+
+ private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
+ {
+ testOneSlice(id, btree, canon, results);
+ for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
+ {
+ // test head/tail sets
+ testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
+ testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
+ testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
+ testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
+ for (Integer ub : range(canon.size(), lb, ascending))
+ {
+ // test subsets
+ testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
+ testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
+ testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
+ testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
+ }
+ }
+ }
+
+ private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
+ {
+ ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ test(id + " Count", test.size(), canon.size());
+ testEqual(id, test.iterator(), canon.iterator());
+ testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
+ testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
+ testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
+ }
+ }, null);
+ results.add(f);
+ COMPARE.execute(f);
+ }
+
+ private static void test(String id, int test, int expect)
+ {
+ if (test != expect)
+ {
+ System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test));
+ }
+ }
+
+ private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
+ {
+ boolean equal = true;
+ while (btree.hasNext() && canon.hasNext())
+ {
+ Object i = btree.next();
+ Object j = canon.next();
+ if (!i.equals(j))
+ {
+ System.out.println(String.format("%s: Expected %d, Got %d", id, j, i));
+ equal = false;
+ }
+ }
+ while (btree.hasNext())
+ {
+ System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next()));
+ equal = false;
+ }
+ while (canon.hasNext())
+ {
+ System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next()));
+ equal = false;
+ }
+ if (!equal)
+ throw new AssertionError("Not equal");
+ }
+
+ // should only be called on sets that range from 0->N or N->0
+ private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
+ {
+ return new Iterable<Integer>()
+ {
+ int cur;
+ int delta;
+ int end;
+ {
+ if (ascending)
+ {
+ end = size + 1;
+ cur = from == Integer.MIN_VALUE ? -1 : from;
+ delta = 1;
+ }
+ else
+ {
+ end = -2;
+ cur = from == Integer.MIN_VALUE ? size : from;
+ delta = -1;
+ }
+ }
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return cur != end;
+ }
+
+ @Override
+ public Integer next()
+ {
+ Integer r = cur;
+ cur += delta;
+ return r;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ private static Object[] randomTree(int maxSize, Random random)
+ {
+ TreeSet<Integer> build = new TreeSet<>();
+ int size = random.nextInt(maxSize);
+ for (int i = 0 ; i < size ; i++)
+ {
+ build.add(random.nextInt());
+ }
+ return BTree.build(build, ICMP, true, UpdateFunction.NoOp.<Integer>instance());
+ }
+
+ private static Iterable<Integer> randomSelection(Object[] iter, final Random rnd)
+ {
+ final float proportion = rnd.nextFloat();
+ return Iterables.filter(new BTreeSet<>(iter, ICMP), new Predicate<Integer>()
+ {
+ public boolean apply(Integer integer)
+ {
+ return rnd.nextFloat() < proportion;
+ }
+ });
+ }
+
+ private static Iterable<Integer> randomMix(Object[] iter, final Random rnd)
+ {
+ final float proportion = rnd.nextFloat();
+ return Iterables.transform(new BTreeSet<>(iter, ICMP), new Function<Integer, Integer>()
+ {
+ long last = Integer.MIN_VALUE;
+
+ public Integer apply(Integer v)
+ {
+ long last = this.last;
+ this.last = v;
+ if (rnd.nextFloat() < proportion)
+ return v;
+ return (int)((v - last) / 2);
+ }
+ });
+ }
+
+ private static final class RandomAbort<V> implements UpdateFunction<V>
+ {
+ final Random rnd;
+ final float chance;
+ private RandomAbort(Random rnd, float chance)
+ {
+ this.rnd = rnd;
+ this.chance = chance;
+ }
+
+ public V apply(V replacing, V update)
+ {
+ return update;
+ }
+
+ public boolean abortEarly()
+ {
+ return rnd.nextFloat() < chance;
+ }
+
+ public void allocated(long heapSize)
+ {
+
+ }
+
+ public V apply(V v)
+ {
+ return v;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
deleted file mode 100644
index d7105df..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.cassandra.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-import static org.junit.Assert.assertTrue;
-
-// TODO: we don't currently test SAFE functionality at all!
-// TODO: should also test markBlocking and SyncOrdered
-public class LongOpOrderTest
-{
-
- private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
-
- static final int CONSUMERS = 4;
- static final int PRODUCERS = 32;
-
- static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
- static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
-
- static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
- {
- @Override
- public void uncaughtException(Thread t, Throwable e)
- {
- System.err.println(t.getName() + ": " + e.getMessage());
- e.printStackTrace();
- }
- };
-
- final OpOrder order = new OpOrder();
- final AtomicInteger errors = new AtomicInteger();
-
- class TestOrdering implements Runnable
- {
-
- final int[] waitNanos = new int[1 << 16];
- volatile State state = new State();
- final ScheduledExecutorService sched;
-
- TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
- {
- this.sched = sched;
- final ThreadLocalRandom rnd = ThreadLocalRandom.current();
- for (int i = 0 ; i < waitNanos.length ; i++)
- waitNanos[i] = rnd.nextInt(5000);
- for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
- exec.execute(new Producer());
- exec.execute(this);
- }
-
- @Override
- public void run()
- {
- final long until = System.currentTimeMillis() + RUNTIME;
- long lastReport = System.currentTimeMillis();
- long count = 0;
- long opCount = 0;
- while (true)
- {
- long now = System.currentTimeMillis();
- if (now > until)
- break;
- if (now > lastReport + REPORT_INTERVAL)
- {
- lastReport = now;
- logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
- Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
- }
- try
- {
- Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- final State s = state;
- s.barrier = order.newBarrier();
- s.replacement = new State();
- s.barrier.issue();
- s.barrier.await();
- s.check();
- opCount += s.totalCount();
- state = s.replacement;
- sched.schedule(new Runnable()
- {
- @Override
- public void run()
- {
- s.check();
- }
- }, 1, TimeUnit.SECONDS);
- count++;
- }
- }
-
- class State
- {
-
- volatile OpOrder.Barrier barrier;
- volatile State replacement;
- final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
- int checkCount = -1;
-
- boolean accept(OpOrder.Group opGroup)
- {
- if (barrier != null && !barrier.isAfter(opGroup))
- return false;
- AtomicInteger c;
- if (null == (c = count.get(opGroup)))
- {
- count.putIfAbsent(opGroup, new AtomicInteger());
- c = count.get(opGroup);
- }
- c.incrementAndGet();
- return true;
- }
-
- int totalCount()
- {
- int c = 0;
- for (AtomicInteger v : count.values())
- c += v.intValue();
- return c;
- }
-
- void check()
- {
- boolean delete;
- if (checkCount >= 0)
- {
- if (checkCount != totalCount())
- {
- errors.incrementAndGet();
- logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
- }
- delete = true;
- }
- else
- {
- checkCount = totalCount();
- delete = false;
- }
- for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
- {
- if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
- {
- errors.incrementAndGet();
- logger.error("Received an operation that was created after the barrier was issued.");
- }
- if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
- {
- errors.incrementAndGet();
- logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
- }
- if (delete)
- TestOrdering.this.count.remove(e.getKey());
- }
- }
-
- }
-
- final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
-
- class Producer implements Runnable
- {
- public void run()
- {
- while (true)
- {
- AtomicInteger c;
- try (OpOrder.Group opGroup = order.start())
- {
- if (null == (c = count.get(opGroup)))
- {
- count.putIfAbsent(opGroup, new AtomicInteger());
- c = count.get(opGroup);
- }
- c.incrementAndGet();
- State s = state;
- while (!s.accept(opGroup))
- s = s.replacement;
- }
- }
- }
- }
-
- }
-
- @Test
- public void testOrdering() throws InterruptedException
- {
- errors.set(0);
- Thread.setDefaultUncaughtExceptionHandler(handler);
- final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
- final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
- for (int i = 0 ; i < CONSUMERS ; i++)
- new TestOrdering(exec, checker);
- exec.shutdown();
- exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
- assertTrue(exec.isShutdown());
- assertTrue(errors.get() == 0);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
deleted file mode 100644
index 0fd53bb..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.concurrent;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.LockSupport;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.math3.distribution.WeibullDistribution;
-import org.junit.Test;
-
-public class LongSharedExecutorPoolTest
-{
-
- private static final class WaitTask implements Runnable
- {
- final long nanos;
-
- private WaitTask(long nanos)
- {
- this.nanos = nanos;
- }
-
- public void run()
- {
- LockSupport.parkNanos(nanos);
- }
- }
-
- private static final class Result implements Comparable<Result>
- {
- final Future<?> future;
- final long forecastedCompletion;
-
- private Result(Future<?> future, long forecastedCompletion)
- {
- this.future = future;
- this.forecastedCompletion = forecastedCompletion;
- }
-
- public int compareTo(Result that)
- {
- int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion);
- if (c != 0)
- return c;
- c = Integer.compare(this.hashCode(), that.hashCode());
- if (c != 0)
- return c;
- return Integer.compare(this.future.hashCode(), that.future.hashCode());
- }
- }
-
- private static final class Batch implements Comparable<Batch>
- {
- final TreeSet<Result> results;
- final long timeout;
- final int executorIndex;
-
- private Batch(TreeSet<Result> results, long timeout, int executorIndex)
- {
- this.results = results;
- this.timeout = timeout;
- this.executorIndex = executorIndex;
- }
-
- public int compareTo(Batch that)
- {
- int c = Long.compare(this.timeout, that.timeout);
- if (c != 0)
- return c;
- c = Integer.compare(this.results.size(), that.results.size());
- if (c != 0)
- return c;
- return Integer.compare(this.hashCode(), that.hashCode());
- }
- }
-
- @Test
- public void testPromptnessOfExecution() throws InterruptedException, ExecutionException, TimeoutException
- {
- testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f);
- }
-
- private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException, TimeoutException
- {
- final int executorCount = 4;
- int threadCount = 8;
- int maxQueued = 1024;
- final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
- final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
- final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
-
- final int[] threadCounts = new int[executorCount];
- final WeibullDistribution[] workCount = new WeibullDistribution[executorCount];
- final ExecutorService[] executors = new ExecutorService[executorCount];
- for (int i = 0 ; i < executors.length ; i++)
- {
- executors[i] = JMXEnabledSharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
- threadCounts[i] = threadCount;
- workCount[i] = new WeibullDistribution(2, maxQueued);
- threadCount *= 2;
- maxQueued *= 2;
- }
-
- long runs = 0;
- long events = 0;
- final TreeSet<Batch> pending = new TreeSet<>();
- final BitSet executorsWithWork = new BitSet(executorCount);
- long until = 0;
- // basic idea is to go through different levels of load on the executor service; initially is all small batches
- // (mostly within max queue size) of very short operations, moving to progressively larger batches
- // (beyond max queued size), and longer operations
- for (float multiplier = 0f ; multiplier < 2.01f ; )
- {
- if (System.nanoTime() > until)
- {
- System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f));
- events = 0;
- until = System.nanoTime() + intervalNanos;
- multiplier += loadIncrement;
- System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
- }
-
- // wait a random amount of time so we submit new tasks in various stages of
- long timeout;
- if (pending.isEmpty()) timeout = 0;
- else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
- else if (pending.size() == executorCount) timeout = pending.first().timeout;
- else timeout = (long) (Math.random() * pending.last().timeout);
-
- while (!pending.isEmpty() && timeout > System.nanoTime())
- {
- Batch first = pending.first();
- boolean complete = false;
- try
- {
- for (Result result : first.results.descendingSet())
- result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
- complete = true;
- }
- catch (TimeoutException e)
- {
- }
- if (!complete && System.nanoTime() > first.timeout)
- {
- for (Result result : first.results)
- if (!result.future.isDone())
- throw new AssertionError();
- complete = true;
- }
- if (complete)
- {
- pending.pollFirst();
- executorsWithWork.clear(first.executorIndex);
- }
- }
-
- // if we've emptied the executors, give all our threads an opportunity to spin down
- if (timeout == Long.MAX_VALUE)
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-
- // submit a random batch to the first free executor service
- int executorIndex = executorsWithWork.nextClearBit(0);
- if (executorIndex >= executorCount)
- continue;
- executorsWithWork.set(executorIndex);
- ExecutorService executor = executors[executorIndex];
- TreeSet<Result> results = new TreeSet<>();
- int count = (int) (workCount[executorIndex].sample() * multiplier);
- long targetTotalElapsed = 0;
- long start = System.nanoTime();
- long baseTime;
- if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier);
- else baseTime = 0;
- for (int j = 0 ; j < count ; j++)
- {
- long time;
- if (baseTime == 0) time = (long) (workTime.sample() * multiplier);
- else time = (long) (baseTime * Math.random());
- if (time < minWorkTime)
- time = minWorkTime;
- if (time > maxWorkTime)
- time = maxWorkTime;
- targetTotalElapsed += time;
- Future<?> future = executor.submit(new WaitTask(time));
- results.add(new Result(future, System.nanoTime() + time));
- }
- long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex])
- + TimeUnit.MILLISECONDS.toNanos(100L);
- long now = System.nanoTime();
- if (runs++ > executorCount && now > end)
- throw new AssertionError();
- events += results.size();
- pending.add(new Batch(results, end, executorIndex));
-// System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start)));
- }
- }
-
- public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException
- {
- // do longer test
- new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBTreeTest.java b/test/long/org/apache/cassandra/utils/LongBTreeTest.java
deleted file mode 100644
index 76ff2bf..0000000
--- a/test/long/org/apache/cassandra/utils/LongBTreeTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
-import com.yammer.metrics.stats.Snapshot;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.btree.UpdateFunction;
-
-// TODO : should probably lower fan-factor for tests to make them more intensive
-public class LongBTreeTest
-{
-
- private static final Timer BTREE_TIMER = Metrics.newTimer(BTree.class, "BTREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS);
- private static final Timer TREE_TIMER = Metrics.newTimer(BTree.class, "TREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS);
- private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
- private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
- private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
-
- static
- {
- System.setProperty("cassandra.btree.fanfactor", "4");
- }
-
- @Test
- public void testOversizedMiddleInsert()
- {
- TreeSet<Integer> canon = new TreeSet<>();
- for (int i = 0 ; i < 10000000 ; i++)
- canon.add(i);
- Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
- btree = BTree.update(btree, ICMP, canon, true);
- canon.add(Integer.MIN_VALUE);
- canon.add(Integer.MAX_VALUE);
- Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
- testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator());
- }
-
- @Test
- public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 1, true);
- }
-
- @Test
- public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 5, true);
- }
-
- @Test
- public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 1, true);
- }
-
- @Test
- public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 10, true);
- }
-
- @Test
- public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
- {
- testInsertions(100000000, 5000, 3, 100, true);
- }
-
- @Test
- public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
- {
- testInsertions(10000, 50, 10, 10, false);
- }
-
- private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
- {
- int batchesPerTest = perTestCount / modificationBatchSize;
- int maximumRunLength = 100;
- int testKeyRange = perTestCount * testKeyRatio;
- int tests = totalCount / perTestCount;
- System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
- tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
-
- // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
- int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
- for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
- {
- final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
- for (int i = 0 ; i < chunkSize ; i++)
- {
- outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
- }
-
- final List<ListenableFuture<?>> inner = new ArrayList<>();
- int complete = 0;
- int reportInterval = totalCount / 100;
- int lastReportAt = 0;
- for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
- {
- inner.addAll(f.get());
- complete += perTestCount;
- if (complete - lastReportAt >= reportInterval)
- {
- System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
- lastReportAt = complete;
- }
- }
- Futures.allAsList(inner).get();
- }
- Snapshot snap = BTREE_TIMER.getSnapshot();
- System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- snap = TREE_TIMER.getSnapshot();
- System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- System.out.println("Done");
- }
-
- private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
- {
- ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
- {
- @Override
- public List<ListenableFuture<?>> call()
- {
- final List<ListenableFuture<?>> r = new ArrayList<>();
- NavigableMap<Integer, Integer> canon = new TreeMap<>();
- Object[] btree = BTree.empty();
- final TreeMap<Integer, Integer> buffer = new TreeMap<>();
- final Random rnd = new Random();
- for (int i = 0 ; i < iterations ; i++)
- {
- buffer.clear();
- int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
- while (mods > 0)
- {
- int v = rnd.nextInt(upperBound);
- int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
- int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
- for (int j = 0 ; j < c ; j++)
- {
- buffer.put(v, v);
- v++;
- }
- mods -= c;
- }
- TimerContext ctxt;
- ctxt = TREE_TIMER.time();
- canon.putAll(buffer);
- ctxt.stop();
- ctxt = BTREE_TIMER.time();
- Object[] next = null;
- while (next == null)
- next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
- btree = next;
- ctxt.stop();
-
- if (!BTree.isWellFormed(btree, ICMP))
- {
- System.out.println("ERROR: Not well formed");
- throw new AssertionError("Not well formed!");
- }
- if (quickEquality)
- testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
- else
- r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
- }
- return r;
- }
- });
- MODIFY.execute(f);
- return f;
- }
-
- @Test
- public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
- {
- Object[] cur = BTree.empty();
- TreeSet<Integer> canon = new TreeSet<>();
- // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
- for (int i = 0 ; i < 128 ; i++)
- {
- String id = String.format("[0..%d)", canon.size());
- System.out.println("Testing " + id);
- Futures.allAsList(testAllSlices(id, cur, canon)).get();
- Object[] next = null;
- while (next == null)
- next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT);
- cur = next;
- canon.add(i);
- }
- }
-
- static final Comparator<Integer> ICMP = new Comparator<Integer>()
- {
- @Override
- public int compare(Integer o1, Integer o2)
- {
- return Integer.compare(o1, o2);
- }
- };
-
- private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
- {
- List<ListenableFuture<?>> waitFor = new ArrayList<>();
- testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
- testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
- return waitFor;
- }
-
- private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
- {
- testOneSlice(id, btree, canon, results);
- for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
- {
- // test head/tail sets
- testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
- testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
- for (Integer ub : range(canon.size(), lb, ascending))
- {
- // test subsets
- testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
- testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
- testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
- testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
- }
- }
- }
-
- private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
- {
- ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
- {
-
- @Override
- public void run()
- {
- test(id + " Count", test.size(), canon.size());
- testEqual(id, test.iterator(), canon.iterator());
- testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
- testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
- testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
- }
- }, null);
- results.add(f);
- COMPARE.execute(f);
- }
-
- private static void test(String id, int test, int expect)
- {
- if (test != expect)
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test));
- }
- }
-
- private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
- {
- boolean equal = true;
- while (btree.hasNext() && canon.hasNext())
- {
- Object i = btree.next();
- Object j = canon.next();
- if (!i.equals(j))
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, j, i));
- equal = false;
- }
- }
- while (btree.hasNext())
- {
- System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next()));
- equal = false;
- }
- while (canon.hasNext())
- {
- System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next()));
- equal = false;
- }
- if (!equal)
- throw new AssertionError("Not equal");
- }
-
- // should only be called on sets that range from 0->N or N->0
- private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
- {
- return new Iterable<Integer>()
- {
- int cur;
- int delta;
- int end;
- {
- if (ascending)
- {
- end = size + 1;
- cur = from == Integer.MIN_VALUE ? -1 : from;
- delta = 1;
- }
- else
- {
- end = -2;
- cur = from == Integer.MIN_VALUE ? size : from;
- delta = -1;
- }
- }
- @Override
- public Iterator<Integer> iterator()
- {
- return new Iterator<Integer>()
- {
- @Override
- public boolean hasNext()
- {
- return cur != end;
- }
-
- @Override
- public Integer next()
- {
- Integer r = cur;
- cur += delta;
- return r;
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
- private static final class RandomAbort<V> implements UpdateFunction<V>
- {
- final Random rnd;
- final float chance;
- private RandomAbort(Random rnd, float chance)
- {
- this.rnd = rnd;
- this.chance = chance;
- }
-
- public V apply(V replacing, V update)
- {
- return update;
- }
-
- public boolean abortEarly()
- {
- return rnd.nextFloat() < chance;
- }
-
- public void allocated(long heapSize)
- {
-
- }
-
- public V apply(V v)
- {
- return v;
- }
- }
-
-}
[5/6] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Conflicts:
build.xml
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/02a7c342
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/02a7c342
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/02a7c342
Branch: refs/heads/cassandra-2.2
Commit: 02a7c342922a209ac7374f2f425c783a5faf8538
Parents: 14d7a63 bd4a9d1
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun Jun 28 11:39:53 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun Jun 28 11:39:53 2015 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------