You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2017/12/01 14:03:18 UTC
[2/2] cassandra git commit: Parallelize initial materialized view
build
Parallelize initial materialized view build
patch by Andres de la Peña; reviewed by Paulo Motta for CASSANDRA-12245
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c80eeec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c80eeec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c80eeec
Branch: refs/heads/trunk
Commit: 4c80eeece37d79f434078224a0504400ae10a20d
Parents: 88b244a
Author: Andrés de la Peña <a....@gmail.com>
Authored: Sun Jul 9 14:42:14 2017 +0100
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Fri Dec 1 14:58:12 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 8 +
conf/cassandra.yaml | 3 +
doc/source/cql/mvs.rst | 5 +
doc/source/operating/metrics.rst | 1 +
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 13 +
src/java/org/apache/cassandra/db/Keyspace.java | 2 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 78 +++--
.../db/compaction/CompactionManager.java | 88 +++--
.../db/compaction/CompactionManagerMBean.java | 22 ++
src/java/org/apache/cassandra/db/view/View.java | 20 +-
.../apache/cassandra/db/view/ViewBuilder.java | 305 ++++++++--------
.../cassandra/db/view/ViewBuilderTask.java | 250 +++++++++++++
.../apache/cassandra/db/view/ViewManager.java | 15 +-
src/java/org/apache/cassandra/dht/Splitter.java | 129 +++++++
.../apache/cassandra/io/sstable/SSTable.java | 7 +
.../org/apache/cassandra/schema/Schema.java | 3 +-
.../cassandra/service/StorageService.java | 14 +-
.../cassandra/service/StorageServiceMBean.java | 4 +-
.../org/apache/cassandra/tools/NodeProbe.java | 10 +
.../org/apache/cassandra/tools/NodeTool.java | 2 +
.../nodetool/GetConcurrentViewBuilders.java | 33 ++
.../nodetool/SetConcurrentViewBuilders.java | 39 +++
.../org/apache/cassandra/cql3/ViewTest.java | 23 +-
.../cassandra/db/view/ViewBuilderTaskTest.java | 135 ++++++++
.../org/apache/cassandra/dht/SplitterTest.java | 347 ++++++++++++++++++-
27 files changed, 1315 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 009dcb5..56458f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Parallelize initial materialized view build (CASSANDRA-12245)
* Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
* Make LWTs send resultset metadata on every request (CASSANDRA-13992)
* Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index de7d58a..510577e 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -30,6 +30,10 @@ New features
immediately upon creation via hard-linking the files. This means that incomplete
segments will be available in cdc_raw rather than fully flushed. See documentation
and CASSANDRA-12148 for more detail.
+ - The initial build of materialized views can be parallelized. The number of concurrent builder
+ threads is specified by the property `cassandra.yaml:concurrent_materialized_view_builders`.
+ This property can be modified at runtime through both JMX and the new `setconcurrentviewbuilders`
+ and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 for more details.
Upgrading
---------
@@ -74,6 +78,10 @@ Upgrading
- Cassandra 4.0 allows a single port to be used for both secure and insecure
connections between cassandra nodes (CASSANDRA-10404). See the yaml for
specific property changes, and see the security doc for full details.
+ - Due to the parallelization of the initial build of materialized views,
+ the per token range view building status is stored in the new table
+ `system.view_builds_in_progress`. The old table `system.views_builds_in_progress`
+ is no longer used and can be removed. See CASSANDRA-12245 for more details.
Materialized Views
-------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e41af17..7328a01 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -754,6 +754,9 @@ column_index_cache_size_in_kb: 2
# Values less than one are interpreted as unbounded (the default)
# concurrent_validations: 0
+# Number of simultaneous materialized view builder tasks to allow.
+concurrent_materialized_view_builders: 1
+
# Throttles compaction to the given total throughput across the entire
# system. The faster you insert data, the faster you need to compact in
# order to keep the sstable count down, but in general, setting this to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/doc/source/cql/mvs.rst
----------------------------------------------------------------------
diff --git a/doc/source/cql/mvs.rst b/doc/source/cql/mvs.rst
index 55ede22..200090a 100644
--- a/doc/source/cql/mvs.rst
+++ b/doc/source/cql/mvs.rst
@@ -62,6 +62,11 @@ Creating a materialized view has 3 main parts:
Attempting to create an already existing materialized view will return an error unless the ``IF NOT EXISTS`` option is
used. If it is used, the statement will be a no-op if the materialized view already exists.
+.. note:: By default, materialized views are built in a single thread. The initial build can be parallelized by
+ increasing the number of threads specified by the property ``concurrent_materialized_view_builders`` in
+ ``cassandra.yaml``. This property can also be manipulated at runtime through both JMX and the
+ ``setconcurrentviewbuilders`` and ``getconcurrentviewbuilders`` nodetool commands.
+
.. _mv-select:
MV select statement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/doc/source/operating/metrics.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index 6559b53..2df1cf8 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -227,6 +227,7 @@ PerDiskMemtableFlushWriter_0 internal Responsible for writing a spec (ther
Sampler internal Responsible for re-sampling the index summaries of SStables
SecondaryIndexManagement internal Performs updates to secondary indexes
ValidationExecutor internal Performs validation compaction or scrubbing
+ViewBuildExecutor internal Performs materialized views initial build
============================ ============== ===========
.. |nbsp| unicode:: 0xA0 .. nonbreaking space
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index de193b0..f63d94d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -166,6 +166,7 @@ public class Config
public int min_free_space_per_drive_in_mb = 50;
public volatile int concurrent_validations = Integer.MAX_VALUE;
+ public volatile int concurrent_materialized_view_builders = 1;
/**
* @deprecated retry support removed on CASSANDRA-10992
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index af1cbde..58c0bf4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -589,6 +589,9 @@ public class DatabaseDescriptor
if (conf.concurrent_compactors <= 0)
throw new ConfigurationException("concurrent_compactors should be strictly greater than 0, but was " + conf.concurrent_compactors, false);
+ if (conf.concurrent_materialized_view_builders <= 0)
+ throw new ConfigurationException("concurrent_materialized_view_builders should be strictly greater than 0, but was " + conf.concurrent_materialized_view_builders, false);
+
if (conf.num_tokens > MAX_NUM_TOKENS)
throw new ConfigurationException(String.format("A maximum number of %d tokens per node is supported", MAX_NUM_TOKENS), false);
@@ -1516,6 +1519,16 @@ public class DatabaseDescriptor
conf.concurrent_validations = value;
}
+ public static int getConcurrentViewBuilders()
+ {
+ return conf.concurrent_materialized_view_builders;
+ }
+
+ public static void setConcurrentViewBuilders(int value)
+ {
+ conf.concurrent_materialized_view_builders = value;
+ }
+
public static long getMinFreeSpacePerDriveInBytes()
{
return conf.min_free_space_per_drive_in_mb * 1024L * 1024L;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index d814ac7..c3e649a 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -332,7 +332,7 @@ public class Keyspace
logger.trace("Initializing {}.{}", getName(), cfm.name);
initCf(Schema.instance.getTableMetadataRef(cfm.id), loadSSTables);
}
- this.viewManager.reload();
+ this.viewManager.reload(false);
}
private Keyspace(KeyspaceMetadata metadata)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2ffae11..9da0f6b 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -99,7 +99,7 @@ public final class SystemKeyspace
public static final String SIZE_ESTIMATES = "size_estimates";
public static final String AVAILABLE_RANGES = "available_ranges";
public static final String TRANSFERRED_RANGES = "transferred_ranges";
- public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
+ public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress";
public static final String BUILT_VIEWS = "built_views";
public static final String PREPARED_STATEMENTS = "prepared_statements";
public static final String REPAIRS = "repairs";
@@ -262,15 +262,17 @@ public final class SystemKeyspace
+ "PRIMARY KEY ((operation, keyspace_name), peer))")
.build();
- private static final TableMetadata ViewsBuildsInProgress =
- parse(VIEWS_BUILDS_IN_PROGRESS,
+ private static final TableMetadata ViewBuildsInProgress =
+ parse(VIEW_BUILDS_IN_PROGRESS,
"views builds current progress",
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "view_name text,"
+ + "start_token varchar,"
+ + "end_token varchar,"
+ "last_token varchar,"
- + "generation_number int,"
- + "PRIMARY KEY ((keyspace_name), view_name))")
+ + "keys_built bigint,"
+ + "PRIMARY KEY ((keyspace_name), view_name, start_token, end_token))")
.build();
private static final TableMetadata BuiltViews =
@@ -337,7 +339,7 @@ public final class SystemKeyspace
SizeEstimates,
AvailableRanges,
TransferredRanges,
- ViewsBuildsInProgress,
+ ViewBuildsInProgress,
BuiltViews,
PreparedStatements,
Repairs);
@@ -457,23 +459,15 @@ public final class SystemKeyspace
public static void setViewRemoved(String keyspaceName, String viewName)
{
- String buildReq = "DELETE FROM %S.%s WHERE keyspace_name = ? AND view_name = ? IF EXISTS";
- executeInternal(String.format(buildReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, VIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
- forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS);
+ String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?";
+ executeInternal(String.format(buildReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, VIEW_BUILDS_IN_PROGRESS), keyspaceName, viewName);
+ forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS);
String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ? IF EXISTS";
executeInternal(String.format(builtReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName);
forceBlockingFlush(BUILT_VIEWS);
}
- public static void beginViewBuild(String ksname, String viewName, int generationNumber)
- {
- executeInternal(format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", VIEWS_BUILDS_IN_PROGRESS),
- ksname,
- viewName,
- generationNumber);
- }
-
public static void finishViewBuildStatus(String ksname, String viewName)
{
// We flush the view built first, because if we fail now, we'll restart at the last place we checkpointed
@@ -482,8 +476,8 @@ public final class SystemKeyspace
// Also, if writing to the built_view succeeds, but the view_builds_in_progress deletion fails, we will be able
// to skip the view build next boot.
setViewBuilt(ksname, viewName, false);
- executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ? IF EXISTS", VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
- forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS);
+ executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", VIEW_BUILDS_IN_PROGRESS), ksname, viewName);
+ forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS);
}
public static void setViewBuiltReplicated(String ksname, String viewName)
@@ -491,33 +485,41 @@ public final class SystemKeyspace
setViewBuilt(ksname, viewName, true);
}
- public static void updateViewBuildStatus(String ksname, String viewName, Token token)
+ public static void updateViewBuildStatus(String ksname, String viewName, Range<Token> range, Token lastToken, long keysBuilt)
{
- String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)";
- Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory();
- executeInternal(format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
+ String req = "INSERT INTO system.%s (keyspace_name, view_name, start_token, end_token, last_token, keys_built) VALUES (?, ?, ?, ?, ?, ?)";
+ Token.TokenFactory factory = ViewBuildsInProgress.partitioner.getTokenFactory();
+ executeInternal(format(req, VIEW_BUILDS_IN_PROGRESS),
+ ksname,
+ viewName,
+ factory.toString(range.left),
+ factory.toString(range.right),
+ factory.toString(lastToken),
+ keysBuilt);
}
- public static Pair<Integer, Token> getViewBuildStatus(String ksname, String viewName)
+ public static Map<Range<Token>, Pair<Token, Long>> getViewBuildStatus(String ksname, String viewName)
{
- String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
- UntypedResultSet queryResultSet = executeInternal(format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
- if (queryResultSet == null || queryResultSet.isEmpty())
- return null;
+ String req = "SELECT start_token, end_token, last_token, keys_built FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
+ Token.TokenFactory factory = ViewBuildsInProgress.partitioner.getTokenFactory();
+ UntypedResultSet rs = executeInternal(format(req, VIEW_BUILDS_IN_PROGRESS), ksname, viewName);
- UntypedResultSet.Row row = queryResultSet.one();
+ if (rs == null || rs.isEmpty())
+ return Collections.emptyMap();
- Integer generation = null;
- Token lastKey = null;
- if (row.has("generation_number"))
- generation = row.getInt("generation_number");
- if (row.has("last_key"))
+ Map<Range<Token>, Pair<Token, Long>> status = new HashMap<>();
+ for (UntypedResultSet.Row row : rs)
{
- Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory();
- lastKey = factory.fromString(row.getString("last_key"));
- }
+ Token start = factory.fromString(row.getString("start_token"));
+ Token end = factory.fromString(row.getString("end_token"));
+ Range<Token> range = new Range<>(start, end);
+
+ Token lastToken = row.has("last_token") ? factory.fromString(row.getString("last_token")) : null;
+ long keysBuilt = row.has("keys_built") ? row.getLong("keys_built") : 0;
- return Pair.create(generation, lastKey);
+ status.put(range, Pair.create(lastToken, keysBuilt));
+ }
+ return status;
}
public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition position)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3ff9c24..a615c03 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -51,7 +51,7 @@ import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.view.ViewBuilder;
+import org.apache.cassandra.db.view.ViewBuilderTask;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -124,8 +124,9 @@ public class CompactionManager implements CompactionManagerMBean
private final CompactionExecutor executor = new CompactionExecutor();
private final CompactionExecutor validationExecutor = new ValidationExecutor();
private final static CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor();
+ private final CompactionExecutor viewBuildExecutor = new ViewBuildExecutor();
- private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor);
+ private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor, viewBuildExecutor);
private final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
@@ -216,6 +217,7 @@ public class CompactionManager implements CompactionManagerMBean
// shutdown executors to prevent further submission
executor.shutdown();
validationExecutor.shutdown();
+ viewBuildExecutor.shutdown();
// interrupt compactions and validations
for (Holder compactionHolder : CompactionMetrics.getCompactions())
@@ -226,7 +228,7 @@ public class CompactionManager implements CompactionManagerMBean
// wait for tasks to terminate
// compaction tasks are interrupted above, so it shuold be fairy quick
// until not interrupted tasks to complete.
- for (ExecutorService exec : Arrays.asList(executor, validationExecutor))
+ for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor))
{
try
{
@@ -1718,31 +1720,21 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- public Future<?> submitViewBuilder(final ViewBuilder builder)
+ public ListenableFuture<Long> submitViewBuilder(final ViewBuilderTask task)
{
- Runnable runnable = new Runnable()
- {
- public void run()
+ return viewBuildExecutor.submitIfRunning(() -> {
+ metrics.beginCompaction(task);
+ try
{
- metrics.beginCompaction(builder);
- try
- {
- builder.run();
- }
- finally
- {
- metrics.finishCompaction(builder);
- }
+ return task.call();
}
- };
- if (executor.isShutdown())
- {
- logger.info("Compaction executor has shut down, not submitting index build");
- return null;
- }
-
- return executor.submit(runnable);
+ finally
+ {
+ metrics.finishCompaction(task);
+ }
+ }, "view build");
}
+
public int getActiveCompactions()
{
return CompactionMetrics.getCompactions().size();
@@ -1817,7 +1809,7 @@ public class CompactionManager implements CompactionManagerMBean
* @return the future that will deliver the task result, or a future that has already been
* cancelled if the task could not be submitted.
*/
- public ListenableFuture<?> submitIfRunning(Callable<?> task, String name)
+ public <T> ListenableFuture<T> submitIfRunning(Callable<T> task, String name)
{
if (isShutdown())
{
@@ -1827,7 +1819,7 @@ public class CompactionManager implements CompactionManagerMBean
try
{
- ListenableFutureTask ret = ListenableFutureTask.create(task);
+ ListenableFutureTask<T> ret = ListenableFutureTask.create(task);
execute(ret);
return ret;
}
@@ -1851,6 +1843,14 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+ private static class ViewBuildExecutor extends CompactionExecutor
+ {
+ public ViewBuildExecutor()
+ {
+ super(DatabaseDescriptor.getConcurrentViewBuilders(), "ViewBuildExecutor");
+ }
+ }
+
private static class CacheCleanupExecutor extends CompactionExecutor
{
public CacheCleanupExecutor()
@@ -1974,6 +1974,22 @@ public class CompactionManager implements CompactionManagerMBean
validationExecutor.setMaximumPoolSize(value);
}
+ public void setConcurrentViewBuilders(int value)
+ {
+ if (value > viewBuildExecutor.getCorePoolSize())
+ {
+ // we are increasing the value
+ viewBuildExecutor.setMaximumPoolSize(value);
+ viewBuildExecutor.setCorePoolSize(value);
+ }
+ else if (value < viewBuildExecutor.getCorePoolSize())
+ {
+ // we are reducing the value
+ viewBuildExecutor.setCorePoolSize(value);
+ viewBuildExecutor.setMaximumPoolSize(value);
+ }
+ }
+
public int getCoreCompactorThreads()
{
return executor.getCorePoolSize();
@@ -2014,6 +2030,26 @@ public class CompactionManager implements CompactionManagerMBean
validationExecutor.setMaximumPoolSize(number);
}
+ public int getCoreViewBuildThreads()
+ {
+ return viewBuildExecutor.getCorePoolSize();
+ }
+
+ public void setCoreViewBuildThreads(int number)
+ {
+ viewBuildExecutor.setCorePoolSize(number);
+ }
+
+ public int getMaximumViewBuildThreads()
+ {
+ return viewBuildExecutor.getMaximumPoolSize();
+ }
+
+ public void setMaximumViewBuildThreads(int number)
+ {
+ viewBuildExecutor.setMaximumPoolSize(number);
+ }
+
/**
* Try to stop all of the compactions for given ColumnFamilies.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index 8785b41..b98b371 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -116,4 +116,26 @@ public interface CompactionManagerMBean
* @param number New maximum of validator threads
*/
public void setMaximumValidatorThreads(int number);
+
+ /**
+ * Returns core size of view build thread pool
+ */
+ public int getCoreViewBuildThreads();
+
+ /**
+ * Allows user to resize maximum size of the view build thread pool.
+ * @param number New maximum of view build threads
+ */
+ public void setCoreViewBuildThreads(int number);
+
+ /**
+ * Returns size of view build thread pool
+ */
+ public int getMaximumViewBuildThreads();
+
+ /**
+ * Allows user to resize maximum size of the view build thread pool.
+ * @param number New maximum of view build threads
+ */
+ public void setMaximumViewBuildThreads(int number);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index f601673..f6545b0 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -192,15 +191,22 @@ public class View
public synchronized void build()
{
- if (this.builder != null)
+ stopBuild();
+ builder = new ViewBuilder(baseCfs, this);
+ builder.start();
+ }
+
+ /**
+ * Stops the building of this view, no-op if it isn't building.
+ */
+ synchronized void stopBuild()
+ {
+ if (builder != null)
{
logger.debug("Stopping current view builder due to schema change");
- this.builder.stop();
- this.builder = null;
+ builder.stop();
+ builder = null;
}
-
- this.builder = new ViewBuilder(baseCfs, this);
- CompactionManager.instance.submitViewBuilder(builder);
}
@Nullable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index fcb1e98..8187a57 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -18,217 +18,224 @@
package org.apache.cassandra.db.view;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.repair.SystemDistributedKeyspace;
-import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.Refs;
-public class ViewBuilder extends CompactionInfo.Holder
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Builds a materialized view for the local token ranges.
+ * <p>
+ * The build is split in at least {@link #NUM_TASKS} {@link ViewBuilderTask tasks}, suitable of being parallelized by
+ * the {@link CompactionManager} which will execute them.
+ */
+class ViewBuilder
{
- private final ColumnFamilyStore baseCfs;
- private final View view;
- private final UUID compactionId;
- private volatile Token prevToken = null;
+ private static final Logger logger = LoggerFactory.getLogger(ViewBuilderTask.class);
- private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class);
+ private static final int NUM_TASKS = Runtime.getRuntime().availableProcessors() * 4;
+ private final ColumnFamilyStore baseCfs;
+ private final View view;
+ private final String ksName;
+ private final UUID localHostId = SystemKeyspace.getLocalHostId();
+ private final Set<Range<Token>> builtRanges = Sets.newConcurrentHashSet();
+ private final Map<Range<Token>, Pair<Token, Long>> pendingRanges = Maps.newConcurrentMap();
+ private final Set<ViewBuilderTask> tasks = Sets.newConcurrentHashSet();
+ private volatile long keysBuilt = 0;
private volatile boolean isStopped = false;
+ private volatile Future<?> future = Futures.immediateFuture(null);
- public ViewBuilder(ColumnFamilyStore baseCfs, View view)
+ ViewBuilder(ColumnFamilyStore baseCfs, View view)
{
this.baseCfs = baseCfs;
this.view = view;
- compactionId = UUIDGen.getTimeUUID();
+ ksName = baseCfs.metadata.keyspace;
}
- private void buildKey(DecoratedKey key)
+ public void start()
{
- ReadQuery selectQuery = view.getReadQuery();
-
- if (!selectQuery.selectsKey(key))
+ if (SystemKeyspace.isViewBuilt(ksName, view.name))
{
- logger.trace("Skipping {}, view query filters", key);
- return;
+ logger.debug("View already marked built for {}.{}", ksName, view.name);
+ if (!SystemKeyspace.isViewStatusReplicated(ksName, view.name))
+ updateDistributed();
}
-
- int nowInSec = FBUtilities.nowInSeconds();
- SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
-
- // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates
- // and pretend that there is nothing pre-existing.
- UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
-
- try (ReadExecutionController orderGroup = command.executionController();
- UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command))
+ else
{
- Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager
- .forTable(baseCfs.metadata.id)
- .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);
+ SystemDistributedKeyspace.startViewBuild(ksName, view.name, localHostId);
+
+ logger.debug("Starting build of view({}.{}). Flushing base table {}.{}",
+ ksName, view.name, ksName, baseCfs.name);
+ baseCfs.forceBlockingFlush();
- AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
- mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime()));
+ loadStatusAndBuild();
}
}
- public void run()
+ private void loadStatusAndBuild()
+ {
+ loadStatus();
+ build();
+ }
+
+ private void loadStatus()
{
- logger.debug("Starting view builder for {}.{}", baseCfs.metadata.keyspace, view.name);
- UUID localHostId = SystemKeyspace.getLocalHostId();
- String ksname = baseCfs.metadata.keyspace, viewName = view.name;
+ builtRanges.clear();
+ pendingRanges.clear();
+ SystemKeyspace.getViewBuildStatus(ksName, view.name)
+ .forEach((range, pair) ->
+ {
+ Token lastToken = pair.left;
+ if (lastToken != null && lastToken.equals(range.right))
+ {
+ builtRanges.add(range);
+ keysBuilt += pair.right;
+ }
+ else
+ {
+ pendingRanges.put(range, pair);
+ }
+ });
+ }
- if (SystemKeyspace.isViewBuilt(ksname, viewName))
+ private synchronized void build()
+ {
+ if (isStopped)
{
- logger.debug("View already marked built for {}.{}", baseCfs.metadata.keyspace, view.name);
- if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName))
- updateDistributed(ksname, viewName, localHostId);
+ logger.debug("Stopped build for view({}.{}) after covering {} keys", ksName, view.name, keysBuilt);
return;
}
- Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.keyspace);
- final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
- Token lastToken;
- Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
- if (buildStatus == null)
- {
- logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.keyspace, baseCfs.name);
- lastToken = null;
-
- //We don't track the generation number anymore since if a rebuild is stopped and
- //restarted the max generation filter may yield no sstables due to compactions.
- //We only care about max generation *during* a build, not across builds.
- //see CASSANDRA-13405
- SystemKeyspace.beginViewBuild(ksname, viewName, 0);
- }
- else
+ // Get the local ranges for which the view hasn't already been built nor it's building
+ Set<Range<Token>> newRanges = StorageService.instance.getLocalRanges(ksName)
+ .stream()
+ .map(r -> r.subtractAll(builtRanges))
+ .flatMap(Set::stream)
+ .map(r -> r.subtractAll(pendingRanges.keySet()))
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet());
+
+ // If there are no new nor pending ranges we should finish the build
+ if (newRanges.isEmpty() && pendingRanges.isEmpty())
{
- lastToken = buildStatus.right;
- logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.keyspace, baseCfs.name);
+ finish();
+ return;
}
- baseCfs.forceBlockingFlush();
- function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
-
- prevToken = lastToken;
- long keysBuilt = 0;
- try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
- ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
+ // Split the new local ranges and add them to the pending set
+ DatabaseDescriptor.getPartitioner()
+ .splitter()
+ .map(s -> s.split(newRanges, NUM_TASKS))
+ .orElse(newRanges)
+ .forEach(r -> pendingRanges.put(r, Pair.<Token, Long>create(null, 0L)));
+
+ // Submit a new view build task for each building range.
+ // We keep record of all the submitted tasks to be able of stopping them.
+ List<ListenableFuture<Long>> futures = pendingRanges.entrySet()
+ .stream()
+ .map(e -> new ViewBuilderTask(baseCfs,
+ view,
+ e.getKey(),
+ e.getValue().left,
+ e.getValue().right))
+ .peek(tasks::add)
+ .map(CompactionManager.instance::submitViewBuilder)
+ .collect(toList());
+
+ // Add a callback to process any eventual new local range and mark the view as built, doing a delayed retry if
+ // the tasks don't succeed
+ ListenableFuture<List<Long>> future = Futures.allAsList(futures);
+ Futures.addCallback(future, new FutureCallback<List<Long>>()
{
- SystemDistributedKeyspace.startViewBuild(ksname, viewName, localHostId);
- while (!isStopped && iter.hasNext())
+ public void onSuccess(List<Long> result)
{
- DecoratedKey key = iter.next();
- Token token = key.getToken();
- if (lastToken == null || lastToken.compareTo(token) < 0)
- {
- for (Range<Token> range : ranges)
- {
- if (range.contains(token))
- {
- buildKey(key);
- ++keysBuilt;
-
- if (prevToken == null || prevToken.compareTo(token) != 0)
- {
- SystemKeyspace.updateViewBuildStatus(ksname, viewName, key.getToken());
- prevToken = token;
- }
- }
- }
-
- lastToken = null;
- }
+ keysBuilt += result.stream().mapToLong(x -> x).sum();
+ builtRanges.addAll(pendingRanges.keySet());
+ pendingRanges.clear();
+ build();
}
- if (!isStopped)
+ public void onFailure(Throwable t)
{
- logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt);
- SystemKeyspace.finishViewBuildStatus(ksname, viewName);
- updateDistributed(ksname, viewName, localHostId);
- }
- else
- {
- logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt);
+ if (t instanceof CompactionInterruptedException)
+ {
+ internalStop(true);
+ keysBuilt = tasks.stream().mapToLong(ViewBuilderTask::keysBuilt).sum();
+ logger.info("Interrupted build for view({}.{}) after covering {} keys", ksName, view.name, keysBuilt);
+ }
+ else
+ {
+ ScheduledExecutors.nonPeriodicTasks.schedule(() -> loadStatusAndBuild(), 5, TimeUnit.MINUTES);
+ logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", t);
+ }
}
- }
- catch (Exception e)
- {
- ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(this),
- 5,
- TimeUnit.MINUTES);
- logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
- }
+ }, MoreExecutors.directExecutor());
+ this.future = future;
+ }
+
+ private void finish()
+ {
+ logger.debug("Marking view({}.{}) as built after covering {} keys ", ksName, view.name, keysBuilt);
+ SystemKeyspace.finishViewBuildStatus(ksName, view.name);
+ updateDistributed();
}
- private void updateDistributed(String ksname, String viewName, UUID localHostId)
+ private void updateDistributed()
{
try
{
- SystemDistributedKeyspace.successfulViewBuild(ksname, viewName, localHostId);
- SystemKeyspace.setViewBuiltReplicated(ksname, viewName);
+ SystemDistributedKeyspace.successfulViewBuild(ksName, view.name, localHostId);
+ SystemKeyspace.setViewBuiltReplicated(ksName, view.name);
}
catch (Exception e)
{
- ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(this),
- 5,
- TimeUnit.MINUTES);
- logger.warn("Failed to updated the distributed status of view, sleeping 5 minutes before retrying", e);
+ ScheduledExecutors.nonPeriodicTasks.schedule(this::updateDistributed, 5, TimeUnit.MINUTES);
+ logger.warn("Failed to update the distributed status of view, sleeping 5 minutes before retrying", e);
}
}
- public CompactionInfo getCompactionInfo()
+ /**
+ * Stops the view building.
+ */
+ synchronized void stop()
{
- long rangesLeft = 0, rangesTotal = 0;
- Token lastToken = prevToken;
-
- // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
- // percentage of a range covered by a second range, this is the best approximation that we can calculate.
- // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of
- // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node
- // has.
- for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
- {
- rangesLeft++;
- rangesTotal++;
- // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the
- // end of the method.
- if (lastToken == null || range.contains(lastToken))
- rangesLeft = 0;
- }
-
- return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
+ boolean wasStopped = isStopped;
+ internalStop(false);
+ if (!wasStopped)
+ FBUtilities.waitOnFuture(future);
}
- public void stop()
+ private void internalStop(boolean isCompactionInterrupted)
{
isStopped = true;
+ tasks.forEach(task -> task.stop(isCompactionInterrupted));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
new file mode 100644
index 0000000..0273c17
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.util.concurrent.Futures;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<Long>
+{
+ private static final Logger logger = LoggerFactory.getLogger(ViewBuilderTask.class);
+
+ private static final int ROWS_BETWEEN_CHECKPOINTS = 1000;
+
+ private final ColumnFamilyStore baseCfs;
+ private final View view;
+ private final Range<Token> range;
+ private final UUID compactionId;
+ private volatile Token prevToken;
+ private volatile long keysBuilt = 0;
+ private volatile boolean isStopped = false;
+ private volatile boolean isCompactionInterrupted = false;
+
+ ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt)
+ {
+ this.baseCfs = baseCfs;
+ this.view = view;
+ this.range = range;
+ this.compactionId = UUIDGen.getTimeUUID();
+ this.prevToken = lastToken;
+ this.keysBuilt = keysBuilt;
+ }
+
+ private void buildKey(DecoratedKey key)
+ {
+ ReadQuery selectQuery = view.getReadQuery();
+
+ if (!selectQuery.selectsKey(key))
+ {
+ logger.trace("Skipping {}, view query filters", key);
+ return;
+ }
+
+ int nowInSec = FBUtilities.nowInSeconds();
+ SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
+
+ // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates
+ // and pretend that there is nothing pre-existing.
+ UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
+
+ try (ReadExecutionController orderGroup = command.executionController();
+ UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command))
+ {
+ Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager
+ .forTable(baseCfs.metadata.id)
+ .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);
+
+ AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
+ mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime()));
+ }
+ }
+
+ public Long call()
+ {
+ String ksName = baseCfs.metadata.keyspace;
+
+ if (prevToken == null)
+ logger.debug("Starting new view build for range {}", range);
+ else
+ logger.debug("Resuming view build for range {} from token {} with {} covered keys", range, prevToken, keysBuilt);
+
+ Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
+ function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, s -> range.intersects(s.getBounds()));
+
+ try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(function);
+ Refs<SSTableReader> sstables = viewFragment.refs;
+ ReducingKeyIterator keyIter = new ReducingKeyIterator(sstables))
+ {
+ PeekingIterator<DecoratedKey> iter = Iterators.peekingIterator(keyIter);
+ while (!isStopped && iter.hasNext())
+ {
+ DecoratedKey key = iter.next();
+ Token token = key.getToken();
+ //skip tokens already built or not present in range
+ if (range.contains(token) && (prevToken == null || token.compareTo(prevToken) > 0))
+ {
+ buildKey(key);
+ ++keysBuilt;
+ //build other keys sharing the same token
+ while (iter.hasNext() && iter.peek().getToken().equals(token))
+ {
+ key = iter.next();
+ buildKey(key);
+ ++keysBuilt;
+ }
+ if (keysBuilt % ROWS_BETWEEN_CHECKPOINTS == 1)
+ SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, token, keysBuilt);
+ prevToken = token;
+ }
+ }
+ }
+
+ finish();
+
+ return keysBuilt;
+ }
+
+ private void finish()
+ {
+ String ksName = baseCfs.keyspace.getName();
+ if (!isStopped)
+ {
+ // Save the completed status using the end of the range as last token. This way it will be possible for
+ // future view build attempts to don't even create a task for this range
+ SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, range.right, keysBuilt);
+
+ logger.debug("Completed build of view({}.{}) for range {} after covering {} keys ", ksName, view.name, range, keysBuilt);
+ }
+ else
+ {
+ logger.debug("Stopped build for view({}.{}) for range {} after covering {} keys", ksName, view.name, range, keysBuilt);
+
+ // If it's stopped due to a compaction interruption we should throw that exception.
+ // Otherwise we assume that the task has been stopped due to a schema update and we can finish successfully.
+ if (isCompactionInterrupted)
+ throw new StoppedException(ksName, view.name, getCompactionInfo());
+ }
+ }
+
+ @Override
+ public CompactionInfo getCompactionInfo()
+ {
+ // If there's splitter, calculate progress based on last token position
+ if (range.left.getPartitioner().splitter().isPresent())
+ {
+ long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000);
+ return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, "token range parts", compactionId);
+ }
+
+ // When there is no splitter, estimate based on number of total keys but
+ // take the max with keysBuilt + 1 to avoid having more completed than total
+ long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range));
+ return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, "keys", compactionId);
+ }
+
+ @Override
+ public void stop()
+ {
+ stop(true);
+ }
+
+ synchronized void stop(boolean isCompactionInterrupted)
+ {
+ isStopped = true;
+ this.isCompactionInterrupted = isCompactionInterrupted;
+ }
+
+ long keysBuilt()
+ {
+ return keysBuilt;
+ }
+
+ /**
+ * {@link CompactionInterruptedException} with {@link Object#equals(Object)} and {@link Object#hashCode()}
+ * implementations that consider equals all the exceptions produced by the same view build, independently of their
+ * token range.
+ * <p>
+ * This is used to avoid Guava's {@link Futures#allAsList(Iterable)} log spamming when multiple build tasks fail
+ * due to compaction interruption.
+ */
+ static class StoppedException extends CompactionInterruptedException
+ {
+ private final String ksName, viewName;
+
+ private StoppedException(String ksName, String viewName, CompactionInfo info)
+ {
+ super(info);
+ this.ksName = ksName;
+ this.viewName = viewName;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof StoppedException))
+ return false;
+
+ StoppedException that = (StoppedException) o;
+ return Objects.equal(this.ksName, that.ksName) && Objects.equal(this.viewName, that.viewName);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return 31 * ksName.hashCode() + viewName.hashCode();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index cf731dd..8506d82 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -93,7 +93,7 @@ public class ViewManager
return viewsByName.values();
}
- public void reload()
+ public void reload(boolean buildAllViews)
{
Map<String, ViewMetadata> newViewsByName = new HashMap<>();
for (ViewMetadata definition : keyspace.getMetadata().views)
@@ -113,6 +113,9 @@ public class ViewManager
addView(entry.getValue());
}
+ if (!buildAllViews)
+ return;
+
// Building views involves updating view build status in the system_distributed
// keyspace and therefore it requires ring information. This check prevents builds
// being submitted when Keyspaces are initialized during CassandraDaemon::setup as
@@ -163,6 +166,16 @@ public class ViewManager
SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), view.name);
}
+ /**
+ * Stops the building of the specified view, no-op if it isn't building.
+ *
+ * @param name the name of the view
+ */
+ public void stopBuild(String name)
+ {
+ viewsByName.get(name).stopBuild();
+ }
+
public View getByName(String name)
{
return viewsByName.get(name);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/dht/Splitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java
index 4433f97..c63fe91 100644
--- a/src/java/org/apache/cassandra/dht/Splitter.java
+++ b/src/java/org/apache/cassandra/dht/Splitter.java
@@ -18,10 +18,19 @@
package org.apache.cassandra.dht;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import static java.util.stream.Collectors.toSet;
/**
* Partition splitter.
@@ -35,10 +44,79 @@ public abstract class Splitter
this.partitioner = partitioner;
}
+ @VisibleForTesting
protected abstract Token tokenForValue(BigInteger value);
+ @VisibleForTesting
protected abstract BigInteger valueForToken(Token token);
+ @VisibleForTesting
+ protected BigInteger tokensInRange(Range<Token> range)
+ {
+ //full range case
+ if (range.left.equals(range.right))
+ return tokensInRange(new Range(partitioner.getMinimumToken(), partitioner.getMaximumToken()));
+
+ BigInteger totalTokens = BigInteger.ZERO;
+ for (Range<Token> unwrapped : range.unwrap())
+ {
+ totalTokens = totalTokens.add(valueForToken(token(unwrapped.right)).subtract(valueForToken(unwrapped.left))).abs();
+ }
+ return totalTokens;
+ }
+
+ /**
+ * Computes the number of elapsed tokens from the range start until this token
+ * @return the number of tokens from the range start to the token
+ */
+ @VisibleForTesting
+ protected BigInteger elapsedTokens(Token token, Range<Token> range)
+ {
+ // No token elapsed since range does not contain token
+ if (!range.contains(token))
+ return BigInteger.ZERO;
+
+ BigInteger elapsedTokens = BigInteger.ZERO;
+ for (Range<Token> unwrapped : range.unwrap())
+ {
+ if (unwrapped.contains(token))
+ {
+ elapsedTokens = elapsedTokens.add(tokensInRange(new Range<>(unwrapped.left, token)));
+ }
+ else if (token.compareTo(unwrapped.left) < 0)
+ {
+ elapsedTokens = elapsedTokens.add(tokensInRange(unwrapped));
+ }
+ }
+ return elapsedTokens;
+ }
+
+ /**
+ * Computes the normalized position of this token relative to this range
+ * @return A number between 0.0 and 1.0 representing this token's position
+ * in this range or -1.0 if this range doesn't contain this token.
+ */
+ public double positionInRange(Token token, Range<Token> range)
+ {
+ //full range case
+ if (range.left.equals(range.right))
+ return positionInRange(token, new Range(partitioner.getMinimumToken(), partitioner.getMaximumToken()));
+
+ // leftmost token means we are on position 0.0
+ if (token.equals(range.left))
+ return 0.0;
+
+ // rightmost token means we are on position 1.0
+ if (token.equals(range.right))
+ return 1.0;
+
+ // Impossible to find position when token is not contained in range
+ if (!range.contains(token))
+ return -1.0;
+
+ return new BigDecimal(elapsedTokens(token, range)).divide(new BigDecimal(tokensInRange(range)), 3, BigDecimal.ROUND_HALF_EVEN).doubleValue();
+ }
+
public List<Token> splitOwnedRanges(int parts, List<Range<Token>> localRanges, boolean dontSplitRanges)
{
if (localRanges.isEmpty() || parts == 1)
@@ -127,4 +205,55 @@ public abstract class Splitter
return t.equals(partitioner.getMinimumToken()) ? partitioner.getMaximumToken() : t;
}
+ /**
+ * Splits the specified token ranges in at least {@code parts} subranges.
+ * <p>
+ * Each returned subrange will be contained in exactly one of the specified ranges.
+ *
+ * @param ranges a collection of token ranges to be split
+ * @param parts the minimum number of returned ranges
+ * @return at least {@code minParts} token ranges covering {@code ranges}
+ */
+ public Set<Range<Token>> split(Collection<Range<Token>> ranges, int parts)
+ {
+ int numRanges = ranges.size();
+ if (numRanges >= parts)
+ {
+ return Sets.newHashSet(ranges);
+ }
+ else
+ {
+ int partsPerRange = (int) Math.ceil((double) parts / numRanges);
+ return ranges.stream()
+ .map(range -> split(range, partsPerRange))
+ .flatMap(Collection::stream)
+ .collect(toSet());
+ }
+ }
+
+ /**
+ * Splits the specified token range in at least {@code minParts} subranges, unless the range has not enough tokens
+ * in which case the range will be returned without splitting.
+ *
+ * @param range a token range
+ * @param parts the number of subranges
+ * @return {@code parts} even subranges of {@code range}
+ */
+ private Set<Range<Token>> split(Range<Token> range, int parts)
+ {
+ // the range might not have enough tokens to split
+ BigInteger numTokens = tokensInRange(range);
+ if (BigInteger.valueOf(parts).compareTo(numTokens) > 0)
+ return Collections.singleton(range);
+
+ Token left = range.left;
+ Set<Range<Token>> subranges = new HashSet<>(parts);
+ for (double i = 1; i <= parts; i++)
+ {
+ Token right = partitioner.split(range.left, range.right, i / parts);
+ subranges.add(new Range<>(left, right));
+ left = right;
+ }
+ return subranges;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 3018fc1..f4d3706 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -33,7 +33,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.DiskOptimizationStrategy;
import org.apache.cassandra.io.util.FileUtils;
@@ -343,4 +345,9 @@ public abstract class SSTable
appendTOC(descriptor, componentsToAdd);
components.addAll(componentsToAdd);
}
+
+ public AbstractBounds<Token> getBounds()
+ {
+ return AbstractBounds.bounds(first.getToken(), true, last.getToken(), true);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index e79e3bd..711724b 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -632,7 +632,7 @@ public final class Schema
viewsDiff.entriesDiffering().values().forEach(diff -> alterView(diff.rightValue()));
// deal with all removed, added, and altered views
- Keyspace.open(before.name).viewManager.reload();
+ Keyspace.open(before.name).viewManager.reload(true);
// notify on everything dropped
udasDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropAggregate);
@@ -691,6 +691,7 @@ public final class Schema
private void dropView(ViewMetadata metadata)
{
+ Keyspace.open(metadata.keyspace).viewManager.stopBuild(metadata.name);
dropTable(metadata.metadata);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index cb942b9..c1202be 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -96,7 +96,6 @@ import org.apache.cassandra.service.paxos.PrepareVerbHandler;
import org.apache.cassandra.service.paxos.ProposeVerbHandler;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.tracing.TraceKeyspace;
-import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -1392,6 +1391,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
CompactionManager.instance.setConcurrentValidations(DatabaseDescriptor.getConcurrentValidations());
}
+ public int getConcurrentViewBuilders()
+ {
+ return DatabaseDescriptor.getConcurrentViewBuilders();
+ }
+
+ public void setConcurrentViewBuilders(int value)
+ {
+ if (value <= 0)
+ throw new IllegalArgumentException("Number of concurrent view builders should be greater than 0.");
+ DatabaseDescriptor.setConcurrentViewBuilders(value);
+ CompactionManager.instance.setConcurrentViewBuilders(DatabaseDescriptor.getConcurrentViewBuilders());
+ }
+
public boolean isIncrementalBackupsEnabled()
{
return DatabaseDescriptor.isIncrementalBackupsEnabled();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c4548ae..48e1b2f 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -520,6 +519,9 @@ public interface StorageServiceMBean extends NotificationEmitter
public int getConcurrentValidators();
public void setConcurrentValidators(int value);
+ public int getConcurrentViewBuilders();
+ public void setConcurrentViewBuilders(int value);
+
public boolean isIncrementalBackupsEnabled();
public void setIncrementalBackupsEnabled(boolean value);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 0912534..0de00f7 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1053,6 +1053,16 @@ public class NodeProbe implements AutoCloseable
return ssProxy.getConcurrentCompactors();
}
+ public void setConcurrentViewBuilders(int value)
+ {
+ ssProxy.setConcurrentViewBuilders(value);
+ }
+
+ public int getConcurrentViewBuilders()
+ {
+ return ssProxy.getConcurrentViewBuilders();
+ }
+
public void setMaxHintWindow(int value)
{
spProxy.setMaxHintWindow(value);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 8618d87..0db422e 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -110,6 +110,8 @@ public class NodeTool
SetCompactionThroughput.class,
GetConcurrentCompactors.class,
SetConcurrentCompactors.class,
+ GetConcurrentViewBuilders.class,
+ SetConcurrentViewBuilders.class,
SetTimeout.class,
SetStreamThroughput.class,
SetInterDCStreamThroughput.class,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java b/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java
new file mode 100644
index 0000000..c189fb0
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "getconcurrentviewbuilders", description = "Get the number of concurrent view builders in the system")
+public class GetConcurrentViewBuilders extends NodeToolCmd
+{
+ protected void execute(NodeProbe probe)
+ {
+ System.out.println("Current number of concurrent view builders in the system is: \n" +
+ probe.getConcurrentViewBuilders());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java b/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java
new file mode 100644
index 0000000..96adf2c
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Command(name = "setconcurrentviewbuilders", description = "Set the number of concurrent view builders in the system")
+public class SetConcurrentViewBuilders extends NodeTool.NodeToolCmd
+{
+ @Arguments(title = "concurrent_view_builders", usage = "<value>", description = "Number of concurrent view builders, greater than 0.", required = true)
+ private Integer concurrentViewBuilders = null;
+
+ protected void execute(NodeProbe probe)
+ {
+ checkArgument(concurrentViewBuilders > 0, "concurrent_view_builders should be great than 0.");
+ probe.setConcurrentViewBuilders(concurrentViewBuilders);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 4fd4df6..2b95574 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -1320,8 +1320,7 @@ public class ViewTest extends CQLTester
}
}
- @Test
- public void testViewBuilderResume() throws Throwable
+ private void testViewBuilderResume(int concurrentViewBuilders) throws Throwable
{
createTable("CREATE TABLE %s (" +
"k int, " +
@@ -1332,6 +1331,7 @@ public class ViewTest extends CQLTester
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
+ CompactionManager.instance.setConcurrentViewBuilders(concurrentViewBuilders);
CompactionManager.instance.setCoreCompactorThreads(1);
CompactionManager.instance.setMaximumCompactorThreads(1);
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
@@ -1357,21 +1357,32 @@ public class ViewTest extends CQLTester
cfs.forceBlockingFlush();
- createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+ String viewName1 = "mv_test_" + concurrentViewBuilders;
+ createView(viewName1, "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
cfs.enableAutoCompaction();
List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs);
+ String viewName2 = viewName1 + "_2";
//Force a second MV on the same base table, which will restart the first MV builder...
- createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+ createView(viewName2, "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
//Compact the base table
FBUtilities.waitOnFutures(futures);
- while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+ while (!SystemKeyspace.isViewBuilt(keyspace(), viewName1))
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+ assertRows(execute("SELECT count(*) FROM " + viewName1), row(1024L));
+ }
+
+ @Test
+ public void testViewBuilderResume() throws Throwable
+ {
+ for (int i = 1; i <= 8; i *= 2)
+ {
+ testViewBuilderResume(i);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java b/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java
new file mode 100644
index 0000000..2341c73
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+import static org.junit.Assert.assertEquals;
+
+public class ViewBuilderTaskTest extends CQLTester
+{
+ private static final ProtocolVersion protocolVersion = ProtocolVersion.CURRENT;
+
+ @Test
+ public void testBuildRange() throws Throwable
+ {
+ requireNetwork();
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ String tableName = createTable("CREATE TABLE %s (" +
+ "k int, " +
+ "c int, " +
+ "v text, " +
+ "PRIMARY KEY(k, c))");
+
+ String viewName = tableName + "_view";
+ executeNet(protocolVersion, String.format("CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s " +
+ "WHERE v IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL " +
+ "PRIMARY KEY (v, k, c)", viewName));
+
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ View view = cfs.keyspace.viewManager.forTable(cfs.metadata().id).iterator().next();
+
+ // Insert the dataset
+ for (int k = 0; k < 100; k++)
+ for (int c = 0; c < 10; c++)
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, String.valueOf(k));
+
+ // Retrieve the sorted tokens of the inserted rows
+ IPartitioner partitioner = cfs.metadata().partitioner;
+ List<Token> tokens = IntStream.range(0, 100)
+ .mapToObj(Int32Type.instance::decompose)
+ .map(partitioner::getToken)
+ .sorted()
+ .collect(Collectors.toList());
+
+ class Tester
+ {
+ private void test(int indexOfStartToken,
+ int indexOfEndToken,
+ Integer indexOfLastToken,
+ long keysBuilt,
+ long expectedKeysBuilt,
+ int expectedRowsInView) throws Throwable
+ {
+ // Truncate the materialized view (not the base table)
+ cfs.viewManager.forceBlockingFlush();
+ cfs.viewManager.truncateBlocking(cfs.forceBlockingFlush(), System.currentTimeMillis());
+ assertRowCount(execute("SELECT * FROM " + viewName), 0);
+
+ // Get the tokens from the referenced inserted rows
+ Token startToken = tokens.get(indexOfStartToken);
+ Token endToken = tokens.get(indexOfEndToken);
+ Token lastToken = indexOfLastToken == null ? null : tokens.get(indexOfLastToken);
+ Range<Token> range = new Range<>(startToken, endToken);
+
+ // Run the view build task, verifying the returned number of bult keys
+ long actualKeysBuilt = new ViewBuilderTask(cfs, view, range, lastToken, keysBuilt).call();
+ assertEquals(expectedKeysBuilt, actualKeysBuilt);
+
+ // Verify that the rows have been written to the MV
+ assertRowCount(execute("SELECT * FROM " + viewName), expectedRowsInView);
+
+ // Verify that the last position and number of bult keys have been stored
+ assertRows(execute(String.format("SELECT last_token, keys_built " +
+ "FROM %s.%s WHERE keyspace_name='%s' AND view_name='%s' " +
+ "AND start_token=? AND end_token=?",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME,
+ SystemKeyspace.VIEW_BUILDS_IN_PROGRESS,
+ keyspace(),
+ viewName),
+ startToken.toString(), endToken.toString()),
+ row(endToken.toString(), expectedKeysBuilt));
+ }
+ }
+ Tester tester = new Tester();
+
+ // Build range from rows 0 to 100 without any recorded start position
+ tester.test(0, 10, null, 0, 10, 100);
+
+ // Build range from rows 100 to 200 starting at row 150
+ tester.test(10, 20, 15, 0, 5, 50);
+
+ // Build range from rows 300 to 400 starting at row 350 with 10 built keys
+ tester.test(30, 40, 35, 10, 15, 50);
+
+ // Build range from rows 400 to 500 starting at row 100 (out of range) with 10 built keys
+ tester.test(40, 50, 10, 10, 20, 100);
+
+ // Build range from rows 900 to 100 (wrap around) without any recorded start position
+ tester.test(90, 10, null, 0, 20, 200);
+
+ executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + view.name);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org