You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/12/21 22:40:52 UTC
[3/4] Rename RowMutation->Mutation in preparation for Row->Partition
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
deleted file mode 100644
index c2c1780..0000000
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-// TODO convert this to a Builder pattern instead of encouraging RM.add directly,
-// which is less-efficient since we have to keep a mutable HashMap around
-public class RowMutation implements IMutation
-{
- public static final RowMutationSerializer serializer = new RowMutationSerializer();
- public static final String FORWARD_TO = "FWD_TO";
- public static final String FORWARD_FROM = "FWD_FRM";
-
- // todo this is redundant
- // when we remove it, also restore SerializationsTest.testRowMutationRead to not regenerate new RowMutations each test
- private final String keyspaceName;
-
- private final ByteBuffer key;
- // map of column family id to mutations for that column family.
- private final Map<UUID, ColumnFamily> modifications;
-
- public RowMutation(String keyspaceName, ByteBuffer key)
- {
- this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
- }
-
- public RowMutation(String keyspaceName, ByteBuffer key, ColumnFamily cf)
- {
- this(keyspaceName, key, Collections.singletonMap(cf.id(), cf));
- }
-
- public RowMutation(String keyspaceName, Row row)
- {
- this(keyspaceName, row.key.key, row.cf);
- }
-
- protected RowMutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
- {
- this.keyspaceName = keyspaceName;
- this.key = key;
- this.modifications = modifications;
- }
-
- public RowMutation(ByteBuffer key, ColumnFamily cf)
- {
- this(cf.metadata().ksName, key, cf);
- }
-
- public String getKeyspaceName()
- {
- return keyspaceName;
- }
-
- public Collection<UUID> getColumnFamilyIds()
- {
- return modifications.keySet();
- }
-
- public ByteBuffer key()
- {
- return key;
- }
-
- public Collection<ColumnFamily> getColumnFamilies()
- {
- return modifications.values();
- }
-
- public ColumnFamily getColumnFamily(UUID cfId)
- {
- return modifications.get(cfId);
- }
-
- /*
- * Specify a column family name and the corresponding column
- * family object.
- * param @ cf - column family name
- * param @ columnFamily - the column family.
- */
- public void add(ColumnFamily columnFamily)
- {
- assert columnFamily != null;
- ColumnFamily prev = modifications.put(columnFamily.id(), columnFamily);
- if (prev != null)
- // developer error
- throw new IllegalArgumentException("ColumnFamily " + columnFamily + " already has modifications in this mutation: " + prev);
- }
-
- /**
- * @return the ColumnFamily in this RowMutation corresponding to @param cfName, creating an empty one if necessary.
- */
- public ColumnFamily addOrGet(String cfName)
- {
- return addOrGet(Schema.instance.getCFMetaData(keyspaceName, cfName));
- }
-
- public ColumnFamily addOrGet(CFMetaData cfm)
- {
- ColumnFamily cf = modifications.get(cfm.cfId);
- if (cf == null)
- {
- cf = TreeMapBackedSortedColumns.factory.create(cfm);
- modifications.put(cfm.cfId, cf);
- }
- return cf;
- }
-
- public boolean isEmpty()
- {
- return modifications.isEmpty();
- }
-
- public void add(String cfName, CellName name, ByteBuffer value, long timestamp, int timeToLive)
- {
- addOrGet(cfName).addColumn(name, value, timestamp, timeToLive);
- }
-
- public void addCounter(String cfName, CellName name, long value)
- {
- addOrGet(cfName).addCounter(name, value);
- }
-
- public void add(String cfName, CellName name, ByteBuffer value, long timestamp)
- {
- add(cfName, name, value, timestamp, 0);
- }
-
- public void delete(String cfName, long timestamp)
- {
- int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
- addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime));
- }
-
- public void delete(String cfName, CellName name, long timestamp)
- {
- int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
- addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp);
- }
-
- public void deleteRange(String cfName, Composite start, Composite end, long timestamp)
- {
- int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
- addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime));
- }
-
- public void addAll(IMutation m)
- {
- if (!(m instanceof RowMutation))
- throw new IllegalArgumentException();
-
- RowMutation rm = (RowMutation)m;
- if (!keyspaceName.equals(rm.keyspaceName) || !key.equals(rm.key))
- throw new IllegalArgumentException();
-
- for (Map.Entry<UUID, ColumnFamily> entry : rm.modifications.entrySet())
- {
- // It's slighty faster to assume the key wasn't present and fix if
- // not in the case where it wasn't there indeed.
- ColumnFamily cf = modifications.put(entry.getKey(), entry.getValue());
- if (cf != null)
- entry.getValue().resolve(cf);
- }
- }
-
- /*
- * This is equivalent to calling commit. Applies the changes to
- * to the keyspace that is obtained by calling Keyspace.open().
- */
- public void apply()
- {
- Keyspace ks = Keyspace.open(keyspaceName);
- ks.apply(this, ks.metadata.durableWrites);
- }
-
- public void applyUnsafe()
- {
- Keyspace.open(keyspaceName).apply(this, false);
- }
-
- public MessageOut<RowMutation> createMessage()
- {
- return createMessage(MessagingService.Verb.MUTATION);
- }
-
- public MessageOut<RowMutation> createMessage(MessagingService.Verb verb)
- {
- return new MessageOut<>(verb, this, serializer);
- }
-
- public String toString()
- {
- return toString(false);
- }
-
- public String toString(boolean shallow)
- {
- StringBuilder buff = new StringBuilder("RowMutation(");
- buff.append("keyspace='").append(keyspaceName).append('\'');
- buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\'');
- buff.append(", modifications=[");
- if (shallow)
- {
- List<String> cfnames = new ArrayList<String>(modifications.size());
- for (UUID cfid : modifications.keySet())
- {
- CFMetaData cfm = Schema.instance.getCFMetaData(cfid);
- cfnames.add(cfm == null ? "-dropped-" : cfm.cfName);
- }
- buff.append(StringUtils.join(cfnames, ", "));
- }
- else
- buff.append(StringUtils.join(modifications.values(), ", "));
- return buff.append("])").toString();
- }
-
- public RowMutation without(UUID cfId)
- {
- RowMutation rm = new RowMutation(keyspaceName, key);
- for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
- if (!entry.getKey().equals(cfId))
- rm.add(entry.getValue());
- return rm;
- }
-
- public static class RowMutationSerializer implements IVersionedSerializer<RowMutation>
- {
- public void serialize(RowMutation rm, DataOutput out, int version) throws IOException
- {
- if (version < MessagingService.VERSION_20)
- out.writeUTF(rm.getKeyspaceName());
-
- ByteBufferUtil.writeWithShortLength(rm.key(), out);
-
- /* serialize the modifications in the mutation */
- int size = rm.modifications.size();
- out.writeInt(size);
- assert size > 0;
- for (Map.Entry<UUID, ColumnFamily> entry : rm.modifications.entrySet())
- ColumnFamily.serializer.serialize(entry.getValue(), out, version);
- }
-
- public RowMutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
- {
- String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
- if (version < MessagingService.VERSION_20)
- keyspaceName = in.readUTF();
-
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- int size = in.readInt();
- assert size > 0;
-
- Map<UUID, ColumnFamily> modifications;
- if (size == 1)
- {
- ColumnFamily cf = deserializeOneCf(in, version, flag);
- modifications = Collections.singletonMap(cf.id(), cf);
- keyspaceName = cf.metadata().ksName;
- }
- else
- {
- modifications = new HashMap<UUID, ColumnFamily>();
- for (int i = 0; i < size; ++i)
- {
- ColumnFamily cf = deserializeOneCf(in, version, flag);
- modifications.put(cf.id(), cf);
- keyspaceName = cf.metadata().ksName;
- }
- }
-
- return new RowMutation(keyspaceName, key, modifications);
- }
-
- private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
- {
- ColumnFamily cf = ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, flag, version);
- // We don't allow RowMutation with null column family, so we should never get null back.
- assert cf != null;
- return cf;
- }
-
- public RowMutation deserialize(DataInput in, int version) throws IOException
- {
- return deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE);
- }
-
- public long serializedSize(RowMutation rm, int version)
- {
- TypeSizes sizes = TypeSizes.NATIVE;
- int size = 0;
-
- if (version < MessagingService.VERSION_20)
- size += sizes.sizeof(rm.getKeyspaceName());
-
- int keySize = rm.key().remaining();
- size += sizes.sizeof((short) keySize) + keySize;
-
- size += sizes.sizeof(rm.modifications.size());
- for (Map.Entry<UUID,ColumnFamily> entry : rm.modifications.entrySet())
- size += ColumnFamily.serializer.serializedSize(entry.getValue(), TypeSizes.NATIVE, version);
-
- return size;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
deleted file mode 100644
index dcdfc2e..0000000
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.tracing.Tracing;
-
-public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
-{
- private static final Logger logger = LoggerFactory.getLogger(RowMutationVerbHandler.class);
-
- public void doVerb(MessageIn<RowMutation> message, int id)
- {
- try
- {
- RowMutation rm = message.payload;
-
- // Check if there were any forwarding headers in this message
- byte[] from = message.parameters.get(RowMutation.FORWARD_FROM);
- InetAddress replyTo;
- if (from == null)
- {
- replyTo = message.from;
- byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO);
- if (forwardBytes != null)
- forwardToLocalNodes(rm, message.verb, forwardBytes, message.from);
- }
- else
- {
- replyTo = InetAddress.getByAddress(from);
- }
-
- rm.apply();
- WriteResponse response = new WriteResponse();
- Tracing.trace("Enqueuing response to {}", replyTo);
- MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
- }
- catch (IOException e)
- {
- logger.error("Error in row mutation", e);
- }
- }
-
- /**
- * Older version (< 1.0) will not send this message at all, hence we don't
- * need to check the version of the data.
- */
- private void forwardToLocalNodes(RowMutation rm, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
- {
- DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
- int size = in.readInt();
-
- // tell the recipients who to send their ack to
- MessageOut<RowMutation> message = new MessageOut<RowMutation>(verb, rm, RowMutation.serializer).withParameter(RowMutation.FORWARD_FROM, from.getAddress());
- // Send a message to each of the addresses on our Forward List
- for (int i = 0; i < size; i++)
- {
- InetAddress address = CompactEndpointSerializationHelper.deserialize(in);
- int id = in.readInt();
- Tracing.trace("Enqueuing forwarded write to {}", address);
- MessagingService.instance().sendOneWay(message, id, address);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 57ae146..34acf4e 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -29,12 +29,6 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
-
-import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.Composites;
-import org.apache.cassandra.metrics.RestorableMeter;
-import org.apache.cassandra.transport.Server;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +41,9 @@ import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.Range;
@@ -55,10 +52,12 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.thrift.cassandraConstants;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.*;
import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
@@ -597,15 +596,14 @@ public class SystemKeyspace
{
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, INDEX_CF);
cf.addColumn(new Cell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf);
- rm.apply();
+ new Mutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf).apply();
}
public static void setIndexRemoved(String keyspaceName, String indexName)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
- rm.delete(INDEX_CF, CFMetaData.IndexCf.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
- rm.apply();
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
+ mutation.delete(INDEX_CF, CFMetaData.IndexCf.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
+ mutation.apply();
}
/**
@@ -676,8 +674,7 @@ public class SystemKeyspace
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, COUNTER_ID_CF);
cf.addColumn(new Cell(cf.getComparator().makeCellName(newCounterId.bytes()), ip, now));
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf);
- rm.apply();
+ new Mutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf).apply();
forceBlockingFlush(COUNTER_ID_CF);
}
@@ -736,9 +733,9 @@ public class SystemKeyspace
System.currentTimeMillis());
}
- public static Collection<RowMutation> serializeSchema()
+ public static Collection<Mutation> serializeSchema()
{
- Map<DecoratedKey, RowMutation> mutationMap = new HashMap<>();
+ Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
for (String cf : allSchemaCfs)
serializeSchema(mutationMap, cf);
@@ -746,17 +743,17 @@ public class SystemKeyspace
return mutationMap.values();
}
- private static void serializeSchema(Map<DecoratedKey, RowMutation> mutationMap, String schemaCfName)
+ private static void serializeSchema(Map<DecoratedKey, Mutation> mutationMap, String schemaCfName)
{
for (Row schemaRow : serializedSchema(schemaCfName))
{
if (Schema.ignoredSchemaRow(schemaRow))
continue;
- RowMutation mutation = mutationMap.get(schemaRow.key);
+ Mutation mutation = mutationMap.get(schemaRow.key);
if (mutation == null)
{
- mutation = new RowMutation(Keyspace.SYSTEM_KS, schemaRow.key.key);
+ mutation = new Mutation(Keyspace.SYSTEM_KS, schemaRow.key.key);
mutationMap.put(schemaRow.key, mutation);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 7240aee..b6b32a0 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,7 +21,6 @@ import java.io.*;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.management.MBeanServer;
@@ -188,13 +187,13 @@ public class CommitLog implements CommitLogMBean
}
/**
- * Add a RowMutation to the commit log.
+ * Add a Mutation to the commit log.
*
- * @param rowMutation the RowMutation to add to the log
+ * @param mutation the Mutation to add to the log
*/
- public void add(RowMutation rowMutation)
+ public void add(Mutation mutation)
{
- long size = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version);
+ long size = Mutation.serializer.serializedSize(mutation, MessagingService.current_version);
long totalSize = size + ENTRY_OVERHEAD_SIZE;
if (totalSize > MAX_MUTATION_SIZE)
@@ -203,7 +202,7 @@ public class CommitLog implements CommitLogMBean
return;
}
- Allocation alloc = allocator.allocate(rowMutation, (int) totalSize, new Allocation());
+ Allocation alloc = allocator.allocate(mutation, (int) totalSize, new Allocation());
try
{
PureJavaCrc32 checksum = new PureJavaCrc32();
@@ -215,7 +214,7 @@ public class CommitLog implements CommitLogMBean
buffer.putLong(checksum.getValue());
// checksummed mutation
- RowMutation.serializer.serialize(rowMutation, dos, MessagingService.current_version);
+ Mutation.serializer.serialize(mutation, dos, MessagingService.current_version);
buffer.putLong(checksum.getValue());
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 5e8dc9b..00c8bb4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -164,7 +164,7 @@ public class CommitLogReplayer
private abstract static class ReplayFilter
{
- public abstract Iterable<ColumnFamily> filter(RowMutation rm);
+ public abstract Iterable<ColumnFamily> filter(Mutation mutation);
public static ReplayFilter create()
{
@@ -193,9 +193,9 @@ public class CommitLogReplayer
private static class AlwaysReplayFilter extends ReplayFilter
{
- public Iterable<ColumnFamily> filter(RowMutation rm)
+ public Iterable<ColumnFamily> filter(Mutation mutation)
{
- return rm.getColumnFamilies();
+ return mutation.getColumnFamilies();
}
}
@@ -208,13 +208,13 @@ public class CommitLogReplayer
this.toReplay = toReplay;
}
- public Iterable<ColumnFamily> filter(RowMutation rm)
+ public Iterable<ColumnFamily> filter(Mutation mutation)
{
- final Collection<String> cfNames = toReplay.get(rm.getKeyspaceName());
+ final Collection<String> cfNames = toReplay.get(mutation.getKeyspaceName());
if (cfNames == null)
return Collections.emptySet();
- return Iterables.filter(rm.getColumnFamilies(), new Predicate<ColumnFamily>()
+ return Iterables.filter(mutation.getColumnFamilies(), new Predicate<ColumnFamily>()
{
public boolean apply(ColumnFamily cf)
{
@@ -264,7 +264,7 @@ public class CommitLogReplayer
reader.seek(offset);
- /* read the logs populate RowMutation and apply */
+ /* read the logs populate Mutation and apply */
while (reader.getPosition() < end && !reader.isEOF())
{
if (logger.isDebugEnabled())
@@ -282,7 +282,7 @@ public class CommitLogReplayer
break main;
}
- // RowMutation must be at LEAST 10 bytes:
+ // Mutation must be at LEAST 10 bytes:
// 3 each for a non-empty Keyspace and Key (including the
// 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
// This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
@@ -320,14 +320,14 @@ public class CommitLogReplayer
/* deserialize the commit log entry */
FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
- final RowMutation rm;
+ final Mutation mutation;
try
{
// assuming version here. We've gone to lengths to make sure what gets written to the CL is in
// the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
+ mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
// doublecheck that what we read is [still] valid for the current schema
- for (ColumnFamily cf : rm.getColumnFamilies())
+ for (ColumnFamily cf : mutation.getColumnFamilies())
for (Cell cell : cf)
cf.getComparator().validate(cell.name());
}
@@ -364,27 +364,27 @@ public class CommitLogReplayer
}
if (logger.isDebugEnabled())
- logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
+ logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
final long entryLocation = reader.getFilePointer();
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws IOException
{
- if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
return;
- if (pointInTimeExceeded(rm))
+ if (pointInTimeExceeded(mutation))
return;
- final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
- // Rebuild the row mutation, omitting column families that
+ // Rebuild the mutation, omitting column families that
// a) the user has requested that we ignore,
// b) have already been flushed,
// or c) are part of a cf that was dropped.
// Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- RowMutation newRm = null;
- for (ColumnFamily columnFamily : replayFilter.filter(rm))
+ Mutation newMutation = null;
+ for (ColumnFamily columnFamily : replayFilter.filter(mutation))
{
if (Schema.instance.getCF(columnFamily.id()) == null)
continue; // dropped
@@ -395,16 +395,16 @@ public class CommitLogReplayer
// if it is the last known segment, if we are after the replay position
if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
{
- if (newRm == null)
- newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
- newRm.add(columnFamily);
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(columnFamily);
replayedCount.incrementAndGet();
}
}
- if (newRm != null)
+ if (newMutation != null)
{
- assert !newRm.isEmpty();
- Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
+ assert !newMutation.isEmpty();
+ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
keyspacesRecovered.add(keyspace);
}
}
@@ -431,11 +431,11 @@ public class CommitLogReplayer
}
}
- protected boolean pointInTimeExceeded(RowMutation frm)
+ protected boolean pointInTimeExceeded(Mutation fm)
{
long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
- for (ColumnFamily families : frm.getColumnFamilies())
+ for (ColumnFamily families : fm.getColumnFamilies())
{
if (CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) > restoreTarget)
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index bc5c7d1..9c80965 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -46,14 +46,14 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.PureJavaCrc32;
import org.apache.cassandra.utils.WaitQueue;
/*
- * A single commit log file on disk. Manages creation of the file and writing row mutations to disk,
+ * A single commit log file on disk. Manages creation of the file and writing mutations to disk,
* as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
* files are initially allocated to a fixed size and can grow to accomidate a larger value if necessary.
*/
@@ -166,11 +166,11 @@ public class CommitLogSegment
}
/**
- * allocate space in this buffer for the provided row mutation, and populate the provided
+ * allocate space in this buffer for the provided mutation, and populate the provided
* Allocation object, returning true on success. False indicates there is not enough room in
* this segment, and a new segment is needed
*/
- boolean allocate(RowMutation rowMutation, int size, Allocation alloc)
+ boolean allocate(Mutation mutation, int size, Allocation alloc)
{
final AppendLock appendLock = lockForAppend();
try
@@ -185,7 +185,7 @@ public class CommitLogSegment
alloc.position = position;
alloc.segment = this;
alloc.appendLock = appendLock;
- markDirty(rowMutation, position);
+ markDirty(mutation, position);
return true;
}
catch (Throwable t)
@@ -386,9 +386,9 @@ public class CommitLogSegment
}
}
- void markDirty(RowMutation rowMutation, int allocatedPosition)
+ void markDirty(Mutation mutation, int allocatedPosition)
{
- for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+ for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{
// check for deleted CFS
CFMetaData cfm = columnFamily.metadata();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index dd96f35..4275362 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -29,7 +29,6 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
@@ -44,7 +43,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -168,16 +167,16 @@ public class CommitLogSegmentManager
}
/**
- * Reserve space in the current segment for the provided row mutation or, if there isn't space available,
+ * Reserve space in the current segment for the provided mutation or, if there isn't space available,
* create a new segment.
*
* @return the provided Allocation object
*/
- public Allocation allocate(RowMutation rowMutation, int size, Allocation alloc)
+ public Allocation allocate(Mutation mutation, int size, Allocation alloc)
{
CommitLogSegment segment = allocatingFrom();
- while (!segment.allocate(rowMutation, size, alloc))
+ while (!segment.allocate(mutation, size, alloc))
{
// failed to allocate, so move to a new segment with enough room
advanceAllocatingFrom(segment);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b0e554d..894a29c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -189,8 +189,8 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.REQUEST_RESPONSE, CallbackDeterminedSerializer.instance);
put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
- put(Verb.MUTATION, RowMutation.serializer);
- put(Verb.READ_REPAIR, RowMutation.serializer);
+ put(Verb.MUTATION, Mutation.serializer);
+ put(Verb.READ_REPAIR, Mutation.serializer);
put(Verb.READ, ReadCommand.serializer);
put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
put(Verb.PAGED_RANGE, PagedRangeCommand.serializer);
@@ -334,8 +334,8 @@ public final class MessagingService implements MessagingServiceMBean
if (expiredCallbackInfo.shouldHint())
{
- RowMutation rm = (RowMutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
- return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null);
+ Mutation mutation = (Mutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
+ return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
}
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index e4740ae..e308613 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -299,21 +299,21 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
* actively announce a new version to active hosts via rpc
* @param schema The schema mutation to be applied
*/
- private static void announce(RowMutation schema)
+ private static void announce(Mutation schema)
{
FBUtilities.waitOnFuture(announce(Collections.singletonList(schema)));
}
- private static void pushSchemaMutation(InetAddress endpoint, Collection<RowMutation> schema)
+ private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema)
{
- MessageOut<Collection<RowMutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
- schema,
- MigrationsSerializer.instance);
+ MessageOut<Collection<Mutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
+ schema,
+ MigrationsSerializer.instance);
MessagingService.instance().sendOneWay(msg, endpoint);
}
// Returns a future on the local application of the schema
- private static Future<?> announce(final Collection<RowMutation> schema)
+ private static Future<?> announce(final Collection<Mutation> schema)
{
Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
@@ -386,33 +386,33 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
logger.info("Local schema reset is complete.");
}
- public static class MigrationsSerializer implements IVersionedSerializer<Collection<RowMutation>>
+ public static class MigrationsSerializer implements IVersionedSerializer<Collection<Mutation>>
{
public static MigrationsSerializer instance = new MigrationsSerializer();
- public void serialize(Collection<RowMutation> schema, DataOutput out, int version) throws IOException
+ public void serialize(Collection<Mutation> schema, DataOutput out, int version) throws IOException
{
out.writeInt(schema.size());
- for (RowMutation rm : schema)
- RowMutation.serializer.serialize(rm, out, version);
+ for (Mutation mutation : schema)
+ Mutation.serializer.serialize(mutation, out, version);
}
- public Collection<RowMutation> deserialize(DataInput in, int version) throws IOException
+ public Collection<Mutation> deserialize(DataInput in, int version) throws IOException
{
int count = in.readInt();
- Collection<RowMutation> schema = new ArrayList<RowMutation>(count);
+ Collection<Mutation> schema = new ArrayList<Mutation>(count);
for (int i = 0; i < count; i++)
- schema.add(RowMutation.serializer.deserialize(in, version));
+ schema.add(Mutation.serializer.deserialize(in, version));
return schema;
}
- public long serializedSize(Collection<RowMutation> schema, int version)
+ public long serializedSize(Collection<Mutation> schema, int version)
{
int size = TypeSizes.NATIVE.sizeof(schema.size());
- for (RowMutation rm : schema)
- size += RowMutation.serializer.serializedSize(rm, version);
+ for (Mutation mutation : schema)
+ size += Mutation.serializer.serializedSize(mutation, version);
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index 93572f0..9fdbff4 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -24,9 +24,9 @@ import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.DefsTables;
-import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
@@ -56,10 +56,10 @@ class MigrationTask extends WrappedRunnable
return;
}
- IAsyncCallback<Collection<RowMutation>> cb = new IAsyncCallback<Collection<RowMutation>>()
+ IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
{
@Override
- public void response(MessageIn<Collection<RowMutation>> message)
+ public void response(MessageIn<Collection<Mutation>> message)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
index f1948cd..5422d82 100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -115,13 +115,12 @@ public class RowDataResolver extends AbstractRowResolver
if (diffCf == null) // no repair needs to happen
continue;
- // create and send the row mutation message based on the diff
- RowMutation rowMutation = new RowMutation(keyspaceName, key.key, diffCf);
- MessageOut repairMessage;
+ // create and send the mutation message based on the diff
+ Mutation mutation = new Mutation(keyspaceName, key.key, diffCf);
// use a separate verb here because we don't want these to be get the white glove hint-
// on-timeout behavior that a "real" mutation gets
- repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
- results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
+ results.add(MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR),
+ endpoints.get(i)));
}
return results;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 49aa2be..4113029 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -122,8 +122,8 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws OverloadedException
{
- assert mutation instanceof RowMutation;
- sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter);
+ assert mutation instanceof Mutation;
+ sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter);
}
};
@@ -537,7 +537,7 @@ public class StorageProxy implements StorageProxyMBean
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
- submitHint((RowMutation) mutation, target, null);
+ submitHint((Mutation) mutation, target, null);
}
Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
}
@@ -571,10 +571,10 @@ public class StorageProxy implements StorageProxyMBean
public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically) throws WriteTimeoutException, UnavailableException,
OverloadedException, InvalidRequestException
{
- Collection<RowMutation> tmutations = TriggerExecutor.instance.execute(mutations);
+ Collection<Mutation> tmutations = TriggerExecutor.instance.execute(mutations);
if (mutateAtomically || tmutations != null)
{
- Collection<RowMutation> allMutations = (Collection<RowMutation>) mutations;
+ Collection<Mutation> allMutations = (Collection<Mutation>) mutations;
if (tmutations != null)
allMutations.addAll(tmutations);
StorageProxy.mutateAtomically(allMutations, consistencyLevel);
@@ -591,10 +591,10 @@ public class StorageProxy implements StorageProxyMBean
* write the entire batch to a batchlog elsewhere in the cluster.
* After: remove the batchlog entry (after writing hints for the batch rows, if necessary).
*
- * @param mutations the RowMutations to be applied across the replicas
+ * @param mutations the Mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
- public static void mutateAtomically(Collection<RowMutation> mutations, ConsistencyLevel consistency_level)
+ public static void mutateAtomically(Collection<Mutation> mutations, ConsistencyLevel consistency_level)
throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for atomic batch");
@@ -606,7 +606,7 @@ public class StorageProxy implements StorageProxyMBean
try
{
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
- for (RowMutation mutation : mutations)
+ for (Mutation mutation : mutations)
{
WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level, WriteType.BATCH);
// exit early if we can't fulfill the CL at this time.
@@ -645,17 +645,16 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static void syncWriteToBatchlog(Collection<RowMutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
+ private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
throws WriteTimeoutException
{
- RowMutation rm = BatchlogManager.getBatchlogMutationFor(mutations, uuid);
AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
Collections.<InetAddress>emptyList(),
ConsistencyLevel.ONE,
Keyspace.open(Keyspace.SYSTEM_KS),
null,
WriteType.BATCH_LOG);
- updateBatchlog(rm, endpoints, handler);
+ updateBatchlog(BatchlogManager.getBatchlogMutationFor(mutations, uuid), endpoints, handler);
handler.get();
}
@@ -669,20 +668,19 @@ public class StorageProxy implements StorageProxyMBean
Keyspace.open(Keyspace.SYSTEM_KS),
null,
WriteType.SIMPLE);
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
- updateBatchlog(rm, endpoints, handler);
+ updateBatchlog(new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf), endpoints, handler);
}
- private static void updateBatchlog(RowMutation rm, Collection<InetAddress> endpoints, AbstractWriteResponseHandler handler)
+ private static void updateBatchlog(Mutation mutation, Collection<InetAddress> endpoints, AbstractWriteResponseHandler handler)
{
if (endpoints.contains(FBUtilities.getBroadcastAddress()))
{
assert endpoints.size() == 1;
- insertLocal(rm, handler);
+ insertLocal(mutation, handler);
}
else
{
- MessageOut<RowMutation> message = rm.createMessage();
+ MessageOut<Mutation> message = mutation.createMessage();
for (InetAddress target : endpoints)
MessagingService.instance().sendRR(message, target, handler);
}
@@ -744,7 +742,7 @@ public class StorageProxy implements StorageProxyMBean
}
// same as above except does not initiate writes (but does perform availability checks).
- private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation, ConsistencyLevel consistency_level, WriteType writeType)
+ private static WriteResponseHandlerWrapper wrapResponseHandler(Mutation mutation, ConsistencyLevel consistency_level, WriteType writeType)
{
AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy();
String keyspaceName = mutation.getKeyspaceName();
@@ -759,9 +757,9 @@ public class StorageProxy implements StorageProxyMBean
private static class WriteResponseHandlerWrapper
{
final AbstractWriteResponseHandler handler;
- final RowMutation mutation;
+ final Mutation mutation;
- WriteResponseHandlerWrapper(AbstractWriteResponseHandler handler, RowMutation mutation)
+ WriteResponseHandlerWrapper(AbstractWriteResponseHandler handler, Mutation mutation)
{
this.handler = handler;
this.mutation = mutation;
@@ -823,7 +821,7 @@ public class StorageProxy implements StorageProxyMBean
*
* @throws OverloadedException if the hints cannot be written/enqueued
*/
- public static void sendToHintedEndpoints(final RowMutation rm,
+ public static void sendToHintedEndpoints(final Mutation mutation,
Iterable<InetAddress> targets,
AbstractWriteResponseHandler responseHandler,
String localDataCenter)
@@ -832,7 +830,7 @@ public class StorageProxy implements StorageProxyMBean
// extra-datacenter replicas, grouped by dc
Map<String, Collection<InetAddress>> dcGroups = null;
// only need to create a Message for non-local writes
- MessageOut<RowMutation> message = null;
+ MessageOut<Mutation> message = null;
for (InetAddress destination : targets)
{
@@ -851,13 +849,13 @@ public class StorageProxy implements StorageProxyMBean
{
if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
- insertLocal(rm, responseHandler);
+ insertLocal(mutation, responseHandler);
}
else
{
// belongs on a different server
if (message == null)
- message = rm.createMessage();
+ message = mutation.createMessage();
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
// direct writes to local DC or old Cassandra versions
// (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
@@ -885,7 +883,7 @@ public class StorageProxy implements StorageProxyMBean
continue;
// Schedule a local hint
- submitHint(rm, destination, responseHandler);
+ submitHint(mutation, destination, responseHandler);
}
}
@@ -893,7 +891,7 @@ public class StorageProxy implements StorageProxyMBean
{
// for each datacenter, send the message to one node to relay the write to other replicas
if (message == null)
- message = rm.createMessage();
+ message = mutation.createMessage();
for (Collection<InetAddress> dcTargets : dcGroups.values())
sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
@@ -912,7 +910,7 @@ public class StorageProxy implements StorageProxyMBean
}
}
- public static Future<Void> submitHint(final RowMutation mutation,
+ public static Future<Void> submitHint(final Mutation mutation,
final InetAddress target,
final AbstractWriteResponseHandler responseHandler)
{
@@ -949,7 +947,7 @@ public class StorageProxy implements StorageProxyMBean
return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
}
- public static void writeHintForMutation(RowMutation mutation, int ttl, InetAddress target)
+ public static void writeHintForMutation(Mutation mutation, int ttl, InetAddress target)
{
assert ttl > 0;
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
@@ -976,7 +974,7 @@ public class StorageProxy implements StorageProxyMBean
out.writeInt(id);
logger.trace("Adding FWD message to {}@{}", id, destination);
}
- message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
+ message = message.withParameter(Mutation.FORWARD_TO, out.getData());
// send the combined message + forward headers
int id = MessagingService.instance().sendRR(message, target, handler);
logger.trace("Sending message to {}@{}", id, target);
@@ -988,13 +986,13 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
+ private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler)
{
StageManager.getStage(Stage.MUTATION).execute(new LocalMutationRunnable()
{
public void runMayThrow()
{
- IMutation processed = SinkManager.processWriteRequest(rm);
+ IMutation processed = SinkManager.processWriteRequest(mutation);
if (processed != null)
{
processed.apply();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d088b42..ab095e6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -222,7 +222,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/* register the verb handlers */
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new RowMutationVerbHandler());
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 1f95d04..23f35db 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -30,12 +30,7 @@ import java.nio.ByteBuffer;
import com.google.common.base.Objects;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.EmptyColumns;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.UnsortedColumns;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDGen;
@@ -85,10 +80,10 @@ public class Commit
return this.ballot.equals(ballot);
}
- public RowMutation makeMutation()
+ public Mutation makeMutation()
{
assert update != null;
- return new RowMutation(key, update);
+ return new Mutation(key, update);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index ff0b02c..f893acf 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.tracing.Tracing;
@@ -114,8 +114,8 @@ public class PaxosState
// if our current in-progress ballot is strictly greater than the proposal one, we shouldn't
// erase the in-progress update.
Tracing.trace("Committing proposal {}", proposal);
- RowMutation rm = proposal.makeMutation();
- Keyspace.open(rm.getKeyspaceName()).apply(rm, true);
+ Mutation mutation = proposal.makeMutation();
+ Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
// We don't need to lock, we're just blindly updating
SystemKeyspace.savePaxosCommit(proposal);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/sink/IRequestSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/IRequestSink.java b/src/java/org/apache/cassandra/sink/IRequestSink.java
index 8d68ce8..2873e46 100644
--- a/src/java/org/apache/cassandra/sink/IRequestSink.java
+++ b/src/java/org/apache/cassandra/sink/IRequestSink.java
@@ -22,9 +22,9 @@ import org.apache.cassandra.db.IMutation;
public interface IRequestSink
{
/**
- * Transform or drop a write request (represented by a RowMutation).
+ * Transform or drop a write request (represented by a Mutation).
*
- * @param mutation the RowMutation to be applied locally.
+ * @param mutation the Mutation to be applied locally.
* @return null if the mutation is to be dropped, or the transformed mutation to apply, which may be just
* the original mutation.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 0e512bd..e91bea2 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -659,7 +659,7 @@ public class CassandraServer implements Cassandra.Iface
ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
ThriftValidation.validateColumnData(metadata, column_parent.super_column, column);
- RowMutation rm;
+ org.apache.cassandra.db.Mutation mutation;
try
{
CellName name = metadata.isSuper()
@@ -668,13 +668,13 @@ public class CassandraServer implements Cassandra.Iface
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cState.getKeyspace(), column_parent.column_family);
cf.addColumn(name, column.value, column.timestamp, column.ttl);
- rm = new RowMutation(cState.getKeyspace(), key, cf);
+ mutation = new org.apache.cassandra.db.Mutation(cState.getKeyspace(), key, cf);
}
catch (MarshalException e)
{
throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage());
}
- doInsert(consistency_level, Arrays.asList(rm));
+ doInsert(consistency_level, Arrays.asList(mutation));
}
public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
@@ -805,7 +805,7 @@ public class CassandraServer implements Cassandra.Iface
boolean allowCounterMutations)
throws RequestValidationException
{
- List<IMutation> rowMutations = new ArrayList<IMutation>();
+ List<IMutation> mutations = new ArrayList<>();
ThriftClientState cState = state();
String keyspace = cState.getKeyspace();
@@ -813,10 +813,10 @@ public class CassandraServer implements Cassandra.Iface
{
ByteBuffer key = mutationEntry.getKey();
- // We need to separate row mutation for standard cf and counter cf (that will be encapsulated in a
+ // We need to separate mutation for standard cf and counter cf (that will be encapsulated in a
// CounterMutation) because it doesn't follow the same code path
- RowMutation rmStandard = null;
- RowMutation rmCounter = null;
+ org.apache.cassandra.db.Mutation standardMutation = null;
+ org.apache.cassandra.db.Mutation counterMutation = null;
Map<String, List<Mutation>> columnFamilyToMutations = mutationEntry.getValue();
for (Map.Entry<String, List<Mutation>> columnFamilyMutations : columnFamilyToMutations.entrySet())
@@ -828,112 +828,112 @@ public class CassandraServer implements Cassandra.Iface
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, cfName);
ThriftValidation.validateKey(metadata, key);
- RowMutation rm;
+ org.apache.cassandra.db.Mutation mutation;
if (metadata.getDefaultValidator().isCommutative())
{
ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
- rmCounter = rmCounter == null ? new RowMutation(keyspace, key) : rmCounter;
- rm = rmCounter;
+ counterMutation = counterMutation == null ? new org.apache.cassandra.db.Mutation(keyspace, key) : counterMutation;
+ mutation = counterMutation;
}
else
{
- rmStandard = rmStandard == null ? new RowMutation(keyspace, key) : rmStandard;
- rm = rmStandard;
+ standardMutation = standardMutation == null ? new org.apache.cassandra.db.Mutation(keyspace, key) : standardMutation;
+ mutation = standardMutation;
}
- for (Mutation mutation : columnFamilyMutations.getValue())
+ for (Mutation m : columnFamilyMutations.getValue())
{
- ThriftValidation.validateMutation(metadata, mutation);
+ ThriftValidation.validateMutation(metadata, m);
- if (mutation.deletion != null)
+ if (m.deletion != null)
{
- deleteColumnOrSuperColumn(rm, metadata, mutation.deletion);
+ deleteColumnOrSuperColumn(mutation, metadata, m.deletion);
}
- if (mutation.column_or_supercolumn != null)
+ if (m.column_or_supercolumn != null)
{
- addColumnOrSuperColumn(rm, metadata, mutation.column_or_supercolumn);
+ addColumnOrSuperColumn(mutation, metadata, m.column_or_supercolumn);
}
}
}
- if (rmStandard != null && !rmStandard.isEmpty())
- rowMutations.add(rmStandard);
+ if (standardMutation != null && !standardMutation.isEmpty())
+ mutations.add(standardMutation);
- if (rmCounter != null && !rmCounter.isEmpty())
+ if (counterMutation != null && !counterMutation.isEmpty())
{
if (allowCounterMutations)
- rowMutations.add(new CounterMutation(rmCounter, ThriftConversion.fromThrift(consistency_level)));
+ mutations.add(new CounterMutation(counterMutation, ThriftConversion.fromThrift(consistency_level)));
else
throw new org.apache.cassandra.exceptions.InvalidRequestException("Counter mutations are not allowed in atomic batches");
}
}
- return rowMutations;
+ return mutations;
}
- private void addColumnOrSuperColumn(RowMutation rm, CFMetaData cfm, ColumnOrSuperColumn cosc)
+ private void addColumnOrSuperColumn(org.apache.cassandra.db.Mutation mutation, CFMetaData cfm, ColumnOrSuperColumn cosc)
{
if (cosc.super_column != null)
{
for (Column column : cosc.super_column.columns)
{
- rm.add(cfm.cfName, cfm.comparator.makeCellName(cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl);
+ mutation.add(cfm.cfName, cfm.comparator.makeCellName(cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl);
}
}
else if (cosc.column != null)
{
- rm.add(cfm.cfName, cfm.comparator.cellFromByteBuffer(cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
+ mutation.add(cfm.cfName, cfm.comparator.cellFromByteBuffer(cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
}
else if (cosc.counter_super_column != null)
{
for (CounterColumn column : cosc.counter_super_column.columns)
{
- rm.addCounter(cfm.cfName, cfm.comparator.makeCellName(cosc.counter_super_column.name, column.name), column.value);
+ mutation.addCounter(cfm.cfName, cfm.comparator.makeCellName(cosc.counter_super_column.name, column.name), column.value);
}
}
else // cosc.counter_column != null
{
- rm.addCounter(cfm.cfName, cfm.comparator.cellFromByteBuffer(cosc.counter_column.name), cosc.counter_column.value);
+ mutation.addCounter(cfm.cfName, cfm.comparator.cellFromByteBuffer(cosc.counter_column.name), cosc.counter_column.value);
}
}
- private void deleteColumnOrSuperColumn(RowMutation rm, CFMetaData cfm, Deletion del)
+ private void deleteColumnOrSuperColumn(org.apache.cassandra.db.Mutation mutation, CFMetaData cfm, Deletion del)
{
if (del.predicate != null && del.predicate.column_names != null)
{
for (ByteBuffer c : del.predicate.column_names)
{
if (del.super_column == null && cfm.isSuper())
- rm.deleteRange(cfm.cfName, SuperColumns.startOf(c), SuperColumns.endOf(c), del.timestamp);
+ mutation.deleteRange(cfm.cfName, SuperColumns.startOf(c), SuperColumns.endOf(c), del.timestamp);
else if (del.super_column != null)
- rm.delete(cfm.cfName, cfm.comparator.makeCellName(del.super_column, c), del.timestamp);
+ mutation.delete(cfm.cfName, cfm.comparator.makeCellName(del.super_column, c), del.timestamp);
else
- rm.delete(cfm.cfName, cfm.comparator.cellFromByteBuffer(c), del.timestamp);
+ mutation.delete(cfm.cfName, cfm.comparator.cellFromByteBuffer(c), del.timestamp);
}
}
else if (del.predicate != null && del.predicate.slice_range != null)
{
if (del.super_column == null && cfm.isSuper())
- rm.deleteRange(cfm.cfName,
- SuperColumns.startOf(del.predicate.getSlice_range().start),
- SuperColumns.startOf(del.predicate.getSlice_range().finish),
- del.timestamp);
+ mutation.deleteRange(cfm.cfName,
+ SuperColumns.startOf(del.predicate.getSlice_range().start),
+ SuperColumns.startOf(del.predicate.getSlice_range().finish),
+ del.timestamp);
else if (del.super_column != null)
- rm.deleteRange(cfm.cfName,
- cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().start),
- cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().finish),
- del.timestamp);
+ mutation.deleteRange(cfm.cfName,
+ cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().start),
+ cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().finish),
+ del.timestamp);
else
- rm.deleteRange(cfm.cfName,
- cfm.comparator.cellFromByteBuffer(del.predicate.getSlice_range().start),
- cfm.comparator.cellFromByteBuffer(del.predicate.getSlice_range().finish),
- del.timestamp);
+ mutation.deleteRange(cfm.cfName,
+ cfm.comparator.cellFromByteBuffer(del.predicate.getSlice_range().start),
+ cfm.comparator.cellFromByteBuffer(del.predicate.getSlice_range().finish),
+ del.timestamp);
}
else
{
if (del.super_column != null)
- rm.deleteRange(cfm.cfName, SuperColumns.startOf(del.super_column), SuperColumns.endOf(del.super_column), del.timestamp);
+ mutation.deleteRange(cfm.cfName, SuperColumns.startOf(del.super_column), SuperColumns.endOf(del.super_column), del.timestamp);
else
- rm.delete(cfm.cfName, del.timestamp);
+ mutation.delete(cfm.cfName, del.timestamp);
}
}
@@ -1016,20 +1016,20 @@ public class CassandraServer implements Cassandra.Iface
if (isCommutativeOp)
ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
- RowMutation rm = new RowMutation(keyspace, key);
+ org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(keyspace, key);
if (column_path.super_column == null && column_path.column == null)
- rm.delete(column_path.column_family, timestamp);
+ mutation.delete(column_path.column_family, timestamp);
else if (column_path.super_column == null)
- rm.delete(column_path.column_family, metadata.comparator.cellFromByteBuffer(column_path.column), timestamp);
+ mutation.delete(column_path.column_family, metadata.comparator.cellFromByteBuffer(column_path.column), timestamp);
else if (column_path.column == null)
- rm.deleteRange(column_path.column_family, SuperColumns.startOf(column_path.super_column), SuperColumns.endOf(column_path.super_column), timestamp);
+ mutation.deleteRange(column_path.column_family, SuperColumns.startOf(column_path.super_column), SuperColumns.endOf(column_path.super_column), timestamp);
else
- rm.delete(column_path.column_family, metadata.comparator.makeCellName(column_path.super_column, column_path.column), timestamp);
+ mutation.delete(column_path.column_family, metadata.comparator.makeCellName(column_path.super_column, column_path.column), timestamp);
if (isCommutativeOp)
- doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, ThriftConversion.fromThrift(consistency_level))));
+ doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
else
- doInsert(consistency_level, Arrays.asList(rm));
+ doInsert(consistency_level, Arrays.asList(mutation));
}
public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
@@ -1777,19 +1777,19 @@ public class CassandraServer implements Cassandra.Iface
ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
- RowMutation rm = new RowMutation(keyspace, key);
+ org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(keyspace, key);
try
{
if (metadata.isSuper())
- rm.addCounter(column_parent.column_family, metadata.comparator.makeCellName(column_parent.super_column, column.name), column.value);
+ mutation.addCounter(column_parent.column_family, metadata.comparator.makeCellName(column_parent.super_column, column.name), column.value);
else
- rm.addCounter(column_parent.column_family, metadata.comparator.cellFromByteBuffer(column.name), column.value);
+ mutation.addCounter(column_parent.column_family, metadata.comparator.cellFromByteBuffer(column.name), column.value);
}
catch (MarshalException e)
{
throw new InvalidRequestException(e.getMessage());
}
- doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, ThriftConversion.fromThrift(consistency_level))));
+ doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
}
catch (RequestValidationException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 5fec633..7f60433 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.TreeMapBackedSortedColumns;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -101,7 +101,7 @@ public class TraceState
if (elapsed >= 0)
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
- Tracing.mutateWithCatch(new RowMutation(Tracing.TRACE_KS, sessionIdBytes, cf));
+ Tracing.mutateWithCatch(new Mutation(Tracing.TRACE_KS, sessionIdBytes, cf));
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 42ce12b..f22f273 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -167,7 +167,7 @@ public class Tracing
CFMetaData cfMeta = CFMetaData.TraceSessionsCf;
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
addColumn(cf, buildName(cfMeta, "duration"), elapsed);
- mutateWithCatch(new RowMutation(TRACE_KS, sessionIdBytes, cf));
+ mutateWithCatch(new Mutation(TRACE_KS, sessionIdBytes, cf));
}
});
@@ -208,7 +208,7 @@ public class Tracing
addParameterColumns(cf, parameters);
addColumn(cf, buildName(cfMeta, "request"), request);
addColumn(cf, buildName(cfMeta, "started_at"), started_at);
- mutateWithCatch(new RowMutation(TRACE_KS, sessionIdBytes, cf));
+ mutateWithCatch(new Mutation(TRACE_KS, sessionIdBytes, cf));
}
});
}
@@ -280,7 +280,7 @@ public class Tracing
state.trace(format, args);
}
- static void mutateWithCatch(RowMutation mutation)
+ static void mutateWithCatch(Mutation mutation)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/triggers/ITrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/ITrigger.java b/src/java/org/apache/cassandra/triggers/ITrigger.java
index 15ed7ba..208acea 100644
--- a/src/java/org/apache/cassandra/triggers/ITrigger.java
+++ b/src/java/org/apache/cassandra/triggers/ITrigger.java
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Mutation;
/**
* Trigger interface, For every Mutation received by the coordinator {@link #augment(ByteBuffer, ColumnFamily)}
@@ -44,9 +44,9 @@ public interface ITrigger
/**
* Called exactly once per CF update, returned mutations are atomically updated.
*
- * @param key - Row Key for the update.
- * @param update - Update received for the CF
+ * @param key - parition Key for the update.
+ * @param update - update received for the CF
* @return modifications to be applied, null if no action to be performed.
*/
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update);
+ public Collection<Mutation> augment(ByteBuffer partitionKey, ColumnFamily update);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index c294128..91d0ea0 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -29,11 +29,8 @@ import com.google.common.collect.Maps;
import org.apache.cassandra.config.TriggerDefinition;
import org.apache.cassandra.cql.QueryProcessor;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.CounterMutation;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
@@ -63,15 +60,15 @@ public class TriggerExecutor
cachedTriggers.clear();
}
- public Collection<RowMutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
+ public Collection<Mutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
{
boolean hasCounters = false;
- Collection<RowMutation> tmutations = null;
+ Collection<Mutation> tmutations = null;
for (IMutation mutation : updates)
{
for (ColumnFamily cf : mutation.getColumnFamilies())
{
- List<RowMutation> intermediate = execute(mutation.key(), cf);
+ List<Mutation> intermediate = execute(mutation.key(), cf);
if (intermediate == null)
continue;
@@ -89,9 +86,9 @@ public class TriggerExecutor
return tmutations;
}
- private void validate(Collection<RowMutation> tmutations) throws InvalidRequestException
+ private void validate(Collection<Mutation> tmutations) throws InvalidRequestException
{
- for (RowMutation mutation : tmutations)
+ for (Mutation mutation : tmutations)
{
QueryProcessor.validateKey(mutation.key());
for (ColumnFamily tcf : mutation.getColumnFamilies())
@@ -104,12 +101,12 @@ public class TriggerExecutor
* Switch class loader before using the triggers for the column family, if
* not loaded them with the custom class loader.
*/
- private List<RowMutation> execute(ByteBuffer key, ColumnFamily columnFamily)
+ private List<Mutation> execute(ByteBuffer key, ColumnFamily columnFamily)
{
Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
if (triggers.isEmpty())
return null;
- List<RowMutation> tmutations = Lists.newLinkedList();
+ List<Mutation> tmutations = Lists.newLinkedList();
Thread.currentThread().setContextClassLoader(customClassLoader);
try
{
@@ -121,7 +118,7 @@ public class TriggerExecutor
trigger = loadTriggerInstance(td.classOption);
cachedTriggers.put(td.classOption, trigger);
}
- Collection<RowMutation> temp = trigger.augment(key, columnFamily);
+ Collection<Mutation> temp = trigger.augment(key, columnFamily);
if (temp != null)
tmutations.addAll(temp);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbb13b9/src/resources/org/apache/cassandra/cli/CliHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/cli/CliHelp.yaml b/src/resources/org/apache/cassandra/cli/CliHelp.yaml
index 3d4bd99..0202d19 100644
--- a/src/resources/org/apache/cassandra/cli/CliHelp.yaml
+++ b/src/resources/org/apache/cassandra/cli/CliHelp.yaml
@@ -250,7 +250,7 @@ commands:
Options have the form {key:value}, see the information on each
strategy and the examples.
- - durable_writes: When set to false all RowMutations on keyspace will by-pass CommitLog.
+ - durable_writes: When set to false all Mutations on keyspace will by-pass CommitLog.
Set to true by default.
Examples:
@@ -318,7 +318,7 @@ commands:
Options have the form {key:value}, see the information on each
strategy and the examples.
- - durable_writes: When set to false all RowMutations on keyspace will by-pass CommitLog.
+ - durable_writes: When set to false all Mutations on keyspace will by-pass CommitLog.
Set to true by default.
Examples: