Posted to commits@druid.apache.org by ji...@apache.org on 2019/02/19 21:11:04 UTC
[incubator-druid] branch master updated: bugfix: when building materialized-view, if taskCount>1, may cause concurrentModificationException (#6690)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7d1e8f3 bugfix: when building materialized-view, if taskCount>1, may cause concurrentModificationException (#6690)
7d1e8f3 is described below
commit 7d1e8f353eb768a1170b0faf9405c48fbde69178
Author: Fangyuan Deng <98...@qq.com>
AuthorDate: Wed Feb 20 05:10:55 2019 +0800
bugfix: when building materialized-view, if taskCount>1, may cause concurrentModificationException (#6690)
* bugfix: when building materialized-view, if taskCount >1, may cause ConcurrentModificationException
* remove entry after iteration instead of using ConcurrentMap, and add unit test
* small change
* modify unit test for coverage
* remove unused method
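
The bullets above describe the root cause: the pre-patch loop removed entries from runningTasks while iterating its entrySet(), which java.util.HashMap's fail-fast iterator does not allow, so with more than one tracked task the next iteration step can throw ConcurrentModificationException. The following minimal, self-contained sketch reproduces the failure and shows the collect-then-remove pattern the patch adopts; it uses plain String keys and status values in place of the supervisor's Interval and HadoopIndexTask types, so every name in it is illustrative only.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RemoveDuringIterationDemo
{
  public static void main(String[] args)
  {
    Map<String, String> runningTasks = new HashMap<>();
    runningTasks.put("2015-01-01T00Z/2015-01-02T00Z", "FAILED");
    runningTasks.put("2015-01-02T00Z/2015-01-03T00Z", "RUNNING");

    // Buggy pattern (pre-patch): structural modification of the map while
    // iterating its entry set. With more than one entry, the iterator's next
    // call to next() typically throws ConcurrentModificationException.
    try {
      for (Map.Entry<String, String> entry : runningTasks.entrySet()) {
        if (!"RUNNING".equals(entry.getValue())) {
          runningTasks.remove(entry.getKey());
        }
      }
    }
    catch (java.util.ConcurrentModificationException e) {
      System.out.println("buggy pattern: " + e);
    }

    // Fixed pattern (this patch): collect the keys first, remove after the loop.
    List<String> intervalsToRemove = new ArrayList<>();
    for (Map.Entry<String, String> entry : runningTasks.entrySet()) {
      if (!"RUNNING".equals(entry.getValue())) {
        intervalsToRemove.add(entry.getKey());
      }
    }
    for (String interval : intervalsToRemove) {
      runningTasks.remove(interval);
    }
    System.out.println("still running: " + runningTasks);
  }
}

HashMap only detects the modification on the iterator's next call to next(), so with a single entry the buggy loop can finish cleanly; that is why the exception only shows up when taskCount > 1 and several Hadoop tasks are tracked at once.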
---
.../MaterializedViewSupervisor.java | 15 +++-
.../MaterializedViewSupervisorTest.java | 83 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 2 deletions(-)
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 1ccd812..105afdf 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -270,13 +270,18 @@ public class MaterializedViewSupervisor implements Supervisor
void checkSegmentsAndSubmitTasks()
{
synchronized (taskLock) {
+ List<Interval> intervalsToRemove = new ArrayList<>();
for (Map.Entry<Interval, HadoopIndexTask> entry : runningTasks.entrySet()) {
Optional<TaskStatus> taskStatus = taskStorage.getStatus(entry.getValue().getId());
if (!taskStatus.isPresent() || !taskStatus.get().isRunnable()) {
- runningTasks.remove(entry.getKey());
- runningVersion.remove(entry.getKey());
+ intervalsToRemove.add(entry.getKey());
}
}
+ for (Interval interval : intervalsToRemove) {
+ runningTasks.remove(interval);
+ runningVersion.remove(interval);
+ }
+
if (runningTasks.size() == maxTaskCount) {
// if the number of running tasks reaches the max task count, the supervisor won't submit new tasks.
return;
@@ -288,6 +293,12 @@ public class MaterializedViewSupervisor implements Supervisor
submitTasks(sortedToBuildVersion, baseSegments);
}
}
+
+ @VisibleForTesting
+ Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> getRunningTasks()
+ {
+ return new Pair<>(runningTasks, runningVersion);
+ }
/**
* Find information about the intervals in which derived dataSource data should be rebuilt.
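
For comparison, the cleanup in the hunk above could also be written with an explicit iterator: Iterator.remove() is the one structural modification HashMap permits during traversal, so no intermediate list is needed, and runningVersion, being a separate map, can be modified directly. A minimal standalone sketch of that alternative follows, again with placeholder String keys and status values rather than the real Interval and HadoopIndexTask types.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class IteratorRemoveSketch
{
  public static void main(String[] args)
  {
    Map<String, String> runningTasks = new HashMap<>();
    Map<String, String> runningVersion = new HashMap<>();
    runningTasks.put("2015-01-01T00Z/2015-01-02T00Z", "FAILED");
    runningVersion.put("2015-01-01T00Z/2015-01-02T00Z", "v1");
    runningTasks.put("2015-01-02T00Z/2015-01-03T00Z", "RUNNING");
    runningVersion.put("2015-01-02T00Z/2015-01-03T00Z", "v2");

    // Iterator.remove() is the only structural modification HashMap allows
    // during iteration, so the map being traversed can be pruned in place;
    // runningVersion is a different map and can be modified freely.
    Iterator<Map.Entry<String, String>> it = runningTasks.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, String> entry = it.next();
      if (!"RUNNING".equals(entry.getValue())) {
        it.remove();
        runningVersion.remove(entry.getKey());
      }
    }

    System.out.println(runningTasks);   // {2015-01-02T00Z/2015-01-03T00Z=RUNNING}
    System.out.println(runningVersion); // {2015-01-02T00Z/2015-01-03T00Z=v2}
  }
}

The patch's two-pass version has the same effect while keeping the loop body free of iterator plumbing; either form avoids the fail-fast check that the original remove-while-iterating code tripped.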
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 7b575f0..1bf1c39 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -27,7 +27,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.indexer.HadoopIOConfig;
+import org.apache.druid.indexer.HadoopIngestionSpec;
import org.apache.druid.indexer.HadoopTuningConfig;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.HadoopIndexTask;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
@@ -41,7 +45,9 @@ import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -176,6 +182,83 @@ public class MaterializedViewSupervisorTest
Assert.assertEquals(expectedSegments, toBuildInterval.rhs);
}
+ @Test
+ public void testCheckSegmentsAndSubmitTasks() throws IOException
+ {
+ Set<DataSegment> baseSegments = Sets.newHashSet(
+ new DataSegment(
+ "base",
+ Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
+ "2015-01-03",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1", "dim2"),
+ ImmutableList.of("m1"),
+ new HashBasedNumberedShardSpec(0, 1, null, null),
+ 9,
+ 1024
+ )
+ );
+ indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
+ expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+ expect(taskStorage.getStatus("test_task1")).andReturn(Optional.of(TaskStatus.failure("test_task1"))).anyTimes();
+ expect(taskStorage.getStatus("test_task2")).andReturn(Optional.of(TaskStatus.running("test_task2"))).anyTimes();
+ EasyMock.replay(taskStorage);
+
+ Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> runningTasksPair = supervisor.getRunningTasks();
+ Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs;
+ Map<Interval, String> runningVersion = runningTasksPair.rhs;
+
+ DataSchema dataSchema = new DataSchema(
+ "test_datasource",
+ null,
+ null,
+ null,
+ TransformSpec.NONE,
+ objectMapper
+ );
+ HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, null);
+ HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, null);
+ HadoopIndexTask task1 = new HadoopIndexTask(
+ "test_task1",
+ spec,
+ null,
+ null,
+ null,
+ objectMapper,
+ null,
+ null,
+ null
+ );
+ runningTasks.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), task1);
+ runningVersion.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), "test_version1");
+
+ HadoopIndexTask task2 = new HadoopIndexTask(
+ "test_task2",
+ spec,
+ null,
+ null,
+ null,
+ objectMapper,
+ null,
+ null,
+ null
+ );
+ runningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2);
+ runningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2");
+
+ supervisor.checkSegmentsAndSubmitTasks();
+
+ Map<Interval, HadoopIndexTask> expectedRunningTasks = new HashMap<>();
+ Map<Interval, String> expectedRunningVersion = new HashMap<>();
+ expectedRunningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2);
+ expectedRunningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2");
+
+ Assert.assertEquals(expectedRunningTasks, runningTasks);
+ Assert.assertEquals(expectedRunningVersion, runningVersion);
+
+ }
@Test
public void testSuspendedDoesntRun()