You are viewing a plain text version of this content. (The canonical link that originally followed was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/01/02 21:20:13 UTC
git commit: Use Atomic*FieldUpdater to save memory.
Updated Branches:
refs/heads/trunk 8165af5db -> 7aa3364e0
Use Atomic*FieldUpdater to save memory.
Patch by marcuse, reviewed by belliottsmith for CASSANDRA-6281.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7aa3364e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7aa3364e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7aa3364e
Branch: refs/heads/trunk
Commit: 7aa3364e04b286ac7b41cfadda568df41e4e2821
Parents: 8165af5
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Jan 2 21:17:14 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Jan 2 21:17:14 2014 +0100
----------------------------------------------------------------------
.../cassandra/db/AtomicSortedColumns.java | 56 ++++++++++----------
.../service/DatacenterWriteResponseHandler.java | 3 +-
.../apache/cassandra/service/ReadCallback.java | 18 ++++---
.../cassandra/service/WriteResponseHandler.java | 12 +++--
4 files changed, 47 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 6e4fd01..b1f1e59 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -18,7 +18,7 @@
package org.apache.cassandra.db;
import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
@@ -49,7 +49,9 @@ import org.apache.cassandra.utils.Allocator;
*/
public class AtomicSortedColumns extends ColumnFamily
{
- private final AtomicReference<Holder> ref;
+ private volatile Holder ref;
+ private static final AtomicReferenceFieldUpdater<AtomicSortedColumns, Holder> refUpdater
+ = AtomicReferenceFieldUpdater.newUpdater(AtomicSortedColumns.class, Holder.class, "ref");
public static final ColumnFamily.Factory<AtomicSortedColumns> factory = new Factory<AtomicSortedColumns>()
{
@@ -67,12 +69,12 @@ public class AtomicSortedColumns extends ColumnFamily
private AtomicSortedColumns(CFMetaData metadata, Holder holder)
{
super(metadata);
- this.ref = new AtomicReference<>(holder);
+ this.ref = holder;
}
public CellNameType getComparator()
{
- return (CellNameType)ref.get().map.comparator();
+ return (CellNameType)ref.map.comparator();
}
public ColumnFamily.Factory getFactory()
@@ -82,12 +84,12 @@ public class AtomicSortedColumns extends ColumnFamily
public ColumnFamily cloneMe()
{
- return new AtomicSortedColumns(metadata, ref.get().cloneMe());
+ return new AtomicSortedColumns(metadata, ref.cloneMe());
}
public DeletionInfo deletionInfo()
{
- return ref.get().deletionInfo;
+ return ref.deletionInfo;
}
public void delete(DeletionTime delTime)
@@ -108,29 +110,29 @@ public class AtomicSortedColumns extends ColumnFamily
// Keeping deletion info for max markedForDeleteAt value
while (true)
{
- Holder current = ref.get();
+ Holder current = ref;
DeletionInfo newDelInfo = current.deletionInfo.copy().add(info);
- if (ref.compareAndSet(current, current.with(newDelInfo)))
+ if (refUpdater.compareAndSet(this, current, current.with(newDelInfo)))
break;
}
}
public void setDeletionInfo(DeletionInfo newInfo)
{
- ref.set(ref.get().with(newInfo));
+ ref = ref.with(newInfo);
}
public void purgeTombstones(int gcBefore)
{
while (true)
{
- Holder current = ref.get();
+ Holder current = ref;
if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
break;
DeletionInfo purgedInfo = current.deletionInfo.copy();
purgedInfo.purge(gcBefore);
- if (ref.compareAndSet(current, current.with(purgedInfo)))
+ if (refUpdater.compareAndSet(this, current, current.with(purgedInfo)))
break;
}
}
@@ -140,11 +142,11 @@ public class AtomicSortedColumns extends ColumnFamily
Holder current, modified;
do
{
- current = ref.get();
+ current = ref;
modified = current.cloneMe();
modified.addColumn(cell, allocator, SecondaryIndexManager.nullUpdater);
}
- while (!ref.compareAndSet(current, modified));
+ while (!refUpdater.compareAndSet(this, current, modified));
}
public void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation)
@@ -177,7 +179,7 @@ public class AtomicSortedColumns extends ColumnFamily
do
{
sizeDelta = 0;
- current = ref.get();
+ current = ref;
DeletionInfo newDelInfo = current.deletionInfo.copy().add(cm.deletionInfo());
modified = new Holder(current.map.clone(), newDelInfo);
@@ -194,11 +196,11 @@ public class AtomicSortedColumns extends ColumnFamily
{
sizeDelta += modified.addColumn(transformation.apply(cell), allocator, indexer);
// bail early if we know we've been beaten
- if (ref.get() != current)
+ if (ref != current)
continue main_loop;
}
}
- while (!ref.compareAndSet(current, modified));
+ while (!refUpdater.compareAndSet(this, current, modified));
indexer.updateRowLevelIndexes();
@@ -214,11 +216,11 @@ public class AtomicSortedColumns extends ColumnFamily
boolean replaced;
do
{
- current = ref.get();
+ current = ref;
modified = current.cloneMe();
replaced = modified.map.replace(oldCell.name(), oldCell, newCell);
}
- while (!ref.compareAndSet(current, modified));
+ while (!refUpdater.compareAndSet(this, current, modified));
return replaced;
}
@@ -227,45 +229,45 @@ public class AtomicSortedColumns extends ColumnFamily
Holder current, modified;
do
{
- current = ref.get();
+ current = ref;
modified = current.clear();
}
- while (!ref.compareAndSet(current, modified));
+ while (!refUpdater.compareAndSet(this, current, modified));
}
public Cell getColumn(CellName name)
{
- return ref.get().map.get(name);
+ return ref.map.get(name);
}
public SortedSet<CellName> getColumnNames()
{
- return ref.get().map.keySet();
+ return ref.map.keySet();
}
public Collection<Cell> getSortedColumns()
{
- return ref.get().map.values();
+ return ref.map.values();
}
public Collection<Cell> getReverseSortedColumns()
{
- return ref.get().map.descendingMap().values();
+ return ref.map.descendingMap().values();
}
public int getColumnCount()
{
- return ref.get().map.size();
+ return ref.map.size();
}
public Iterator<Cell> iterator(ColumnSlice[] slices)
{
- return new ColumnSlice.NavigableMapIterator(ref.get().map, slices);
+ return new ColumnSlice.NavigableMapIterator(ref.map, slices);
}
public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
{
- return new ColumnSlice.NavigableMapIterator(ref.get().map.descendingMap(), slices);
+ return new ColumnSlice.NavigableMapIterator(ref.map.descendingMap(), slices);
}
public boolean isInsertReversed()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 5530374..96fc96d 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -50,8 +50,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
{
if (message == null || DatabaseDescriptor.getLocalDataCenter().equals(snitch.getDatacenter(message.from)))
{
- if (responses.decrementAndGet() == 0)
- signal();
+ super.response(message);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index d665242..ff6a8d4 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -21,7 +21,7 @@ import java.net.InetAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -54,7 +54,9 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
final List<InetAddress> endpoints;
private final IReadCommand command;
private final ConsistencyLevel consistencyLevel;
- private final AtomicInteger received = new AtomicInteger(0);
+ private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
+ = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
+ private volatile int received = 0;
private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
/**
@@ -96,10 +98,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
{
// Same as for writes, see AbstractWriteResponseHandler
- int acks = received.get();
+ int acks = received;
if (resolver.isDataPresent() && acks >= blockfor)
acks = blockfor - 1;
- ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
+ ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
if (logger.isDebugEnabled())
logger.debug("Read timeout: {}", ex.toString());
throw ex;
@@ -112,8 +114,8 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
{
resolver.preprocess(message);
int n = waitingFor(message)
- ? received.incrementAndGet()
- : received.get();
+ ? recievedUpdater.incrementAndGet(this)
+ : received;
if (n >= blockfor && resolver.isDataPresent())
{
condition.signalAll();
@@ -136,7 +138,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
*/
public int getReceivedCount()
{
- return received.get();
+ return received;
}
public void response(TMessage result)
@@ -155,7 +157,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
*/
protected void maybeResolveForRepair()
{
- if (blockfor < endpoints.size() && received.get() == endpoints.size())
+ if (blockfor < endpoints.size() && received == endpoints.size())
{
assert resolver.isDataPresent();
StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa3364e/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 826ae01..df23b19 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -21,7 +21,7 @@ import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +38,9 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
{
protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class);
- protected final AtomicInteger responses;
+ protected volatile int responses;
+ private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater
+ = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses");
public WriteResponseHandler(Collection<InetAddress> writeEndpoints,
Collection<InetAddress> pendingEndpoints,
@@ -48,7 +50,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
WriteType writeType)
{
super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
- responses = new AtomicInteger(totalBlockFor());
+ responses = totalBlockFor();
}
public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback)
@@ -63,13 +65,13 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
public void response(MessageIn m)
{
- if (responses.decrementAndGet() == 0)
+ if (responsesUpdater.decrementAndGet(this) == 0)
signal();
}
protected int ackCount()
{
- return totalBlockFor() - responses.get();
+ return totalBlockFor() - responses;
}
public boolean isLatencyForSnitch()