Posted to dev@kafka.apache.org by Abhijeet Kumar <ab...@gmail.com> on 2023/11/22 10:26:54 UTC

[DISCUSS] KIP-956: Tiered Storage Quotas

Hi All,

I have created KIP-956 to define read and write quotas for tiered storage.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas

Feedback and suggestions are welcome.

Regards,
Abhijeet.

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Abhijeet Kumar <ab...@gmail.com>.
Comments inline

On Tue, Nov 28, 2023 at 3:50 PM Luke Chen <sh...@gmail.com> wrote:
>
> Hi Abhijeet,
>
> Thanks for the KIP!
> This is an important feature for tiered storage.
>
> Some comments:
> 1. Will we introduce new metrics for these tiered storage quotas?
> This is important because the admin can learn the throttling status by
> checking the metrics while remote writes/reads are slow, such as the
> upload/read byte rate, the throttled time for uploads/reads, etc.
>

Good point. When the fetches/copies on a broker are throttled, the broker's
fetch and copy rates will level off at the configured limit. We plan to add
the following metrics to report the throttle time for fetches/copies, similar
to the local fetch throttle metrics.

*RemoteFetchThrottleTime* - The amount of time needed to bring the observed
remote fetch rate within the read quota.
*RemoteCopyThrottleTime* - The amount of time needed to bring the observed
remote copy rate within the copy quota.
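
To make "the amount of time needed to bring the observed rate within the
quota" concrete, here is a minimal Java sketch. It assumes the throttle time
is derived the same way as for client quotas: if O is the observed rate over
a window of length W and Q is the quota, pausing for T = W * (O - Q) / Q
brings the average over W + T back down to Q. The class and method names are
hypothetical and not part of the KIP.

```java
final class ThrottleTimeSketch {
    private ThrottleTimeSketch() {}

    /**
     * Returns how long (in ms) activity would have to pause so that the bytes
     * already counted in the window average out to the quota.
     */
    static long throttleTimeMs(double observedBytesPerSec, double quotaBytesPerSec, long windowMs) {
        if (quotaBytesPerSec <= 0) {
            throw new IllegalArgumentException("quota must be positive");
        }
        if (observedBytesPerSec <= quotaBytesPerSec) {
            return 0L; // within quota, no throttling needed
        }
        double overshoot = observedBytesPerSec - quotaBytesPerSec;
        return Math.round(overshoot / quotaBytesPerSec * windowMs);
    }

    public static void main(String[] args) {
        // 15 MB/s observed against a 10 MB/s quota over a 10 s window.
        System.out.println(throttleTimeMs(15_000_000, 10_000_000, 10_000)); // prints 5000
    }
}
```

With these numbers the broker would need to hold back remote fetches (or
copies) for about 5 seconds before the observed rate is back within the quota.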

> 2. Could you give some examples for the throttling algorithm in the KIP to
> explain it? That will make it much clearer.
>

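The quota managers would look something like this (method bodies elided):

class RlmWriteQuotaManager {
   // Replaces the current quota, e.g. after a dynamic config change.
   public void updateQuota(Quota newQuota) { /* implementation */ }
   // Returns true if the observed copy rate is above the quota.
   public boolean isQuotaExceeded() { /* implementation */ }
   // Records the number of bytes copied to remote storage.
   public void record(Double value) { /* implementation */ }
}

class RlmReadQuotaManager {
   // Replaces the current quota, e.g. after a dynamic config change.
   public void updateQuota(Quota newQuota) { /* implementation */ }
   // Returns true if the observed remote read rate is above the quota.
   public boolean isQuotaExceeded() { /* implementation */ }
   // Records the number of bytes read from remote storage.
   public void record(Double value) { /* implementation */ }
}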
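
As a rough illustration of what could sit behind those three methods, here is
a self-contained Java sketch of a windowed byte-rate tracker. It is only an
assumption about one possible implementation: it uses a plain double instead
of Kafka's Quota type, a simple sliding window rather than Kafka's sampled
metrics, and an injectable clock so that it can be tested. All names are
hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

final class WindowedQuotaManagerSketch {
    /** One recorded observation: when it happened and how many bytes it covered. */
    private static final class Sample {
        final long timeMs;
        final double bytes;

        Sample(long timeMs, double bytes) {
            this.timeMs = timeMs;
            this.bytes = bytes;
        }
    }

    private final Deque<Sample> samples = new ArrayDeque<>();
    private final long windowMs;
    private final LongSupplier clockMs;
    private double quotaBytesPerSec;
    private double bytesInWindow;

    WindowedQuotaManagerSketch(double quotaBytesPerSec, long windowMs, LongSupplier clockMs) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.windowMs = windowMs;
        this.clockMs = clockMs;
    }

    /** Swaps in a new quota, e.g. after a dynamic config update. */
    synchronized void updateQuota(double newQuotaBytesPerSec) {
        this.quotaBytesPerSec = newQuotaBytesPerSec;
    }

    /** Adds the given number of bytes to the current window. */
    synchronized void record(double bytes) {
        samples.addLast(new Sample(clockMs.getAsLong(), bytes));
        bytesInWindow += bytes;
    }

    /** True if the byte rate averaged over the window is above the quota. */
    synchronized boolean isQuotaExceeded() {
        expireOldSamples();
        double observedBytesPerSec = bytesInWindow / (windowMs / 1000.0);
        return observedBytesPerSec > quotaBytesPerSec;
    }

    /** Drops samples that are at least one full window old. */
    private void expireOldSamples() {
        long cutoff = clockMs.getAsLong() - windowMs;
        while (!samples.isEmpty() && samples.peekFirst().timeMs <= cutoff) {
            bytesInWindow -= samples.removeFirst().bytes;
        }
    }
}
```

The read and write managers could share one such class and differ only in
which quota config they are wired to.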

*Read Throttling*

RemoteLogReader will record the number of bytes read once it has fetched the
remote log segment data to serve the fetch request. The change would look
something like this:

class RemoteLogReader {
  @Override
  public Void call() {
    // ...
    // Record the bytes read from remote storage (0 if nothing was fetched).
    quotaManager.record(result.fetchDataInfo
        .map(info -> (double) info.records.sizeInBytes())
        .orElse(0.0));
    callback.accept(result);
    return null;
  }
}

Read throttling will be carried out by the ReplicaManager. When it gets an
offset-out-of-range error, it will check whether the quota is exhausted
before returning the delayedRemoteStorageFetch info.

def handleOffsetOutOfRangeError() {
  // ...
  if (params.isFromFollower) {
    // ...
  } else {
    // ...
    val fetchDataInfo = if (isRemoteLogReadQuotaExceeded) {
      info("Read quota exceeded. Returning empty remote storage fetch info")
      FetchDataInfo(
        LogOffsetMetadata.UnknownOffsetMetadata,
        MemoryRecords.EMPTY,
        delayedRemoteStorageFetch = None
      )
    } else {
      // Regular case when the quota is not exceeded
      FetchDataInfo(
        LogOffsetMetadata(fetchInfo.fetchOffset),
        MemoryRecords.EMPTY,
        delayedRemoteStorageFetch = Some(
          RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp, fetchInfo, fetchIsolation))
      )
    }
    // ...
  }
}

When the read quota is exceeded, we return an empty remote storage fetch
info. We do not want to set an exception in the LogReadResult (as we do in
other cases where we return UnknownOffsetMetadata), because that would be
classified as an error in reading data and a response would be sent back to
the client immediately. Instead, we want to be able to serve data for the
other topic partitions in the fetch request via a delayed fetch if required
(when an immediate response is sent, the delayed fetch is skipped). Also,
responding immediately would make the consumer retry right away, which may
hit the exceeded quota again and put it into a tight retry loop.
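
The per-partition outcome described above can be sketched in a few lines of
Java. This is only an illustration with hypothetical names (the actual logic
would live in the Scala ReplicaManager): partitions that can be served
locally are unaffected, while partitions that need a remote read get an
empty, non-error result when the remote read quota is exceeded.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RemoteReadThrottleSketch {
    /** What the broker does for one partition of a fetch request. */
    enum ReadPlan { LOCAL_READ, REMOTE_READ, EMPTY_NO_ERROR }

    private RemoteReadThrottleSketch() {}

    /**
     * @param needsRemoteRead partition name -> true if the fetch offset is only
     *                        available in remote storage
     * @param remoteReadQuotaExceeded current state of the broker-level read quota
     */
    static Map<String, ReadPlan> plan(Map<String, Boolean> needsRemoteRead,
                                      boolean remoteReadQuotaExceeded) {
        Map<String, ReadPlan> plan = new LinkedHashMap<>();
        for (Map.Entry<String, Boolean> entry : needsRemoteRead.entrySet()) {
            if (!entry.getValue()) {
                // Local data is never blocked by the remote read quota.
                plan.put(entry.getKey(), ReadPlan.LOCAL_READ);
            } else if (remoteReadQuotaExceeded) {
                // Empty records and no error, so the rest of the request
                // can still be served and the client does not retry at once.
                plan.put(entry.getKey(), ReadPlan.EMPTY_NO_ERROR);
            } else {
                plan.put(entry.getKey(), ReadPlan.REMOTE_READ);
            }
        }
        return plan;
    }
}
```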

*Write Throttling*

RemoteLogManager.java

// Fair lock, so waiting RLMTasks acquire it in arrival order.
private final ReentrantLock writeQuotaManagerLock = new ReentrantLock(true);
private final Condition lockCondition = writeQuotaManagerLock.newCondition();

private void copyLogSegment(/* ... */) throws InterruptedException {
  // ...
  writeQuotaManagerLock.lock();
  try {
    while (rlmWriteQuotaManager.isQuotaExceeded()) {
      // Quota exceeded, wait for quota to become available
      lockCondition.await(timeout, TimeUnit.MILLISECONDS);
    }
    rlmWriteQuotaManager.record((double) segment.log.sizeInBytes());
    // Signal waiting threads to recheck the quota
    lockCondition.signalAll();
  } finally {
    writeQuotaManagerLock.unlock();
  }

  // Actual copy operation
}

Before beginning to copy a segment to remote storage, the RLMTask checks
whether the quota is exceeded. If it is, the task waits for a specified
period before rechecking the quota. Once the RLMTask confirms that quota is
available, it records the size of the log segment it plans to upload with the
quota manager before proceeding with the upload. This step is crucial: it
keeps the quota accounting accurate and prevents other RLMTasks from
mistakenly assuming that quota is available for them as well.
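
For anyone who wants to try the wait-then-record pattern in isolation, here
is a compilable Java sketch of it. The WriteQuota interface and the class
name are hypothetical stand-ins, not the KIP's actual classes, and the
interrupt handling (returning false) is an assumption made to keep the
example free of checked exceptions.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class CopyThrottleSketch {
    /** Hypothetical stand-in for the write quota manager discussed above. */
    interface WriteQuota {
        boolean isQuotaExceeded();
        void record(double bytes);
    }

    // Fair lock, so waiting uploaders proceed roughly in arrival order.
    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition quotaAvailable = lock.newCondition();
    private final WriteQuota quota;
    private final long recheckIntervalMs;

    CopyThrottleSketch(WriteQuota quota, long recheckIntervalMs) {
        this.quota = quota;
        this.recheckIntervalMs = recheckIntervalMs;
    }

    /**
     * Blocks until the quota is no longer exceeded, then records the segment
     * size before the caller starts the upload. Returns false if interrupted.
     */
    boolean reserve(long segmentBytes) {
        lock.lock();
        try {
            while (quota.isQuotaExceeded()) {
                // Releases the lock while waiting, then rechecks the quota.
                quotaAvailable.await(recheckIntervalMs, TimeUnit.MILLISECONDS);
            }
            quota.record(segmentBytes);
            // Wake other waiters so they recheck against the updated usage.
            quotaAvailable.signalAll();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

Recording the size while still holding the lock is what keeps two tasks from
both seeing "quota available" and then uploading at the same time.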

> 3. To solve this problem, we can break down the RLMTask into two smaller
> tasks - one for segment upload and the other for handling expired segments.
> How do we handle the situation when a segment is still waiting for
> offloading while this segment is expired and eligible to be deleted?
> Maybe it'll be easier to not block the RLMTask when quota exceeded, and
> just check it each time the RLMTask runs?
>

The concern here is that this may cause starvation for some topic
partitions. RLMTasks are added to the thread pool executor to run on a fixed
schedule (every 30 secs). If we do not block the RLMTask when the quota is
exceeded and instead skip the run, the RLMTask for certain topic partitions
may never run, because every time it is scheduled, the broker-level upload
quota may already have been exhausted at that instant.

Breaking down the RLMTask into smaller tasks is already being proposed in
KIP-950. The two tasks will need some coordination at the topic-partition
level to handle cases such as the one you pointed out. For this particular
case, the upload task could skip uploading the segment if it is already
eligible for deletion.
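
A small Java sketch of that last idea, under simplifying assumptions: only
time-based retention is considered (size-based retention and the actual
coordination between the two tasks are left out), and the Segment class and
method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class UploadCandidateFilterSketch {
    /** Minimal stand-in for a log segment that is waiting to be uploaded. */
    static final class Segment {
        final long baseOffset;
        final long largestTimestampMs;

        Segment(long baseOffset, long largestTimestampMs) {
            this.baseOffset = baseOffset;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    private UploadCandidateFilterSketch() {}

    /** Keeps only segments that have not yet passed time-based retention. */
    static List<Segment> segmentsWorthUploading(List<Segment> candidates,
                                                long nowMs,
                                                long retentionMs) {
        List<Segment> result = new ArrayList<>();
        for (Segment segment : candidates) {
            boolean eligibleForDeletion = nowMs - segment.largestTimestampMs > retentionMs;
            if (!eligibleForDeletion) {
                result.add(segment);
            }
        }
        return result;
    }
}
```

Skipping such segments also avoids spending copy quota on data that would be
deleted right after the upload.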


> Thank you.
> Luke
>
> On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <abhijeet.cse.kgp@gmail.com> wrote:
>
> > Hi All,
> >
> > I have created KIP-956 for defining read and write quota for tiered
> > storage.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> >
> > Feedback and suggestions are welcome.
> >
> > Regards,
> > Abhijeet.
> >



-- 
Abhijeet.

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Sorry I missed that comment on the thread. Proposal looks great, thanks,
Abhijeet!

On Sat, 16 Mar 2024 at 13:19, Abhijeet Kumar <ab...@gmail.com>
wrote:

> Hi Jorge,
>
> The config names were chosen to be consistent with the other existing
> quota configs, such as
> *replica.alter.log.dirs.io.max.bytes.per.second*, as pointed out by Jun in
> the thread.
>
> Also, we can revisit the names of the components during implementation,
> since those are not exposed to the user.
>
> Please let me know if you have any further concerns.
>
> Regards,
> Abhijeet.
>
>
>
> On Mon, Mar 11, 2024 at 6:11 PM Jorge Esteban Quilcate Otoya <
> quilcate.jorge@gmail.com> wrote:
>
> > Hi Abhijeet,
> >
> > Thanks for the KIP! Looks good to me. I just have a minor comment on
> > naming:
> >
> > Would it work to align the config names to existing quota names?
> > e.g. `remote.log.manager.copy.byte.rate.quota` (or similar) instead of
> > `remote.log.manager.copy.max.bytes.per.second`?
> >
> > Same for new components, could we use the same verbs as in the configs:
> > - RLMCopyQuotaManager
> > - RLMFetchQuotaManager
> >
> >
> > On Fri, 8 Mar 2024 at 13:43, Abhijeet Kumar <ab...@gmail.com>
> > wrote:
> >
> > > Thank you all for your comments. As all the comments in the thread are
> > > addressed, I am starting a Vote thread for the KIP. Please have a look.
> > >
> > > Regards.
> > >
> > > On Thu, Mar 7, 2024 at 12:34 PM Luke Chen <sh...@gmail.com> wrote:
> > >
> > > > Hi Abhijeet,
> > > >
> > > > Thanks for the update and the explanation.
> > > > I had another look, and it LGTM now!
> > > >
> > > > Thanks.
> > > > Luke
> > > >
> > > > On Tue, Mar 5, 2024 at 2:50 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi, Abhijeet,
> > > > >
> > > > > Thanks for the reply. Sounds good to me.
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar <abhijeet.cse.kgp@gmail.com> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Thanks for pointing it out. It makes sense to me. We can have the
> > > > > > following metrics instead. What do you think?
> > > > > >
> > > > > >    - remote-(fetch|copy)-throttle-time-avg (The average time in ms
> > > > > >    remote fetches/copies were throttled by a broker)
> > > > > >    - remote-(fetch|copy)-throttle-time-max (The maximum time in ms
> > > > > >    remote fetches/copies were throttled by a broker)
> > > > > >
> > > > > > These are similar to the fetch-throttle-time-avg and
> > > > > > fetch-throttle-time-max metrics we have for Kafka consumers.
> > > > > > The avg and max are computed over the (sliding) window as defined by
> > > > > > the configuration metrics.sample.window.ms on the server.
> > > > > >
> > > > > > (Also, I will update the config and metric names to be consistent)
> > > > > >
> > > > > > Regards.
> > > > > >
> > > > > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao <jun@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi, Abhijeet,
> > > > > > >
> > > > > > > Thanks for the reply.
> > > > > > >
> > > > > > > The issue with recording the throttle time as a gauge is that it's
> > > > > > > transient. If the metric is not read immediately, the recorded value
> > > > > > > could be reset to 0. The admin won't realize that throttling has
> > > > > > > happened.
> > > > > > >
> > > > > > > For client quotas, the throttle time is tracked as the average
> > > > > > > throttle-time per user/client-id. This makes the metric less transient.
> > > > > > >
> > > > > > > Also, the configs use read/write whereas the metrics use fetch/copy.
> > > > > > > Could we make them consistent?
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <abhijeet.cse.kgp@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > Clarified the meaning of the two metrics. Also updated the KIP.
> > > > > > > >
> > > > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime ->
> > > > > > > > The duration of time required at a given moment to bring the observed
> > > > > > > > fetch rate within the allowed limit, by preventing further reads.
> > > > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime ->
> > > > > > > > The duration of time required at a given moment to bring the observed
> > > > > > > > remote copy rate within the allowed limit, by preventing further copies.
> > > > > > > >
> > > > > > > > Regards.
> > > > > > > >
> > > > > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao <jun@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hi, Abhijeet,
> > > > > > > > >
> > > > > > > > > Thanks for the explanation. Makes sense to me now.
> > > > > > > > >
> > > > > > > > > Just a minor comment. Could you document the exact meaning of the
> > > > > > > > > following two metrics? For example, is the time accumulated? If so,
> > > > > > > > > is it from the start of the broker or over some window?
> > > > > > > > >
> > > > > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime
> > > > > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <abhijeet.cse.kgp@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jun,
> > > > > > > > > >
> > > > > > > > > > The existing quota system for consumers is designed to throttle the
> > > > > > > > > > consumer if it exceeds the allowed fetch rate. The additional quota we
> > > > > > > > > > want to add works at the broker level. If the broker-level remote read
> > > > > > > > > > quota is being exceeded, we prevent additional reads from remote storage
> > > > > > > > > > but do not prevent local reads for the consumer. If the consumer has
> > > > > > > > > > specified other partitions to read that can be served from local
> > > > > > > > > > storage, it can continue to read those partitions. To elaborate, we
> > > > > > > > > > check whether the quota is exceeded when we know a segment needs to be
> > > > > > > > > > read from remote. If the quota is exceeded, we simply skip the partition
> > > > > > > > > > and move to other segments in the fetch request. This way, consumers can
> > > > > > > > > > continue to read the local data as long as they have not exceeded the
> > > > > > > > > > client-level quota.
> > > > > > > > > >
> > > > > > > > > > Also, when we choose an appropriate consumer-level quota, we would
> > > > > > > > > > typically look at what kind of local fetch throughput is supported. If
> > > > > > > > > > we were to reuse the same consumer quota, we would also have to consider
> > > > > > > > > > the throughput the remote storage supports. The throughput supported by
> > > > > > > > > > remote storage may be lower or higher than the throughput supported
> > > > > > > > > > locally (when using a cloud provider, it depends on the plan chosen by
> > > > > > > > > > the user). The consumer quota would have to be set carefully, considering
> > > > > > > > > > both local and remote throughput. If we have a separate quota instead,
> > > > > > > > > > things become much simpler for the user, since they already know what
> > > > > > > > > > throughput their remote storage supports.
> > > > > > > > > >
> > > > > > > > > > (Also, thanks for pointing that out. I will update the KIP based on the
> > > > > > > > > > discussion)
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Abhijeet.
> > > > > > > > > >
> > > > > > > > > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao <jun@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Abhijeet,
> > > > > > > > > > >
> > > > > > > > > > > Sorry for the late reply. It seems that you haven't updated the KIP based
> > > > > > > > > > > on the discussion? One more comment.
> > > > > > > > > > >
> > > > > > > > > > > 11. Currently, we already have a quota system for both the producers and
> > > > > > > > > > > consumers. I can understand why we need an additional
> > > > > > > > > > > remote.log.manager.write.quota.default quota. For example, when tier
> > > > > > > > > > > storage is enabled for the first time, there could be a lot of segments
> > > > > > > > > > > that need to be written to the remote storage, even though there is no
> > > > > > > > > > > increase in the produced data. However, I am not sure about an
> > > > > > > > > > > additional remote.log.manager.read.quota.default. The KIP says that the
> > > > > > > > > > > reason is "This may happen when the majority of the consumers start
> > > > > > > > > > > reading from the earliest offset of their respective Kafka topics.".
> > > > > > > > > > > However, this can happen with or without tier storage and the current
> > > > > > > > > > > quota system for consumers is designed for solving this exact problem.
> > > > > > > > > > > Could you explain the usage of this additional quota?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar <abhijeet.cse.kgp@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Comments inline
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao <jun@confluent.io.invalid> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi, Abhijeet,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the KIP. A few comments.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 10. remote.log.manager.write.quota.default:
> > > > > > > > > > > > > 10.1 For other configs, we use
> > > > > > > > > > > > > replica.alter.log.dirs.io.max.bytes.per.second. To be consistent,
> > > > > > > > > > > > > perhaps this can be sth like
> > > > > > > > > > > > > remote.log.manager.write.max.bytes.per.second.
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > This makes sense, we can rename the following configs to be consistent.
> > > > > > > > > > > >
> > > > > > > > > > > > remote.log.manager.write.quota.default ->
> > > > > > > > > > > > remote.log.manager.write.max.bytes.per.second
> > > > > > > > > > > >
> > > > > > > > > > > > remote.log.manager.read.quota.default ->
> > > > > > > > > > > > remote.log.manager.read.max.bytes.per.second
> > > > > > > > > > > >
> > > > > > > > > > > > > 10.2 Could we list the new metrics associated with the new quota.
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > We will add the following metrics as mentioned in the other response.
> > > > > > > > > > > > *RemoteFetchThrottleTime* - The amount of time needed to bring the
> > > > > > > > > > > > observed remote fetch rate within the read quota.
> > > > > > > > > > > > *RemoteCopyThrottleTime* - The amount of time needed to bring the
> > > > > > > > > > > > observed remote copy rate within the copy quota.
> > > > > > > > > > > >
> > > > > > > > > > > > > 10.3 Is this dynamically configurable? If so, could we document the
> > > > > > > > > > > > > impact to tools like kafka-configs.sh and AdminClient?
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Yes, the quotas are dynamically configurable. We will add them as dynamic
> > > > > > > > > > > > broker configs. Users will be able to change the following configs using
> > > > > > > > > > > > either kafka-configs.sh or AdminClient by specifying the config name and
> > > > > > > > > > > > the new value. For example:
> > > > > > > > > > > >
> > > > > > > > > > > > Using kafka-configs.sh
> > > > > > > > > > > >
> > > > > > > > > > > > bin/kafka-configs.sh --bootstrap-server <bootstrap-server> \
> > > > > > > > > > > >   --entity-type brokers --entity-default --alter \
> > > > > > > > > > > >   --add-config remote.log.manager.write.max.bytes.per.second=52428800
> > > > > > > > > > > >
> > > > > > > > > > > > Using AdminClient
> > > > > > > > > > > >
> > > > > > > > > > > > ConfigEntry configEntry = new ConfigEntry(
> > > > > > > > > > > >     "remote.log.manager.write.max.bytes.per.second", "52428800");
> > > > > > > > > > > > AlterConfigOp alterConfigOp =
> > > > > > > > > > > >     new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
> > > > > > > > > > > > List<AlterConfigOp> alterConfigOps =
> > > > > > > > > > > >     Collections.singletonList(alterConfigOp);
> > > > > > > > > > > >
> > > > > > > > > > > > // An empty resource name targets the cluster-wide default broker config.
> > > > > > > > > > > > ConfigResource resource =
> > > > > > > > > > > >     new ConfigResource(ConfigResource.Type.BROKER, "");
> > > > > > > > > > > > Map<ConfigResource, Collection<AlterConfigOp>> updateConfig =
> > > > > > > > > > > >     ImmutableMap.of(resource, alterConfigOps);
> > > > > > > > > > > > adminClient.incrementalAlterConfigs(updateConfig);
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Jun
> > > > > > > > > > > > >

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Abhijeet Kumar <ab...@gmail.com>.
Hi Jorge,

The config names were chosen to be consistent with the other existing
quota configs, such as
*replica.alter.log.dirs.io.max.bytes.per.second*, as pointed out by Jun in
the thread.

Also, we can revisit the names of the components during implementation,
since those are not exposed to the user.

Please let me know if you have any further concerns.

Regards,
Abhijeet.



> > > > > > > > > >
> > > > > > > > > > 11. Currently, we already have a quota system for both
> the
> > > > > > producers
> > > > > > > > and
> > > > > > > > > > consumers. I can understand why we need an additional
> > > > > > > > > > remote.log.manager.write.quota.default quota. For
> example,
> > > when
> > > > > > tier
> > > > > > > > > > storage is enabled for the first time, there could be a
> lot
> > > of
> > > > > > > segments
> > > > > > > > > > that need to be written to the remote storage, even
> though
> > > > there
> > > > > is
> > > > > > > no
> > > > > > > > > > increase in the produced data. However, I am not sure
> about
> > > an
> > > > > > > > > > additional remote.log.manager.read.quota.default. The KIP
> > > says
> > > > > that
> > > > > > > the
> > > > > > > > > > reason is "This may happen when the majority of the
> > consumers
> > > > > start
> > > > > > > > > reading
> > > > > > > > > > from the earliest offset of their respective Kafka
> > topics.".
> > > > > > However,
> > > > > > > > > this
> > > > > > > > > > can happen with or without tier storage and the current
> > quota
> > > > > > system
> > > > > > > > for
> > > > > > > > > > consumers is designed for solving this exact problem.
> Could
> > > you
> > > > > > > explain
> > > > > > > > > the
> > > > > > > > > > usage of this additional quota?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar <
> > > > > > > > > > abhijeet.cse.kgp@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Comments inline
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao
> > > > > <jun@confluent.io.invalid
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi, Abhijeet,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP. A few comments.
> > > > > > > > > > > >
> > > > > > > > > > > > 10. remote.log.manager.write.quota.default:
> > > > > > > > > > > > 10.1 For other configs, we
> > > > > > > > > > > > use replica.alter.log.dirs.io.max.bytes.per.second.
> To
> > be
> > > > > > > > consistent,
> > > > > > > > > > > > perhaps this can be sth like
> > > > > > > > > > > remote.log.manager.write.max.bytes.per.second.
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > This makes sense, we can rename the following configs
> to
> > be
> > > > > > > > consistent.
> > > > > > > > > > >
> > > > > > > > > > > Remote.log.manager.write.quota.default ->
> > > > > > > > > > > remote.log.manager.write.max.bytes.per.second
> > > > > > > > > > >
> > > > > > > > > > > Remote.log.manager.read.quota.default ->
> > > > > > > > > > > remote.log.manager.read.max.bytes.per.second.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > 10.2 Could we list the new metrics associated with
> the
> > > new
> > > > > > quota.
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > We will add the following metrics as mentioned in the
> > other
> > > > > > > response.
> > > > > > > > > > > *RemoteFetchThrottleTime* - The amount of time needed
> to
> > > > bring
> > > > > > the
> > > > > > > > > > observed
> > > > > > > > > > > remote fetch rate within the read quota
> > > > > > > > > > > *RemoteCopyThrottleTime *- The amount of time needed to
> > > bring
> > > > > the
> > > > > > > > > > observed
> > > > > > > > > > > remote copy rate within the copy quota.
> > > > > > > > > > >
> > > > > > > > > > > 10.3 Is this dynamically configurable? If so, could we
> > > > document
> > > > > > the
> > > > > > > > > > impact
> > > > > > > > > > > > to tools like kafka-configs.sh and AdminClient?
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Yes, the quotas are dynamically configurable. We will
> add
> > > > them
> > > > > as
> > > > > > > > > Dynamic
> > > > > > > > > > > Broker Configs. Users will be able to change
> > > > > > > > > > > the following configs using either kafka-configs.sh or
> > > > > > AdminClient
> > > > > > > by
> > > > > > > > > > > specifying the config name and new value. For eg.
> > > > > > > > > > >
> > > > > > > > > > > Using kafka-configs.sh
> > > > > > > > > > >
> > > > > > > > > > > bin/kafka-configs.sh --bootstrap-server
> > <bootstrap-server>
> > > > > > > > > --entity-type
> > > > > > > > > > > brokers --entity-default --alter --add-config
> > > > > > > > > > > remote.log.manager.write.max.bytes.per.second=52428800
> > > > > > > > > > >
> > > > > > > > > > > Using AdminClient
> > > > > > > > > > >
> > > > > > > > > > > ConfigEntry configEntry = new
> > > > > > > > > > >
> > > ConfigEntry("remote.log.manager.write.max.bytes.per.second",
> > > > > > > > > "5242800");
> > > > > > > > > > > AlterConfigOp alterConfigOp = new
> > > AlterConfigOp(configEntry,
> > > > > > > > > > > AlterConfigOp.OpType.SET);
> > > > > > > > > > > List<AlterConfigOp> alterConfigOps =
> > > > > > > > > > > Collections.singletonList(alterConfigOp);
> > > > > > > > > > >
> > > > > > > > > > > ConfigResource resource = new
> > > > > > > > > ConfigResource(ConfigResource.Type.BROKER,
> > > > > > > > > > > "");
> > > > > > > > > > > Map<ConfigResource, Collection<AlterConfigOp>>
> > > updateConfig =
> > > > > > > > > > > ImmutableMap.of(resource, alterConfigOps);
> > > > > > > > > > > adminClient.incrementalAlterConfigs(updateConfig);
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen <
> > > > showuon@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Abhijeet,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > This is an important feature for tiered storage.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Some comments:
> > > > > > > > > > > > > 1. Will we introduce new metrics for this tiered
> > > storage
> > > > > > > quotas?
> > > > > > > > > > > > > This is important because the admin can know the
> > > > throttling
> > > > > > > > status
> > > > > > > > > by
> > > > > > > > > > > > > checking the metrics while the remote write/read
> are
> > > > slow,
> > > > > > like
> > > > > > > > the
> > > > > > > > > > > rate
> > > > > > > > > > > > of
> > > > > > > > > > > > > uploading/reading byte rate, the throttled time for
> > > > > > > > upload/read...
> > > > > > > > > > etc.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2. Could you give some examples for the throttling
> > > > > algorithm
> > > > > > in
> > > > > > > > the
> > > > > > > > > > KIP
> > > > > > > > > > > > to
> > > > > > > > > > > > > explain it? That will make it much clearer.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3. To solve this problem, we can break down the
> > RLMTask
> > > > > into
> > > > > > > two
> > > > > > > > > > > smaller
> > > > > > > > > > > > > tasks - one for segment upload and the other for
> > > handling
> > > > > > > expired
> > > > > > > > > > > > segments.
> > > > > > > > > > > > > How do we handle the situation when a segment is
> > still
> > > > > > waiting
> > > > > > > > for
> > > > > > > > > > > > > offloading while this segment is expired and
> eligible
> > > to
> > > > be
> > > > > > > > > deleted?
> > > > > > > > > > > > > Maybe it'll be easier to not block the RLMTask when
> > > quota
> > > > > > > > exceeded,
> > > > > > > > > > and
> > > > > > > > > > > > > just check it each time the RLMTask runs?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > Luke
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <
> > > > > > > > > > > > abhijeet.cse.kgp@gmail.com
> > > > > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi All,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I have created KIP-956 for defining read and
> write
> > > > quota
> > > > > > for
> > > > > > > > > tiered
> > > > > > > > > > > > > > storage.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Feedback and suggestions are welcome.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > Abhijeet.
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Abhijeet.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Abhijeet.
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Abhijeet.
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Abhijeet.
> > > > >
> > > >
> > >
> >
> >
> > --
> > Abhijeet.
> >
>

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Hi Abhijeet,

Thanks for the KIP! Looks good to me. I just have a minor comment on
naming:

Would it be worth aligning the config names with the existing quota names?
e.g. `remote.log.manager.copy.byte.rate.quota` (or similar) instead of
`remote.log.manager.copy.max.bytes.per.second`?

Same for new components, could we use the same verbs as in the configs:
- RLMCopyQuotaManager
- RLMFetchQuotaManager
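
To make the naming concrete, here is a minimal sketch of what a copy-side
manager could look like under that name. Only the class name and the
record/isQuotaExceeded method names come from this thread; the fixed-window
logic, the constructor parameters, and the explicit nowMs arguments are
assumptions for illustration, not the KIP's actual design.

```java
// Hypothetical sketch only: a fixed-window byte-rate quota check.
// The window handling and constructor signature are assumed, not taken from the KIP.
final class RLMCopyQuotaManager {
    private final long maxBytesPerSecond; // e.g. the value of the copy quota config
    private final long windowMs;          // length of the measurement window
    private long windowStartMs;           // start of the current window
    private long bytesInWindow;           // bytes recorded in the current window

    RLMCopyQuotaManager(long maxBytesPerSecond, long windowMs, long nowMs) {
        this.maxBytesPerSecond = maxBytesPerSecond;
        this.windowMs = windowMs;
        this.windowStartMs = nowMs;
    }

    // Start a fresh window once the current one has fully elapsed.
    private void rollWindow(long nowMs) {
        if (nowMs - windowStartMs >= windowMs) {
            windowStartMs = nowMs;
            bytesInWindow = 0;
        }
    }

    // Record bytes copied to remote storage at time nowMs.
    void record(long bytes, long nowMs) {
        rollWindow(nowMs);
        bytesInWindow += bytes;
    }

    // True when the bytes recorded in the current window exceed
    // what the configured rate allows for a window of this length.
    boolean isQuotaExceeded(long nowMs) {
        rollWindow(nowMs);
        long allowedBytes = maxBytesPerSecond * windowMs / 1000;
        return bytesInWindow > allowedBytes;
    }
}
```

An RLMFetchQuotaManager would have the same shape, which is part of why
using the copy/fetch verbs in both the configs and the class names seems
easier to follow.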


On Fri, 8 Mar 2024 at 13:43, Abhijeet Kumar <ab...@gmail.com>
wrote:

> Thank you all for your comments. As all the comments in the thread are
> addressed, I am starting a Vote thread for the KIP. Please have a look.
>
> Regards.
>
> On Thu, Mar 7, 2024 at 12:34 PM Luke Chen <sh...@gmail.com> wrote:
>
> > Hi Abhijeet,
> >
> > Thanks for the update and the explanation.
> > I had another look, and it LGTM now!
> >
> > Thanks.
> > Luke
> >
> > On Tue, Mar 5, 2024 at 2:50 AM Jun Rao <ju...@confluent.io.invalid> wrote:
> >
> > > Hi, Abhijeet,
> > >
> > > Thanks for the reply. Sounds good to me.
> > >
> > > Jun
> > >
> > >
> > > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar <
> > abhijeet.cse.kgp@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for pointing it out. It makes sense to me. We can have the
> > > following
> > > > metrics instead. What do you think?
> > > >
> > > >    - remote-(fetch|copy)-throttle-time-avg (The average time in ms
> > > >    remote fetches/copies were throttled by a broker)
> > > >    - remote-(fetch|copy)-throttle-time-max (The maximum time in ms
> > > >    remote fetches/copies were throttled by a broker)
> > > >
> > > > These are similar to the fetch-throttle-time-avg and
> > > > fetch-throttle-time-max metrics we have for Kafka consumers.
> > > > The Avg and Max are computed over the (sliding) window as defined by
> > the
> > > > configuration metrics.sample.window.ms on the server.
> > > >
> > > > (Also, I will update the config and metric names to be consistent)
> > > >
> > > > Regards.
> > > >
> > > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao <ju...@confluent.io.invalid>
> > > wrote:
> > > >
> > > > > Hi, Abhijeet,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > The issue with recording the throttle time as a gauge is that it's
> > > > > transient. If the metric is not read immediately, the recorded
> value
> > > > could
> > > > > be reset to 0. The admin won't realize that throttling has
> happened.
> > > > >
> > > > > For client quotas, the throttle time is tracked as the average
> > > > > throttle-time per user/client-id. This makes the metric less
> > transient.
> > > > >
> > > > > Also, the configs use read/write whereas the metrics use
> fetch/copy.
> > > > Could
> > > > > we make them consistent?
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <
> > > > abhijeet.cse.kgp@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Clarified the meaning of the two metrics. Also updated the KIP.
> > > > > >
> > > > > > kafka.log.remote:type=RemoteLogManager,
> > name=RemoteFetchThrottleTime
> > > ->
> > > > > The
> > > > > > duration of time required at a given moment to bring the observed
> > > fetch
> > > > > > rate within the allowed limit, by preventing further reads.
> > > > > > kafka.log.remote:type=RemoteLogManager,
> name=RemoteCopyThrottleTime
> > > ->
> > > > > The
> > > > > > duration of time required at a given moment to bring the observed
> > > > remote
> > > > > > copy rate within the allowed limit, by preventing further copies.
> > > > > >
> > > > > > Regards.
> > > > > >
> > > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao
> <jun@confluent.io.invalid
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi, Abhijeet,
> > > > > > >
> > > > > > > Thanks for the explanation. Makes sense to me now.
> > > > > > >
> > > > > > > Just a minor comment. Could you document the exact meaning of
> the
> > > > > > following
> > > > > > > two metrics? For example, is the time accumulated? If so, is it
> > > from
> > > > > the
> > > > > > > start of the broker or over some window?
> > > > > > >
> > > > > > > kafka.log.remote:type=RemoteLogManager,
> > > name=RemoteFetchThrottleTime
> > > > > > > kafka.log.remote:type=RemoteLogManager,
> > name=RemoteCopyThrottleTime
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <
> > > > > > abhijeet.cse.kgp@gmail.com
> > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > The existing quota system for consumers is designed to
> throttle
> > > the
> > > > > > > > consumer if it exceeds the allowed fetch rate.
> > > > > > > > The additional quota we want to add works on the broker
> level.
> > If
> > > > the
> > > > > > > > broker-level remote read quota is being
> > > > > > > > exceeded, we prevent additional reads from the remote storage
> > but
> > > > do
> > > > > > not
> > > > > > > > prevent local reads for the consumer.
> > > > > > > > If the consumer has specified other partitions to read, which
> > can
> > > > be
> > > > > > > served
> > > > > > > > from local, it can continue to read those
> > > > > > > > partitions. To elaborate more, we make a check for quota
> > exceeded
> > > > if
> > > > > we
> > > > > > > > know a segment needs to be read from
> > > > > > > > remote. If the quota is exceeded, we simply skip the
> partition
> > > and
> > > > > move
> > > > > > > to
> > > > > > > > other segments in the fetch request.
> > > > > > > > This way consumers can continue to read the local data as
> long
> > as
> > > > > they
> > > > > > > have
> > > > > > > > not exceeded the client-level quota.
> > > > > > > >
> > > > > > > > Also, when we choose the appropriate consumer-level quota, we
> > > would
> > > > > > > > typically look at what kind of local fetch
> > > > > > > > throughput is supported. If we were to reuse the same
> consumer
> > > > quota,
> > > > > > we
> > > > > > > > should also consider the throughput
> > > > > > > > the remote storage supports. The throughput supported by
> remote
> > > may
> > > > > be
> > > > > > > > less/more than the throughput supported
> > > > > > > > by local (when using a cloud provider, it depends on the plan
> > > opted
> > > > > by
> > > > > > > the
> > > > > > > > user). The consumer quota has to be carefully
> > > > > > > > set considering both local and remote throughput. Instead, if
> > we
> > > > > have a
> > > > > > > > separate quota, it makes things much simpler
> > > > > > > > for the user, since they already know what throughput their
> > > remote
> > > > > > > storage
> > > > > > > > supports.
> > > > > > > >
> > > > > > > > (Also, thanks for pointing out. I will update the KIP based
> on
> > > the
> > > > > > > > discussion)
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Abhijeet.
> > > > > > > >
> > > > > > > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao
> > <jun@confluent.io.invalid
> > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi, Abhijeet,
> > > > > > > > >
> > > > > > > > > Sorry for the late reply. It seems that you haven't updated
> > the
> > > > KIP
> > > > > > > based
> > > > > > > > > on the discussion? One more comment.
> > > > > > > > >
> > > > > > > > > 11. Currently, we already have a quota system for both the
> > > > > producers
> > > > > > > and
> > > > > > > > > consumers. I can understand why we need an additional
> > > > > > > > > remote.log.manager.write.quota.default quota. For example,
> > when
> > > > > tier
> > > > > > > > > storage is enabled for the first time, there could be a lot
> > of
> > > > > > segments
> > > > > > > > > that need to be written to the remote storage, even though
> > > there
> > > > is
> > > > > > no
> > > > > > > > > increase in the produced data. However, I am not sure about
> > an
> > > > > > > > > additional remote.log.manager.read.quota.default. The KIP
> > says
> > > > that
> > > > > > the
> > > > > > > > > reason is "This may happen when the majority of the
> consumers
> > > > start
> > > > > > > > reading
> > > > > > > > > from the earliest offset of their respective Kafka
> topics.".
> > > > > However,
> > > > > > > > this
> > > > > > > > > can happen with or without tier storage and the current
> quota
> > > > > system
> > > > > > > for
> > > > > > > > > consumers is designed for solving this exact problem. Could
> > you
> > > > > > explain
> > > > > > > > the
> > > > > > > > > usage of this additional quota?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar <
> > > > > > > > > abhijeet.cse.kgp@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Comments inline
> > > > > > > > > >
> > > > > > > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao
> > > > <jun@confluent.io.invalid
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Abhijeet,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP. A few comments.
> > > > > > > > > > >
> > > > > > > > > > > 10. remote.log.manager.write.quota.default:
> > > > > > > > > > > 10.1 For other configs, we
> > > > > > > > > > > use replica.alter.log.dirs.io.max.bytes.per.second. To
> be
> > > > > > > consistent,
> > > > > > > > > > > perhaps this can be sth like
> > > > > > > > > > remote.log.manager.write.max.bytes.per.second.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > This makes sense, we can rename the following configs to
> be
> > > > > > > consistent.
> > > > > > > > > >
> > > > > > > > > > Remote.log.manager.write.quota.default ->
> > > > > > > > > > remote.log.manager.write.max.bytes.per.second
> > > > > > > > > >
> > > > > > > > > > Remote.log.manager.read.quota.default ->
> > > > > > > > > > remote.log.manager.read.max.bytes.per.second.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > 10.2 Could we list the new metrics associated with the
> > new
> > > > > quota.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > We will add the following metrics as mentioned in the
> other
> > > > > > response.
> > > > > > > > > > *RemoteFetchThrottleTime* - The amount of time needed to
> > > bring
> > > > > the
> > > > > > > > > observed
> > > > > > > > > > remote fetch rate within the read quota
> > > > > > > > > > *RemoteCopyThrottleTime *- The amount of time needed to
> > bring
> > > > the
> > > > > > > > > observed
> > > > > > > > > > remote copy rate within the copy quota.
> > > > > > > > > >
> > > > > > > > > > 10.3 Is this dynamically configurable? If so, could we
> > > document
> > > > > the
> > > > > > > > > impact
> > > > > > > > > > > to tools like kafka-configs.sh and AdminClient?
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Yes, the quotas are dynamically configurable. We will add
> > > them
> > > > as
> > > > > > > > Dynamic
> > > > > > > > > > Broker Configs. Users will be able to change
> > > > > > > > > > the following configs using either kafka-configs.sh or
> > > > > AdminClient
> > > > > > by
> > > > > > > > > > specifying the config name and new value. For eg.
> > > > > > > > > >
> > > > > > > > > > Using kafka-configs.sh
> > > > > > > > > >
> > > > > > > > > > bin/kafka-configs.sh --bootstrap-server
> <bootstrap-server>
> > > > > > > > --entity-type
> > > > > > > > > > brokers --entity-default --alter --add-config
> > > > > > > > > > remote.log.manager.write.max.bytes.per.second=52428800
> > > > > > > > > >
> > > > > > > > > > Using AdminClient
> > > > > > > > > >
> > > > > > > > > > ConfigEntry configEntry = new
> > > > > > > > > >
> > ConfigEntry("remote.log.manager.write.max.bytes.per.second",
> > > > > > > > "5242800");
> > > > > > > > > > AlterConfigOp alterConfigOp = new
> > AlterConfigOp(configEntry,
> > > > > > > > > > AlterConfigOp.OpType.SET);
> > > > > > > > > > List<AlterConfigOp> alterConfigOps =
> > > > > > > > > > Collections.singletonList(alterConfigOp);
> > > > > > > > > >
> > > > > > > > > > ConfigResource resource = new
> > > > > > > > ConfigResource(ConfigResource.Type.BROKER,
> > > > > > > > > > "");
> > > > > > > > > > Map<ConfigResource, Collection<AlterConfigOp>>
> > updateConfig =
> > > > > > > > > > ImmutableMap.of(resource, alterConfigOps);
> > > > > > > > > > adminClient.incrementalAlterConfigs(updateConfig);
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen <
> > > showuon@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Abhijeet,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > This is an important feature for tiered storage.
> > > > > > > > > > > >
> > > > > > > > > > > > Some comments:
> > > > > > > > > > > > 1. Will we introduce new metrics for this tiered
> > storage
> > > > > > quotas?
> > > > > > > > > > > > This is important because the admin can know the
> > > throttling
> > > > > > > status
> > > > > > > > by
> > > > > > > > > > > > checking the metrics while the remote write/read are
> > > slow,
> > > > > like
> > > > > > > the
> > > > > > > > > > rate
> > > > > > > > > > > of
> > > > > > > > > > > > uploading/reading byte rate, the throttled time for
> > > > > > > upload/read...
> > > > > > > > > etc.
> > > > > > > > > > > >
> > > > > > > > > > > > 2. Could you give some examples for the throttling
> > > > algorithm
> > > > > in
> > > > > > > the
> > > > > > > > > KIP
> > > > > > > > > > > to
> > > > > > > > > > > > explain it? That will make it much clearer.
> > > > > > > > > > > >
> > > > > > > > > > > > 3. To solve this problem, we can break down the
> RLMTask
> > > > into
> > > > > > two
> > > > > > > > > > smaller
> > > > > > > > > > > > tasks - one for segment upload and the other for
> > handling
> > > > > > expired
> > > > > > > > > > > segments.
> > > > > > > > > > > > How do we handle the situation when a segment is
> still
> > > > > waiting
> > > > > > > for
> > > > > > > > > > > > offloading while this segment is expired and eligible
> > to
> > > be
> > > > > > > > deleted?
> > > > > > > > > > > > Maybe it'll be easier to not block the RLMTask when
> > quota
> > > > > > > exceeded,
> > > > > > > > > and
> > > > > > > > > > > > just check it each time the RLMTask runs?
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you.
> > > > > > > > > > > > Luke
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <
> > > > > > > > > > > abhijeet.cse.kgp@gmail.com
> > > > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi All,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I have created KIP-956 for defining read and write
> > > quota
> > > > > for
> > > > > > > > tiered
> > > > > > > > > > > > > storage.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> > > > > > > > > > > > >
> > > > > > > > > > > > > Feedback and suggestions are welcome.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > Abhijeet.
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > Abhijeet.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Abhijeet.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Abhijeet.
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Abhijeet.
> > > >
> > >
> >
>
>
> --
> Abhijeet.
>

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Abhijeet Kumar <ab...@gmail.com>.
Thank you all for your comments. As all the comments in the thread are
addressed, I am starting a Vote thread for the KIP. Please have a look.

Regards.

On Thu, Mar 7, 2024 at 12:34 PM Luke Chen <sh...@gmail.com> wrote:

> Hi Abhijeet,
>
> Thanks for the update and the explanation.
> I had another look, and it LGTM now!
>
> Thanks.
> Luke
>
> On Tue, Mar 5, 2024 at 2:50 AM Jun Rao <ju...@confluent.io.invalid> wrote:
>
> > Hi, Abhijeet,
> >
> > Thanks for the reply. Sounds good to me.
> >
> > Jun
> >
> >
> > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar <
> abhijeet.cse.kgp@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for pointing it out. It makes sense to me. We can have the
> > following
> > > metrics instead. What do you think?
> > >
> > >    - remote-(fetch|copy)-throttle-time-avg (The average time in ms
> > >    remote fetches/copies were throttled by a broker)
> > >    - remote-(fetch|copy)-throttle-time-max (The maximum time in ms
> > >    remote fetches/copies were throttled by a broker)
> > >
> > > These are similar to the fetch-throttle-time-avg and
> > > fetch-throttle-time-max metrics we have for Kafka consumers.
> > > The Avg and Max are computed over the (sliding) window as defined by
> the
> > > configuration metrics.sample.window.ms on the server.
> > >
> > > (Also, I will update the config and metric names to be consistent)
> > >
> > > Regards.
> > >
> > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao <ju...@confluent.io.invalid>
> > wrote:
> > >
> > > > Hi, Abhijeet,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > The issue with recording the throttle time as a gauge is that it's
> > > > transient. If the metric is not read immediately, the recorded value
> > > could
> > > > be reset to 0. The admin won't realize that throttling has happened.
> > > >
> > > > For client quotas, the throttle time is tracked as the average
> > > > throttle-time per user/client-id. This makes the metric less
> transient.
> > > >
> > > > Also, the configs use read/write whereas the metrics use fetch/copy.
> > > Could
> > > > we make them consistent?
> > > >
> > > > Jun
> > > >
> > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <
> > > abhijeet.cse.kgp@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Clarified the meaning of the two metrics. Also updated the KIP.
> > > > >
> > > > > kafka.log.remote:type=RemoteLogManager,
> name=RemoteFetchThrottleTime
> > ->
> > > > The
> > > > > duration of time required at a given moment to bring the observed
> > fetch
> > > > > rate within the allowed limit, by preventing further reads.
> > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
> > ->
> > > > The
> > > > > duration of time required at a given moment to bring the observed
> > > remote
> > > > > copy rate within the allowed limit, by preventing further copies.
> > > > >
> > > > > Regards.
> > > > >
> > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao <jun@confluent.io.invalid
> >
> > > > wrote:
> > > > >
> > > > > > Hi, Abhijeet,
> > > > > >
> > > > > > Thanks for the explanation. Makes sense to me now.
> > > > > >
> > > > > > Just a minor comment. Could you document the exact meaning of the
> > > > > following
> > > > > > two metrics? For example, is the time accumulated? If so, is it
> > from
> > > > the
> > > > > > start of the broker or over some window?
> > > > > >
> > > > > > kafka.log.remote:type=RemoteLogManager,
> > name=RemoteFetchThrottleTime
> > > > > > kafka.log.remote:type=RemoteLogManager,
> name=RemoteCopyThrottleTime
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <
> > > > > abhijeet.cse.kgp@gmail.com
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > > > The existing quota system for consumers is designed to throttle the
> > > > > > > > > consumer if it exceeds the allowed fetch rate. The additional quota
> > > > > > > > > we want to add works at the broker level. If the broker-level remote
> > > > > > > > > read quota is being exceeded, we prevent additional reads from the
> > > > > > > > > remote storage but do not prevent local reads for the consumer. If
> > > > > > > > > the consumer has specified other partitions to read, which can be
> > > > > > > > > served from local storage, it can continue to read those partitions.
> > > > > > > > > To elaborate, we check whether the quota is exceeded once we know a
> > > > > > > > > segment needs to be read from remote storage. If the quota is
> > > > > > > > > exceeded, we simply skip the partition and move on to the other
> > > > > > > > > partitions in the fetch request. This way, consumers can continue to
> > > > > > > > > read the local data as long as they have not exceeded the
> > > > > > > > > client-level quota.
> > > > > > > > >
> > > > > > > > > Also, when we choose the appropriate consumer-level quota, we would
> > > > > > > > > typically look at what kind of local fetch throughput is supported.
> > > > > > > > > If we were to reuse the same consumer quota, we would also have to
> > > > > > > > > consider the throughput the remote storage supports. The throughput
> > > > > > > > > supported by remote storage may be lower or higher than the
> > > > > > > > > throughput supported by local storage (when using a cloud provider,
> > > > > > > > > it depends on the plan the user has chosen). The consumer quota
> > > > > > > > > would have to be set carefully, considering both local and remote
> > > > > > > > > throughput. With a separate quota, things are much simpler for the
> > > > > > > > > user, since they already know what throughput their remote storage
> > > > > > > > > supports.
> > > > > > >
> > > > > > > (Also, thanks for pointing out. I will update the KIP based on
> > the
> > > > > > > discussion)
> > > > > > >
> > > > > > > Regards,
> > > > > > > Abhijeet.
> > > > > > >
> > > > > > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao
> <jun@confluent.io.invalid
> > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi, Abhijeet,
> > > > > > > >
> > > > > > > > Sorry for the late reply. It seems that you haven't updated
> the
> > > KIP
> > > > > > based
> > > > > > > > on the discussion? One more comment.
> > > > > > > >
> > > > > > > > 11. Currently, we already have a quota system for both the
> > > > producers
> > > > > > and
> > > > > > > > consumers. I can understand why we need an additional
> > > > > > > > remote.log.manager.write.quota.default quota. For example,
> when
> > > > tier
> > > > > > > > storage is enabled for the first time, there could be a lot
> of
> > > > > segments
> > > > > > > > that need to be written to the remote storage, even though
> > there
> > > is
> > > > > no
> > > > > > > > increase in the produced data. However, I am not sure about
> an
> > > > > > > > additional remote.log.manager.read.quota.default. The KIP
> says
> > > that
> > > > > the
> > > > > > > > reason is "This may happen when the majority of the consumers
> > > start
> > > > > > > reading
> > > > > > > > from the earliest offset of their respective Kafka topics.".
> > > > However,
> > > > > > > this
> > > > > > > > can happen with or without tier storage and the current quota
> > > > system
> > > > > > for
> > > > > > > > consumers is designed for solving this exact problem. Could
> you
> > > > > explain
> > > > > > > the
> > > > > > > > usage of this additional quota?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar <
> > > > > > > > abhijeet.cse.kgp@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Comments inline
> > > > > > > > >
> > > > > > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao
> > > <jun@confluent.io.invalid
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Abhijeet,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP. A few comments.
> > > > > > > > > >
> > > > > > > > > > 10. remote.log.manager.write.quota.default:
> > > > > > > > > > 10.1 For other configs, we
> > > > > > > > > > use replica.alter.log.dirs.io.max.bytes.per.second. To be
> > > > > > consistent,
> > > > > > > > > > perhaps this can be sth like
> > > > > > > > > remote.log.manager.write.max.bytes.per.second.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > This makes sense, we can rename the following configs to be
> > > > > > consistent.
> > > > > > > > >
> > > > > > > > > > remote.log.manager.write.quota.default ->
> > > > > > > > > > remote.log.manager.write.max.bytes.per.second
> > > > > > > > > >
> > > > > > > > > > remote.log.manager.read.quota.default ->
> > > > > > > > > > remote.log.manager.read.max.bytes.per.second
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 10.2 Could we list the new metrics associated with the
> new
> > > > quota.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > We will add the following metrics as mentioned in the other
> > > > > response.
> > > > > > > > > > *RemoteFetchThrottleTime* - The amount of time needed to bring the
> > > > > > > > > > observed remote fetch rate within the read quota.
> > > > > > > > > > *RemoteCopyThrottleTime* - The amount of time needed to bring the
> > > > > > > > > > observed remote copy rate within the copy quota.
> > > > > > > > >
> > > > > > > > > 10.3 Is this dynamically configurable? If so, could we
> > document
> > > > the
> > > > > > > > impact
> > > > > > > > > > to tools like kafka-configs.sh and AdminClient?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > > Yes, the quotas are dynamically configurable. We will add them as
> > > > > > > > > > Dynamic Broker Configs. Users will be able to change the following
> > > > > > > > > > configs using either kafka-configs.sh or AdminClient by specifying
> > > > > > > > > > the config name and new value. For example:
> > > > > > > > >
> > > > > > > > > Using kafka-configs.sh
> > > > > > > > >
> > > > > > > > > bin/kafka-configs.sh --bootstrap-server <bootstrap-server>
> > > > > > > --entity-type
> > > > > > > > > brokers --entity-default --alter --add-config
> > > > > > > > > remote.log.manager.write.max.bytes.per.second=52428800
> > > > > > > > >
> > > > > > > > > Using AdminClient
> > > > > > > > >
> > > > > > > > > > ConfigEntry configEntry = new ConfigEntry(
> > > > > > > > > >     "remote.log.manager.write.max.bytes.per.second", "52428800");
> > > > > > > > > > AlterConfigOp alterConfigOp =
> > > > > > > > > >     new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
> > > > > > > > > > List<AlterConfigOp> alterConfigOps =
> > > > > > > > > >     Collections.singletonList(alterConfigOp);
> > > > > > > > > >
> > > > > > > > > > // An empty broker name targets the cluster-wide default.
> > > > > > > > > > ConfigResource resource =
> > > > > > > > > >     new ConfigResource(ConfigResource.Type.BROKER, "");
> > > > > > > > > > Map<ConfigResource, Collection<AlterConfigOp>> updateConfig =
> > > > > > > > > >     ImmutableMap.of(resource, alterConfigOps);
> > > > > > > > > > adminClient.incrementalAlterConfigs(updateConfig);
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen <
> > showuon@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Abhijeet,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > This is an important feature for tiered storage.
> > > > > > > > > > >
> > > > > > > > > > > Some comments:
> > > > > > > > > > > 1. Will we introduce new metrics for this tiered
> storage
> > > > > quotas?
> > > > > > > > > > > This is important because the admin can know the
> > throttling
> > > > > > status
> > > > > > > by
> > > > > > > > > > > checking the metrics while the remote write/read are
> > slow,
> > > > like
> > > > > > the
> > > > > > > > > rate
> > > > > > > > > > of
> > > > > > > > > > > uploading/reading byte rate, the throttled time for
> > > > > > upload/read...
> > > > > > > > etc.
> > > > > > > > > > >
> > > > > > > > > > > 2. Could you give some examples for the throttling
> > > algorithm
> > > > in
> > > > > > the
> > > > > > > > KIP
> > > > > > > > > > to
> > > > > > > > > > > explain it? That will make it much clearer.
> > > > > > > > > > >
> > > > > > > > > > > 3. To solve this problem, we can break down the RLMTask
> > > into
> > > > > two
> > > > > > > > > smaller
> > > > > > > > > > > tasks - one for segment upload and the other for
> handling
> > > > > expired
> > > > > > > > > > segments.
> > > > > > > > > > > How do we handle the situation when a segment is still
> > > > waiting
> > > > > > for
> > > > > > > > > > > offloading while this segment is expired and eligible
> to
> > be
> > > > > > > deleted?
> > > > > > > > > > > Maybe it'll be easier to not block the RLMTask when
> quota
> > > > > > exceeded,
> > > > > > > > and
> > > > > > > > > > > just check it each time the RLMTask runs?
> > > > > > > > > > >
> > > > > > > > > > > Thank you.
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <
> > > > > > > > > > abhijeet.cse.kgp@gmail.com
> > > > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi All,
> > > > > > > > > > > >
> > > > > > > > > > > > I have created KIP-956 for defining read and write
> > quota
> > > > for
> > > > > > > tiered
> > > > > > > > > > > > storage.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> > > > > > > > > > > >
> > > > > > > > > > > > Feedback and suggestions are welcome.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Abhijeet.
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Abhijeet.
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Abhijeet.
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Abhijeet.
> > > > >
> > > >
> > >
> > >
> > > --
> > > Abhijeet.
> > >
> >
>


-- 
Abhijeet.
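
To make the broker-level check from the quoted discussion concrete, here is a
minimal sketch of how a fetch could skip remote partitions once the remote read
quota is exceeded while still serving local ones. It is only an illustration,
not the KIP's implementation: FetchPartition, RemoteReadQuota,
RemoteFetchPlanner and the fixed bytesPerRemoteRead are hypothetical, and only
the isQuotaExceeded/record method names echo the quota manager outline earlier
in the thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one partition named in a fetch request. */
final class FetchPartition {
    final String name;
    final boolean needsRemoteRead; // true when the requested offset is only in remote storage

    FetchPartition(String name, boolean needsRemoteRead) {
        this.name = name;
        this.needsRemoteRead = needsRemoteRead;
    }
}

/** Simplified broker-level quota: a byte budget for the current window (assumption). */
final class RemoteReadQuota {
    private final long maxBytesPerWindow;
    private long bytesInWindow;

    RemoteReadQuota(long maxBytesPerWindow) {
        this.maxBytesPerWindow = maxBytesPerWindow;
    }

    boolean isQuotaExceeded() {
        return bytesInWindow >= maxBytesPerWindow;
    }

    void record(long bytes) {
        bytesInWindow += bytes;
    }
}

final class RemoteFetchPlanner {
    /** Returns the names of the partitions that would be served in this fetch. */
    static List<String> partitionsToServe(List<FetchPartition> requested,
                                          RemoteReadQuota quota,
                                          long bytesPerRemoteRead) {
        List<String> served = new ArrayList<>();
        for (FetchPartition partition : requested) {
            if (partition.needsRemoteRead) {
                if (quota.isQuotaExceeded()) {
                    continue; // skip only this partition; later local partitions are unaffected
                }
                quota.record(bytesPerRemoteRead);
            }
            served.add(partition.name);
        }
        return served;
    }
}
```

With a budget of 100 bytes and two remote partitions of 100 bytes each, the
first remote partition is served, the second is skipped, and any local
partition in the same request is still returned.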

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Luke Chen <sh...@gmail.com>.
Hi Abhijeet,

Thanks for the update and the explanation.
I had another look, and it LGTM now!

Thanks.
Luke

On Tue, Mar 5, 2024 at 2:50 AM Jun Rao <ju...@confluent.io.invalid> wrote:

> Hi, Abhijeet,
>
> Thanks for the reply. Sounds good to me.
>
> Jun
>
>
> On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar <ab...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > Thanks for pointing it out. It makes sense to me. We can have the
> following
> > metrics instead. What do you think?
> >
> >    - remote-(fetch|copy)-throttle-time-avg (The average time in ms remote
> >    fetches/copies were throttled by a broker)
> >    - remote-(fetch|copy)-throttle-time-max (The maximum time in ms remote
> >    fetches/copies were throttled by a broker)
> >
> > These are similar to the fetch-throttle-time-avg and fetch-throttle-time-max
> > metrics we have for Kafka consumers.
> > The Avg and Max are computed over the (sliding) window as defined by the
> > configuration metrics.sample.window.ms on the server.
> >
> > (Also, I will update the config and metric names to be consistent)
> >
> > Regards.
> >
> > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao <ju...@confluent.io.invalid>
> wrote:
> >
> > > Hi, Abhijeet,
> > >
> > > Thanks for the reply.
> > >
> > > The issue with recording the throttle time as a gauge is that it's
> > > transient. If the metric is not read immediately, the recorded value
> > could
> > > be reset to 0. The admin won't realize that throttling has happened.
> > >
> > > For client quotas, the throttle time is tracked as the average
> > > throttle-time per user/client-id. This makes the metric less transient.
> > >
> > > Also, the configs use read/write whereas the metrics use fetch/copy.
> > Could
> > > we make them consistent?
> > >
> > > Jun
> > >
> > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <
> > abhijeet.cse.kgp@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Clarified the meaning of the two metrics. Also updated the KIP.
> > > >
> > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime ->
> > > > The duration of time required at a given moment to bring the observed
> > > > remote fetch rate within the allowed limit, by preventing further reads.
> > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime ->
> > > > The duration of time required at a given moment to bring the observed
> > > > remote copy rate within the allowed limit, by preventing further copies.
> > > >
> > > > Regards.
> > > >
> > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao <ju...@confluent.io.invalid>
> > > wrote:
> > > >
> > > > > Hi, Abhijeet,
> > > > >
> > > > > Thanks for the explanation. Makes sense to me now.
> > > > >
> > > > > Just a minor comment. Could you document the exact meaning of the
> > > > following
> > > > > two metrics? For example, is the time accumulated? If so, is it
> from
> > > the
> > > > > start of the broker or over some window?
> > > > >
> > > > > kafka.log.remote:type=RemoteLogManager,
> name=RemoteFetchThrottleTime
> > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <
> > > > abhijeet.cse.kgp@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > > The existing quota system for consumers is designed to throttle the
> > > > > > > consumer if it exceeds the allowed fetch rate. The additional quota
> > > > > > > we want to add works at the broker level. If the broker-level remote
> > > > > > > read quota is being exceeded, we prevent additional reads from the
> > > > > > > remote storage but do not prevent local reads for the consumer. If
> > > > > > > the consumer has specified other partitions to read, which can be
> > > > > > > served from local storage, it can continue to read those partitions.
> > > > > > > To elaborate, we check whether the quota is exceeded once we know a
> > > > > > > segment needs to be read from remote storage. If the quota is
> > > > > > > exceeded, we simply skip the partition and move on to the other
> > > > > > > partitions in the fetch request. This way, consumers can continue to
> > > > > > > read the local data as long as they have not exceeded the
> > > > > > > client-level quota.
> > > > > > >
> > > > > > > Also, when we choose the appropriate consumer-level quota, we would
> > > > > > > typically look at what kind of local fetch throughput is supported.
> > > > > > > If we were to reuse the same consumer quota, we would also have to
> > > > > > > consider the throughput the remote storage supports. The throughput
> > > > > > > supported by remote storage may be lower or higher than the
> > > > > > > throughput supported by local storage (when using a cloud provider,
> > > > > > > it depends on the plan the user has chosen). The consumer quota
> > > > > > > would have to be set carefully, considering both local and remote
> > > > > > > throughput. With a separate quota, things are much simpler for the
> > > > > > > user, since they already know what throughput their remote storage
> > > > > > > supports.
> > > > > >
> > > > > > (Also, thanks for pointing out. I will update the KIP based on
> the
> > > > > > discussion)
> > > > > >
> > > > > > Regards,
> > > > > > Abhijeet.
> > > > > >
> > > > > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao <jun@confluent.io.invalid
> >
> > > > > wrote:
> > > > > >
> > > > > > > Hi, Abhijeet,
> > > > > > >
> > > > > > > Sorry for the late reply. It seems that you haven't updated the
> > KIP
> > > > > based
> > > > > > > on the discussion? One more comment.
> > > > > > >
> > > > > > > 11. Currently, we already have a quota system for both the
> > > producers
> > > > > and
> > > > > > > consumers. I can understand why we need an additional
> > > > > > > remote.log.manager.write.quota.default quota. For example, when
> > > tier
> > > > > > > storage is enabled for the first time, there could be a lot of
> > > > segments
> > > > > > > that need to be written to the remote storage, even though
> there
> > is
> > > > no
> > > > > > > increase in the produced data. However, I am not sure about an
> > > > > > > additional remote.log.manager.read.quota.default. The KIP says
> > that
> > > > the
> > > > > > > reason is "This may happen when the majority of the consumers
> > start
> > > > > > reading
> > > > > > > from the earliest offset of their respective Kafka topics.".
> > > However,
> > > > > > this
> > > > > > > can happen with or without tier storage and the current quota
> > > system
> > > > > for
> > > > > > > consumers is designed for solving this exact problem. Could you
> > > > explain
> > > > > > the
> > > > > > > usage of this additional quota?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar <
> > > > > > > abhijeet.cse.kgp@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Comments inline
> > > > > > > >
> > > > > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao
> > <jun@confluent.io.invalid
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi, Abhijeet,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP. A few comments.
> > > > > > > > >
> > > > > > > > > 10. remote.log.manager.write.quota.default:
> > > > > > > > > 10.1 For other configs, we
> > > > > > > > > use replica.alter.log.dirs.io.max.bytes.per.second. To be
> > > > > consistent,
> > > > > > > > > perhaps this can be sth like
> > > > > > > > remote.log.manager.write.max.bytes.per.second.
> > > > > > > > >
> > > > > > > >
> > > > > > > > This makes sense, we can rename the following configs to be
> > > > > consistent.
> > > > > > > >
> > > > > > > > remote.log.manager.write.quota.default ->
> > > > > > > > remote.log.manager.write.max.bytes.per.second
> > > > > > > >
> > > > > > > > remote.log.manager.read.quota.default ->
> > > > > > > > remote.log.manager.read.max.bytes.per.second
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > > 10.2 Could we list the new metrics associated with the new
> > > quota.
> > > > > > > > >
> > > > > > > >
> > > > > > > > We will add the following metrics as mentioned in the other
> > > > response.
> > > > > > > > *RemoteFetchThrottleTime* - The amount of time needed to bring the
> > > > > > > > observed remote fetch rate within the read quota.
> > > > > > > > *RemoteCopyThrottleTime* - The amount of time needed to bring the
> > > > > > > > observed remote copy rate within the copy quota.
> > > > > > > >
> > > > > > > > 10.3 Is this dynamically configurable? If so, could we
> document
> > > the
> > > > > > > impact
> > > > > > > > > to tools like kafka-configs.sh and AdminClient?
> > > > > > > > >
> > > > > > > >
> > > > > > > > Yes, the quotas are dynamically configurable. We will add them as
> > > > > > > > Dynamic Broker Configs. Users will be able to change the following
> > > > > > > > configs using either kafka-configs.sh or AdminClient by specifying
> > > > > > > > the config name and new value. For example:
> > > > > > > >
> > > > > > > > Using kafka-configs.sh
> > > > > > > >
> > > > > > > > bin/kafka-configs.sh --bootstrap-server <bootstrap-server>
> > > > > > --entity-type
> > > > > > > > brokers --entity-default --alter --add-config
> > > > > > > > remote.log.manager.write.max.bytes.per.second=52428800
> > > > > > > >
> > > > > > > > Using AdminClient
> > > > > > > >
> > > > > > > > ConfigEntry configEntry = new ConfigEntry(
> > > > > > > >     "remote.log.manager.write.max.bytes.per.second", "52428800");
> > > > > > > > AlterConfigOp alterConfigOp =
> > > > > > > >     new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
> > > > > > > > List<AlterConfigOp> alterConfigOps =
> > > > > > > >     Collections.singletonList(alterConfigOp);
> > > > > > > >
> > > > > > > > // An empty broker name targets the cluster-wide default.
> > > > > > > > ConfigResource resource =
> > > > > > > >     new ConfigResource(ConfigResource.Type.BROKER, "");
> > > > > > > > Map<ConfigResource, Collection<AlterConfigOp>> updateConfig =
> > > > > > > >     ImmutableMap.of(resource, alterConfigOps);
> > > > > > > > adminClient.incrementalAlterConfigs(updateConfig);
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen <
> showuon@gmail.com
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Abhijeet,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > This is an important feature for tiered storage.
> > > > > > > > > >
> > > > > > > > > > Some comments:
> > > > > > > > > > 1. Will we introduce new metrics for this tiered storage
> > > > quotas?
> > > > > > > > > > This is important because the admin can know the
> throttling
> > > > > status
> > > > > > by
> > > > > > > > > > checking the metrics while the remote write/read are
> slow,
> > > like
> > > > > the
> > > > > > > > rate
> > > > > > > > > of
> > > > > > > > > > uploading/reading byte rate, the throttled time for
> > > > > upload/read...
> > > > > > > etc.
> > > > > > > > > >
> > > > > > > > > > 2. Could you give some examples for the throttling
> > algorithm
> > > in
> > > > > the
> > > > > > > KIP
> > > > > > > > > to
> > > > > > > > > > explain it? That will make it much clearer.
> > > > > > > > > >
> > > > > > > > > > 3. To solve this problem, we can break down the RLMTask
> > into
> > > > two
> > > > > > > > smaller
> > > > > > > > > > tasks - one for segment upload and the other for handling
> > > > expired
> > > > > > > > > segments.
> > > > > > > > > > How do we handle the situation when a segment is still
> > > waiting
> > > > > for
> > > > > > > > > > offloading while this segment is expired and eligible to
> be
> > > > > > deleted?
> > > > > > > > > > Maybe it'll be easier to not block the RLMTask when quota
> > > > > exceeded,
> > > > > > > and
> > > > > > > > > > just check it each time the RLMTask runs?
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > > Luke
> > > > > > > > > >
> > > > > > > > > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <
> > > > > > > > > abhijeet.cse.kgp@gmail.com
> > > > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi All,
> > > > > > > > > > >
> > > > > > > > > > > I have created KIP-956 for defining read and write
> quota
> > > for
> > > > > > tiered
> > > > > > > > > > > storage.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> > > > > > > > > > >
> > > > > > > > > > > Feedback and suggestions are welcome.
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Abhijeet.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Abhijeet.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Abhijeet.
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Abhijeet.
> > > >
> > >
> >
> >
> > --
> > Abhijeet.
> >
>
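
For readers comparing the gauge with the avg/max proposal quoted above, the
following sketch shows why windowed statistics are less transient: a throttle
event stays visible until it ages out of the window instead of disappearing on
the next update. This is a simplified single sliding window written for
illustration, not Kafka's metrics implementation; ThrottleTimeWindow and its
methods are hypothetical names, and the window length merely stands in for
whatever the broker derives from its metrics configuration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding-window tracker for throttle times, in milliseconds. */
final class ThrottleTimeWindow {
    private static final class Sample {
        final long timestampMs;
        final long throttleMs;

        Sample(long timestampMs, long throttleMs) {
            this.timestampMs = timestampMs;
            this.throttleMs = throttleMs;
        }
    }

    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();

    ThrottleTimeWindow(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Records one throttle event observed at nowMs. */
    void record(long nowMs, long throttleMs) {
        evictExpired(nowMs);
        samples.addLast(new Sample(nowMs, throttleMs));
    }

    /** Average throttle time over the samples still inside the window, or 0 if none. */
    double avg(long nowMs) {
        evictExpired(nowMs);
        if (samples.isEmpty()) {
            return 0.0;
        }
        long sum = 0;
        for (Sample s : samples) {
            sum += s.throttleMs;
        }
        return (double) sum / samples.size();
    }

    /** Maximum throttle time over the samples still inside the window, or 0 if none. */
    long max(long nowMs) {
        evictExpired(nowMs);
        long max = 0;
        for (Sample s : samples) {
            max = Math.max(max, s.throttleMs);
        }
        return max;
    }

    /** Drops samples that are at least windowMs old. */
    private void evictExpired(long nowMs) {
        while (!samples.isEmpty() && nowMs - samples.peekFirst().timestampMs >= windowMs) {
            samples.removeFirst();
        }
    }
}
```

An admin who polls the metric some time after throttling occurred would still
see a non-zero average and maximum as long as the event is inside the window,
which is the property Jun asked for.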

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Abhijeet,

Thanks for the reply. Sounds good to me.

Jun


On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar <ab...@gmail.com>
wrote:

> Hi Jun,
>
> Thanks for pointing it out. It makes sense to me. We can have the following
> metrics instead. What do you think?
>
>    - remote-(fetch|copy)-throttle-time-avg (The average time in ms remote
>    fetches/copies were throttled by a broker)
>    - remote-(fetch|copy)-throttle-time-max (The maximum time in ms remote
>    fetches/copies were throttled by a broker)
>
> These are similar to the fetch-throttle-time-avg and fetch-throttle-time-max
> metrics we have for Kafka consumers.
> The Avg and Max are computed over the (sliding) window as defined by the
> configuration metrics.sample.window.ms on the server.
>
> (Also, I will update the config and metric names to be consistent)
>
> Regards.
>
> On Thu, Feb 29, 2024 at 2:51 AM Jun Rao <ju...@confluent.io.invalid> wrote:
>
> > Hi, Abhijeet,
> >
> > Thanks for the reply.
> >
> > The issue with recording the throttle time as a gauge is that it's
> > transient. If the metric is not read immediately, the recorded value
> could
> > be reset to 0. The admin won't realize that throttling has happened.
> >
> > For client quotas, the throttle time is tracked as the average
> > throttle-time per user/client-id. This makes the metric less transient.
> >
> > Also, the configs use read/write whereas the metrics use fetch/copy.
> Could
> > we make them consistent?
> >
> > Jun
> >
> > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <
> abhijeet.cse.kgp@gmail.com
> > >
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Clarified the meaning of the two metrics. Also updated the KIP.
> > >
> > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime ->
> > > The duration of time required at a given moment to bring the observed
> > > remote fetch rate within the allowed limit, by preventing further reads.
> > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime ->
> > > The duration of time required at a given moment to bring the observed
> > > remote copy rate within the allowed limit, by preventing further copies.
> > >
> > > Regards.
> > >
> > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao <ju...@confluent.io.invalid>
> > wrote:
> > >
> > > > Hi, Abhijeet,
> > > >
> > > > Thanks for the explanation. Makes sense to me now.
> > > >
> > > > Just a minor comment. Could you document the exact meaning of the
> > > following
> > > > two metrics? For example, is the time accumulated? If so, is it from
> > the
> > > > start of the broker or over some window?
> > > >
> > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime
> > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
> > > >
> > > > Jun
> > > >
> > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <
> > > abhijeet.cse.kgp@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > > The existing quota system for consumers is designed to throttle the
> > > > > > consumer if it exceeds the allowed fetch rate. The additional quota
> > > > > > we want to add works at the broker level. If the broker-level remote
> > > > > > read quota is being exceeded, we prevent additional reads from the
> > > > > > remote storage but do not prevent local reads for the consumer. If
> > > > > > the consumer has specified other partitions to read, which can be
> > > > > > served from local storage, it can continue to read those partitions.
> > > > > > To elaborate, we check whether the quota is exceeded once we know a
> > > > > > segment needs to be read from remote storage. If the quota is
> > > > > > exceeded, we simply skip the partition and move on to the other
> > > > > > partitions in the fetch request. This way, consumers can continue to
> > > > > > read the local data as long as they have not exceeded the
> > > > > > client-level quota.
> > > > > >
> > > > > > Also, when we choose the appropriate consumer-level quota, we would
> > > > > > typically look at what kind of local fetch throughput is supported.
> > > > > > If we were to reuse the same consumer quota, we would also have to
> > > > > > consider the throughput the remote storage supports. The throughput
> > > > > > supported by remote storage may be lower or higher than the
> > > > > > throughput supported by local storage (when using a cloud provider,
> > > > > > it depends on the plan the user has chosen). The consumer quota
> > > > > > would have to be set carefully, considering both local and remote
> > > > > > throughput. With a separate quota, things are much simpler for the
> > > > > > user, since they already know what throughput their remote storage
> > > > > > supports.
> > > > >
> > > > > (Also, thanks for pointing out. I will update the KIP based on the
> > > > > discussion)
> > > > >
> > > > > Regards,
> > > > > Abhijeet.
> > > > >
> > > > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao <ju...@confluent.io.invalid>
> > > > wrote:
> > > > >
> > > > > > Hi, Abhijeet,
> > > > > >
> > > > > > Sorry for the late reply. It seems that you haven't updated the
> KIP
> > > > based
> > > > > > on the discussion? One more comment.
> > > > > >
> > > > > > 11. Currently, we already have a quota system for both the
> > producers
> > > > and
> > > > > > consumers. I can understand why we need an additional
> > > > > > remote.log.manager.write.quota.default quota. For example, when
> > tier
> > > > > > storage is enabled for the first time, there could be a lot of
> > > segments
> > > > > > that need to be written to the remote storage, even though there
> is
> > > no
> > > > > > increase in the produced data. However, I am not sure about an
> > > > > > additional remote.log.manager.read.quota.default. The KIP says
> that
> > > the
> > > > > > reason is "This may happen when the majority of the consumers
> start
> > > > > reading
> > > > > > from the earliest offset of their respective Kafka topics.".
> > However,
> > > > > this
> > > > > > can happen with or without tier storage and the current quota
> > system
> > > > for
> > > > > > consumers is designed for solving this exact problem. Could you
> > > explain
> > > > > the
> > > > > > usage of this additional quota?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar <
> > > > > > abhijeet.cse.kgp@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Comments inline
> > > > > > >
> > > > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao
> <jun@confluent.io.invalid
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi, Abhijeet,
> > > > > > > >
> > > > > > > > Thanks for the KIP. A few comments.
> > > > > > > >
> > > > > > > > 10. remote.log.manager.write.quota.default:
> > > > > > > > 10.1 For other configs, we
> > > > > > > > use replica.alter.log.dirs.io.max.bytes.per.second. To be
> > > > consistent,
> > > > > > > > perhaps this can be sth like
> > > > > > > remote.log.manager.write.max.bytes.per.second.
> > > > > > > >
> > > > > > >
> > > > > > > This makes sense, we can rename the following configs to be
> > > > consistent.
> > > > > > >
> > > > > > > remote.log.manager.write.quota.default ->
> > > > > > > remote.log.manager.write.max.bytes.per.second
> > > > > > >
> > > > > > > remote.log.manager.read.quota.default ->
> > > > > > > remote.log.manager.read.max.bytes.per.second
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > 10.2 Could we list the new metrics associated with the new
> > quota.
> > > > > > > >
> > > > > > >
> > > > > > > We will add the following metrics as mentioned in the other
> > > response.
> > > > > > > *RemoteFetchThrottleTime* - The amount of time needed to bring the
> > > > > > > observed remote fetch rate within the read quota.
> > > > > > > *RemoteCopyThrottleTime* - The amount of time needed to bring the
> > > > > > > observed remote copy rate within the copy quota.
> > > > > > >
> > > > > > > 10.3 Is this dynamically configurable? If so, could we document
> > the
> > > > > > impact
> > > > > > > > to tools like kafka-configs.sh and AdminClient?
> > > > > > > >
> > > > > > >
> > > > > > > Yes, the quotas are dynamically configurable. We will add them as
> > > > > > > Dynamic Broker Configs. Users will be able to change the following
> > > > > > > configs using either kafka-configs.sh or AdminClient by specifying
> > > > > > > the config name and new value. For example:
> > > > > > >
> > > > > > > Using kafka-configs.sh
> > > > > > >
> > > > > > > > bin/kafka-configs.sh --bootstrap-server <bootstrap-server> \
> > > > > > > >   --entity-type brokers --entity-default --alter \
> > > > > > > >   --add-config remote.log.manager.write.max.bytes.per.second=52428800
> > > > > > > >
> > > > > > > > Using AdminClient
> > > > > > > >
> > > > > > > > ConfigEntry configEntry = new ConfigEntry(
> > > > > > > >     "remote.log.manager.write.max.bytes.per.second", "52428800");
> > > > > > > > AlterConfigOp alterConfigOp =
> > > > > > > >     new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
> > > > > > > > List<AlterConfigOp> alterConfigOps =
> > > > > > > >     Collections.singletonList(alterConfigOp);
> > > > > > > >
> > > > > > > > ConfigResource resource =
> > > > > > > >     new ConfigResource(ConfigResource.Type.BROKER, "");
> > > > > > > > Map<ConfigResource, Collection<AlterConfigOp>> updateConfig =
> > > > > > > >     ImmutableMap.of(resource, alterConfigOps);
> > > > > > > > adminClient.incrementalAlterConfigs(updateConfig);
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen <showuon@gmail.com
> >
> > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Abhijeet,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP!
> > > > > > > > > This is an important feature for tiered storage.
> > > > > > > > >
> > > > > > > > > Some comments:
> > > > > > > > > 1. Will we introduce new metrics for this tiered storage
> > > quotas?
> > > > > > > > > This is important because the admin can know the throttling
> > > > status
> > > > > by
> > > > > > > > > checking the metrics while the remote write/read are slow,
> > like
> > > > the
> > > > > > > rate
> > > > > > > > of
> > > > > > > > > uploading/reading byte rate, the throttled time for
> > > > upload/read...
> > > > > > etc.
> > > > > > > > >
> > > > > > > > > 2. Could you give some examples for the throttling
> algorithm
> > in
> > > > the
> > > > > > KIP
> > > > > > > > to
> > > > > > > > > explain it? That will make it much clearer.
> > > > > > > > >
> > > > > > > > > 3. To solve this problem, we can break down the RLMTask
> into
> > > two
> > > > > > > smaller
> > > > > > > > > tasks - one for segment upload and the other for handling
> > > expired
> > > > > > > > segments.
> > > > > > > > > How do we handle the situation when a segment is still
> > waiting
> > > > for
> > > > > > > > > offloading while this segment is expired and eligible to be
> > > > > deleted?
> > > > > > > > > Maybe it'll be easier to not block the RLMTask when quota
> > > > exceeded,
> > > > > > and
> > > > > > > > > just check it each time the RLMTask runs?
> > > > > > > > >
> > > > > > > > > Thank you.
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <
> > > > > > > > abhijeet.cse.kgp@gmail.com
> > > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi All,
> > > > > > > > > >
> > > > > > > > > > I have created KIP-956 for defining read and write quota
> > for
> > > > > tiered
> > > > > > > > > > storage.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> > > > > > > > > >
> > > > > > > > > > Feedback and suggestions are welcome.
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Abhijeet.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Abhijeet.
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Abhijeet.
> > > > >
> > > >
> > >
> > >
> > > --
> > > Abhijeet.
> > >
> >
>
>
> --
> Abhijeet.
>

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Abhijeet Kumar <ab...@gmail.com>.
Hi Jun,

Thanks for pointing it out. It makes sense to me. We can have the following
metrics instead. What do you think?

   - remote-(fetch|copy)-throttle-time-avg (The average time in ms that remote
   fetches/copies were throttled by a broker)
   - remote-(fetch|copy)-throttle-time-max (The maximum time in ms that remote
   fetches/copies were throttled by a broker)

These are similar to the fetch-throttle-time-avg and fetch-throttle-time-max
metrics we have for Kafka consumers.
The Avg and Max are computed over the (sliding) window as defined by the
configuration metrics.sample.window.ms on the server.
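
To illustrate what "avg and max over a sliding window" means for a throttle
time metric, here is a minimal, self-contained sketch. It is not the broker's
metrics code: the class name ThrottleTimeWindow and its methods are
hypothetical, and it keeps one simple time-based window rather than the
sampled windows the Kafka metrics library uses.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper: keeps throttle-time samples for a sliding time window. */
final class ThrottleTimeWindow {
    private static final class Sample {
        final long timestampMs;
        final double throttleMs;

        Sample(long timestampMs, double throttleMs) {
            this.timestampMs = timestampMs;
            this.throttleMs = throttleMs;
        }
    }

    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();

    ThrottleTimeWindow(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Records one throttle time (in ms) observed at nowMs. */
    void record(long nowMs, double throttleMs) {
        expire(nowMs);
        samples.addLast(new Sample(nowMs, throttleMs));
    }

    /** Average throttle time over the samples still inside the window, 0 if none. */
    double avg(long nowMs) {
        expire(nowMs);
        if (samples.isEmpty()) {
            return 0.0;
        }
        double sum = 0.0;
        for (Sample s : samples) {
            sum += s.throttleMs;
        }
        return sum / samples.size();
    }

    /** Maximum throttle time over the samples still inside the window, 0 if none. */
    double max(long nowMs) {
        expire(nowMs);
        double max = 0.0;
        for (Sample s : samples) {
            max = Math.max(max, s.throttleMs);
        }
        return max;
    }

    /** Drops samples whose age is at least windowMs. */
    private void expire(long nowMs) {
        while (!samples.isEmpty()
                && nowMs - samples.peekFirst().timestampMs >= windowMs) {
            samples.removeFirst();
        }
    }
}
```

Because the values are aggregated over a window, a throttling episode stays
visible until its samples age out, which is the property a point-in-time
gauge lacks.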

(Also, I will update the config and metric names to be consistent)

Regards.

On Thu, Feb 29, 2024 at 2:51 AM Jun Rao <ju...@confluent.io.invalid> wrote:

> Hi, Abhijeet,
>
> Thanks for the reply.
>
> The issue with recording the throttle time as a gauge is that it's
> transient. If the metric is not read immediately, the recorded value could
> be reset to 0. The admin won't realize that throttling has happened.
>
> For client quotas, the throttle time is tracked as the average
> throttle-time per user/client-id. This makes the metric less transient.
>
> Also, the configs use read/write whereas the metrics use fetch/copy. Could
> we make them consistent?
>
> Jun

-- 
Abhijeet.

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Abhijeet,

Thanks for the reply.

The issue with recording the throttle time as a gauge is that it's
transient. If the metric is not read immediately, the recorded value could
be reset to 0. The admin won't realize that throttling has happened.

For client quotas, the throttle time is tracked as the average
throttle-time per user/client-id. This makes the metric less transient.

Also, the configs use read/write whereas the metrics use fetch/copy. Could
we make them consistent?

Jun

On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <ab...@gmail.com>
wrote:

> Hi Jun,
>
> Clarified the meaning of the two metrics. Also updated the KIP.
>
> kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime -> The
> duration of time required at a given moment to bring the observed fetch
> rate within the allowed limit, by preventing further reads.
> kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime -> The
> duration of time required at a given moment to bring the observed remote
> copy rate within the allowed limit, by preventing further copies.
>
> Regards.

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Abhijeet Kumar <ab...@gmail.com>.
Hi Jun,

Clarified the meaning of the two metrics. Also updated the KIP.

kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime -> The
duration of time required at a given moment to bring the observed fetch
rate within the allowed limit, by preventing further reads.
kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime -> The
duration of time required at a given moment to bring the observed remote
copy rate within the allowed limit, by preventing further copies.
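
As a rough illustration of "the duration of time required to bring the
observed rate within the allowed limit", the sketch below derives a throttle
time from an observed rate, a quota, and a measurement window. The formula is
an assumption for illustration, not text from the KIP: if O bytes/sec were
observed over a window of W ms and the quota is Q bytes/sec, pausing for X ms
with O * W / (W + X) = Q gives X = W * (O - Q) / Q. The class and method names
are hypothetical.

```java
/** Hypothetical helper: estimates how long to pause to get back under a quota. */
final class RemoteThrottleTime {
    private RemoteThrottleTime() {
    }

    /**
     * Returns the pause, in ms, after which the rate averaged over the window
     * plus the pause equals the quota. Returns 0 when the quota is not exceeded.
     */
    static long throttleTimeMs(double observedBytesPerSec,
                               double quotaBytesPerSec,
                               long windowMs) {
        if (quotaBytesPerSec <= 0) {
            throw new IllegalArgumentException("quota must be positive");
        }
        if (observedBytesPerSec <= quotaBytesPerSec) {
            return 0L;
        }
        // Fraction by which the observed rate exceeds the quota.
        double excess = (observedBytesPerSec - quotaBytesPerSec) / quotaBytesPerSec;
        return Math.round(excess * windowMs);
    }
}
```

For example, with a 50 MiB/s quota (52428800 bytes/sec), an observed copy rate
of 75 MiB/s over a 10 second window yields a throttle time of 5000 ms under
this assumption.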

Regards.

On Wed, Feb 28, 2024 at 12:28 AM Jun Rao <ju...@confluent.io.invalid> wrote:

> Hi, Abhijeet,
>
> Thanks for the explanation. Makes sense to me now.
>
> Just a minor comment. Could you document the exact meaning of the following
> two metrics? For example, is the time accumulated? If so, is it from the
> start of the broker or over some window?
>
> kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime
> kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
>
> Jun
>
> On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <abhijeet.cse.kgp@gmail.com
> >
> wrote:
>
> > Hi Jun,
> >
> > The existing quota system for consumers is designed to throttle the
> > consumer if it exceeds the allowed fetch rate.
> > The additional quota we want to add works on the broker level. If the
> > broker-level remote read quota is being
> > exceeded, we prevent additional reads from the remote storage but do not
> > prevent local reads for the consumer.
> > If the consumer has specified other partitions to read, which can be
> served
> > from local, it can continue to read those
> > partitions. To elaborate more, we make a check for quota exceeded if we
> > know a segment needs to be read from
> > remote. If the quota is exceeded, we simply skip the partition and move
> to
> > other segments in the fetch request.
> > This way consumers can continue to read the local data as long as they
> have
> > not exceeded the client-level quota.
> >
> > Also, when we choose the appropriate consumer-level quota, we would
> > typically look at what kind of local fetch
> > throughput is supported. If we were to reuse the same consumer quota, we
> > should also consider the throughput
> > the remote storage supports. The throughput supported by remote may be
> > less/more than the throughput supported
> > by local (when using a cloud provider, it depends on the plan opted by
> the
> > user). The consumer quota has to be carefully
> > set considering both local and remote throughput. Instead, if we have a
> > separate quota, it makes things much simpler
> > for the user, since they already know what throughput their remote
> storage
> > supports.
> >
> > (Also, thanks for pointing out. I will update the KIP based on the
> > discussion)
> >
> > Regards,
> > Abhijeet.
> >
> > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao <ju...@confluent.io.invalid>
> wrote:
> >
> > > Hi, Abhijeet,
> > >
> > > Sorry for the late reply. It seems that you haven't updated the KIP
> based
> > > on the discussion? One more comment.
> > >
> > > 11. Currently, we already have a quota system for both the producers
> and
> > > consumers. I can understand why we need an additional
> > > remote.log.manager.write.quota.default quota. For example, when tier
> > > storage is enabled for the first time, there could be a lot of segments
> > > that need to be written to the remote storage, even though there is no
> > > increase in the produced data. However, I am not sure about an
> > > additional remote.log.manager.read.quota.default. The KIP says that the
> > > reason is "This may happen when the majority of the consumers start
> > reading
> > > from the earliest offset of their respective Kafka topics.". However,
> > this
> > > can happen with or without tier storage and the current quota system
> for
> > > consumers is designed for solving this exact problem. Could you explain
> > the
> > > usage of this additional quota?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar <
> > > abhijeet.cse.kgp@gmail.com>
> > > wrote:
> > >
> > > > Comments inline
> > > >
> > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao <ju...@confluent.io.invalid>
> > wrote:
> > > >
> > > > > Hi, Abhijeet,
> > > > >
> > > > > Thanks for the KIP. A few comments.
> > > > >
> > > > > 10. remote.log.manager.write.quota.default:
> > > > > 10.1 For other configs, we
> > > > > use replica.alter.log.dirs.io.max.bytes.per.second. To be
> consistent,
> > > > > perhaps this can be sth like
> > > > remote.log.manager.write.max.bytes.per.second.
> > > > >
> > > >
> > > > This makes sense, we can rename the following configs to be
> consistent.
> > > >
> > > > Remote.log.manager.write.quota.default ->
> > > > remote.log.manager.write.max.bytes.per.second
> > > >
> > > > Remote.log.manager.read.quota.default ->
> > > > remote.log.manager.read.max.bytes.per.second.
> > > >
> > > >
> > > >
> > > > > 10.2 Could we list the new metrics associated with the new quota.
> > > > >
> > > >
> > > > We will add the following metrics as mentioned in the other response.
> > > > *RemoteFetchThrottleTime* - The amount of time needed to bring the
> > > observed
> > > > remote fetch rate within the read quota
> > > > *RemoteCopyThrottleTime *- The amount of time needed to bring the
> > > observed
> > > > remote copy rate with the copy quota.
> > > >
> > > > 10.3 Is this dynamically configurable? If so, could we document the
> > > impact
> > > > > to tools like kafka-configs.sh and AdminClient?
> > > > >
> > > >
> > > > Yes, the quotas are dynamically configurable. We will add them as
> > Dynamic
> > > > Broker Configs. Users will be able to change
> > > > the following configs using either kafka-configs.sh or AdminClient by
> > > > specifying the config name and new value. For eg.
> > > >
> > > > Using kafka-configs.sh
> > > >
> > > > > bin/kafka-configs.sh --bootstrap-server <bootstrap-server> \
> > > > >   --entity-type brokers --entity-default --alter \
> > > > >   --add-config remote.log.manager.write.max.bytes.per.second=52428800
> > > > >
> > > > > Using AdminClient
> > > > >
> > > > > ConfigEntry configEntry = new ConfigEntry(
> > > > >     "remote.log.manager.write.max.bytes.per.second", "52428800");
> > > > > AlterConfigOp alterConfigOp =
> > > > >     new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
> > > > > List<AlterConfigOp> alterConfigOps =
> > > > >     Collections.singletonList(alterConfigOp);
> > > > >
> > > > > ConfigResource resource =
> > > > >     new ConfigResource(ConfigResource.Type.BROKER, "");
> > > > > Map<ConfigResource, Collection<AlterConfigOp>> updateConfig =
> > > > >     ImmutableMap.of(resource, alterConfigOps);
> > > > > adminClient.incrementalAlterConfigs(updateConfig);
> > > >
> > > >


-- 
Abhijeet.

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Abhijeet,

Thanks for the explanation. Makes sense to me now.

Just a minor comment. Could you document the exact meaning of the following
two metrics? For example, is the time accumulated? If so, is it from the
start of the broker or over some window?

kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime
kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime

Jun


Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Abhijeet Kumar <ab...@gmail.com>.
Hi Jun,

The existing quota system for consumers is designed to throttle a
consumer that exceeds its allowed fetch rate.
The additional quota we want to add works at the broker level. If the
broker-level remote read quota is exceeded, we prevent additional reads
from remote storage but do not prevent local reads for the consumer.
If the consumer has requested other partitions that can be served from
local storage, it can continue to read those partitions. To elaborate,
we check whether the quota is exceeded only when we know a segment needs
to be read from remote storage. If the quota is exceeded, we simply skip
that partition and move on to the other partitions in the fetch request.
This way, consumers can continue to read local data as long as they have
not exceeded the client-level quota.
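
The skip-on-exceeded behaviour described above can be sketched roughly as
follows. This is only an illustration under stated assumptions, not the
broker's actual fetch path: the class RemoteReadQuotaSketch, the
RemoteReadQuota interface, and the partitionsToServe helper are all
hypothetical names, and partitions are reduced to strings with a flag saying
whether the requested data lives in remote storage.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RemoteReadQuotaSketch {

    /** Hypothetical stand-in for the broker-level remote read quota check. */
    public interface RemoteReadQuota {
        boolean isQuotaExceeded();
    }

    /**
     * Returns the partitions that would be served for one fetch request.
     * needsRemoteRead maps each requested partition to whether its data
     * must be read from remote storage.
     */
    public static List<String> partitionsToServe(Map<String, Boolean> needsRemoteRead,
                                                 RemoteReadQuota quota) {
        List<String> served = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : needsRemoteRead.entrySet()) {
            boolean remote = entry.getValue();
            // The quota is consulted only for partitions that need a remote read.
            if (remote && quota.isQuotaExceeded()) {
                continue; // skip this partition; local partitions are unaffected
            }
            served.add(entry.getKey());
        }
        return served;
    }

    public static void main(String[] args) {
        Map<String, Boolean> request = new LinkedHashMap<>();
        request.put("topicA-0", false); // served from the local log
        request.put("topicA-1", true);  // needs remote storage
        request.put("topicB-0", false); // served from the local log

        // Quota exceeded: only the local partitions are served.
        System.out.println(partitionsToServe(request, () -> true));
        // Quota not exceeded: every partition is served.
        System.out.println(partitionsToServe(request, () -> false));
    }
}
```

In this sketch the skipped partition is simply left out of the response, so
the consumer would ask for it again in a later fetch request.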

Also, when we choose an appropriate consumer-level quota, we would
typically look at what local fetch throughput is supported. If we were to
reuse the same consumer quota, we would also have to consider the
throughput the remote storage supports. That throughput may be lower or
higher than the local throughput (with a cloud provider, it depends on
the plan the user has chosen), so the consumer quota would have to be set
carefully, considering both local and remote throughput. A separate
quota makes things much simpler for the user, since they already know
what throughput their remote storage supports.

(Also, thanks for pointing that out. I will update the KIP based on the
discussion.)

Regards,
Abhijeet.



-- 
Abhijeet.

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Abhijeet,

Sorry for the late reply. It seems that you haven't updated the KIP based
on the discussion? One more comment.

11. Currently, we already have a quota system for both the producers and
consumers. I can understand why we need an additional
remote.log.manager.write.quota.default quota. For example, when tiered
storage is enabled for the first time, there could be a lot of segments
that need to be written to the remote storage, even though there is no
increase in the produced data. However, I am not sure about an
additional remote.log.manager.read.quota.default. The KIP says that the
reason is "This may happen when the majority of the consumers start reading
from the earliest offset of their respective Kafka topics." However, this
can happen with or without tiered storage, and the current quota system for
consumers is designed to solve this exact problem. Could you explain the
usage of this additional quota?

Thanks,

Jun


Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Abhijeet Kumar <ab...@gmail.com>.
Comments inline

On Wed, Dec 6, 2023 at 1:12 AM Jun Rao <ju...@confluent.io.invalid> wrote:

> Hi, Abhijeet,
>
> Thanks for the KIP. A few comments.
>
> 10. remote.log.manager.write.quota.default:
> 10.1 For other configs, we
> use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent,
> perhaps this can be sth like remote.log.manager.write.max.bytes.per.second.
>

This makes sense; we can rename the following configs to be consistent.

remote.log.manager.write.quota.default ->
remote.log.manager.write.max.bytes.per.second

remote.log.manager.read.quota.default ->
remote.log.manager.read.max.bytes.per.second



> 10.2 Could we list the new metrics associated with the new quota.
>

We will add the following metrics, as mentioned in the other response.
*RemoteFetchThrottleTime* - The amount of time needed to bring the observed
remote fetch rate within the read quota.
*RemoteCopyThrottleTime* - The amount of time needed to bring the observed
remote copy rate within the copy quota.
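
As a rough illustration of what "the amount of time needed to bring the
observed rate within the quota" could mean numerically, here is a minimal
sketch. It is an assumption, not something stated in this thread: the formula
is modelled on how Kafka's existing client quotas derive a throttle time
(excess over the quota, relative to the quota, scaled by the measurement
window), and the class and method names are hypothetical.

```java
public class ThrottleTimeSketch {

    /**
     * Time in milliseconds to pause so that the rate observed over windowMs
     * falls back to the quota. Returns 0 when the observed rate is already
     * within the quota.
     */
    public static long throttleTimeMs(double observedBytesPerSec,
                                      double quotaBytesPerSec,
                                      long windowMs) {
        if (quotaBytesPerSec <= 0) {
            throw new IllegalArgumentException("quota must be positive");
        }
        double excess = observedBytesPerSec - quotaBytesPerSec;
        if (excess <= 0) {
            return 0L;
        }
        return Math.round(excess / quotaBytesPerSec * windowMs);
    }

    public static void main(String[] args) {
        // 50 MiB/s = 52428800 bytes/s, the value used in the kafka-configs.sh example.
        double quota = 50.0 * 1024 * 1024;

        // 75 MiB/s observed over a 10 s window is 50% over the quota,
        // so half a window of extra time is needed.
        System.out.println(throttleTimeMs(75.0 * 1024 * 1024, quota, 10_000)); // prints 5000

        // 40 MiB/s is within the quota, so no throttling is needed.
        System.out.println(throttleTimeMs(40.0 * 1024 * 1024, quota, 10_000)); // prints 0
    }
}
```

Whether the reported metric is a per-event value like this or an aggregate
over time is not settled by the sketch.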

> 10.3 Is this dynamically configurable? If so, could we document the impact
> to tools like kafka-configs.sh and AdminClient?
>

Yes, the quotas are dynamically configurable. We will add them as dynamic
broker configs. Users will be able to change these configs using either
kafka-configs.sh or AdminClient by specifying the config name and the new
value. For example:

Using kafka-configs.sh

bin/kafka-configs.sh --bootstrap-server <bootstrap-server> \
  --entity-type brokers --entity-default --alter \
  --add-config remote.log.manager.write.max.bytes.per.second=52428800

Using AdminClient

ConfigEntry configEntry =
    new ConfigEntry("remote.log.manager.write.max.bytes.per.second", "52428800");
AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
List<AlterConfigOp> alterConfigOps = Collections.singletonList(alterConfigOp);

// An empty resource name targets the cluster-wide default for all brokers.
ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, "");
Map<ConfigResource, Collection<AlterConfigOp>> updateConfig =
    ImmutableMap.of(resource, alterConfigOps);
adminClient.incrementalAlterConfigs(updateConfig);




-- 
Abhijeet.

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Abhijeet Kumar <ab...@gmail.com>.
Yes, it is part of V1. I have added it now.

I will follow up on the KIP comments in the next couple of days and
try to close the review in the next few weeks.

Regards.




-- 
Abhijeet.

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Francois Visconte <fr...@datadoghq.com.INVALID>.
Hi,

I see that the ticket has been left untouched for a while now.
Should it be included in tiered storage v1?
We've observed that lacking a way to throttle uploads to tiered storage
has a major impact on producers and consumers when tiered storage access
recovers (starving disk IOPS/throughput or CPU).
For this reason, I think this is an important feature and possibly worth
including in v1.

Regards,



Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Jun Rao <ju...@confluent.io.INVALID>.
Hi, Abhijeet,

Thanks for the KIP. A few comments.

10. remote.log.manager.write.quota.default:
10.1 For other configs, we
use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent,
perhaps this can be sth like remote.log.manager.write.max.bytes.per.second.
10.2 Could we list the new metrics associated with the new quota.
10.3 Is this dynamically configurable? If so, could we document the impact
to tools like kafka-configs.sh and AdminClient?

Jun

On Tue, Nov 28, 2023 at 2:19 AM Luke Chen <sh...@gmail.com> wrote:

> Hi Abhijeet,
>
> Thanks for the KIP!
> This is an important feature for tiered storage.
>
> Some comments:
> 1. Will we introduce new metrics for these tiered storage quotas?
> This is important because the admin can check the throttling status through
> the metrics when remote writes/reads are slow, e.g. the upload/read byte
> rate, the throttled time for uploads/reads, etc.
>
> 2. Could you give some examples for the throttling algorithm in the KIP to
> explain it? That will make it much clearer.
>
> 3. To solve this problem, we can break down the RLMTask into two smaller
> tasks - one for segment upload and the other for handling expired segments.
> How do we handle the situation when a segment is still waiting for
> offloading while this segment is expired and eligible to be deleted?
> Maybe it would be easier not to block the RLMTask when the quota is
> exceeded, and just check it each time the RLMTask runs?
>
> Thank you.
> Luke
>
> On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <abhijeet.cse.kgp@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I have created KIP-956 for defining read and write quota for tiered
> > storage.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> >
> > Feedback and suggestions are welcome.
> >
> > Regards,
> > Abhijeet.
> >
>

Re: [DISCUSS] KIP-956: Tiered Storage Quotas

Posted by Luke Chen <sh...@gmail.com>.
Hi Abhijeet,

Thanks for the KIP!
This is an important feature for tiered storage.

Some comments:
1. Will we introduce new metrics for these tiered storage quotas?
This is important because the admin can check the throttling status through
the metrics when remote writes/reads are slow, e.g. the upload/read byte
rate, the throttled time for uploads/reads, etc.

2. Could you give some examples for the throttling algorithm in the KIP to
explain it? That will make it much clearer.

3. To solve this problem, we can break down the RLMTask into two smaller
tasks - one for segment upload and the other for handling expired segments.
How do we handle the situation when a segment is still waiting for
offloading while this segment is expired and eligible to be deleted?
Maybe it would be easier not to block the RLMTask when the quota is
exceeded, and just check it each time the RLMTask runs?
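
To make points 1 and 3 concrete, here is a minimal sketch of the non-blocking
variant: each run of the copy task checks the quota, skips the upload if the
quota is exceeded, and tries again on its next scheduled run. It also shows
one way a throttled-time value could be derived. Everything here is an
assumption for illustration only: WindowedByteQuota, CopyTaskSketch, tryCopy
and throttleTimeMs are hypothetical names, not classes or methods from the
KIP or from Kafka, and a single sliding window of byte samples stands in for
whatever rate measurement the real implementation uses.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding-window byte quota; not the KIP's quota manager. */
final class WindowedByteQuota {
    private final long maxBytesPerWindow;
    private final long windowMs;
    // Each entry is {timestampMs, bytes}, oldest first.
    private final Deque<long[]> samples = new ArrayDeque<>();
    private long bytesInWindow = 0;

    WindowedByteQuota(long maxBytesPerSecond, long windowMs) {
        this.maxBytesPerWindow = maxBytesPerSecond * windowMs / 1000;
        this.windowMs = windowMs;
        if (this.maxBytesPerWindow <= 0) {
            throw new IllegalArgumentException("quota and window must allow at least one byte");
        }
    }

    /** Records bytes transferred at the given time. */
    synchronized void record(long nowMs, long bytes) {
        expire(nowMs);
        samples.addLast(new long[] {nowMs, bytes});
        bytesInWindow += bytes;
    }

    /** True if the bytes observed in the current window are above the quota. */
    synchronized boolean isQuotaExceeded(long nowMs) {
        expire(nowMs);
        return bytesInWindow > maxBytesPerWindow;
    }

    /** Estimated delay that would bring the observed rate back within the quota. */
    synchronized long throttleTimeMs(long nowMs) {
        expire(nowMs);
        long excess = bytesInWindow - maxBytesPerWindow;
        return excess <= 0 ? 0 : excess * windowMs / maxBytesPerWindow;
    }

    // Drops samples that are a full window old or older.
    private void expire(long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst()[0] <= nowMs - windowMs) {
            bytesInWindow -= samples.removeFirst()[1];
        }
    }
}

/** Hypothetical stand-in for the upload half of an RLMTask run. */
final class CopyTaskSketch {
    /** Returns true if the segment was copied, false if this run skipped it. */
    static boolean tryCopy(WindowedByteQuota quota, long nowMs, long segmentBytes) {
        if (quota.isQuotaExceeded(nowMs)) {
            return false; // do not block; the next scheduled run checks again
        }
        // A real task would upload the segment here.
        quota.record(nowMs, segmentBytes);
        return true;
    }
}
```

With a quota of 100 bytes/sec and a 1000 ms window, a 150-byte copy at t=0 is
allowed (the check happens before the bytes are recorded, so one segment can
overshoot the quota), a copy attempted at t=500 is skipped and
throttleTimeMs(500) returns 500, and a copy at t=1000 is allowed again because
the first sample has aged out. Since the task returns instead of sleeping, the
expiration half of the task is never held up by a throttled upload.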

Thank you.
Luke

On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <ab...@gmail.com>
wrote:

> Hi All,
>
> I have created KIP-956 for defining read and write quota for tiered
> storage.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
>
> Feedback and suggestions are welcome.
>
> Regards,
> Abhijeet.
>