You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/11/27 11:12:44 UTC

cassandra git commit: Reject index queries while the index is building

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 b3e6a433e -> 61e0251a1


Reject index queries while the index is building

patch by Benjamin Lerer; reviewed by Sam Tunnicliffe for CASSANDRA-8505


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/61e0251a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/61e0251a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/61e0251a

Branch: refs/heads/cassandra-2.2
Commit: 61e0251a1d4ddc695382aee11e443506afd40899
Parents: b3e6a43
Author: Benjamin Lerer <b....@gmail.com>
Authored: Fri Nov 27 11:10:28 2015 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Fri Nov 27 11:10:28 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +-
 .../db/index/IndexNotAvailableException.java    |  49 ++++++
 .../cassandra/db/index/SecondaryIndex.java      |  28 +++-
 .../db/index/SecondaryIndexManager.java         |   6 +-
 .../db/index/composites/CompositesSearcher.java |   9 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |   4 +-
 .../cassandra/net/MessageDeliveryTask.java      |   7 +-
 .../validation/entities/SecondaryIndexTest.java | 159 ++++++++++++++++++-
 9 files changed, 244 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c2940cc..63305d6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.4
+ * Reject index queries while the index is building (CASSANDRA-8505)
  * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
  * Fix JSON update with prepared statements (CASSANDRA-10631)
  * Don't do anticompaction after subrange repair (CASSANDRA-10422)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2d58219..8fefae2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -26,6 +26,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
+
 import javax.management.*;
 import javax.management.openmbean.*;
 
@@ -43,7 +44,6 @@ import org.apache.cassandra.io.FSWriteError;
 import org.json.simple.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/IndexNotAvailableException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/IndexNotAvailableException.java b/src/java/org/apache/cassandra/db/index/IndexNotAvailableException.java
new file mode 100644
index 0000000..750e899
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/IndexNotAvailableException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index;
+
+/**
+ * Thrown if a secondary index is not currently available.
+ */
+public final class IndexNotAvailableException extends RuntimeException
+{
+    /**
+     * Creates a new <code>IndexNotAvailableException</code> for the specified index.
+     * @param name the index name
+     */
+    public IndexNotAvailableException(String name)
+    {
+        super(String.format("The secondary index '%s' is not yet available",
+                            removeTableNameIfNeeded(name)));
+    }
+
+    /**
+     * Extract the name of the index if necessary.
+     *
+     * @param name the index name, optionally prefixed by the table name (e.g. <code>table.index</code>)
+     * @return the index name
+     */
+    private static String removeTableNameIfNeeded(String name)
+    {
+        int index = name.indexOf('.');
+        if (index < 0)
+            return name;
+
+        return name.substring(index + 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 11626d6..cf2deeb 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -89,6 +89,12 @@ public abstract class SecondaryIndex
      */
     protected ColumnFamilyStore baseCfs;
 
+    // We need to keep track of whether the index is queryable to be sure that we can safely use it. If the index
+    // is still being built, using it will return incomplete results.
+    /**
+     * Specify if the index is queryable or not.
+     */
+    private volatile boolean queryable;
 
     /**
      * The column definitions which this index is responsible for
@@ -150,8 +156,18 @@ public abstract class SecondaryIndex
         return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnName));
     }
 
+    /**
+     * Checks if the index is ready.
+     * @return <code>true</code> if the index is ready, <code>false</code> otherwise
+     */
+    public boolean isQueryable()
+    {
+        return queryable;
+    }
+
     public void setIndexBuilt()
     {
+        queryable = true;
         for (ColumnDefinition columnDef : columnDefs)
             SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
     }
@@ -229,7 +245,7 @@ public abstract class SecondaryIndex
      *
      * @return A future object which the caller can block on (optional)
      */
-    public Future<?> buildIndexAsync()
+    public final Future<?> buildIndexAsync()
     {
         // if we're just linking in the index to indexedColumns on an already-built index post-restart, we're done
         boolean allAreBuilt = true;
@@ -243,7 +259,17 @@ public abstract class SecondaryIndex
         }
 
         if (allAreBuilt)
+        {
+            queryable = true;
             return null;
+        }
+
+        // If the base table is empty we can directly mark the index as built.
+        if (baseCfs.isEmpty())
+        {
+            setIndexBuilt();
+            return null;
+        }
 
         // build it asynchronously; addIndex gets called by CFS open and schema update, neither of which
         // we want to block for a long period.  (actual build is serialized on CompactionManager.)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index c4fd068..6df8616 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -314,11 +314,7 @@ public class SecondaryIndexManager
         // Add to all indexes set:
         indexesByName.put(index.getIndexName(), index);
 
-        // if we're just linking in the index to indexedColumns on an
-        // already-built index post-restart, we're done
-        if (index.isIndexBuilt(cdef.name.bytes))
-            return null;
-
+        // We do not need to check if the index is already built, as buildIndexAsync will do it for us
         return index.buildIndexAsync();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 2f85e35..a67aa2b 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -40,6 +40,8 @@ import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.index.IndexNotAvailableException;
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -60,12 +62,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
     {
         assert filter.getClause() != null && !filter.getClause().isEmpty();
         final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
-        final CompositesIndex index = (CompositesIndex)indexManager.getIndexForColumn(primary.column);
+        final SecondaryIndex index = indexManager.getIndexForColumn(primary.column);
+        if (!index.isQueryable())
+            throw new IndexNotAvailableException(index.getIndexName());
+
         // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
         // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
         try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start())
         {
-            return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, index), filter);
+            return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, (CompositesIndex) index), filter);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 86ff122..2b07c41 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -26,7 +26,6 @@ import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
@@ -55,6 +54,9 @@ public class KeysSearcher extends SecondaryIndexSearcher
         assert filter.getClause() != null && !filter.getClause().isEmpty();
         final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
         final SecondaryIndex index = indexManager.getIndexForColumn(primary.column);
