You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@lucene.apache.org by "Yonik Seeley (JIRA)" <ji...@apache.org> on 2019/04/29 15:15:00 UTC

[jira] [Updated] (SOLR-13431) Efficient updates with shared storage

     [ https://issues.apache.org/jira/browse/SOLR-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yonik Seeley updated SOLR-13431:
--------------------------------
    Description: 
h2. Background & problem statement:

With shared storage support, data durability is handled by the storage layer (e.g. S3 or HDFS) and replicas are not needed for durability. This changes the nature of how a single update (say adding a document) must be handled. The local transaction log does not help... a node can go down and never come back. The implication is that *a commit must be done for any updates to be considered durable.*

The problem is also more complex than just batching updates and adding a commit at the end of a batch. Consider indexing documents A,B,C,D followed by a commit:
 1) documents A,B sent to leader1 and indexed
 2) leader1 fails, leader2 is elected
 3) documents C,D sent to leader2 and indexed
 4) commit
 After this sequence of events, documents A,B are actually lost because a commit was not done on leader1 before it failed.

Adding a commit for every single update would fix the problem of data loss, but would obviously be too expensive (and each commit will be more expensive We can still do batches if we *disable transparent failover* for a batch.
 - all updates in a batch (for a specific shard) should be indexed on the *same leader*... any change in leadership should result in a failure at the low level instead of any transparent failover or forwarding.
 - in the event of a failure, *all updates since the last commit must be replayed* (we can't just retry the failure itself), or the failure will need to be bubbled up to a higher layer to retry from the beginning.

h2. Indexing scenario 1: CSV upload

If SolrCloud is loading a large CSV file, The receiving Solr node will forward updates to the correct leaders. This happens in the DistributedUpdateProcessor via SolrCmdDistributor, which ends up using a ConcurrentUpdateHttp2SolrClient subclass.

The forward-to-replica use case here is quite different than the forward-to-correct-leader (the latter has the current solr node acting much more like an external client.).  To simpliify development, we may want to separate these cases and continue using the existing code for forward-to-replica. 

h2. Indexing scenario 2: SolrJ bulk indexing

In this scenario, a client is trying to do a large amount of indexing and can use batches or streaming. For this scenario, we could just require that a commit be added for each batch and then fail a batch on any leader change. This is problematic for a couple of reasons:
 - larger batches add latency to build, hurting throughput
 - doesn't scale well - as a collection grows, the number of shards grow and the chance that any shard leader goes down (or the shard is split) goes up. Requiring that the entire batch (all shards) be replayed when this happens is wasteful and gets worse with collection growth.

h2. Proposed Solution: a SolrJ cloud aware streaming client
 - something like ConcurrentUpdateHttp2SolrClient that can stream and know about cloud layout
 - track when last commit happened for each shard leader
 - buffer updates per-shard since the last commit happened
 -- doesn't have to be exact... assume idempotent updates here, so overlap is fine
 -- buffering would also be triggered by the replica type of the collection (so this class could be used for both shared storage and normal NRT replicas) 
 - a parameter would be passed that would disallow any forwarding (since we're handling buffering/failover at this level)
 - on a failure because of a leader going down or loss of leadership, wait until a new leader has been elected and then replay updates since the last commit
 - insert commits where necessary to prevent buffers from growing too large
 -- inserted commits should be able to proceed in parallel... we shouldn't need to block and wait for a commit before resuming to send documents to that leader.
 -- it would be nice if there was a way we could get notified if a commit happened via some other mechanism (like an autoCommit being triggered)
  --- assuming we can't get this, perhaps we should pass a flag that disables triggering auto-commits for these batch updates?
 - handle splits (not only can a shard leader change, but a shard could split... buffered updates may need to be re-slotted)
 - need to handle a leader "bounce" like a change in leadership (assuming we're skipping using the transaction log)
 - multi-threaded - all updates to a leader regardless of thread are managed as a single update stream
 -- this perhaps provides a way to coalesce incremental/realtime updates
 - OPTIONAL: ability to have multiple channels to a single leader?
 -- we would need to avoid reordering updates to the same ID
 -- an alternative to attempting to create more parallelism-per-shard on the client side is to do it on the server side.

  was:
h2. Background & problem statement:

With shared storage support, data durability is handled by the storage layer (e.g. S3 or HDFS) and replicas are not needed for durability. This changes the nature of how a single update (say adding a document) must be handled. The local transaction log does not help... a node can go down and never come back. The implication is that *a commit must be done for any updates to be considered durable.*

The problem is also more complex than just batching updates and adding a commit at the end of a batch. Consider indexing documents A,B,C,D followed by a commit:
 1) documents A,B sent to leader1 and indexed
 2) leader1 fails, leader2 is elected
 3) documents C,D sent to leader2 and indexed
 4) commit
 After this sequence of events, documents A,B are actually lost because a commit was not done on leader1 before it failed.

