You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2017/12/01 14:03:17 UTC

[1/2] cassandra git commit: Parallelize initial materialized view build

Repository: cassandra
Updated Branches:
  refs/heads/trunk 88b244a13 -> 4c80eeece


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/test/unit/org/apache/cassandra/dht/SplitterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/SplitterTest.java b/test/unit/org/apache/cassandra/dht/SplitterTest.java
index 751a7d7..409e333 100644
--- a/test/unit/org/apache/cassandra/dht/SplitterTest.java
+++ b/test/unit/org/apache/cassandra/dht/SplitterTest.java
@@ -24,9 +24,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.junit.Test;
 
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -50,34 +55,35 @@ public class SplitterTest
     {
         randomSplitTestVNodes(new RandomPartitioner());
     }
+
     @Test
     public void randomSplitTestVNodesMurmur3Partitioner()
     {
         randomSplitTestVNodes(new Murmur3Partitioner());
     }
 
-    public void randomSplitTestNoVNodes(IPartitioner partitioner)
+    private static void randomSplitTestNoVNodes(IPartitioner partitioner)
     {
-        Splitter splitter = partitioner.splitter().get();
+        Splitter splitter = getSplitter(partitioner);
         Random r = new Random();
         for (int i = 0; i < 10000; i++)
         {
-            List<Range<Token>> localRanges = generateLocalRanges(1, r.nextInt(4)+1, splitter, r, partitioner instanceof RandomPartitioner);
+            List<Range<Token>> localRanges = generateLocalRanges(1, r.nextInt(4) + 1, splitter, r, partitioner instanceof RandomPartitioner);
             List<Token> boundaries = splitter.splitOwnedRanges(r.nextInt(9) + 1, localRanges, false);
-            assertTrue("boundaries = "+boundaries+" ranges = "+localRanges, assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, true));
+            assertTrue("boundaries = " + boundaries + " ranges = " + localRanges, assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, true));
         }
     }
 
-    public void randomSplitTestVNodes(IPartitioner partitioner)
+    private static void randomSplitTestVNodes(IPartitioner partitioner)
     {
-        Splitter splitter = partitioner.splitter().get();
+        Splitter splitter = getSplitter(partitioner);
         Random r = new Random();
         for (int i = 0; i < 10000; i++)
         {
             // we need many tokens to be able to split evenly over the disks
             int numTokens = 172 + r.nextInt(128);
             int rf = r.nextInt(4) + 2;
-            int parts = r.nextInt(5)+1;
+            int parts = r.nextInt(5) + 1;
             List<Range<Token>> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner);
             List<Token> boundaries = splitter.splitOwnedRanges(parts, localRanges, true);
             if (!assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, false))
@@ -85,7 +91,7 @@ public class SplitterTest
         }
     }
 
-    private boolean assertRangeSizeEqual(List<Range<Token>> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges)
+    private static boolean assertRangeSizeEqual(List<Range<Token>> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges)
     {
         Token start = partitioner.getMinimumToken();
         List<BigInteger> splits = new ArrayList<>();
@@ -113,7 +119,7 @@ public class SplitterTest
         return allBalanced;
     }
 
-    private BigInteger sumOwnedBetween(List<Range<Token>> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges)
+    private static BigInteger sumOwnedBetween(List<Range<Token>> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges)
     {
         BigInteger sum = BigInteger.ZERO;
         for (Range<Token> range : localRanges)
@@ -133,7 +139,7 @@ public class SplitterTest
         return sum;
     }
 
-    private List<Range<Token>> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner)
+    private static List<Range<Token>> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner)
     {
         int localTokens = numTokens * rf;
         List<Token> randomTokens = new ArrayList<>();
@@ -149,10 +155,327 @@ public class SplitterTest
         List<Range<Token>> localRanges = new ArrayList<>(localTokens);
         for (int i = 0; i < randomTokens.size() - 1; i++)
         {
-            assert randomTokens.get(i).compareTo(randomTokens.get(i+1)) < 0;
-            localRanges.add(new Range<>(randomTokens.get(i), randomTokens.get(i+1)));
+            assert randomTokens.get(i).compareTo(randomTokens.get(i + 1)) < 0;
+            localRanges.add(new Range<>(randomTokens.get(i), randomTokens.get(i + 1)));
             i++;
         }
         return localRanges;
     }
