You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2021/07/13 02:48:33 UTC
[druid] branch master updated: compaction/status API retains status
for datasources that no longer existed causing in-memory used to grow
unbounded (#11426)
This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 05d5dd9 compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded (#11426)
05d5dd9 is described below
commit 05d5dd9289c03c62bab8f5a8caf80804382e4829
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Tue Jul 13 09:48:06 2021 +0700
compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded (#11426)
* compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded
* compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded
* compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded
* fix test
* fix test
---
docs/operations/api-reference.md | 2 +-
.../clients/CompactionResourceTestClient.java | 4 +-
.../coordinator/duty/ITAutoCompactionTest.java | 16 ++-----
.../server/coordinator/duty/CompactSegments.java | 51 +---------------------
.../druid/server/http/CompactionResource.java | 4 +-
.../coordinator/duty/CompactSegmentsTest.java | 25 +++++++----
.../druid/server/http/CompactionResourceTest.java | 2 +-
7 files changed, 28 insertions(+), 76 deletions(-)
diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md
index 53ebe36..4998bb1 100644
--- a/docs/operations/api-reference.md
+++ b/docs/operations/api-reference.md
@@ -468,7 +468,7 @@ This is only valid for dataSource which has compaction enabled.
* `/druid/coordinator/v1/compaction/status`
-Returns the status and statistics from the latest auto compaction run of all dataSources which have/had auto compaction enabled.
+Returns the status and statistics from the latest auto compaction run for all dataSources which had auto compaction enabled in that run.
The response payload includes a list of `latestStatus` objects. Each `latestStatus` represents the status for a dataSource (which has/had auto compaction enabled).
The `latestStatus` object has the following keys:
* `dataSource`: name of the datasource for this status information
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
index c1e9ae4..2fd1a5e 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
@@ -183,7 +183,9 @@ public class CompactionResourceTestClient
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)), responseHandler
).get();
- if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+ return null;
+ } else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while getting compaction status status[%s] content[%s]",
response.getStatus(),
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 05593c4..54d2971 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -210,7 +210,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
-
+ // Auto compaction stats should be deleted as the compaction config was deleted
+ Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName));
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
@@ -234,18 +235,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
- getAndAssertCompactionStatus(
- fullDatasourceName,
- AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
- 0,
- 0,
- 0,
- 0,
- 0,
- 0,
- 0,
- 0,
- 0);
+ Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName));
// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
// One day compacted (1 new segment) and one day remains uncompacted. (3 total)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index e48731b..a9ba0d5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -21,7 +21,6 @@ package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
@@ -213,11 +212,11 @@ public class CompactSegments implements CoordinatorDuty
}
} else {
LOG.info("compactionConfig is empty. Skip.");
- updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
+ autoCompactionSnapshotPerDataSource.set(new HashMap<>());
}
} else {
LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction");
- updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
+ autoCompactionSnapshotPerDataSource.set(new HashMap<>());
}
return params.buildFromExisting()
@@ -312,16 +311,6 @@ public class CompactSegments implements CoordinatorDuty
Set<String> enabledDatasources = compactionConfigList.stream()
.map(dataSourceCompactionConfig -> dataSourceCompactionConfig.getDataSource())
.collect(Collectors.toSet());
- // Update AutoCompactionScheduleStatus for dataSource that now has auto compaction disabled
- for (Map.Entry<String, AutoCompactionSnapshot> snapshot : autoCompactionSnapshotPerDataSource.get().entrySet()) {
- if (!enabledDatasources.contains(snapshot.getKey())) {
- currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
- snapshot.getKey(),
- k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
- );
- }
- }
-
// Create and Update snapshot for dataSource that has auto compaction enabled
for (String compactionConfigDataSource : enabledDatasources) {
currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
@@ -420,42 +409,6 @@ public class CompactSegments implements CoordinatorDuty
return newContext;
}
- /**
- * This method can be use to atomically update the snapshots in {@code autoCompactionSnapshotPerDataSource} when
- * no compaction task is schedule in this run. Currently, this method does not update compaction statistics
- * (bytes, interval count, segment count, etc) since we skip iterating through the segments and cannot get an update
- * on those statistics. Thus, this method only updates the schedule status and task list (compaction statistics
- * remains the same as the previous snapshot).
- */
- private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
- Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders
- )
- {
- Map<String, AutoCompactionSnapshot> previousSnapshots = autoCompactionSnapshotPerDataSource.get();
- for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
- final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
- AutoCompactionSnapshot previousSnapshot = previousSnapshots.get(dataSource);
- if (previousSnapshot != null) {
- autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
- autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
- autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
- autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
- autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
- autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
- autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
- autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
- autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
- }
- }
-
- Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
- currentRunAutoCompactionSnapshotBuilders,
- AutoCompactionSnapshot.Builder::build
- );
- // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
- autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
- }
-
private CoordinatorStats makeStats(
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
int numCompactionTasks,
diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
index 8c99514..e88d0cd 100644
--- a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
@@ -74,7 +74,7 @@ public class CompactionResource
{
final Long notCompactedSegmentSizeBytes = coordinator.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
if (notCompactedSegmentSizeBytes == null) {
- return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
+ return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "unknown dataSource")).build();
} else {
return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
}
@@ -94,7 +94,7 @@ public class CompactionResource
} else {
AutoCompactionSnapshot autoCompactionSnapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
if (autoCompactionSnapshot == null) {
- return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
+ return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "unknown dataSource")).build();
}
snapshots = ImmutableList.of(autoCompactionSnapshot);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index a1932dd..635b2fa 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -340,17 +340,15 @@ public class CompactSegmentsTest
);
}
- // Run auto compaction without any dataSource in the compaction config
- // Should still populate the result of everything fully compacted
- doCompactSegments(compactSegments, new ArrayList<>());
- Assert.assertEquals(
- 0,
- stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
- );
- for (int i = 0; i < 3; i++) {
+ // Test run auto compaction with one datasource auto compaction disabled
+ // Snapshot should not contain datasource with auto compaction disabled
+ List<DataSourceCompactionConfig> removedOneConfig = createCompactionConfigs();
+ removedOneConfig.remove(0);
+ doCompactSegments(compactSegments, removedOneConfig);
+ for (int i = 1; i < 3; i++) {
verifySnapshot(
compactSegments,
- AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
@@ -364,6 +362,15 @@ public class CompactSegmentsTest
);
}
+ // Run auto compaction without any dataSource in the compaction config
+ // Snapshot should be empty
+ doCompactSegments(compactSegments, new ArrayList<>());
+ Assert.assertEquals(
+ 0,
+ stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+ );
+ Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().isEmpty());
+
assertLastSegmentNotCompacted(compactSegments);
}
diff --git a/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
index 9e7cfd6..fe16d2f 100644
--- a/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
@@ -117,6 +117,6 @@ public class CompactionResourceTest
EasyMock.replay(mock);
final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(dataSourceName);
- Assert.assertEquals(400, response.getStatus());
+ Assert.assertEquals(404, response.getStatus());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org