You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:04 UTC
[40/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a. CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index df8820b..848ba01 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -30,7 +30,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -39,17 +38,15 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
@@ -61,6 +58,7 @@ import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
/**
@@ -91,7 +89,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
public static final HintedHandOffManager instance = new HintedHandOffManager();
private static final Logger logger = LoggerFactory.getLogger(HintedHandOffManager.class);
- private static final int PAGE_SIZE = 128;
+
+ private static final int MAX_SIMULTANEOUSLY_REPLAYED_HINTS = 128;
private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
public final HintedHandoffMetrics metrics = new HintedHandoffMetrics();
@@ -110,6 +109,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
private final ColumnFamilyStore hintStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
+ private static final ColumnDefinition hintColumn = SystemKeyspace.Hints.compactValueColumn();
+
/**
* Returns a mutation representing a Hint to be sent to <code>targetId</code>
* as soon as it becomes available again.
@@ -127,11 +128,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
UUID hintId = UUIDGen.getTimeUUID();
// serialize the hint with id and version as a composite column name
- CellName name = SystemKeyspace.Hints.comparator.makeCellName(hintId, MessagingService.current_version);
+
+ PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints,
+ StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId)),
+ PartitionColumns.of(hintColumn),
+ 1);
+
+ Row.Writer writer = upd.writer();
+ Rows.writeClustering(SystemKeyspace.Hints.comparator.make(hintId, MessagingService.current_version), writer);
+
ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(SystemKeyspace.NAME, SystemKeyspace.HINTS));
- cf.addColumn(name, value, now, ttl);
- return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(targetId), cf);
+ writer.writeCell(hintColumn, false, value, SimpleLivenessInfo.forUpdate(now, ttl, FBUtilities.nowInSeconds(), SystemKeyspace.Hints), null);
+ writer.endOfRow();
+
+ return new Mutation(upd);
}
/*
@@ -142,12 +152,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
public static int calculateHintTTL(Mutation mutation)
{
int ttl = maxHintTTL;
- for (ColumnFamily cf : mutation.getColumnFamilies())
- ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
+ for (PartitionUpdate upd : mutation.getPartitionUpdates())
+ ttl = Math.min(ttl, upd.metadata().getGcGraceSeconds());
return ttl;
}
-
public void start()
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -172,11 +181,17 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
}
- private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
+ private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp)
{
- Mutation mutation = new Mutation(SystemKeyspace.NAME, tokenBytes);
- mutation.delete(SystemKeyspace.HINTS, columnName, timestamp);
- mutation.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
+ DecoratedKey dk = StorageService.getPartitioner().decorateKey(tokenBytes);
+
+ PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints, dk, PartitionColumns.of(hintColumn), 1);
+
+ Row.Writer writer = upd.writer();
+ Rows.writeClustering(clustering, writer);
+ Cells.writeTombstone(writer, hintColumn, timestamp, FBUtilities.nowInSeconds());
+
+ new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
}
public void deleteHintsForEndpoint(final String ipOrHostname)
@@ -198,9 +213,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
if (!StorageService.instance.getTokenMetadata().isMember(endpoint))
return;
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
- ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
- final Mutation mutation = new Mutation(SystemKeyspace.NAME, hostIdBytes);
- mutation.delete(SystemKeyspace.HINTS, System.currentTimeMillis());
+ DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(UUIDGen.decompose(hostId)));
+ final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, dk, System.currentTimeMillis(), FBUtilities.nowInSeconds()));
// execute asynchronously to avoid blocking caller (which may be processing gossip)
Runnable runnable = new Runnable()
@@ -266,13 +280,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
}
- private static boolean pagingFinished(ColumnFamily hintColumnFamily, Composite startColumn)
- {
- // done if no hints found or the start column (same as last column processed in previous iteration) is the only one
- return hintColumnFamily == null
- || (!startColumn.isEmpty() && hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn((CellName)startColumn) != null);
- }
-
private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException
{
Gossiper gossiper = Gossiper.instance;
@@ -335,6 +342,27 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
doDeliverHintsToEndpoint(endpoint);
+
+ // Flush all the tombstones to disk
+ hintStore.forceBlockingFlush();
+ }
+
+ private boolean checkDelivered(InetAddress endpoint, List<WriteResponseHandler<Mutation>> handlers, AtomicInteger rowsReplayed)
+ {
+ for (WriteResponseHandler<Mutation> handler : handlers)
+ {
+ try
+ {
+ handler.get();
+ }
+ catch (WriteTimeoutException e)
+ {
+ logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}",
+ endpoint, rowsReplayed, e.getMessage());
+ return false;
+ }
+ }
+ return true;
}
/*
@@ -352,10 +380,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes);
final AtomicInteger rowsReplayed = new AtomicInteger(0);
- Composite startColumn = Composites.EMPTY;
-
- int pageSize = calculatePageSize();
- logger.debug("Using pageSize of {}", pageSize);
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
// max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
@@ -363,55 +387,38 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
/ (StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
- delivery:
- while (true)
+ int nowInSec = FBUtilities.nowInSeconds();
+ try (OpOrder.Group op = hintStore.readOrdering.start();
+ RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, epkey).queryMemtableAndDisk(hintStore, op), nowInSec))
{
- long now = System.currentTimeMillis();
- QueryFilter filter = QueryFilter.getSliceFilter(epkey,
- SystemKeyspace.HINTS,
- startColumn,
- Composites.EMPTY,
- false,
- pageSize,
- now);
-
- ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int) (now / 1000));
-
- if (pagingFinished(hintsPage, startColumn))
- {
- logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint);
- break;
- }
+ List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList();
- // check if node is still alive and we should continue delivery process
- if (!FailureDetector.instance.isAlive(endpoint))
+ while (iter.hasNext())
{
- logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed);
- break;
- }
+ // check if node is still alive and we should continue delivery process
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed);
+ return;
+ }
- List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList();
- for (final Cell hint : hintsPage)
- {
// check if hints delivery has been paused during the process
if (hintedHandOffPaused)
{
logger.debug("Hints delivery process is paused, aborting");
- break delivery;
+ return;
}
- // Skip tombstones:
- // if we iterate quickly enough, it's possible that we could request a new page in the same millisecond
- // in which the local deletion timestamp was generated on the last column in the old page, in which
- // case the hint will have no columns (since it's deleted) but will still be included in the resultset
- // since (even with gcgs=0) it's still a "relevant" tombstone.
- if (!hint.isLive())
- continue;
+ // Wait regularly on the endpoint acknowledgment. If we timeout on it, the endpoint is probably dead so stop delivery
+ if (responseHandlers.size() > MAX_SIMULTANEOUSLY_REPLAYED_HINTS && !checkDelivered(endpoint, responseHandlers, rowsReplayed))
+ return;
- startColumn = hint.name();
+ final Row hint = iter.next();
+ int version = Int32Type.instance.compose(hint.clustering().get(1));
+ Cell cell = hint.getCell(hintColumn);
- int version = Int32Type.instance.compose(hint.name().get(1));
- DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value()));
+ final long timestamp = cell.livenessInfo().timestamp();
+ DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(cell.value()));
Mutation mutation;
try
{
@@ -420,7 +427,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
catch (UnknownColumnFamilyException e)
{
logger.debug("Skipping delivery of hint for deleted table", e);
- deleteHint(hostIdBytes, hint.name(), hint.timestamp());
+ deleteHint(hostIdBytes, hint.clustering(), timestamp);
continue;
}
catch (IOException e)
@@ -430,7 +437,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
for (UUID cfId : mutation.getColumnFamilyIds())
{
- if (hint.timestamp() <= SystemKeyspace.getTruncatedAt(cfId))
+ if (timestamp <= SystemKeyspace.getTruncatedAt(cfId))
{
logger.debug("Skipping delivery of hint for truncated table {}", cfId);
mutation = mutation.without(cfId);
@@ -439,7 +446,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
if (mutation.isEmpty())
{
- deleteHint(hostIdBytes, hint.name(), hint.timestamp());
+ deleteHint(hostIdBytes, hint.clustering(), timestamp);
continue;
}
@@ -450,7 +457,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
public void run()
{
rowsReplayed.incrementAndGet();
- deleteHint(hostIdBytes, hint.name(), hint.timestamp());
+ deleteHint(hostIdBytes, hint.clustering(), timestamp);
}
};
WriteResponseHandler<Mutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.SIMPLE, callback);
@@ -458,38 +465,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
responseHandlers.add(responseHandler);
}
- for (WriteResponseHandler<Mutation> handler : responseHandlers)
- {
- try
- {
- handler.get();
- }
- catch (WriteTimeoutException|WriteFailureException e)
- {
- logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}",
- endpoint, rowsReplayed, e.getMessage());
- break delivery;
- }
- }
+ // Wait on the last handlers
+ if (checkDelivered(endpoint, responseHandlers, rowsReplayed))
+ logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint);
}
-
- // Flush all the tombstones to disk
- hintStore.forceBlockingFlush();
- }
-
- // read less columns (mutations) per page if they are very large
- private int calculatePageSize()
- {
- int meanColumnCount = hintStore.getMeanColumns();
- if (meanColumnCount <= 0)
- return PAGE_SIZE;
-
- int averageColumnSize = (int) (hintStore.metric.meanRowSize.getValue() / meanColumnCount);
- if (averageColumnSize <= 0)
- return PAGE_SIZE;
-
- // page size of 1 does not allow actual paging b/c of >= behavior on startColumn
- return Math.max(2, Math.min(PAGE_SIZE, 4 * 1024 * 1024 / averageColumnSize));
}
/**
@@ -505,18 +484,26 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
// to deliver to).
compact();
- IPartitioner p = StorageService.getPartitioner();
- RowPosition minPos = p.getMinimumToken().minKeyBound();
- Range<RowPosition> range = new Range<>(minPos, minPos);
- IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<CellName>of());
- List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
- for (Row row : rows)
+ ReadCommand cmd = new PartitionRangeReadCommand(hintStore.metadata,
+ FBUtilities.nowInSeconds(),
+ ColumnFilter.all(hintStore.metadata),
+ RowFilter.NONE,
+ DataLimits.cqlLimits(Integer.MAX_VALUE, 1),
+ DataRange.allData(StorageService.getPartitioner()));
+
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
{
- UUID hostId = UUIDGen.getUUID(row.key.getKey());
- InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
- // token may have since been removed (in which case we have just read back a tombstone)
- if (target != null)
- scheduleHintDelivery(target, false);
+ while (iter.hasNext())
+ {
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ UUID hostId = UUIDGen.getUUID(partition.partitionKey().getKey());
+ InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
+ // token may have since been removed (in which case we have just read back a tombstone)
+ if (target != null)
+ scheduleHintDelivery(target, false);
+ }
+ }
}
logger.debug("Finished scheduleAllDeliveries");
@@ -572,42 +559,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
// Extract the keys as strings to be reported.
LinkedList<String> result = new LinkedList<>();
- for (Row row : getHintsSlice(1))
+ ReadCommand cmd = PartitionRangeReadCommand.allDataRead(SystemKeyspace.Hints, FBUtilities.nowInSeconds());
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
{
- if (row.cf != null) //ignore removed rows
- result.addFirst(tokenFactory.toString(row.key.getToken()));
+ while (iter.hasNext())
+ {
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ // We don't delete by range on the hints table, so we don't have to worry about the
+ // iterator returning only range tombstone marker
+ if (partition.hasNext())
+ result.addFirst(tokenFactory.toString(partition.partitionKey().getToken()));
+ }
+ }
}
return result;
}
-
- private List<Row> getHintsSlice(int columnCount)
- {
- // Get count # of columns...
- SliceQueryFilter predicate = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY,
- false,
- columnCount);
-
- // From keys "" to ""...
- IPartitioner partitioner = StorageService.getPartitioner();
- RowPosition minPos = partitioner.getMinimumToken().minKeyBound();
- Range<RowPosition> range = new Range<>(minPos, minPos);
-
- try
- {
- RangeSliceCommand cmd = new RangeSliceCommand(SystemKeyspace.NAME,
- SystemKeyspace.HINTS,
- System.currentTimeMillis(),
- predicate,
- range,
- null,
- LARGE_NUMBER);
- return StorageProxy.getRangeSlice(cmd, ConsistencyLevel.ONE);
- }
- catch (Exception e)
- {
- logger.info("HintsCF getEPPendingHints timed out.");
- throw new RuntimeException(e);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 44df104..aad35c3 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -21,13 +21,14 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.UUID;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+
public interface IMutation
{
public String getKeyspaceName();
public Collection<UUID> getColumnFamilyIds();
- public ByteBuffer key();
+ public DecoratedKey key();
public long getTimeout();
public String toString(boolean shallow);
- public void addAll(IMutation m);
- public Collection<ColumnFamily> getColumnFamilies();
+ public Collection<PartitionUpdate> getPartitionUpdates();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/IndexExpression.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IndexExpression.java b/src/java/org/apache/cassandra/db/IndexExpression.java
deleted file mode 100644
index bdb74ce..0000000
--- a/src/java/org/apache/cassandra/db/IndexExpression.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public final class IndexExpression
-{
- public final ByteBuffer column;
- public final Operator operator;
- public final ByteBuffer value;
-
- public IndexExpression(ByteBuffer column, Operator operator, ByteBuffer value)
- {
- this.column = column;
- this.operator = operator;
- this.value = value;
- }
-
- /**
- * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> operator.
- *
- * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code>
- * operator, <code>false</code> otherwise.
- */
- public boolean isContains()
- {
- return Operator.CONTAINS == operator;
- }
-
- /**
- * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> operator.
- *
- * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code>
- * operator, <code>false</code> otherwise.
- */
- public boolean isContainsKey()
- {
- return Operator.CONTAINS_KEY == operator;
- }
-
- @Override
- public String toString()
- {
- return String.format("%s %s %s", ByteBufferUtil.bytesToHex(column), operator, ByteBufferUtil.bytesToHex(value));
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- return true;
-
- if (!(o instanceof IndexExpression))
- return false;
-
- IndexExpression ie = (IndexExpression) o;
-
- return Objects.equal(this.column, ie.column)
- && Objects.equal(this.operator, ie.operator)
- && Objects.equal(this.value, ie.value);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hashCode(column, operator, value);
- }
-
- /**
- * Write the serialized version of this <code>IndexExpression</code> to the specified output.
- *
- * @param output the output to write to
- * @throws IOException if an I/O problem occurs while writing to the specified output
- */
- public void writeTo(DataOutputPlus output) throws IOException
- {
- ByteBufferUtil.writeWithShortLength(column, output);
- operator.writeTo(output);
- ByteBufferUtil.writeWithShortLength(value, output);
- }
-
- /**
- * Deserializes an <code>IndexExpression</code> instance from the specified input.
- *
- * @param input the input to read from
- * @return the <code>IndexExpression</code> instance deserialized
- * @throws IOException if a problem occurs while deserializing the <code>IndexExpression</code> instance.
- */
- public static IndexExpression readFrom(DataInput input) throws IOException
- {
- return new IndexExpression(ByteBufferUtil.readWithShortLength(input),
- Operator.readFrom(input),
- ByteBufferUtil.readWithShortLength(input));
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index cb5c54d..e045466 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -30,18 +30,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.QueryPagers;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.metrics.KeyspaceMetrics;
/**
@@ -49,8 +50,6 @@ import org.apache.cassandra.metrics.KeyspaceMetrics;
*/
public class Keyspace
{
- private static final int DEFAULT_PAGE_SIZE = 10000;
-
private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", "");
@@ -145,6 +144,11 @@ public class Keyspace
}
}
+ public static ColumnFamilyStore openAndGetStore(CFMetaData cfm)
+ {
+ return open(cfm.ksName).getColumnFamilyStore(cfm.cfId);
+ }
+
/**
* Removes every SSTable in the directory from the appropriate Tracker's view.
* @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
@@ -347,13 +351,6 @@ public class Keyspace
}
}
- public Row getRow(QueryFilter filter)
- {
- ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
- ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
- return new Row(filter.key, columnFamily);
- }
-
public void apply(Mutation mutation, boolean writeCommitLog)
{
apply(mutation, writeCommitLog, true);
@@ -372,6 +369,7 @@ public class Keyspace
if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
throw new RuntimeException("Testing write failures");
+ int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group opGroup = writeOrder.start())
{
// write the mutation to the commitlog and memtables
@@ -382,21 +380,21 @@ public class Keyspace
replayPosition = CommitLog.instance.add(mutation);
}
- DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
- for (ColumnFamily cf : mutation.getColumnFamilies())
+ DecoratedKey key = mutation.key();
+ for (PartitionUpdate upd : mutation.getPartitionUpdates())
{
- ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
+ ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId);
if (cfs == null)
{
- logger.error("Attempting to mutate non-existant table {}", cf.id());
+ logger.error("Attempting to mutate non-existant table {}", upd.metadata().cfId);
continue;
}
- Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
+ Tracing.trace("Adding to {} memtable", upd.metadata().cfName);
SecondaryIndexManager.Updater updater = updateIndexes
- ? cfs.indexManager.updaterFor(key, cf, opGroup)
+ ? cfs.indexManager.updaterFor(upd, opGroup, nowInSec)
: SecondaryIndexManager.nullUpdater;
- cfs.apply(key, cf, updater, opGroup, replayPosition);
+ cfs.apply(upd, updater, opGroup, replayPosition);
}
}
}
@@ -408,30 +406,21 @@ public class Keyspace
/**
* @param key row to index
- * @param cfs ColumnFamily to index row in
+ * @param cfs ColumnFamily to index partition in
* @param idxNames columns to index, in comparator order
*/
- public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
+ public static void indexPartition(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
{
if (logger.isDebugEnabled())
- logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
+ logger.debug("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
- try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
- {
- Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+ Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, FBUtilities.nowInSeconds(), key);
- Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.getKey(), DEFAULT_PAGE_SIZE);
- while (pager.hasNext())
- {
- ColumnFamily cf = pager.next();
- ColumnFamily cf2 = cf.cloneMeShallow();
- for (Cell cell : cf)
- {
- if (cfs.indexManager.indexes(cell.name(), indexes))
- cf2.addColumn(cell);
- }
- cfs.indexManager.indexRow(key.getKey(), cf2, opGroup);
- }
+ try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
+ UnfilteredRowIterator partition = cmd.queryMemtableAndDisk(cfs, opGroup))
+ {
+ cfs.indexManager.indexPartition(partition, opGroup, indexes, cmd.nowInSec());
}
}