+
+    @Test
+    public void testSplitMurmur3Partitioner()
+    {
+        testSplit(new Murmur3Partitioner());
+    }
+
+    @Test
+    public void testSplitRandomPartitioner()
+    {
+        testSplit(new RandomPartitioner());
+    }
+
+    @SuppressWarnings("unchecked")
+    private static void testSplit(IPartitioner partitioner)
+    {
+        boolean isRandom = partitioner instanceof RandomPartitioner;
+        Splitter splitter = getSplitter(partitioner);
+        BigInteger min = splitter.valueForToken(partitioner.getMinimumToken());
+        BigInteger max = splitter.valueForToken(partitioner.getMaximumToken());
+        BigInteger first = isRandom ? RandomPartitioner.ZERO : min;
+        BigInteger last = isRandom ? max.subtract(BigInteger.valueOf(1)) : max;
+        BigInteger midpoint = last.add(first).divide(BigInteger.valueOf(2));
+
+        // regular single range
+        testSplit(partitioner, 1, newHashSet(Pair.create(1, 100)), newHashSet(Pair.create(1, 100)));
+        testSplit(partitioner, 2,
+                  newHashSet(Pair.create(1, 100)),
+                  newHashSet(Pair.create(1, 50), Pair.create(50, 100)));
+        testSplit(partitioner, 4,
+                  newHashSet(Pair.create(1, 100)),
+                  newHashSet(Pair.create(1, 25), Pair.create(25, 50), Pair.create(50, 75), Pair.create(75, 100)));
+        testSplit(partitioner, 5,
+                  newHashSet(Pair.create(3, 79)),
+                  newHashSet(Pair.create(3, 18), Pair.create(18, 33), Pair.create(33, 48), Pair.create(48, 63),
+                             Pair.create(63, 79)));
+        testSplit(partitioner, 3,
+                  newHashSet(Pair.create(3, 20)),
+                  newHashSet(Pair.create(3, 8), Pair.create(8, 14), Pair.create(14, 20)));
+        testSplit(partitioner, 4,
+                  newHashSet(Pair.create(3, 20)),
+                  newHashSet(Pair.create(3, 7), Pair.create(7, 11), Pair.create(11, 15), Pair.create(15, 20)));
+
+        // single range too small to be partitioned
+        testSplit(partitioner, 1, newHashSet(Pair.create(1, 2)), newHashSet(Pair.create(1, 2)));
+        testSplit(partitioner, 2, newHashSet(Pair.create(1, 2)), newHashSet(Pair.create(1, 2)));
+        testSplit(partitioner, 4, newHashSet(Pair.create(1, 4)), newHashSet(Pair.create(1, 4)));
+        testSplit(partitioner, 8, newHashSet(Pair.create(1, 2)), newHashSet(Pair.create(1, 2)));
+
+        // single wrapping range
+        BigInteger cutpoint = isRandom ? midpoint.add(BigInteger.valueOf(7)) : min.add(BigInteger.valueOf(6));
+        testSplit(partitioner, 2,
+                  newHashSet(Pair.create(8, 4)),
+                  newHashSet(Pair.create(8, cutpoint), Pair.create(cutpoint, 4)));
+
+        // single range around partitioner min/max values
+        testSplit(partitioner, 2,
+                  newHashSet(Pair.create(max.subtract(BigInteger.valueOf(8)), min)),
+                  newHashSet(Pair.create(max.subtract(BigInteger.valueOf(8)), max.subtract(BigInteger.valueOf(4))),
+                             Pair.create(max.subtract(BigInteger.valueOf(4)), isRandom ? first : max)));
+        testSplit(partitioner, 2,
+                  newHashSet(Pair.create(max.subtract(BigInteger.valueOf(8)), max)),
+                  newHashSet(Pair.create(max.subtract(BigInteger.valueOf(8)), max.subtract(BigInteger.valueOf(4))),
+                             Pair.create(max.subtract(BigInteger.valueOf(4)), max)));
+        testSplit(partitioner, 2,
+                  newHashSet(Pair.create(min, min.add(BigInteger.valueOf(8)))),
+                  newHashSet(Pair.create(min, min.add(BigInteger.valueOf(4))),
+                             Pair.create(min.add(BigInteger.valueOf(4)), min.add(BigInteger.valueOf(8)))));
+        testSplit(partitioner, 2,
+                  newHashSet(Pair.create(max, min.add(BigInteger.valueOf(8)))),
+                  newHashSet(Pair.create(max, min.add(BigInteger.valueOf(4))),
+                             Pair.create(min.add(BigInteger.valueOf(4)), min.add(BigInteger.valueOf(8)))));
+        testSplit(partitioner, 2,
+                  newHashSet(Pair.create(max.subtract(BigInteger.valueOf(4)), min.add(BigInteger.valueOf(4)))),
+                  newHashSet(Pair.create(max.subtract(BigInteger.valueOf(4)), last),
+                             Pair.create(last, min.add(BigInteger.valueOf(4)))));
+        testSplit(partitioner, 2,
+                  newHashSet(Pair.create(max.subtract(BigInteger.valueOf(4)), min.add(BigInteger.valueOf(8)))),
+                  newHashSet(Pair.create(max.subtract(BigInteger.valueOf(4)), min.add(BigInteger.valueOf(2))),
+                             Pair.create(min.add(BigInteger.valueOf(2)), min.add(BigInteger.valueOf(8)))));
+
+        // multiple ranges
+        testSplit(partitioner, 1,
+                  newHashSet(Pair.create(1, 100), Pair.create(200, 300)),
+                  newHashSet(Pair.create(1, 100), Pair.create(200, 300)));
+        testSplit(partitioner, 2,
+                  newHashSet(Pair.create(1, 100), Pair.create(200, 300)),
+                  newHashSet(Pair.create(1, 100), Pair.create(200, 300)));
+        testSplit(partitioner, 4,
+                  newHashSet(Pair.create(1, 100), Pair.create(200, 300)),
+                  newHashSet(Pair.create(1, 50), Pair.create(50, 100), Pair.create(200, 250), Pair.create(250, 300)));
+        testSplit(partitioner, 4,
+                  newHashSet(Pair.create(1, 100),
+                             Pair.create(200, 300),
+                             Pair.create(max.subtract(BigInteger.valueOf(4)), min.add(BigInteger.valueOf(4)))),
+                  newHashSet(Pair.create(1, 50),
+                             Pair.create(50, 100),
+                             Pair.create(200, 250),
+                             Pair.create(250, 300),
+                             Pair.create(last, min.add(BigInteger.valueOf(4))),
+                             Pair.create(max.subtract(BigInteger.valueOf(4)), last)));
+    }
+
+    private static void testSplit(IPartitioner partitioner, int parts, Set<Pair<Object, Object>> ranges, Set<Pair<Object, Object>> expected)
+    {
+        Splitter splitter = getSplitter(partitioner);
+        Set<Range<Token>> splittedRanges = splitter.split(ranges(partitioner, ranges), parts);
+        assertEquals(ranges(partitioner, expected), splittedRanges);
+    }
+
+    private static Set<Range<Token>> ranges(IPartitioner partitioner, Set<Pair<Object, Object>> pairs)
+    {
+        return pairs.stream().map(pair -> range(partitioner, pair)).collect(Collectors.toSet());
+    }
+
+    private static Range<Token> range(IPartitioner partitioner, Pair<?, ?> pair)
+    {
+        return new Range<>(token(partitioner, pair.left), token(partitioner, pair.right));
+    }
+
+    private static Token token(IPartitioner partitioner, Object n)
+    {
+        return partitioner.getTokenFactory().fromString(n.toString());
+    }
+
+    @Test
+    public void testTokensInRangeRandomPartitioner()
+    {
+        testTokensInRange(new RandomPartitioner());
+    }
+
+    @Test
+    public void testTokensInRangeMurmur3Partitioner()
+    {
+        testTokensInRange(new Murmur3Partitioner());
+    }
+
+    private static void testTokensInRange(IPartitioner partitioner)
+    {
+        Splitter splitter = getSplitter(partitioner);
+
+        // test full range
+        Range<Token> fullRange = new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken());
+        BigInteger fullRangeSize = splitter.valueForToken(partitioner.getMaximumToken()).subtract(splitter.valueForToken(partitioner.getMinimumToken()));
+        assertEquals(fullRangeSize, splitter.tokensInRange(fullRange));
+        fullRange = new Range<>(splitter.tokenForValue(BigInteger.valueOf(-10)), splitter.tokenForValue(BigInteger.valueOf(-10)));
+        assertEquals(fullRangeSize, splitter.tokensInRange(fullRange));
+
+        // test small range
+        Range<Token> smallRange = new Range<>(splitter.tokenForValue(BigInteger.valueOf(-5)), splitter.tokenForValue(BigInteger.valueOf(5)));
+        assertEquals(BigInteger.valueOf(10), splitter.tokensInRange(smallRange));
+
+        // test wrap-around range
+        Range<Token> wrapAround = new Range<>(splitter.tokenForValue(BigInteger.valueOf(5)), splitter.tokenForValue(BigInteger.valueOf(-5)));
+        assertEquals(fullRangeSize.subtract(BigInteger.TEN), splitter.tokensInRange(wrapAround));
+    }
+
+    @Test
+    public void testElapsedTokensRandomPartitioner()
+    {
+        testElapsedMultiRange(new RandomPartitioner());
+    }
+
+    @Test
+    public void testElapsedTokensMurmur3Partitioner()
+    {
+        testElapsedMultiRange(new Murmur3Partitioner());
+    }
+
+    private static void testElapsedMultiRange(IPartitioner partitioner)
+    {
+        Splitter splitter = getSplitter(partitioner);
+        // small range
+        Range<Token> smallRange = new Range<>(splitter.tokenForValue(BigInteger.valueOf(-1)), splitter.tokenForValue(BigInteger.valueOf(1)));
+        testElapsedTokens(partitioner, smallRange, true);
+
+        // medium range
+        Range<Token> mediumRange = new Range<>(splitter.tokenForValue(BigInteger.valueOf(0)), splitter.tokenForValue(BigInteger.valueOf(123456789)));
+        testElapsedTokens(partitioner, mediumRange, true);
+
+        // wrapped range
+        BigInteger min = splitter.valueForToken(partitioner.getMinimumToken());
+        BigInteger max = splitter.valueForToken(partitioner.getMaximumToken());
+        Range<Token> wrappedRange = new Range<>(splitter.tokenForValue(max.subtract(BigInteger.valueOf(1350))),
+                                                splitter.tokenForValue(min.add(BigInteger.valueOf(20394))));
+        testElapsedTokens(partitioner, wrappedRange, true);
+
+        // full range
+        Range<Token> fullRange = new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken());
+        testElapsedTokens(partitioner, fullRange, false);
+    }
+
+    private static void testElapsedTokens(IPartitioner partitioner, Range<Token> range, boolean partialRange)
+    {
+        Splitter splitter = getSplitter(partitioner);
+
+        BigInteger left = splitter.valueForToken(range.left);
+        BigInteger right = splitter.valueForToken(range.right);
+        BigInteger tokensInRange = splitter.tokensInRange(range);
+
+        // elapsedTokens(left, (left, right]) = 0
+        assertEquals(BigInteger.ZERO, splitter.elapsedTokens(splitter.tokenForValue(left), range));
+
+        // elapsedTokens(right, (left, right]) = tokensInRange((left, right])
+        assertEquals(tokensInRange, splitter.elapsedTokens(splitter.tokenForValue(right), range));
+
+        // elapsedTokens(left+1, (left, right]) = 1
+        assertEquals(BigInteger.ONE, splitter.elapsedTokens(splitter.tokenForValue(left.add(BigInteger.ONE)), range));
+
+        // elapsedTokens(right-1, (left, right]) = tokensInRange((left, right]) - 1
+        assertEquals(tokensInRange.subtract(BigInteger.ONE), splitter.elapsedTokens(splitter.tokenForValue(right.subtract(BigInteger.ONE)), range));
+
+        // elapsedTokens(midpoint, (left, right]) + tokensInRange((midpoint, right]) = tokensInRange
+        Token midpoint = partitioner.midpoint(range.left, range.right);
+        assertEquals(tokensInRange, splitter.elapsedTokens(midpoint, range).add(splitter.tokensInRange(new Range<>(midpoint, range.right))));
+
+        if (partialRange)
+        {
+            // elapsedTokens(right + 1, (left, right]) = 0
+            assertEquals(BigInteger.ZERO, splitter.elapsedTokens(splitter.tokenForValue(right.add(BigInteger.ONE)), range));
+        }
+    }
+
+    @Test
+    public void testPositionInRangeRandomPartitioner()
+    {
+        testPositionInRangeMultiRange(new RandomPartitioner());
+    }
+
+    @Test
+    public void testPositionInRangeMurmur3Partitioner()
+    {
+        testPositionInRangeMultiRange(new Murmur3Partitioner());
+    }
+
+    private static void testPositionInRangeMultiRange(IPartitioner partitioner)
+    {
+        Splitter splitter = getSplitter(partitioner);
+
+        // Test tiny range
+        Token start = splitter.tokenForValue(BigInteger.ZERO);
+        Token end = splitter.tokenForValue(BigInteger.valueOf(3));
+        Range<Token> range = new Range<>(start, end);
+        assertEquals(0.0, splitter.positionInRange(start, range), 0.01);
+        assertEquals(0.33, splitter.positionInRange(splitter.tokenForValue(BigInteger.valueOf(1)), range), 0.01);
+        assertEquals(0.66, splitter.positionInRange(splitter.tokenForValue(BigInteger.valueOf(2)), range), 0.01);
+        assertEquals(1.0, splitter.positionInRange(end, range), 0.01);
+        // Token not in range should return -1.0 for position
+        Token notInRange = splitter.tokenForValue(BigInteger.valueOf(10));
+        assertEquals(-1.0, splitter.positionInRange(notInRange, range), 0.0);
+
+
+        // Test medium range
+        start = splitter.tokenForValue(BigInteger.ZERO);
+        end = splitter.tokenForValue(BigInteger.valueOf(1000));
+        range = new Range<>(start, end);
+        testPositionInRange(partitioner, splitter, range);
+
+        // Test wrap-around range
+        start = splitter.tokenForValue(splitter.valueForToken(partitioner.getMaximumToken()).subtract(BigInteger.valueOf(123456789)));
+        end = splitter.tokenForValue(splitter.valueForToken(partitioner.getMinimumToken()).add(BigInteger.valueOf(123456789)));
+        range = new Range<>(start, end);
+        testPositionInRange(partitioner, splitter, range);
+
+        // Test full range
+        testPositionInRange(partitioner, splitter, new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken()));
+        testPositionInRange(partitioner, splitter, new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+        testPositionInRange(partitioner, splitter, new Range<>(partitioner.getMaximumToken(), partitioner.getMaximumToken()));
+        testPositionInRange(partitioner, splitter, new Range<>(splitter.tokenForValue(BigInteger.ONE), splitter.tokenForValue(BigInteger.ONE)));
+    }
+
+    private static void testPositionInRange(IPartitioner partitioner, Splitter splitter, Range<Token> range)
+    {
+        Range<Token> actualRange = range;
+        //full range case
+        if (range.left.equals(range.right))
+        {
+            actualRange = new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken());
+        }
+        assertEquals(0.0, splitter.positionInRange(actualRange.left, range), 0.01);
+        assertEquals(0.25, splitter.positionInRange(getTokenInPosition(partitioner, actualRange, 0.25), range), 0.01);
+        assertEquals(0.37, splitter.positionInRange(getTokenInPosition(partitioner, actualRange, 0.373), range), 0.01);
+        assertEquals(0.5, splitter.positionInRange(getTokenInPosition(partitioner, actualRange, 0.5), range), 0.01);
+        assertEquals(0.75, splitter.positionInRange(getTokenInPosition(partitioner, actualRange, 0.75), range), 0.01);
+        assertEquals(0.99, splitter.positionInRange(getTokenInPosition(partitioner, actualRange, 0.999), range), 0.01);
+        assertEquals(1.0, splitter.positionInRange(actualRange.right, range), 0.01);
+    }
+
+    private static Token getTokenInPosition(IPartitioner partitioner, Range<Token> range, double position)
+    {
+        if (range.left.equals(range.right))
+        {
+            range = new Range<>(partitioner.getMinimumToken(), partitioner.getMaximumToken());
+        }
+        Splitter splitter = getSplitter(partitioner);
+        BigInteger totalTokens = splitter.tokensInRange(range);
+        BigInteger elapsedTokens = BigDecimal.valueOf(position).multiply(new BigDecimal(totalTokens)).toBigInteger();
+        BigInteger tokenInPosition = splitter.valueForToken(range.left).add(elapsedTokens);
+        return getWrappedToken(partitioner, tokenInPosition);
+    }
+
+    private static Token getWrappedToken(IPartitioner partitioner, BigInteger position)
+    {
+        Splitter splitter = getSplitter(partitioner);
+        BigInteger maxTokenValue = splitter.valueForToken(partitioner.getMaximumToken());
+        BigInteger minTokenValue = splitter.valueForToken(partitioner.getMinimumToken());
+        if (position.compareTo(maxTokenValue) > 0)
+        {
+            position = minTokenValue.add(position.subtract(maxTokenValue));
+        }
+        return splitter.tokenForValue(position);
+    }
+
+    private static Splitter getSplitter(IPartitioner partitioner)
+    {
+        return partitioner.splitter().orElseThrow(() -> new AssertionError(partitioner.getClass() + " must have a splitter"));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/2] cassandra git commit: Parallelize initial materialized view build

