Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/09 03:04:01 UTC

[GitHub] [druid] maytasm opened a new pull request #10371: Auto-compaction snapshot status API

maytasm opened a new pull request #10371:
URL: https://github.com/apache/druid/pull/10371


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490000904



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       We can only know the skipped and compacted segments while iterating the segment timeline (the doSegmentsNeedCompaction method in NewestSegmentFirstIterator). That code path isn't called by next(). Its result is computed and stored as a QueueEntry in the queue, and next() simply polls the QueueEntry from the queue. This means the skipped and compacted stats would have to be in the QueueEntry when next() polls the queue. We also do not store an entry in the queue if the List<DataSegment> is empty (no segments to compact), which does not fit well with trying to include the skipped and compacted stats in the QueueEntry. For example, we would still need to store and aggregate the skipped and compacted stats when the returned segment list is empty (no more segments for the datasource).
   
   tbh I think it is best to keep it this way. Also, since this (the current way or modifying the CompactionSegmentIterator) does not change the API, we can come back and modify it later. And since NewestSegmentFirstIterator is the only CompactionSegmentIterator right now, the cost of changing it isn't too high. A good time to revisit this may be when we add a new iterator and can see what works best with multiple implementations of CompactionSegmentIterator.
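
To illustrate the constraint described in this comment, here is a minimal sketch, not the actual NewestSegmentFirstIterator code. The names `QueueBackedIteratorSketch`, `Batch`, `QueueBackedIterator`, `refill`, and `getSkippedBytes` are all hypothetical, and segments are reduced to plain byte sizes. It shows why stats for skipped batches have to be kept beside the queue: those batches are seen only while scanning the timeline and never pass through next().

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

public class QueueBackedIteratorSketch
{
  // Hypothetical stand-in for the segments of one time interval.
  static final class Batch
  {
    final boolean needsCompaction;
    final List<Long> segmentSizes;

    Batch(boolean needsCompaction, Long... segmentSizes)
    {
      this.needsCompaction = needsCompaction;
      this.segmentSizes = Arrays.asList(segmentSizes);
    }
  }

  // Hypothetical iterator: next() only polls a queue that was filled earlier.
  static final class QueueBackedIterator implements Iterator<List<Long>>
  {
    private final Iterator<Batch> timeline;
    private final Queue<List<Long>> queue = new ArrayDeque<>();
    private long skippedBytes = 0;

    QueueBackedIterator(List<Batch> batches)
    {
      this.timeline = batches.iterator();
      refill();
    }

    // Scans the timeline until a batch that needs compaction is found.
    // Batches that do not need compaction are counted here and never enqueued.
    private void refill()
    {
      while (timeline.hasNext()) {
        Batch candidate = timeline.next();
        if (candidate.needsCompaction) {
          queue.add(candidate.segmentSizes);
          return;
        }
        for (long size : candidate.segmentSizes) {
          skippedBytes += size;
        }
      }
      // Timeline exhausted: nothing is enqueued, but skippedBytes is already up to date.
    }

    @Override
    public boolean hasNext()
    {
      return !queue.isEmpty();
    }

    @Override
    public List<Long> next()
    {
      List<Long> result = queue.poll();
      if (result == null) {
        throw new NoSuchElementException();
      }
      refill();
      return result;
    }

    long getSkippedBytes()
    {
      return skippedBytes;
    }
  }

  public static void main(String[] args)
  {
    QueueBackedIterator it = new QueueBackedIterator(Arrays.asList(
        new Batch(false, 5L),
        new Batch(true, 10L, 20L),
        new Batch(false, 7L)
    ));
    while (it.hasNext()) {
      System.out.println("to compact: " + it.next());
    }
    System.out.println("skipped bytes: " + it.getSkippedBytes());
  }
}
```

In this sketch the last batch is skipped after the final next() call, so no queue entry could have carried its size. That is the mismatch the comment points at.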






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489160868



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteCountAwaitingCompaction;
+  @JsonProperty
+  private long byteCountProcessed;
+  @JsonProperty
+  private long segmentCountAwaitingCompaction;
+  @JsonProperty
+  private long segmentCountProcessed;
+  @JsonProperty
+  private long intervalCountAwaitingCompaction;
+  @JsonProperty
+  private long intervalCountProcessed;
+
+  @JsonCreator
+  public AutoCompactionSnapshot(
+      @JsonProperty @NotNull String dataSource,
+      @JsonProperty @NotNull AutoCompactionScheduleStatus scheduleStatus
+  )
+  {
+    this.dataSource = dataSource;
+    this.scheduleStatus = scheduleStatus;
+  }
+
+  @NotNull
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @NotNull
+  public AutoCompactionScheduleStatus getScheduleStatus()
+  {
+    return scheduleStatus;
+  }
+
+  @Nullable
+  public String getLatestScheduledTaskId()
+  {
+    return latestScheduledTaskId;
+  }
+
+  public long getByteCountAwaitingCompaction()
+  {
+    return byteCountAwaitingCompaction;
+  }
+
+  public long getByteCountProcessed()
+  {
+    return byteCountProcessed;
+  }
+
+  public long getSegmentCountAwaitingCompaction()
+  {
+    return segmentCountAwaitingCompaction;
+  }
+
+  public long getSegmentCountProcessed()
+  {
+    return segmentCountProcessed;
+  }
+
+  public long getIntervalCountAwaitingCompaction()
+  {
+    return intervalCountAwaitingCompaction;
+  }
+
+  public long getIntervalCountProcessed()
+  {
+    return intervalCountProcessed;
+  }
+
+  public void setScheduleStatus(AutoCompactionScheduleStatus scheduleStatus)
+  {
+    this.scheduleStatus = scheduleStatus;
+  }
+
+  public void setLatestScheduledTaskId(String latestScheduledTaskId)
+  {
+    this.latestScheduledTaskId = latestScheduledTaskId;
+  }
+
+  public void setByteCountAwaitingCompaction(long byteCountAwaitingCompaction)
+  {
+    this.byteCountAwaitingCompaction = byteCountAwaitingCompaction;
+  }
+
+  public void setByteCountProcessed(long byteCountProcessed)
+  {
+    this.byteCountProcessed = byteCountProcessed;
+  }
+
+  public void setSegmentCountAwaitingCompaction(long segmentCountAwaitingCompaction)
+  {
+    this.segmentCountAwaitingCompaction = segmentCountAwaitingCompaction;
+  }
+
+  public void setSegmentCountProcessed(long segmentCountProcessed)
+  {
+    this.segmentCountProcessed = segmentCountProcessed;
+  }
+
+  public void setIntervalCountAwaitingCompaction(long intervalCountAwaitingCompaction)
+  {
+    this.intervalCountAwaitingCompaction = intervalCountAwaitingCompaction;
+  }
+
+  public void setIntervalCountProcessed(long intervalCountProcessed)
+  {
+    this.intervalCountProcessed = intervalCountProcessed;
+  }
+
+
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AutoCompactionSnapshot that = (AutoCompactionSnapshot) o;
+    return byteCountAwaitingCompaction == that.byteCountAwaitingCompaction &&
+           byteCountProcessed == that.byteCountProcessed &&
+           segmentCountAwaitingCompaction == that.segmentCountAwaitingCompaction &&
+           segmentCountProcessed == that.segmentCountProcessed &&
+           intervalCountAwaitingCompaction == that.intervalCountAwaitingCompaction &&
+           intervalCountProcessed == that.intervalCountProcessed &&
+           dataSource.equals(that.dataSource) &&
+           Objects.equals(scheduleStatus, that.scheduleStatus) &&
+           Objects.equals(latestScheduledTaskId, that.latestScheduledTaskId);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        dataSource,
+        scheduleStatus,
+        latestScheduledTaskId,
+        byteCountAwaitingCompaction,

Review comment:
       Done. Good catch






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r487839440



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteAwaitingCompaction;
+  @JsonProperty
+  private long byteProcessed;

Review comment:
       Done






[GitHub] [druid] jon-wei commented on pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
jon-wei commented on pull request #10371:
URL: https://github.com/apache/druid/pull/10371#issuecomment-692462764


   > I was thinking of "snapshot" as the snapshot taken at the point in time the API is called. I think "status" also works. Let me know if you still think "status" is better. I'm open to both
   
   I think "status" is better; "snapshot" could be misinterpreted as snapshotting the data itself rather than getting a snapshot of the stats.




[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489225007



##########
File path: server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
##########
@@ -270,12 +275,246 @@ public String get()
     assertLastSegmentNotCompacted(compactSegments);
   }
 
+  @Test
+  public void testMakeStats()

Review comment:
       For example, if you only care about the actual compaction (i.e. the spec in the compaction is not as expected, etc.), then you can focus on testRun() and ignore everything in assertCompactSegmentStatistics (along with the complicated calculation of each stat in each compaction call).
   On the other hand, if you need to debug an incorrect calculation in bytesCompacted (but the actual compaction tasks are fine), then you can focus on testMakeStats().






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489191821



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)

Review comment:
       Done

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488385897



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -112,27 +114,38 @@
   }
 
   @Override
-  public Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes()
+  public Map<String, CompactionStatistics> totalRemainingStatistics()
   {
-    final Object2LongOpenHashMap<String> resultMap = new Object2LongOpenHashMap<>();
-    resultMap.defaultReturnValue(UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE);
-    for (QueueEntry entry : queue) {
-      final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(entry.getDataSource());
-      final Interval interval = new Interval(timeline.first().getInterval().getStart(), entry.interval.getEnd());
-
-      final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(interval);
-
-      long size = 0;
-      for (DataSegment segment : FluentIterable
-          .from(holders)
-          .transformAndConcat(TimelineObjectHolder::getObject)
-          .transform(PartitionChunk::getObject)) {
-        size += segment.getSize();
-      }
+    return remainingSegments;
+  }
+
+  @Override
+  public Map<String, CompactionStatistics> totalProcessedStatistics()
+  {
+    return processedSegments;
+  }
 
-      resultMap.put(entry.getDataSource(), size);
+  @Override
+  public void flushAllSegments()
+  {
+    if (queue.isEmpty()) {
+      return;
+    }
+    QueueEntry entry;
+    while ((entry = queue.poll()) != null) {
+      final List<DataSegment> resultSegments = entry.segments;
+      final String dataSourceName = resultSegments.get(0).getDataSource();
+      // This entry was in the queue, meaning that it was not processed. Hence, also aggregates it's
+      // statistic to the remaining segments counts.
+      collectSegmentStatistics(remainingSegments, dataSourceName, new SegmentsToCompact(entry.segments));
+      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor = timelineIterators.get(
+          dataSourceName
+      );
+      // WARNING: This iterates the compactibleTimelineObjectHolderCursor.
+      // Since this method is intended to only be use after all necessary iteration is done on this iterator

Review comment:
       I mean that this method (`flushAllSegments`) iterates the `compactibleTimelineObjectHolderCursor` in its `iterateAllSegments` call. This class, `NewestSegmentFirstIterator`, also iterates the `compactibleTimelineObjectHolderCursor` in its next() method when used as an iterator.
   Hence, this iterator (`NewestSegmentFirstIterator`) cannot be used to iterate after this method (`flushAllSegments`) is called.
   
   Basically, you cannot call `flushAllSegments` while iterating the NewestSegmentFirstIterator, and you cannot call `flushAllSegments` and then go back to iterating the NewestSegmentFirstIterator.
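
A minimal sketch of that contract, under stated assumptions: `OneShotCursorSketch`, `DrainableIterator`, `flushAll`, and the `flushed` guard are hypothetical names and are not taken from the PR. The diff documents the restriction with a WARNING comment; the explicit `IllegalStateException` below is only one possible way to make the same rule fail fast.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OneShotCursorSketch
{
  // Hypothetical iterator whose next() and flushAll() both consume the same cursor.
  static final class DrainableIterator implements Iterator<String>
  {
    private final Iterator<String> cursor;
    private int remainingCount = 0;
    private boolean flushed = false;

    DrainableIterator(List<String> intervals)
    {
      this.cursor = intervals.iterator();
    }

    @Override
    public boolean hasNext()
    {
      return !flushed && cursor.hasNext();
    }

    @Override
    public String next()
    {
      if (flushed) {
        // Assumption: fail loudly instead of silently returning nothing.
        throw new IllegalStateException("cannot iterate after flushAll()");
      }
      return cursor.next();
    }

    // Drains whatever is left in the shared cursor and counts it as remaining work.
    void flushAll()
    {
      while (cursor.hasNext()) {
        cursor.next();
        remainingCount++;
      }
      flushed = true;
    }

    int getRemainingCount()
    {
      return remainingCount;
    }
  }

  public static void main(String[] args)
  {
    DrainableIterator it = new DrainableIterator(Arrays.asList("2020-09-03", "2020-09-02", "2020-09-01"));
    System.out.println(it.next());              // consumes the newest interval
    it.flushAll();                              // drains the two intervals that are left
    System.out.println(it.getRemainingCount()); // 2
    System.out.println(it.hasNext());           // false
  }
}
```

Because both methods advance one underlying cursor, anything drained by the flush is gone for good, which is why the flush has to be the last call on the iterator.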






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489217716



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -336,25 +353,72 @@ private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig tuningConf
    * @return segments to compact
    */
   private SegmentsToCompact findSegmentsToCompact(
+      final String dataSourceName,
       final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
       final DataSourceCompactionConfig config
   )
   {
-    final long inputSegmentSize = config.getInputSegmentSizeBytes();
+    while (compactibleTimelineObjectHolderCursor.hasNext()) {
+      final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, true)) {
+        return candidates;
+      } else {
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);
+      }
+    }
+    log.info("All segments look good! Nothing to compact");
+    return new SegmentsToCompact();
+  }
 
+  /**
+   * Progressively iterates all remaining time intervals (latest first) in the
+   * timeline {@param compactibleTimelineObjectHolderCursor}. Note that the timeline lookup duration is one day.
+   * The logic for checking if the segments can be compacted or not is then perform on each iteration.
+   * This is repeated until no remaining time intervals in {@param compactibleTimelineObjectHolderCursor}.
+   */
+  private void iterateAllSegments(
+      final String dataSourceName,
+      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
+      final DataSourceCompactionConfig config
+  )
+  {
     while (compactibleTimelineObjectHolderCursor.hasNext()) {
       final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, false)) {
+        // Collect statistic for segments that need compaction
+        collectSegmentStatistics(remainingSegments, dataSourceName, candidates);
+      } else {
+        // Collect statistic for segments that does not need compaction
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);

Review comment:
       I agree. I split processedSegments into compactedSegments and skippedSegments. I also added new fields in autoCompactionSnapshot for the skipped bytes, skipped interval count, and skipped segment count.
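
A rough sketch of what keeping compacted and skipped statistics in separate per-datasource buckets could look like. Only the names `compactedSegments` and `skippedSegments` come from the comment above; `SplitStatisticsSketch`, `Stats`, its fields, and `collect` are hypothetical, and the real CompactionStatistics class may look different.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitStatisticsSketch
{
  // Hypothetical per-datasource counters; the real CompactionStatistics may differ.
  static final class Stats
  {
    long byteSum;
    long segmentCount;
    long intervalCount;
  }

  // Adds one interval's worth of segment sizes to the given bucket for a datasource.
  static void collect(Map<String, Stats> bucket, String dataSource, List<Long> segmentSizes)
  {
    Stats stats = bucket.computeIfAbsent(dataSource, k -> new Stats());
    stats.intervalCount++;
    stats.segmentCount += segmentSizes.size();
    for (long size : segmentSizes) {
      stats.byteSum += size;
    }
  }

  public static void main(String[] args)
  {
    Map<String, Stats> compactedSegments = new HashMap<>();
    Map<String, Stats> skippedSegments = new HashMap<>();

    // An interval that is already compacted goes into one bucket...
    collect(compactedSegments, "wiki", Arrays.asList(100L, 200L));
    // ...and an interval that was skipped goes into the other.
    collect(skippedSegments, "wiki", Arrays.asList(5000L));

    System.out.println(compactedSegments.get("wiki").byteSum); // 300
    System.out.println(skippedSegments.get("wiki").byteSum);   // 5000
  }
}
```

With two buckets, the byte, segment, and interval totals for skipped data can be reported separately instead of being folded into a single "processed" total.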
   

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentCount", count)

Review comment:
       Added metrics for skipped bytes, skipped interval count, and skipped segment count.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489208067



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -336,25 +353,72 @@ private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig tuningConf
    * @return segments to compact
    */
   private SegmentsToCompact findSegmentsToCompact(
+      final String dataSourceName,
       final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
       final DataSourceCompactionConfig config
   )
   {
-    final long inputSegmentSize = config.getInputSegmentSizeBytes();
+    while (compactibleTimelineObjectHolderCursor.hasNext()) {
+      final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, true)) {
+        return candidates;
+      } else {
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);
+      }
+    }
+    log.info("All segments look good! Nothing to compact");
+    return new SegmentsToCompact();
+  }
 
+  /**
+   * Progressively iterates all remaining time intervals (latest first) in the
+   * timeline {@param compactibleTimelineObjectHolderCursor}. Note that the timeline lookup duration is one day.
+   * The logic for checking if the segments can be compacted or not is then perform on each iteration.
+   * This is repeated until no remaining time intervals in {@param compactibleTimelineObjectHolderCursor}.
+   */
+  private void iterateAllSegments(
+      final String dataSourceName,
+      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
+      final DataSourceCompactionConfig config
+  )
+  {
     while (compactibleTimelineObjectHolderCursor.hasNext()) {
       final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, false)) {
+        // Collect statistic for segments that need compaction
+        collectSegmentStatistics(remainingSegments, dataSourceName, candidates);
+      } else {
+        // Collect statistic for segments that does not need compaction
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);
+      }
+    }
+  }
 
-      if (!candidates.isEmpty()) {
-        final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
-        final boolean needsCompaction = needsCompaction(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
-            candidates
-        );
+  /**
+   * This method encapsulates the logic for checking if a given {@param candidates} needs compaction or not.
+   * If {@param logCannotCompactReason} is true then the reason for {@param candidates} not needing compaction is
+   * logged (for the case that {@param candidates} does not needs compaction).
+   *
+   * @return true if the {@param candidates} needs compaction, false if the {@param candidates} does not needs compaction
+   */
+  private boolean isSegmentsNeedCompact(

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489160535



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -238,25 +272,102 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter
   {
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Make sure that the iterator iterate through all the remaining segments so that we can get accurate and correct
+    // statistics (remaining, skipped, processed, etc.). The reason we have to do this explicitly here is because
+    // earlier (when we are iterating to submit compaction tasks) we may have ran out of task slot and were not able
+    // to iterate to the first segment that needs compaction for some datasource.
+    iterator.flushAllSegments();

Review comment:
       I'll add it to the Druid docs in the follow-up PR. I also added it as a comment in DruidCoordinator.java.






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r487838791



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -296,18 +296,87 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
 
     emitter.emit(
         new ServiceMetricEvent.Builder().build(
-            "compact/task/count",
+            "compact/task/scheduled/count",
             stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/task/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/task/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentCount", count)
+          );
+        }
+    );
+
     stats.forEachDataSourceStat(
-        "segmentsWaitCompact",
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
-                  .build("segment/waitCompact/count", count)
+                  .build("segment/compacted/intervalCount", count)

Review comment:
       Yep 👍






[GitHub] [druid] jihoonson commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488984636



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -238,25 +272,102 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter
   {
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Make sure that the iterator iterate through all the remaining segments so that we can get accurate and correct
+    // statistics (remaining, skipped, processed, etc.). The reason we have to do this explicitly here is because
+    // earlier (when we are iterating to submit compaction tasks) we may have ran out of task slot and were not able
+    // to iterate to the first segment that needs compaction for some datasource.
+    iterator.flushAllSegments();

Review comment:
       Yeah, it could be expensive, but I think it's fine since the compaction period is supposed to be long enough (30 min by default). It should be documented, though, that the compaction period must be long enough to iterate over all datasources.
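
To make the cost concern concrete, here is a minimal sketch of the two passes discussed above, assuming a simplified model in which each interval needing compaction is represented only by its size in bytes. `FlushRemainingSketch` and `run` are hypothetical names for illustration and are not part of Druid. The first loop stops when task slots run out; the second loop drains the rest of the iterator purely to collect statistics, so its cost grows with the number of remaining intervals.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of "submit while slots remain, then drain for statistics"; not Druid code.
class FlushRemainingSketch
{
  // Returns {tasksSubmitted, bytesStillAwaitingCompaction}.
  static long[] run(List<Long> intervalSizesNeedingCompaction, int availableSlots)
  {
    Iterator<Long> iterator = intervalSizesNeedingCompaction.iterator();

    // Pass 1: one compaction task per interval until the slots are used up.
    long submitted = 0;
    while (submitted < availableSlots && iterator.hasNext()) {
      iterator.next(); // a compaction task would be submitted for this interval
      submitted++;
    }

    // Pass 2: drain everything that is left so the "awaiting compaction" total is complete.
    long awaitingBytes = 0;
    while (iterator.hasNext()) {
      awaitingBytes += iterator.next();
    }
    return new long[]{submitted, awaitingBytes};
  }

  public static void main(String[] args)
  {
    long[] result = run(Arrays.asList(100L, 200L, 300L), 1);
    System.out.println("submitted=" + result[0] + ", awaitingBytes=" + result[1]);
  }
}
```

Without the second pass, the awaiting total in this model would only cover intervals the first loop happened to reach.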






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490000904



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       We can only know the skipped and compacted segments while we are iterating the segment timeline. This code path isn't called by next(). The result is computed and stored as a QueueEntry in the queue; next() simply polls the QueueEntry from the queue. This means that the skipped and compacted stats must already be in the QueueEntry when the next() call polls the queue. We also do not store an entry in the queue if the List<DataSegment> is empty (no segments to compact). This does not fit well with trying to include the skipped and compacted stats in the QueueEntry. For example, we still need to store and aggregate the skipped and compacted stats when the returned segment list is an empty List (no more segments for the datasource).
   
   tbh I think it is best to keep it this way. Also, since this (either the current way or modifying the CompactionSegmentIterator) does not change the API, we can come back and modify it later. And since NewestSegmentFirstIterator is the only CompactionSegmentIterator right now, the cost of changing it isn't too high. Maybe a good time to revisit this is when we add a new iterator and can see what works best with multiple implementations of CompactionSegmentIterator.






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490000904



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       We can only know the skipped and compacted segments while we are iterating the segment timeline (the doSegmentsNeedCompaction method in NewestSegmentFirstIterator). This code path isn't called by next(). The result is computed and stored as a QueueEntry in the queue before next() is called. The next() method simply polls the QueueEntry from the queue. This means that the skipped and compacted stats must already be in the QueueEntry when the next() call polls the queue. Currently, we also do not store an entry in the queue if the List<DataSegment> is empty (no segments to compact). This does not fit well with trying to include the skipped and compacted stats in the QueueEntry. For example, we still need to store and aggregate the skipped and compacted stats when the List<DataSegment> is an empty List (no more segments for the datasource).
   
   tbh I think it is best to keep it this way. Also, since this (either the current way or modifying the CompactionSegmentIterator) does not change the API, we can come back and modify it later. And since NewestSegmentFirstIterator is the only CompactionSegmentIterator right now, the cost of changing it isn't too high. Maybe a good time to revisit this is when we add a new iterator and can see what works best with multiple implementations of CompactionSegmentIterator.
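
The argument above can be sketched as follows. This is a simplified, hypothetical model and not the real NewestSegmentFirstIterator: segments are plain strings, `needsCompaction` is an invented rule, and `processedSegmentCount` stands in for the skipped/compacted statistics. It shows that the tallies are produced during the timeline scan, that nothing is queued when no interval needs compaction, and that next() only polls the queue.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical, simplified model of a queue-backed compaction iterator; not Druid code.
class QueueBackedIteratorSketch implements Iterator<List<String>>
{
  private final Queue<List<String>> queue = new ArrayDeque<>();
  private final Map<String, Long> processedCounts = new HashMap<>();

  // Invented rule for illustration: an interval with more than one segment needs compaction.
  private static boolean needsCompaction(List<String> segments)
  {
    return segments.size() > 1;
  }

  // The timeline scan. Statistics are tallied here, before and independently of next().
  void scanTimeline(String dataSource, List<List<String>> intervalsNewestFirst)
  {
    for (List<String> segments : intervalsNewestFirst) {
      if (needsCompaction(segments)) {
        queue.add(segments);
        return; // stop at the first interval that needs compaction
      }
      processedCounts.merge(dataSource, (long) segments.size(), Long::sum);
    }
    // Nothing needs compaction: no entry is queued, yet the tallies above still exist.
  }

  long processedSegmentCount(String dataSource)
  {
    return processedCounts.getOrDefault(dataSource, 0L);
  }

  @Override
  public boolean hasNext()
  {
    return !queue.isEmpty();
  }

  @Override
  public List<String> next()
  {
    List<String> entry = queue.poll(); // next() only polls; it never scans the timeline
    if (entry == null) {
      throw new NoSuchElementException();
    }
    return entry;
  }

  public static void main(String[] args)
  {
    QueueBackedIteratorSketch iterator = new QueueBackedIteratorSketch();
    iterator.scanTimeline("wikipedia", Arrays.asList(Arrays.asList("s1"), Arrays.asList("s2")));
    System.out.println("hasNext=" + iterator.hasNext()
                       + ", processed=" + iterator.processedSegmentCount("wikipedia"));
  }
}
```

In this model, a datasource with nothing to compact produces statistics but no queue entry, which is the case that makes carrying the statistics inside the queued element awkward.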






[GitHub] [druid] maytasm edited a comment on pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm edited a comment on pull request #10371:
URL: https://github.com/apache/druid/pull/10371#issuecomment-690787993


   @jihoonson 
   Thanks for taking a look. Please see answers below.
   
   > * We need documents for new metrics and APIs. Are you planning to add them as a follow-up?
   
   I am planning to add the docs and integration tests in a follow-up PR. I am planning to get this change in with the implementation (current state of the PR) and unit tests (coming very soon). We are also planning to add a UI for showing these metrics. Hence, this PR will unblock the UI work.
   
   > * If you don't want to add docs in this PR, would you please add descriptions on each field in the API response and each metric?
   
   Sure. I'll add it to the PR description 
   
   > * Could you define "snapshot" more precisely? Is it the status of the latest auto-compaction run? What would it be if the latest run did nothing due to lack of task slots?
   
   Snapshot refers to the statistics from the latest auto-compaction run. It is the "snapshot" of the latest auto-compaction. If there is no available slot, or not enough slots to get to a particular datasource, then CompactSegments (after using up all the available slots) will iterate the CompactionSegmentIterator (NewestSegmentFirstIterator) until it reaches the first segment that needs compaction for every datasource. This still allows us to get accurate statistics for all datasources. So to answer your question... "scheduleStatus" will always be updated regardless of slots. "latestScheduledTaskId" can be the task id from a previous coordinator run if the current run did not schedule a task for this datasource. "byteCompacted"/"segmentCountCompacted"/"intervalCountCompacted" will be the same as in the previous run if there is no slot (meaning no compaction task scheduled). "byteAwaitingCompaction"/"segmentCountAwaitingCompaction"/"intervalCountAwaitingCompaction" will be the same as in the previous run if there is no slot (meaning no compaction task scheduled), or may increase if new data is ingested for the datasource between the last run and this run. Basically, "byteCompacted"/"segmentCountCompacted"/"intervalCountCompacted"/"byteAwaitingCompaction"/"segmentCountAwaitingCompaction"/"intervalCountAwaitingCompaction" are always the correct statistics of the datasource at the time of the latest run, even if there is no slot / no task scheduled.
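
As an illustration of the carry-over behaviour described in this answer, here is a minimal sketch. `SnapshotSketch` and `forRun` are hypothetical and are not the PR's AutoCompactionSnapshot class; only the field names are taken from the text above. The task id falls back to the previous run's value when the current run schedules nothing, while the byte statistics are always recomputed for the current run.

```java
// Hypothetical, simplified snapshot; field names follow the ones quoted above, the class is illustrative only.
final class SnapshotSketch
{
  final String latestScheduledTaskId; // null if no task has ever been scheduled
  final long byteAwaitingCompaction;
  final long byteCompacted;

  SnapshotSketch(String latestScheduledTaskId, long byteAwaitingCompaction, long byteCompacted)
  {
    this.latestScheduledTaskId = latestScheduledTaskId;
    this.byteAwaitingCompaction = byteAwaitingCompaction;
    this.byteCompacted = byteCompacted;
  }

  // Builds the snapshot for the current run. taskIdScheduledThisRun is null when no slot was available.
  static SnapshotSketch forRun(
      SnapshotSketch previous,
      String taskIdScheduledThisRun,
      long byteAwaitingCompaction,
      long byteCompacted
  )
  {
    final String taskId;
    if (taskIdScheduledThisRun != null) {
      taskId = taskIdScheduledThisRun;
    } else {
      // Carry over the id from the previous run, if there was one.
      taskId = previous == null ? null : previous.latestScheduledTaskId;
    }
    // Byte statistics always reflect the current run, regardless of slots.
    return new SnapshotSketch(taskId, byteAwaitingCompaction, byteCompacted);
  }

  public static void main(String[] args)
  {
    SnapshotSketch first = forRun(null, "compact_task_1", 500L, 100L);
    SnapshotSketch second = forRun(first, null, 700L, 100L); // no slot, new data ingested
    System.out.println(second.latestScheduledTaskId + " awaiting=" + second.byteAwaitingCompaction);
  }
}
```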
   
   > * I'm not sure `snapshot` is a good API name since it's not intuitive to me what it means. Would `status` be better?
   
   I was thinking of "snapshot" as the snapshot taken at the point in time the API is called. I think "status" also works. Let me know if you still think "status" is better. I'm open to both.
   
   > * Why do you want to distinguish datasources which auto compaction has never configured and others which auto compaction has paused? Is it useful to return the same statistics for the datasources as well which auto compaction has never configured?
   
   For datasources that have never had auto compaction enabled, all of these statistics are already available or can be calculated from existing APIs. "scheduleStatus" will be "NOT_ENABLED", "byteCompacted"/"segmentCountCompacted"/"intervalCountCompacted" will be 0, and "byteAwaitingCompaction"/"segmentCountAwaitingCompaction"/"intervalCountAwaitingCompaction" will be the total size, number of segments, and number of intervals (these are available in sys.segments etc.). "latestScheduledTaskId" will be null. So, I am not including those, to reduce unnecessary computation in the API and reduce payload size.
   However, I can see one case where this will not be true: a datasource that has never had auto compaction enabled but has had a manual compaction task run. Do you think it is common and useful to cover datasources that have never had auto compaction enabled but have manual compaction tasks? (Initially, this PR only aims to make auto compaction more user-friendly and does not really address manual compaction.)
   
   > * Why does the response include only one task ID? What will happen if auto compaction issues multiple compaction tasks?
   
   Hmm... I am thinking of having the UI component show the status (fail, success) of the latest task, which could indicate whether user action is required.
   Another idea may be to return a list of all tasks issued during this run (and an empty list if there is no slot). The UI can select which task it wants to show: it can choose the last task ID, or it can calculate the success vs. failure rate of all tasks in the last run.
   
   > * Please list out new metrics in the PR description. It will help release manager.
   
   Sure. I'll add it to the PR description 




[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490712432



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490712478



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",

Review comment:
       Done






[GitHub] [druid] jihoonson commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r486569512



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -296,18 +296,87 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
 
     emitter.emit(
         new ServiceMetricEvent.Builder().build(
-            "compact/task/count",
+            "compact/task/scheduled/count",
             stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/task/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/task/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentCount", count)
+          );
+        }
+    );
+
     stats.forEachDataSourceStat(
-        "segmentsWaitCompact",
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
                   .setDimension(DruidMetrics.DATASOURCE, dataSource)
-                  .build("segment/waitCompact/count", count)
+                  .build("segment/compacted/intervalCount", count)

Review comment:
       Even though this changes the metric name, it seems fine since this metric has never been emitted because of the metric name mismatch (`segmentsWaitCompact` and `segmentSizeWaitCompact`).
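
A minimal sketch of why such a mismatch silently emits nothing, assuming a per-datasource stats holder keyed by stat name. `StatKeyMismatchSketch` is a hypothetical stand-in and not Druid's CoordinatorStats, and which of the two names was the writer key is an assumption here: the removed diff line only shows that the reader used `segmentsWaitCompact`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ObjLongConsumer;

// Hypothetical stand-in for a per-datasource stats holder; not Druid's CoordinatorStats.
class StatKeyMismatchSketch
{
  private final Map<String, Map<String, Long>> perDataSourceStats = new HashMap<>();

  void addToDataSourceStat(String statName, String dataSource, long value)
  {
    perDataSourceStats.computeIfAbsent(statName, k -> new HashMap<>())
                      .merge(dataSource, value, Long::sum);
  }

  // Calls the consumer once per datasource recorded under statName; does nothing for an unknown key.
  void forEachDataSourceStat(String statName, ObjLongConsumer<String> consumer)
  {
    Map<String, Long> stats = perDataSourceStats.get(statName);
    if (stats != null) {
      stats.forEach((dataSource, value) -> consumer.accept(dataSource, value));
    }
  }

  // Returns the events that would be emitted when writing under writeKey and reading under readKey.
  static List<String> emittedEvents(String writeKey, String readKey)
  {
    StatKeyMismatchSketch stats = new StatKeyMismatchSketch();
    stats.addToDataSourceStat(writeKey, "wikipedia", 1024L);
    List<String> events = new ArrayList<>();
    stats.forEachDataSourceStat(readKey, (dataSource, value) -> events.add(dataSource + "=" + value));
    return events;
  }

  public static void main(String[] args)
  {
    System.out.println(emittedEvents("segmentSizeWaitCompact", "segmentsWaitCompact"));
    System.out.println(emittedEvents("segmentSizeWaitCompact", "segmentSizeWaitCompact"));
  }
}
```

Referring to a shared constant on both the writing and reading side, as the new code does with the `CompactSegments` constants, avoids this class of mismatch.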

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteAwaitingCompaction;
+  @JsonProperty
+  private long byteProcessed;

Review comment:
       Why not `byteCountProcessed` to align with other metrics?

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -296,18 +296,87 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
 
     emitter.emit(
         new ServiceMetricEvent.Builder().build(
-            "compact/task/count",
+            "compact/task/scheduled/count",

Review comment:
       For backwards compatibility, we shouldn't change the metric name.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490103518



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       Actually, I have another idea that I really like and that I think is clean and simple (I hope you like it too).
   
   Remove flushAllSegments() and totalRemainingStatistics() from the CompactionSegmentIterator. Keep totalCompactedStatistics() and totalSkippedStatistics() in the CompactionSegmentIterator. The stats for totalCompactedStatistics() and totalSkippedStatistics() are maintained and stored in the CompactionSegmentIterator (NewestSegmentFirstIterator). I think this makes perfect sense since the already compacted and skipped segments are never returned from the iterator’s next() (in fact, they are never even put into the queue that backs the iterator). Hence, as we iterate past the already compacted and skipped segments, we should just aggregate their stats within the CompactionSegmentIterator. totalCompactedStatistics() and totalSkippedStatistics() simply return the aggregates up to the current position of the iterator. We then iterate the iterator as before when creating the compaction tasks, and each List<DataSegment> that is returned is aggregated into a map in CompactSegments. Similarly, after we run out of task slots, we continue to iterate the iterator in makeStats, and each List<DataSegment> that is returned is aggregated into a different map in CompactSegments.
   
   Stats maps are maintained in both CompactSegments and CompactionSegmentIterator, but I think this is much clearer because it separates the “skipped/already compacted” segments from the “segments returned by the iterator”. Also, there are no complex contracts in the CompactionSegmentIterator, and the logic in CompactionSegmentIterator for totalCompactedStatistics() and totalSkippedStatistics() is very simple.






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490710835



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -234,29 +294,174 @@ private CoordinatorStats doRun(
     return newContext;
   }
 
-  private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator)
+  /**
+   * This method can be used to atomically update the snapshots in {@code autoCompactionSnapshotPerDataSource} when
+   * no compaction task is scheduled in this run. Currently, this method does not update compaction statistics
+   * (bytes, interval count, segment count, etc) since we skip iterating through the segments and cannot get an update
+   * on those statistics. Thus, this method only updates the schedule status and task list (compaction statistics
+   * remain the same as in the previous snapshot).
+   */
+  private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders
+  )
+  {
+    Map<String, AutoCompactionSnapshot> previousSnapshots = autoCompactionSnapshotPerDataSource.get();
+    for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
+      final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
+      AutoCompactionSnapshot previousSnapshot = previousSnapshots.get(dataSource);
+      if (previousSnapshot != null) {
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
+      }
+    }
+
+    Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
+        currentRunAutoCompactionSnapshotBuilders,
+        AutoCompactionSnapshot.Builder::build
+    );
+    // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
+    autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
+  }
+
+  private CoordinatorStats makeStats(
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
+      int numCompactionTasks,
+      CompactionSegmentIterator iterator
+  )
   {
+    final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Iterate through all the remaining segments in the iterator.
+    // As these segments could be compacted but were not compacted due to lack of task slots, we aggregate
+    // their statistics into the AwaitingCompaction statistics
+    for (; iterator.hasNext();) {

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489245349



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       > > > * Why does the response include only one task ID? What will happen if auto compaction issues multiple compaction tasks?
   > > 
   > > 
   > > Hmm... I am thinking of having the UI component show the status (fail, success) of the latest task, which could indicate whether user action is required.
   > > Another idea may be to return a list of all tasks issued during this run (and an empty list if there is no slot). The UI can select which task it wants to show. The UI can choose the last task ID, or it can calculate the success vs. fail % rate of all tasks in the last run.
   > 
   > Hmm, I'm not sure what the use case would be for such information about the tasks issued in each run. Would it be more useful to know how many compaction task failures there have been recently (over a couple of runs)?
   
   I just realized that we now have a different prefix in the task name for tasks issued manually vs. tasks issued automatically by the coordinator (auto compaction). In this case, returning the last task scheduled by auto compaction is not necessary, as we can use the existing task API to get the latest task that has the `coordinator-issue` prefix in its name. In fact, we can sort by time and get a history of all the tasks issued by auto compaction. Thus, the existing task API can already answer questions like how many compaction task failures there have been recently (over some time period such as hours, days, etc.). 
   
   The information that currently cannot be retrieved is a list of all the tasks issued in the same / latest coordinator run. That provides information you cannot currently get from the ingestion tab, since we have no idea which tasks were issued in the same run. This allows us to know things like the % of tasks that succeeded vs. failed vs. were canceled in the last run. Moreover, we can then compare the % of tasks that succeeded vs. failed vs. were canceled in the last run against the previous run (or any other run). This allows users to see whether changes they made between the runs (changes to configs, specs, etc.) fixed any issues they were seeing previously. 
   For example...
   - Run 1 has 100% failed and 0% success, then the user fixes something and now run 2 has 100% success and 0% failed. They can see that the fix they made solved the problem. 
   - Run 1 has 100% failed and 0% success, then the user fixes something and now run 2 has 0% success and 100% failed. They can see that the fix did not solve the problem. 
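
As a rough illustration of the per-run comparison described above, here is a minimal sketch. `RunOutcomeSketch` and its `TaskState` enum are hypothetical names for this example only; how the task states of one coordinator run would actually be obtained is not shown and is left as an assumption.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; the states would come from the list of tasks issued in one coordinator run.
final class RunOutcomeSketch
{
  enum TaskState { SUCCESS, FAILED, CANCELED }

  // Returns the percentage of tasks in each state for a single run (all zeros for an empty run).
  static Map<TaskState, Double> percentages(List<TaskState> statesInRun)
  {
    int[] counts = new int[TaskState.values().length];
    for (TaskState state : statesInRun) {
      counts[state.ordinal()]++;
    }
    Map<TaskState, Double> result = new EnumMap<>(TaskState.class);
    for (TaskState state : TaskState.values()) {
      double pct = statesInRun.isEmpty() ? 0.0 : counts[state.ordinal()] * 100.0 / statesInRun.size();
      result.put(state, pct);
    }
    return result;
  }

  public static void main(String[] args)
  {
    // Run 1: everything failed. Run 2 (after a fix): everything succeeded.
    List<TaskState> run1 = Arrays.asList(TaskState.FAILED, TaskState.FAILED);
    List<TaskState> run2 = Arrays.asList(TaskState.SUCCESS, TaskState.SUCCESS);
    System.out.println("run 1: " + percentages(run1));
    System.out.println("run 2: " + percentages(run2));
  }
}
```

Comparing the two maps side by side is what would let a UI show whether a change made between runs helped.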
   
   






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488390448



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -238,25 +272,102 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter
   {
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Make sure that the iterator iterate through all the remaining segments so that we can get accurate and correct
+    // statistics (remaining, skipped, processed, etc.). The reason we have to do this explicitly here is because
+    // earlier (when we are iterating to submit compaction tasks) we may have ran out of task slot and were not able
+    // to iterate to the first segment that needs compaction for some datasource.
+    iterator.flushAllSegments();

Review comment:
       I also thought of having two implementations of the statistics calculation. We can keep the old method, which is just a sum of all segments from the earliest segment to the segment that we iterated up to. We can call this an approximation; the downside is that it can overestimate if there are compacted intervals between the earliest segment and the segment that we iterated up to. Then we can have this new implementation, which is an exact calculation (i.e. we check each and every interval). Each datasource can choose which implementation to use for its stats calculation.
   
   However, I discussed this with @jihoonson and we think that this new implementation is actually OK to use as the default for all datasources. This code runs as part of a Coordinator duty, hence it is not on the critical path and there is no client waiting, etc. While it goes through all segments, it does so at the interval level (we process the batch of segments for one interval at a time). Thus, the loop is over every interval of every datasource (not every segment). Also, auto compaction runs every 30 mins by default. This should be plenty of time for this code to finish before the next cycle begins. 
   Furthermore, we can move the auto compaction duty to the end of the set of Coordinator duties and change the scheduling of the Coordinator duties from scheduleWithFixedDelay to scheduleAtFixedRate. This should ensure that the other duties still run every 30 mins (assuming the auto compaction doesn't go past 30 mins, which I doubt it will).
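
To make the overestimation concrete, here is a minimal sketch of the two calculations on a toy model. `AwaitingBytesSketch`, `IntervalInfo`, and the sample numbers are all hypothetical and are not taken from the PR; the model assumes intervals are ordered oldest first and that `iteratedUpTo` is the index of the interval the iterator stopped at.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model only; IntervalInfo and both methods are hypothetical, not Druid code.
final class AwaitingBytesSketch
{
  static final class IntervalInfo
  {
    final long bytes;
    final boolean alreadyCompacted;

    IntervalInfo(long bytes, boolean alreadyCompacted)
    {
      this.bytes = bytes;
      this.alreadyCompacted = alreadyCompacted;
    }
  }

  // Approximation: sum everything from the earliest interval (index 0) up to and including iteratedUpTo.
  static long approximate(List<IntervalInfo> oldestFirst, int iteratedUpTo)
  {
    long total = 0;
    for (int i = 0; i <= iteratedUpTo; i++) {
      total += oldestFirst.get(i).bytes;
    }
    return total;
  }

  // Exact: check every interval in the same range and count only those not yet compacted.
  static long exact(List<IntervalInfo> oldestFirst, int iteratedUpTo)
  {
    long total = 0;
    for (int i = 0; i <= iteratedUpTo; i++) {
      IntervalInfo info = oldestFirst.get(i);
      if (!info.alreadyCompacted) {
        total += info.bytes;
      }
    }
    return total;
  }

  // Made-up data: the second interval is already compacted.
  static List<IntervalInfo> sample()
  {
    return Arrays.asList(
        new IntervalInfo(100, false),
        new IntervalInfo(200, true),
        new IntervalInfo(300, false),
        new IntervalInfo(400, false)
    );
  }

  public static void main(String[] args)
  {
    List<IntervalInfo> intervals = sample();
    System.out.println("approximate = " + approximate(intervals, 2)); // counts the compacted 200 bytes too
    System.out.println("exact       = " + exact(intervals, 2));
  }
}
```

Both loops are linear in the number of intervals; the exact version only adds a per-interval check, which is consistent with the point above that the work is done per interval rather than per segment.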






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488392899



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteCountAwaitingCompaction;

Review comment:
       I'm fine either way. "byteCount" matches the other counts.






[GitHub] [druid] jihoonson commented on pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10371:
URL: https://github.com/apache/druid/pull/10371#issuecomment-692991796


   > > * Why does the response include only one task ID? What will happen if auto compaction issues multiple compaction tasks?
   > 
   > Hmm... I am thinking of having the UI component show the status (fail, success) of the latest task, which could indicate whether user action is required.
   > Another idea may be to return a list of all tasks issued during this run (and an empty list if there is no slot). The UI can select which task it wants to show. The UI can choose the last task ID, or it can calculate the success vs. fail % rate of all tasks in the last run.
   
   Hmm, I'm not sure what the use case would be for such information about the tasks issued in each run. Would it be more useful to know how many compaction task failures there have been recently (over a couple of runs)?




[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488392405



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteCountAwaitingCompaction;

Review comment:
       Initially I had it as "bytes" but @jihoonson suggested "byteCount".






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490712828



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490711206



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +77,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490103518



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       Actually, I have another idea that I really like and that I think is clean and simple (I hope you like it too).
   
   Remove `flushAllSegments()` and `totalRemainingStatistics()` from the `CompactionSegmentIterator`. Keep `totalCompactedStatistics()` and `totalSkippedStatistics()` in the `CompactionSegmentIterator`. The stats for `totalCompactedStatistics()` and `totalSkippedStatistics()` are maintained and stored in the `CompactionSegmentIterator` (`NewestSegmentFirstIterator`). I think this makes perfect sense since the "already compacted" and "skipped" segments are never returned from the iterator’s `next()` (in fact, they are not even put into the queue that backs the iterator). Hence, as we iterate past the already compacted and skipped segments, we should just aggregate their stats within the `CompactionSegmentIterator` and keep them there. `totalCompactedStatistics()` and `totalSkippedStatistics()` simply return the aggregates up to the current position of the iterator. We then iterate the iterator as before when creating the compaction tasks, and each `List<DataSegment>` returned by the iterator is aggregated into the `AutoCompactionSnapshot.Builder`'s Compacted stats (this is in `CompactSegments`). Similarly, we continue to iterate the iterator in `makeStats()` (after we run out of task slots), and each `List<DataSegment>` returned by the iterator is aggregated into the `AutoCompactionSnapshot.Builder`'s Remaining stats (this is in `CompactSegments`).
   
   Stats are maintained in both `CompactSegments` and `CompactionSegmentIterator`, but I think this is much clearer because it separates the “skipped/already compacted” segments from the “segments returned by the iterator”. Also, there are no complex contracts in the `CompactionSegmentIterator`, and the logic in `CompactionSegmentIterator` for `totalCompactedStatistics()` and `totalSkippedStatistics()` is very simple.
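
A minimal sketch of this split, under stated assumptions: `StatsTrackingIteratorSketch`, `Batch`, and `Kind` are hypothetical names, segments are reduced to their byte sizes, and the real `NewestSegmentFirstIterator` logic for deciding what is skipped or already compacted is not modelled. The iterator keeps the skipped and already-compacted totals itself, while the caller aggregates only what `next()` returns.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch; each "segment" is just its size in bytes.
final class StatsTrackingIteratorSketch implements Iterator<List<Long>>
{
  enum Kind { NEEDS_COMPACTION, ALREADY_COMPACTED, SKIPPED }

  static final class Batch
  {
    final Kind kind;
    final List<Long> segmentSizes;

    Batch(Kind kind, Long... segmentSizes)
    {
      this.kind = kind;
      this.segmentSizes = Arrays.asList(segmentSizes);
    }
  }

  private final Deque<Batch> pending;
  private List<Long> nextBatch;
  private long compactedBytes;
  private long skippedBytes;

  StatsTrackingIteratorSketch(List<Batch> batches)
  {
    this.pending = new ArrayDeque<>(batches);
    advance();
  }

  // Moves to the next batch that needs compaction; batches passed over on the way
  // are never returned, only added to the iterator's own totals.
  private void advance()
  {
    nextBatch = null;
    while (nextBatch == null && !pending.isEmpty()) {
      Batch batch = pending.poll();
      if (batch.kind == Kind.ALREADY_COMPACTED) {
        compactedBytes += sum(batch.segmentSizes);
      } else if (batch.kind == Kind.SKIPPED) {
        skippedBytes += sum(batch.segmentSizes);
      } else {
        nextBatch = batch.segmentSizes;
      }
    }
  }

  static long sum(List<Long> sizes)
  {
    long total = 0;
    for (long size : sizes) {
      total += size;
    }
    return total;
  }

  @Override
  public boolean hasNext()
  {
    return nextBatch != null;
  }

  @Override
  public List<Long> next()
  {
    if (nextBatch == null) {
      throw new NoSuchElementException();
    }
    List<Long> result = nextBatch;
    advance();
    return result;
  }

  // Aggregates up to the current position of the iterator.
  long totalCompactedBytes()
  {
    return compactedBytes;
  }

  long totalSkippedBytes()
  {
    return skippedBytes;
  }

  // Caller side: returns {bytes handed to tasks, bytes awaiting, already compacted, skipped}.
  static long[] drive(int taskSlots)
  {
    StatsTrackingIteratorSketch iterator = new StatsTrackingIteratorSketch(Arrays.asList(
        new Batch(Kind.NEEDS_COMPACTION, 10L, 20L),
        new Batch(Kind.ALREADY_COMPACTED, 5L),
        new Batch(Kind.NEEDS_COMPACTION, 7L),
        new Batch(Kind.SKIPPED, 3L),
        new Batch(Kind.NEEDS_COMPACTION, 1L)
    ));
    long scheduledBytes = 0;
    for (int slot = 0; slot < taskSlots && iterator.hasNext(); slot++) {
      scheduledBytes += sum(iterator.next());
    }
    long awaitingBytes = 0;
    while (iterator.hasNext()) {
      awaitingBytes += sum(iterator.next()); // out of task slots: the rest is awaiting compaction
    }
    return new long[]{scheduledBytes, awaitingBytes, iterator.totalCompactedBytes(), iterator.totalSkippedBytes()};
  }
}
```

Because this sketch prefetches the next batch, the iterator-side totals run slightly ahead of the last returned element; they are complete once the iterator is exhausted, which is when the caller reads them here.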






[GitHub] [druid] jon-wei commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488322171



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteCountAwaitingCompaction;

Review comment:
       suggest just "bytes" instead of "byteCount" for the property name

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -112,27 +114,38 @@
   }
 
   @Override
-  public Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes()
+  public Map<String, CompactionStatistics> totalRemainingStatistics()
   {
-    final Object2LongOpenHashMap<String> resultMap = new Object2LongOpenHashMap<>();
-    resultMap.defaultReturnValue(UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE);
-    for (QueueEntry entry : queue) {
-      final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(entry.getDataSource());
-      final Interval interval = new Interval(timeline.first().getInterval().getStart(), entry.interval.getEnd());
-
-      final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(interval);
-
-      long size = 0;
-      for (DataSegment segment : FluentIterable
-          .from(holders)
-          .transformAndConcat(TimelineObjectHolder::getObject)
-          .transform(PartitionChunk::getObject)) {
-        size += segment.getSize();
-      }
+    return remainingSegments;
+  }
+
+  @Override
+  public Map<String, CompactionStatistics> totalProcessedStatistics()
+  {
+    return processedSegments;
+  }
 
-      resultMap.put(entry.getDataSource(), size);
+  @Override
+  public void flushAllSegments()
+  {
+    if (queue.isEmpty()) {
+      return;
+    }
+    QueueEntry entry;
+    while ((entry = queue.poll()) != null) {
+      final List<DataSegment> resultSegments = entry.segments;
+      final String dataSourceName = resultSegments.get(0).getDataSource();
+      // This entry was in the queue, meaning that it was not processed. Hence, also aggregates it's
+      // statistic to the remaining segments counts.
+      collectSegmentStatistics(remainingSegments, dataSourceName, new SegmentsToCompact(entry.segments));
+      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor = timelineIterators.get(
+          dataSourceName
+      );
+      // WARNING: This iterates the compactibleTimelineObjectHolderCursor.
+      // Since this method is intended to only be use after all necessary iteration is done on this iterator

Review comment:
       I don't think I understand this comment. If all iteration is done (by that, do you mean `compactibleTimelineObjectHolderCursor.hasNext` returns false?), then iterateAllSegments would do nothing.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -238,25 +272,102 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter
   {
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Make sure that the iterator iterate through all the remaining segments so that we can get accurate and correct
+    // statistics (remaining, skipped, processed, etc.). The reason we have to do this explicitly here is because
+    // earlier (when we are iterating to submit compaction tasks) we may have ran out of task slot and were not able
+    // to iterate to the first segment that needs compaction for some datasource.
+    iterator.flushAllSegments();

Review comment:
       Are there any concerns with performance overhead from this?






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489275214



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       I don't understand the proposed alternative. What would the `next(Map<String, CompactionStatistics> stats)` do? What map is the `appropriate map` that `CompactSegments` will pass to this method?






[GitHub] [druid] maytasm merged pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm merged pull request #10371:
URL: https://github.com/apache/druid/pull/10371


   




[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490712589



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)

Review comment:
       Done






[GitHub] [druid] maytasm commented on pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on pull request #10371:
URL: https://github.com/apache/druid/pull/10371#issuecomment-692471451


   > > I was thinking of "snapshot" as the snapshot taken at the point in time the API is called. I think "status" also works. Let me know if you still think "status" is better. I'm open to both
   > 
   > I think "status" is better, "snapshot" could be misinterpreted as snapshotting the data itself, vs getting a snapshot of the stats
   
   Changed the API endpoint to "status"




[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489161421



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -632,8 +643,9 @@ private void stopBeingLeader()
   {
     List<CoordinatorDuty> duties = new ArrayList<>();
     duties.add(new LogUsedSegments());
-    duties.addAll(makeCompactSegmentsDuty());
     duties.addAll(indexingServiceDuties);
+    // CompactSegmentsDuty should be the last duty as it can takea long time

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490000904



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       We can only know the skipped and compacted segments while we are iterating the segment timeline (the doSegmentsNeedCompaction method in NewestSegmentFirstIterator). This code path isn’t called by next(). The result is computed and stored as a QueueEntry in the queue before next() is called; the next() method simply polls the QueueEntry from the queue. This means that the skipped and compacted stats must already be in the QueueEntry when the next() call polls the queue. We also do not store an entry in the queue if the List<DataSegment> is empty (no segments to compact). This does not fit well with trying to include the skipped and compacted stats in the QueueEntry. For example, we would still need to store and aggregate the skipped and compacted stats when the actual returned segment list is an empty List (no more segments for the datasource).
   
   tbh I think it is best to keep it this way. Also, since this choice (the current way versus modifying the CompactionSegmentIterator) does not change the API, we can come back and modify it later. And since NewestSegmentFirstIterator is the only CompactionSegmentIterator right now, the cost of changing it isn't too high. Maybe a good time to revisit this is when we add a new iterator and can see what works best with multiple implementations of CompactionSegmentIterator.
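
A minimal sketch of the queue-backed behaviour described above, under loud assumptions: `QueueBackedIteratorSketch` is a hypothetical class, a "segment" is reduced to its size in bytes, and a negative size stands in for a segment that is skipped. It is not the code in `NewestSegmentFirstIterator`; it only illustrates why the skipped stats are gathered while the queue is being filled rather than inside `next()`, and why they still grow when an interval contributes no queue entry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical, simplified model: each inner list is one interval of segment sizes,
// and a negative size marks a segment that must be skipped.
class QueueBackedIteratorSketch implements Iterator<List<Long>>
{
  private final Iterator<List<Long>> timeline;
  private final Queue<List<Long>> queue = new ArrayDeque<>();
  private long skippedBytes = 0;

  QueueBackedIteratorSketch(List<List<Long>> intervals)
  {
    this.timeline = intervals.iterator();
    fillQueue();
  }

  // Walks the timeline until one non-empty candidate list is queued.
  // Skipped segments are counted here, whether or not an entry is queued.
  private void fillQueue()
  {
    while (queue.isEmpty() && timeline.hasNext()) {
      final List<Long> candidates = new ArrayList<>();
      for (long size : timeline.next()) {
        if (size < 0) {
          skippedBytes += -size;
        } else {
          candidates.add(size);
        }
      }
      if (!candidates.isEmpty()) {
        queue.add(candidates);
      }
    }
  }

  @Override
  public boolean hasNext()
  {
    return !queue.isEmpty();
  }

  // next() only polls the queue; the stats were already gathered in fillQueue().
  @Override
  public List<Long> next()
  {
    final List<Long> entry = queue.poll();
    if (entry == null) {
      throw new NoSuchElementException();
    }
    fillQueue();
    return entry;
  }

  // Aggregate of skipped bytes up to the current point of the iteration.
  long totalSkippedBytes()
  {
    return skippedBytes;
  }
}
```

In this model the skipped total is a running aggregate: it reflects only the part of the timeline that has been walked so far, which matches the "up to the current point of the iterator" behaviour discussed in this thread.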






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490711502



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)

Review comment:
       Yep, that makes much more sense.






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489216664



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentCount", count)

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489223414



##########
File path: server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
##########
@@ -270,12 +275,246 @@ public String get()
     assertLastSegmentNotCompacted(compactSegments);
   }
 
+  @Test
+  public void testMakeStats()

Review comment:
       I feel that each test is already doing a lot. Combining them would make the tests hard to understand, as a single test would be verifying many different things. They are also not duplicated code, since they verify different things, so I think it is fine to keep them separate.






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490103518



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       Actually, I have another idea which I really like, and I think it works well and is clean and simple (I hope you like it too).
   
   Remove `flushAllSegments()` and `totalRemainingStatistics()` from the `CompactionSegmentIterator`. Keep `totalCompactedStatistics()` and `totalSkippedStatistics()` in the `CompactionSegmentIterator`. The stats for `totalCompactedStatistics()` and `totalSkippedStatistics()` are maintained and stored in the `CompactionSegmentIterator` (`NewestSegmentFirstIterator`). I think this makes perfect sense since the "already compacted" and "skipped" segments are never returned from the iterator’s `next()` (in fact, they are not even put into the queue that backs the iterator). Hence, as we iterate through the already compacted and skipped segments, we should just aggregate the stats within the `CompactionSegmentIterator` and keep them there. `totalCompactedStatistics()` and `totalSkippedStatistics()` will simply return the aggregates up to the current point of the iteration. We will then iterate the iterator as before when creating the compaction task, and the List<DataSegment> returned by the iterator is then aggregated into the `AutoCompactionSnapshot.Builder`'s Compacted stats (this is in `CompactSegments`). Similarly, we will continue to iterate the iterator in `makeStats()` (after we run out of task slots), and the List<DataSegment> returned by the iterator is then aggregated into the `AutoCompactionSnapshot.Builder`'s Remaining stats (this is in `CompactSegments`).
   
   There are stats maintained in both `CompactSegments` and `CompactionSegmentIterator`, but I think it’s much clearer since we separate the “skipped/already compacted” from the “segments returned by the iterator”. Also, there are no complex contracts in the `CompactionSegmentIterator` anymore. The logic in `CompactionSegmentIterator` for `totalCompactedStatistics()` and `totalSkippedStatistics()` is also very simple.
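
The caller-side half of this idea can be sketched roughly as follows. Everything here is an assumption made for illustration: `CallerSideStatsSketch`, its fields, and the reduction of a segment to a byte count are hypothetical, and the real aggregation in `CompactSegments` goes through `AutoCompactionSnapshot.Builder` rather than plain counters. The point is only the two-phase walk over a single iterator.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the caller side; a "segment" is just its size in bytes.
class CallerSideStatsSketch
{
  long compactedBytes = 0;  // segments handed to compaction tasks in this run
  long remainingBytes = 0;  // segments still returned by the iterator after slots ran out

  void run(Iterator<List<Long>> iterator, int availableTaskSlots)
  {
    int submitted = 0;
    // Phase 1: one (pretend) compaction task per returned list while slots remain.
    while (submitted < availableTaskSlots && iterator.hasNext()) {
      compactedBytes += sum(iterator.next());
      submitted++;
    }
    // Phase 2: keep draining the same iterator; whatever it still returns is "remaining".
    while (iterator.hasNext()) {
      remainingBytes += sum(iterator.next());
    }
  }

  private static long sum(List<Long> sizes)
  {
    long total = 0;
    for (long size : sizes) {
      total += size;
    }
    return total;
  }
}
```

Because the second loop reuses the iterator where the first loop stopped, no segment list is counted twice, and the iterator itself never needs to know which bucket a returned list ends up in.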






[GitHub] [druid] jihoonson commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490690136



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +77,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();

Review comment:
       Could you please add a Javadoc about the concurrent access pattern on `autoCompactionSnapshotPerDataSource`? I guess it can say "This variable is updated by the Coordinator thread executing duties and read by HTTP threads processing Coordinator API calls."
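
As a rough illustration of the access pattern such a Javadoc would describe, here is a small sketch. `SnapshotHolderSketch`, `publish`, and `getBytesAwaiting` are hypothetical names, and the map value is simplified to a `Long` instead of an `AutoCompactionSnapshot`; only the single-writer, many-reader use of `AtomicReference` is meant to carry over.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the holder class; only the publish/read pattern is illustrated.
class SnapshotHolderSketch
{
  /**
   * This variable is updated by the Coordinator thread executing duties and read by
   * HTTP threads processing Coordinator API calls. The whole map is swapped at once,
   * so readers see either the previous or the new complete map, never a partial update.
   */
  private final AtomicReference<Map<String, Long>> bytesAwaitingPerDataSource =
      new AtomicReference<>(Collections.emptyMap());

  // Called by the duty thread at the end of a run.
  void publish(Map<String, Long> freshStats)
  {
    // Copy and wrap so that later mutation of freshStats cannot leak to readers.
    bytesAwaitingPerDataSource.set(Collections.unmodifiableMap(new HashMap<>(freshStats)));
  }

  // Called by HTTP threads; returns null when the datasource has no snapshot yet.
  Long getBytesAwaiting(String dataSource)
  {
    return bytesAwaitingPerDataSource.get().get(dataSource);
  }
}
```

The field is declared `final` here because only the referenced map changes, not the `AtomicReference` itself.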

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",

Review comment:
       Maybe `compactTask/maxSlot/count`?

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)

Review comment:
       I think `segment/waitCompact/bytes` would be enough. The second `segment` in `segmentBytes` seems redundant.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)

Review comment:
       Same comment for other metrics for segment size and count.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +77,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();

Review comment:
       nit: this variable can be final.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)

Review comment:
       Same question for other metrics for interval.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -234,29 +294,174 @@ private CoordinatorStats doRun(
     return newContext;
   }
 
-  private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator)
+  /**
+   * This method can be use to atomically update the snapshots in {@code autoCompactionSnapshotPerDataSource} when
+   * no compaction task is schedule in this run. Currently, this method does not update compaction statistics
+   * (bytes, interval count, segment count, etc) since we skip iterating through the segments and cannot get an update
+   * on those statistics. Thus, this method only updates the schedule status and task list (compaction statistics
+   * remains the same as the previous snapshot).
+   */
+  private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders
+  )
+  {
+    Map<String, AutoCompactionSnapshot> previousSnapshots = autoCompactionSnapshotPerDataSource.get();
+    for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
+      final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
+      AutoCompactionSnapshot previousSnapshot = previousSnapshots.get(dataSource);
+      if (previousSnapshot != null) {
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
+      }
+    }
+
+    Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
+        currentRunAutoCompactionSnapshotBuilders,
+        AutoCompactionSnapshot.Builder::build
+    );
+    // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
+    autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
+  }
+
+  private CoordinatorStats makeStats(
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
+      int numCompactionTasks,
+      CompactionSegmentIterator iterator
+  )
   {
+    final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Iterate through all the remaining segments in the iterator.
+    // As these segments could be compacted but were not compacted due to lack of task slot, we will aggregates
+    // the statistic to the AwaitingCompaction statistics
+    for (; iterator.hasNext();) {

Review comment:
       nit: you can use `while(iterator.hasNext())`

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",

Review comment:
       Similarly, maybe `compactTask/availableSlot/count`?

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)

Review comment:
       Hmm, should it be `interval/waitCompact/count`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489168520



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +70,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private HashMap<String, AutoCompactionSnapshot> autoCompactionSnapshotPerDataSource = new HashMap<>();

Review comment:
       Changed autoCompactionSnapshotPerDataSource into an AtomicReference<Map<String, AutoCompactionSnapshot>>.
   
   We now build a new map of snapshots in each compaction run and then atomically replace the whole map (autoCompactionSnapshotPerDataSource) at the end, in `makeStats`.
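
A minimal sketch of the build-then-swap pattern described above, under stated assumptions: `SnapshotHolder`, `publish`, and the `Long` value type are hypothetical stand-ins for illustration and are not taken from the PR. Starting the reference with an empty map is also an assumption of this sketch, so that readers never dereference a null map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the per-datasource snapshot holder; not Druid code.
final class SnapshotHolder
{
  // Readers always see either the previous complete map or the new complete map.
  private final AtomicReference<Map<String, Long>> snapshots =
      new AtomicReference<>(Collections.emptyMap());

  // Called once at the end of a run with the freshly built results.
  void publish(Map<String, Long> currentRun)
  {
    // Copy defensively, then swap the whole map in a single atomic write.
    snapshots.set(Collections.unmodifiableMap(new HashMap<>(currentRun)));
  }

  // Returns null when the datasource is unknown, mirroring a @Nullable getter.
  Long getBytesAwaitingCompaction(String dataSource)
  {
    return snapshots.get().get(dataSource);
  }
}
```

Because the map is replaced as a whole, a reader never observes a run that is only half written, and no lock is needed on the read path.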






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489203219



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentCount", count)

Review comment:
       I have a separate PR for the skipped intervals. It is not ready yet, but it is intended to expose the status of each segment (compacted, not processed, or skipped) so that we can build a UI showing something like a segment fragmentation view. In that PR, I'll emit metrics for skipped intervals too.






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489189326



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -238,25 +272,102 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter
   {
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Make sure that the iterator iterate through all the remaining segments so that we can get accurate and correct
+    // statistics (remaining, skipped, processed, etc.). The reason we have to do this explicitly here is because
+    // earlier (when we are iterating to submit compaction tasks) we may have ran out of task slot and were not able
+    // to iterate to the first segment that needs compaction for some datasource.
+    iterator.flushAllSegments();
+    // Statistics of all segments that still need compaction after this run
+    Map<String, CompactionStatistics> allRemainingStatistics = iterator.totalRemainingStatistics();
+    // Statistics of all segments either compacted or skipped after this run
+    Map<String, CompactionStatistics> allProcessedStatistics = iterator.totalProcessedStatistics();
+
+    for (Map.Entry<String, AutoCompactionSnapshot> autoCompactionSnapshotEntry : autoCompactionSnapshotPerDataSource.entrySet()) {
+      final String dataSource = autoCompactionSnapshotEntry.getKey();
+      CompactionStatistics remainingStatistics = allRemainingStatistics.get(dataSource);
+      CompactionStatistics processedStatistics = allProcessedStatistics.get(dataSource);
+
+      long byteAwaitingCompaction = 0;
+      long segmentCountAwaitingCompaction = 0;
+      long intervalCountAwaitingCompaction = 0;
+      if (remainingStatistics != null) {
+        // If null means that all segments are either compacted or skipped.
+        // Hence, we can leave these set to default value of 0. If not null, we set it to the collected statistic.
+        byteAwaitingCompaction = remainingStatistics.getByteSum();
+        segmentCountAwaitingCompaction = remainingStatistics.getSegmentNumberCountSum();
+        intervalCountAwaitingCompaction = remainingStatistics.getSegmentIntervalCountSum();
+      }
+
+      long byteProcessed = 0;
+      long segmentCountProcessed = 0;
+      long intervalCountProcessed = 0;
+      if (processedStatistics != null) {
+        byteProcessed = processedStatistics.getByteSum();
+        segmentCountProcessed = processedStatistics.getSegmentNumberCountSum();
+        intervalCountProcessed = processedStatistics.getSegmentIntervalCountSum();
+      }
+
+      autoCompactionSnapshotEntry.getValue().setByteCountAwaitingCompaction(byteAwaitingCompaction);
+      autoCompactionSnapshotEntry.getValue().setByteCountProcessed(byteProcessed);
+      autoCompactionSnapshotEntry.getValue().setSegmentCountAwaitingCompaction(segmentCountAwaitingCompaction);
+      autoCompactionSnapshotEntry.getValue().setSegmentCountProcessed(segmentCountProcessed);
+      autoCompactionSnapshotEntry.getValue().setIntervalCountAwaitingCompaction(intervalCountAwaitingCompaction);
+      autoCompactionSnapshotEntry.getValue().setIntervalCountProcessed(intervalCountProcessed);
+
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          byteAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          segmentCountAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          intervalCountAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          byteProcessed
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          segmentCountProcessed
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          intervalCountProcessed
+      );
+    }
+
     return stats;
   }
 
-  @SuppressWarnings("deprecation") // Intentionally using boxing get() to return null if dataSource is unknown
   @Nullable
   public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
   {
-    return totalSizesOfSegmentsAwaitingCompactionPerDataSource.get(dataSource);
+    AutoCompactionSnapshot autoCompactionSnapshot = autoCompactionSnapshotPerDataSource.get(dataSource);
+    if (autoCompactionSnapshot == null) {
+      return null;
+    }
+    return autoCompactionSnapshotPerDataSource.get(dataSource).getByteCountAwaitingCompaction();

Review comment:
       Oops. Done






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488392581



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -238,25 +272,102 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter
   {
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Make sure that the iterator iterate through all the remaining segments so that we can get accurate and correct
+    // statistics (remaining, skipped, processed, etc.). The reason we have to do this explicitly here is because
+    // earlier (when we are iterating to submit compaction tasks) we may have ran out of task slot and were not able
+    // to iterate to the first segment that needs compaction for some datasource.
+    iterator.flushAllSegments();

Review comment:
       Moved the auto-compaction duty to the end of the set of Coordinator duties and changed the scheduling of the Coordinator duties from scheduleWithFixedDelay to scheduleWithFixedRate. This should ensure that the other duties still run every 30 mins (assuming auto compaction doesn't take longer than 30 mins, which I doubt it will).
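
The difference between the two scheduling policies can be sketched with plain arithmetic. The model below follows the documented behavior of the JDK's `ScheduledExecutorService` (`scheduleWithFixedDelay` and `scheduleAtFixedRate`); whether the PR calls those methods directly is an assumption, and `ScheduleModel` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Arithmetic model of the two JDK scheduling policies; it does not start any threads.
final class ScheduleModel
{
  // Fixed delay: the next run starts `delay` after the previous run finishes.
  static List<Long> fixedDelayStarts(long delay, long[] durations)
  {
    List<Long> starts = new ArrayList<>();
    long start = 0;
    for (long duration : durations) {
      starts.add(start);
      start = start + duration + delay;
    }
    return starts;
  }

  // Fixed rate: run n is due at n * period, but never starts before the previous run ends.
  static List<Long> fixedRateStarts(long period, long[] durations)
  {
    List<Long> starts = new ArrayList<>();
    long previousEnd = 0;
    for (int n = 0; n < durations.length; n++) {
      long start = Math.max(n * period, previousEnd);
      starts.add(start);
      previousEnd = start + durations[n];
    }
    return starts;
  }
}
```

With a period of 30 and runs that each take 10, the fixed-delay starts are 0, 40, 80 while the fixed-rate starts are 0, 30, 60. Under fixed delay, a slow duty pushes every later run back; under fixed rate the cadence holds as long as a run finishes within the period.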






[GitHub] [druid] lgtm-com[bot] commented on pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #10371:
URL: https://github.com/apache/druid/pull/10371#issuecomment-689290117


   This pull request **introduces 1 alert** when merging 2a373b26b31b276789f46a2e00f2b2a167e0288a into e5f0da30ae15369f66c7e9ecc05a41c3d49eb2e6 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-dc3239d5090a23c3d540f3559ca7e473b46cd073)
   
   **new alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r487838589



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -296,18 +296,87 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
 
     emitter.emit(
         new ServiceMetricEvent.Builder().build(
-            "compact/task/count",
+            "compact/task/scheduled/count",

Review comment:
       Reverted.






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490000904



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       We can only know the skipped and compacted segments while we are iterating the segment timeline. That code path isn't called by next(): the result is computed and stored as a QueueEntry in the queue, and next() simply polls a QueueEntry from the queue. This means the skipped and compacted stats would have to be in the QueueEntry when next() polls the queue. We also do not put an entry into the queue if the List<DataSegment> is empty (no segments to compact), which does not fit well with trying to include the skipped and compacted stats in the QueueEntry. For example, we would still need to store and aggregate the skipped and compacted stats when the returned segment list is empty (no more segments for the datasource).
   
   tbh I think it is best to keep it this way. Since neither approach (the current one or modifying CompactionSegmentIterator) changes the API, we can come back and modify it later. And since NewestSegmentFirstIterator is the only CompactionSegmentIterator right now, the cost of changing it isn't high. A good time to revisit this may be when we add a new iterator and can see what works best across multiple implementations of CompactionSegmentIterator.
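
To illustrate the queue-backed iteration described above, here is a heavily simplified sketch. It is not the NewestSegmentFirstIterator code: `QueueBackedIterator`, `fillQueue`, and `skippedCount` are hypothetical names, segments are modeled as plain byte sizes, and a negative size stands in for a segment that is skipped.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical simplification: none of these names come from Druid.
final class QueueBackedIterator implements Iterator<Long>
{
  private final Iterator<Long> timeline;
  private final Queue<Long> queue = new ArrayDeque<>();
  private long skippedCount;

  QueueBackedIterator(List<Long> segments)
  {
    this.timeline = segments.iterator();
    fillQueue();
  }

  // Walks the timeline until one compactible entry is queued; skipped entries are
  // counted here because next() never sees them.
  private void fillQueue()
  {
    while (queue.isEmpty() && timeline.hasNext()) {
      long size = timeline.next();
      if (size < 0) {
        skippedCount++;
      } else {
        queue.add(size);
      }
    }
  }

  @Override
  public boolean hasNext()
  {
    return !queue.isEmpty();
  }

  @Override
  public Long next()
  {
    Long entry = queue.poll();
    if (entry == null) {
      throw new NoSuchElementException();
    }
    fillQueue();
    return entry;
  }

  // Aggregate up to the current position of the iterator.
  long skippedCount()
  {
    return skippedCount;
  }
}
```

Note that trailing skipped entries are counted even though no queue entry is produced for them, which is the case that makes attaching the stats to a queue entry awkward.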






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489165878



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +70,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private HashMap<String, AutoCompactionSnapshot> autoCompactionSnapshotPerDataSource = new HashMap<>();

Review comment:
       Changed AutoCompactionSnapshot to be immutable and changed autoCompactionSnapshotPerDataSource to be a concurrent hash map. 






[GitHub] [druid] maytasm commented on pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on pull request #10371:
URL: https://github.com/apache/druid/pull/10371#issuecomment-693247121


   > > > * Why does the response include only one task ID? What will happen if auto compaction issues multiple compaction tasks?
   > > 
   > > 
   > > Hmm... I am thinking of having the UI component show the status (fail, success) of the latest task which could indicate if user action is required or not.
   > > Another idea may be to return a list of all tasks issued during this run (and an empty list if there is no slot). The UI can select which task it wants to show. The UI can choose the last task ID or it can calculate the success vs. fail % rate of all tasks in the last run.
   > 
   > Hmm, I'm not sure what would be a use case of such information of the tasks issued in each run. Would it be more useful to know how many compaction task failures there have been recently (over a couple of runs)?
   
   I just realized that we now have a different prefix in the task name depending on whether a task is issued manually or automatically by the coordinator (auto compaction). In that case, returning the last task scheduled by auto compaction is not necessary, as we can use the existing task API to get the latest task that has the coordinator-issued prefix in its name. In fact, we can sort by time and get a history of all the tasks issued by auto compaction. Thus, the existing task API can already answer questions like how many compaction task failures there have been recently (over some time period like hours, days, etc.).
   
   The information that currently cannot be retrieved is a list of all the tasks issued in the same / latest coordinator run. The ingestion tab cannot provide this today, since we have no idea which tasks were issued in the same run. Such a list lets us know things like the % of tasks succeeded vs. failed vs. canceled in the last run. Moreover, we can then compare the % of tasks succeeded vs. failed vs. canceled in the last run against the previous run (or any other run). This allows users to see whether changes they made (to configs, specs, etc.) between the runs fixed any issues they were seeing.
   For example...
   
   Run 1 has 100% failed and 0% success; the user fixes something, and run 2 has 100% success and 0% failed. They can see that the fix solved the problem.
   Run 1 has 100% failed and 0% success; the user fixes something, and run 2 has 0% success and 100% failed. They can see that the fix did not solve the problem.
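
The per-run comparison in the examples above could be computed roughly as follows. This is only a sketch: `RunSuccessRate`, the run labels, and the `"SUCCESS"` / `"FAILED"` state strings are assumptions for illustration, not the shape of the API response in this PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each coordinator run maps to the final states of the tasks it issued.
final class RunSuccessRate
{
  // Percentage of tasks in one run whose state equals "SUCCESS"; 0 for an empty run.
  static double successPercent(List<String> taskStates)
  {
    if (taskStates.isEmpty()) {
      return 0.0;
    }
    long succeeded = taskStates.stream().filter("SUCCESS"::equals).count();
    return 100.0 * succeeded / taskStates.size();
  }

  // Keeps the runs in insertion order so that consecutive runs can be compared.
  static Map<String, Double> successPercentPerRun(Map<String, List<String>> statesPerRun)
  {
    Map<String, Double> result = new LinkedHashMap<>();
    statesPerRun.forEach((run, states) -> result.put(run, successPercent(states)));
    return result;
  }
}
```

A UI could call `successPercentPerRun` on the last two runs and show the change between them.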




[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490103518



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       Actually, I have another idea which I really like, and I think it works well and is clean and simple (I hope you like it too).
   
   Remove flushAllSegments() and totalRemainingStatistics() from CompactionSegmentIterator. Keep totalCompactedStatistics() and totalSkippedStatistics() in CompactionSegmentIterator. The stats for totalCompactedStatistics() and totalSkippedStatistics() are maintained and stored in the CompactionSegmentIterator (NewestSegmentFirstIterator). I think this makes perfect sense since the already compacted and skipped segments are never returned from the iterator’s next() (in fact, they are never even put into the queue that backs the iterator). Hence, as we iterate through the already compacted and skipped segments, we should just aggregate the stats within the CompactionSegmentIterator. totalCompactedStatistics() and totalSkippedStatistics() will simply return the aggregates up to the current point of the iteration. We will then iterate the iterator as before when creating the compaction tasks. Each List<DataSegment> that is returned is then aggregated into a map in CompactSegments. Similarly, we will continue to iterate the iterator in makeStats after we run out of task slots, and each List<DataSegment> that is returned is then aggregated into a different map in CompactSegments.
   There are stats maps maintained in both CompactSegments and CompactionSegmentIterator, but I think it’s much clearer, and we separate the “skipped/already compacted” segments from the “segments returned by the iterator”. Also, there are no complex contracts in CompactionSegmentIterator. The logic in CompactionSegmentIterator for totalCompactedStatistics() and totalSkippedStatistics() is also very simple.
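   
   To make the split of responsibilities concrete, here is a simplified sketch of the idea, not the actual patch. `Segment`, `StatsTrackingIterator`, `totalCompactedBytes()` and `drain()` are hypothetical stand-ins, the iterator returns single segments rather than List<DataSegment>, and only byte totals are tracked.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

class IteratorStatsSketch
{
  // Hypothetical stand-in for DataSegment.
  static final class Segment
  {
    final String dataSource;
    final long sizeBytes;
    final boolean alreadyCompacted;

    Segment(String dataSource, long sizeBytes, boolean alreadyCompacted)
    {
      this.dataSource = dataSource;
      this.sizeBytes = sizeBytes;
      this.alreadyCompacted = alreadyCompacted;
    }
  }

  // Never returns already-compacted segments, but counts them as it passes over them.
  static final class StatsTrackingIterator implements Iterator<Segment>
  {
    private final Iterator<Segment> source;
    private final Map<String, Long> compactedBytes = new HashMap<>();
    private Segment nextToCompact;

    StatsTrackingIterator(List<Segment> segments)
    {
      this.source = segments.iterator();
    }

    // Skips compacted segments, adding them to the totals kept inside the iterator.
    private void advance()
    {
      while (nextToCompact == null && source.hasNext()) {
        Segment s = source.next();
        if (s.alreadyCompacted) {
          compactedBytes.merge(s.dataSource, s.sizeBytes, Long::sum);
        } else {
          nextToCompact = s;
        }
      }
    }

    @Override
    public boolean hasNext()
    {
      advance();
      return nextToCompact != null;
    }

    @Override
    public Segment next()
    {
      advance();
      if (nextToCompact == null) {
        throw new NoSuchElementException();
      }
      Segment s = nextToCompact;
      nextToCompact = null;
      return s;
    }

    // Aggregates up to the current point of the iteration.
    Map<String, Long> totalCompactedBytes()
    {
      return compactedBytes;
    }
  }

  // Caller side: segments that are actually returned are aggregated by the caller.
  static Map<String, Long> drain(StatsTrackingIterator iterator)
  {
    Map<String, Long> awaitingBytes = new HashMap<>();
    while (iterator.hasNext()) {
      Segment s = iterator.next();
      awaitingBytes.merge(s.dataSource, s.sizeBytes, Long::sum);
    }
    return awaitingBytes;
  }

  public static void main(String[] args)
  {
    StatsTrackingIterator it = new StatsTrackingIterator(Arrays.asList(
        new Segment("wiki", 10L, true),
        new Segment("wiki", 20L, false),
        new Segment("wiki", 5L, true)
    ));
    System.out.println("awaiting:  " + drain(it));
    System.out.println("compacted: " + it.totalCompactedBytes());
  }
}
```

   The point of the design is that each side only counts what it can see: the iterator counts what it filters out, and the caller counts what it receives.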






[GitHub] [druid] maytasm commented on pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on pull request #10371:
URL: https://github.com/apache/druid/pull/10371#issuecomment-690787993


   @jihoonson 
   Thanks for taking a look. Please see answers below.
   
   > * We need documents for new metrics and APIs. Are you planning to add them as a follow-up?
   I am planning to add the docs and integration tests as a follow-up PR. I am planning to get this change in with the implementation (current state of the PR) and unit tests (coming very soon). We are also planning to add UI for showing these metrics. Hence, this PR will unblock the UI work.
   
   > * If you don't want to add docs in this PR, would you please add descriptions on each field in the API response and each metric?
   Sure. I'll add it to the PR description 
   
   > * Could you define "snapshot" more precisely? Is it the status of the latest auto-compaction run? What would it be if the latest run did nothing due to lack of task slots?
   Snapshot refers to the statistics from the latest auto-compaction run. It is the "snapshot" of the latest auto-compaction. If there is no available slot, or not enough slots to get to a particular datasource, then CompactSegments (after using up all the available slots) will iterate the CompactionSegmentIterator (NewestSegmentFirstIterator) until it reaches the first segment that needs compaction for every datasource. This still allows us to get accurate statistics for all datasources. So to answer your question... "scheduleStatus" will always be updated regardless of slots. "latestScheduledTaskId" can be the task ID from a previous coordinator run if the current run did not schedule a task for this datasource. "byteCompacted"/"segmentCountCompacted"/"intervalCountCompacted" will be the same as in the previous run if there is no slot (meaning no compaction task is scheduled). "byteAwaitingCompaction"/"segmentCountAwaitingCompaction"/"intervalCountAwaitingCompaction" will be the same as in the previous run if there is no slot (meaning no compaction task is scheduled), or they may increase if new data was ingested for the datasource between the last run and this run. Basically, "byteCompacted"/"segmentCountCompacted"/"intervalCountCompacted"/"byteAwaitingCompaction"/"segmentCountAwaitingCompaction"/"intervalCountAwaitingCompaction" are always correct statistics of the datasource at the time of the latest run, even if there is no slot / no task scheduled.
   
   > * I'm not sure `snapshot` is a good API name since it's not intuitive to me what it means. Would `status` be better?
   I was thinking of "snapshot" as the snapshot taken at the point in time the API is called. I think "status" also works. Let me know if you still think "status" is better. I'm open to both
   
   > * Why do you want to distinguish datasources which auto compaction has never configured and others which auto compaction has paused? Is it useful to return the same statistics for the datasources as well which auto compaction has never configured?
   For datasources that have never had auto compaction enabled, all of these statistics are already available or can be calculated from existing APIs. "scheduleStatus" will be "NOT_ENABLED"; "byteCompacted"/"segmentCountCompacted"/"intervalCountCompacted" will be 0; "byteAwaitingCompaction"/"segmentCountAwaitingCompaction"/"intervalCountAwaitingCompaction" will be the total size, number of segments, and number of intervals (these are available in sys.segments etc.); "latestScheduledTaskId" will be null. So, I am not including those, to reduce unnecessary computation in the API and to reduce payload size.
   However, I can see one case where this will not be true: a datasource that has never had auto compaction enabled but has had a manual compaction task run. Do you think it is common and useful to cover datasources that never had auto compaction enabled but have manual compaction tasks? (Initially, this PR only aims to make auto compaction more user-friendly and does not really care about manual compaction.)
   
   > * Why does the response include only one task ID? What will happen if auto compaction issues multiple compaction tasks?
   Hmm... I am thinking of having the UI component show the status (fail, success) of the latest task, which could indicate whether user action is required.
   Another idea may be to return a list of all tasks issued during this run (and an empty list if there is no slot). The UI can select which task it wants to show. The UI can choose the last task ID, or it can calculate the success vs. fail % rate of all tasks in the last run.
   
   > * Please list out new metrics in the PR description. It will help release manager.
   Sure. I'll add it to the PR description 




[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489158548



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteCountAwaitingCompaction;

Review comment:
       Changed to `bytesAwaitingCompaction` and `bytesProcessed`






[GitHub] [druid] maytasm commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489166052



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +70,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private HashMap<String, AutoCompactionSnapshot> autoCompactionSnapshotPerDataSource = new HashMap<>();

Review comment:
       When a new compaction run happens, we create a new snapshot and update (atomically) the autoCompactionSnapshotPerDataSource since autoCompactionSnapshotPerDataSource is a concurrent hash map.






[GitHub] [druid] jihoonson commented on a change in pull request #10371: Auto-compaction snapshot status API

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488943398



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteCountAwaitingCompaction;
+  @JsonProperty
+  private long byteCountProcessed;
+  @JsonProperty
+  private long segmentCountAwaitingCompaction;
+  @JsonProperty
+  private long segmentCountProcessed;
+  @JsonProperty
+  private long intervalCountAwaitingCompaction;
+  @JsonProperty
+  private long intervalCountProcessed;
+
+  @JsonCreator
+  public AutoCompactionSnapshot(
+      @JsonProperty @NotNull String dataSource,
+      @JsonProperty @NotNull AutoCompactionScheduleStatus scheduleStatus
+  )
+  {
+    this.dataSource = dataSource;
+    this.scheduleStatus = scheduleStatus;
+  }
+
+  @NotNull
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @NotNull
+  public AutoCompactionScheduleStatus getScheduleStatus()
+  {
+    return scheduleStatus;
+  }
+
+  @Nullable
+  public String getLatestScheduledTaskId()
+  {
+    return latestScheduledTaskId;
+  }
+
+  public long getByteCountAwaitingCompaction()
+  {
+    return byteCountAwaitingCompaction;
+  }
+
+  public long getByteCountProcessed()
+  {
+    return byteCountProcessed;
+  }
+
+  public long getSegmentCountAwaitingCompaction()
+  {
+    return segmentCountAwaitingCompaction;
+  }
+
+  public long getSegmentCountProcessed()
+  {
+    return segmentCountProcessed;
+  }
+
+  public long getIntervalCountAwaitingCompaction()
+  {
+    return intervalCountAwaitingCompaction;
+  }
+
+  public long getIntervalCountProcessed()
+  {
+    return intervalCountProcessed;
+  }
+
+  public void setScheduleStatus(AutoCompactionScheduleStatus scheduleStatus)
+  {
+    this.scheduleStatus = scheduleStatus;
+  }
+
+  public void setLatestScheduledTaskId(String latestScheduledTaskId)
+  {
+    this.latestScheduledTaskId = latestScheduledTaskId;
+  }
+
+  public void setByteCountAwaitingCompaction(long byteCountAwaitingCompaction)
+  {
+    this.byteCountAwaitingCompaction = byteCountAwaitingCompaction;
+  }
+
+  public void setByteCountProcessed(long byteCountProcessed)
+  {
+    this.byteCountProcessed = byteCountProcessed;
+  }
+
+  public void setSegmentCountAwaitingCompaction(long segmentCountAwaitingCompaction)
+  {
+    this.segmentCountAwaitingCompaction = segmentCountAwaitingCompaction;
+  }
+
+  public void setSegmentCountProcessed(long segmentCountProcessed)
+  {
+    this.segmentCountProcessed = segmentCountProcessed;
+  }
+
+  public void setIntervalCountAwaitingCompaction(long intervalCountAwaitingCompaction)
+  {
+    this.intervalCountAwaitingCompaction = intervalCountAwaitingCompaction;
+  }
+
+  public void setIntervalCountProcessed(long intervalCountProcessed)
+  {
+    this.intervalCountProcessed = intervalCountProcessed;
+  }
+
+
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AutoCompactionSnapshot that = (AutoCompactionSnapshot) o;
+    return byteCountAwaitingCompaction == that.byteCountAwaitingCompaction &&
+           byteCountProcessed == that.byteCountProcessed &&
+           segmentCountAwaitingCompaction == that.segmentCountAwaitingCompaction &&
+           segmentCountProcessed == that.segmentCountProcessed &&
+           intervalCountAwaitingCompaction == that.intervalCountAwaitingCompaction &&
+           intervalCountProcessed == that.intervalCountProcessed &&
+           dataSource.equals(that.dataSource) &&
+           Objects.equals(scheduleStatus, that.scheduleStatus) &&
+           Objects.equals(latestScheduledTaskId, that.latestScheduledTaskId);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        dataSource,
+        scheduleStatus,
+        latestScheduledTaskId,
+        byteCountAwaitingCompaction,

Review comment:
       This looks error-prone since all fields in this class are mutable. Suppose that this class was used as a key in a hash set. If you updated a field in this class after adding it to the set, its hash code would change, which would cause unintended results. Even though it seems that `hashCode()` and `equals()` are used only in unit tests, I would suggest making this class immutable.
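   
   A small standalone sketch of the hazard described above, for illustration only. `MutableSnapshot` is a hypothetical one-field class, not the `AutoCompactionSnapshot` from this PR.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

class MutableKeySketch
{
  // Hypothetical class whose hashCode() depends on a mutable field.
  static final class MutableSnapshot
  {
    private long bytesAwaitingCompaction;

    MutableSnapshot(long bytesAwaitingCompaction)
    {
      this.bytesAwaitingCompaction = bytesAwaitingCompaction;
    }

    void setBytesAwaitingCompaction(long bytesAwaitingCompaction)
    {
      this.bytesAwaitingCompaction = bytesAwaitingCompaction;
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof MutableSnapshot
             && bytesAwaitingCompaction == ((MutableSnapshot) o).bytesAwaitingCompaction;
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(bytesAwaitingCompaction);
    }
  }

  // Returns whether the set still finds the object after one of its fields was changed.
  static boolean foundAfterMutation()
  {
    Set<MutableSnapshot> set = new HashSet<>();
    MutableSnapshot snapshot = new MutableSnapshot(100L);
    set.add(snapshot);
    // The set stored the element under its old hash code; the lookup below uses the new one.
    snapshot.setBytesAwaitingCompaction(200L);
    return set.contains(snapshot);
  }

  public static void main(String[] args)
  {
    System.out.println("still found after mutation: " + foundAfterMutation());
  }
}
```

   The element is still physically in the set, but the lookup no longer finds it. Making the fields final and constructing a new object per run avoids this class of bug.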

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +70,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private HashMap<String, AutoCompactionSnapshot> autoCompactionSnapshotPerDataSource = new HashMap<>();

Review comment:
       It was my bad that updating `totalSizesOfSegmentsAwaitingCompactionPerDataSource` was not thread-safe, but it should be. The same applies to `autoCompactionSnapshotPerDataSource`. I think an easy way is storing `autoCompactionSnapshotPerDataSource` in an `AtomicReference` so that we can atomically update the reference to the hash map whenever we compute new snapshots.
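   
   A minimal sketch of the `AtomicReference` approach suggested above. It is simplified so that the map values are plain byte counts instead of snapshot objects; `SnapshotHolderSketch`, `publish` and `getBytesAwaiting` are hypothetical names for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class SnapshotHolderSketch
{
  // Readers always see either the previous complete map or the new complete map,
  // never a map that is halfway through being updated.
  private final AtomicReference<Map<String, Long>> bytesAwaitingPerDataSource =
      new AtomicReference<>(Collections.<String, Long>emptyMap());

  // Called by the duty thread at the end of each run, with a freshly built map.
  void publish(Map<String, Long> freshlyComputed)
  {
    bytesAwaitingPerDataSource.set(Collections.unmodifiableMap(new HashMap<>(freshlyComputed)));
  }

  // Called by API threads; returns null if the datasource is unknown.
  Long getBytesAwaiting(String dataSource)
  {
    return bytesAwaitingPerDataSource.get().get(dataSource);
  }

  public static void main(String[] args)
  {
    SnapshotHolderSketch holder = new SnapshotHolderSketch();
    Map<String, Long> run = new HashMap<>();
    run.put("wiki", 42L);
    holder.publish(run);
    System.out.println(holder.getBytesAwaiting("wiki"));
  }
}
```

   The map is built privately during the run and only becomes visible through a single reference swap, so readers need no locking.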

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -569,7 +580,7 @@ private void becomeLeader()
       }
 
       for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
-        ScheduledExecutors.scheduleWithFixedDelay(
+        ScheduledExecutors.scheduleAtFixedRate(

Review comment:
       :+1: 

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -632,8 +643,9 @@ private void stopBeingLeader()
   {
     List<CoordinatorDuty> duties = new ArrayList<>();
     duties.add(new LogUsedSegments());
-    duties.addAll(makeCompactSegmentsDuty());
     duties.addAll(indexingServiceDuties);
+    // CompactSegmentsDuty should be the last duty as it can takea long time

Review comment:
       typo: `takea` -> `take a`

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       Thinking about new methods in this interface, I'm not sure if they are good since now every implementation of this interface should keep track of remaining and processed segments even though the tracking logic will likely be duplicated (even though we have only one implementation yet :slightly_smiling_face:). How about adding `next(Map<String, CompactionStatistics> stats)` so that `CompactSegments` can pass in an appropriate map? Then, it can just iterate over all remaining entries in the iterator without introducing any methods such as `flushAllSegments()` which seems to have a complicated contract. `CompactionSegmentIterator` will not extend `Iterator` anymore in this case.
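   
   One possible reading of this proposal, sketched under assumptions. The interface shape, the `Segment` stand-in, and the use of plain byte totals in place of `CompactionStatistics` are all hypothetical; the comment above only names the `next(Map<String, CompactionStatistics> stats)` signature.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class NextWithStatsSketch
{
  // Hypothetical stand-in for DataSegment.
  static final class Segment
  {
    final String dataSource;
    final long sizeBytes;

    Segment(String dataSource, long sizeBytes)
    {
      this.dataSource = dataSource;
      this.sizeBytes = sizeBytes;
    }
  }

  // No longer extends java.util.Iterator: next() takes the map to update.
  interface StatsAwareSegmentIterator
  {
    boolean hasNext();

    // Returns the next batch and adds its byte size to the caller-supplied map.
    List<Segment> next(Map<String, Long> stats);
  }

  static final class ListBackedIterator implements StatsAwareSegmentIterator
  {
    private final Iterator<List<Segment>> delegate;

    ListBackedIterator(List<List<Segment>> batches)
    {
      this.delegate = batches.iterator();
    }

    @Override
    public boolean hasNext()
    {
      return delegate.hasNext();
    }

    @Override
    public List<Segment> next(Map<String, Long> stats)
    {
      List<Segment> batch = delegate.next();
      for (Segment s : batch) {
        stats.merge(s.dataSource, s.sizeBytes, Long::sum);
      }
      return batch;
    }
  }

  // The caller decides which map each batch is counted in: batches that got a task
  // slot go into `scheduled`, everything after the slots run out goes into `remaining`.
  static void run(
      StatsAwareSegmentIterator iterator,
      int slots,
      Map<String, Long> scheduled,
      Map<String, Long> remaining
  )
  {
    int used = 0;
    while (iterator.hasNext()) {
      if (used < slots) {
        iterator.next(scheduled);
        used++;
      } else {
        iterator.next(remaining);
      }
    }
  }

  public static void main(String[] args)
  {
    StatsAwareSegmentIterator it = new ListBackedIterator(Arrays.asList(
        Collections.singletonList(new Segment("wiki", 10L)),
        Collections.singletonList(new Segment("wiki", 20L)),
        Collections.singletonList(new Segment("logs", 5L))
    ));
    Map<String, Long> scheduled = new HashMap<>();
    Map<String, Long> remaining = new HashMap<>();
    run(it, 1, scheduled, remaining);
    System.out.println("scheduled: " + scheduled + ", remaining: " + remaining);
  }
}
```

   With this shape the iterator holds no statistics state of its own, which is the duplication concern raised above. The trade-off is losing the standard `Iterator` interface.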

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -238,25 +272,102 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter
   {
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Make sure that the iterator iterate through all the remaining segments so that we can get accurate and correct
+    // statistics (remaining, skipped, processed, etc.). The reason we have to do this explicitly here is because
+    // earlier (when we are iterating to submit compaction tasks) we may have ran out of task slot and were not able
+    // to iterate to the first segment that needs compaction for some datasource.
+    iterator.flushAllSegments();
+    // Statistics of all segments that still need compaction after this run
+    Map<String, CompactionStatistics> allRemainingStatistics = iterator.totalRemainingStatistics();
+    // Statistics of all segments either compacted or skipped after this run
+    Map<String, CompactionStatistics> allProcessedStatistics = iterator.totalProcessedStatistics();
+
+    for (Map.Entry<String, AutoCompactionSnapshot> autoCompactionSnapshotEntry : autoCompactionSnapshotPerDataSource.entrySet()) {
+      final String dataSource = autoCompactionSnapshotEntry.getKey();
+      CompactionStatistics remainingStatistics = allRemainingStatistics.get(dataSource);
+      CompactionStatistics processedStatistics = allProcessedStatistics.get(dataSource);
+
+      long byteAwaitingCompaction = 0;
+      long segmentCountAwaitingCompaction = 0;
+      long intervalCountAwaitingCompaction = 0;
+      if (remainingStatistics != null) {
+        // If null means that all segments are either compacted or skipped.
+        // Hence, we can leave these set to default value of 0. If not null, we set it to the collected statistic.
+        byteAwaitingCompaction = remainingStatistics.getByteSum();
+        segmentCountAwaitingCompaction = remainingStatistics.getSegmentNumberCountSum();
+        intervalCountAwaitingCompaction = remainingStatistics.getSegmentIntervalCountSum();
+      }
+
+      long byteProcessed = 0;
+      long segmentCountProcessed = 0;
+      long intervalCountProcessed = 0;
+      if (processedStatistics != null) {
+        byteProcessed = processedStatistics.getByteSum();
+        segmentCountProcessed = processedStatistics.getSegmentNumberCountSum();
+        intervalCountProcessed = processedStatistics.getSegmentIntervalCountSum();
+      }
+
+      autoCompactionSnapshotEntry.getValue().setByteCountAwaitingCompaction(byteAwaitingCompaction);
+      autoCompactionSnapshotEntry.getValue().setByteCountProcessed(byteProcessed);
+      autoCompactionSnapshotEntry.getValue().setSegmentCountAwaitingCompaction(segmentCountAwaitingCompaction);
+      autoCompactionSnapshotEntry.getValue().setSegmentCountProcessed(segmentCountProcessed);
+      autoCompactionSnapshotEntry.getValue().setIntervalCountAwaitingCompaction(intervalCountAwaitingCompaction);
+      autoCompactionSnapshotEntry.getValue().setIntervalCountProcessed(intervalCountProcessed);
+
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          byteAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          segmentCountAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          intervalCountAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          byteProcessed
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          segmentCountProcessed
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          intervalCountProcessed
+      );
+    }
+
     return stats;
   }
 
-  @SuppressWarnings("deprecation") // Intentionally using boxing get() to return null if dataSource is unknown
   @Nullable
   public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
   {
-    return totalSizesOfSegmentsAwaitingCompactionPerDataSource.get(dataSource);
+    AutoCompactionSnapshot autoCompactionSnapshot = autoCompactionSnapshotPerDataSource.get(dataSource);
+    if (autoCompactionSnapshot == null) {
+      return null;
+    }
+    return autoCompactionSnapshotPerDataSource.get(dataSource).getByteCountAwaitingCompaction();

Review comment:
       Duplicate call to `autoCompactionSnapshotPerDataSource.get()`.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)

Review comment:
       Same here. I think it should be `segmentBytes`.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)

Review comment:
       I think it should be `segmentBytes`.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentCount", count)

Review comment:
       Should we emit metrics for skipped intervals and segments as well?

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -336,25 +353,72 @@ private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig tuningConf
    * @return segments to compact
    */
   private SegmentsToCompact findSegmentsToCompact(
+      final String dataSourceName,
       final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
       final DataSourceCompactionConfig config
   )
   {
-    final long inputSegmentSize = config.getInputSegmentSizeBytes();
+    while (compactibleTimelineObjectHolderCursor.hasNext()) {
+      final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, true)) {
+        return candidates;
+      } else {
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);
+      }
+    }
+    log.info("All segments look good! Nothing to compact");
+    return new SegmentsToCompact();
+  }
 
+  /**
+   * Progressively iterates all remaining time intervals (latest first) in the
+   * timeline {@param compactibleTimelineObjectHolderCursor}. Note that the timeline lookup duration is one day.
+   * The logic for checking if the segments can be compacted or not is then performed on each iteration.
+   * This is repeated until no time intervals remain in {@param compactibleTimelineObjectHolderCursor}.
+   */
+  private void iterateAllSegments(
+      final String dataSourceName,
+      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
+      final DataSourceCompactionConfig config
+  )
+  {
     while (compactibleTimelineObjectHolderCursor.hasNext()) {
       final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, false)) {
+        // Collect statistics for segments that need compaction
+        collectSegmentStatistics(remainingSegments, dataSourceName, candidates);
+      } else {
+        // Collect statistics for segments that do not need compaction
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);
+      }
+    }
+  }
 
-      if (!candidates.isEmpty()) {
-        final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
-        final boolean needsCompaction = needsCompaction(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
-            candidates
-        );
+  /**
+   * This method encapsulates the logic for checking if a given {@param candidates} needs compaction or not.
+   * If {@param logCannotCompactReason} is true then the reason for {@param candidates} not needing compaction is
+   * logged (for the case that {@param candidates} does not need compaction).
+   *
+   * @return true if the {@param candidates} needs compaction, false if the {@param candidates} does not need compaction
+   */
+  private boolean isSegmentsNeedCompact(

Review comment:
       `doSegmentsNeedCompaction()`?

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteCountAwaitingCompaction;

Review comment:
       IIRC, it was `byte` not `bytes`. `bytes` sounds better to me too.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -336,25 +353,72 @@ private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig tuningConf
    * @return segments to compact
    */
   private SegmentsToCompact findSegmentsToCompact(
+      final String dataSourceName,
       final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
       final DataSourceCompactionConfig config
   )
   {
-    final long inputSegmentSize = config.getInputSegmentSizeBytes();
+    while (compactibleTimelineObjectHolderCursor.hasNext()) {
+      final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, true)) {
+        return candidates;
+      } else {
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);
+      }
+    }
+    log.info("All segments look good! Nothing to compact");
+    return new SegmentsToCompact();
+  }
 
+  /**
+   * Progressively iterates all remaining time intervals (latest first) in the
+   * timeline {@param compactibleTimelineObjectHolderCursor}. Note that the timeline lookup duration is one day.
+   * The logic for checking if the segments can be compacted or not is then performed on each iteration.
+   * This is repeated until no time intervals remain in {@param compactibleTimelineObjectHolderCursor}.
+   */
+  private void iterateAllSegments(
+      final String dataSourceName,
+      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
+      final DataSourceCompactionConfig config
+  )
+  {
     while (compactibleTimelineObjectHolderCursor.hasNext()) {
       final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, false)) {
+        // Collect statistics for segments that need compaction
+        collectSegmentStatistics(remainingSegments, dataSourceName, candidates);
+      } else {
+        // Collect statistics for segments that do not need compaction
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);

Review comment:
       Should we distinguish between segments processed and segments skipped? Information about skipped segments would be useful, so that you can see how many segments (or intervals) auto compaction has skipped.
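
       To make the suggestion concrete, here is a rough sketch of three-way bookkeeping. Everything in it is hypothetical (`CompactionStatsSketch`, `Bucket`, `Totals`, `classify`); none of these names are in the PR. The classification rule is also an assumption, loosely based on the `isCompactibleSize` / `needsCompaction` checks in the removed lines of this hunk: an interval that needs compaction but is too large for `inputSegmentSizeBytes` is counted as skipped rather than awaiting.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch, not the PR's classes: tracks per-bucket totals for one datasource.
final class CompactionStatsSketch
{
  enum Bucket { AWAITING, COMPACTED, SKIPPED }

  // Running totals for one bucket.
  static final class Totals
  {
    long bytes;
    long segments;
    long intervals;
  }

  private final Map<Bucket, Totals> totals = new EnumMap<>(Bucket.class);

  CompactionStatsSketch()
  {
    for (Bucket bucket : Bucket.values()) {
      totals.put(bucket, new Totals());
    }
  }

  // Assumed rule: segments that need compaction but exceed the input size limit are skipped.
  static Bucket classify(boolean needsCompaction, boolean isCompactibleSize)
  {
    if (!needsCompaction) {
      return Bucket.COMPACTED;
    }
    return isCompactibleSize ? Bucket.AWAITING : Bucket.SKIPPED;
  }

  // Records one time interval's worth of candidate segments into the given bucket.
  void recordInterval(Bucket bucket, long totalBytes, long segmentCount)
  {
    final Totals t = totals.get(bucket);
    t.bytes += totalBytes;
    t.segments += segmentCount;
    t.intervals += 1;
  }

  Totals get(Bucket bucket)
  {
    return totals.get(bucket);
  }
}
```

       With something like this, the skipped totals could be reported alongside the awaiting and compacted ones instead of being folded into `processedSegments`.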

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +70,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private HashMap<String, AutoCompactionSnapshot> autoCompactionSnapshotPerDataSource = new HashMap<>();

Review comment:
       When you fix this concurrency issue, please add a sufficient description of what concurrency issue exists here and how it is handled. It would also be useful to check out our [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md).
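
       One possible shape for the fix, as a sketch only. It assumes the issue is that the map is written by the coordinator duty thread while other threads (for example, an API handler) read it; that is a guess about the problem, not something stated in this thread. `SnapshotSketch` and `SnapshotHolderSketch` are hypothetical stand-ins, not classes in the PR. The idea is to build a fresh map on each run and publish it atomically as an immutable map, so readers never observe a half-updated state.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for AutoCompactionSnapshot; only two fields are modeled.
final class SnapshotSketch
{
  final String dataSource;
  final long bytesAwaitingCompaction;

  SnapshotSketch(String dataSource, long bytesAwaitingCompaction)
  {
    this.dataSource = dataSource;
    this.bytesAwaitingCompaction = bytesAwaitingCompaction;
  }
}

// Hypothetical holder: a single writer thread publishes, any number of threads read.
final class SnapshotHolderSketch
{
  // Readers see either the previous complete map or the new complete map, never a mix.
  private final AtomicReference<Map<String, SnapshotSketch>> snapshots =
      new AtomicReference<>(Collections.emptyMap());

  // Called by the writer at the end of a run with the map it built during that run.
  void publish(Map<String, SnapshotSketch> builtThisRun)
  {
    // Copy first so that later mutation of the caller's map cannot leak to readers.
    snapshots.set(Collections.unmodifiableMap(new HashMap<>(builtThisRun)));
  }

  // Called by readers; returns null when the datasource has no snapshot yet.
  SnapshotSketch get(String dataSource)
  {
    return snapshots.get().get(dataSource);
  }

  Map<String, SnapshotSketch> getAll()
  {
    return snapshots.get();
  }
}
```

       Whatever approach is chosen, a comment on the field naming the writer thread, the reader threads, and the publication mechanism would cover the description asked for above.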

##########
File path: server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
##########
@@ -270,12 +275,246 @@ public String get()
     assertLastSegmentNotCompacted(compactSegments);
   }
 
+  @Test
+  public void testMakeStats()

Review comment:
       Hmm, this test looks similar to `testRun()`. Can they be merged by moving the snapshot verification to `assertCompactSegments()` (or merging `assertCompactSegmentStatistics` and `assertCompactSegments`)?



