You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:30 UTC
[06/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/HeapPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 3d3ca09..19f81be 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -40,63 +39,66 @@ public class HeapPool extends MemtablePool
public MemtableAllocator newAllocator()
{
- return new Allocator(this);
+ // TODO
+ throw new UnsupportedOperationException();
+ //return new Allocator(this);
}
- public static class Allocator extends MemtableBufferAllocator
- {
- Allocator(HeapPool pool)
- {
- super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
- }
+ // TODO
+ //public static class Allocator extends MemtableBufferAllocator
+ //{
+ // Allocator(HeapPool pool)
+ // {
+ // super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
+ // }
- public ByteBuffer allocate(int size, OpOrder.Group opGroup)
- {
- super.onHeap().allocate(size, opGroup);
- return ByteBuffer.allocate(size);
- }
+ // public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+ // {
+ // super.onHeap().allocate(size, opGroup);
+ // return ByteBuffer.allocate(size);
+ // }
- public DataReclaimer reclaimer()
- {
- return new Reclaimer();
- }
+ // public DataReclaimer reclaimer()
+ // {
+ // return new Reclaimer();
+ // }
- private class Reclaimer implements DataReclaimer
- {
- List<Cell> delayed;
+ // private class Reclaimer implements DataReclaimer
+ // {
+ // List<Cell> delayed;
- public Reclaimer reclaim(Cell cell)
- {
- if (delayed == null)
- delayed = new ArrayList<>();
- delayed.add(cell);
- return this;
- }
+ // public Reclaimer reclaim(Cell cell)
+ // {
+ // if (delayed == null)
+ // delayed = new ArrayList<>();
+ // delayed.add(cell);
+ // return this;
+ // }
- public Reclaimer reclaimImmediately(Cell cell)
- {
- onHeap().release(cell.name().dataSize() + cell.value().remaining());
- return this;
- }
+ // public Reclaimer reclaimImmediately(Cell cell)
+ // {
+ // onHeap().release(cell.name().dataSize() + cell.value().remaining());
+ // return this;
+ // }
- public Reclaimer reclaimImmediately(DecoratedKey key)
- {
- onHeap().release(key.getKey().remaining());
- return this;
- }
+ // public Reclaimer reclaimImmediately(DecoratedKey key)
+ // {
+ // onHeap().release(key.getKey().remaining());
+ // return this;
+ // }
- public void cancel()
- {
- if (delayed != null)
- delayed.clear();
- }
+ // public void cancel()
+ // {
+ // if (delayed != null)
+ // delayed.clear();
+ // }
- public void commit()
- {
- if (delayed != null)
- for (Cell cell : delayed)
- reclaimImmediately(cell);
- }
- }
- }
+ // public void commit()
+ // {
+ // if (delayed != null)
+ // for (Cell cell : delayed)
+ // reclaimImmediately(cell);
+ // }
+ // }
+ //}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index e814b4d..1e0c11e 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -58,10 +59,8 @@ public abstract class MemtableAllocator
this.offHeap = offHeap;
}
- public abstract Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp);
- public abstract CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp);
- public abstract DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp);
- public abstract ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp);
+ public abstract MemtableRowData.ReusableRow newReusableRow();
+ public abstract RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp);
public abstract DecoratedKey clone(DecoratedKey key, OpOrder.Group opGroup);
public abstract DataReclaimer reclaimer();
@@ -104,10 +103,16 @@ public abstract class MemtableAllocator
return state == LifeCycle.LIVE;
}
+ public static interface RowAllocator extends Row.Writer
+ {
+ public void allocateNewRow(int clusteringSize, Columns columns, boolean isStatic);
+ public MemtableRowData allocatedRowData();
+ }
+
public static interface DataReclaimer
{
- public DataReclaimer reclaim(Cell cell);
- public DataReclaimer reclaimImmediately(Cell cell);
+ public DataReclaimer reclaim(MemtableRowData row);
+ public DataReclaimer reclaimImmediately(MemtableRowData row);
public DataReclaimer reclaimImmediately(DecoratedKey key);
public void cancel();
public void commit();
@@ -115,12 +120,12 @@ public abstract class MemtableAllocator
public static final DataReclaimer NO_OP = new DataReclaimer()
{
- public DataReclaimer reclaim(Cell cell)
+ public DataReclaimer reclaim(MemtableRowData update)
{
return this;
}
- public DataReclaimer reclaimImmediately(Cell cell)
+ public DataReclaimer reclaimImmediately(MemtableRowData update)
{
return this;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
index 7034d76..144f439 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
@@ -20,12 +20,9 @@ package org.apache.cassandra.utils.memory;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.BufferDecoratedKey;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.CounterCell;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletedCell;
-import org.apache.cassandra.db.ExpiringCell;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
public abstract class MemtableBufferAllocator extends MemtableAllocator
@@ -36,24 +33,14 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator
super(onHeap, offHeap);
}
- public Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp)
+ public MemtableRowData.ReusableRow newReusableRow()
{
- return cell.localCopy(cfm, allocator(writeOp));
+ return MemtableRowData.BufferRowData.createReusableRow();
}
- public CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+ public RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp)
{
- return cell.localCopy(cfm, allocator(writeOp));
- }
-
- public DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp)
- {
- return cell.localCopy(cfm, allocator(writeOp));
- }
-
- public ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp)
- {
- return cell.localCopy(cfm, allocator(writeOp));
+ return new RowBufferAllocator(allocator(writeOp), cfm.isCounter());
}
public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
@@ -67,4 +54,71 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator
{
return new ContextAllocator(writeOp, this);
}
+
+ private static class RowBufferAllocator extends RowDataBlock.Writer implements RowAllocator
+ {
+ private final AbstractAllocator allocator;
+ private final boolean isCounter;
+
+ private MemtableRowData.BufferClustering clustering;
+ private int clusteringIdx;
+ private LivenessInfo info;
+ private DeletionTime deletion;
+ private RowDataBlock data;
+
+ private RowBufferAllocator(AbstractAllocator allocator, boolean isCounter)
+ {
+ super(true);
+ this.allocator = allocator;
+ this.isCounter = isCounter;
+ }
+
+ public void allocateNewRow(int clusteringSize, Columns columns, boolean isStatic)
+ {
+ data = new RowDataBlock(columns, 1, false, isCounter);
+ clustering = isStatic ? null : new MemtableRowData.BufferClustering(clusteringSize);
+ clusteringIdx = 0;
+ updateWriter(data);
+ }
+
+ public MemtableRowData allocatedRowData()
+ {
+ MemtableRowData row = new MemtableRowData.BufferRowData(clustering == null ? Clustering.STATIC_CLUSTERING : clustering,
+ info,
+ deletion,
+ data);
+
+ clustering = null;
+ info = LivenessInfo.NONE;
+ deletion = DeletionTime.LIVE;
+ data = null;
+
+ return row;
+ }
+
+ public void writeClusteringValue(ByteBuffer value)
+ {
+ clustering.setClusteringValue(clusteringIdx++, value == null ? null : allocator.clone(value));
+ }
+
+ public void writePartitionKeyLivenessInfo(LivenessInfo info)
+ {
+ this.info = info;
+ }
+
+ public void writeRowDeletion(DeletionTime deletion)
+ {
+ this.deletion = deletion;
+ }
+
+ @Override
+ public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+ {
+ ByteBuffer v = allocator.clone(value);
+ if (column.isComplex())
+ complexWriter.addCell(column, v, info, MemtableRowData.BufferCellPath.clone(path, allocator));
+ else
+ simpleWriter.addCell(column, v, info);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 88846c5..7ca859d 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -25,16 +25,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.CounterCell;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletedCell;
-import org.apache.cassandra.db.ExpiringCell;
-import org.apache.cassandra.db.NativeCell;
-import org.apache.cassandra.db.NativeCounterCell;
import org.apache.cassandra.db.NativeDecoratedKey;
-import org.apache.cassandra.db.NativeDeletedCell;
-import org.apache.cassandra.db.NativeExpiringCell;
+import org.apache.cassandra.db.rows.MemtableRowData;
import org.apache.cassandra.utils.concurrent.OpOrder;
public class NativeAllocator extends MemtableAllocator
@@ -60,28 +53,16 @@ public class NativeAllocator extends MemtableAllocator
super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
}
- @Override
- public Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp)
- {
- return new NativeCell(this, writeOp, cell);
- }
-
- @Override
- public CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+ public MemtableRowData.ReusableRow newReusableRow()
{
- return new NativeCounterCell(this, writeOp, cell);
+ // TODO
+ throw new UnsupportedOperationException();
}
- @Override
- public DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp)
- {
- return new NativeDeletedCell(this, writeOp, cell);
- }
-
- @Override
- public ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+ public RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp)
{
- return new NativeExpiringCell(this, writeOp, cell);
+ // TODO
+ throw new UnsupportedOperationException();
}
public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
deleted file mode 100644
index 9641930..0000000
--- a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSearchIterator;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.btree.UpdateFunction;
-
-// TODO : should probably lower fan-factor for tests to make them more intensive
-public class LongBTreeTest
-{
-
- private static final MetricRegistry metrics = new MetricRegistry();
- private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
- private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
- private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
- private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
- private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
-
- static
- {
- System.setProperty("cassandra.btree.fanfactor", "4");
- }
-
- @Test
- public void testOversizedMiddleInsert()
- {
- TreeSet<Integer> canon = new TreeSet<>();
- for (int i = 0 ; i < 10000000 ; i++)
- canon.add(i);
- Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
- btree = BTree.update(btree, ICMP, canon, true);
- canon.add(Integer.MIN_VALUE);
- canon.add(Integer.MAX_VALUE);
- Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
- testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator());
- }
-
- @Test
- public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 1, true);
- }
-
- @Test
- public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 50, 1, 5, true);
- }
-
- @Test
- public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 1, true);
- }
-
- @Test
- public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
- {
- testInsertions(10000000, 500, 10, 10, true);
- }
-
- @Test
- public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
- {
- testInsertions(100000000, 5000, 3, 100, true);
- }
-
- @Test
- public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
- {
- testInsertions(10000, 50, 10, 10, false);
- }
-
- @Test
- public void testSearchIterator() throws InterruptedException
- {
- int threads = Runtime.getRuntime().availableProcessors();
- final CountDownLatch latch = new CountDownLatch(threads);
- final AtomicLong errors = new AtomicLong();
- final AtomicLong count = new AtomicLong();
- final int perThreadTrees = 100;
- final int perTreeSelections = 100;
- final long totalCount = threads * perThreadTrees * perTreeSelections;
- for (int t = 0 ; t < threads ; t++)
- {
- MODIFY.execute(new Runnable()
- {
- public void run()
- {
- ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int i = 0 ; i < perThreadTrees ; i++)
- {
- Object[] tree = randomTree(10000, random);
- for (int j = 0 ; j < perTreeSelections ; j++)
- {
- BTreeSearchIterator<Integer, Integer, Integer> searchIterator = new BTreeSearchIterator<>(tree, ICMP);
- for (Integer key : randomSelection(tree, random))
- if (key != searchIterator.next(key))
- errors.incrementAndGet();
- searchIterator = new BTreeSearchIterator<Integer, Integer, Integer>(tree, ICMP);
- for (Integer key : randomMix(tree, random))
- if (key != searchIterator.next(key))
- if (BTree.find(tree, ICMP, key) == key)
- errors.incrementAndGet();
- count.incrementAndGet();
- }
- }
- latch.countDown();
- }
- });
- }
- while (latch.getCount() > 0)
- {
- latch.await(10L, TimeUnit.SECONDS);
- System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""));
- assert errors.get() == 0;
- }
- }
-
- private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
- {
- int batchesPerTest = perTestCount / modificationBatchSize;
- int maximumRunLength = 100;
- int testKeyRange = perTestCount * testKeyRatio;
- int tests = totalCount / perTestCount;
- System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
- tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
-
- // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
- int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
- for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
- {
- final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
- for (int i = 0 ; i < chunkSize ; i++)
- {
- outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
- }
-
- final List<ListenableFuture<?>> inner = new ArrayList<>();
- int complete = 0;
- int reportInterval = totalCount / 100;
- int lastReportAt = 0;
- for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
- {
- inner.addAll(f.get());
- complete += perTestCount;
- if (complete - lastReportAt >= reportInterval)
- {
- System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
- lastReportAt = complete;
- }
- }
- Futures.allAsList(inner).get();
- }
- Snapshot snap = BTREE_TIMER.getSnapshot();
- System.out.println(String.format("btree : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- snap = TREE_TIMER.getSnapshot();
- System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- System.out.println("Done");
- }
-
- private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
- {
- ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
- {
- @Override
- public List<ListenableFuture<?>> call()
- {
- final List<ListenableFuture<?>> r = new ArrayList<>();
- NavigableMap<Integer, Integer> canon = new TreeMap<>();
- Object[] btree = BTree.empty();
- final TreeMap<Integer, Integer> buffer = new TreeMap<>();
- final Random rnd = new Random();
- for (int i = 0 ; i < iterations ; i++)
- {
- buffer.clear();
- int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
- while (mods > 0)
- {
- int v = rnd.nextInt(upperBound);
- int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
- int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
- for (int j = 0 ; j < c ; j++)
- {
- buffer.put(v, v);
- v++;
- }
- mods -= c;
- }
- Timer.Context ctxt;
- ctxt = TREE_TIMER.time();
- canon.putAll(buffer);
- ctxt.stop();
- ctxt = BTREE_TIMER.time();
- Object[] next = null;
- while (next == null)
- next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
- btree = next;
- ctxt.stop();
-
- if (!BTree.isWellFormed(btree, ICMP))
- {
- System.out.println("ERROR: Not well formed");
- throw new AssertionError("Not well formed!");
- }
- if (quickEquality)
- testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
- else
- r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
- }
- return r;
- }
- });
- MODIFY.execute(f);
- return f;
- }
-
- @Test
- public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
- {
- Object[] cur = BTree.empty();
- TreeSet<Integer> canon = new TreeSet<>();
- // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
- for (int i = 0 ; i < 128 ; i++)
- {
- String id = String.format("[0..%d)", canon.size());
- System.out.println("Testing " + id);
- Futures.allAsList(testAllSlices(id, cur, canon)).get();
- Object[] next = null;
- while (next == null)
- next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT);
- cur = next;
- canon.add(i);
- }
- }
-
- static final Comparator<Integer> ICMP = new Comparator<Integer>()
- {
- @Override
- public int compare(Integer o1, Integer o2)
- {
- return Integer.compare(o1, o2);
- }
- };
-
- private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
- {
- List<ListenableFuture<?>> waitFor = new ArrayList<>();
- testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
- testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
- return waitFor;
- }
-
- private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
- {
- testOneSlice(id, btree, canon, results);
- for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
- {
- // test head/tail sets
- testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
- testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
- testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
- for (Integer ub : range(canon.size(), lb, ascending))
- {
- // test subsets
- testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
- testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
- testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
- testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
- }
- }
- }
-
- private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
- {
- ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
- {
-
- @Override
- public void run()
- {
- test(id + " Count", test.size(), canon.size());
- testEqual(id, test.iterator(), canon.iterator());
- testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
- testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
- testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
- }
- }, null);
- results.add(f);
- COMPARE.execute(f);
- }
-
- private static void test(String id, int test, int expect)
- {
- if (test != expect)
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test));
- }
- }
-
- private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
- {
- boolean equal = true;
- while (btree.hasNext() && canon.hasNext())
- {
- Object i = btree.next();
- Object j = canon.next();
- if (!i.equals(j))
- {
- System.out.println(String.format("%s: Expected %d, Got %d", id, j, i));
- equal = false;
- }
- }
- while (btree.hasNext())
- {
- System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next()));
- equal = false;
- }
- while (canon.hasNext())
- {
- System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next()));
- equal = false;
- }
- if (!equal)
- throw new AssertionError("Not equal");
- }
-
- // should only be called on sets that range from 0->N or N->0
- private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
- {
- return new Iterable<Integer>()
- {
- int cur;
- int delta;
- int end;
- {
- if (ascending)
- {
- end = size + 1;
- cur = from == Integer.MIN_VALUE ? -1 : from;
- delta = 1;
- }
- else
- {
- end = -2;
- cur = from == Integer.MIN_VALUE ? size : from;
- delta = -1;
- }
- }
- @Override
- public Iterator<Integer> iterator()
- {
- return new Iterator<Integer>()
- {
- @Override
- public boolean hasNext()
- {
- return cur != end;
- }
-
- @Override
- public Integer next()
- {
- Integer r = cur;
- cur += delta;
- return r;
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
- private static Object[] randomTree(int maxSize, Random random)
- {
- TreeSet<Integer> build = new TreeSet<>();
- int size = random.nextInt(maxSize);
- for (int i = 0 ; i < size ; i++)
- {
- build.add(random.nextInt());
- }
- return BTree.build(build, ICMP, true, UpdateFunction.NoOp.<Integer>instance());
- }
-
- private static Iterable<Integer> randomSelection(Object[] iter, final Random rnd)
- {
- final float proportion = rnd.nextFloat();
- return Iterables.filter(new BTreeSet<>(iter, ICMP), new Predicate<Integer>()
- {
- public boolean apply(Integer integer)
- {
- return rnd.nextFloat() < proportion;
- }
- });
- }
-
- private static Iterable<Integer> randomMix(Object[] iter, final Random rnd)
- {
- final float proportion = rnd.nextFloat();
- return Iterables.transform(new BTreeSet<>(iter, ICMP), new Function<Integer, Integer>()
- {
- long last = Integer.MIN_VALUE;
-
- public Integer apply(Integer v)
- {
- long last = this.last;
- this.last = v;
- if (rnd.nextFloat() < proportion)
- return v;
- return (int)((v - last) / 2);
- }
- });
- }
-
- private static final class RandomAbort<V> implements UpdateFunction<V>
- {
- final Random rnd;
- final float chance;
- private RandomAbort(Random rnd, float chance)
- {
- this.rnd = rnd;
- this.chance = chance;
- }
-
- public V apply(V replacing, V update)
- {
- return update;
- }
-
- public boolean abortEarly()
- {
- return rnd.nextFloat() < chance;
- }
-
- public void allocated(long heapSize)
- {
-
- }
-
- public V apply(V v)
- {
- return v;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 3d3de84..cf76e75 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -3,7 +3,7 @@
# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
#
cluster_name: Test Cluster
-memtable_allocation_type: offheap_objects
+memtable_allocation_type: heap_buffers
commitlog_sync: batch
commitlog_sync_batch_window_in_ms: 1.0
commitlog_segment_size_in_mb: 5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/conf/logback-test.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-test.xml b/test/conf/logback-test.xml
index 8cb2d6f..9facb83 100644
--- a/test/conf/logback-test.xml
+++ b/test/conf/logback-test.xml
@@ -68,7 +68,7 @@
<appender-ref ref="TEE"/>
</appender>
- <root level="DEBUG">
+ <root level="INFO">
<appender-ref ref="ASYNC" />
</root>
</configuration>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db
deleted file mode 100644
index 44d2e59..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db
deleted file mode 100644
index f75c4e6..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db
deleted file mode 100644
index 8f0a999..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db
deleted file mode 100644
index da84fbc..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db
deleted file mode 100644
index 0762615..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
deleted file mode 100644
index 376ca9d..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt
deleted file mode 100644
index cf6efa8..0000000
--- a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-CompressionInfo.db
-TOC.txt
-Filter.db
-Statistics.db
-Data.db
-Summary.db
-Index.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-CRC.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-CRC.db b/test/data/corrupt-sstables/la-1-big-CRC.db
new file mode 100644
index 0000000..f13b9c7
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-CRC.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Data.db b/test/data/corrupt-sstables/la-1-big-Data.db
new file mode 100644
index 0000000..dc516d8
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Data.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Digest.adler32
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Digest.adler32 b/test/data/corrupt-sstables/la-1-big-Digest.adler32
new file mode 100644
index 0000000..e447277
--- /dev/null
+++ b/test/data/corrupt-sstables/la-1-big-Digest.adler32
@@ -0,0 +1 @@
+2370519993
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Filter.db b/test/data/corrupt-sstables/la-1-big-Filter.db
new file mode 100644
index 0000000..709d2ea
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Filter.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Index.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Index.db b/test/data/corrupt-sstables/la-1-big-Index.db
new file mode 100644
index 0000000..178221e
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Index.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Statistics.db b/test/data/corrupt-sstables/la-1-big-Statistics.db
new file mode 100644
index 0000000..23b76ac
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Statistics.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Summary.db b/test/data/corrupt-sstables/la-1-big-Summary.db
new file mode 100644
index 0000000..732f27c
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Summary.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-TOC.txt b/test/data/corrupt-sstables/la-1-big-TOC.txt
new file mode 100644
index 0000000..9cbcd44
--- /dev/null
+++ b/test/data/corrupt-sstables/la-1-big-TOC.txt
@@ -0,0 +1,8 @@
+Statistics.db
+Filter.db
+Data.db
+Summary.db
+Digest.adler32
+CRC.db
+TOC.txt
+Index.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java b/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java
deleted file mode 100644
index a0bacea..0000000
--- a/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3;
-
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.SchemaLoader;
-
-import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
-
-/**
- * Base class for CQL tests.
- */
-public class DropKeyspaceCommitLogRecycleTest
-{
- protected static final Logger logger = LoggerFactory.getLogger(DropKeyspaceCommitLogRecycleTest.class);
-
- private static final String KEYSPACE = "cql_test_keyspace";
- private static final String KEYSPACE2 = "cql_test_keyspace2";
-
- static
- {
- // Once per-JVM is enough
- SchemaLoader.prepareServer();
- }
-
- private void create(boolean both)
- {
- executeOnceInternal(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE));
- executeOnceInternal(String.format("CREATE TABLE %s.test (k1 int, k2 int, v int, PRIMARY KEY (k1, k2))", KEYSPACE));
-
- if (both)
- {
- executeOnceInternal(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE2));
- executeOnceInternal(String.format("CREATE TABLE %s.test (k1 int, k2 int, v int, PRIMARY KEY (k1, k2))", KEYSPACE2));
- }
- }
-
- private void insert()
- {
- executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (0, 0, 0)", KEYSPACE));
- executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (1, 1, 1)", KEYSPACE));
- executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (2, 2, 2)", KEYSPACE));
-
- executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (0, 0, 0)", KEYSPACE2));
- executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (1, 1, 1)", KEYSPACE2));
- executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (2, 2, 2)", KEYSPACE2));
- }
-
- private void drop(boolean both)
- {
- executeOnceInternal(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE));
- if (both)
- executeOnceInternal(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE2));
- }
-
- @Test
- public void testRecycle()
- {
- for (int i = 0 ; i < 1000 ; i++)
- {
- create(i == 0);
- insert();
- drop(false);
- }
- }
-
- @After
- public void afterTest() throws Throwable
- {
- drop(true);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
deleted file mode 100644
index 24993c8..0000000
--- a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.apache.cassandra.db;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class LongFlushMemtableTest
-{
- public static final String KEYSPACE1 = "LongFlushMemtableTest";
-
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- SchemaLoader.loadSchema();
- SchemaLoader.createKeyspace(KEYSPACE1,
- SimpleStrategy.class,
- KSMetaData.optsWithRF(1));
- }
-
- @Test
- public void testFlushMemtables() throws ConfigurationException
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- for (int i = 0; i < 100; i++)
- {
- CFMetaData metadata = CFMetaData.denseCFMetaData(keyspace.getName(), "_CF" + i, UTF8Type.instance);
- MigrationManager.announceNewColumnFamily(metadata);
- }
-
- for (int j = 0; j < 200; j++)
- {
- for (int i = 0; i < 100; i++)
- {
- Mutation rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("key" + j));
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "_CF" + i);
- // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
- ByteBuffer value = ByteBuffer.allocate(100000);
- cf.addColumn(new BufferCell(Util.cellname("c"), value));
- rm.add(cf);
- rm.applyUnsafe();
- }
- }
-
- int flushes = 0;
- for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- {
- if (cfs.name.startsWith("_CF"))
- flushes += cfs.metric.memtableSwitchCount.getCount();
- }
- assert flushes > 0;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java b/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
deleted file mode 100644
index fe22da8..0000000
--- a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.utils.WrappedRunnable;
-import static org.apache.cassandra.Util.column;
-
-import org.apache.cassandra.Util;
-
-public class LongKeyspaceTest
-{
- public static final String KEYSPACE1 = "LongKeyspaceTest";
- public static final String CF_STANDARD = "Standard1";
-
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1,
- SimpleStrategy.class,
- KSMetaData.optsWithRF(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
- }
-
- @Test
- public void testGetRowMultiColumn() throws Throwable
- {
- final Keyspace keyspace = Keyspace.open(KEYSPACE1);
- final ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore("Standard1");
-
- for (int i = 1; i < 5000; i += 100)
- {
- Mutation rm = new Mutation(KEYSPACE1, Util.dk("key" + i).getKey());
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
- for (int j = 0; j < i; j++)
- cf.addColumn(column("c" + j, "v" + j, 1L));
- rm.add(cf);
- rm.applyUnsafe();
- }
-
- Runnable verify = new WrappedRunnable()
- {
- public void runMayThrow() throws Exception
- {
- ColumnFamily cf;
- for (int i = 1; i < 5000; i += 100)
- {
- for (int j = 0; j < i; j++)
- {
- cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk("key" + i), "c" + j));
- KeyspaceTest.assertColumns(cf, "c" + j);
- }
- }
-
- }
- };
- KeyspaceTest.reTest(keyspace.getColumnFamilyStore("Standard1"), verify);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
deleted file mode 100644
index b4efd49..0000000
--- a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.cassandra.db.commitlog;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.cassandra.Util;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
-
-public class ComitLogStress
-{
-
- public static final String format = "%s,%s,%s,%s,%s,%s";
-
- public static void main(String[] args) throws Exception {
- int NUM_THREADS = Runtime.getRuntime().availableProcessors();
- if (args.length >= 1) {
- NUM_THREADS = Integer.parseInt(args[0]);
- System.out.println("Setting num threads to: " + NUM_THREADS);
- }
- ExecutorService executor = new JMXEnabledThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60,
- TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10 * NUM_THREADS), new NamedThreadFactory("Stress"), "");
- ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
-
- org.apache.cassandra.SchemaLoader.loadSchema();
- org.apache.cassandra.SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
- final AtomicLong count = new AtomicLong();
- final long start = System.currentTimeMillis();
- System.out.println(String.format(format, "seconds", "max_mb", "allocated_mb", "free_mb", "diffrence", "count"));
- scheduled.scheduleAtFixedRate(new Runnable() {
- long lastUpdate = 0;
-
- public void run() {
- Runtime runtime = Runtime.getRuntime();
- long maxMemory = mb(runtime.maxMemory());
- long allocatedMemory = mb(runtime.totalMemory());
- long freeMemory = mb(runtime.freeMemory());
- long temp = count.get();
- System.out.println(String.format(format, ((System.currentTimeMillis() - start) / 1000),
- maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate));
- lastUpdate = temp;
- }
- }, 1, 1, TimeUnit.SECONDS);
-
- while (true) {
- executor.execute(new CommitlogExecutor());
- count.incrementAndGet();
- }
- }
-
- private static long mb(long maxMemory) {
- return maxMemory / (1024 * 1024);
- }
-
- static final String keyString = UUIDGen.getTimeUUID().toString();
- public static class CommitlogExecutor implements Runnable {
- public void run() {
- String ks = "Keyspace1";
- ByteBuffer key = ByteBufferUtil.bytes(keyString);
- for (int i=0; i<100; ++i) {
- Mutation mutation = new Mutation(ks, key);
- mutation.add("Standard1", Util.cellname("name"), ByteBufferUtil.bytes("value" + i),
- System.currentTimeMillis());
- CommitLog.instance.add(mutation);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 5897dec..9b4dde7 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -48,15 +49,20 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.config.Config.CommitLogSync;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnSerializer;
import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.utils.FBUtilities;
public class CommitLogStressTest
{
@@ -354,8 +360,7 @@ public class CommitLogStressTest
}
double time = (System.currentTimeMillis() - start) / 1000.0;
double avg = (temp / time);
- System.out
- .println(
+ System.out.println(
String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb",
((System.currentTimeMillis() - start) / 1000),
mb(maxMemory),
@@ -421,20 +426,20 @@ public class CommitLogStressTest
{
if (rl != null)
rl.acquire();
- String ks = "Keyspace1";
ByteBuffer key = randomBytes(16, rand);
- Mutation mutation = new Mutation(ks, key);
+ UpdateBuilder builder = UpdateBuilder.create(Schema.instance.getCFMetaData("Keyspace1", "Standard1"), Util.dk(key));
for (int ii = 0; ii < numCells; ii++)
{
int sz = randomSize ? rand.nextInt(cellSize) : cellSize;
ByteBuffer bytes = randomBytes(sz, rand);
- mutation.add("Standard1", Util.cellname("name" + ii), bytes, System.currentTimeMillis());
+ builder.newRow("name" + ii).add("val", bytes);
hash = hash(hash, bytes);
++cells;
dataSize += sz;
}
- rp = commitLog.add(mutation);
+
+ rp = commitLog.add(new Mutation(builder.build()));
counter.incrementAndGet();
}
}
@@ -468,7 +473,7 @@ public class CommitLogStressTest
{
mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
desc.getMessagingVersion(),
- ColumnSerializer.Flag.LOCAL);
+ SerializationHelper.Flag.LOCAL);
}
catch (IOException e)
{
@@ -476,18 +481,24 @@ public class CommitLogStressTest
throw new AssertionError(e);
}
- for (ColumnFamily cf : mutation.getColumnFamilies())
+ for (PartitionUpdate cf : mutation.getPartitionUpdates())
{
- for (Cell c : cf.getSortedColumns())
+
+ Iterator<Row> rowIterator = cf.iterator();
+
+ while (rowIterator.hasNext())
{
- if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name"))
+ Row row = rowIterator.next();
+ if (!(UTF8Type.instance.compose(row.clustering().get(0)).startsWith("name")))
+ continue;
+
+ for (Cell cell : row)
{
- hash = hash(hash, c.value());
+ hash = hash(hash, cell.value());
++cells;
}
}
}
}
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index e6c8f56..ca223ca 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -26,12 +26,14 @@ import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
+import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.Util;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.SSTableUtils;
@@ -93,7 +95,7 @@ public class LongCompactionsTest
testCompaction(100, 800, 5);
}
- protected void testCompaction(int sstableCount, int rowsPerSSTable, int colsPerRow) throws Exception
+ protected void testCompaction(int sstableCount, int partitionsPerSSTable, int rowsPerPartition) throws Exception
{
CompactionManager.instance.disableAutoCompaction();
@@ -103,17 +105,16 @@ public class LongCompactionsTest
ArrayList<SSTableReader> sstables = new ArrayList<>();
for (int k = 0; k < sstableCount; k++)
{
- SortedMap<String,ColumnFamily> rows = new TreeMap<String,ColumnFamily>();
- for (int j = 0; j < rowsPerSSTable; j++)
+ SortedMap<String, PartitionUpdate> rows = new TreeMap<>();
+ for (int j = 0; j < partitionsPerSSTable; j++)
{
String key = String.valueOf(j);
- Cell[] cols = new Cell[colsPerRow];
- for (int i = 0; i < colsPerRow; i++)
- {
- // last sstable has highest timestamps
- cols[i] = Util.column(String.valueOf(i), String.valueOf(i), k);
- }
- rows.put(key, SSTableUtils.createCF(KEYSPACE1, CF_STANDARD, Long.MIN_VALUE, Integer.MIN_VALUE, cols));
+ // last sstable has highest timestamps
+ UpdateBuilder builder = UpdateBuilder.create(store.metadata, String.valueOf(j))
+ .withTimestamp(k);
+ for (int i = 0; i < rowsPerPartition; i++)
+ builder.newRow(String.valueOf(i)).add("val", String.valueOf(i));
+ rows.put(key, builder.build());
}
SSTableReader sstable = SSTableUtils.prepare().write(rows);
sstables.add(sstable);
@@ -133,8 +134,8 @@ public class LongCompactionsTest
System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
this.getClass().getName(),
sstableCount,
- rowsPerSSTable,
- colsPerRow,
+ partitionsPerSSTable,
+ rowsPerPartition,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)));
}
@@ -157,23 +158,23 @@ public class LongCompactionsTest
for (int j = 0; j < SSTABLES; j++) {
for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
DecoratedKey key = Util.dk(String.valueOf(i % 2));
- Mutation rm = new Mutation(KEYSPACE1, key.getKey());
long timestamp = j * ROWS_PER_SSTABLE + i;
- rm.add("Standard1", Util.cellname(String.valueOf(i / 2)),
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- timestamp);
maxTimestampExpected = Math.max(timestamp, maxTimestampExpected);
- rm.apply();
+ UpdateBuilder.create(cfs.metadata, key)
+ .withTimestamp(timestamp)
+ .newRow(String.valueOf(i / 2)).add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .apply();
+
inserted.add(key);
}
cfs.forceBlockingFlush();
CompactionsTest.assertMaxTimestamp(cfs, maxTimestampExpected);
- assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(cfs).size());
+
+ assertEquals(inserted.toString(), inserted.size(), Util.getAll(Util.cmd(cfs).build()).size());
}
forceCompactions(cfs);
-
- assertEquals(inserted.size(), Util.getRangeSlice(cfs).size());
+ assertEquals(inserted.toString(), inserted.size(), Util.getAll(Util.cmd(cfs).build()).size());
// make sure max timestamp of compacted sstables is recorded properly after compaction.
CompactionsTest.assertMaxTimestamp(cfs, maxTimestampExpected);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index cc8203e..fbee72a 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -75,11 +76,11 @@ public class LongLeveledCompactionStrategyTest
for (int r = 0; r < rows; r++)
{
DecoratedKey key = Util.dk(String.valueOf(r));
- Mutation rm = new Mutation(ksname, key.getKey());
+ UpdateBuilder builder = UpdateBuilder.create(store.metadata, key);
for (int c = 0; c < columns; c++)
- {
- rm.add(cfname, Util.cellname("column" + c), value, 0);
- }
+ builder.newRow("column" + c).add("val", value);
+
+ Mutation rm = new Mutation(builder.build());
rm.apply();
store.forceBlockingFlush();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java b/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java
new file mode 100644
index 0000000..575036f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public abstract class AbstractReadCommandBuilder
+{
+ protected final ColumnFamilyStore cfs;
+ protected int nowInSeconds;
+
+ private int cqlLimit = -1;
+ private int pagingLimit = -1;
+ private boolean reversed = false;
+
+ private Set<ColumnIdentifier> columns;
+ protected final RowFilter filter = RowFilter.create();
+
+ private Slice.Bound lowerClusteringBound;
+ private Slice.Bound upperClusteringBound;
+
+ private NavigableSet<Clustering> clusterings;
+
+ // Use Util.cmd() rather than calling this constructor directly
+ AbstractReadCommandBuilder(ColumnFamilyStore cfs)
+ {
+ this.cfs = cfs;
+ this.nowInSeconds = FBUtilities.nowInSeconds();
+ }
+
+ public AbstractReadCommandBuilder withNowInSeconds(int nowInSec)
+ {
+ this.nowInSeconds = nowInSec;
+ return this;
+ }
+
+ public AbstractReadCommandBuilder fromIncl(Object... values)
+ {
+ assert lowerClusteringBound == null && clusterings == null;
+ this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, true, values);
+ return this;
+ }
+
+ public AbstractReadCommandBuilder fromExcl(Object... values)
+ {
+ assert lowerClusteringBound == null && clusterings == null;
+ this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, false, values);
+ return this;
+ }
+
+ public AbstractReadCommandBuilder toIncl(Object... values)
+ {
+ assert upperClusteringBound == null && clusterings == null;
+ this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, true, values);
+ return this;
+ }
+
+ public AbstractReadCommandBuilder toExcl(Object... values)
+ {
+ assert upperClusteringBound == null && clusterings == null;
+ this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, false, values);
+ return this;
+ }
+
+ public AbstractReadCommandBuilder includeRow(Object... values)
+ {
+ assert lowerClusteringBound == null && upperClusteringBound == null;
+
+ if (this.clusterings == null)
+ this.clusterings = new TreeSet<>(cfs.metadata.comparator);
+
+ this.clusterings.add(cfs.metadata.comparator.make(values));
+ return this;
+ }
+
+ public AbstractReadCommandBuilder reverse()
+ {
+ this.reversed = true;
+ return this;
+ }
+
+ public AbstractReadCommandBuilder withLimit(int newLimit)
+ {
+ this.cqlLimit = newLimit;
+ return this;
+ }
+
+ public AbstractReadCommandBuilder withPagingLimit(int newLimit)
+ {
+ this.pagingLimit = newLimit;
+ return this;
+ }
+
+ public AbstractReadCommandBuilder columns(String... columns)
+ {
+ if (this.columns == null)
+ this.columns = new HashSet<>();
+
+ for (String column : columns)
+ this.columns.add(ColumnIdentifier.getInterned(column, true));
+ return this;
+ }
+
+ private ByteBuffer bb(Object value, AbstractType<?> type)
+ {
+ return value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)type).decompose(value);
+ }
+
+ private AbstractType<?> forValues(AbstractType<?> collectionType)
+ {
+ assert collectionType instanceof CollectionType;
+ CollectionType ct = (CollectionType)collectionType;
+ switch (ct.kind)
+ {
+ case LIST:
+ case MAP:
+ return ct.valueComparator();
+ case SET:
+ return ct.nameComparator();
+ }
+ throw new AssertionError();
+ }
+
+ private AbstractType<?> forKeys(AbstractType<?> collectionType)
+ {
+ assert collectionType instanceof CollectionType;
+ CollectionType ct = (CollectionType)collectionType;
+ switch (ct.kind)
+ {
+ case LIST:
+ case MAP:
+ return ct.nameComparator();
+ }
+ throw new AssertionError();
+ }
+
+ public AbstractReadCommandBuilder filterOn(String column, Operator op, Object value)
+ {
+ ColumnDefinition def = cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true));
+ assert def != null;
+
+ AbstractType<?> type = def.type;
+ if (op == Operator.CONTAINS)
+ type = forValues(type);
+ else if (op == Operator.CONTAINS_KEY)
+ type = forKeys(type);
+
+ this.filter.add(def, op, bb(value, type));
+ return this;
+ }
+
+ protected ColumnFilter makeColumnFilter()
+ {
+ if (columns == null)
+ return ColumnFilter.all(cfs.metadata);
+
+ ColumnFilter.Builder builder = ColumnFilter.allColumnsBuilder(cfs.metadata);
+ for (ColumnIdentifier column : columns)
+ builder.add(cfs.metadata.getColumnDefinition(column));
+ return builder.build();
+ }
+
+ protected ClusteringIndexFilter makeFilter()
+ {
+ if (clusterings != null)
+ {
+ return new ClusteringIndexNamesFilter(clusterings, reversed);
+ }
+ else
+ {
+ Slice slice = Slice.make(lowerClusteringBound == null ? Slice.Bound.BOTTOM : lowerClusteringBound,
+ upperClusteringBound == null ? Slice.Bound.TOP : upperClusteringBound);
+ return new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), reversed);
+ }
+ }
+
+ protected DataLimits makeLimits()
+ {
+ DataLimits limits = cqlLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(cqlLimit);
+ if (pagingLimit >= 0)
+ limits = limits.forPaging(pagingLimit);
+ return limits;
+ }
+
+ public Row getOnlyRow()
+ {
+ return Util.getOnlyRow(build());
+ }
+
+ public Row getOnlyRowUnfiltered()
+ {
+ return Util.getOnlyRowUnfiltered(build());
+ }
+
+ public FilteredPartition getOnlyPartition()
+ {
+ return Util.getOnlyPartition(build());
+ }
+
+ public Partition getOnlyPartitionUnfiltered()
+ {
+ return Util.getOnlyPartitionUnfiltered(build());
+ }
+
+ public abstract ReadCommand build();
+
+ public static class SinglePartitionBuilder extends AbstractReadCommandBuilder
+ {
+ private final DecoratedKey partitionKey;
+
+ SinglePartitionBuilder(ColumnFamilyStore cfs, DecoratedKey key)
+ {
+ super(cfs);
+ this.partitionKey = key;
+ }
+
+ @Override
+ public ReadCommand build()
+ {
+ return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
+ }
+ }
+
+ public static class PartitionRangeBuilder extends AbstractReadCommandBuilder
+ {
+ private DecoratedKey startKey;
+ private boolean startInclusive;
+ private DecoratedKey endKey;
+ private boolean endInclusive;
+
+ PartitionRangeBuilder(ColumnFamilyStore cfs)
+ {
+ super(cfs);
+ }
+
+ public PartitionRangeBuilder fromKeyIncl(Object... values)
+ {
+ assert startKey == null;
+ this.startInclusive = true;
+ this.startKey = Util.makeKey(cfs.metadata, values);
+ return this;
+ }
+
+ public PartitionRangeBuilder fromKeyExcl(Object... values)
+ {
+ assert startKey == null;
+ this.startInclusive = false;
+ this.startKey = Util.makeKey(cfs.metadata, values);
+ return this;
+ }
+
+ public PartitionRangeBuilder toKeyIncl(Object... values)
+ {
+ assert endKey == null;
+ this.endInclusive = true;
+ this.endKey = Util.makeKey(cfs.metadata, values);
+ return this;
+ }
+
+ public PartitionRangeBuilder toKeyExcl(Object... values)
+ {
+ assert endKey == null;
+ this.endInclusive = false;
+ this.endKey = Util.makeKey(cfs.metadata, values);
+ return this;
+ }
+
+ @Override
+ public ReadCommand build()
+ {
+ PartitionPosition start = startKey;
+ if (start == null)
+ {
+ start = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+ startInclusive = false;
+ }
+ PartitionPosition end = endKey;
+ if (end == null)
+ {
+ end = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+ endInclusive = true;
+ }
+
+ AbstractBounds<PartitionPosition> bounds;
+ if (startInclusive && endInclusive)
+ bounds = new Bounds<PartitionPosition>(start, end);
+ else if (startInclusive && !endInclusive)
+ bounds = new IncludingExcludingBounds<PartitionPosition>(start, end);
+ else if (!startInclusive && endInclusive)
+ bounds = new Range<PartitionPosition>(start, end);
+ else
+ bounds = new ExcludingBounds<PartitionPosition>(start, end);
+
+ return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/unit/org/apache/cassandra/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/EmbeddedServer.java b/test/unit/org/apache/cassandra/EmbeddedServer.java
deleted file mode 100644
index 25754ea..0000000
--- a/test/unit/org/apache/cassandra/EmbeddedServer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.service.CassandraDaemon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-public class EmbeddedServer extends SchemaLoader
-{
- protected static CassandraDaemon daemon = null;
-
- enum GatewayService
- {
- Thrift
- }
-
- public static GatewayService getDaemonGatewayService()
- {
- return GatewayService.Thrift;
- }
-
- static ExecutorService executor = Executors.newSingleThreadExecutor();
-
- @BeforeClass
- public static void startCassandra()
-
- {
- executor.execute(new Runnable()
- {
- public void run()
- {
- switch (getDaemonGatewayService())
- {
- case Thrift:
- default:
- daemon = new org.apache.cassandra.service.CassandraDaemon();
- }
- daemon.activate();
- }
- });
- try
- {
- TimeUnit.SECONDS.sleep(3);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
-
- @AfterClass
- public static void stopCassandra() throws Exception
- {
- if (daemon != null)
- {
- daemon.deactivate();
- }
- executor.shutdown();
- executor.shutdownNow();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index c71c98b..f0a849b 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -21,7 +21,7 @@ package org.apache.cassandra;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableMap;
@@ -32,7 +32,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.SimpleSparseCellNameType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.sstable.Component;
@@ -119,12 +118,13 @@ public class MockSchema
throw new RuntimeException(e);
}
}
+ SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST);
StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
- .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1)
+ .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1, header)
.get(MetadataType.STATS);
SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, Murmur3Partitioner.instance,
segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
- new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL);
+ new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
reader.first = reader.last = readerBounds(generation);
if (!keepRef)
reader.selfRef().release();
@@ -140,10 +140,11 @@ public class MockSchema
private static CFMetaData newCFMetaData(String ksname, String cfname)
{
- CFMetaData metadata = new CFMetaData(ksname,
- cfname,
- ColumnFamilyType.Standard,
- new SimpleSparseCellNameType(UTF8Type.instance));
+ CFMetaData metadata = CFMetaData.Builder.create(ksname, cfname)
+ .addPartitionKey("key", UTF8Type.instance)
+ .addClusteringColumn("col", UTF8Type.instance)
+ .addRegularColumn("value", UTF8Type.instance)
+ .build();
metadata.caching(CachingOptions.NONE);
return metadata;
}