Posted by ad...@apache.org.
Parallelize initial materialized view build

patch by Andres de la Peña; reviewed by Paulo Motta for CASSANDRA-12245


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c80eeec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c80eeec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c80eeec

Branch: refs/heads/trunk
Commit: 4c80eeece37d79f434078224a0504400ae10a20d
Parents: 88b244a
Author: Andrés de la Peña <a....@gmail.com>
Authored: Sun Jul 9 14:42:14 2017 +0100
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Fri Dec 1 14:58:12 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   8 +
 conf/cassandra.yaml                             |   3 +
 doc/source/cql/mvs.rst                          |   5 +
 doc/source/operating/metrics.rst                |   1 +
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  13 +
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |  78 +++--
 .../db/compaction/CompactionManager.java        |  88 +++--
 .../db/compaction/CompactionManagerMBean.java   |  22 ++
 src/java/org/apache/cassandra/db/view/View.java |  20 +-
 .../apache/cassandra/db/view/ViewBuilder.java   | 305 ++++++++--------
 .../cassandra/db/view/ViewBuilderTask.java      | 250 +++++++++++++
 .../apache/cassandra/db/view/ViewManager.java   |  15 +-
 src/java/org/apache/cassandra/dht/Splitter.java | 129 +++++++
 .../apache/cassandra/io/sstable/SSTable.java    |   7 +
 .../org/apache/cassandra/schema/Schema.java     |   3 +-
 .../cassandra/service/StorageService.java       |  14 +-
 .../cassandra/service/StorageServiceMBean.java  |   4 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |  10 +
 .../org/apache/cassandra/tools/NodeTool.java    |   2 +
 .../nodetool/GetConcurrentViewBuilders.java     |  33 ++
 .../nodetool/SetConcurrentViewBuilders.java     |  39 +++
 .../org/apache/cassandra/cql3/ViewTest.java     |  23 +-
 .../cassandra/db/view/ViewBuilderTaskTest.java  | 135 ++++++++
 .../org/apache/cassandra/dht/SplitterTest.java  | 347 ++++++++++++++++++-
 27 files changed, 1315 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 009dcb5..56458f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Parallelize initial materialized view build (CASSANDRA-12245)
  * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
  * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
  * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index de7d58a..510577e 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -30,6 +30,10 @@ New features
      immediately upon creation via hard-linking the files. This means that incomplete
      segments will be available in cdc_raw rather than fully flushed. See documentation
      and CASSANDRA-12148 for more detail.
+   - The initial build of materialized views can be parallelized. The number of concurrent builder
+     threads is specified by the property `cassandra.yaml:concurrent_materialized_view_builders`.
+     This property can be modified at runtime through both JMX and the new `setconcurrentviewbuilders`
+     and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 for more details.
 
 Upgrading
 ---------
@@ -74,6 +78,10 @@ Upgrading
 	- Cassandra 4.0 allows a single port to be used for both secure and insecure
 	  connections between cassandra nodes (CASSANDRA-10404). See the yaml for
 	  specific property changes, and see the security doc for full details.
+    - Due to the parallelization of the initial build of materialized views,
+      the per token range view building status is stored in the new table
+      `system.view_builds_in_progress`. The old table `system.views_builds_in_progress`
+      is no longer used and can be removed. See CASSANDRA-12245 for more details.
 
 Materialized Views
 -------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e41af17..7328a01 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -754,6 +754,9 @@ column_index_cache_size_in_kb: 2
 # Values less than one are interpreted as unbounded (the default)
 # concurrent_validations: 0
 
+# Number of simultaneous materialized view builder tasks to allow.
+concurrent_materialized_view_builders: 1
+
 # Throttles compaction to the given total throughput across the entire
 # system. The faster you insert data, the faster you need to compact in
 # order to keep the sstable count down, but in general, setting this to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/doc/source/cql/mvs.rst
----------------------------------------------------------------------
diff --git a/doc/source/cql/mvs.rst b/doc/source/cql/mvs.rst
index 55ede22..200090a 100644
--- a/doc/source/cql/mvs.rst
+++ b/doc/source/cql/mvs.rst
@@ -62,6 +62,11 @@ Creating a materialized view has 3 main parts:
 Attempting to create an already existing materialized view will return an error unless the ``IF NOT EXISTS`` option is
 used. If it is used, the statement will be a no-op if the materialized view already exists.
 
