Posted to issues@lucene.apache.org by "Bilal Waheed (Jira)" <ji...@apache.org> on 2020/03/18 02:34:00 UTC

[jira] [Updated] (SOLR-14339) SHARED replica's freshness

     [ https://issues.apache.org/jira/browse/SOLR-14339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bilal Waheed updated SOLR-14339:
--------------------------------
    Description: 
*Problem*

The source of truth for a SHARED replica lives in the shared store. At any given point in time, a SHARED replica can be missing some or all of its shard's indexing locally, and the amount that is missing is arbitrary. This is true even for a leader replica, because we do not update a SHARED replica from the shared store at startup, recovery, leader election, etc. As a result, a query served by a SHARED replica can be blind to an arbitrary amount of indexing.

There are a few requirements we would like to support.
 # Query visibility into whatever has been indexed so far is not stale beyond some well-defined SLA.
 # A client can issue a Solr query that has complete visibility into whatever has been indexed so far. This is primarily needed for tests.
 # Solr query results carry information about the freshness of the index used to serve the query.
 # Scalability: We would like to eventually support a high number (~10K) of shared-store-based collections per Solr cluster, with the understanding that a large majority (~80%) will not be used most of the time. The ones that are used will also go through a daily usage cycle (less usage at night). In other words, we do not want to proactively refresh a SHARED replica from the shared store unless it is needed for querying or indexing.

For indexing we already have this covered: we synchronously ensure that the leader replica is up to date with the shared store before proceeding with new indexing. But we need a solution for queries.

*Proposal*

Two duration configuration values, *freshnessTolerance* and *refreshInterval*, are introduced. Both are defined in seconds. Each SHARED replica tracks (in JVM memory) its last refresh time (LRT): the last time it was brought up to date with the index data in the shared store (that is, the last time it read metadataSuffix^[1]^ from ZooKeeper to evaluate its freshness, followed by a pull from the shared store when needed). The time immediately before reading the metadataSuffix becomes the LRT. If there is nothing to pull, we simply update the replica’s LRT. If there is something to pull, the LRT is updated only after the pull has finished successfully. The leader also updates its LRT after a successful indexing batch; in that case the time just before the update of metadataSuffix in ZK becomes the LRT.
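
The LRT bookkeeping described above could be sketched roughly as follows. This is only an illustration, not the proposed implementation: the class name ReplicaFreshnessTracker and its methods are hypothetical, and the rule that the LRT never moves backwards is an assumption added here so that a late-finishing pull cannot overwrite a newer refresh.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of Solr): tracks the last refresh time (LRT)
// of one SHARED replica in JVM memory.
final class ReplicaFreshnessTracker {
  // 0 means "never refreshed since this JVM started".
  private final AtomicLong lastRefreshTimeMs = new AtomicLong(0L);

  long getLastRefreshTimeMs() {
    return lastRefreshTimeMs.get();
  }

  // candidateTimeMs is the time captured just before metadataSuffix was read
  // (or, on the leader, just before it was updated in ZK). It is recorded only
  // once the check or pull has succeeded. Assumption: the LRT never decreases.
  void recordSuccessfulRefresh(long candidateTimeMs) {
    lastRefreshTimeMs.accumulateAndGet(candidateTimeMs, Math::max);
  }
}
```

The caller is expected to capture the timestamp first, read metadataSuffix (and pull if needed), and only then call recordSuccessfulRefresh with the captured value.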

A replica whose LRT is at most *freshnessTolerance* ago (relative to the query arrival time) is allowed to serve a query. If the LRT is more than freshnessTolerance ago, we check whether there is any new indexing to pull. If there is none, we update the LRT of the replica and serve the query. If there is new indexing to pull, we enqueue a pull from the shared store and fail the query immediately by throwing a "*Replica is not fresh enough to serve the query*" error.

SolrCloud already has logic such that if a replica fails to serve a query, it tries the other available replicas of the shard. If none of the replicas of a shard is fresh enough to serve the query, Solr returns a *NoReplicaIsFreshEnough* error.
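
To make the fallback behaviour concrete, here is a small self-contained sketch. It does not use or mirror SolrCloud's actual retry code; the exception classes and the ShardQueryFallback helper are hypothetical names chosen for illustration only.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical: thrown by a replica that is too stale to serve the query.
final class ReplicaNotFreshException extends RuntimeException {
  ReplicaNotFreshException(String message) { super(message); }
}

// Hypothetical: thrown when every replica of the shard was too stale.
final class NoReplicaIsFreshEnoughException extends RuntimeException {
  NoReplicaIsFreshEnoughException(String message) { super(message); }
}

final class ShardQueryFallback {
  // Tries each replica in order and returns the first successful response.
  // Only "not fresh" failures trigger the fallback; other errors propagate.
  static <R, T> T queryFirstFreshReplica(List<R> replicas, Function<R, T> queryReplica) {
    for (R replica : replicas) {
      try {
        return queryReplica.apply(replica);
      } catch (ReplicaNotFreshException e) {
        // This replica has enqueued a pull and refused; try the next one.
      }
    }
    throw new NoReplicaIsFreshEnoughException(
        "None of the " + replicas.size() + " replicas is fresh enough to serve the query");
  }
}
```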

The *refreshInterval* config denotes a shorter duration than the freshnessTolerance config. A replica receiving a continuous stream of queries will refresh itself at least every refreshInterval.

Clients will have the option to *override freshnessTolerance on a per-query basis*, i.e. it can be passed as a query parameter. For immediate readability (serve the query with whatever has been indexed so far), the client passes 0 for freshnessTolerance.
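
A possible way to resolve the effective tolerance for a query is sketched below. It assumes the request parameter is literally named freshnessTolerance and is given in seconds, neither of which is fixed by this proposal; a plain Map stands in for Solr's request parameters to keep the example self-contained, and rejecting negative values is likewise an assumption.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: picks the per-query override if present, otherwise the
// configured default, and converts the result to milliseconds.
final class FreshnessToleranceResolver {
  // Assumed parameter name; the proposal does not specify one.
  static final String PARAM = "freshnessTolerance";

  static long resolveMillis(Map<String, String> queryParams, long configuredSeconds) {
    long seconds = configuredSeconds;
    String raw = queryParams.get(PARAM);
    if (raw != null) {
      seconds = Long.parseLong(raw.trim());
      if (seconds < 0) {
        throw new IllegalArgumentException(PARAM + " must be >= 0 but was " + seconds);
      }
    }
    // A value of 0 means "immediate readability": any LRT older than the
    // query arrival time forces a freshness check.
    return TimeUnit.SECONDS.toMillis(seconds);
  }
}
```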

If a replica serves the query, it returns its *timeSinceLastRefresh* duration (in milliseconds) in the query response. If the query fans out to multiple shards (replicas), the timeSinceLastRefresh of the query is the maximum timeSinceLastRefresh over all the replicas that contributed to the query. Because of clock skew between the client and Solr, a timestamp of the last refresh would not carry much meaning, so we return the length of time since the last refresh instead.
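
The aggregation across a fanned-out query amounts to taking a maximum. A minimal sketch follows; the class and method names are hypothetical, and clamping a negative duration to 0 is an assumption added here.

```java
import java.util.Collection;

// Hypothetical helper for computing timeSinceLastRefresh values.
final class TimeSinceLastRefresh {
  // Duration for a single replica, measured on that replica's own clock.
  static long forReplica(long lastRefreshTimeMs, long nowMs) {
    return Math.max(0L, nowMs - lastRefreshTimeMs);
  }

  // Duration reported for the whole query: the stalest contributing replica wins.
  static long forQuery(Collection<Long> perReplicaMs) {
    if (perReplicaMs.isEmpty()) {
      throw new IllegalArgumentException("at least one replica must have contributed");
    }
    long max = 0L;
    for (long value : perReplicaMs) {
      max = Math.max(max, value);
    }
    return max;
  }
}
```

For example, if three replicas report 120 ms, 4500 ms and 900 ms, the query as a whole reports 4500 ms.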

[1] *metadataSuffix*: A collection can be made up of multiple shards, and each shard can have multiple replicas. For each shard, only one replica acts as the leader, taking indexing and pushing to the shared store. Failures can happen, and an old leader and a new leader can be working at the same time. To keep the index correct, we write a unique string to ZooKeeper at the end of each successful indexing batch. We call that unique string metadataSuffix; it is per-shard metadata. It is not important to understand how it is used, only that it exists and that, for each shard, it is the source of truth for the last successful indexing batch. Each replica has a JVM cache recording which metadataSuffix version of the index it is at.

*Pseudo code*

 
{code:java}
// A SolrQueryRequest q has arrived for replica r of shard s.

// Use of query arrival time is conservative. If we somehow had the ability
// to subtract network time from this, our logic would still be correct.
long queryArrivalTime = q.getStartTime();
long lastRefreshTime = getLastRefreshTime(r);
Long freshnessToleranceParameter = getFreshnessToleranceParameter(q);
long acceptableFreshnessTolerance = freshnessToleranceParameter != null ?
                    freshnessToleranceParameter : getFreshnessTolerance();
acceptableFreshnessTolerance = TimeUnit.MILLISECONDS.convert(
                    acceptableFreshnessTolerance, TimeUnit.SECONDS);
long minimumAcceptableLastRefreshTime = queryArrivalTime - 
                    acceptableFreshnessTolerance;
if (lastRefreshTime >= minimumAcceptableLastRefreshTime) {
  long refreshInterval = TimeUnit.MILLISECONDS.convert(getRefreshInterval(), 
                TimeUnit.SECONDS);
  if ((lastRefreshTime + refreshInterval) < System.currentTimeMillis()) { 
    enqueuePullFromSharedStore(r);
  }
} else {
  String metadataSuffixInCache = getMetadataSuffixFromCache(r);
  long newLastRefreshTime = System.currentTimeMillis();
  String metadataSuffixInZK = getMetadataSuffixFromZK(s);
  if (metadataSuffixInCache.equals(metadataSuffixInZK)) {
    // No new indexing since the last refresh.
    // We can just update our last refresh time and we will be fresh enough to
    // serve the query.
    updateLastRefreshTime(r, newLastRefreshTime);
  } else {
    enqueuePullFromSharedStore(r);
    throw new SolrException(ErrorCode.SERVER_ERROR,
        "Replica is not fresh enough to serve the query");
  }
}

// serve the query...
{code}
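
For experimenting with the decision logic outside of Solr, the pseudo code above can be restated as a self-contained sketch. Everything here is hypothetical scaffolding: SharedReplicaContext stands in for the replica state, ZooKeeper access, pull queue and clock that the pseudo code assumes, and the Decision enum replaces the thrown SolrException so the outcome can be inspected.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical abstraction over what the pseudo code needs from its surroundings.
interface SharedReplicaContext {
  long lastRefreshTimeMs();
  void updateLastRefreshTimeMs(long timeMs);
  String metadataSuffixInCache();
  String metadataSuffixInZk();
  void enqueuePullFromSharedStore();
  long currentTimeMs();
}

final class FreshnessGate {
  enum Decision { SERVE, SERVE_AND_REFRESH, REJECT_NOT_FRESH }

  static Decision check(SharedReplicaContext ctx, long queryArrivalTimeMs,
                        long freshnessToleranceSec, long refreshIntervalSec) {
    long toleranceMs = TimeUnit.SECONDS.toMillis(freshnessToleranceSec);
    long refreshIntervalMs = TimeUnit.SECONDS.toMillis(refreshIntervalSec);
    long lrt = ctx.lastRefreshTimeMs();

    if (lrt >= queryArrivalTimeMs - toleranceMs) {
      // Fresh enough to serve; refresh in the background if the shorter
      // refreshInterval has already elapsed.
      if (lrt + refreshIntervalMs < ctx.currentTimeMs()) {
        ctx.enqueuePullFromSharedStore();
        return Decision.SERVE_AND_REFRESH;
      }
      return Decision.SERVE;
    }

    // Too old: capture the candidate LRT before reading ZooKeeper.
    long candidateLrt = ctx.currentTimeMs();
    if (ctx.metadataSuffixInCache().equals(ctx.metadataSuffixInZk())) {
      // No new indexing since the last refresh, so only the LRT moves.
      ctx.updateLastRefreshTimeMs(candidateLrt);
      return Decision.SERVE;
    }
    ctx.enqueuePullFromSharedStore();
    return Decision.REJECT_NOT_FRESH;
  }
}
```

A caller would map REJECT_NOT_FRESH to the "Replica is not fresh enough to serve the query" error and the other two outcomes to serving the query.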
 

 

*Minimizing NoReplicaIsFreshEnough errors*

Some steady-state scenarios that this proposal covers without failing the query with a NoReplicaIsFreshEnough error:
 * A replica receiving a continuous query stream will keep serving it.
 * The query stream is not continuous and there has been no query activity for freshnessTolerance.
 ** If there has been no new indexing since the last query, any replica that has been queried before will be able to serve the query.
 ** If there has been new indexing, only the leader replica will be able to serve the query (assuming the leader has not failed after indexing).

There are still scenarios where we will run into a NoReplicaIsFreshEnough error:
 * Cold start: the shard was not active (no indexing, no querying).
 * Deployments doing a rolling restart of all the replicas.
 * There was no query activity for freshnessTolerance, then there was some indexing, and after the indexing the leader died.

Minimizing NoReplicaIsFreshEnough errors is out of scope for this Jira. There will be a separate Jira for that.

  was:
*Problem*

The source of truth for a SHARED replica lives in the shared store. At any given point in time, a SHARED replica can be missing some or all of its shard's indexing locally, and the amount that is missing is arbitrary. This is true even for a leader replica, because we do not update a SHARED replica from the shared store at startup, recovery, leader election, etc. As a result, a query served by a SHARED replica can be blind to an arbitrary amount of indexing.

There are a few requirements we would like to support.
 # Query visibility into whatever has been indexed so far is not stale beyond some well-defined SLA.
 # A client can issue a Solr query that has complete visibility into whatever has been indexed so far. This is primarily needed for tests.
 # Solr query results carry information about the freshness of the index used to serve the query.
 # Scalability: We would like to eventually support a high number (~10K) of shared-store-based collections per Solr cluster, with the understanding that a large majority (~80%) will not be used most of the time. The ones that are used will also go through a daily usage cycle (less usage at night). In other words, we do not want to proactively refresh a SHARED replica from the shared store unless it is needed for querying or indexing.

For indexing we already have this covered: we synchronously ensure that the leader replica is up to date with the shared store before proceeding with new indexing. But we need a solution for queries.

*Proposal*

Two duration configuration values, *freshnessTolerance* and *refreshInterval*, are introduced. Both are defined in seconds. Each SHARED replica tracks (in JVM memory) its last refresh time (LRT): the last time it was brought up to date with the index data in the shared store (that is, the last time it read metadataSuffix^[1]^ from ZooKeeper to evaluate its freshness, followed by a pull from the shared store when needed). The time immediately before reading the metadataSuffix becomes the LRT. If there is nothing to pull, we simply update the replica’s LRT. If there is something to pull, the LRT is updated only after the pull has finished successfully. The leader also updates its LRT after a successful indexing batch; in that case the time just before the update of metadataSuffix in ZK becomes the LRT.

A replica whose LRT is at most *freshnessTolerance* ago (relative to the query arrival time) is allowed to serve a query. If the LRT is more than freshnessTolerance ago, we check whether there is any new indexing to pull. If there is none, we update the LRT of the replica and serve the query. If there is new indexing to pull, we enqueue a pull from the shared store and fail the query immediately by throwing a "*Replica is not fresh enough to serve the query*" error.

SolrCloud already has logic such that if a replica fails to serve a query, it tries the other available replicas of the shard. If none of the replicas of a shard is fresh enough to serve the query, Solr returns a *NoReplicaIsFreshEnough* error.

The *refreshInterval* config denotes a shorter duration than the freshnessTolerance config. A replica receiving a continuous stream of queries will refresh itself at least every refreshInterval.

Clients will have the option to *override freshnessTolerance on a per-query basis*, i.e. it can be passed as a query parameter. For immediate readability (serve the query with whatever has been indexed so far), the client passes 0 for freshnessTolerance.

If a replica serves the query, it returns its *timeSinceLastRefresh* duration (in milliseconds) in the query response. If the query fans out to multiple shards (replicas), the timeSinceLastRefresh of the query is the maximum timeSinceLastRefresh over all the replicas that contributed to the query. Because of clock skew between the client and Solr, a timestamp of the last refresh would not carry much meaning, so we return the length of time since the last refresh instead.

[1] *metadataSuffix*: A collection can be made up of multiple shards, and each shard can have multiple replicas. For each shard, only one replica acts as the leader, taking indexing and pushing to the shared store. Failures can happen, and an old leader and a new leader can be working at the same time. To keep the index correct, we write a unique string to ZooKeeper at the end of each successful indexing batch. We call that unique string metadataSuffix; it is per-shard metadata. It is not important to understand how it is used, only that it exists and that, for each shard, it is the source of truth for the last successful indexing batch. Each replica has a JVM cache recording which metadataSuffix version of the index it is at.

*Pseudo code*

 
{code:java}
// A SolrQueryRequest q has arrived for replica r of shard s.

// Use of query arrival time is conservative. If we somehow had the ability
// to subtract network time from this, our logic would still be correct.
long queryArrivalTime = q.getStartTime();
long lastRefreshTime = getLastRefreshTime(r);
Long freshnessToleranceParameter = getFreshnessToleranceParameter(q);
long acceptableFreshnessTolerance = freshnessToleranceParameter != null ?
                    freshnessToleranceParameter : getFreshnessTolerance();
acceptableFreshnessTolerance = TimeUnit.MILLISECONDS.convert(
                    acceptableFreshnessTolerance, TimeUnit.SECONDS);
long minimumAcceptableLastRefreshTime = queryArrivalTime - 
                    acceptableFreshnessTolerance;
if (lastRefreshTime >= minimumAcceptableLastRefreshTime) {
  long refreshInterval = TimeUnit.MILLISECONDS.convert(getRefreshInterval(), 
                TimeUnit.SECONDS);
  if ((lastRefreshTime + refreshInterval) < System.currentTimeMillis()) {
    enqueuePullFromSharedStore(r);
  }
} else {
  long newLastRefreshTime = System.currentTimeMillis();
  String metadataSuffixInCache = getMetadataSuffixFromCache(r);
  String metadataSuffixInZK = getMetadataSuffixFromZK(s);
  if (metadataSuffixInCache.equals(metadataSuffixInZK)) {
    // No new indexing since we last pulled from shared store.
    // We can just update our last refresh time and we will be fresh enough to
    // serve the query.
    updateLastRefreshTime(r, newLastRefreshTime);
  } else {
    enqueuePullFromSharedStore(r);
    throw new SolrException(ErrorCode.SERVER_ERROR,
        "Replica is not fresh enough to serve the query");
  }
}

// serve the query...
{code}
 

 

*Minimizing NoReplicaIsFreshEnough errors*

Some steady-state scenarios that this proposal covers without failing the query with a NoReplicaIsFreshEnough error:
 * A replica receiving a continuous query stream will keep serving it.
 * The query stream is not continuous and there has been no query activity for freshnessTolerance.
 ** If there has been no new indexing since the last query, any replica that has been queried before will be able to serve the query.
 ** If there has been new indexing, only the leader replica will be able to serve the query (assuming the leader has not failed after indexing).

There are still scenarios where we will run into a NoReplicaIsFreshEnough error:
 * Cold start: the shard was not active (no indexing, no querying).
 * Deployments doing a rolling restart of all the replicas.
 * There was no query activity for freshnessTolerance, then there was some indexing, and after the indexing the leader died.

Minimizing NoReplicaIsFreshEnough errors is out of scope for this Jira. There will be a separate Jira for that.


> SHARED replica's freshness
> --------------------------
>
>                 Key: SOLR-14339
>                 URL: https://issues.apache.org/jira/browse/SOLR-14339
>             Project: Solr
>          Issue Type: Sub-task
>          Components: SolrCloud, SolrJ
>            Reporter: Bilal Waheed
>            Priority: Major


