You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2021/07/13 02:48:33 UTC

[druid] branch master updated: compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded (#11426)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 05d5dd9  compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded (#11426)
05d5dd9 is described below

commit 05d5dd9289c03c62bab8f5a8caf80804382e4829
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Tue Jul 13 09:48:06 2021 +0700

    compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded (#11426)
    
    * compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded
    
    * compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded
    
    * compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded
    
    * fix test
    
    * fix test
---
 docs/operations/api-reference.md                   |  2 +-
 .../clients/CompactionResourceTestClient.java      |  4 +-
 .../coordinator/duty/ITAutoCompactionTest.java     | 16 ++-----
 .../server/coordinator/duty/CompactSegments.java   | 51 +---------------------
 .../druid/server/http/CompactionResource.java      |  4 +-
 .../coordinator/duty/CompactSegmentsTest.java      | 25 +++++++----
 .../druid/server/http/CompactionResourceTest.java  |  2 +-
 7 files changed, 28 insertions(+), 76 deletions(-)

diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md
index 53ebe36..4998bb1 100644
--- a/docs/operations/api-reference.md
+++ b/docs/operations/api-reference.md
@@ -468,7 +468,7 @@ This is only valid for dataSource which has compaction enabled.
 
 * `/druid/coordinator/v1/compaction/status`
 
-Returns the status and statistics from the latest auto compaction run of all dataSources which have/had auto compaction enabled.
+Returns the status and statistics from the latest auto compaction run for all dataSources which have auto compaction enabled in that run.
 The response payload includes a list of `latestStatus` objects. Each `latestStatus` represents the status for a dataSource (which has/had auto compaction enabled). 
 The `latestStatus` object has the following keys:
 * `dataSource`: name of the datasource for this status information
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
index c1e9ae4..2fd1a5e 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
@@ -183,7 +183,9 @@ public class CompactionResourceTestClient
     StatusResponseHolder response = httpClient.go(
         new Request(HttpMethod.GET, new URL(url)), responseHandler
     ).get();
-    if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+    if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+      return null;
+    } else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
       throw new ISE(
           "Error while getting compaction status status[%s] content[%s]",
           response.getStatus(),
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 05593c4..54d2971 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -210,7 +210,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       forceTriggerAutoCompaction(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(0, null);
-
+      // Auto compaction stats should be deleted as compaction config was deleted
+      Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName));
       checkCompactionIntervals(intervalsBeforeCompaction);
     }
   }
@@ -234,18 +235,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(0, null);
       checkCompactionIntervals(intervalsBeforeCompaction);
-      getAndAssertCompactionStatus(
-          fullDatasourceName,
-          AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
-          0,
-          0,
-          0,
-          0,
-          0,
-          0,
-          0,
-          0,
-          0);
+      Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName));
       // Update compaction slots to be 1
       updateCompactionTaskSlot(1, 1);
       // One day compacted (1 new segment) and one day remains uncompacted. (3 total)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index e48731b..a9ba0d5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -21,7 +21,6 @@ package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
@@ -213,11 +212,11 @@ public class CompactSegments implements CoordinatorDuty
         }
       } else {
         LOG.info("compactionConfig is empty. Skip.");
-        updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
+        autoCompactionSnapshotPerDataSource.set(new HashMap<>());
       }
     } else {
       LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction");
-      updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
+      autoCompactionSnapshotPerDataSource.set(new HashMap<>());
     }
 
     return params.buildFromExisting()
@@ -312,16 +311,6 @@ public class CompactSegments implements CoordinatorDuty
     Set<String> enabledDatasources = compactionConfigList.stream()
                                                          .map(dataSourceCompactionConfig -> dataSourceCompactionConfig.getDataSource())
                                                          .collect(Collectors.toSet());