+.. note:: By default, materialized views are built in a single thread. The initial build can be parallelized by
+   increasing the number of threads specified by the property ``concurrent_materialized_view_builders`` in
+   ``cassandra.yaml``. This property can also be manipulated at runtime through both JMX and the
+   ``setconcurrentviewbuilders`` and ``getconcurrentviewbuilders`` nodetool commands.
+
 .. _mv-select:
 
 MV select statement

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/doc/source/operating/metrics.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index 6559b53..2df1cf8 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -227,6 +227,7 @@ PerDiskMemtableFlushWriter_0 internal       Responsible for writing a spec (ther
 Sampler                      internal       Responsible for re-sampling the index summaries of SStables
 SecondaryIndexManagement     internal       Performs updates to secondary indexes
 ValidationExecutor           internal       Performs validation compaction or scrubbing
+ViewBuildExecutor            internal       Performs materialized views initial build
 ============================ ============== ===========
 
 .. |nbsp| unicode:: 0xA0 .. nonbreaking space

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index de193b0..f63d94d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -166,6 +166,7 @@ public class Config
     public int min_free_space_per_drive_in_mb = 50;
 
     public volatile int concurrent_validations = Integer.MAX_VALUE;
+    public volatile int concurrent_materialized_view_builders = 1;
 
     /**
      * @deprecated retry support removed on CASSANDRA-10992

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index af1cbde..58c0bf4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -589,6 +589,9 @@ public class DatabaseDescriptor
         if (conf.concurrent_compactors <= 0)
             throw new ConfigurationException("concurrent_compactors should be strictly greater than 0, but was " + conf.concurrent_compactors, false);
 
+        if (conf.concurrent_materialized_view_builders <= 0)
+            throw new ConfigurationException("concurrent_materialized_view_builders should be strictly greater than 0, but was " + conf.concurrent_materialized_view_builders, false);
+
         if (conf.num_tokens > MAX_NUM_TOKENS)
             throw new ConfigurationException(String.format("A maximum number of %d tokens per node is supported", MAX_NUM_TOKENS), false);
 
@@ -1516,6 +1519,16 @@ public class DatabaseDescriptor
         conf.concurrent_validations = value;
     }
 
+    public static int getConcurrentViewBuilders()
+    {
+        return conf.concurrent_materialized_view_builders;
+    }
+
+    public static void setConcurrentViewBuilders(int value)
+    {
+        conf.concurrent_materialized_view_builders = value;
+    }
+
     public static long getMinFreeSpacePerDriveInBytes()
     {
         return conf.min_free_space_per_drive_in_mb * 1024L * 1024L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index d814ac7..c3e649a 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -332,7 +332,7 @@ public class Keyspace
             logger.trace("Initializing {}.{}", getName(), cfm.name);
             initCf(Schema.instance.getTableMetadataRef(cfm.id), loadSSTables);
         }
-        this.viewManager.reload();
+        this.viewManager.reload(false);
     }
 
     private Keyspace(KeyspaceMetadata metadata)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2ffae11..9da0f6b 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -99,7 +99,7 @@ public final class SystemKeyspace
     public static final String SIZE_ESTIMATES = "size_estimates";
     public static final String AVAILABLE_RANGES = "available_ranges";
     public static final String TRANSFERRED_RANGES = "transferred_ranges";
-    public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
+    public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress";
     public static final String BUILT_VIEWS = "built_views";
     public static final String PREPARED_STATEMENTS = "prepared_statements";
     public static final String REPAIRS = "repairs";
@@ -262,15 +262,17 @@ public final class SystemKeyspace
               + "PRIMARY KEY ((operation, keyspace_name), peer))")
               .build();
 
-    private static final TableMetadata ViewsBuildsInProgress =
-        parse(VIEWS_BUILDS_IN_PROGRESS,
+    private static final TableMetadata ViewBuildsInProgress =
+        parse(VIEW_BUILDS_IN_PROGRESS,
               "views builds current progress",
               "CREATE TABLE %s ("
               + "keyspace_name text,"
               + "view_name text,"
+              + "start_token varchar,"
+              + "end_token varchar,"
               + "last_token varchar,"
-              + "generation_number int,"
-              + "PRIMARY KEY ((keyspace_name), view_name))")
+              + "keys_built bigint,"
+              + "PRIMARY KEY ((keyspace_name), view_name, start_token, end_token))")
               .build();
 
     private static final TableMetadata BuiltViews =
@@ -337,7 +339,7 @@ public final class SystemKeyspace
                          SizeEstimates,
                          AvailableRanges,
                          TransferredRanges,
-                         ViewsBuildsInProgress,
+                         ViewBuildsInProgress,
                          BuiltViews,
                          PreparedStatements,
                          Repairs);
@@ -457,23 +459,15 @@ public final class SystemKeyspace
 
     public static void setViewRemoved(String keyspaceName, String viewName)
     {
-        String buildReq = "DELETE FROM %S.%s WHERE keyspace_name = ? AND view_name = ? IF EXISTS";
-        executeInternal(String.format(buildReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, VIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
-        forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS);
+        String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?";
+        executeInternal(String.format(buildReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, VIEW_BUILDS_IN_PROGRESS), keyspaceName, viewName);
+        forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS);
 
         String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ? IF EXISTS";
         executeInternal(String.format(builtReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName);
         forceBlockingFlush(BUILT_VIEWS);
     }
 
-    public static void beginViewBuild(String ksname, String viewName, int generationNumber)
-    {
-        executeInternal(format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", VIEWS_BUILDS_IN_PROGRESS),
-                        ksname,
-                        viewName,
-                        generationNumber);
-    }
-
     public static void finishViewBuildStatus(String ksname, String viewName)
     {
         // We flush the view built first, because if we fail now, we'll restart at the last place we checkpointed
@@ -482,8 +476,8 @@ public final class SystemKeyspace
         // Also, if writing to the built_view succeeds, but the view_builds_in_progress deletion fails, we will be able
         // to skip the view build next boot.
         setViewBuilt(ksname, viewName, false);
-        executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ? IF EXISTS", VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
-        forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS);
+        executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", VIEW_BUILDS_IN_PROGRESS), ksname, viewName);
+        forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS);
     }
 
     public static void setViewBuiltReplicated(String ksname, String viewName)
@@ -491,33 +485,41 @@ public final class SystemKeyspace
         setViewBuilt(ksname, viewName, true);
     }
 
-    public static void updateViewBuildStatus(String ksname, String viewName, Token token)
+    public static void updateViewBuildStatus(String ksname, String viewName, Range<Token> range, Token lastToken, long keysBuilt)
     {
-        String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)";
-        Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory();
-        executeInternal(format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
+        String req = "INSERT INTO system.%s (keyspace_name, view_name, start_token, end_token, last_token, keys_built) VALUES (?, ?, ?, ?, ?, ?)";
+        Token.TokenFactory factory = ViewBuildsInProgress.partitioner.getTokenFactory();
+        executeInternal(format(req, VIEW_BUILDS_IN_PROGRESS),
+                        ksname,
+                        viewName,
+                        factory.toString(range.left),
+                        factory.toString(range.right),
+                        factory.toString(lastToken),
+                        keysBuilt);
     }
 
-    public static Pair<Integer, Token> getViewBuildStatus(String ksname, String viewName)
+    public static Map<Range<Token>, Pair<Token, Long>> getViewBuildStatus(String ksname, String viewName)
     {
-        String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
-        UntypedResultSet queryResultSet = executeInternal(format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
-        if (queryResultSet == null || queryResultSet.isEmpty())
-            return null;
+        String req = "SELECT start_token, end_token, last_token, keys_built FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
+        Token.TokenFactory factory = ViewBuildsInProgress.partitioner.getTokenFactory();
+        UntypedResultSet rs = executeInternal(format(req, VIEW_BUILDS_IN_PROGRESS), ksname, viewName);
 
-        UntypedResultSet.Row row = queryResultSet.one();
+        if (rs == null || rs.isEmpty())
+            return Collections.emptyMap();
 
-        Integer generation = null;
-        Token lastKey = null;
-        if (row.has("generation_number"))
-            generation = row.getInt("generation_number");
-        if (row.has("last_key"))
+        Map<Range<Token>, Pair<Token, Long>> status = new HashMap<>();
+        for (UntypedResultSet.Row row : rs)
         {
-            Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory();
-            lastKey = factory.fromString(row.getString("last_key"));
-        }
+            Token start = factory.fromString(row.getString("start_token"));
+            Token end = factory.fromString(row.getString("end_token"));
+            Range<Token> range = new Range<>(start, end);
+
+            Token lastToken = row.has("last_token") ? factory.fromString(row.getString("last_token")) : null;
+            long keysBuilt = row.has("keys_built") ? row.getLong("keys_built") : 0;
 
-        return Pair.create(generation, lastKey);
+            status.put(range, Pair.create(lastToken, keysBuilt));
+        }
+        return status;
     }
 
     public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3ff9c24..a615c03 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -51,7 +51,7 @@ import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.view.ViewBuilder;
+import org.apache.cassandra.db.view.ViewBuilderTask;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -124,8 +124,9 @@ public class CompactionManager implements CompactionManagerMBean
     private final CompactionExecutor executor = new CompactionExecutor();
     private final CompactionExecutor validationExecutor = new ValidationExecutor();
     private final static CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor();
+    private final CompactionExecutor viewBuildExecutor = new ViewBuildExecutor();
 
-    private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor);
+    private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor, viewBuildExecutor);
     private final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
 
     private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
@@ -216,6 +217,7 @@ public class CompactionManager implements CompactionManagerMBean
         // shutdown executors to prevent further submission
         executor.shutdown();
         validationExecutor.shutdown();
+        viewBuildExecutor.shutdown();
 
         // interrupt compactions and validations
         for (Holder compactionHolder : CompactionMetrics.getCompactions())
@@ -226,7 +228,7 @@ public class CompactionManager implements CompactionManagerMBean
         // wait for tasks to terminate
         // compaction tasks are interrupted above, so it shuold be fairy quick
         // until not interrupted tasks to complete.
-        for (ExecutorService exec : Arrays.asList(executor, validationExecutor))
+        for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor))
         {
             try
             {
@@ -1718,31 +1720,21 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    public Future<?> submitViewBuilder(final ViewBuilder builder)
+    public ListenableFuture<Long> submitViewBuilder(final ViewBuilderTask task)
     {
-        Runnable runnable = new Runnable()
-        {
-            public void run()
+        return viewBuildExecutor.submitIfRunning(() -> {
+            metrics.beginCompaction(task);
+            try
             {
-                metrics.beginCompaction(builder);
-                try
-                {
-                    builder.run();
-                }
-                finally
-                {
-                    metrics.finishCompaction(builder);
-                }
+                return task.call();
             }
-        };
-        if (executor.isShutdown())
-        {
-            logger.info("Compaction executor has shut down, not submitting index build");
-            return null;
-        }
-
-        return executor.submit(runnable);
+            finally
+            {
+                metrics.finishCompaction(task);
+            }
+        }, "view build");
     }
+
     public int getActiveCompactions()
     {
         return CompactionMetrics.getCompactions().size();
@@ -1817,7 +1809,7 @@ public class CompactionManager implements CompactionManagerMBean
          * @return the future that will deliver the task result, or a future that has already been
          *         cancelled if the task could not be submitted.
          */
-        public ListenableFuture<?> submitIfRunning(Callable<?> task, String name)
+        public <T> ListenableFuture<T> submitIfRunning(Callable<T> task, String name)
         {
             if (isShutdown())
             {
@@ -1827,7 +1819,7 @@ public class CompactionManager implements CompactionManagerMBean
 
             try
             {
-                ListenableFutureTask ret = ListenableFutureTask.create(task);
+                ListenableFutureTask<T> ret = ListenableFutureTask.create(task);
                 execute(ret);
                 return ret;
             }
@@ -1851,6 +1843,14 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    private static class ViewBuildExecutor extends CompactionExecutor
+    {
+        public ViewBuildExecutor()
+        {
+            super(DatabaseDescriptor.getConcurrentViewBuilders(), "ViewBuildExecutor");
+        }
+    }
+
     private static class CacheCleanupExecutor extends CompactionExecutor
     {
         public CacheCleanupExecutor()
@@ -1974,6 +1974,22 @@ public class CompactionManager implements CompactionManagerMBean
         validationExecutor.setMaximumPoolSize(value);
     }
 
+    public void setConcurrentViewBuilders(int value)
+    {
+        if (value > viewBuildExecutor.getCorePoolSize())
+        {
+            // we are increasing the value
+            viewBuildExecutor.setMaximumPoolSize(value);
+            viewBuildExecutor.setCorePoolSize(value);
+        }
+        else if (value < viewBuildExecutor.getCorePoolSize())
+        {
+            // we are reducing the value
+            viewBuildExecutor.setCorePoolSize(value);
+            viewBuildExecutor.setMaximumPoolSize(value);
+        }
+    }
+
     public int getCoreCompactorThreads()
     {
         return executor.getCorePoolSize();
@@ -2014,6 +2030,26 @@ public class CompactionManager implements CompactionManagerMBean
         validationExecutor.setMaximumPoolSize(number);
     }
 
+    public int getCoreViewBuildThreads()
+    {
+        return viewBuildExecutor.getCorePoolSize();
+    }
+
+    public void setCoreViewBuildThreads(int number)
+    {
+        viewBuildExecutor.setCorePoolSize(number);
+    }
+
+    public int getMaximumViewBuildThreads()
+    {
+        return viewBuildExecutor.getMaximumPoolSize();
+    }
+
+    public void setMaximumViewBuildThreads(int number)
+    {
+        viewBuildExecutor.setMaximumPoolSize(number);
+    }
+
     /**
      * Try to stop all of the compactions for given ColumnFamilies.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index 8785b41..b98b371 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -116,4 +116,26 @@ public interface CompactionManagerMBean
      * @param number New maximum of validator threads
      */
     public void setMaximumValidatorThreads(int number);
+
+    /**
+     * Returns core size of view build thread pool
+     */
+    public int getCoreViewBuildThreads();
+
+    /**
+     * Allows user to resize maximum size of the view build thread pool.
+     * @param number New maximum of view build threads
+     */
+    public void setCoreViewBuildThreads(int number);
+
+    /**
+     * Returns size of view build thread pool
+     */
+    public int getMaximumViewBuildThreads();
+
+    /**
+     * Allows user to resize maximum size of the view build thread pool.
+     * @param number New maximum of view build threads
+     */
+    public void setMaximumViewBuildThreads(int number);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index f601673..f6545b0 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -192,15 +191,22 @@ public class View
 
     public synchronized void build()
     {
-        if (this.builder != null)
+        stopBuild();
+        builder = new ViewBuilder(baseCfs, this);
+        builder.start();
+    }
+
+    /**
+     * Stops the building of this view, no-op if it isn't building.
+     */
+    synchronized void stopBuild()
+    {
+        if (builder != null)
         {
             logger.debug("Stopping current view builder due to schema change");
-            this.builder.stop();
-            this.builder = null;
+            builder.stop();
+            builder = null;
         }
-
-        this.builder = new ViewBuilder(baseCfs, this);
-        CompactionManager.instance.submitViewBuilder(builder);
     }
 
     @Nullable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index fcb1e98..8187a57 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -18,217 +18,224 @@
 
 package org.apache.cassandra.db.view;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.repair.SystemDistributedKeyspace;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.Refs;
 
-public class ViewBuilder extends CompactionInfo.Holder
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Builds a materialized view for the local token ranges.
+ * <p>
+ * The build is split in at least {@link #NUM_TASKS} {@link ViewBuilderTask tasks}, suitable of being parallelized by
+ * the {@link CompactionManager} which will execute them.
+ */
+class ViewBuilder
 {
-    private final ColumnFamilyStore baseCfs;
-    private final View view;
-    private final UUID compactionId;
-    private volatile Token prevToken = null;
+    private static final Logger logger = LoggerFactory.getLogger(ViewBuilderTask.class);
 
-    private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class);
+    private static final int NUM_TASKS = Runtime.getRuntime().availableProcessors() * 4;
 
+    private final ColumnFamilyStore baseCfs;
+    private final View view;
+    private final String ksName;
+    private final UUID localHostId = SystemKeyspace.getLocalHostId();
+    private final Set<Range<Token>> builtRanges = Sets.newConcurrentHashSet();
+    private final Map<Range<Token>, Pair<Token, Long>> pendingRanges = Maps.newConcurrentMap();
+    private final Set<ViewBuilderTask> tasks = Sets.newConcurrentHashSet();
+    private volatile long keysBuilt = 0;
     private volatile boolean isStopped = false;
+    private volatile Future<?> future = Futures.immediateFuture(null);
 
-    public ViewBuilder(ColumnFamilyStore baseCfs, View view)
+    ViewBuilder(ColumnFamilyStore baseCfs, View view)
     {
         this.baseCfs = baseCfs;
         this.view = view;
-        compactionId = UUIDGen.getTimeUUID();
+        ksName = baseCfs.metadata.keyspace;
     }
 
-    private void buildKey(DecoratedKey key)
+    public void start()
     {
-        ReadQuery selectQuery = view.getReadQuery();
-
-        if (!selectQuery.selectsKey(key))
+        if (SystemKeyspace.isViewBuilt(ksName, view.name))
         {
-            logger.trace("Skipping {}, view query filters", key);
-            return;
+            logger.debug("View already marked built for {}.{}", ksName, view.name);
+            if (!SystemKeyspace.isViewStatusReplicated(ksName, view.name))
+                updateDistributed();
         }
-
-        int nowInSec = FBUtilities.nowInSeconds();
-        SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
-
-        // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates
-        // and pretend that there is nothing pre-existing.
-        UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
-
-        try (ReadExecutionController orderGroup = command.executionController();
-             UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command))
+        else
         {
-            Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager
-                                                      .forTable(baseCfs.metadata.id)
-                                                      .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);
+            SystemDistributedKeyspace.startViewBuild(ksName, view.name, localHostId);
+
+            logger.debug("Starting build of view({}.{}). Flushing base table {}.{}",
+                         ksName, view.name, ksName, baseCfs.name);
+            baseCfs.forceBlockingFlush();
 
-            AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
-            mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime()));
+            loadStatusAndBuild();
         }
     }
 
