You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2016/09/20 18:07:00 UTC
cassandra git commit: Added new task to Index which runs before
joining
Repository: cassandra
Updated Branches:
refs/heads/trunk beaa2b1b9 -> 0cb5e8032
Added new task to Index which runs before joining
Patch by Sergio Bossa; reviewed by Sam Tunnicliffe for CASSANDRA-12039
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0cb5e803
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0cb5e803
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0cb5e803
Branch: refs/heads/trunk
Commit: 0cb5e8032a2e12831064ab6a9600c235c599a33d
Parents: beaa2b1
Author: Sergio Bossa <se...@gmail.com>
Authored: Fri Jun 24 18:09:11 2016 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Tue Sep 20 18:56:45 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 2 +
src/java/org/apache/cassandra/index/Index.java | 11 ++++
.../cassandra/index/SecondaryIndexManager.java | 11 ++++
.../cassandra/service/StorageService.java | 32 +++++++++---
.../unit/org/apache/cassandra/SchemaLoader.java | 32 ++++++++++++
.../org/apache/cassandra/index/StubIndex.java | 9 ++++
.../cassandra/service/JoinTokenRingTest.java | 54 ++++++++++++++++++++
8 files changed, 145 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 74a2372..e847f8b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
* Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
* Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
* Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1b15f7d..708e839 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,8 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - An Index implementation may now provide a task which runs prior to joining
+ the ring. See CASSANDRA-12039
- Filtering on partition key columns is now also supported for queries without
secondary indexes.
- A slow query log has been added: slow queries will be logged at DEBUG level.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 4ffef1e..e254555 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -253,6 +253,17 @@ public interface Index
public Callable<?> getTruncateTask(long truncatedAt);
/**
+ * Return a task to be executed before the node enters NORMAL state and finally joins the ring.
+ *
+ * @param hadBootstrap If the node had bootstrap before joining.
+ * @return task to be executed by the index manager before joining the ring.
+ */
+ default public Callable<?> getPreJoinTask(boolean hadBootstrap)
+ {
+ return null;
+ }
+
+ /**
* Return true if this index can be built or rebuilt when the index manager determines it is necessary. Returning
* false enables the index implementation (or some other component) to control if and when SSTable data is
* incorporated into the index.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index e06cab0..6e36511 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -493,6 +493,17 @@ public class SecondaryIndexManager implements IndexRegistry
}
/**
+ * Performs a blocking execution of pre-join tasks of all indexes
+ */
+ public void executePreJoinTasksBlocking(boolean hadBootstrap)
+ {
+ logger.info("Executing pre-join{} tasks for: {}", hadBootstrap ? " post-bootstrap" : "", this.baseCfs);
+ executeAllBlocking(indexes.values().stream(), (index) -> {
+ return index.getPreJoinTask(hadBootstrap);
+ });
+ }
+
+ /**
* @return all indexes which are marked as built and ready to use
*/
public List<String> getBuiltIndexNames()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index b33ac0e..24b10ea 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;
+import java.util.stream.StreamSupport;
+
import javax.annotation.Nullable;
import javax.management.*;
import javax.management.openmbean.TabularData;
@@ -38,7 +40,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
+
import org.apache.commons.lang3.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -858,7 +862,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
boolean dataAvailable = true; // make this to false when bootstrap streaming failed
- if (shouldBootstrap())
+ boolean bootstrap = shouldBootstrap();
+ if (bootstrap)
{
if (SystemKeyspace.bootstrapInProgress())
logger.warn("Detected previous bootstrap failure; retrying");
@@ -1000,8 +1005,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
if (dataAvailable)
{
- finishJoiningRing();
-
+ finishJoiningRing(bootstrap);
// remove the existing info about the replaced node.
if (!current.isEmpty())
{
@@ -1034,7 +1038,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack));
}
- public synchronized void joinRing() throws IOException
+ public void joinRing() throws IOException
+ {
+ SystemKeyspace.BootstrapState state = SystemKeyspace.getBootstrapState();
+ joinRing(state.equals(SystemKeyspace.BootstrapState.IN_PROGRESS));
+ }
+
+ private synchronized void joinRing(boolean resumedBootstrap) throws IOException
{
if (!joined)
{
@@ -1051,15 +1061,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
else if (isSurveyMode)
{
logger.info("Leaving write survey mode and joining ring at operator request");
- finishJoiningRing();
+ finishJoiningRing(resumedBootstrap);
isSurveyMode = false;
}
}
- private void finishJoiningRing()
+ private void executePreJoinTasks(boolean bootstrap)
+ {
+ StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false)
+ .filter(cfs -> Schema.instance.getUserKeyspaces().contains(cfs.keyspace.getName()))
+ .forEach(cfs -> cfs.indexManager.executePreJoinTasksBlocking(bootstrap));
+ }
+
+ private void finishJoiningRing(boolean didBootstrap)
{
// start participating in the ring.
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+ executePreJoinTasks(didBootstrap);
setTokens(bootstrapTokens);
assert tokenMetadata.sortedTokens().size() > 0;
@@ -1506,7 +1524,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
try
{
progressSupport.progress("bootstrap", ProgressEvent.createNotification("Joining ring..."));
- joinRing();
+ joinRing(true);
}
catch (IOException ignore)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index c178ee0..d9c322f 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -88,6 +88,7 @@ public class SchemaLoader
String ks4 = testName + "Keyspace4";
String ks5 = testName + "Keyspace5";
String ks6 = testName + "Keyspace6";
+ String ks7 = testName + "Keyspace7";
String ks_kcs = testName + "KeyCacheSpace";
String ks_rcs = testName + "RowCacheSpace";
String ks_ccs = testName + "CounterCacheSpace";
@@ -191,11 +192,17 @@ public class SchemaLoader
schema.add(KeyspaceMetadata.create(ks5,
KeyspaceParams.simple(2),
Tables.of(standardCFMD(ks5, "Standard1"))));
+
// Keyspace 6
schema.add(KeyspaceMetadata.create(ks6,
KeyspaceParams.simple(1),
Tables.of(keysIndexCFMD(ks6, "Indexed1", true))));
+ // Keyspace 7
+ schema.add(KeyspaceMetadata.create(ks7,
+ KeyspaceParams.simple(1),
+ Tables.of(customIndexCFMD(ks7, "Indexed1"))));
+
// KeyCacheSpace
schema.add(KeyspaceMetadata.create(ks_kcs,
KeyspaceParams.simple(1),
@@ -455,6 +462,7 @@ public class SchemaLoader
return cfm.compression(getCompressionParameters());
}
+
public static CFMetaData keysIndexCFMD(String ksName, String cfName, boolean withIndex) throws ConfigurationException
{
CFMetaData cfm = CFMetaData.Builder.createDense(ksName, cfName, false, false)
@@ -480,6 +488,30 @@ public class SchemaLoader
return cfm.compression(getCompressionParameters());
}
+ public static CFMetaData customIndexCFMD(String ksName, String cfName) throws ConfigurationException
+ {
+ CFMetaData cfm = CFMetaData.Builder.createDense(ksName, cfName, false, false)
+ .addPartitionKey("key", AsciiType.instance)
+ .addClusteringColumn("c1", AsciiType.instance)
+ .addRegularColumn("value", LongType.instance)
+ .build();
+
+ cfm.indexes(
+ cfm.getIndexes()
+ .with(IndexMetadata.fromIndexTargets(cfm,
+ Collections.singletonList(
+ new IndexTarget(new ColumnIdentifier("value", true),
+ IndexTarget.Type.VALUES)),
+ "value_index",
+ IndexMetadata.Kind.CUSTOM,
+ Collections.singletonMap(
+ IndexTarget.CUSTOM_INDEX_OPTION_NAME,
+ StubIndex.class.getName()))));
+
+
+ return cfm.compression(getCompressionParameters());
+ }
+
public static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp)
{
return CFMetaData.Builder.create(ksName, cfName).addPartitionKey("key", BytesType.instance)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/test/unit/org/apache/cassandra/index/StubIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java
index 0b7b32f..92efee5 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -51,6 +51,7 @@ public class StubIndex implements Index
public List<Row> rowsInserted = new ArrayList<>();
public List<Row> rowsDeleted = new ArrayList<>();
public List<Pair<Row,Row>> rowsUpdated = new ArrayList<>();
+ public volatile boolean preJoinInvocation;
private IndexMetadata indexMetadata;
private ColumnFamilyStore baseCfs;
@@ -171,6 +172,14 @@ public class StubIndex implements Index
return null;
}
+ public Callable<?> getPreJoinTask(boolean hadBootstrap)
+ {
+ return () -> {
+ preJoinInvocation = true;
+ return null;
+ };
+ }
+
public Callable<?> getInvalidateTask()
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
new file mode 100644
index 0000000..866910e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.StubIndex;
+
+public class JoinTokenRingTest
+{
+ @BeforeClass
+ public static void setup() throws ConfigurationException
+ {
+ DatabaseDescriptor.daemonInitialization();
+ SchemaLoader.startGossiper();
+ SchemaLoader.prepareServer();
+ SchemaLoader.schemaDefinition("JoinTokenRingTest");
+ }
+
+ @Test
+ public void testIndexPreJoinInvocation() throws IOException
+ {
+ StorageService ss = StorageService.instance;
+ ss.joinRing();
+
+ SecondaryIndexManager indexManager = ColumnFamilyStore.getIfExists("JoinTokenRingTestKeyspace7", "Indexed1").indexManager;
+ StubIndex stub = (StubIndex) indexManager.getIndexByName("value_index");
+ Assert.assertTrue(stub.preJoinInvocation);
+ }
+}