You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/29 14:05:05 UTC
[3/7] Push more of memtable data off-heap
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
new file mode 100644
index 0000000..1b5dcf2
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import java.lang.reflect.Field;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.CounterCell;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletedCell;
+import org.apache.cassandra.db.ExpiringCell;
+import org.apache.cassandra.db.NativeCell;
+import org.apache.cassandra.db.NativeCounterCell;
+import org.apache.cassandra.db.NativeDecoratedKey;
+import org.apache.cassandra.db.NativeDeletedCell;
+import org.apache.cassandra.db.NativeExpiringCell;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+public class NativeAllocator extends MemtableAllocator
+{
+ private static final Logger logger = LoggerFactory.getLogger(NativeAllocator.class);
+
+ private final static int REGION_SIZE = 1024 * 1024;
+ private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+
+ // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
+ private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+
+ private final AtomicReference<Region> currentRegion = new AtomicReference<>();
+ private final AtomicInteger regionCount = new AtomicInteger(0);
+ private final ConcurrentLinkedQueue<Region> regions = new ConcurrentLinkedQueue<>();
+ private AtomicLong unslabbed = new AtomicLong(0);
+
+ protected NativeAllocator(NativePool pool)
+ {
+ super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
+ }
+
+ @Override
+ public Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp)
+ {
+ return new NativeCell(this, writeOp, cell);
+ }
+
+ @Override
+ public CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+ {
+ return new NativeCounterCell(this, writeOp, cell);
+ }
+
+ @Override
+ public DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+ {
+ return new NativeDeletedCell(this, writeOp, cell);
+ }
+
+ @Override
+ public ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+ {
+ return new NativeExpiringCell(this, writeOp, cell);
+ }
+
+ public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
+ {
+ return new NativeDecoratedKey(key.getToken(), this, writeOp, key.getKey());
+ }
+
+ @Override
+ public MemtableAllocator.DataReclaimer reclaimer()
+ {
+ return NO_OP;
+ }
+
+ /**
+ * Calls offHeap().allocate(size, opGroup), then reserves size bytes of native memory and returns their
+ * address; requests above MAX_CLONED_SIZE get a dedicated block, all others are sliced from a shared region.
+ */
+ public long allocate(int size, OpOrder.Group opGroup)
+ {
+ assert size >= 0;
+ offHeap().allocate(size, opGroup);
+ // satisfy large allocations with a dedicated native block, since they don't cause
+ // fragmentation as badly, and would fill up our shared regions quickly
+ if (size > MAX_CLONED_SIZE)
+ {
+ unslabbed.addAndGet(size);
+ Region region = new Region(unsafe.allocateMemory(size), size);
+ regions.add(region);
+ long peer = region.allocate(size);
+ if (peer == -1) // a fresh block of exactly this size must always fit the request
+ throw new AssertionError();
+ return peer;
+ }
+
+ while (true)
+ {
+ Region region = getRegion();
+ long peer = region.allocate(size);
+ if (peer != -1) // Region.allocate() reports "not enough space" as exactly -1
+ return peer;
+ // not enough space! retire the region so that getRegion() installs another one
+ currentRegion.compareAndSet(region, null);
+ }
+ }
+
+ public void setDiscarded()
+ {
+ for (Region region : regions) // every block this allocator registered: shared regions and dedicated large blocks
+ unsafe.freeMemory(region.peer); // addresses previously returned by allocate() are dangling from here on
+ super.setDiscarded();
+ }
+
+ /**
+ * Get the current region, or, if there is none, install one (a stashed region if available, else a new one)
+ */
+ private Region getRegion()
+ {
+ while (true)
+ {
+ // Fast path: a region is already installed
+ Region region = currentRegion.get();
+ if (region != null)
+ return region;
+
+ // No current region, so we want to install one: prefer a region stashed after an earlier lost race,
+ // otherwise allocate REGION_SIZE bytes. We race against other threads to CAS it in.
+ region = RACE_ALLOCATED.poll();
+ if (region == null)
+ region = new Region(unsafe.allocateMemory(REGION_SIZE), REGION_SIZE);
+ if (currentRegion.compareAndSet(null, region))
+ {
+ regions.add(region); // remembered so that setDiscarded() can free it
+ regionCount.incrementAndGet();
+ logger.trace("{} regions now allocated in {}", regionCount, this);
+ return region;
+ }
+
+ // someone else won race - that's fine, we stash our region in the global queue for re-use
+ // and try to grab theirs in the next iteration of the loop.
+ RACE_ALLOCATED.add(region);
+ }
+ }
+
+ /**
+ * A region of memory out of which allocations are sliced.
+ *
+ * This serves two purposes:
+ * - to provide a step between initialization and allocation, so that racing to CAS a
+ * new region in is harmless
+ * - encapsulates the allocation offset
+ */
+ private static class Region
+ {
+ /**
+ * Address of the first byte of the native block backing this region
+ */
+ private final long peer;
+
+ private final long capacity; // size in bytes of the block starting at peer
+
+ /**
+ * Offset, relative to peer, at which the next allocation will start;
+ * begins at 0 and is advanced by each successful allocation.
+ */
+ private final AtomicInteger nextFreeOffset = new AtomicInteger(0);
+
+ /**
+ * Total number of allocations satisfied from this region
+ */
+ private final AtomicInteger allocCount = new AtomicInteger();
+
+ /**
+ * Wrap a native block supplied by the caller; no memory is allocated here.
+ *
+ * @param peer address of the first byte of the block
+ * @param capacity size of the block in bytes
+ */
+ private Region(long peer, long capacity)
+ {
+ this.peer = peer;
+ this.capacity = capacity;
+ }
+
+ /**
+ * Try to allocate <code>size</code> bytes from the region.
+ *
+ * @return the address of the allocated bytes, or -1 to indicate not-enough-space
+ */
+ long allocate(int size)
+ {
+ while (true)
+ {
+ int oldOffset = nextFreeOffset.get();
+
+ if ((long) oldOffset + size > capacity) // summed as long so the check cannot wrap around
+ return -1;
+
+ // Try to atomically claim the bytes [oldOffset, oldOffset + size)
+ if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
+ {
+ // we got the alloc
+ allocCount.incrementAndGet();
+ return peer + oldOffset;
+ }
+ // we raced and lost alloc, try again
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Region@" + System.identityHashCode(this) +
+ " allocs=" + allocCount.get() + " waste=" +
+ (capacity - nextFreeOffset.get());
+ }
+ }
+
+
+ static final Unsafe unsafe;
+
+ static
+ {
+ try
+ {
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ unsafe = (sun.misc.Unsafe) field.get(null);
+ }
+ catch (Exception e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/NativePool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativePool.java b/src/java/org/apache/cassandra/utils/memory/NativePool.java
new file mode 100644
index 0000000..012867a
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/NativePool.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+public class NativePool extends MemtablePool
+{
+ public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
+ {
+ super(maxOnHeapMemory, maxOffHeapMemory, cleanThreshold, cleaner);
+ }
+
+ @Override
+ public boolean needToCopyOnHeap()
+ {
+ return true;
+ }
+
+ @Override
+ public NativeAllocator newAllocator()
+ {
+ return new NativeAllocator(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/Pool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/Pool.java b/src/java/org/apache/cassandra/utils/memory/Pool.java
deleted file mode 100644
index aa5e05c..0000000
--- a/src/java/org/apache/cassandra/utils/memory/Pool.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils.memory;
-
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-
-
-/**
- * Represents an amount of memory used for a given purpose, that can be allocated to specific tasks through
- * child PoolAllocator objects.
- */
-public abstract class Pool
-{
- final PoolCleanerThread<?> cleaner;
-
- // the total memory used by this pool
- public final SubPool onHeap;
- public final SubPool offHeap;
-
- final WaitQueue hasRoom = new WaitQueue();
-
- Pool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
- {
- this.onHeap = getSubPool(maxOnHeapMemory, cleanThreshold);
- this.offHeap = getSubPool(maxOffHeapMemory, cleanThreshold);
- this.cleaner = getCleaner(cleaner);
- if (this.cleaner != null)
- this.cleaner.start();
- }
-
- SubPool getSubPool(long limit, float cleanThreshold)
- {
- return new SubPool(limit, cleanThreshold);
- }
-
- PoolCleanerThread<?> getCleaner(Runnable cleaner)
- {
- return cleaner == null ? null : new PoolCleanerThread<>(this, cleaner);
- }
-
- public abstract boolean needToCopyOnHeap();
- public abstract PoolAllocator newAllocator();
-
- /**
- * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners,
- * and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources,
- * but only needs to allocate if there are none already available. This distinction is not always meaningful.
- */
- public class SubPool
- {
-
- // total memory/resource permitted to allocate
- public final long limit;
-
- // ratio of used to spare (both excluding 'reclaiming') at which to trigger a clean
- public final float cleanThreshold;
-
- // total bytes allocated and reclaiming
- volatile long allocated;
- volatile long reclaiming;
-
- // a cache of the calculation determining at what allocation threshold we should next clean
- volatile long nextClean;
-
- public SubPool(long limit, float cleanThreshold)
- {
- this.limit = limit;
- this.cleanThreshold = cleanThreshold;
- }
-
- /** Methods for tracking and triggering a clean **/
-
- boolean needsCleaning()
- {
- // use strictly-greater-than so we don't clean when limit is 0
- return used() > nextClean && updateNextClean();
- }
-
- void maybeClean()
- {
- if (needsCleaning() && cleaner != null)
- cleaner.trigger();
- }
-
- private boolean updateNextClean()
- {
- while (true)
- {
- long current = nextClean;
- long reclaiming = this.reclaiming;
- long next = reclaiming + (long) (this.limit * cleanThreshold);
- if (current == next || nextCleanUpdater.compareAndSet(this, current, next))
- return used() > next;
- }
- }
-
- /** Methods to allocate space **/
-
- boolean tryAllocate(long size)
- {
- while (true)
- {
- long cur;
- if ((cur = allocated) + size > limit)
- return false;
- if (allocatedUpdater.compareAndSet(this, cur, cur + size))
- return true;
- }
- }
-
- /**
- * apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the
- * allocated total, we will signal waiters
- */
- void adjustAllocated(long size)
- {
- if (size == 0)
- return;
- while (true)
- {
- long cur = allocated;
- if (allocatedUpdater.compareAndSet(this, cur, cur + size))
- return;
- }
- }
-
- // 'acquires' an amount of memory, and maybe also marks it allocated. This method is meant to be overridden
- // by implementations with a separate concept of acquired/allocated. As this method stands, an acquire
- // without an allocate is a no-op (acquisition is achieved through allocation), however a release (where size < 0)
- // is always processed and accounted for in allocated.
- void adjustAcquired(long size, boolean alsoAllocated)
- {
- if (size > 0 || alsoAllocated)
- {
- if (alsoAllocated)
- adjustAllocated(size);
- maybeClean();
- }
- else if (size < 0)
- {
- adjustAllocated(size);
- hasRoom.signalAll();
- }
- }
-
- // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans
- void adjustReclaiming(long reclaiming)
- {
- if (reclaiming == 0)
- return;
- reclaimingUpdater.addAndGet(this, reclaiming);
- if (reclaiming < 0 && updateNextClean() && cleaner != null)
- cleaner.trigger();
- }
-
- public long allocated()
- {
- return allocated;
- }
-
- public long used()
- {
- return allocated;
- }
-
- public PoolAllocator.SubAllocator newAllocator()
- {
- return new PoolAllocator.SubAllocator(this);
- }
-
- public WaitQueue hasRoom()
- {
- return hasRoom;
- }
- }
-
- private static final AtomicLongFieldUpdater<SubPool> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "reclaiming");
- private static final AtomicLongFieldUpdater<SubPool> allocatedUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "allocated");
- private static final AtomicLongFieldUpdater<SubPool> nextCleanUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "nextClean");
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
deleted file mode 100644
index aa374fe..0000000
--- a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.memory;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-
-public abstract class PoolAllocator extends AbstractAllocator
-{
-
- private final SubAllocator onHeap;
- private final SubAllocator offHeap;
- volatile LifeCycle state = LifeCycle.LIVE;
-
- static enum LifeCycle
- {
- LIVE, DISCARDING, DISCARDED;
- LifeCycle transition(LifeCycle targetState)
- {
- switch (targetState)
- {
- case DISCARDING:
- assert this == LifeCycle.LIVE;
- return LifeCycle.DISCARDING;
- case DISCARDED:
- assert this == LifeCycle.DISCARDING;
- return LifeCycle.DISCARDED;
- }
- throw new IllegalStateException();
- }
- }
-
- PoolAllocator(SubAllocator onHeap, SubAllocator offHeap)
- {
- this.onHeap = onHeap;
- this.offHeap = offHeap;
- }
-
- public SubAllocator onHeap()
- {
- return onHeap;
- }
-
- public SubAllocator offHeap()
- {
- return offHeap;
- }
-
- /**
- * Mark this allocator reclaiming; this will permit any outstanding allocations to temporarily
- * overshoot the maximum memory limit so that flushing can begin immediately
- */
- public void setDiscarding()
- {
- state = state.transition(LifeCycle.DISCARDING);
- // mark the memory owned by this allocator as reclaiming
- onHeap.markAllReclaiming();
- offHeap.markAllReclaiming();
- }
-
- /**
- * Indicate the memory and resources owned by this allocator are no longer referenced,
- * and can be reclaimed/reused.
- */
- public void setDiscarded()
- {
- state = state.transition(LifeCycle.DISCARDED);
- // release any memory owned by this allocator; automatically signals waiters
- onHeap.releaseAll();
- offHeap.releaseAll();
- }
-
- public boolean isLive()
- {
- return state == LifeCycle.LIVE;
- }
-
- public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup);
- public abstract void free(ByteBuffer name);
-
- /**
- * Allocate a slice of the given length.
- */
- public ByteBuffer clone(ByteBuffer buffer, OpOrder.Group opGroup)
- {
- assert buffer != null;
- if (buffer.remaining() == 0)
- return ByteBufferUtil.EMPTY_BYTE_BUFFER;
- ByteBuffer cloned = allocate(buffer.remaining(), opGroup);
-
- cloned.mark();
- cloned.put(buffer.duplicate());
- cloned.reset();
- return cloned;
- }
-
- public ContextAllocator wrap(OpOrder.Group opGroup)
- {
- return new ContextAllocator(opGroup, this);
- }
-
- /** Mark the BB as unused, permitting it to be reclaimed */
- public static final class SubAllocator
- {
- // the tracker we are owning memory from
- private final Pool.SubPool parent;
-
- // the amount of memory/resource owned by this object
- private volatile long owns;
- // the amount of memory we are reporting to collect; this may be inaccurate, but is close
- // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount
- private volatile long reclaiming;
-
- SubAllocator(Pool.SubPool parent)
- {
- this.parent = parent;
- }
-
- // should only be called once we know we will never allocate to the object again.
- // currently no corroboration/enforcement of this is performed.
- void releaseAll()
- {
- parent.adjustAcquired(-ownsUpdater.getAndSet(this, 0), false);
- parent.adjustReclaiming(-reclaimingUpdater.getAndSet(this, 0));
- }
-
- // allocate memory in the tracker, and mark ourselves as owning it
- public void allocate(long size, OpOrder.Group opGroup)
- {
- while (true)
- {
- if (parent.tryAllocate(size))
- {
- acquired(size);
- return;
- }
- WaitQueue.Signal signal = opGroup.isBlockingSignal(parent.hasRoom().register());
- boolean allocated = parent.tryAllocate(size);
- if (allocated || opGroup.isBlocking())
- {
- signal.cancel();
- if (allocated) // if we allocated, take ownership
- acquired(size);
- else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking
- allocated(size);
- return;
- }
- else
- signal.awaitUninterruptibly();
- }
- }
-
- // retroactively mark an amount allocated amd acquired in the tracker, and owned by us
- void allocated(long size)
- {
- parent.adjustAcquired(size, true);
- ownsUpdater.addAndGet(this, size);
- }
-
- // retroactively mark an amount acquired in the tracker, and owned by us
- void acquired(long size)
- {
- parent.adjustAcquired(size, false);
- ownsUpdater.addAndGet(this, size);
- }
-
- void release(long size)
- {
- parent.adjustAcquired(-size, false);
- ownsUpdater.addAndGet(this, -size);
- }
-
- // mark everything we currently own as reclaiming, both here and in our parent
- void markAllReclaiming()
- {
- while (true)
- {
- long cur = owns;
- long prev = reclaiming;
- if (reclaimingUpdater.compareAndSet(this, prev, cur))
- {
- parent.adjustReclaiming(cur - prev);
- return;
- }
- }
- }
-
- public long owns()
- {
- return owns;
- }
-
- public float ownershipRatio()
- {
- float r = owns / (float) parent.limit;
- if (Float.isNaN(r))
- return 0;
- return r;
- }
-
- private static final AtomicLongFieldUpdater<SubAllocator> ownsUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "owns");
- private static final AtomicLongFieldUpdater<SubAllocator> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "reclaiming");
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
deleted file mode 100644
index 68b0c20..0000000
--- a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils.memory;
-
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-
-/**
- * A thread that reclaims memory from a Pool on demand. The actual reclaiming work is delegated to the
- * cleaner Runnable, e.g., FlushLargestColumnFamily
- */
-class PoolCleanerThread<P extends Pool> extends Thread
-{
- /** The pool we're cleaning */
- final P pool;
-
- /** should ensure that at least some memory has been marked reclaiming after completion */
- final Runnable cleaner;
-
- /** signalled whenever needsCleaning() may return true */
- final WaitQueue wait = new WaitQueue();
-
- PoolCleanerThread(P pool, Runnable cleaner)
- {
- super(pool.getClass().getSimpleName() + "Cleaner");
- this.pool = pool;
- this.cleaner = cleaner;
- }
-
- boolean needsCleaning()
- {
- return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
- }
-
- // should ONLY be called when we really think it already needs cleaning
- void trigger()
- {
- wait.signal();
- }
-
- @Override
- public void run()
- {
- while (true)
- {
- while (!needsCleaning())
- {
- final WaitQueue.Signal signal = wait.register();
- if (!needsCleaning())
- signal.awaitUninterruptibly();
- else
- signal.cancel();
- }
-
- cleaner.run();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
index a90357c..19334ee 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
@@ -44,7 +44,7 @@ import sun.nio.ch.DirectBuffer;
* interleaved throughout the heap, and the old generation gets progressively
* more fragmented until a stop-the-world compacting collection occurs.
*/
-public class SlabAllocator extends PoolAllocator
+public class SlabAllocator extends MemtableBufferAllocator
{
private static final Logger logger = LoggerFactory.getLogger(SlabAllocator.class);
@@ -54,7 +54,7 @@ public class SlabAllocator extends PoolAllocator
// globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
- private final AtomicReference<Region> currentRegion = new AtomicReference<Region>();
+ private final AtomicReference<Region> currentRegion = new AtomicReference<>();
private final AtomicInteger regionCount = new AtomicInteger(0);
// this queue is used to keep references to off-heap allocated regions so that we can free them when we are discarded
@@ -106,9 +106,9 @@ public class SlabAllocator extends PoolAllocator
}
}
- public void free(ByteBuffer name)
+ public DataReclaimer reclaimer()
{
- // have to assume we cannot free the memory here, and just reclaim it all when we flush
+ return NO_OP;
}
public void setDiscarded()
@@ -150,6 +150,11 @@ public class SlabAllocator extends PoolAllocator
}
}
+ protected AbstractAllocator allocator(OpOrder.Group writeOp)
+ {
+ return new ContextAllocator(writeOp, this);
+ }
+
/**
* A region of memory out of which allocations are sliced.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/SlabPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
index 7276e57..c5c44e1 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
@@ -18,18 +18,17 @@
*/
package org.apache.cassandra.utils.memory;
-
-public class SlabPool extends Pool
+public class SlabPool extends MemtablePool
{
-
final boolean allocateOnHeap;
+
public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, Runnable cleaner)
{
super(maxOnHeapMemory, maxOffHeapMemory, cleanupThreshold, cleaner);
this.allocateOnHeap = maxOffHeapMemory == 0;
}
- public SlabAllocator newAllocator()
+ public MemtableAllocator newAllocator()
{
return new SlabAllocator(onHeap.newAllocator(), offHeap.newAllocator(), allocateOnHeap);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 37b0b96..b766a64 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -3,7 +3,7 @@
# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
#
cluster_name: Test Cluster
-memtable_allocation_type: offheap_buffers
+memtable_allocation_type: offheap_objects
in_memory_compaction_limit_in_mb: 1
commitlog_sync: batch
commitlog_sync_batch_window_in_ms: 1.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
index ba59404..4bb8fdd 100644
--- a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
+++ b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
@@ -54,7 +54,7 @@ public class LongFlushMemtableTest extends SchemaLoader
ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "_CF" + i);
// don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
ByteBuffer value = ByteBuffer.allocate(100000);
- cf.addColumn(new Cell(Util.cellname("c"), value));
+ cf.addColumn(new BufferCell(Util.cellname("c"), value));
rm.add(cf);
rm.applyUnsafe();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java b/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
index 327ff47..7a5b837 100644
--- a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
+++ b/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
@@ -37,7 +37,7 @@ public class LongKeyspaceTest extends SchemaLoader
for (int i = 1; i < 5000; i += 100)
{
- Mutation rm = new Mutation("Keyspace1", Util.dk("key" + i).key);
+ Mutation rm = new Mutation("Keyspace1", Util.dk("key" + i).getKey());
ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
for (int j = 0; j < i; j++)
cf.addColumn(column("c" + j, "v" + j, 1L));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index 35c2b5e..94bc09f 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -127,7 +127,7 @@ public class LongCompactionsTest extends SchemaLoader
for (int j = 0; j < SSTABLES; j++) {
for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
DecoratedKey key = Util.dk(String.valueOf(i % 2));
- Mutation rm = new Mutation(KEYSPACE1, key.key);
+ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
long timestamp = j * ROWS_PER_SSTABLE + i;
rm.add("Standard1", Util.cellname(String.valueOf(i / 2)),
ByteBufferUtil.EMPTY_BYTE_BUFFER,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 90e7123..b071001 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -54,7 +54,7 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader
for (int r = 0; r < rows; r++)
{
DecoratedKey key = Util.dk(String.valueOf(r));
- Mutation rm = new Mutation(ksname, key.key);
+ Mutation rm = new Mutation(ksname, key.getKey());
for (int c = 0; c < columns; c++)
{
rm.add(cfname, Util.cellname("column" + c), value, 0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index fe80009..d2fe949 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -77,7 +77,7 @@ public class Util
public static RowPosition rp(String key, IPartitioner partitioner)
{
- return RowPosition.forKey(ByteBufferUtil.bytes(key), partitioner);
+ return RowPosition.ForKey.get(ByteBufferUtil.bytes(key), partitioner);
}
public static CellName cellname(ByteBuffer... bbs)
@@ -108,17 +108,17 @@ public class Util
public static Cell column(String name, String value, long timestamp)
{
- return new Cell(cellname(name), ByteBufferUtil.bytes(value), timestamp);
+ return new BufferCell(cellname(name), ByteBufferUtil.bytes(value), timestamp);
}
public static Cell expiringColumn(String name, String value, long timestamp, int ttl)
{
- return new ExpiringCell(cellname(name), ByteBufferUtil.bytes(value), timestamp, ttl);
+ return new BufferExpiringCell(cellname(name), ByteBufferUtil.bytes(value), timestamp, ttl);
}
public static Cell counterColumn(String name, long value, long timestamp)
{
- return new CounterUpdateCell(cellname(name), value, timestamp);
+ return new BufferCounterUpdateCell(cellname(name), value, timestamp);
}
public static Token token(String key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index 2e1876f..94738ac 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.config;
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.*;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
@@ -170,7 +169,7 @@ public class DefsTest extends SchemaLoader
// now read and write to it.
CellName col0 = cellname("col0");
DecoratedKey dk = Util.dk("key0");
- Mutation rm = new Mutation(ks, dk.key);
+ Mutation rm = new Mutation(ks, dk.getKey());
rm.add(cf, col0, ByteBufferUtil.bytes("value0"), 1L);
rm.apply();
ColumnFamilyStore store = Keyspace.open(ks).getColumnFamilyStore(cf);
@@ -194,7 +193,7 @@ public class DefsTest extends SchemaLoader
assert cfm != null;
// write some data, force a flush, then verify that files exist on disk.
- Mutation rm = new Mutation(ks.name, dk.key);
+ Mutation rm = new Mutation(ks.name, dk.getKey());
for (int i = 0; i < 100; i++)
rm.add(cfm.cfName, cellname("col" + i), ByteBufferUtil.bytes("anyvalue"), 1L);
rm.apply();
@@ -208,7 +207,7 @@ public class DefsTest extends SchemaLoader
assert !Schema.instance.getKSMetaData(ks.name).cfMetaData().containsKey(cfm.cfName);
// any write should fail.
- rm = new Mutation(ks.name, dk.key);
+ rm = new Mutation(ks.name, dk.getKey());
boolean success = true;
try
{
@@ -244,7 +243,7 @@ public class DefsTest extends SchemaLoader
// test reads and writes.
CellName col0 = cellname("col0");
- Mutation rm = new Mutation(newCf.ksName, dk.key);
+ Mutation rm = new Mutation(newCf.ksName, dk.getKey());
rm.add(newCf.cfName, col0, ByteBufferUtil.bytes("value0"), 1L);
rm.apply();
ColumnFamilyStore store = Keyspace.open(newCf.ksName).getColumnFamilyStore(newCf.cfName);
@@ -268,7 +267,7 @@ public class DefsTest extends SchemaLoader
assert cfm != null;
// write some data, force a flush, then verify that files exist on disk.
- Mutation rm = new Mutation(ks.name, dk.key);
+ Mutation rm = new Mutation(ks.name, dk.getKey());
for (int i = 0; i < 100; i++)
rm.add(cfm.cfName, cellname("col" + i), ByteBufferUtil.bytes("anyvalue"), 1L);
rm.apply();
@@ -282,7 +281,7 @@ public class DefsTest extends SchemaLoader
assert Schema.instance.getKSMetaData(ks.name) == null;
// write should fail.
- rm = new Mutation(ks.name, dk.key);
+ rm = new Mutation(ks.name, dk.getKey());
boolean success = true;
try
{
@@ -319,7 +318,7 @@ public class DefsTest extends SchemaLoader
assert cfm != null;
// write some data
- Mutation rm = new Mutation(ks.name, dk.key);
+ Mutation rm = new Mutation(ks.name, dk.getKey());
for (int i = 0; i < 100; i++)
rm.add(cfm.cfName, cellname("col" + i), ByteBufferUtil.bytes("anyvalue"), 1L);
rm.apply();
@@ -353,7 +352,7 @@ public class DefsTest extends SchemaLoader
// now read and write to it.
CellName col0 = cellname("col0");
DecoratedKey dk = Util.dk("key0");
- Mutation rm = new Mutation(newKs.name, dk.key);
+ Mutation rm = new Mutation(newKs.name, dk.getKey());
rm.add(newCf.cfName, col0, ByteBufferUtil.bytes("value0"), 1L);
rm.apply();
ColumnFamilyStore store = Keyspace.open(newKs.name).getColumnFamilyStore(newCf.cfName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index a1c98f3..83a58e4 100644
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@ -54,7 +54,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
int[] values = new int[]{ 1, 2, 2, 3 };
for (int i = 0; i < values.length; ++i)
- map.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+ map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
Iterator<Cell> iter = map.iterator();
assertEquals("1st column", 1, iter.next().name().toByteBuffer().getInt(0));
@@ -76,7 +76,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
int[] values = new int[]{ 1, 2, 1, 3, 4, 4, 5, 5, 1, 2, 6, 6, 6, 1, 2, 3 };
for (int i = 0; i < values.length; ++i)
- cells.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+ cells.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
assertEquals(6, cells.getColumnCount());
@@ -91,7 +91,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
// Add more values
values = new int[]{ 11, 15, 12, 12, 12, 16, 10, 8, 8, 7, 4, 4, 5 };
for (int i = 0; i < values.length; ++i)
- cells.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+ cells.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
assertEquals(13, cells.getColumnCount());
@@ -125,7 +125,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
int[] values = new int[]{ -1, 20, 44, 55, 27, 27, 17, 1, 9, 89, 33, 44, 0, 9 };
for (int i = 0; i < values.length; ++i)
- cells.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+ cells.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
for (int i : values)
assertEquals(i, cells.getColumn(type.makeCellName(i)).name().toByteBuffer().getInt(0));
@@ -148,10 +148,10 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
int[] values2 = new int[]{ 2, 4, 5, 6 };
for (int i = 0; i < values1.length; ++i)
- map.addColumn(new Cell(type.makeCellName(values1[reversed ? values1.length - 1 - i : i])));
+ map.addColumn(new BufferCell(type.makeCellName(values1[reversed ? values1.length - 1 - i : i])));
for (int i = 0; i < values2.length; ++i)
- map2.addColumn(new Cell(type.makeCellName(values2[reversed ? values2.length - 1 - i : i])));
+ map2.addColumn(new BufferCell(type.makeCellName(values2[reversed ? values2.length - 1 - i : i])));
map2.addAll(map);
@@ -179,12 +179,12 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
List<Cell> sorted = new ArrayList<>();
for (int v : values)
- sorted.add(new Cell(type.makeCellName(v)));
+ sorted.add(new BufferCell(type.makeCellName(v)));
List<Cell> reverseSorted = new ArrayList<>(sorted);
Collections.reverse(reverseSorted);
for (int i = 0; i < values.length; ++i)
- map.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+ map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
assertSame(sorted, map.getSortedColumns());
assertSame(reverseSorted, map.getReverseSortedColumns());
@@ -205,7 +205,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
int[] values = new int[]{ 1, 2, 3, 5, 9 };
for (int i = 0; i < values.length; ++i)
- map.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+ map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
assertSame(new int[]{ 3, 2, 1 }, map.reverseIterator(new ColumnSlice[]{ new ColumnSlice(type.make(3), Composites.EMPTY) }));
assertSame(new int[]{ 3, 2, 1 }, map.reverseIterator(new ColumnSlice[]{ new ColumnSlice(type.make(4), Composites.EMPTY) }));
@@ -251,7 +251,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
int[] values = new int[]{ 1, 2, 2, 3 };
for (int i = 0; i < values.length; ++i)
- map.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+ map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
Iterator<Cell> iter = map.getReverseSortedColumns().iterator();
assertTrue(iter.hasNext());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/CollationControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CollationControllerTest.java b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
index fc92aae..22c60b8 100644
--- a/test/unit/org/apache/cassandra/db/CollationControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
@@ -40,26 +40,26 @@ public class CollationControllerTest extends SchemaLoader
DecoratedKey dk = Util.dk("key1");
// add data
- rm = new Mutation(keyspace.getName(), dk.key);
+ rm = new Mutation(keyspace.getName(), dk.getKey());
rm.add(cfs.name, Util.cellname("Column1"), ByteBufferUtil.bytes("asdf"), 0);
rm.apply();
cfs.forceBlockingFlush();
// remove
- rm = new Mutation(keyspace.getName(), dk.key);
+ rm = new Mutation(keyspace.getName(), dk.getKey());
rm.delete(cfs.name, 10);
rm.apply();
// add another mutation because sstable maxtimestamp isn't set
// correctly during flush if the most recent mutation is a row delete
- rm = new Mutation(keyspace.getName(), Util.dk("key2").key);
+ rm = new Mutation(keyspace.getName(), Util.dk("key2").getKey());
rm.add(cfs.name, Util.cellname("Column1"), ByteBufferUtil.bytes("zxcv"), 20);
rm.apply();
cfs.forceBlockingFlush();
// add yet one more mutation
- rm = new Mutation(keyspace.getName(), dk.key);
+ rm = new Mutation(keyspace.getName(), dk.getKey());
rm.add(cfs.name, Util.cellname("Column1"), ByteBufferUtil.bytes("foobar"), 30);
rm.apply();
cfs.forceBlockingFlush();
@@ -92,13 +92,13 @@ public class CollationControllerTest extends SchemaLoader
CellName cellName = Util.cellname("Column1");
// add data
- rm = new Mutation(keyspace.getName(), dk.key);
+ rm = new Mutation(keyspace.getName(), dk.getKey());
rm.add(cfs.name, cellName, ByteBufferUtil.bytes("asdf"), 0);
rm.apply();
cfs.forceBlockingFlush();
// remove
- rm = new Mutation(keyspace.getName(), dk.key);
+ rm = new Mutation(keyspace.getName(), dk.getKey());
rm.delete(cfs.name, cellName, 0);
rm.apply();
cfs.forceBlockingFlush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index d180b82..f812849 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -202,7 +202,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
Util.namesFilter(cfs, "asdf"),
10);
assertEquals(1, result.size());
- assert result.get(0).key.key.equals(ByteBufferUtil.bytes("key2"));
+ assert result.get(0).key.getKey().equals(ByteBufferUtil.bytes("key2"));
}
@Test
@@ -243,10 +243,10 @@ public class ColumnFamilyStoreTest extends SchemaLoader
assert rows != null;
assert rows.size() == 2 : StringUtils.join(rows, ",");
- String key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+ String key = new String(rows.get(0).key.getKey().array(), rows.get(0).key.getKey().position(), rows.get(0).key.getKey().remaining());
assert "k1".equals( key ) : key;
- key = new String(rows.get(1).key.key.array(),rows.get(1).key.key.position(),rows.get(1).key.key.remaining());
+ key = new String(rows.get(1).key.getKey().array(), rows.get(1).key.getKey().position(), rows.get(1).key.getKey().remaining());
assert "k3".equals(key) : key;
assert ByteBufferUtil.bytes(1L).equals( rows.get(0).cf.getColumn(birthdate).value());
@@ -258,14 +258,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+ key = new String(rows.get(0).key.getKey().array(), rows.get(0).key.getKey().position(), rows.get(0).key.getKey().remaining());
assert "k3".equals( key );
// same query again, but with resultset not including the subordinate expression
rows = cfs.search(range, clause, Util.namesFilter(cfs, "birthdate"), 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+ key = new String(rows.get(0).key.getKey().array(), rows.get(0).key.getKey().position(), rows.get(0).key.getKey().remaining());
assert "k3".equals( key );
assert rows.get(0).cf.getColumnCount() == 1 : rows.get(0).cf;
@@ -275,7 +275,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rows = cfs.search(range, clause, emptyFilter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+ key = new String(rows.get(0).key.getKey().array(), rows.get(0).key.getKey().position(), rows.get(0).key.getKey().remaining());
assert "k3".equals( key );
assertFalse(rows.get(0).cf.hasColumns());
@@ -334,7 +334,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
Range<RowPosition> range = Util.range("", "");
List<Row> rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- String key = ByteBufferUtil.string(rows.get(0).key.key);
+ String key = ByteBufferUtil.string(rows.get(0).key.getKey());
assert "k1".equals( key );
// delete the column directly
@@ -358,7 +358,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.apply();
rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- key = ByteBufferUtil.string(rows.get(0).key.key);
+ key = ByteBufferUtil.string(rows.get(0).key.getKey());
assert "k1".equals( key );
// verify that row and delete w/ older timestamp does nothing
@@ -367,7 +367,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.apply();
rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- key = ByteBufferUtil.string(rows.get(0).key.key);
+ key = ByteBufferUtil.string(rows.get(0).key.getKey());
assert "k1".equals( key );
// similarly, column delete w/ older timestamp should do nothing
@@ -376,7 +376,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.apply();
rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- key = ByteBufferUtil.string(rows.get(0).key.key);
+ key = ByteBufferUtil.string(rows.get(0).key.getKey());
assert "k1".equals( key );
// delete the entire row (w/ newer timestamp this time)
@@ -408,7 +408,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.apply();
rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- key = ByteBufferUtil.string(rows.get(0).key.key);
+ key = ByteBufferUtil.string(rows.get(0).key.getKey());
assert "k1".equals( key );
}
@@ -438,7 +438,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexExpression.Operator.EQ, ByteBufferUtil.bytes(2L));
clause = Arrays.asList(expr);
rows = keyspace.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
- String key = ByteBufferUtil.string(rows.get(0).key.key);
+ String key = ByteBufferUtil.string(rows.get(0).key.getKey());
assert "k1".equals( key );
// update the birthdate value with an OLDER timestamp, and test that the index ignores this
@@ -447,7 +447,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.apply();
rows = keyspace.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
- key = ByteBufferUtil.string(rows.get(0).key.key);
+ key = ByteBufferUtil.string(rows.get(0).key.getKey());
assert "k1".equals( key );
}
@@ -714,7 +714,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IDiskAtomFilter filter = new IdentityQueryFilter();
List<Row> rows = keyspace.getColumnFamilyStore("Indexed2").search(Util.range("", ""), clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
- assertEquals("k1", ByteBufferUtil.string(rows.get(0).key.key));
+ assertEquals("k1", ByteBufferUtil.string(rows.get(0).key.getKey()));
}
@Test
@@ -771,7 +771,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
Util.namesFilter(cfs, "asdf"),
10);
assertEquals(2, result.size());
- assert result.get(0).key.key.equals(ByteBufferUtil.bytes("key1"));
+ assert result.get(0).key.getKey().equals(ByteBufferUtil.bytes("key1"));
}
@Test
@@ -786,16 +786,16 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// create an isolated sstable.
putColsSuper(cfs, key, scfName,
- new Cell(cellname(1L), ByteBufferUtil.bytes("val1"), 1),
- new Cell(cellname(2L), ByteBufferUtil.bytes("val2"), 1),
- new Cell(cellname(3L), ByteBufferUtil.bytes("val3"), 1));
+ new BufferCell(cellname(1L), ByteBufferUtil.bytes("val1"), 1),
+ new BufferCell(cellname(2L), ByteBufferUtil.bytes("val2"), 1),
+ new BufferCell(cellname(3L), ByteBufferUtil.bytes("val3"), 1));
cfs.forceBlockingFlush();
// insert, don't flush.
putColsSuper(cfs, key, scfName,
- new Cell(cellname(4L), ByteBufferUtil.bytes("val4"), 1),
- new Cell(cellname(5L), ByteBufferUtil.bytes("val5"), 1),
- new Cell(cellname(6L), ByteBufferUtil.bytes("val6"), 1));
+ new BufferCell(cellname(4L), ByteBufferUtil.bytes("val4"), 1),
+ new BufferCell(cellname(5L), ByteBufferUtil.bytes("val5"), 1),
+ new BufferCell(cellname(6L), ByteBufferUtil.bytes("val6"), 1));
// verify insert.
final SlicePredicate sp = new SlicePredicate();
@@ -807,7 +807,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
assertRowAndColCount(1, 6, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
// delete
- Mutation rm = new Mutation(keyspace.getName(), key.key);
+ Mutation rm = new Mutation(keyspace.getName(), key.getKey());
rm.deleteRange(cfName, SuperColumns.startOf(scfName), SuperColumns.endOf(scfName), 2);
rm.apply();
@@ -822,17 +822,17 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// late insert.
putColsSuper(cfs, key, scfName,
- new Cell(cellname(4L), ByteBufferUtil.bytes("val4"), 1L),
- new Cell(cellname(7L), ByteBufferUtil.bytes("val7"), 1L));
+ new BufferCell(cellname(4L), ByteBufferUtil.bytes("val4"), 1L),
+ new BufferCell(cellname(7L), ByteBufferUtil.bytes("val7"), 1L));
// re-verify delete.
assertRowAndColCount(1, 0, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
// make sure new writes are recognized.
putColsSuper(cfs, key, scfName,
- new Cell(cellname(3L), ByteBufferUtil.bytes("val3"), 3),
- new Cell(cellname(8L), ByteBufferUtil.bytes("val8"), 3),
- new Cell(cellname(9L), ByteBufferUtil.bytes("val9"), 3));
+ new BufferCell(cellname(3L), ByteBufferUtil.bytes("val3"), 3),
+ new BufferCell(cellname(8L), ByteBufferUtil.bytes("val8"), 3),
+ new BufferCell(cellname(9L), ByteBufferUtil.bytes("val9"), 3));
assertRowAndColCount(1, 3, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
}
@@ -861,7 +861,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfs.keyspace.getName(), cfs.name);
for (Cell col : cols)
cf.addColumn(col.withUpdatedName(CellNames.compositeDense(scfName, col.name().toByteBuffer())));
- Mutation rm = new Mutation(cfs.keyspace.getName(), key.key, cf);
+ Mutation rm = new Mutation(cfs.keyspace.getName(), key.getKey(), cf);
rm.apply();
}
@@ -870,7 +870,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfs.keyspace.getName(), cfs.name);
for (Cell col : cols)
cf.addColumn(col);
- Mutation rm = new Mutation(cfs.keyspace.getName(), key.key, cf);
+ Mutation rm = new Mutation(cfs.keyspace.getName(), key.getKey(), cf);
rm.apply();
}
@@ -902,7 +902,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
assertRowAndColCount(1, 4, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
// delete (from sstable and memtable)
- Mutation rm = new Mutation(keyspace.getName(), key.key);
+ Mutation rm = new Mutation(keyspace.getName(), key.getKey());
rm.delete(cfs.name, 2);
rm.apply();
@@ -973,8 +973,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader
DecoratedKey key = Util.dk("slice-get-uuid-type");
// Insert a row with one supercolumn and multiple subcolumns
- putColsSuper(cfs, key, superColName, new Cell(cellname("a"), ByteBufferUtil.bytes("A"), 1),
- new Cell(cellname("b"), ByteBufferUtil.bytes("B"), 1));
+ putColsSuper(cfs, key, superColName, new BufferCell(cellname("a"), ByteBufferUtil.bytes("A"), 1),
+ new BufferCell(cellname("b"), ByteBufferUtil.bytes("B"), 1));
// Get the entire supercolumn like normal
ColumnFamily cfGet = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
@@ -985,7 +985,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
SortedSet<CellName> sliceColNames = new TreeSet<CellName>(cfs.metadata.comparator);
sliceColNames.add(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("a")));
sliceColNames.add(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b")));
- SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.key, cfName, System.currentTimeMillis(), new NamesQueryFilter(sliceColNames));
+ SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.getKey(), cfName, System.currentTimeMillis(), new NamesQueryFilter(sliceColNames));
ColumnFamily cfSliced = cmd.getRow(keyspace).cf;
// Make sure the slice returns the same as the straight get
@@ -1005,7 +1005,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
cfs.clearUnsafe();
// Create a cell a 'high timestamp'
- putColsStandard(cfs, key, new Cell(cname, ByteBufferUtil.bytes("a"), 2));
+ putColsStandard(cfs, key, new BufferCell(cname, ByteBufferUtil.bytes("a"), 2));
cfs.forceBlockingFlush();
// Nuke the metadata and reload that sstable
@@ -1018,10 +1018,10 @@ public class ColumnFamilyStoreTest extends SchemaLoader
cfs.loadNewSSTables();
// Add another cell with a lower timestamp
- putColsStandard(cfs, key, new Cell(cname, ByteBufferUtil.bytes("b"), 1));
+ putColsStandard(cfs, key, new BufferCell(cname, ByteBufferUtil.bytes("b"), 1));
// Test fetching the cell by name returns the first cell
- SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.key, cfName, System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(cname, cfs.getComparator())));
+ SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.getKey(), cfName, System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(cname, cfs.getComparator())));
ColumnFamily cf = cmd.getRow(keyspace).cf;
Cell cell = cf.getColumn(cname);
assert cell.value().equals(ByteBufferUtil.bytes("a")) : "expecting a, got " + ByteBufferUtil.string(cell.value());
@@ -1250,7 +1250,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
for (Row row : rows)
{
sb.append("{");
- sb.append(ByteBufferUtil.string(row.key.key));
+ sb.append(ByteBufferUtil.string(row.key.getKey()));
sb.append(":");
if (row.cf != null && !row.cf.isEmpty())
{
@@ -1399,7 +1399,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
Cell[] cols = new Cell[letters.length];
for (int i = 0; i < cols.length; i++)
{
- cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+ cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
ByteBuffer.wrap(new byte[1]), 1);
}
@@ -1448,7 +1448,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
Cell[] cols = new Cell[letters.length];
for (int i = 0; i < cols.length; i++)
{
- cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+ cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
ByteBuffer.wrap(new byte[1366]), 1);
}
@@ -1497,7 +1497,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
Cell[] cols = new Cell[letters.length];
for (int i = 0; i < cols.length; i++)
{
- cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+ cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
ByteBuffer.wrap(new byte[1]), 1);
}
@@ -1547,7 +1547,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
Cell[] cols = new Cell[letters.length];
for (int i = 0; i < cols.length; i++)
{
- cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+ cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
ByteBuffer.wrap(new byte[1366]), 1);
}
@@ -1595,7 +1595,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
Cell[] cols = new Cell[letters.length];
for (int i = 0; i < cols.length; i++)
{
- cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+ cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
// use 1366 so that three cols make an index segment
ByteBuffer.wrap(new byte[1366]), 1);
}
@@ -1856,7 +1856,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
Cell[] cols = new Cell[12];
for (int i = 0; i < cols.length; i++)
{
- cols[i] = new Cell(cellname("col" + letters[i]), ByteBuffer.wrap(new byte[valueSize]), 1);
+ cols[i] = new BufferCell(cellname("col" + letters[i]), ByteBuffer.wrap(new byte[valueSize]), 1);
}
for (int i = 0; i < 12; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
index a015a43..cb2d97a 100644
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@ -75,8 +75,8 @@ public class CounterCacheTest extends SchemaLoader
CacheService.instance.invalidateCounterCache();
ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
- cells.addColumn(new CounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
- cells.addColumn(new CounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
+ cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+ cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/CounterCellTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCellTest.java b/test/unit/org/apache/cassandra/db/CounterCellTest.java
index 86b856c..efc365d 100644
--- a/test/unit/org/apache/cassandra/db/CounterCellTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCellTest.java
@@ -64,7 +64,7 @@ public class CounterCellTest extends SchemaLoader
public void testCreate()
{
long delta = 3L;
- CounterCell cell = new CounterCell(Util.cellname("x"),
+ CounterCell cell = new BufferCounterCell(Util.cellname("x"),
CounterContext.instance().createLocal(delta),
1L,
Long.MIN_VALUE);
@@ -87,33 +87,33 @@ public class CounterCellTest extends SchemaLoader
ByteBuffer context;
// tombstone + tombstone
- left = new DeletedCell(cellname("x"), 1, 1L);
- right = new DeletedCell(cellname("x"), 2, 2L);
+ left = new BufferDeletedCell(cellname("x"), 1, 1L);
+ right = new BufferDeletedCell(cellname("x"), 2, 2L);
assert left.reconcile(right).getMarkedForDeleteAt() == right.getMarkedForDeleteAt();
assert right.reconcile(left).getMarkedForDeleteAt() == right.getMarkedForDeleteAt();
// tombstone > live
- left = new DeletedCell(cellname("x"), 1, 2L);
- right = CounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
+ left = new BufferDeletedCell(cellname("x"), 1, 2L);
+ right = BufferCounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
assert left.reconcile(right) == left;
// tombstone < live last delete
- left = new DeletedCell(cellname("x"), 1, 1L);
- right = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
+ left = new BufferDeletedCell(cellname("x"), 1, 1L);
+ right = BufferCounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
assert left.reconcile(right) == right;
// tombstone == live last delete
- left = new DeletedCell(cellname("x"), 1, 2L);
- right = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
+ left = new BufferDeletedCell(cellname("x"), 1, 2L);
+ right = BufferCounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
assert left.reconcile(right) == right;
// tombstone > live last delete
- left = new DeletedCell(cellname("x"), 1, 4L);
- right = CounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
+ left = new BufferDeletedCell(cellname("x"), 1, 4L);
+ right = BufferCounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
reconciled = left.reconcile(right);
assert reconciled.name() == right.name();
@@ -122,26 +122,26 @@ public class CounterCellTest extends SchemaLoader
assert ((CounterCell)reconciled).timestampOfLastDelete() == left.getMarkedForDeleteAt();
// live < tombstone
- left = CounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
- right = new DeletedCell(cellname("x"), 1, 2L);
+ left = BufferCounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
+ right = new BufferDeletedCell(cellname("x"), 1, 2L);
assert left.reconcile(right) == right;
// live last delete > tombstone
- left = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
- right = new DeletedCell(cellname("x"), 1, 1L);
+ left = BufferCounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
+ right = new BufferDeletedCell(cellname("x"), 1, 1L);
assert left.reconcile(right) == left;
// live last delete == tombstone
- left = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
- right = new DeletedCell(cellname("x"), 1, 2L);
+ left = BufferCounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
+ right = new BufferDeletedCell(cellname("x"), 1, 2L);
assert left.reconcile(right) == left;
// live last delete < tombstone
- left = CounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
- right = new DeletedCell(cellname("x"), 1, 4L);
+ left = BufferCounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
+ right = new BufferDeletedCell(cellname("x"), 1, 4L);
reconciled = left.reconcile(right);
assert reconciled.name() == left.name();
@@ -150,20 +150,20 @@ public class CounterCellTest extends SchemaLoader
assert ((CounterCell)reconciled).timestampOfLastDelete() == right.getMarkedForDeleteAt();
// live < live last delete
- left = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 1L, Long.MIN_VALUE);
- right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, 3L);
+ left = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 1L, Long.MIN_VALUE);
+ right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, 3L);
assert left.reconcile(right) == right;
// live last delete > live
- left = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 6L, 5L);
- right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, 3L);
+ left = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 6L, 5L);
+ right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, 3L);
assert left.reconcile(right) == left;
// live + live
- left = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, Long.MIN_VALUE);
- right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 1L, Long.MIN_VALUE);
+ left = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, Long.MIN_VALUE);
+ right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 1L, Long.MIN_VALUE);
reconciled = left.reconcile(right);
assert reconciled.name().equals(left.name());
@@ -171,7 +171,7 @@ public class CounterCellTest extends SchemaLoader
assert reconciled.timestamp() == 4L;
left = reconciled;
- right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(2), 1L, 5L), 2L, Long.MIN_VALUE);
+ right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(2), 1L, 5L), 2L, Long.MIN_VALUE);
reconciled = left.reconcile(right);
assert reconciled.name().equals(left.name());
@@ -179,7 +179,7 @@ public class CounterCellTest extends SchemaLoader
assert reconciled.timestamp() == 4L;
left = reconciled;
- right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(2), 2L, 2L), 6L, Long.MIN_VALUE);
+ right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(2), 2L, 2L), 6L, Long.MIN_VALUE);
reconciled = left.reconcile(right);
assert reconciled.name().equals(left.name());
@@ -211,15 +211,15 @@ public class CounterCellTest extends SchemaLoader
CounterCell rightCell;
// timestamp
- leftCell = CounterCell.createLocal(cellname("x"), 0, 1L, Long.MIN_VALUE);
- rightCell = CounterCell.createLocal(cellname("x"), 0, 2L, Long.MIN_VALUE);
+ leftCell = BufferCounterCell.createLocal(cellname("x"), 0, 1L, Long.MIN_VALUE);
+ rightCell = BufferCounterCell.createLocal(cellname("x"), 0, 2L, Long.MIN_VALUE);
assert rightCell == leftCell.diff(rightCell);
assert null == rightCell.diff(leftCell);
// timestampOfLastDelete
- leftCell = CounterCell.createLocal(cellname("x"), 0, 1L, 1L);
- rightCell = CounterCell.createLocal(cellname("x"), 0, 1L, 2L);
+ leftCell = BufferCounterCell.createLocal(cellname("x"), 0, 1L, 1L);
+ rightCell = BufferCounterCell.createLocal(cellname("x"), 0, 1L, 2L);
assert rightCell == leftCell.diff(rightCell);
assert null == rightCell.diff(leftCell);
@@ -231,8 +231,8 @@ public class CounterCellTest extends SchemaLoader
left.writeRemote(CounterId.fromInt(9), 1L, 0L);
right = ContextState.wrap(ByteBufferUtil.clone(left.context));
- leftCell = new CounterCell(cellname("x"), left.context, 1L);
- rightCell = new CounterCell(cellname("x"), right.context, 1L);
+ leftCell = new BufferCounterCell(cellname("x"), left.context, 1L);
+ rightCell = new BufferCounterCell(cellname("x"), right.context, 1L);
assert leftCell.diff(rightCell) == null;
// greater than: left has superset of nodes (counts equal)
@@ -247,8 +247,8 @@ public class CounterCellTest extends SchemaLoader
right.writeRemote(CounterId.fromInt(6), 2L, 0L);
right.writeRemote(CounterId.fromInt(9), 1L, 0L);
- leftCell = new CounterCell(cellname("x"), left.context, 1L);
- rightCell = new CounterCell(cellname("x"), right.context, 1L);
+ leftCell = new BufferCounterCell(cellname("x"), left.context, 1L);
+ rightCell = new BufferCounterCell(cellname("x"), right.context, 1L);
assert leftCell.diff(rightCell) == null;
// less than: right has subset of nodes (counts equal)
@@ -265,8 +265,8 @@ public class CounterCellTest extends SchemaLoader
right.writeRemote(CounterId.fromInt(6), 1L, 0L);
right.writeRemote(CounterId.fromInt(9), 1L, 0L);
- leftCell = new CounterCell(cellname("x"), left.context, 1L);
- rightCell = new CounterCell(cellname("x"), right.context, 1L);
+ leftCell = new BufferCounterCell(cellname("x"), left.context, 1L);
+ rightCell = new BufferCounterCell(cellname("x"), right.context, 1L);
assert rightCell == leftCell.diff(rightCell);
assert leftCell == rightCell.diff(leftCell);
}
@@ -281,7 +281,7 @@ public class CounterCellTest extends SchemaLoader
state.writeLocal(CounterId.fromInt(4), 4L, 4L);
CellNameType type = new SimpleDenseCellNameType(UTF8Type.instance);
- CounterCell original = new CounterCell(cellname("x"), state.context, 1L);
+ CounterCell original = new BufferCounterCell(cellname("x"), state.context, 1L);
byte[] serialized;
try (DataOutputBuffer bufOut = new DataOutputBuffer())
{
@@ -315,8 +315,8 @@ public class CounterCellTest extends SchemaLoader
state.writeRemote(CounterId.fromInt(3), 4L, 4L);
state.writeLocal(CounterId.fromInt(4), 4L, 4L);
- CounterCell original = new CounterCell(cellname("x"), state.context, 1L);
- CounterCell cleared = new CounterCell(cellname("x"), cc.clearAllLocal(state.context), 1L);
+ CounterCell original = new BufferCounterCell(cellname("x"), state.context, 1L);
+ CounterCell cleared = new BufferCounterCell(cellname("x"), cc.clearAllLocal(state.context), 1L);
original.updateDigest(digest1);
cleared.updateDigest(digest2);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index ec99fd1..e6745a1 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -120,10 +120,10 @@ public class KeyCacheTest extends SchemaLoader
Mutation rm;
// inserts
- rm = new Mutation(KEYSPACE1, key1.key);
+ rm = new Mutation(KEYSPACE1, key1.getKey());
rm.add(COLUMN_FAMILY1, Util.cellname("1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
rm.apply();
- rm = new Mutation(KEYSPACE1, key2.key);
+ rm = new Mutation(KEYSPACE1, key2.getKey());
rm.add(COLUMN_FAMILY1, Util.cellname("2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
rm.apply();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCollisionTest.java b/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
index 57f077f..1869872 100644
--- a/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
@@ -73,10 +73,10 @@ public class KeyCollisionTest extends SchemaLoader
List<Row> rows = cfs.getRangeSlice(new Bounds<RowPosition>(dk("k2"), dk("key2")), null, new IdentityQueryFilter(), 10000);
assert rows.size() == 4 : "Expecting 4 keys, got " + rows.size();
- assert rows.get(0).key.key.equals(ByteBufferUtil.bytes("k2"));
- assert rows.get(1).key.key.equals(ByteBufferUtil.bytes("k3"));
- assert rows.get(2).key.key.equals(ByteBufferUtil.bytes("key1"));
- assert rows.get(3).key.key.equals(ByteBufferUtil.bytes("key2"));
+ assert rows.get(0).key.getKey().equals(ByteBufferUtil.bytes("k2"));
+ assert rows.get(1).key.getKey().equals(ByteBufferUtil.bytes("k3"));
+ assert rows.get(2).key.getKey().equals(ByteBufferUtil.bytes("key1"));
+ assert rows.get(3).key.getKey().equals(ByteBufferUtil.bytes("key2"));
}
private void insert(String... keys)
@@ -102,7 +102,7 @@ public class KeyCollisionTest extends SchemaLoader
public DecoratedKey decorateKey(ByteBuffer key)
{
- return new DecoratedKey(getToken(key), key);
+ return new BufferDecoratedKey(getToken(key), key);
}
public Token midpoint(Token ltoken, Token rtoken)