-    public void run()
+    private void loadStatusAndBuild()
+    {
+        loadStatus();
+        build();
+    }
+
+    private void loadStatus()
     {
-        logger.debug("Starting view builder for {}.{}", baseCfs.metadata.keyspace, view.name);
-        UUID localHostId = SystemKeyspace.getLocalHostId();
-        String ksname = baseCfs.metadata.keyspace, viewName = view.name;
+        builtRanges.clear();
+        pendingRanges.clear();
+        SystemKeyspace.getViewBuildStatus(ksName, view.name)
+                      .forEach((range, pair) ->
+                               {
+                                   Token lastToken = pair.left;
+                                   if (lastToken != null && lastToken.equals(range.right))
+                                   {
+                                       builtRanges.add(range);
+                                       keysBuilt += pair.right;
+                                   }
+                                   else
+                                   {
+                                       pendingRanges.put(range, pair);
+                                   }
+                               });
+    }
 
-        if (SystemKeyspace.isViewBuilt(ksname, viewName))
+    private synchronized void build()
+    {
+        if (isStopped)
         {
-            logger.debug("View already marked built for {}.{}", baseCfs.metadata.keyspace, view.name);
-            if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName))
-                updateDistributed(ksname, viewName, localHostId);
+            logger.debug("Stopped build for view({}.{}) after covering {} keys", ksName, view.name, keysBuilt);
             return;
         }
 
-        Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.keyspace);
-        final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
-        Token lastToken;
-        Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
-        if (buildStatus == null)
-        {
-            logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.keyspace, baseCfs.name);
-            lastToken = null;
-
-            //We don't track the generation number anymore since if a rebuild is stopped and
-            //restarted the max generation filter may yield no sstables due to compactions.
-            //We only care about max generation *during* a build, not across builds.
-            //see CASSANDRA-13405
-            SystemKeyspace.beginViewBuild(ksname, viewName, 0);
-        }
-        else
+        // Get the local ranges for which the view hasn't already been built nor it's building
+        Set<Range<Token>> newRanges = StorageService.instance.getLocalRanges(ksName)
+                                                             .stream()
+                                                             .map(r -> r.subtractAll(builtRanges))
+                                                             .flatMap(Set::stream)
+                                                             .map(r -> r.subtractAll(pendingRanges.keySet()))
+                                                             .flatMap(Set::stream)
+                                                             .collect(Collectors.toSet());
+
+        // If there are no new nor pending ranges we should finish the build
+        if (newRanges.isEmpty() && pendingRanges.isEmpty())
         {
-            lastToken = buildStatus.right;
-            logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.keyspace, baseCfs.name);
+            finish();
+            return;
         }
 