Adding a commit for every single update would fix the problem of data loss, but would obviously be too expensive (and each commit will be more expensive We can still do batches if we *disable transparent failover* for a batch.
 - all updates in a batch (for a specific shard) should be indexed on the *same leader*... any change in leadership should result in a failure at the low level instead of any transparent failover or forwarding.
 - in the event of a failure, *all updates since the last commit must be replayed* (we can't just retry the failure itself), or the failure will need to be bubbled up to a higher layer to retry from the beginning.

h2. Indexing scenario 1: CSV upload

If SolrCloud is loading a large CSV file, The receiving Solr node will forward updates to the correct leaders. This happens in the DistributedUpdateProcessor via SolrCmdDistributor, which ends up using a ConcurrentUpdateHttp2SolrClient subclass.
h2. Indexing scenario 2: SolrJ bulk indexing

In this scenario, a client is trying to do a large amount of indexing and can use batches or streaming. For this scenario, we could just require that a commit be added for each batch and then fail a batch on any leader change. This is problematic for a couple of reasons:
 - larger batches add latency to build, hurting throughput
 - doesn't scale well - as a collection grows, the number of shards grow and the chance that any shard leader goes down (or the shard is split) goes up. Requiring that the entire batch (all shards) be replayed when this happens is wasteful and gets worse with collection growth.

h2. Proposed Solution: a SolrJ cloud aware streaming client
 - something like ConcurrentUpdateHttp2SolrClient that can stream and know about cloud layout
 - track when last commit happened for each shard leader
 - buffer updates per-shard since the last commit happened
 -- doesn't have to be exact... assume idempotent updates here, so overlap is fine
 -- buffering would also be triggered by the replica type of the collection (so this class could be used for both shared storage and normal NRT replicas) 
 - a parameter would be passed that would disallow any forwarding (since we're handling buffering/failover at this level)
 - on a failure because of a leader going down or loss of leadership, wait until a new leader has been elected and then replay updates since the last commit
 - insert commits where necessary to prevent buffers from growing too large
 -- inserted commits should be able to proceed in parallel... we shouldn't need to block and wait for a commit before resuming to send documents to that leader.
 -- it would be nice if there was a way we could get notified if a commit happened via some other mechanism (like an autoCommit being triggered)
  --- assuming we can't get this, perhaps we should pass a flag that disables triggering auto-commits for these batch updates?
 - handle splits (not only can a shard leader change, but a shard could split... buffered updates may need to be re-slotted)
 - need to handle a leader "bounce" like a change in leadership (assuming we're skipping using the transaction log)
 - multi-threaded - all updates to a leader regardless of thread are managed as a single update stream
 -- this perhaps provides a way to coalesce incremental/realtime updates
 - OPTIONAL: ability to have multiple channels to a single leader?
 -- we would need to avoid reordering updates to the same ID
 -- an alternative to attempting to create more parallelism-per-shard on the client side is to do it on the server side.


> Efficient updates with shared storage
> -------------------------------------
>
>                 Key: SOLR-13431
>                 URL: https://issues.apache.org/jira/browse/SOLR-13431
>             Project: Solr
>          Issue Type: New Feature
>      Security Level: Public(Default Security Level. Issues are Public) 
>            Reporter: Yonik Seeley
>            Priority: Major
>
> h2. Background & problem statement:
> With shared storage support, data durability is handled by the storage layer (e.g. S3 or HDFS) and replicas are not needed for durability. This changes the nature of how a single update (say adding a document) must be handled. The local transaction log does not help... a node can go down and never come back. The implication is that *a commit must be done for any updates to be considered durable.*
> The problem is also more complex than just batching updates and adding a commit at the end of a batch. Consider indexing documents A,B,C,D followed by a commit:
>  1) documents A,B sent to leader1 and indexed
>  2) leader1 fails, leader2 is elected
>  3) documents C,D sent to leader2 and indexed
>  4) commit
>  After this sequence of events, documents A,B are actually lost because a commit was not done on leader1 before it failed.
> Adding a commit for every single update would fix the problem of data loss, but would obviously be too expensive (and each commit will be more expensive We can still do batches if we *disable transparent failover* for a batch.
>  - all updates in a batch (for a specific shard) should be indexed on the *same leader*... any change in leadership should result in a failure at the low level instead of any transparent failover or forwarding.
>  - in the event of a failure, *all updates since the last commit must be replayed* (we can't just retry the failure itself), or the failure will need to be bubbled up to a higher layer to retry from the beginning.
> h2. Indexing scenario 1: CSV upload
> If SolrCloud is loading a large CSV file, The receiving Solr node will forward updates to the correct leaders. This happens in the DistributedUpdateProcessor via SolrCmdDistributor, which ends up using a ConcurrentUpdateHttp2SolrClient subclass.
> The forward-to-replica use case here is quite different than the forward-to-correct-leader (the latter has the current solr node acting much more like an external client.).  To simpliify development, we may want to separate these cases and continue using the existing code for forward-to-replica. 
> h2. Indexing scenario 2: SolrJ bulk indexing
> In this scenario, a client is trying to do a large amount of indexing and can use batches or streaming. For this scenario, we could just require that a commit be added for each batch and then fail a batch on any leader change. This is problematic for a couple of reasons:
>  - larger batches add latency to build, hurting throughput
>  - doesn't scale well - as a collection grows, the number of shards grow and the chance that any shard leader goes down (or the shard is split) goes up. Requiring that the entire batch (all shards) be replayed when this happens is wasteful and gets worse with collection growth.
> h2. Proposed Solution: a SolrJ cloud aware streaming client
>  - something like ConcurrentUpdateHttp2SolrClient that can stream and know about cloud layout
>  - track when last commit happened for each shard leader
>  - buffer updates per-shard since the last commit happened
>  -- doesn't have to be exact... assume idempotent updates here, so overlap is fine
>  -- buffering would also be triggered by the replica type of the collection (so this class could be used for both shared storage and normal NRT replicas) 
>  - a parameter would be passed that would disallow any forwarding (since we're handling buffering/failover at this level)
>  - on a failure because of a leader going down or loss of leadership, wait until a new leader has been elected and then replay updates since the last commit
>  - insert commits where necessary to prevent buffers from growing too large
>  -- inserted commits should be able to proceed in parallel... we shouldn't need to block and wait for a commit before resuming to send documents to that leader.
>  -- it would be nice if there was a way we could get notified if a commit happened via some other mechanism (like an autoCommit being triggered)
>   --- assuming we can't get this, perhaps we should pass a flag that disables triggering auto-commits for these batch updates?
>  - handle splits (not only can a shard leader change, but a shard could split... buffered updates may need to be re-slotted)
>  - need to handle a leader "bounce" like a change in leadership (assuming we're skipping using the transaction log)
>  - multi-threaded - all updates to a leader regardless of thread are managed as a single update stream
>  -- this perhaps provides a way to coalesce incremental/realtime updates
>  - OPTIONAL: ability to have multiple channels to a single leader?
>  -- we would need to avoid reordering updates to the same ID
>  -- an alternative to attempting to create more parallelism-per-shard on the client side is to do it on the server side.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org