-    // Update AutoCompactionScheduleStatus for dataSource that now has auto compaction disabled
-    for (Map.Entry<String, AutoCompactionSnapshot> snapshot : autoCompactionSnapshotPerDataSource.get().entrySet()) {
-      if (!enabledDatasources.contains(snapshot.getKey())) {
-        currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
-            snapshot.getKey(),
-            k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
-        );
-      }
-    }
-
     // Create and Update snapshot for dataSource that has auto compaction enabled
     for (String compactionConfigDataSource : enabledDatasources) {
       currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
@@ -420,42 +409,6 @@ public class CompactSegments implements CoordinatorDuty
     return newContext;
   }
 
-  /**
-   * This method can be use to atomically update the snapshots in {@code autoCompactionSnapshotPerDataSource} when
-   * no compaction task is schedule in this run. Currently, this method does not update compaction statistics
-   * (bytes, interval count, segment count, etc) since we skip iterating through the segments and cannot get an update
-   * on those statistics. Thus, this method only updates the schedule status and task list (compaction statistics
-   * remains the same as the previous snapshot).
-   */
-  private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
-      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders
-  )
-  {
-    Map<String, AutoCompactionSnapshot> previousSnapshots = autoCompactionSnapshotPerDataSource.get();
-    for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
-      final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
-      AutoCompactionSnapshot previousSnapshot = previousSnapshots.get(dataSource);
-      if (previousSnapshot != null) {
-        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
-        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
-        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
-        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
-        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
-        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
-        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
-        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
-        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
-      }
-    }
-
-    Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
-        currentRunAutoCompactionSnapshotBuilders,
-        AutoCompactionSnapshot.Builder::build
-    );
-    // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
-    autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
-  }
-
   private CoordinatorStats makeStats(
       Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
       int numCompactionTasks,
diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
index 8c99514..e88d0cd 100644
--- a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
@@ -74,7 +74,7 @@ public class CompactionResource
   {
     final Long notCompactedSegmentSizeBytes = coordinator.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
     if (notCompactedSegmentSizeBytes == null) {
-      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
+      return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "unknown dataSource")).build();
     } else {
       return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
     }
@@ -94,7 +94,7 @@ public class CompactionResource
     } else {
       AutoCompactionSnapshot autoCompactionSnapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
       if (autoCompactionSnapshot == null) {
-        return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
+        return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "unknown dataSource")).build();
       }
       snapshots = ImmutableList.of(autoCompactionSnapshot);
     }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index a1932dd..635b2fa 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -340,17 +340,15 @@ public class CompactSegmentsTest
       );
     }
 
-    // Run auto compaction without any dataSource in the compaction config
-    // Should still populate the result of everything fully compacted
-    doCompactSegments(compactSegments, new ArrayList<>());
-    Assert.assertEquals(
-        0,
-        stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
-    );
-    for (int i = 0; i < 3; i++) {
+    // Test run auto compaction with one datasource auto compaction disabled
+    // Snapshot should not contain datasource with auto compaction disabled
+    List<DataSourceCompactionConfig> removedOneConfig = createCompactionConfigs();
+    removedOneConfig.remove(0);
+    doCompactSegments(compactSegments, removedOneConfig);
+    for (int i = 1; i < 3; i++) {
       verifySnapshot(
           compactSegments,
-          AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED,
+          AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
           DATA_SOURCE_PREFIX + i,
           0,
           TOTAL_BYTE_PER_DATASOURCE,
@@ -364,6 +362,15 @@ public class CompactSegmentsTest
       );
     }
 
+    // Run auto compaction without any dataSource in the compaction config
+    // Snapshot should be empty
+    doCompactSegments(compactSegments, new ArrayList<>());
+    Assert.assertEquals(
+        0,
+        stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+    );
+    Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().isEmpty());
+
     assertLastSegmentNotCompacted(compactSegments);
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
index 9e7cfd6..fe16d2f 100644
--- a/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
@@ -117,6 +117,6 @@ public class CompactionResourceTest
     EasyMock.replay(mock);
 
     final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(dataSourceName);
-    Assert.assertEquals(400, response.getStatus());
+    Assert.assertEquals(404, response.getStatus());
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org