-        baseCfs.forceBlockingFlush();
-        function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
-
-        prevToken = lastToken;
-        long keysBuilt = 0;
-        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
-             ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
+        // Split the new local ranges and add them to the pending set
+        DatabaseDescriptor.getPartitioner()
+                          .splitter()
+                          .map(s -> s.split(newRanges, NUM_TASKS))
+                          .orElse(newRanges)
+                          .forEach(r -> pendingRanges.put(r, Pair.<Token, Long>create(null, 0L)));
+
+        // Submit a new view build task for each building range.
+        // We keep record of all the submitted tasks to be able of stopping them.
+        List<ListenableFuture<Long>> futures = pendingRanges.entrySet()
+                                                            .stream()
+                                                            .map(e -> new ViewBuilderTask(baseCfs,
+                                                                                          view,
+                                                                                          e.getKey(),
+                                                                                          e.getValue().left,
+                                                                                          e.getValue().right))
+                                                            .peek(tasks::add)
+                                                            .map(CompactionManager.instance::submitViewBuilder)
+                                                            .collect(toList());
+
+        // Add a callback to process any eventual new local range and mark the view as built, doing a delayed retry if
+        // the tasks don't succeed
+        ListenableFuture<List<Long>> future = Futures.allAsList(futures);
+        Futures.addCallback(future, new FutureCallback<List<Long>>()
         {
-            SystemDistributedKeyspace.startViewBuild(ksname, viewName, localHostId);
-            while (!isStopped && iter.hasNext())
+            public void onSuccess(List<Long> result)
             {
-                DecoratedKey key = iter.next();
-                Token token = key.getToken();
-                if (lastToken == null || lastToken.compareTo(token) < 0)
-                {
-                    for (Range<Token> range : ranges)
-                    {
-                        if (range.contains(token))
-                        {
-                            buildKey(key);
-                            ++keysBuilt;
-
-                            if (prevToken == null || prevToken.compareTo(token) != 0)
-                            {
-                                SystemKeyspace.updateViewBuildStatus(ksname, viewName, key.getToken());
-                                prevToken = token;
-                            }
-                        }
-                    }
-
-                    lastToken = null;
-                }
+                keysBuilt += result.stream().mapToLong(x -> x).sum();
+                builtRanges.addAll(pendingRanges.keySet());
+                pendingRanges.clear();
+                build();
             }
 
-            if (!isStopped)
+            public void onFailure(Throwable t)
             {
-                logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt);
-                SystemKeyspace.finishViewBuildStatus(ksname, viewName);
-                updateDistributed(ksname, viewName, localHostId);
-            }
-            else
-            {
-                logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt);
+                if (t instanceof CompactionInterruptedException)
+                {
+                    internalStop(true);
+                    keysBuilt = tasks.stream().mapToLong(ViewBuilderTask::keysBuilt).sum();
+                    logger.info("Interrupted build for view({}.{}) after covering {} keys", ksName, view.name, keysBuilt);
+                }
+                else
+                {
+                    ScheduledExecutors.nonPeriodicTasks.schedule(() -> loadStatusAndBuild(), 5, TimeUnit.MINUTES);
+                    logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", t);
+                }
             }
-        }
-        catch (Exception e)
-        {
-            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(this),
-                                                         5,
-                                                         TimeUnit.MINUTES);
-            logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
-        }
+        }, MoreExecutors.directExecutor());
+        this.future = future;
+    }
+
+    private void finish()
+    {
+        logger.debug("Marking view({}.{}) as built after covering {} keys ", ksName, view.name, keysBuilt);
+        SystemKeyspace.finishViewBuildStatus(ksName, view.name);
+        updateDistributed();
     }
 
-    private void updateDistributed(String ksname, String viewName, UUID localHostId)
+    private void updateDistributed()
     {
         try
         {
-            SystemDistributedKeyspace.successfulViewBuild(ksname, viewName, localHostId);
-            SystemKeyspace.setViewBuiltReplicated(ksname, viewName);
+            SystemDistributedKeyspace.successfulViewBuild(ksName, view.name, localHostId);
+            SystemKeyspace.setViewBuiltReplicated(ksName, view.name);
         }
         catch (Exception e)
         {
-            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(this),
-                                                         5,
-                                                         TimeUnit.MINUTES);
-            logger.warn("Failed to updated the distributed status of view, sleeping 5 minutes before retrying", e);
+            ScheduledExecutors.nonPeriodicTasks.schedule(this::updateDistributed, 5, TimeUnit.MINUTES);
+            logger.warn("Failed to update the distributed status of view, sleeping 5 minutes before retrying", e);
         }
     }
 
-    public CompactionInfo getCompactionInfo()
+    /**
+     * Stops the view building.
+     */
+    synchronized void stop()
     {
-        long rangesLeft = 0, rangesTotal = 0;
-        Token lastToken = prevToken;
-
-        // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
-        // percentage of a range covered by a second range, this is the best approximation that we can calculate.
-        // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of
-        // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node
-        // has.
-        for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
-        {
-            rangesLeft++;
-            rangesTotal++;
-            // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the
-            // end of the method.
-            if (lastToken == null || range.contains(lastToken))
-                rangesLeft = 0;
-        }
-
-        return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
+        boolean wasStopped = isStopped;
+        internalStop(false);
+        if (!wasStopped)
+            FBUtilities.waitOnFuture(future);
     }
 
-    public void stop()
+    private void internalStop(boolean isCompactionInterrupted)
     {
         isStopped = true;
+        tasks.forEach(task -> task.stop(isCompactionInterrupted));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
new file mode 100644
index 0000000..0273c17
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.util.concurrent.Futures;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<Long>
+{
+    private static final Logger logger = LoggerFactory.getLogger(ViewBuilderTask.class);
+
+    private static final int ROWS_BETWEEN_CHECKPOINTS = 1000;
+
+    private final ColumnFamilyStore baseCfs;
+    private final View view;
+    private final Range<Token> range;
+    private final UUID compactionId;
+    private volatile Token prevToken;
+    private volatile long keysBuilt = 0;
+    private volatile boolean isStopped = false;
+    private volatile boolean isCompactionInterrupted = false;
+
+    ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt)
+    {
+        this.baseCfs = baseCfs;
+        this.view = view;
+        this.range = range;
+        this.compactionId = UUIDGen.getTimeUUID();
+        this.prevToken = lastToken;
+        this.keysBuilt = keysBuilt;
+    }
+
+    private void buildKey(DecoratedKey key)
+    {
+        ReadQuery selectQuery = view.getReadQuery();
+
+        if (!selectQuery.selectsKey(key))
+        {
+            logger.trace("Skipping {}, view query filters", key);
+            return;
+        }
+
+        int nowInSec = FBUtilities.nowInSeconds();
+        SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
+
+        // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates
+        // and pretend that there is nothing pre-existing.
+        UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
+
+        try (ReadExecutionController orderGroup = command.executionController();
+             UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command))
+        {
+            Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager
+                                                       .forTable(baseCfs.metadata.id)
+                                                       .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);
+
+            AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
+            mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime()));
+        }
+    }
+
+    public Long call()
+    {
+        String ksName = baseCfs.metadata.keyspace;
+
+        if (prevToken == null)
+            logger.debug("Starting new view build for range {}", range);
+        else
+            logger.debug("Resuming view build for range {} from token {} with {} covered keys", range, prevToken, keysBuilt);
+
+        Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
+        function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, s -> range.intersects(s.getBounds()));
+
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(function);
+             Refs<SSTableReader> sstables = viewFragment.refs;
+             ReducingKeyIterator keyIter = new ReducingKeyIterator(sstables))
+        {
+            PeekingIterator<DecoratedKey> iter = Iterators.peekingIterator(keyIter);
+            while (!isStopped && iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                Token token = key.getToken();
+                //skip tokens already built or not present in range
+                if (range.contains(token) && (prevToken == null || token.compareTo(prevToken) > 0))
+                {
+                    buildKey(key);
+                    ++keysBuilt;
+                    //build other keys sharing the same token
+                    while (iter.hasNext() && iter.peek().getToken().equals(token))
+                    {
+                        key = iter.next();
+                        buildKey(key);
+                        ++keysBuilt;
+                    }
+                    if (keysBuilt % ROWS_BETWEEN_CHECKPOINTS == 1)
+                        SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, token, keysBuilt);
+                    prevToken = token;
+                }
+            }
+        }
+
+        finish();
+
+        return keysBuilt;
+    }
+
+    private void finish()
+    {
+        String ksName = baseCfs.keyspace.getName();
+        if (!isStopped)
+        {
+            // Save the completed status using the end of the range as last token. This way it will be possible for
+            // future view build attempts to don't even create a task for this range
+            SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, range.right, keysBuilt);
+
+            logger.debug("Completed build of view({}.{}) for range {} after covering {} keys ", ksName, view.name, range, keysBuilt);
+        }
+        else
+        {
+            logger.debug("Stopped build for view({}.{}) for range {} after covering {} keys", ksName, view.name, range, keysBuilt);
+
+            // If it's stopped due to a compaction interruption we should throw that exception.
+            // Otherwise we assume that the task has been stopped due to a schema update and we can finish successfully.
+            if (isCompactionInterrupted)
+                throw new StoppedException(ksName, view.name, getCompactionInfo());
+        }
+    }
+
+    @Override
+    public CompactionInfo getCompactionInfo()
+    {
+        // If there's splitter, calculate progress based on last token position
+        if (range.left.getPartitioner().splitter().isPresent())
+        {
+            long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000);
+            return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, "token range parts", compactionId);
+        }
+
+        // When there is no splitter, estimate based on number of total keys but
+        // take the max with keysBuilt + 1 to avoid having more completed than total
+        long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range));
+        return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, "keys", compactionId);
+    }
+
+    @Override
+    public void stop()
+    {
+        stop(true);
+    }
+
+    synchronized void stop(boolean isCompactionInterrupted)
+    {
+        isStopped = true;
+        this.isCompactionInterrupted = isCompactionInterrupted;
+    }
+
+    long keysBuilt()
+    {
+        return keysBuilt;
+    }
+
+    /**
+     * {@link CompactionInterruptedException} with {@link Object#equals(Object)} and {@link Object#hashCode()}
+     * implementations that consider equals all the exceptions produced by the same view build, independently of their
+     * token range.
+     * <p>
+     * This is used to avoid Guava's {@link Futures#allAsList(Iterable)} log spamming when multiple build tasks fail
+     * due to compaction interruption.
+     */
+    static class StoppedException extends CompactionInterruptedException
+    {
+        private final String ksName, viewName;
+
+        private StoppedException(String ksName, String viewName, CompactionInfo info)
+        {
+            super(info);
+            this.ksName = ksName;
+            this.viewName = viewName;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (!(o instanceof StoppedException))
+                return false;
+
+            StoppedException that = (StoppedException) o;
+            return Objects.equal(this.ksName, that.ksName) && Objects.equal(this.viewName, that.viewName);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return 31 * ksName.hashCode() + viewName.hashCode();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index cf731dd..8506d82 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -93,7 +93,7 @@ public class ViewManager
         return viewsByName.values();
     }
 
-    public void reload()
+    public void reload(boolean buildAllViews)
     {
         Map<String, ViewMetadata> newViewsByName = new HashMap<>();
         for (ViewMetadata definition : keyspace.getMetadata().views)
@@ -113,6 +113,9 @@ public class ViewManager
                 addView(entry.getValue());
         }
 
+        if (!buildAllViews)
+            return;
+
         // Building views involves updating view build status in the system_distributed
         // keyspace and therefore it requires ring information. This check prevents builds
         // being submitted when Keyspaces are initialized during CassandraDaemon::setup as
@@ -163,6 +166,16 @@ public class ViewManager
         SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), view.name);
     }
 
