You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/06 18:25:18 UTC
cassandra git commit: Introduce test-burn ant target
Repository: cassandra
Updated Branches:
refs/heads/trunk 4dbe1e0c7 -> aa811c393
Introduce test-burn ant target
patch by ariel for CASSANDRA-9307
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa811c39
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa811c39
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa811c39
Branch: refs/heads/trunk
Commit: aa811c393653e92212458ef147a675f396f53173
Parents: 4dbe1e0
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed May 6 17:24:56 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed May 6 17:24:56 2015 +0100
----------------------------------------------------------------------
build.xml | 11 +-
.../cassandra/concurrent/LongOpOrderTest.java | 240 ---------
.../concurrent/LongSharedExecutorPoolTest.java | 226 ---------
.../apache/cassandra/utils/LongBTreeTest.java | 502 -------------------
4 files changed, 10 insertions(+), 969 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa811c39/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index a5f195f..c019025 100644
--- a/build.xml
+++ b/build.xml
@@ -62,6 +62,7 @@
<property name="test.runners" value="1"/>
<property name="test.unit.src" value="${test.dir}/unit"/>
<property name="test.long.src" value="${test.dir}/long"/>
+ <property name="test.burn.src" value="${test.dir}/burn"/>
<property name="test.microbench.src" value="${test.dir}/microbench"/>
<property name="test.pig.src" value="${test.dir}/pig"/>
<property name="dist.dir" value="${build.dir}/dist"/>
@@ -96,6 +97,7 @@
<property name="test.timeout" value="60000" />
<property name="test.long.timeout" value="600000" />
+ <property name="test.burn.timeout" value="600000" />
<!-- default for cql tests. Can be overridden with -Dcassandra.test.use_prepared=false -->
<property name="cassandra.test.use_prepared" value="true" />
@@ -1092,6 +1094,7 @@
<compilerarg value="-XDignore.symbol.file"/>
<src path="${test.unit.src}"/>
<src path="${test.long.src}"/>
+ <src path="${test.burn.src}"/>
<src path="${test.pig.src}"/>
<src path="${test.microbench.src}"/>
</javac>
@@ -1302,6 +1305,12 @@
</testmacro>
</target>
+ <target name="test-burn" depends="build-test" description="Execute functional tests">
+ <testmacro suitename="burn" inputdir="${test.burn.src}"
+ timeout="${test.burn.timeout}">
+ </testmacro>
+ </target>
+
<target name="long-test" depends="build-test" description="Execute functional tests">
<testmacro suitename="long" inputdir="${test.long.src}"
timeout="${test.long.timeout}">
@@ -1389,7 +1398,7 @@
<target name="test-all"
depends="test,long-test,test-compression,pig-test,test-clientutil-jar"
- description="Run all tests" />
+ description="Run all tests except for those under test-burn" />
<!-- Use JaCoCo ant extension without needing externally saved lib -->
<target name="jacoco-init" depends="maven-ant-tasks-init">
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa811c39/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
deleted file mode 100644
index d7105df..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.cassandra.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-import static org.junit.Assert.assertTrue;
-
-// TODO: we don't currently test SAFE functionality at all!
-// TODO: should also test markBlocking and SyncOrdered
-public class LongOpOrderTest
-{
-
- private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
-
- static final int CONSUMERS = 4;
- static final int PRODUCERS = 32;
-
- static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
- static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
-
- static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
- {
- @Override
- public void uncaughtException(Thread t, Throwable e)
- {
- System.err.println(t.getName() + ": " + e.getMessage());
- e.printStackTrace();
- }
- };
-
- final OpOrder order = new OpOrder();
- final AtomicInteger errors = new AtomicInteger();
-
- class TestOrdering implements Runnable
- {
-
- final int[] waitNanos = new int[1 << 16];
- volatile State state = new State();
- final ScheduledExecutorService sched;
-
- TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
- {
- this.sched = sched;
- final ThreadLocalRandom rnd = ThreadLocalRandom.current();
- for (int i = 0 ; i < waitNanos.length ; i++)
- waitNanos[i] = rnd.nextInt(5000);
- for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
- exec.execute(new Producer());
- exec.execute(this);
- }
-
- @Override
- public void run()
- {
- final long until = System.currentTimeMillis() + RUNTIME;
- long lastReport = System.currentTimeMillis();
- long count = 0;
- long opCount = 0;
- while (true)
- {
- long now = System.currentTimeMillis();
- if (now > until)
- break;
- if (now > lastReport + REPORT_INTERVAL)
- {
- lastReport = now;
- logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
- Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
- }
- try
- {
- Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- final State s = state;
- s.barrier = order.newBarrier();
- s.replacement = new State();
- s.barrier.issue();
- s.barrier.await();
- s.check();
- opCount += s.totalCount();
- state = s.replacement;
- sched.schedule(new Runnable()
- {
- @Override
- public void run()
- {
- s.check();
- }
- }, 1, TimeUnit.SECONDS);
- count++;
- }
- }
-
- class State
- {
-
- volatile OpOrder.Barrier barrier;
- volatile State replacement;
- final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
- int checkCount = -1;
-
- boolean accept(OpOrder.Group opGroup)
- {
- if (barrier != null && !barrier.isAfter(opGroup))
- return false;
- AtomicInteger c;
- if (null == (c = count.get(opGroup)))
- {
- count.putIfAbsent(opGroup, new AtomicInteger());
- c = count.get(opGroup);
- }
- c.incrementAndGet();
- return true;
- }
-
- int totalCount()
- {
- int c = 0;
- for (AtomicInteger v : count.values())
- c += v.intValue();
- return c;
- }
-
- void check()
- {
- boolean delete;
- if (checkCount >= 0)
- {
- if (checkCount != totalCount())
- {
- errors.incrementAndGet();
- logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
- }
- delete = true;
- }
- else
- {
- checkCount = totalCount();
- delete = false;
- }
- for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
- {
- if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
- {
- errors.incrementAndGet();
- logger.error("Received an operation that was created after the barrier was issued.");
- }
- if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
- {
- errors.incrementAndGet();
- logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
- }
- if (delete)
- TestOrdering.this.count.remove(e.getKey());
- }
- }
-
- }
-
- final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
-
- class Producer implements Runnable
- {
- public void run()
- {
- while (true)
- {
- AtomicInteger c;
- try (OpOrder.Group opGroup = order.start())
- {
- if (null == (c = count.get(opGroup)))
- {
- count.putIfAbsent(opGroup, new AtomicInteger());
- c = count.get(opGroup);
- }
- c.incrementAndGet();
- State s = state;
- while (!s.accept(opGroup))
- s = s.replacement;
- }
- }
- }
- }
-
- }
-
- @Test
- public void testOrdering() throws InterruptedException
- {
- errors.set(0);
- Thread.setDefaultUncaughtExceptionHandler(handler);
- final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
- final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
- for (int i = 0 ; i < CONSUMERS ; i++)
- new TestOrdering(exec, checker);
- exec.shutdown();
- exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
- assertTrue(exec.isShutdown());
- assertTrue(errors.get() == 0);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa811c39/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java b/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
deleted file mode 100644
index fe464c7..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.concurrent;
-
-import java.util.BitSet;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.LockSupport;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.math3.distribution.WeibullDistribution;
-import org.junit.Test;
-
-public class LongSharedExecutorPoolTest
-{
-
- private static final class WaitTask implements Runnable
- {
- final long nanos;
-
- private WaitTask(long nanos)
- {
- this.nanos = nanos;
- }
-
- public void run()
- {
- LockSupport.parkNanos(nanos);
- }
- }
-
- private static final class Result implements Comparable<Result>
- {
- final Future<?> future;
- final long forecastedCompletion;
-
- private Result(Future<?> future, long forecastedCompletion)
- {
- this.future = future;
- this.forecastedCompletion = forecastedCompletion;
- }
-
- public int compareTo(Result that)
- {
- int c = Long.compare(this.forecastedCompletion, that.forecastedCompletion);
- if (c != 0)
- return c;
- c = Integer.compare(this.hashCode(), that.hashCode());
- if (c != 0)
- return c;
- return Integer.compare(this.future.hashCode(), that.future.hashCode());
- }
- }
-
- private static final class Batch implements Comparable<Batch>
- {
- final TreeSet<Result> results;
- final long timeout;
- final int executorIndex;
-
- private Batch(TreeSet<Result> results, long timeout, int executorIndex)
- {
- this.results = results;
- this.timeout = timeout;
- this.executorIndex = executorIndex;
- }
-
- public int compareTo(Batch that)
- {
- int c = Long.compare(this.timeout, that.timeout);
- if (c != 0)
- return c;
- c = Integer.compare(this.results.size(), that.results.size());
- if (c != 0)
- return c;
- return Integer.compare(this.hashCode(), that.hashCode());
- }
- }
-
- @Test
- public void testPromptnessOfExecution() throws InterruptedException, ExecutionException
- {
- testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f);
- }
-
- private void testPromptnessOfExecution(long intervalNanos, float loadIncrement) throws InterruptedException, ExecutionException
- {
- final int executorCount = 4;
- int threadCount = 8;
- int maxQueued = 1024;
- final WeibullDistribution workTime = new WeibullDistribution(3, 200000);
- final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
- final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
-
- final int[] threadCounts = new int[executorCount];
- final WeibullDistribution[] workCount = new WeibullDistribution[executorCount];
- final ExecutorService[] executors = new ExecutorService[executorCount];
- for (int i = 0 ; i < executors.length ; i++)
- {
- executors[i] = SharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" + i, "test" + i);
- threadCounts[i] = threadCount;
- workCount[i] = new WeibullDistribution(2, maxQueued);
- threadCount *= 2;
- maxQueued *= 2;
- }
-
- long runs = 0;
- long events = 0;
- final TreeSet<Batch> pending = new TreeSet<>();
- final BitSet executorsWithWork = new BitSet(executorCount);
- long until = 0;
- // basic idea is to go through different levels of load on the executor service; initially it is all small batches
- // (mostly within max queue size) of very short operations, moving to progressively larger batches
- // (beyond max queued size), and longer operations
- for (float multiplier = 0f ; multiplier < 2.01f ; )
- {
- if (System.nanoTime() > until)
- {
- System.out.println(String.format("Completed %.0fK batches with %.1fM events", runs * 0.001f, events * 0.000001f));
- events = 0;
- until = System.nanoTime() + intervalNanos;
- multiplier += loadIncrement;
- System.out.println(String.format("Running for %ds with load multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
- }
-
- // wait a random amount of time so we submit new tasks in various stages of completion
- long timeout;
- if (pending.isEmpty()) timeout = 0;
- else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
- else if (pending.size() == executorCount) timeout = pending.first().timeout;
- else timeout = (long) (Math.random() * pending.last().timeout);
-
- while (!pending.isEmpty() && timeout > System.nanoTime())
- {
- Batch first = pending.first();
- boolean complete = false;
- try
- {
- for (Result result : first.results.descendingSet())
- result.future.get(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
- complete = true;
- }
- catch (TimeoutException e)
- {
- }
- if (!complete && System.nanoTime() > first.timeout)
- {
- for (Result result : first.results)
- if (!result.future.isDone())
- throw new AssertionError();
- complete = true;
- }
- if (complete)
- {
- pending.pollFirst();
- executorsWithWork.clear(first.executorIndex);
- }
- }
-
- // if we've emptied the executors, give all our threads an opportunity to spin down
- if (timeout == Long.MAX_VALUE)
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-
- // submit a random batch to the first free executor service
- int executorIndex = executorsWithWork.nextClearBit(0);
- if (executorIndex >= executorCount)
- continue;
- executorsWithWork.set(executorIndex);
- ExecutorService executor = executors[executorIndex];
- TreeSet<Result> results = new TreeSet<>();
- int count = (int) (workCount[executorIndex].sample() * multiplier);
- long targetTotalElapsed = 0;
- long start = System.nanoTime();
- long baseTime;
- if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() * multiplier);
- else baseTime = 0;
- for (int j = 0 ; j < count ; j++)
- {
- long time;
- if (baseTime == 0) time = (long) (workTime.sample() * multiplier);
- else time = (long) (baseTime * Math.random());
- if (time < minWorkTime)
- time = minWorkTime;
- if (time > maxWorkTime)
- time = maxWorkTime;
- targetTotalElapsed += time;
- Future<?> future = executor.submit(new WaitTask(time));
- results.add(new Result(future, System.nanoTime() + time));
- }
- long end = start + (long) Math.ceil(targetTotalElapsed / (double) threadCounts[executorIndex])
- + TimeUnit.MILLISECONDS.toNanos(100L);
- long now = System.nanoTime();
- if (runs++ > executorCount && now > end)
- throw new AssertionError();
- events += results.size();
- pending.add(new Batch(results, end, executorIndex));
-// System.out.println(String.format("Submitted batch to executor %d with %d items and %d permitted millis", executorIndex, count, TimeUnit.NANOSECONDS.toMillis(end - start)));
- }
- }
-
- public static void main(String[] args) throws InterruptedException, ExecutionException
- {
- // do longer test
- new LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L), 0.1f);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa811c39/test/long/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBTreeTest.java b/test/long/org/apache/cassandra/utils/LongBTreeTest.java
deleted file mode 100644
index 9641930..0000000
--- a/test/long/org/apache/cassandra/utils/LongBTreeTest.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSearchIterator;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.btree.UpdateFunction;
-
-// TODO : should probably lower fan-factor for tests to make them more intensive
-public class LongBTreeTest
-{
-
- private static final MetricRegistry metrics = new MetricRegistry();
- private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
- private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
- private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
- private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
- private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
-
- static
- {
- System.setProperty("cassandra.btree.fanfactor", "4");
- }
-
- @Test
- public void testOversizedMiddleInsert()
- {
- TreeSet<Integer> canon = new TreeSet<>();
- for (int i = 0 ; i < 10000000 ; i++)
- canon.add(i);
- Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
- btree = BTree.update(btree, ICMP, canon, true);
- canon.add(Integer.MIN_VALUE);
- canon.add(Integer.MAX_VALUE);
- Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
- testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator());
- }
-
- @Test
- public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 1, true);
- }
-
- @Test
- public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 5, true);
- }
-
- @Test
- public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 1, true);
- }
-
- @Test
- public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 10, true);
- }
-
- @Test
- public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
- {
- testInsertions(100000000, 5000, 3, 100, true);
- }
-
- @Test
- public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
- {
- testInsertions(10000, 50, 10, 10, false);
- }
-
- @Test
- public void testSearchIterator() throws InterruptedException
- {
- int threads = Runtime.getRuntime().availableProcessors();
- final CountDownLatch latch = new CountDownLatch(threads);
- final AtomicLong errors = new AtomicLong();
- final AtomicLong count = new AtomicLong();
- final int perThreadTrees = 100;
- final int perTreeSelections = 100;
- final long totalCount = threads * perThreadTrees * perTreeSelections;
- for (int t = 0 ; t < threads ; t++)
- {
- MODIFY.execute(new Runnable()
- {
- public void run()
- {
- ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int i = 0 ; i < perThreadTrees ; i++)
- {
- Object[] tree = randomTree(10000, random);
- for (int j = 0 ; j < perTreeSelections ; j++)
- {
- BTreeSearchIterator<Integer, Integer, Integer> searchIterator = new BTreeSearchIterator<>(tree, ICMP);
- for (Integer key : randomSelection(tree, random))
- if (key != searchIterator.next(key))
- errors.incrementAndGet();
- searchIterator = new BTreeSearchIterator<Integer, Integer, Integer>(tree, ICMP);
- for (Integer key : randomMix(tree, random))
- if (key != searchIterator.next(key))
- if (BTree.find(tree, ICMP, key) == key)
- errors.incrementAndGet();
- count.incrementAndGet();
- }
- }
- latch.countDown();
- }
- });
- }
- while (latch.getCount() > 0)
- {
- latch.await(10L, TimeUnit.SECONDS);
- System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""));
- assert errors.get() == 0;
- }
- }
-
- private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
- {
- int batchesPerTest = perTestCount / modificationBatchSize;
- int maximumRunLength = 100;
- int testKeyRange = perTestCount * testKeyRatio;
- int tests = totalCount / perTestCount;
- System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
- tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
-
- // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
- int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
- for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
- {
- final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
- for (int i = 0 ; i < chunkSize ; i++)
- {
- outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
- }
-
- final List<ListenableFuture<?>> inner = new ArrayList<>();
- int complete = 0;
- int reportInterval = totalCount / 100;
- int lastReportAt = 0;
- for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
- {
- inner.addAll(f.get());
- complete += perTestCount;
- if (complete - lastReportAt >= reportInterval)
- {
- System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
- lastReportAt = complete;
- }
- }
- Futures.allAsList(inner).get();
- }
- Snapshot snap = BTREE_TIMER.getSnapshot();
- System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- snap = TREE_TIMER.getSnapshot();
- System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- System.out.println("Done");
- }
-
- private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
- {
- ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
- {
- @Override
- public List<ListenableFuture<?>> call()
- {
- final List<ListenableFuture<?>> r = new ArrayList<>();
- NavigableMap<Integer, Integer> canon = new TreeMap<>();
- Object[] btree = BTree.empty();
- final TreeMap<Integer, Integer> buffer = new TreeMap<>();
- final Random rnd = new Random();
- for (int i = 0 ; i < iterations ; i++)
- {
- buffer.clear();
- int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
- while (mods > 0)
- {
- int v = rnd.nextInt(upperBound);
- int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
- int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
- for (int j = 0 ; j < c ; j++)
- {
- buffer.put(v, v);
- v++;
- }
- mods -= c;
- }
- Timer.Context ctxt;
- ctxt = TREE_TIMER.time();
- canon.putAll(buffer);
- ctxt.stop();
- ctxt = BTREE_TIMER.time();
- Object[] next = null;
- while (next == null)
- next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
- btree = next;
- ctxt.stop();
-
- if (!BTree.isWellFormed(btree, ICMP))
- {
- System.out.println("ERROR: Not well formed");
- throw new AssertionError("Not well formed!");
- }
- if (quickEquality)
- testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
- else
- r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
- }
- return r;
- }
- });
- MODIFY.execute(f);
- return f;
- }
-
- @Test
- public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
- {
- Object[] cur = BTree.empty();
- TreeSet<Integer> canon = new TreeSet<>();
- // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
- for (int i = 0 ; i < 128 ; i++)
- {
- String id = String.format("[0..%d)", canon.size());
- System.out.println("Testing " + id);
- Futures.allAsList(testAllSlices(id, cur, canon)).get();
- Object[] next = null;
- while (next == null)
- next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT);
- cur = next;
- canon.add(i);
- }
- }
-
- static final Comparator<Integer> ICMP = new Comparator<Integer>()
- {
- @Override
- public int compare(Integer o1, Integer o2)
- {
- return Integer.compare(o1, o2);
- }
- };
-
- private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
- {
- List<ListenableFuture<?>> waitFor = new ArrayList<>();
- testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
- testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
- return waitFor;
- }
-
- private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
- {
- testOneSlice(id, btree, canon, results);
- for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
- {
- // test head/tail sets
- testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
- testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
- for (Integer ub : range(canon.size(), lb, ascending))
- {
- // test subsets
- testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
- testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
- testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
- testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
- }
- }
- }
-
- private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
- {
- ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
- {
-
- @Override
- public void run()
- {
- test(id + " Count", test.size(), canon.size());
- testEqual(id, test.iterator(), canon.iterator());
- testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
- testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
- testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
- }
- }, null);
- results.add(f);
- COMPARE.execute(f);
- }
-
- private static void test(String id, int test, int expect)
- {
- if (test != expect)
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test));
- }
- }
-
- private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
- {
- boolean equal = true;
- while (btree.hasNext() && canon.hasNext())
- {
- Object i = btree.next();
- Object j = canon.next();
- if (!i.equals(j))
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, j, i));
- equal = false;
- }
- }
- while (btree.hasNext())
- {
- System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next()));
- equal = false;
- }
- while (canon.hasNext())
- {
- System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next()));
- equal = false;
- }
- if (!equal)
- throw new AssertionError("Not equal");
- }
-
- // should only be called on sets that range from 0->N or N->0
- private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
- {
- return new Iterable<Integer>()
- {
- int cur;
- int delta;
- int end;
- {
- if (ascending)
- {
- end = size + 1;
- cur = from == Integer.MIN_VALUE ? -1 : from;
- delta = 1;
- }
- else
- {
- end = -2;
- cur = from == Integer.MIN_VALUE ? size : from;
- delta = -1;
- }
- }
- @Override
- public Iterator<Integer> iterator()
- {
- return new Iterator<Integer>()
- {
- @Override
- public boolean hasNext()
- {
- return cur != end;
- }
-
- @Override
- public Integer next()
- {
- Integer r = cur;
- cur += delta;
- return r;
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
- private static Object[] randomTree(int maxSize, Random random)
- {
- TreeSet<Integer> build = new TreeSet<>();
- int size = random.nextInt(maxSize);
- for (int i = 0 ; i < size ; i++)
- {
- build.add(random.nextInt());
- }
- return BTree.build(build, ICMP, true, UpdateFunction.NoOp.<Integer>instance());
- }
-
- private static Iterable<Integer> randomSelection(Object[] iter, final Random rnd)
- {
- final float proportion = rnd.nextFloat();
- return Iterables.filter(new BTreeSet<>(iter, ICMP), new Predicate<Integer>()
- {
- public boolean apply(Integer integer)
- {
- return rnd.nextFloat() < proportion;
- }
- });
- }
-
- private static Iterable<Integer> randomMix(Object[] iter, final Random rnd)
- {
- final float proportion = rnd.nextFloat();
- return Iterables.transform(new BTreeSet<>(iter, ICMP), new Function<Integer, Integer>()
- {
- long last = Integer.MIN_VALUE;
-
- public Integer apply(Integer v)
- {
- long last = this.last;
- this.last = v;
- if (rnd.nextFloat() < proportion)
- return v;
- return (int)((v - last) / 2);
- }
- });
- }
-
- private static final class RandomAbort<V> implements UpdateFunction<V>
- {
- final Random rnd;
- final float chance;
- private RandomAbort(Random rnd, float chance)
- {
- this.rnd = rnd;
- this.chance = chance;
- }
-
- public V apply(V replacing, V update)
- {
- return update;
- }
-
- public boolean abortEarly()
- {
- return rnd.nextFloat() < chance;
- }
-
- public void allocated(long heapSize)
- {
-
- }
-
- public V apply(V v)
- {
- return v;
- }
- }
-}