You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/17 01:42:04 UTC
git commit: Pre-6504 cleanup and fixups
Updated Branches:
refs/heads/trunk fac3042db -> c8a0a3a68
Pre-6504 cleanup and fixups
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6504
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c8a0a3a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c8a0a3a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c8a0a3a6
Branch: refs/heads/trunk
Commit: c8a0a3a689d965af8de683f2d831f2c422105670
Parents: fac3042
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jan 17 03:41:20 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jan 17 03:41:20 2014 +0300
----------------------------------------------------------------------
conf/cassandra.yaml | 2 +-
.../org/apache/cassandra/config/CFMetaData.java | 5 ++
.../org/apache/cassandra/config/Config.java | 18 +++----
.../cql3/statements/CreateIndexStatement.java | 2 +-
.../cql3/statements/ModificationStatement.java | 2 +-
src/java/org/apache/cassandra/db/Cell.java | 14 ++----
.../org/apache/cassandra/db/ColumnFamily.java | 2 +-
.../org/apache/cassandra/db/CounterCell.java | 44 ++++++-----------
.../apache/cassandra/db/CounterMutation.java | 12 ++---
.../db/CounterMutationVerbHandler.java | 9 ++--
.../apache/cassandra/db/CounterUpdateCell.java | 7 +++
.../org/apache/cassandra/db/ExpiringCell.java | 6 +--
.../db/compaction/CompactionManager.java | 2 +-
.../cassandra/db/compaction/Scrubber.java | 2 +-
.../db/marshal/AbstractCommutativeType.java | 50 --------------------
.../cassandra/db/marshal/AbstractType.java | 8 +---
.../cassandra/db/marshal/CounterColumnType.java | 30 +++++++-----
.../apache/cassandra/service/StorageProxy.java | 21 ++++----
.../cassandra/thrift/CassandraServer.java | 4 +-
.../cassandra/thrift/ThriftValidation.java | 8 ++--
20 files changed, 91 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 1f9fd8b..2f1c8fa 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -157,7 +157,7 @@ key_cache_save_period: 14400
row_cache_size_in_mb: 0
# Duration in seconds after which Cassandra should
-# safe the row cache. Caches are saved to saved_caches_directory as specified
+# save the row cache. Caches are saved to saved_caches_directory as specified
# in this configuration file.
#
# Saved caches greatly improve cold-start speeds, and is relatively cheap in
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index cdc4cdb..817d4a3 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -2149,6 +2149,11 @@ public final class CFMetaData
return true;
}
+ public boolean isCounter()
+ {
+ return defaultValidator.isCounter();
+ }
+
public void validateColumns(Iterable<Cell> columns)
{
for (Cell cell : columns)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5c737d4..2ea8e38 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -50,25 +50,25 @@ public class Config
public String initial_token;
public Integer num_tokens = 1;
- public volatile Long request_timeout_in_ms = new Long(10000);
+ public volatile Long request_timeout_in_ms = 10000L;
- public Long read_request_timeout_in_ms = new Long(5000);
+ public volatile Long read_request_timeout_in_ms = 5000L;
- public Long range_request_timeout_in_ms = new Long(10000);
+ public volatile Long range_request_timeout_in_ms = 10000L;
- public Long write_request_timeout_in_ms = new Long(2000);
+ public volatile Long write_request_timeout_in_ms = 2000L;
- public Long cas_contention_timeout_in_ms = new Long(1000);
+ public volatile Long cas_contention_timeout_in_ms = 1000L;
- public Long truncate_request_timeout_in_ms = new Long(60000);
+ public volatile Long truncate_request_timeout_in_ms = 60000L;
- public Integer streaming_socket_timeout_in_ms = new Integer(0);
+ public Integer streaming_socket_timeout_in_ms = 0;
public boolean cross_node_timeout = false;
public volatile Double phi_convict_threshold = 8.0;
- public Integer concurrent_reads = 8;
+ public Integer concurrent_reads = 32;
public Integer concurrent_writes = 32;
public Integer concurrent_replicates = 32;
@@ -161,7 +161,7 @@ public class Config
public Long key_cache_size_in_mb = null;
public volatile int key_cache_save_period = 14400;
- public int key_cache_keys_to_save = Integer.MAX_VALUE;
+ public volatile int key_cache_keys_to_save = Integer.MAX_VALUE;
public long row_cache_size_in_mb = 0;
public volatile int row_cache_save_period = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index ca43d20..d0478f5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -69,7 +69,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
public void validate(ClientState state) throws RequestValidationException
{
CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
- if (cfm.getDefaultValidator().isCommutative())
+ if (cfm.isCounter())
throw new InvalidRequestException("Secondary indexes are not supported on counter tables");
ColumnDefinition cd = cfm.getColumnDefinition(target.column);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 148edda..c2a0080 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -91,7 +91,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public boolean isCounter()
{
- return cfm.getDefaultValidator().isCommutative();
+ return cfm.isCounter();
}
public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Cell.java b/src/java/org/apache/cassandra/db/Cell.java
index 537a744..3e04f9b 100644
--- a/src/java/org/apache/cassandra/db/Cell.java
+++ b/src/java/org/apache/cassandra/db/Cell.java
@@ -268,15 +268,11 @@ public class Cell implements OnDiskAtom
public String getString(CellNameType comparator)
{
- StringBuilder sb = new StringBuilder();
- sb.append(comparator.getString(name));
- sb.append(":");
- sb.append(isMarkedForDelete(System.currentTimeMillis()));
- sb.append(":");
- sb.append(value.remaining());
- sb.append("@");
- sb.append(timestamp());
- return sb.toString();
+ return String.format("%s:%b:%d@%d",
+ comparator.getString(name),
+ isMarkedForDelete(System.currentTimeMillis()),
+ value.remaining(),
+ timestamp);
}
protected void validateName(CFMetaData metadata) throws MarshalException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 2ea60f1..9ce6b0c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -125,7 +125,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
{
- assert !metadata().getDefaultValidator().isCommutative();
+ assert !metadata().isCounter();
Cell cell = Cell.create(name, value, timestamp, timeToLive, metadata());
addColumn(cell);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index 0a1c992..76949d4 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -21,9 +21,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
@@ -39,8 +36,6 @@ import org.apache.cassandra.utils.*;
*/
public class CounterCell extends Cell
{
- private static final Logger logger = LoggerFactory.getLogger(CounterCell.class);
-
protected static final CounterContext contextManager = CounterContext.instance();
private final long timestampOfLastDelete;
@@ -92,10 +87,7 @@ public class CounterCell extends Cell
@Override
public int dataSize()
{
- /*
- * A counter column adds to a Cell :
- * + 8 bytes for timestampOfLastDelete
- */
+ // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
return super.dataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
}
@@ -157,8 +149,6 @@ public class CounterCell extends Cell
@Override
public Cell reconcile(Cell cell, Allocator allocator)
{
- assert (cell instanceof CounterCell) || (cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
-
// live + tombstone: track last tombstone
if (cell.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant
{
@@ -175,6 +165,9 @@ public class CounterCell extends Cell
// live last delete < tombstone
return new CounterCell(name(), value(), timestamp(), cell.timestamp());
}
+
+ assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass();
+
// live < live last delete
if (timestamp() < ((CounterCell) cell).timestampOfLastDelete())
return cell;
@@ -182,11 +175,10 @@ public class CounterCell extends Cell
if (timestampOfLastDelete() > cell.timestamp())
return this;
// live + live: merge clocks; update value
- return new CounterCell(
- name(),
- contextManager.merge(value(), cell.value(), allocator),
- Math.max(timestamp(), cell.timestamp()),
- Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete()));
+ return new CounterCell(name(),
+ contextManager.merge(value(), cell.value(), allocator),
+ Math.max(timestamp(), cell.timestamp()),
+ Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete()));
}
@Override
@@ -199,9 +191,7 @@ public class CounterCell extends Cell
@Override
public int hashCode()
{
- int result = super.hashCode();
- result = 31 * result + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
- return result;
+ return 31 * super.hashCode() + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
}
@Override
@@ -219,17 +209,11 @@ public class CounterCell extends Cell
@Override
public String getString(CellNameType comparator)
{
- StringBuilder sb = new StringBuilder();
- sb.append(comparator.getString(name));
- sb.append(":");
- sb.append(false);
- sb.append(":");
- sb.append(contextManager.toString(value));
- sb.append("@");
- sb.append(timestamp());
- sb.append("!");
- sb.append(timestampOfLastDelete);
- return sb.toString();
+ return String.format("%s:false:%s@%d!%d",
+ comparator.getString(name),
+ contextManager.toString(value),
+ timestamp,
+ timestampOfLastDelete);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index a07dd9b..7dcb05c 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -78,7 +78,7 @@ public class CounterMutation implements IMutation
public Mutation makeReplicationMutation()
{
- List<ReadCommand> readCommands = new LinkedList<ReadCommand>();
+ List<ReadCommand> readCommands = new LinkedList<>();
long timestamp = System.currentTimeMillis();
for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{
@@ -111,7 +111,7 @@ public class CounterMutation implements IMutation
public MessageOut<CounterMutation> makeMutationMessage()
{
- return new MessageOut<CounterMutation>(MessagingService.Verb.COUNTER_MUTATION, this, serializer);
+ return new MessageOut<>(MessagingService.Verb.COUNTER_MUTATION, this, serializer);
}
public boolean shouldReplicateOnWrite()
@@ -133,9 +133,7 @@ public class CounterMutation implements IMutation
ColumnFamily cf = cf_.cloneMeShallow();
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
for (Cell cell : cf_)
- {
cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance);
- }
m.add(cf);
}
m.apply();
@@ -145,7 +143,6 @@ public class CounterMutation implements IMutation
{
if (!(m instanceof CounterMutation))
throw new IllegalArgumentException();
-
CounterMutation cm = (CounterMutation)m;
mutation.addAll(cm.mutation);
}
@@ -158,10 +155,7 @@ public class CounterMutation implements IMutation
public String toString(boolean shallow)
{
- StringBuilder buff = new StringBuilder("CounterMutation(");
- buff.append(mutation.toString(shallow));
- buff.append(", ").append(consistency.toString());
- return buff.append(")").toString();
+ return String.format("CounterMutation(%s, %s)", mutation.toString(shallow), consistency);
}
public static class CounterMutationSerializer implements IVersionedSerializer<CounterMutation>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index 966a015..d65fbd7 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -37,8 +37,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
try
{
final CounterMutation cm = message.payload;
- if (logger.isDebugEnabled())
- logger.debug("Applying forwarded {}", cm);
+ logger.debug("Applying forwarded {}", cm);
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
// We should not wait for the result of the write in this thread,
@@ -48,11 +47,11 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
// will not be called if the request timeout, but this is ok
// because the coordinator of the counter mutation will timeout on
// it's own in that case.
- StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable(){
+ StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable()
+ {
public void run()
{
- WriteResponse response = new WriteResponse();
- MessagingService.instance().sendReply(response.createMessage(), id, message.from);
+ MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
index f6bb3d4..dd2bf2a 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -96,4 +97,10 @@ public class CounterUpdateCell extends Cell
timestamp(),
Long.MIN_VALUE);
}
+
+ @Override
+ public String getString(CellNameType comparator)
+ {
+ return String.format("%s:%s@%d", comparator.getString(name), ByteBufferUtil.toLong(value), timestamp);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
index 2b9541c..b15514e 100644
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ b/src/java/org/apache/cassandra/db/ExpiringCell.java
@@ -147,11 +147,7 @@ public class ExpiringCell extends Cell
@Override
public String getString(CellNameType comparator)
{
- StringBuilder sb = new StringBuilder();
- sb.append(super.getString(comparator));
- sb.append("!");
- sb.append(timeToLive);
- return sb.toString();
+ return String.format("%s!%d", super.getString(comparator), timeToLive);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 7927574..2a8d68d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -616,7 +616,7 @@ public class CompactionManager implements CompactionManagerMBean
{
public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
{
- if (cfs.indexManager.hasIndexes() || cfs.metadata.getDefaultValidator().isCommutative())
+ if (cfs.indexManager.hasIndexes() || cfs.metadata.isCounter())
return new Full(cfs, ranges, renewer);
return new Bounded(cfs, ranges);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index eabfdbc..978865c 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -84,7 +84,7 @@ public class Scrubber implements Closeable
this.controller = isOffline
? new ScrubController(cfs)
: new CompactionController(cfs, Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs));
- this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
+ this.isCommutative = cfs.metadata.isCounter();
this.expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub)));
// loop through each row, deserializing to check for damage.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
deleted file mode 100644
index 4b26d5d..0000000
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.marshal;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public abstract class AbstractCommutativeType extends AbstractType<Long>
-{
- public boolean isCommutative()
- {
- return true;
- }
-
- @Override
- public Long compose(ByteBuffer bytes)
- {
- return CounterContext.instance().total(bytes);
- }
-
- @Override
- public ByteBuffer decompose(Long value)
- {
- return ByteBufferUtil.bytes(value);
- }
-
- /**
- * create commutative column
- */
- public abstract Cell createColumn(CellName name, ByteBuffer value, long timestamp);
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index cefa465..ce233de 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -104,12 +104,6 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
public abstract TypeSerializer<T> getSerializer();
- /** @deprecated use reverseComparator field instead */
- public Comparator<ByteBuffer> getReverseComparator()
- {
- return reverseComparator;
- }
-
/* convenience method */
public String getString(Collection<ByteBuffer> names)
{
@@ -121,7 +115,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
return builder.toString();
}
- public boolean isCommutative()
+ public boolean isCounter()
{
return false;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
index e1a886d..73e9f6f 100644
--- a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
@@ -20,18 +20,34 @@ package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
import org.apache.cassandra.cql3.CQL3Type;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.serializers.CounterSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class CounterColumnType extends AbstractCommutativeType
+public class CounterColumnType extends AbstractType<Long>
{
public static final CounterColumnType instance = new CounterColumnType();
CounterColumnType() {} // singleton
+ public boolean isCounter()
+ {
+ return true;
+ }
+
+ @Override
+ public Long compose(ByteBuffer bytes)
+ {
+ return CounterContext.instance().total(bytes);
+ }
+
+ @Override
+ public ByteBuffer decompose(Long value)
+ {
+ return ByteBufferUtil.bytes(value);
+ }
+
public int compare(ByteBuffer o1, ByteBuffer o2)
{
if (o1 == null)
@@ -45,14 +61,6 @@ public class CounterColumnType extends AbstractCommutativeType
return ByteBufferUtil.bytesToHex(bytes);
}
- /**
- * create commutative column
- */
- public Cell createColumn(CellName name, ByteBuffer value, long timestamp)
- {
- return new CounterUpdateCell(name, value, timestamp);
- }
-
public ByteBuffer fromString(String source)
{
return ByteBufferUtil.hexToBytes(source);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index d4bd4ff..cf8636b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -139,10 +139,9 @@ public class StorageProxy implements StorageProxyMBean
Iterable<InetAddress> targets,
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
- ConsistencyLevel consistency_level)
+ ConsistencyLevel consistencyLevel)
{
- Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
- runnable.run();
+ counterWriteTask(mutation, targets, responseHandler, localDataCenter).run();
}
};
@@ -152,10 +151,9 @@ public class StorageProxy implements StorageProxyMBean
Iterable<InetAddress> targets,
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
- ConsistencyLevel consistency_level)
+ ConsistencyLevel consistencyLevel)
{
- Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
- StageManager.getStage(Stage.MUTATION).execute(runnable);
+ StageManager.getStage(Stage.MUTATION).execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter));
}
};
}
@@ -1100,8 +1098,7 @@ public class StorageProxy implements StorageProxyMBean
private static Runnable counterWriteTask(final IMutation mutation,
final Iterable<InetAddress> targets,
final AbstractWriteResponseHandler responseHandler,
- final String localDataCenter,
- final ConsistencyLevel consistency_level)
+ final String localDataCenter)
{
return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION)
{
@@ -1120,7 +1117,7 @@ public class StorageProxy implements StorageProxyMBean
// then send to replicas, if any
final Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(FBUtilities.getBroadcastAddress()));
- if (cm.shouldReplicateOnWrite() && !remotes.isEmpty())
+ if (!remotes.isEmpty() && cm.shouldReplicateOnWrite())
{
// We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
// and we want to avoid blocking too much the MUTATION stage
@@ -2003,7 +2000,11 @@ public class StorageProxy implements StorageProxyMBean
public interface WritePerformer
{
- public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws OverloadedException;
+ public void apply(IMutation mutation,
+ Iterable<InetAddress> targets,
+ AbstractWriteResponseHandler responseHandler,
+ String localDataCenter,
+ ConsistencyLevel consistencyLevel) throws OverloadedException;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 5859f92..2c2e821 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -286,7 +286,7 @@ public class CassandraServer implements Cassandra.Iface
if (cf.metadata().isSuper())
{
- boolean isCounterCF = cf.metadata().getDefaultValidator().isCommutative();
+ boolean isCounterCF = cf.metadata().isCounter();
return thriftifySuperColumns(cf.getSortedColumns(), reverseOrder, now, subcolumnsOnly, isCounterCF);
}
else
@@ -829,7 +829,7 @@ public class CassandraServer implements Cassandra.Iface
ThriftValidation.validateKey(metadata, key);
org.apache.cassandra.db.Mutation mutation;
- if (metadata.getDefaultValidator().isCommutative())
+ if (metadata.isCounter())
{
ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
counterMutation = counterMutation == null ? new org.apache.cassandra.db.Mutation(keyspace, key) : counterMutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8a0a3a6/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index d491636..49cf39b 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -92,12 +92,12 @@ public class ThriftValidation
if (isCommutativeOp)
{
- if (!metadata.getDefaultValidator().isCommutative())
+ if (!metadata.isCounter())
throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for non commutative columnfamily " + cfName);
}
else
{
- if (metadata.getDefaultValidator().isCommutative())
+ if (metadata.isCounter())
throw new org.apache.cassandra.exceptions.InvalidRequestException("invalid operation for commutative columnfamily " + cfName);
}
return metadata;
@@ -297,7 +297,7 @@ public class ThriftValidation
public static void validateColumnOrSuperColumn(CFMetaData metadata, ColumnOrSuperColumn cosc)
throws org.apache.cassandra.exceptions.InvalidRequestException
{
- boolean isCommutative = metadata.getDefaultValidator().isCommutative();
+ boolean isCommutative = metadata.isCounter();
int nulls = 0;
if (cosc.column == null) nulls++;
@@ -405,7 +405,7 @@ public class ThriftValidation
throw new org.apache.cassandra.exceptions.InvalidRequestException(msg);
}
- if (metadata.getDefaultValidator().isCommutative())
+ if (metadata.isCounter())
{
// forcing server timestamp even if a timestamp was set for coherence with other counter operation
del.timestamp = System.currentTimeMillis();