+        if (!index.isQueryable())
+            throw new IndexNotAvailableException(index.getIndexName());
+
         // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
         // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room  being made
         try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index a46366c..4211f5a 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
+import org.apache.cassandra.db.index.IndexNotAvailableException;
 import org.apache.cassandra.gms.Gossiper;
 
 public class MessageDeliveryTask implements Runnable
@@ -70,10 +71,10 @@ public class MessageDeliveryTask implements Runnable
             handleFailure(ioe);
             throw new RuntimeException(ioe);
         }
-        catch (TombstoneOverwhelmingException toe)
+        catch (TombstoneOverwhelmingException | IndexNotAvailableException e)
         {
-            handleFailure(toe);
-            logger.error(toe.getMessage());
+            handleFailure(e);
+            logger.error(e.getMessage());
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index 0b812c6..b8f6b9f 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -17,23 +17,35 @@
  */
 package org.apache.cassandra.cql3.validation.entities;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.junit.Test;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.index.IndexNotAvailableException;
+import org.apache.cassandra.db.index.PerRowSecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.index.composites.CompositesSearcher;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.utils.FBUtilities;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.cassandra.utils.concurrent.OpOrder.Group;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Test;
 
 public class SecondaryIndexTest extends CQLTester
 {
@@ -563,7 +575,7 @@ public class SecondaryIndexTest extends CQLTester
     {
         createTable("CREATE TABLE %s(a int, b frozen<map<int, blob>>, PRIMARY KEY (a))");
         createIndex("CREATE INDEX ON %s(full(b))");
-        Map<Integer, ByteBuffer> map = new HashMap();
+        Map<Integer, ByteBuffer> map = new HashMap<>();
         map.put(0, ByteBuffer.allocate(1024 * 65));
         failInsert("INSERT INTO %s (a, b) VALUES (0, ?)", map);
     }
@@ -642,4 +654,135 @@ public class SecondaryIndexTest extends CQLTester
         assertInvalid("CREATE INDEX ON %s (c)");
     }
 
+    @Test
+    public void testIndexQueriesWithIndexNotReady() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))");
+
+        for (int i = 0; i < 10; i++)
+            for (int j = 0; j < 10; j++)
+                execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", i, j, i + j);
+
+        createIndex("CREATE CUSTOM INDEX testIndex ON %s (value) USING '" + IndexBlockingOnInitialization.class.getName()
+                + "'");
+        try
+        {
+            execute("SELECT value FROM %s WHERE value = 2");
+            fail();
+        }
+        catch (IndexNotAvailableException e)
+        {
+            assertTrue(true);
+        }
+        finally
+        {
+            execute("DROP index " + KEYSPACE + ".testIndex");
+        }
+    }
+
+    /**
+     * Custom index used to test the behavior of the system when the index is not ready.
+     * As custom indices cannot be <code>PerColumnSecondaryIndex</code> we use a <code>PerRowSecondaryIndex</code>
+     * to avoid the check but return a <code>CompositesSearcher</code>.
+     */
+    public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex
+    {
+        private volatile CountDownLatch latch = new CountDownLatch(1);
+
+        @Override
+        public void index(ByteBuffer rowKey, ColumnFamily cf)
+        {
+            try
+            {
+                latch.await();
+            }
+            catch (InterruptedException e)
+            {
+                Thread.interrupted();
+            }
+        }
+
+        @Override
+        public void delete(DecoratedKey key, Group opGroup)
+        {
+        }
+
+        @Override
+        public void init()
+        {
+        }
+
+        @Override
+        public void reload()
+        {
+        }
+
+        @Override
+        public void validateOptions() throws ConfigurationException
+        {
+        }
+
+        @Override
+        public String getIndexName()
+        {
+            return "testIndex";
+        }
+
+        @Override
+        protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
+        {
+            return new CompositesSearcher(baseCfs.indexManager, columns)
+            {
+                @Override
+                public boolean canHandleIndexClause(List<IndexExpression> clause)
+                {
+                    return true;
+                }
+
+                @Override
+                public void validate(IndexExpression indexExpression) throws InvalidRequestException
+                {
+                }
+            };
+        }
+
+        @Override
+        public void forceBlockingFlush()
+        {
+        }
+
+        @Override
+        public ColumnFamilyStore getIndexCfs()
+        {
+            return baseCfs;
+        }
+
+        @Override
+        public void removeIndex(ByteBuffer columnName)
+        {
+            latch.countDown();
+        }
+
+        @Override
+        public void invalidate()
+        {
+        }
+
+        @Override
+        public void truncateBlocking(long truncatedAt)
+        {
+        }
+
+        @Override
+        public boolean indexes(CellName name)
+        {
+            return false;
+        }
+
+        @Override
+        public long estimateResultRows()
+        {
+            return 0;
+        }
+    }
 }