Posted to commits@cassandra.apache.org by "Paulo Motta (JIRA)" <ji...@apache.org> on 2017/11/09 01:42:01 UTC

[jira] [Comment Edited] (CASSANDRA-12245) initial view build can be parallel

    [ https://issues.apache.org/jira/browse/CASSANDRA-12245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245074#comment-16245074 ] 

Paulo Motta edited comment on CASSANDRA-12245 at 11/9/17 1:41 AM:
------------------------------------------------------------------

Getting back to this after a while, sorry about the delay. We are really close now; I think we can wrap up after this last round of review. See the follow-up below:

bq. This makes a lot of sense. I'm worried about creating thousands of tasks for large datasets if the number of tasks is proportional to the amount of data. Instead, I think we could fix the number of partitions to the highest reasonable number of parallel tasks, something like a multiple of the number of available processors. This would provide the desired immediate performance improvement if the user increases the number of concurrent view builders, while keeping the number of tasks limited independently of the amount of data. What do you think? Does it make sense?

Awesome! One minor thing: we should probably only split the view build tasks at all if the base table is larger than a given size (let's say 500MB or so?), to avoid 4 * num_processors flushes for base tables of negligible size. WDYT?
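
To make this concrete, here is a minimal, self-contained sketch of the splitting policy using plain {{long}} token bounds instead of Cassandra's {{Range}}/{{Token}} types. The ~500MB threshold and the 4 * num_processors multiplier are just the numbers floated above, and every name is illustrative rather than the patch's:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class ViewBuildSplitSketch
{
    static final long SPLIT_THRESHOLD_BYTES = 500L << 20; // ~500MB, as suggested above
    static final int TASKS_PER_CORE = 4;                  // the "4 * num_processors" multiplier

    /** One [left, right) token slice handled by a single hypothetical build task. */
    static final class Slice
    {
        final long left, right;
        Slice(long left, long right) { this.left = left; this.right = right; }
    }

    static List<Slice> plan(long baseTableSizeBytes, long left, long right)
    {
        List<Slice> tasks = new ArrayList<>();
        if (baseTableSizeBytes < SPLIT_THRESHOLD_BYTES)
        {
            tasks.add(new Slice(left, right)); // small table: one task, no extra flushes
            return tasks;
        }
        // Cap the task count at a multiple of the available cores, independently of data size.
        int n = TASKS_PER_CORE * Runtime.getRuntime().availableProcessors();
        long width = Math.max(1, (right - left) / n);
        for (long lo = left; lo < right; lo += width)
            tasks.add(new Slice(lo, Math.min(lo + width, right)));
        return tasks;
    }
}
{code}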

bq. One case that we hadn't considered is that if the token ranges change or are split in a different way when resuming a build, then the progress would be lost, because ViewBuildTask won't find any entry for the new range in system.view_builds_in_progress. This would be especially true if we split the ranges by their data size. So, independently of how we finally split the ranges, I think it makes sense to load all the ranges with any progress from system.view_builds_in_progress in ViewBuilder before splitting the local ranges, create a task for those that are not already finished, and then split any remaining uncovered local ranges. It also has the advantage of skipping the creation of tasks for already completed ranges. What do you think?

Good idea! This will prevent a node from computing new tasks for already built subranges if a new node joins the ring during view build. I will add a dtest to verify that scenario on CASSANDRA-13762.
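
Extending the {{Slice}}/{{plan}} sketch above (with {{java.util.Comparator}} and {{java.util.Map}} added to its imports), the resume-then-split ordering you describe could look roughly like this; progress is modeled as a saved last token per started slice, mirroring what system.view_builds_in_progress records, and all names are again illustrative:
{code:java}
static List<Slice> planResume(Map<Slice, Long> progress, Slice local, long baseSizeBytes)
{
    List<Slice> started = new ArrayList<>(progress.keySet());
    started.sort(Comparator.comparingLong((Slice s) -> s.left)); // assume disjoint slices
    List<Slice> tasks = new ArrayList<>();
    long cursor = local.left;
    for (Slice s : started)
    {
        if (cursor < s.left)                                   // gap no recorded task covers:
            tasks.addAll(plan(baseSizeBytes, cursor, s.left)); // split it as usual
        if (progress.get(s) < s.right)                         // unfinished slice: resume it
            tasks.add(s);                                      // as-is, keeping its last token valid
        cursor = Math.max(cursor, s.right);                    // finished slices get no new task
    }
    if (cursor < local.right)                                  // trailing uncovered span
        tasks.addAll(plan(baseSizeBytes, cursor, local.right));
    return tasks;
}
{code}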

bq. I have also removed the method SystemKeyspace.beginViewBuild because I don't see any need to save a range without progress. Indeed, if the view build is restarted it is probably better not to restore tasks without progress and to let their tokens be processed by the split logic.

+1

bq. The first wait does the opposite of _wait_for_view: it waits until there is some progress. But we can use self._wait_for_view("ks", "t_by_v") here. Did you mean that?

Yep.

bq. Makes sense. The new test for ViewBuilderTask is here, and I've extended ViewTest.testViewBuilderResume to run with different numbers of concurrent view builders.

Awesome, this looks much better now, thanks!

I had a final skim through the patch; I think we've covered everything and it looks much more robust now. Three minor things before we can close this:
- I noticed we don't stop in-progress view builds when a view is removed; would you mind adding that?
- ViewBuildExecutor is being constructed with minThreads=1 and maxPoolSize=concurrent_materialized_view_builders, but according to the {{DebuggableThreadPoolExecutor}} [javadoc|https://github.com/apache/cassandra/blob/8b3a60b9a7dbefeecc06bace617279612ec7092d/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java#L33], this will actually create an executor of size 1, since maxPoolSize is not supported by {{DebuggableThreadPoolExecutor}}. Even if it were supported, new threads would only be created once the initial threads' queue was full, which is quite unintuitive (see the demo right after this list). Since we actually want the pool to have at most concurrent_materialized_view_builders concurrent threads, we should use the {{threadCount}} constructor instead. At some point we should also remove the maximumPoolSize constructors from the TPEs, since they can be quite confusing.
- In the linked dtest branch results, {{base_replica_repair_with_contention_test}} is showing the following error:
{noformat}
ERROR [InternalResponseStage:1] 2017-10-11 11:03:16,005 CassandraDaemon.java:211 - Exception in thread Thread[InternalResponseStage:1,5,main]
java.lang.RuntimeException: javax.management.InstanceAlreadyExistsException: org.apache.cassandra.db:type=Tables,keyspace=ks,table=t
	at org.apache.cassandra.db.ColumnFamilyStore.<init>(ColumnFamilyStore.java:484) ~[main/:na]
	at org.apache.cassandra.db.ColumnFamilyStore.createColumnFamilyStore(ColumnFamilyStore.java:639) ~[main/:na]
	at org.apache.cassandra.db.ColumnFamilyStore.createColumnFamilyStore(ColumnFamilyStore.java:613) ~[main/:na]
	at org.apache.cassandra.db.ColumnFamilyStore.createColumnFamilyStore(ColumnFamilyStore.java:604) ~[main/:na]
	at org.apache.cassandra.db.Keyspace.initCf(Keyspace.java:420) ~[main/:na]
	at org.apache.cassandra.db.Keyspace.<init>(Keyspace.java:333) ~[main/:na]
	at org.apache.cassandra.db.Keyspace.open(Keyspace.java:133) ~[main/:na]
	at org.apache.cassandra.db.Keyspace.open(Keyspace.java:110) ~[main/:na]
	at org.apache.cassandra.service.StorageService.getRangesForEndpoint(StorageService.java:3467) ~[main/:na]
	at org.apache.cassandra.service.StorageService.getLocalRanges(StorageService.java:161) ~[main/:na]
	at org.apache.cassandra.db.view.ViewBuilder.build(ViewBuilder.java:125) ~[main/:na]
	at org.apache.cassandra.db.view.ViewBuilder.loadStatusAndBuild(ViewBuilder.java:93) ~[main/:na]
	at org.apache.cassandra.db.view.ViewBuilder.<init>(ViewBuilder.java:86) ~[main/:na]
	at org.apache.cassandra.db.view.View.build(View.java:201) ~[main/:na]
	at org.apache.cassandra.db.view.ViewManager.reload(ViewManager.java:132) ~[main/:na]
	at org.apache.cassandra.db.Keyspace.<init>(Keyspace.java:335) ~[main/:na]
	at org.apache.cassandra.db.Keyspace.open(Keyspace.java:133) ~[main/:na]
	at org.apache.cassandra.db.Keyspace.open(Keyspace.java:110) ~[main/:na]
	at org.apache.cassandra.schema.Schema.createKeyspace(Schema.java:647) ~[main/:na]
	at java.util.HashMap$Values.forEach(HashMap.java:980) ~[na:1.8.0_144]
	at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080) ~[na:1.8.0_144]
	at org.apache.cassandra.schema.Schema.merge(Schema.java:588) ~[main/:na]
	at org.apache.cassandra.schema.Schema.mergeAndAnnounceVersion(Schema.java:564) ~[main/:na]
	at org.apache.cassandra.schema.MigrationTask$1.response(MigrationTask.java:89) ~[main/:na]
	at org.apache.cassandra.net.ResponseVerbHandler.doVerb(ResponseVerbHandler.java:53) ~[main/:na]
	at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:72) ~[main/:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_144]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_144]
	at org.apache.cassandra.concurrent.NamedThreadFactory.lambda$threadLocalDeallocator$0(NamedThreadFactory.java:81) [main/:na]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
Caused by: javax.management.InstanceAlreadyExistsException: org.apache.cassandra.db:type=Tables,keyspace=ks,table=t
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.8.0_144]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.8.0_144]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.8.0_144]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.8.0_144]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.8.0_144]
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.8.0_144]
	at org.apache.cassandra.db.ColumnFamilyStore.<init>(ColumnFamilyStore.java:479) ~[main/:na]
	... 31 common frames omitted
{noformat}
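
To illustrate the unintuitive core-vs-max behavior from the second point above with plain {{java.util.concurrent}}: with an unbounded queue, a pool never grows past corePoolSize, because extra threads are only spawned once the queue rejects an offer, and an unbounded queue never rejects. A self-contained demo:
{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreVsMaxDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        // corePoolSize=1, maximumPoolSize=8, unbounded work queue
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS,
                                                         new LinkedBlockingQueue<>());
        for (int i = 0; i < 100; i++)
            pool.execute(() -> {
                try { Thread.sleep(100); } catch (InterruptedException ignored) {}
            });
        Thread.sleep(500);
        System.out.println("pool size: " + pool.getPoolSize()); // prints 1, never 8
        pool.shutdownNow();
    }
}
{code}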

Regarding the {{InstanceAlreadyExistsException}} above: I think we could add a {{buildAllViews}} parameter to reload and set it to false during Keyspace initialization, since views will be built during daemon initialization and on keyspace changes anyway. WDYT?
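
Something along these lines, as an illustrative sketch only (names like {{updateViewDefinitions}} are hypothetical and the real signatures may differ):
{code:java}
// In ViewManager: let Keyspace construction reload view definitions without
// triggering builds, so the recursive Keyspace.open in the trace above can't happen.
public void reload(boolean buildAllViews)
{
    updateViewDefinitions(); // hypothetical name for the existing metadata refresh
    if (!buildAllViews)
        return;              // Keyspace initialization: load definitions only
    for (View view : allViews())
        view.build();        // live schema change: build as before
}
{code}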

One last thing, can you please add the new yaml option {{concurrent_materialized_view_builders}} to the configuration section of the doc?
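
For reference, something along these lines (the default of 1 shown here is my assumption):
{noformat}
# Maximum number of threads that will concurrently run initial materialized
# view builds.
concurrent_materialized_view_builders: 1
{noformat}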

After these nits are addressed, feel free to rebase + squash and prepare for commit, then submit a final round of CI. Thanks!



> initial view build can be parallel
> ----------------------------------
>
>                 Key: CASSANDRA-12245
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-12245
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Materialized Views
>            Reporter: Tom van der Woerdt
>            Assignee: Andrés de la Peña
>             Fix For: 4.x
>
>
> On a node with lots of data (~3TB), building a materialized view takes several weeks, which is not ideal. It's doing this in a single thread.
> There are several potential ways this can be optimized:
>  * do vnodes in parallel, instead of going through the entire range in one thread
>  * just iterate through sstables, not worrying about duplicates, and include the timestamp of the original write in the MV mutation. Since this doesn't exclude duplicates it does increase the amount of work and could temporarily surface ghost rows (yikes), but I guess that's why they call it eventual consistency. Doing it this way can avoid holding references to all tables on disk, allows parallelization, and removes the need to check other sstables for existing data. This is essentially the 'do a full repair' path.