+    /**
+     * Stops the building of the specified view, no-op if it isn't building.
+     *
+     * @param name the name of the view
+     */
+    public void stopBuild(String name)
+    {
+        viewsByName.get(name).stopBuild();
+    }
+
     public View getByName(String name)
     {
         return viewsByName.get(name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/dht/Splitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java
index 4433f97..c63fe91 100644
--- a/src/java/org/apache/cassandra/dht/Splitter.java
+++ b/src/java/org/apache/cassandra/dht/Splitter.java
@@ -18,10 +18,19 @@
 
 package org.apache.cassandra.dht;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import static java.util.stream.Collectors.toSet;
 
 /**
  * Partition splitter.
@@ -35,10 +44,79 @@ public abstract class Splitter
         this.partitioner = partitioner;
     }
 
+    @VisibleForTesting
     protected abstract Token tokenForValue(BigInteger value);
 
+    @VisibleForTesting
     protected abstract BigInteger valueForToken(Token token);
 
+    @VisibleForTesting
+    protected BigInteger tokensInRange(Range<Token> range)
+    {
+        //full range case
+        if (range.left.equals(range.right))
+            return tokensInRange(new Range(partitioner.getMinimumToken(), partitioner.getMaximumToken()));
+
+        BigInteger totalTokens = BigInteger.ZERO;
+        for (Range<Token> unwrapped : range.unwrap())
+        {
+            totalTokens = totalTokens.add(valueForToken(token(unwrapped.right)).subtract(valueForToken(unwrapped.left))).abs();
+        }
+        return totalTokens;
+    }
+
+    /**
+     * Computes the number of elapsed tokens from the range start until this token
+     * @return the number of tokens from the range start to the token
+     */
+    @VisibleForTesting
+    protected BigInteger elapsedTokens(Token token, Range<Token> range)
+    {
+        // No token elapsed since range does not contain token
+        if (!range.contains(token))
+            return BigInteger.ZERO;
+
+        BigInteger elapsedTokens = BigInteger.ZERO;
+        for (Range<Token> unwrapped : range.unwrap())
+        {
+            if (unwrapped.contains(token))
+            {
+                elapsedTokens = elapsedTokens.add(tokensInRange(new Range<>(unwrapped.left, token)));
+            }
+            else if (token.compareTo(unwrapped.left) < 0)
+            {
+                elapsedTokens = elapsedTokens.add(tokensInRange(unwrapped));
+            }
+        }
+        return elapsedTokens;
+    }
+
+    /**
+     * Computes the normalized position of this token relative to this range
+     * @return A number between 0.0 and 1.0 representing this token's position
+     * in this range or -1.0 if this range doesn't contain this token.
+     */
+    public double positionInRange(Token token, Range<Token> range)
+    {
+        //full range case
+        if (range.left.equals(range.right))
+            return positionInRange(token, new Range(partitioner.getMinimumToken(), partitioner.getMaximumToken()));
+
+        // leftmost token means we are on position 0.0
+        if (token.equals(range.left))
+            return 0.0;
+
+        // rightmost token means we are on position 1.0
+        if (token.equals(range.right))
+            return 1.0;
+
+        // Impossible to find position when token is not contained in range
+        if (!range.contains(token))
+            return -1.0;
+
+        return new BigDecimal(elapsedTokens(token, range)).divide(new BigDecimal(tokensInRange(range)), 3, BigDecimal.ROUND_HALF_EVEN).doubleValue();
+    }
+
     public List<Token> splitOwnedRanges(int parts, List<Range<Token>> localRanges, boolean dontSplitRanges)
     {
         if (localRanges.isEmpty() || parts == 1)
@@ -127,4 +205,55 @@ public abstract class Splitter
         return t.equals(partitioner.getMinimumToken()) ? partitioner.getMaximumToken() : t;
     }
 
+    /**
+     * Splits the specified token ranges in at least {@code parts} subranges.
+     * <p>
+     * Each returned subrange will be contained in exactly one of the specified ranges.
+     *
+     * @param ranges a collection of token ranges to be split
+     * @param parts the minimum number of returned ranges
+     * @return at least {@code minParts} token ranges covering {@code ranges}
+     */
+    public Set<Range<Token>> split(Collection<Range<Token>> ranges, int parts)
+    {
+        int numRanges = ranges.size();
+        if (numRanges >= parts)
+        {
+            return Sets.newHashSet(ranges);
+        }
+        else
+        {
+            int partsPerRange = (int) Math.ceil((double) parts / numRanges);
+            return ranges.stream()
+                         .map(range -> split(range, partsPerRange))
+                         .flatMap(Collection::stream)
+                         .collect(toSet());
+        }
+    }
+
+    /**
+     * Splits the specified token range in at least {@code minParts} subranges, unless the range has not enough tokens
+     * in which case the range will be returned without splitting.
+     *
+     * @param range a token range
+     * @param parts the number of subranges
+     * @return {@code parts} even subranges of {@code range}
+     */
+    private Set<Range<Token>> split(Range<Token> range, int parts)
+    {
+        // the range might not have enough tokens to split
+        BigInteger numTokens = tokensInRange(range);
+        if (BigInteger.valueOf(parts).compareTo(numTokens) > 0)
+            return Collections.singleton(range);
+
+        Token left = range.left;
+        Set<Range<Token>> subranges = new HashSet<>(parts);
+        for (double i = 1; i <= parts; i++)
+        {
+            Token right = partitioner.split(range.left, range.right, i / parts);
+            subranges.add(new Range<>(left, right));
+            left = right;
+        }
+        return subranges;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 3018fc1..f4d3706 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -33,7 +33,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.DiskOptimizationStrategy;
 import org.apache.cassandra.io.util.FileUtils;
@@ -343,4 +345,9 @@ public abstract class SSTable
         appendTOC(descriptor, componentsToAdd);
         components.addAll(componentsToAdd);
     }
+
+    public AbstractBounds<Token> getBounds()
+    {
+        return AbstractBounds.bounds(first.getToken(), true, last.getToken(), true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index e79e3bd..711724b 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -632,7 +632,7 @@ public final class Schema
         viewsDiff.entriesDiffering().values().forEach(diff -> alterView(diff.rightValue()));
 
         // deal with all removed, added, and altered views
-        Keyspace.open(before.name).viewManager.reload();
+        Keyspace.open(before.name).viewManager.reload(true);
 
         // notify on everything dropped
         udasDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropAggregate);
@@ -691,6 +691,7 @@ public final class Schema
 
     private void dropView(ViewMetadata metadata)
     {
+        Keyspace.open(metadata.keyspace).viewManager.stopBuild(metadata.name);
         dropTable(metadata.metadata);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index cb942b9..c1202be 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -96,7 +96,6 @@ import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.tracing.TraceKeyspace;
-import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -1392,6 +1391,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         CompactionManager.instance.setConcurrentValidations(DatabaseDescriptor.getConcurrentValidations());
     }
 
+    public int getConcurrentViewBuilders()
+    {
+        return DatabaseDescriptor.getConcurrentViewBuilders();
+    }
+
+    public void setConcurrentViewBuilders(int value)
+    {
+        if (value <= 0)
+            throw new IllegalArgumentException("Number of concurrent view builders should be greater than 0.");
+        DatabaseDescriptor.setConcurrentViewBuilders(value);
+        CompactionManager.instance.setConcurrentViewBuilders(DatabaseDescriptor.getConcurrentViewBuilders());
+    }
+
     public boolean isIncrementalBackupsEnabled()
     {
         return DatabaseDescriptor.isIncrementalBackupsEnabled();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c4548ae..48e1b2f 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -520,6 +519,9 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int getConcurrentValidators();
     public void setConcurrentValidators(int value);
 
+    public int getConcurrentViewBuilders();
+    public void setConcurrentViewBuilders(int value);
+
     public boolean isIncrementalBackupsEnabled();
     public void setIncrementalBackupsEnabled(boolean value);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 0912534..0de00f7 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1053,6 +1053,16 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getConcurrentCompactors();
     }
 
+    public void setConcurrentViewBuilders(int value)
+    {
+        ssProxy.setConcurrentViewBuilders(value);
+    }
+
+    public int getConcurrentViewBuilders()
+    {
+        return ssProxy.getConcurrentViewBuilders();
+    }
+
     public void setMaxHintWindow(int value)
     {
         spProxy.setMaxHintWindow(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 8618d87..0db422e 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -110,6 +110,8 @@ public class NodeTool
                 SetCompactionThroughput.class,
                 GetConcurrentCompactors.class,
                 SetConcurrentCompactors.class,
+                GetConcurrentViewBuilders.class,
+                SetConcurrentViewBuilders.class,
                 SetTimeout.class,
                 SetStreamThroughput.class,
                 SetInterDCStreamThroughput.class,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java b/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java
new file mode 100644
index 0000000..c189fb0
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetConcurrentViewBuilders.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "getconcurrentviewbuilders", description = "Get the number of concurrent view builders in the system")
+public class GetConcurrentViewBuilders extends NodeToolCmd
+{
+    protected void execute(NodeProbe probe)
+    {
+        System.out.println("Current number of concurrent view builders in the system is: \n" +
+                           probe.getConcurrentViewBuilders());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java b/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java
new file mode 100644
index 0000000..96adf2c
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetConcurrentViewBuilders.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Command(name = "setconcurrentviewbuilders", description = "Set the number of concurrent view builders in the system")
+public class SetConcurrentViewBuilders extends NodeTool.NodeToolCmd
+{
+    @Arguments(title = "concurrent_view_builders", usage = "<value>", description = "Number of concurrent view builders, greater than 0.", required = true)
+    private Integer concurrentViewBuilders = null;
+
+    protected void execute(NodeProbe probe)
+    {
+        checkArgument(concurrentViewBuilders > 0, "concurrent_view_builders should be great than 0.");
+        probe.setConcurrentViewBuilders(concurrentViewBuilders);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 4fd4df6..2b95574 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -1320,8 +1320,7 @@ public class ViewTest extends CQLTester
         }
     }
 
-    @Test
-    public void testViewBuilderResume() throws Throwable
+    private void testViewBuilderResume(int concurrentViewBuilders) throws Throwable
     {
         createTable("CREATE TABLE %s (" +
                     "k int, " +
@@ -1332,6 +1331,7 @@ public class ViewTest extends CQLTester
         execute("USE " + keyspace());
         executeNet(protocolVersion, "USE " + keyspace());
 
+        CompactionManager.instance.setConcurrentViewBuilders(concurrentViewBuilders);
         CompactionManager.instance.setCoreCompactorThreads(1);
         CompactionManager.instance.setMaximumCompactorThreads(1);
         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
@@ -1357,21 +1357,32 @@ public class ViewTest extends CQLTester
 
         cfs.forceBlockingFlush();
 
-        createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+        String viewName1 = "mv_test_" + concurrentViewBuilders;
+        createView(viewName1, "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
 
         cfs.enableAutoCompaction();
         List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs);
 
+        String viewName2 = viewName1 + "_2";
         //Force a second MV on the same base table, which will restart the first MV builder...
-        createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+        createView(viewName2, "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
 
 
         //Compact the base table
         FBUtilities.waitOnFutures(futures);
 
-        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+        while (!SystemKeyspace.isViewBuilt(keyspace(), viewName1))
             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
 
-        assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+        assertRows(execute("SELECT count(*) FROM " + viewName1), row(1024L));
+    }
+
+    @Test
+    public void testViewBuilderResume() throws Throwable
+    {
+        for (int i = 1; i <= 8; i *= 2)
+        {
+            testViewBuilderResume(i);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c80eeec/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java b/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java
new file mode 100644
index 0000000..2341c73
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/view/ViewBuilderTaskTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+import static org.junit.Assert.assertEquals;
+
+public class ViewBuilderTaskTest extends CQLTester
+{
+    private static final ProtocolVersion protocolVersion = ProtocolVersion.CURRENT;
+
+    @Test
+    public void testBuildRange() throws Throwable
+    {
+        requireNetwork();
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        String tableName = createTable("CREATE TABLE %s (" +
+                                       "k int, " +
+                                       "c int, " +
+                                       "v text, " +
+                                       "PRIMARY KEY(k, c))");
+
+        String viewName = tableName + "_view";
+        executeNet(protocolVersion, String.format("CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s " +
+                                                  "WHERE v IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL " +
+                                                  "PRIMARY KEY (v, k, c)", viewName));
+
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        View view = cfs.keyspace.viewManager.forTable(cfs.metadata().id).iterator().next();
+
+        // Insert the dataset
+        for (int k = 0; k < 100; k++)
+            for (int c = 0; c < 10; c++)
+                execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, String.valueOf(k));
+
+        // Retrieve the sorted tokens of the inserted rows
+        IPartitioner partitioner = cfs.metadata().partitioner;
+        List<Token> tokens = IntStream.range(0, 100)
+                                      .mapToObj(Int32Type.instance::decompose)
+                                      .map(partitioner::getToken)
+                                      .sorted()
+                                      .collect(Collectors.toList());
+
+        class Tester
+        {
+            private void test(int indexOfStartToken,
+                              int indexOfEndToken,
+                              Integer indexOfLastToken,
+                              long keysBuilt,
+                              long expectedKeysBuilt,
+                              int expectedRowsInView) throws Throwable
+            {
+                // Truncate the materialized view (not the base table)
+                cfs.viewManager.forceBlockingFlush();
+                cfs.viewManager.truncateBlocking(cfs.forceBlockingFlush(), System.currentTimeMillis());
+                assertRowCount(execute("SELECT * FROM " + viewName), 0);
+
+                // Get the tokens from the referenced inserted rows
+                Token startToken = tokens.get(indexOfStartToken);
+                Token endToken = tokens.get(indexOfEndToken);
+                Token lastToken = indexOfLastToken == null ? null : tokens.get(indexOfLastToken);
+                Range<Token> range = new Range<>(startToken, endToken);
+
+                // Run the view build task, verifying the returned number of bult keys
+                long actualKeysBuilt = new ViewBuilderTask(cfs, view, range, lastToken, keysBuilt).call();
+                assertEquals(expectedKeysBuilt, actualKeysBuilt);
+
+                // Verify that the rows have been written to the MV
+                assertRowCount(execute("SELECT * FROM " + viewName), expectedRowsInView);
+
+                // Verify that the last position and number of bult keys have been stored
+                assertRows(execute(String.format("SELECT last_token, keys_built " +
+                                                 "FROM %s.%s WHERE keyspace_name='%s' AND view_name='%s' " +
+                                                 "AND start_token=? AND end_token=?",
+                                                 SchemaConstants.SYSTEM_KEYSPACE_NAME,
+                                                 SystemKeyspace.VIEW_BUILDS_IN_PROGRESS,
+                                                 keyspace(),
+                                                 viewName),
+                                   startToken.toString(), endToken.toString()),
+                           row(endToken.toString(), expectedKeysBuilt));
+            }
+        }
+        Tester tester = new Tester();
+
+        // Build range from rows 0 to 100 without any recorded start position
+        tester.test(0, 10, null, 0, 10, 100);
+
+        // Build range from rows 100 to 200 starting at row 150
+        tester.test(10, 20, 15, 0, 5, 50);
+
+        // Build range from rows 300 to 400 starting at row 350 with 10 built keys
+        tester.test(30, 40, 35, 10, 15, 50);
+
+        // Build range from rows 400 to 500 starting at row 100 (out of range) with 10 built keys
+        tester.test(40, 50, 10, 10, 20, 100);
+
+        // Build range from rows 900 to 100 (wrap around) without any recorded start position
+        tester.test(90, 10, null, 0, 20, 200);
+
+        executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + view.name);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org