You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:04 UTC

[40/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index df8820b..848ba01 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -30,7 +30,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -39,17 +38,15 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.FailureDetector;
@@ -61,6 +58,7 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 /**
@@ -91,7 +89,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     public static final HintedHandOffManager instance = new HintedHandOffManager();
 
     private static final Logger logger = LoggerFactory.getLogger(HintedHandOffManager.class);
-    private static final int PAGE_SIZE = 128;
+
+    private static final int MAX_SIMULTANEOUSLY_REPLAYED_HINTS = 128;
     private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
 
     public final HintedHandoffMetrics metrics = new HintedHandoffMetrics();
@@ -110,6 +109,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private final ColumnFamilyStore hintStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
 
+    private static final ColumnDefinition hintColumn = SystemKeyspace.Hints.compactValueColumn();
+
     /**
      * Returns a mutation representing a Hint to be sent to <code>targetId</code>
      * as soon as it becomes available again.
@@ -127,11 +128,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
         UUID hintId = UUIDGen.getTimeUUID();
         // serialize the hint with id and version as a composite column name
-        CellName name = SystemKeyspace.Hints.comparator.makeCellName(hintId, MessagingService.current_version);
+
+        PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints,
+                                                  StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId)),
+                                                  PartitionColumns.of(hintColumn),
+                                                  1);
+
+        Row.Writer writer = upd.writer();
+        Rows.writeClustering(SystemKeyspace.Hints.comparator.make(hintId, MessagingService.current_version), writer);
+
         ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(SystemKeyspace.NAME, SystemKeyspace.HINTS));
-        cf.addColumn(name, value, now, ttl);
-        return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(targetId), cf);
+        writer.writeCell(hintColumn, false, value, SimpleLivenessInfo.forUpdate(now, ttl, FBUtilities.nowInSeconds(), SystemKeyspace.Hints), null);
+        writer.endOfRow();
+
+        return new Mutation(upd);
     }
 
     /*
@@ -142,12 +152,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     public static int calculateHintTTL(Mutation mutation)
     {
         int ttl = maxHintTTL;
-        for (ColumnFamily cf : mutation.getColumnFamilies())
-            ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
+        for (PartitionUpdate upd : mutation.getPartitionUpdates())
+            ttl = Math.min(ttl, upd.metadata().getGcGraceSeconds());
         return ttl;
     }
 
-
     public void start()
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -172,11 +181,17 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
     }
 
-    private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
+    private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp)
     {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, tokenBytes);
-        mutation.delete(SystemKeyspace.HINTS, columnName, timestamp);
-        mutation.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
+        DecoratedKey dk =  StorageService.getPartitioner().decorateKey(tokenBytes);
+
+        PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints, dk, PartitionColumns.of(hintColumn), 1);
+
+        Row.Writer writer = upd.writer();
+        Rows.writeClustering(clustering, writer);
+        Cells.writeTombstone(writer, hintColumn, timestamp, FBUtilities.nowInSeconds());
+
+        new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
     }
 
     public void deleteHintsForEndpoint(final String ipOrHostname)
@@ -198,9 +213,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         if (!StorageService.instance.getTokenMetadata().isMember(endpoint))
             return;
         UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
-        ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
-        final Mutation mutation = new Mutation(SystemKeyspace.NAME, hostIdBytes);
-        mutation.delete(SystemKeyspace.HINTS, System.currentTimeMillis());
+        DecoratedKey dk =  StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(UUIDGen.decompose(hostId)));
+        final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, dk, System.currentTimeMillis(), FBUtilities.nowInSeconds()));
 
         // execute asynchronously to avoid blocking caller (which may be processing gossip)
         Runnable runnable = new Runnable()
@@ -266,13 +280,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         }
     }
 
-    private static boolean pagingFinished(ColumnFamily hintColumnFamily, Composite startColumn)
-    {
-        // done if no hints found or the start column (same as last column processed in previous iteration) is the only one
-        return hintColumnFamily == null
-               || (!startColumn.isEmpty() && hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn((CellName)startColumn) != null);
-    }
-
     private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException
     {
         Gossiper gossiper = Gossiper.instance;
@@ -335,6 +342,27 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         }
 
         doDeliverHintsToEndpoint(endpoint);
+
+        // Flush all the tombstones to disk
+        hintStore.forceBlockingFlush();
+    }
+
+    private boolean checkDelivered(InetAddress endpoint, List<WriteResponseHandler<Mutation>> handlers, AtomicInteger rowsReplayed)
+    {
+        for (WriteResponseHandler<Mutation> handler : handlers)
+        {
+            try
+            {
+                handler.get();
+            }
+            catch (WriteTimeoutException e)
+            {
+                logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}",
+                            endpoint, rowsReplayed, e.getMessage());
+                return false;
+            }
+        }
+        return true;
     }
 
     /*
@@ -352,10 +380,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         DecoratedKey epkey =  StorageService.getPartitioner().decorateKey(hostIdBytes);
 
         final AtomicInteger rowsReplayed = new AtomicInteger(0);
-        Composite startColumn = Composites.EMPTY;
-
-        int pageSize = calculatePageSize();
-        logger.debug("Using pageSize of {}", pageSize);
 
         // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
         // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
@@ -363,55 +387,38 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                            / (StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 
-        delivery:
-        while (true)
+        int nowInSec = FBUtilities.nowInSeconds();
+        try (OpOrder.Group op = hintStore.readOrdering.start();
+             RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, epkey).queryMemtableAndDisk(hintStore, op), nowInSec))
         {
-            long now = System.currentTimeMillis();
-            QueryFilter filter = QueryFilter.getSliceFilter(epkey,
-                                                            SystemKeyspace.HINTS,
-                                                            startColumn,
-                                                            Composites.EMPTY,
-                                                            false,
-                                                            pageSize,
-                                                            now);
-
-            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int) (now / 1000));
-
-            if (pagingFinished(hintsPage, startColumn))
-            {
-                logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint);
-                break;
-            }
+            List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList();
 
-            // check if node is still alive and we should continue delivery process
-            if (!FailureDetector.instance.isAlive(endpoint))
+            while (iter.hasNext())
             {
-                logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed);
-                break;
-            }
+                // check if node is still alive and we should continue delivery process
+                if (!FailureDetector.instance.isAlive(endpoint))
+                {
+                    logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed);
+                    return;
+                }
 
-            List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList();
-            for (final Cell hint : hintsPage)
-            {
                 // check if hints delivery has been paused during the process
                 if (hintedHandOffPaused)
                 {
                     logger.debug("Hints delivery process is paused, aborting");
-                    break delivery;
+                    return;
                 }
 
-                // Skip tombstones:
-                // if we iterate quickly enough, it's possible that we could request a new page in the same millisecond
-                // in which the local deletion timestamp was generated on the last column in the old page, in which
-                // case the hint will have no columns (since it's deleted) but will still be included in the resultset
-                // since (even with gcgs=0) it's still a "relevant" tombstone.
-                if (!hint.isLive())
-                    continue;
+                // Regularly wait for the endpoint's acknowledgments. If we time out waiting, the endpoint is probably dead, so stop delivery
+                if (responseHandlers.size() > MAX_SIMULTANEOUSLY_REPLAYED_HINTS && !checkDelivered(endpoint, responseHandlers, rowsReplayed))
+                    return;
 
-                startColumn = hint.name();
+                final Row hint = iter.next();
+                int version = Int32Type.instance.compose(hint.clustering().get(1));
+                Cell cell = hint.getCell(hintColumn);
 
-                int version = Int32Type.instance.compose(hint.name().get(1));
-                DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value()));
+                final long timestamp = cell.livenessInfo().timestamp();
+                DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(cell.value()));
                 Mutation mutation;
                 try
                 {
@@ -420,7 +427,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 catch (UnknownColumnFamilyException e)
                 {
                     logger.debug("Skipping delivery of hint for deleted table", e);
-                    deleteHint(hostIdBytes, hint.name(), hint.timestamp());
+                    deleteHint(hostIdBytes, hint.clustering(), timestamp);
                     continue;
                 }
                 catch (IOException e)
@@ -430,7 +437,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
                 for (UUID cfId : mutation.getColumnFamilyIds())
                 {
-                    if (hint.timestamp() <= SystemKeyspace.getTruncatedAt(cfId))
+                    if (timestamp <= SystemKeyspace.getTruncatedAt(cfId))
                     {
                         logger.debug("Skipping delivery of hint for truncated table {}", cfId);
                         mutation = mutation.without(cfId);
@@ -439,7 +446,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
                 if (mutation.isEmpty())
                 {
-                    deleteHint(hostIdBytes, hint.name(), hint.timestamp());
+                    deleteHint(hostIdBytes, hint.clustering(), timestamp);
                     continue;
                 }
 
@@ -450,7 +457,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                     public void run()
                     {
                         rowsReplayed.incrementAndGet();
-                        deleteHint(hostIdBytes, hint.name(), hint.timestamp());
+                        deleteHint(hostIdBytes, hint.clustering(), timestamp);
                     }
                 };
                 WriteResponseHandler<Mutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.SIMPLE, callback);
@@ -458,38 +465,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 responseHandlers.add(responseHandler);
             }
 
-            for (WriteResponseHandler<Mutation> handler : responseHandlers)
-            {
-                try
-                {
-                    handler.get();
-                }
-                catch (WriteTimeoutException|WriteFailureException e)
-                {
-                    logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}",
-                        endpoint, rowsReplayed, e.getMessage());
-                    break delivery;
-                }
-            }
+            // Wait on the last handlers
+            if (checkDelivered(endpoint, responseHandlers, rowsReplayed))
+                logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint);
         }
-
-        // Flush all the tombstones to disk
-        hintStore.forceBlockingFlush();
-    }
-
-    // read less columns (mutations) per page if they are very large
-    private int calculatePageSize()
-    {
-        int meanColumnCount = hintStore.getMeanColumns();
-        if (meanColumnCount <= 0)
-            return PAGE_SIZE;
-
-        int averageColumnSize = (int) (hintStore.metric.meanRowSize.getValue() / meanColumnCount);
-        if (averageColumnSize <= 0)
-            return PAGE_SIZE;
-
-        // page size of 1 does not allow actual paging b/c of >= behavior on startColumn
-        return Math.max(2, Math.min(PAGE_SIZE, 4 * 1024 * 1024 / averageColumnSize));
     }
 
     /**
@@ -505,18 +484,26 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         // to deliver to).
         compact();
 
-        IPartitioner p = StorageService.getPartitioner();
-        RowPosition minPos = p.getMinimumToken().minKeyBound();
-        Range<RowPosition> range = new Range<>(minPos, minPos);
-        IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<CellName>of());
-        List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
-        for (Row row : rows)
+        ReadCommand cmd = new PartitionRangeReadCommand(hintStore.metadata,
+                                                        FBUtilities.nowInSeconds(),
+                                                        ColumnFilter.all(hintStore.metadata),
+                                                        RowFilter.NONE,
+                                                        DataLimits.cqlLimits(Integer.MAX_VALUE, 1),
+                                                        DataRange.allData(StorageService.getPartitioner()));
+
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
         {
-            UUID hostId = UUIDGen.getUUID(row.key.getKey());
-            InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
-            // token may have since been removed (in which case we have just read back a tombstone)
-            if (target != null)
-                scheduleHintDelivery(target, false);
+            while (iter.hasNext())
+            {
+                try (UnfilteredRowIterator partition = iter.next())
+                {
+                    UUID hostId = UUIDGen.getUUID(partition.partitionKey().getKey());
+                    InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
+                    // token may have since been removed (in which case we have just read back a tombstone)
+                    if (target != null)
+                        scheduleHintDelivery(target, false);
+                }
+            }
         }
 
         logger.debug("Finished scheduleAllDeliveries");
@@ -572,42 +559,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
         // Extract the keys as strings to be reported.
         LinkedList<String> result = new LinkedList<>();
-        for (Row row : getHintsSlice(1))
+        ReadCommand cmd = PartitionRangeReadCommand.allDataRead(SystemKeyspace.Hints, FBUtilities.nowInSeconds());
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
         {
-            if (row.cf != null) //ignore removed rows
-                result.addFirst(tokenFactory.toString(row.key.getToken()));
+            while (iter.hasNext())
+            {
+                try (UnfilteredRowIterator partition = iter.next())
+                {
+                    // We don't delete by range on the hints table, so we don't have to worry about the
+                    // iterator returning only range tombstone markers
+                    if (partition.hasNext())
+                        result.addFirst(tokenFactory.toString(partition.partitionKey().getToken()));
+                }
+            }
         }
         return result;
     }
-
-    private List<Row> getHintsSlice(int columnCount)
-    {
-        // Get count # of columns...
-        SliceQueryFilter predicate = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY,
-                                                          false,
-                                                          columnCount);
-
-        // From keys "" to ""...
-        IPartitioner partitioner = StorageService.getPartitioner();
-        RowPosition minPos = partitioner.getMinimumToken().minKeyBound();
-        Range<RowPosition> range = new Range<>(minPos, minPos);
-
-        try
-        {
-            RangeSliceCommand cmd = new RangeSliceCommand(SystemKeyspace.NAME,
-                                                          SystemKeyspace.HINTS,
-                                                          System.currentTimeMillis(),
-                                                          predicate,
-                                                          range,
-                                                          null,
-                                                          LARGE_NUMBER);
-            return StorageProxy.getRangeSlice(cmd, ConsistencyLevel.ONE);
-        }
-        catch (Exception e)
-        {
-            logger.info("HintsCF getEPPendingHints timed out.");
-            throw new RuntimeException(e);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 44df104..aad35c3 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -21,13 +21,14 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.UUID;
 
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+
 public interface IMutation
 {
     public String getKeyspaceName();
     public Collection<UUID> getColumnFamilyIds();
-    public ByteBuffer key();
+    public DecoratedKey key();
     public long getTimeout();
     public String toString(boolean shallow);
-    public void addAll(IMutation m);
-    public Collection<ColumnFamily> getColumnFamilies();
+    public Collection<PartitionUpdate> getPartitionUpdates();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/IndexExpression.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IndexExpression.java b/src/java/org/apache/cassandra/db/IndexExpression.java
deleted file mode 100644
index bdb74ce..0000000
--- a/src/java/org/apache/cassandra/db/IndexExpression.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public final class IndexExpression
-{
-    public final ByteBuffer column;
-    public final Operator operator;
-    public final ByteBuffer value;
-
-    public IndexExpression(ByteBuffer column, Operator operator, ByteBuffer value)
-    {
-        this.column = column;
-        this.operator = operator;
-        this.value = value;
-    }
-
-    /**
-     * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> operator.
-     *
-     * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code>
-     * operator, <code>false</code> otherwise.
-     */
-    public boolean isContains()
-    {
-        return Operator.CONTAINS == operator;
-    }
-
-    /**
-     * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> operator.
-     *
-     * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code>
-     * operator, <code>false</code> otherwise.
-     */
-    public boolean isContainsKey()
-    {
-        return Operator.CONTAINS_KEY == operator;
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("%s %s %s", ByteBufferUtil.bytesToHex(column), operator, ByteBufferUtil.bytesToHex(value));
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-            return true;
-
-        if (!(o instanceof IndexExpression))
-            return false;
-
-        IndexExpression ie = (IndexExpression) o;
-
-        return Objects.equal(this.column, ie.column)
-            && Objects.equal(this.operator, ie.operator)
-            && Objects.equal(this.value, ie.value);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hashCode(column, operator, value);
-    }
-
-    /**
-     * Write the serialized version of this <code>IndexExpression</code> to the specified output.
-     *
-     * @param output the output to write to
-     * @throws IOException if an I/O problem occurs while writing to the specified output
-     */
-    public void writeTo(DataOutputPlus output) throws IOException
-    {
-        ByteBufferUtil.writeWithShortLength(column, output);
-        operator.writeTo(output);
-        ByteBufferUtil.writeWithShortLength(value, output);
-    }
-
-    /**
-     * Deserializes an <code>IndexExpression</code> instance from the specified input. 
-     *
-     * @param input the input to read from 
-     * @return the <code>IndexExpression</code> instance deserialized
-     * @throws IOException if a problem occurs while deserializing the <code>IndexExpression</code> instance.
-     */
-    public static IndexExpression readFrom(DataInput input) throws IOException
-    {
-        return new IndexExpression(ByteBufferUtil.readWithShortLength(input),
-                                   Operator.readFrom(input),
-                                   ByteBufferUtil.readWithShortLength(input));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index cb5c54d..e045466 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -30,18 +30,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.metrics.KeyspaceMetrics;
 
 /**
@@ -49,8 +50,6 @@ import org.apache.cassandra.metrics.KeyspaceMetrics;
  */
 public class Keyspace
 {
-    private static final int DEFAULT_PAGE_SIZE = 10000;
-
     private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
 
     private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", "");
@@ -145,6 +144,11 @@ public class Keyspace
         }
     }
 
+    public static ColumnFamilyStore openAndGetStore(CFMetaData cfm)
+    {
+        return open(cfm.ksName).getColumnFamilyStore(cfm.cfId);
+    }
+
     /**
      * Removes every SSTable in the directory from the appropriate Tracker's view.
      * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
@@ -347,13 +351,6 @@ public class Keyspace
         }
     }
 
-    public Row getRow(QueryFilter filter)
-    {
-        ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
-        ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
-        return new Row(filter.key, columnFamily);
-    }
-
     public void apply(Mutation mutation, boolean writeCommitLog)
     {
         apply(mutation, writeCommitLog, true);
@@ -372,6 +369,7 @@ public class Keyspace
         if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
             throw new RuntimeException("Testing write failures");
 
+        int nowInSec = FBUtilities.nowInSeconds();
         try (OpOrder.Group opGroup = writeOrder.start())
         {
             // write the mutation to the commitlog and memtables
@@ -382,21 +380,21 @@ public class Keyspace
                 replayPosition = CommitLog.instance.add(mutation);
             }
 
-            DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
-            for (ColumnFamily cf : mutation.getColumnFamilies())
+            DecoratedKey key = mutation.key();
+            for (PartitionUpdate upd : mutation.getPartitionUpdates())
             {
-                ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
+                ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId);
                 if (cfs == null)
                 {
-                    logger.error("Attempting to mutate non-existant table {}", cf.id());
+                    logger.error("Attempting to mutate non-existant table {}", upd.metadata().cfId);
                     continue;
                 }
 
-                Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
+                Tracing.trace("Adding to {} memtable", upd.metadata().cfName);
                 SecondaryIndexManager.Updater updater = updateIndexes
-                                                      ? cfs.indexManager.updaterFor(key, cf, opGroup)
+                                                      ? cfs.indexManager.updaterFor(upd, opGroup, nowInSec)
                                                       : SecondaryIndexManager.nullUpdater;
-                cfs.apply(key, cf, updater, opGroup, replayPosition);
+                cfs.apply(upd, updater, opGroup, replayPosition);
             }
         }
     }
@@ -408,30 +406,21 @@ public class Keyspace
 
     /**
      * @param key row to index
-     * @param cfs ColumnFamily to index row in
+     * @param cfs ColumnFamily to index partition in
      * @param idxNames columns to index, in comparator order
      */
-    public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
+    public static void indexPartition(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
     {
         if (logger.isDebugEnabled())
-            logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
+            logger.debug("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
 
-        try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
-        {
-            Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+        Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+        SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, FBUtilities.nowInSeconds(), key);
 
-            Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.getKey(), DEFAULT_PAGE_SIZE);
-            while (pager.hasNext())
-            {
-                ColumnFamily cf = pager.next();
-                ColumnFamily cf2 = cf.cloneMeShallow();
-                for (Cell cell : cf)
-                {
-                    if (cfs.indexManager.indexes(cell.name(), indexes))
-                        cf2.addColumn(cell);
-                }
-                cfs.indexManager.indexRow(key.getKey(), cf2, opGroup);
-            }
+        try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
+             UnfilteredRowIterator partition = cmd.queryMemtableAndDisk(cfs, opGroup))
+        {
+            cfs.indexManager.indexPartition(partition, opGroup, indexes, cmd.nowInSec());
         }
     }