Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/09/12 07:53:39 UTC

[GitHub] [druid] kfaraz opened a new pull request, #13074: Add coordinator test framework

kfaraz opened a new pull request, #13074:
URL: https://github.com/apache/druid/pull/13074

   Fixes #12822
   
   ### Description
   This PR adds the changes required for the simulation framework discussed in the above proposal.
   The framework makes it easy to write tests that simulate coordinator behaviour under various
   conditions. Some of the things that are made possible by the changes are:
   - set up arbitrarily complex clusters
   - easily create and assign a large number of segments of varying granularities, sizes, etc.
      to the servers
   - full control of the following during the course of the simulation
     - cluster: add, kill servers
     - segments: create, load, delete
     - load rules of a datasource
     - inventory view maintained by the coordinator: synchronize
     - metrics emitted by the coordinator
   - minimal mocking in the sim environment and no additional mocking for writing a new test
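The bulk segment setup listed above can be illustrated with a minimal, self-contained Java sketch. `SimSegment`, `SimSegments.forDays` and `assignRoundRobin` are hypothetical names and are not the framework's `CreateDataSegments` API. The sketch assumes DAY granularity and places segments on servers round-robin as a test setup step, without going through the coordinator's balancer.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a segment (not Druid's DataSegment).
final class SimSegment {
  final String dataSource;
  final LocalDate day; // assumes DAY granularity: one interval per calendar day
  final int partitionNum;
  final long sizeBytes;

  SimSegment(String dataSource, LocalDate day, int partitionNum, long sizeBytes) {
    this.dataSource = dataSource;
    this.day = day;
    this.partitionNum = partitionNum;
    this.sizeBytes = sizeBytes;
  }

  String id() {
    return dataSource + "_" + day + "_" + partitionNum;
  }
}

final class SimSegments {
  private SimSegments() {}

  /** Creates partitionsPerDay segments for each of numDays consecutive days. */
  static List<SimSegment> forDays(
      String dataSource, LocalDate startDay, int numDays, int partitionsPerDay, long sizeBytes) {
    List<SimSegment> segments = new ArrayList<>();
    for (int d = 0; d < numDays; d++) {
      LocalDate day = startDay.plusDays(d);
      for (int p = 0; p < partitionsPerDay; p++) {
        segments.add(new SimSegment(dataSource, day, p, sizeBytes));
      }
    }
    return segments;
  }

  /** Places segments on servers round-robin and returns a map from server name to its segments. */
  static Map<String, List<SimSegment>> assignRoundRobin(List<SimSegment> segments, List<String> servers) {
    Map<String, List<SimSegment>> assignment = new HashMap<>();
    for (String server : servers) {
      assignment.put(server, new ArrayList<>());
    }
    for (int i = 0; i < segments.size(); i++) {
      assignment.get(servers.get(i % servers.size())).add(segments.get(i));
    }
    return assignment;
  }
}
```

For example, 10 days with 2 partitions per day produce 20 segments. Spread over 3 servers, that gives a 7/7/6 split.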
   
   
   
   ### Changes:
   The changes here are slightly different from what was proposed, mostly in terms of how each coordinator cycle is invoked.
   
   Main classes
   - Add `CoordinatorSimulation` and related interfaces to dictate behaviour of simulation
   - Add `CoordinatorSimulationImpl` as a concrete implementation
   - Add `CoordinatorSimulationBaseTest`, `SegmentBalancingTest`
   - Add `BlockingExecutorService`
   
   Provide mocked dependencies to the `DruidCoordinator` for:
   - JacksonConfigManager
   - LookupCoordinatorManager
   
   Provide test dependencies to the `DruidCoordinator` for:
   - `SegmentsMetadataManager`: keeps a list of used segments in memory
   - `HttpClient`: used for sending segment load requests to respective historicals. 
      The test impl here processes each request and updates the server state but not
      the inventory view maintained by the coordinator.
   - `MetadataRuleManager`: keeps rules in memory and allows for update during simulation
   - `ServerInventoryView`: allows synchronization with the current cluster state in the sim
   Some of these test dependencies can later be consolidated with existing utility classes.
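The two ways of keeping the coordinator's inventory view in step with the simulated cluster can be sketched as follows. In the first, every change is reflected immediately. In the second, the view changes only on an explicit sync. This is a simplified, hypothetical model and not `TestServerInventoryView` itself. `SimCluster`, `SimInventoryView` and the `autoSync` flag are illustrative names, and each server is reduced to a set of segment ids.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical inventory view: what the coordinator believes is loaded.
final class SimInventoryView {
  private final Map<String, Set<String>> snapshot = new HashMap<>();

  /** Replaces the whole view with a copy of the cluster's current state. */
  void sync(Map<String, Set<String>> clusterState) {
    snapshot.clear();
    clusterState.forEach((server, segments) -> snapshot.put(server, new HashSet<>(segments)));
  }

  boolean isLoaded(String server, String segmentId) {
    return snapshot.getOrDefault(server, Collections.emptySet()).contains(segmentId);
  }
}

// Hypothetical simulated cluster: the actual state of the historicals.
final class SimCluster {
  private final Map<String, Set<String>> loaded = new HashMap<>();
  private final SimInventoryView view;
  private final boolean autoSync;

  SimCluster(SimInventoryView view, boolean autoSync) {
    this.view = view;
    this.autoSync = autoSync;
  }

  void load(String server, String segmentId) {
    loaded.computeIfAbsent(server, s -> new HashSet<>()).add(segmentId);
    if (autoSync) {
      view.sync(loaded); // auto mode: every change is reflected immediately
    }
  }

  /** Manual mode: the test decides when the view catches up. */
  void syncInventory() {
    view.sync(loaded);
  }
}
```

In manual mode, the view stays stale between `load` and `syncInventory`. That window is what lets a test observe the coordinator acting on outdated inventory.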
   
   ### Pending changes in this PR:
   - Populate the remaining test cases in `SegmentBalancingTest` and `SegmentLoadingTest`
   - Each of these test cases represents a bullet discussed in #12881  
     and will initially be disabled until #12881 is fixed
   
   <hr>
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r985208607


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ServerInventoryView;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordination.DataSegmentChangeCallback;
+import org.apache.druid.server.coordination.DataSegmentChangeHandler;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+public class TestServerInventoryView implements ServerInventoryView
+{
+  private static final Logger log = new Logger(TestServerInventoryView.class);
+
+  private final ConcurrentHashMap<String, DruidServer> servers = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, DataSegmentChangeHandler> segmentChangeHandlers = new ConcurrentHashMap<>();
+
+  private final ConcurrentHashMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();
+  private final List<ServerChangeHandler> serverChangeHandlers = new ArrayList<>();
+
+  public void setUp()
+  {
+    segmentCallbacks.forEach(
+        (segmentCallback, executor) ->
+            executor.execute(segmentCallback::segmentViewInitialized)
+    );
+  }
+
+  /**
+   * Synchronizes this inventory view with the given inventory view.
+   */
+  public void sync(ServerInventoryView other)
+  {
+    // Clear the current inventory
+    for (ServerChangeHandler handler : serverChangeHandlers) {
+      servers.values().forEach(handler::removeServer);
+    }
+    servers.clear();
+    segmentChangeHandlers.clear();
+
+    for (DruidServer server : other.getInventory()) {
+      addServer(new DruidServer(
+          server.getName(),
+          server.getHostAndPort(),
+          server.getHostAndTlsPort(),
+          server.getMaxSize(),
+          server.getType(),
+          server.getTier(),
+          server.getPriority()
+      ));
+      DataSegmentChangeHandler handler = getChangeHandlerForHost(server.getName());
+      for (DataSegment segment : server.iterateAllSegments()) {
+        handler.addSegment(segment, null);
+      }
+    }
+  }
+
+  public void addServer(DruidServer server)
+  {
+    servers.put(server.getName(), server);
+    segmentChangeHandlers.put(server.getName(), new SegmentChangeHandler(server));
+  }
+
+  public void removeServer(DruidServer server)
+  {
+    servers.remove(server.getName());
+    segmentChangeHandlers.remove(server.getName());
+
+    for (ServerChangeHandler handler : serverChangeHandlers) {
+      handler.removeServer(server);
+    }
+  }
+
+  public DataSegmentChangeHandler getChangeHandlerForHost(String serverName)
+  {
+    return segmentChangeHandlers.get(serverName);
+  }
+
+  @Nullable
+  @Override
+  public DruidServer getInventoryValue(String serverKey)
+  {
+    return servers.get(serverKey);
+  }
+
+  @Override
+  public Collection<DruidServer> getInventory()
+  {
+    return Collections.unmodifiableCollection(servers.values());
+  }
+
+  @Override
+  public boolean isStarted()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
+  {
+    DruidServer server = servers.get(serverKey);
+    return server != null && server.getSegment(segment.getId()) != null;
+  }
+
+  @Override
+  public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
+  {
+    serverChangeHandlers.add(new ServerChangeHandler(callback, exec));
+  }
+
+  @Override
+  public void registerSegmentCallback(Executor exec, SegmentCallback callback)
+  {
+    segmentCallbacks.put(callback, exec);
+  }
+
+  private class SegmentChangeHandler implements DataSegmentChangeHandler
+  {
+    private final DruidServer server;
+
+    private SegmentChangeHandler(DruidServer server)
+    {
+      this.server = server;
+    }
+
+    @Override
+    public void addSegment(
+        DataSegment segment,
+        @Nullable DataSegmentChangeCallback callback
+    )
+    {
+      log.debug("Adding segment [%s] to server [%s]", segment.getId(), server.getName());
+
+      if (server.getMaxSize() - server.getCurrSize() >= segment.getSize()) {
+        server.addDataSegment(segment);
+        segmentCallbacks.forEach(
+            (segmentCallback, executor) -> executor.execute(
+                () -> segmentCallback.segmentAdded(server.getMetadata(), segment)
+            )
+        );
+      } else {
+        throw new ISE(
+            "Not enough free space on server %s. Segment size [%d]. Current free space [%d]",
+            server.getName(),
+            segment.getSize(),
+            server.getMaxSize() - server.getCurrSize()
+        );
+      }
+    }
+
+    @Override
+    public void removeSegment(
+        DataSegment segment,
+        @Nullable DataSegmentChangeCallback callback
+    )
+    {
+      log.debug("Removing segment [%s] from server [%s]", segment.getId(), server.getName());
+      server.removeDataSegment(segment.getId());
+      segmentCallbacks.forEach(
+          (segmentCallback, executor) -> executor.execute(
+              () -> segmentCallback.segmentAdded(server.getMetadata(), segment)

Review Comment:
   segmentAdded -> segmentDropped?
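The reviewer's point is that the removal path should notify listeners with a removal event rather than `segmentAdded`. The minimal sketch below shows that fix. The listener interface is hypothetical and self-contained. This hunk does not show whether the real `SegmentCallback` method is named `segmentDropped` (as suggested) or something else, so `segmentRemoved` here is only an illustrative name.

```java
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical listener interface; method names are illustrative only.
interface SegmentListener {
  void segmentAdded(String server, String segmentId);

  void segmentRemoved(String server, String segmentId);
}

// Per-server handler that notifies registered listeners on their executors.
final class ServerSegmentHandler {
  private final String server;
  private final Map<SegmentListener, Executor> listeners;

  ServerSegmentHandler(String server, Map<SegmentListener, Executor> listeners) {
    this.server = server;
    this.listeners = listeners;
  }

  void addSegment(String segmentId) {
    listeners.forEach((listener, exec) -> exec.execute(() -> listener.segmentAdded(server, segmentId)));
  }

  void removeSegment(String segmentId) {
    // Fire the removal event. Calling segmentAdded here would make listeners
    // believe the segment is still present after it was removed.
    listeners.forEach((listener, exec) -> exec.execute(() -> listener.segmentRemoved(server, segmentId)));
  }
}
```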





[GitHub] [druid] kfaraz commented on pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on PR #13074:
URL: https://github.com/apache/druid/pull/13074#issuecomment-1251839641

   Thanks for the review, @cheddar ! Some of the changes you have suggested are planned for future PRs.
   I will include the smaller ones here.




[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r972570643


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.
+ */
+public abstract class CoordinatorSimulationBaseTest

Review Comment:
   I did start off with the fixture pattern but later switched to a base test, as it avoids having every statement begin with `fixture.`, which feels redundant when every line has it.
   
   The tests that have already been added are unlikely to implement other interfaces.
   New tests with conflicting inheritance could always use the same base test as a fixture, I guess.





[GitHub] [druid] kfaraz commented on pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on PR #13074:
URL: https://github.com/apache/druid/pull/13074#issuecomment-1248869153

   Thanks a lot for the review, @paul-rogers !
   I will be sure to include a README to help other developers write more of these tests.
   
   Your understanding of the changes is correct.
   
   > We verify results by looking at the state of the simulated nodes.
   
   We also verify the state of the coordinator itself and the emitted metrics, as the `DruidCoordinator` is the primary entity under test (I will clarify these in the README).
   
   > It seems that the tests don't cover the dynamic aspects: load, the threads which decide when to fire off the control tasks in the coordinator, latencies, etc.
   
   - Yes, we do not verify latency of an operation.
   - The actual loading of a segment would always be mocked, since it happens on a historical.
   Here, we would only want to control when the load happens and whether it succeeds or fails.
   - The simulation maintains a handle to all the executors used inside the coordinator. It can thus choose
   to invoke pending tasks of a certain executor at a certain step to recreate race conditions. For example,
   a sequence of steps could be: run the coordinator, load one segment from the queue, sync the inventory, load the
   remaining segments from the queue, and verify the final state.
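The executor control described above can be sketched as a small Java class. This is an illustration under stated assumptions and not the PR's `BlockingExecutorService`. `ControllableExecutor`, `runNextPendingTask` and `runAllPendingTasks` are hypothetical names. The class runs tasks immediately on the calling thread, or holds them in a queue until the test releases them one at a time.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical executor with two modes: immediate (run on the calling thread)
// or blocked (queue tasks until the test explicitly runs them).
final class ControllableExecutor implements Executor {
  private final boolean blocked;
  private final Queue<Runnable> pending = new ArrayDeque<>();

  ControllableExecutor(boolean blocked) {
    this.blocked = blocked;
  }

  @Override
  public void execute(Runnable task) {
    if (blocked) {
      synchronized (pending) {
        pending.add(task);
      }
    } else {
      task.run();
    }
  }

  /** Runs the oldest queued task, if any. Returns false when nothing was pending. */
  boolean runNextPendingTask() {
    Runnable next;
    synchronized (pending) {
      next = pending.poll();
    }
    if (next == null) {
      return false;
    }
    next.run(); // run outside the lock so the task may enqueue more work
    return true;
  }

  /** Runs queued tasks, including any they enqueue, until none remain. */
  void runAllPendingTasks() {
    while (runNextPendingTask()) {
      // keep draining
    }
  }

  int numPendingTasks() {
    synchronized (pending) {
      return pending.size();
    }
  }
}
```

With a load queue in blocked mode, a test can release one load, sync the inventory, and then release the rest. That makes the interleaving described above deterministic rather than timing-dependent.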




[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974884161


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeCallback;
+import org.apache.druid.server.coordination.DataSegmentChangeHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.joda.time.Duration;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class TestSegmentLoadingHttpClient implements HttpClient
+{
+  private static final HttpResponseHandler.TrafficCop NOOP_TRAFFIC_COP = checkNum -> 0L;
+  private static final DataSegmentChangeCallback NOOP_CALLBACK = () -> {
+  };
+
+  private final ObjectMapper objectMapper;
+  private final Function<String, DataSegmentChangeHandler> hostToHandler;
+
+  private final ListeningScheduledExecutorService executorService;
+
+  public TestSegmentLoadingHttpClient(
+      ObjectMapper objectMapper,
+      Function<String, DataSegmentChangeHandler> hostToHandler,
+      ScheduledExecutorService executorService
+  )
+  {
+    this.objectMapper = objectMapper;
+    this.hostToHandler = hostToHandler;
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+  }
+
+  @Override
+  public <Intermediate, Final> ListenableFuture<Final> go(
+      Request request,
+      HttpResponseHandler<Intermediate, Final> handler
+  )
+  {
+    throw new UnsupportedOperationException();

Review Comment:
   I wonder if I shouldn't just allow this one as well. In the 3-arg call, I am not doing anything with the 3rd argument, `durationTimeout`, anyway.
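If the 2-arg overload were allowed, one option is to have it delegate to the 3-arg overload with a placeholder timeout, since the test client ignores the timeout anyway. The sketch below uses a hypothetical `SimpleClient` interface with `String` requests and `java.time.Duration`. It is not Druid's `HttpClient`, which takes a `Request`, an `HttpResponseHandler` and, judging by the imports, a Joda `Duration`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical client with the same overload shape as the one under review.
interface SimpleClient {
  <T> CompletableFuture<T> go(String request, Function<String, T> handler, Duration timeout);

  // The 2-arg overload delegates instead of throwing. Duration.ZERO is only a
  // placeholder, acceptable because the implementation below ignores the timeout.
  default <T> CompletableFuture<T> go(String request, Function<String, T> handler) {
    return go(request, handler, Duration.ZERO);
  }
}

// In-memory implementation that "responds" synchronously.
final class InMemoryClient implements SimpleClient {
  @Override
  public <T> CompletableFuture<T> go(String request, Function<String, T> handler, Duration timeout) {
    // The timeout is deliberately unused: there is no real I/O to time out.
    return CompletableFuture.completedFuture(handler.apply("OK:" + request));
  }
}
```

Delegation keeps a single code path, so both overloads behave identically in tests.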





[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974876899


##########
core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java:
##########
@@ -37,7 +41,18 @@ public StubServiceEmitter(String service, String host)
   @Override
   public void emit(Event event)
   {
-    events.add(event);
+    if (event instanceof AlertEvent) {
+      final AlertEvent alertEvent = (AlertEvent) event;

Review Comment:
   That makes sense. I did have a dedicated list for alerts during my initial testing but later removed it as I wasn't using it in my tests. Will fix it up.
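A sketch of the dedicated-alerts idea follows. The emitter routes alert events to their own list, so tests can assert on alerts without filtering every recorded event. The event classes and `RecordingEmitter` are hypothetical stand-ins, not Druid's `StubServiceEmitter` or `AlertEvent`. Keeping alerts out of the general event list is a design assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event hierarchy, standing in for an emitter's event types.
interface Event {
  String getDescription();
}

final class MetricEvent implements Event {
  private final String metric;
  private final long value;

  MetricEvent(String metric, long value) {
    this.metric = metric;
    this.value = value;
  }

  @Override
  public String getDescription() {
    return metric + "=" + value;
  }
}

final class AlertEvent implements Event {
  private final String message;

  AlertEvent(String message) {
    this.message = message;
  }

  @Override
  public String getDescription() {
    return message;
  }
}

// Records emitted events, keeping alerts in a dedicated list.
final class RecordingEmitter {
  private final List<Event> events = new ArrayList<>();
  private final List<AlertEvent> alerts = new ArrayList<>();

  synchronized void emit(Event event) {
    if (event instanceof AlertEvent) {
      alerts.add((AlertEvent) event);
    } else {
      events.add(event);
    }
  }

  /** Returns a copy so callers cannot mutate the recorded alerts. */
  synchronized List<AlertEvent> getAlerts() {
    return new ArrayList<>(alerts);
  }

  synchronized List<Event> getEvents() {
    return new ArrayList<>(events);
  }
}
```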





[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974882232


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md:
##########
@@ -0,0 +1,141 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Coordinator simulations
+
+The simulation framework allows developers to recreate arbitrary cluster setups and verify coordinator behaviour. Tests
+written using the framework can also help identify performance bottlenecks or potential bugs in the system and even
+compare different balancing strategies.
+
+As opposed to unit tests, simulations are meant to test the coordinator as a whole and verify the interactions of all
+the underlying parts. In that regard, these simulations resemble integration tests more closely.
+
+## Test targets
+
+The primary test target is the `DruidCoordinator` itself. The behaviour of the following entities can also be verified
+using simulations:
+
+- `LoadQueuePeon`, `LoadQueueTaskMaster`
+- All coordinator duties, e.g. `BalanceSegments`, `RunRules`
+- All retention rules
+
+## Capabilities
+
+The framework provides control over the following aspects of the setup:
+
+| Input | Details | Actions |
+|-------|---------|---------|
+|cluster | server name, type, tier, size | add a server, remove a server|
+|segment |datasource, interval, version, partition num, size | add/remove from server, mark used/unused, publish new segments|
+|rules | type (foreverLoad, drop, etc.), replica count per tier | set rules for a datasource|
+|configs |coordinator period, load queue type, load queue size, max segments to balance | set or update a config |
+
+The above actions can be performed at any point after building the simulation. So, you could even recreate scenarios
+where during a coordinator run, a server crashes or the retention rules of a datasource change, and verify the behaviour
+of the coordinator in these situations.
+
+## Design
+
+1. __Execution__: A tight dependency on time durations such as the period of a repeating task or the delay before a
+   scheduled task makes it difficult to reliably reproduce a test scenario. As a result, the tests become flaky. Thus,
+   all the executors required for coordinator operations have been allowed only two possible modes of execution:
+    - __immediate__: Execute tasks on the calling thread itself.
+    - __blocked__: Keep tasks in a queue until explicitly invoked.
+2. __Internal dependencies__: In order to allow realistic reproductions of the coordinator behaviour, none of the
+   internal parts of the coordinator have been mocked in the framework and new tests need not mock anything at all.
+3. __External dependencies__: Since these tests are meant to verify the behaviour of only the coordinator, the
+   interfaces to communicate with external dependencies have been provided as simple in-memory implementations:
+    - communication with the metadata store: `SegmentsMetadataManager`, `MetadataRuleManager`
+    - communication with historicals: `HttpClient`, `ServerInventoryView`
+4. __Inventory__: The coordinator maintains an inventory view of the cluster state. Simulations can choose from two
+   modes of inventory update - auto and manual. In auto update mode, any change made to the cluster is immediately
+   reflected in the inventory view. In manual update mode, the inventory must be explicitly synchronized with the
+   cluster state.
+
+## Limitations
+
+- The framework does not expose the coordinator HTTP endpoints.
+- It should not be used to verify the absolute values of execution latencies, e.g. the time taken to compute the
+  balancing cost of a segment. But the relative values can still be a good indicator when comparing, say, two
+  balancing strategies.

Review Comment:
   Yeah, I think the language got a little ambiguous.
   
   I meant that we should not try to assert things like "once a segment is queued, it gets processed within 5 seconds" but we can assert things like "across 5 coordinator runs, cachingCost strategy does faster assignment than cost strategy".
   
   I hope that clarifies things a bit. Let me know if the comment should be rephrased.





[GitHub] [druid] paul-rogers commented on pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13074:
URL: https://github.com/apache/druid/pull/13074#issuecomment-1248790959

   This is great! One general suggestion: provide an overview, in a `README.md` file, of the general design of the framework. By this I mean: what is being tested, what is simulated, and how the tests work. What I think I learned from looking at the code is:
   
   * We simulate the cluster by responding to commands sent from the coordinator.
   * We wrap the algorithm part of the coordinator in a "fixture" that we invoke from tests.
   * We verify results by looking at the state of the simulated nodes.
   
   It seems that the tests don't cover the dynamic aspects: load, the threads which decide when to fire off the control tasks in the coordinator, latencies, etc. It is fine to omit these; they are another level of complexity we could add once the basics work. Still, it would be good to state which bits of the code this framework targets.




[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r972571438


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java:
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.curator.ZkEnablementConfig;
+import org.apache.druid.curator.discovery.ServiceAnnouncer;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.DirectExecutorService;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.BalancerStrategyFactory;
+import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
+import org.apache.druid.server.coordinator.CachingCostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
+import org.apache.druid.timeline.DataSegment;
+import org.easymock.EasyMock;
+import org.joda.time.Duration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Builder for {@link CoordinatorSimulation}.
+ */
+public class CoordinatorSimulationBuilder
+{
+  private static final long DEFAULT_COORDINATOR_PERIOD = 100L;
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper()
+      .setInjectableValues(
+          new InjectableValues.Std().addValue(
+              DataSegment.PruneSpecsHolder.class,
+              DataSegment.PruneSpecsHolder.DEFAULT
+          )
+      );
+
+  private BalancerStrategyFactory balancerStrategyFactory;
+  private CoordinatorDynamicConfig dynamicConfig =
+      CoordinatorDynamicConfig.builder()
+                              .withUseBatchedSegmentSampler(true)
+                              .build();
+  private List<DruidServer> servers;
+  private List<DataSegment> segments;
+  private final Map<String, List<Rule>> datasourceRules = new HashMap<>();
+  private boolean loadImmediately = false;
+  private boolean autoSyncInventory = true;
+
+  /**
+   * Specifies the balancer strategy to be used.
+   * <p>
+   * Default: "cost" ({@link CostBalancerStrategyFactory})
+   */
+  public CoordinatorSimulationBuilder balancer(BalancerStrategyFactory strategyFactory)
+  {
+    this.balancerStrategyFactory = strategyFactory;
+    return this;
+  }
+
+  public CoordinatorSimulationBuilder servers(List<DruidServer> servers)
+  {
+    this.servers = servers;
+    return this;
+  }
+
+  public CoordinatorSimulationBuilder servers(DruidServer... servers)
+  {
+    return servers(Arrays.asList(servers));
+  }
+
+  public CoordinatorSimulationBuilder segments(List<DataSegment> segments)
+  {
+    this.segments = segments;
+    return this;
+  }
+
+  public CoordinatorSimulationBuilder rules(String datasource, Rule... rules)
+  {
+    this.datasourceRules.put(datasource, Arrays.asList(rules));
+    return this;
+  }
+
+  /**
+   * Specifies whether segments should be loaded as soon as they are queued.
+   * <p>
+   * Default: false
+   */
+  public CoordinatorSimulationBuilder loadSegmentsImmediately(boolean loadImmediately)
+  {
+    this.loadImmediately = loadImmediately;
+    return this;
+  }
+
+  /**
+   * Specifies whether the inventory view maintained by the coordinator
+   * should be auto-synced as soon as any change is made to the cluster.
+   * <p>
+   * Default: true
+   */
+  public CoordinatorSimulationBuilder autoSyncInventory(boolean autoSync)
+  {
+    this.autoSyncInventory = autoSync;
+    return this;
+  }
+
+  /**
+   * Specifies the CoordinatorDynamicConfig to be used in the simulation.
+   * <p>
+   * Default values: Specified in {@link CoordinatorDynamicConfig.Builder}.
+   */
+  public CoordinatorSimulationBuilder dynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+  {
+    this.dynamicConfig = dynamicConfig;
+    return this;
+  }
+
+  public CoordinatorSimulation build()
+  {
+    Preconditions.checkArgument(
+        servers != null && !servers.isEmpty(),
+        "Cannot run simulation for an empty cluster"
+    );
+
+    // Prepare the environment
+    final TestServerInventoryView serverInventoryView = new TestServerInventoryView();
+    servers.forEach(serverInventoryView::addServer);
+
+    final TestSegmentsMetadataManager segmentManager = new TestSegmentsMetadataManager();
+    if (segments != null) {
+      segments.forEach(segmentManager::addSegment);
+    }
+
+    final TestMetadataRuleManager ruleManager = new TestMetadataRuleManager();
+    datasourceRules.forEach(
+        (datasource, rules) ->
+            ruleManager.overrideRule(datasource, rules, null)
+    );
+
+    final Environment env = new Environment(
+        serverInventoryView,
+        segmentManager,
+        ruleManager,
+        dynamicConfig,
+        loadImmediately,
+        autoSyncInventory
+    );
+
+    // Build the coordinator
+    final DruidCoordinator coordinator = new DruidCoordinator(
+        env.coordinatorConfig,
+        null,
+        env.jacksonConfigManager,
+        env.segmentManager,
+        env.coordinatorInventoryView,
+        env.ruleManager,
+        () -> null,
+        env.serviceEmitter,
+        env.executorFactory,
+        null,
+        env.loadQueueTaskMaster,
+        new ServiceAnnouncer.Noop(),
+        null,
+        Collections.emptySet(),
+        null,
+        new CoordinatorCustomDutyGroups(Collections.emptySet()),
+        balancerStrategyFactory != null ? balancerStrategyFactory
+                                        : new CostBalancerStrategyFactory(),
+        env.lookupCoordinatorManager,
+        env.leaderSelector,
+        OBJECT_MAPPER,
+        ZkEnablementConfig.ENABLED
+    );
+
+    return new SimulationImpl(coordinator, env);
+  }
+
+  private BalancerStrategyFactory buildCachingCostBalancerStrategy(Environment env)
+  {
+    try {
+      return new CachingCostBalancerStrategyFactory(
+          env.coordinatorInventoryView,
+          env.lifecycle,
+          new CachingCostBalancerStrategyConfig()
+      );
+    }
+    catch (Exception e) {
+      throw new ISE(e, "Error building balancer strategy");
+    }
+  }
+
+  /**
+   * Implementation of {@link CoordinatorSimulation}.
+   */
+  private static class SimulationImpl implements CoordinatorSimulation,
+      CoordinatorSimulation.CoordinatorState, CoordinatorSimulation.ClusterState
+  {
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private final Environment env;
+    private final DruidCoordinator coordinator;
+
+    private SimulationImpl(DruidCoordinator coordinator, Environment env)
+    {
+      this.env = env;
+      this.coordinator = coordinator;
+    }
+
+    @Override
+    public void start()
+    {
+      if (!running.compareAndSet(false, true)) {
+        throw new ISE("Simulation is already running");
+      }
+
+      try {
+        env.setUp();
+        coordinator.start();
+      }
+      catch (Exception e) {
+        throw new ISE(e, "Exception while running simulation");
+      }
+    }
+
+    @Override
+    public void stop()
+    {
+      coordinator.stop();
+      env.leaderSelector.stopBeingLeader();
+      env.tearDown();
+    }
+
+    @Override
+    public CoordinatorState coordinator()
+    {
+      return this;
+    }
+
+    @Override
+    public ClusterState cluster()
+    {
+      return this;
+    }
+
+    @Override
+    public void runCoordinatorCycle()
+    {
+      verifySimulationRunning();
+      env.serviceEmitter.flush();
+
+      // Invoke historical duties and metadata duties
+      env.executorFactory.coordinatorRunner.finishNextPendingTasks(2);
+    }
+
+    @Override
+    public void syncInventoryView()
+    {
+      verifySimulationRunning();
+      Preconditions.checkState(
+          !env.autoSyncInventory,
+          "Cannot invoke syncInventoryView as simulation is running in auto-sync mode."
+      );
+      env.coordinatorInventoryView.sync(env.historicalInventoryView);
+    }
+
+    @Override
+    public void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+    {
+      env.setDynamicConfig(dynamicConfig);
+    }
+
+    @Override
+    public DruidServer getInventoryView(String serverName)
+    {
+      return env.coordinatorInventoryView.getInventoryValue(serverName);
+    }
+
+    @Override
+    public void loadQueuedSegments()
+    {
+      verifySimulationRunning();
+      Preconditions.checkState(
+          !env.loadImmediately,
+          "Cannot invoke loadQueuedSegments as simulation is running in immediate loading mode."
+      );
+
+      final BlockingExecutorService loadQueueExecutor = env.executorFactory.loadQueueExecutor;
+      while (loadQueueExecutor.hasPendingTasks()) {
+        // Drain all the items from the load queue executor
+        // This sends at most 1 load/drop request to each server
+        loadQueueExecutor.finishAllPendingTasks();
+
+        // Load all the queued segments, handle their responses and execute callbacks
+        int loadedSegments = env.executorFactory.historicalLoader.finishAllPendingTasks();
+        loadQueueExecutor.finishNextPendingTasks(loadedSegments);
+
+        // TODO: sync should happen here?? or should it not??
+
+        env.executorFactory.loadCallbackExecutor.finishAllPendingTasks();
+      }
+    }
+
+    private void verifySimulationRunning()
+    {
+      if (!running.get()) {
+        throw new ISE("Simulation hasn't been started yet.");
+      }
+    }
+
+    @Override
+    public double getLoadPercentage(String datasource)
+    {
+      return coordinator.getLoadStatus().get(datasource);
+    }
+
+    @Override
+    public List<Event> getMetricEvents()
+    {
+      return new ArrayList<>(env.serviceEmitter.getEvents());
+    }
+  }
+
+  /**
+   * Environment for a coordinator simulation.
+   */
+  private static class Environment
+  {
+    private final Lifecycle lifecycle = new Lifecycle("coord-sim");
+
+    // Executors
+    private final ExecutorFactory executorFactory;
+
+    private final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector();
+    private final TestSegmentsMetadataManager segmentManager;
+    private final TestMetadataRuleManager ruleManager;
+    private final TestServerInventoryView historicalInventoryView;
+
+    private final LoadQueueTaskMaster loadQueueTaskMaster;
+    private final StubServiceEmitter serviceEmitter
+        = new StubServiceEmitter("coordinator", "coordinator");
+    private final TestServerInventoryView coordinatorInventoryView;
+
+    private final AtomicReference<CoordinatorDynamicConfig> dynamicConfig = new AtomicReference<>();
+    private final JacksonConfigManager jacksonConfigManager;
+    private final LookupCoordinatorManager lookupCoordinatorManager;
+    private final DruidCoordinatorConfig coordinatorConfig;
+    private final boolean loadImmediately;
+    private final boolean autoSyncInventory;
+
+    private final List<Object> mocks = new ArrayList<>();
+
+    private Environment(
+        TestServerInventoryView clusterInventory,
+        TestSegmentsMetadataManager segmentManager,
+        TestMetadataRuleManager ruleManager,
+        CoordinatorDynamicConfig dynamicConfig,
+        boolean loadImmediately,
+        boolean autoSyncInventory
+    )
+    {
+      this.historicalInventoryView = clusterInventory;
+      this.segmentManager = segmentManager;
+      this.ruleManager = ruleManager;
+      this.loadImmediately = loadImmediately;
+      this.autoSyncInventory = autoSyncInventory;
+
+      this.coordinatorConfig = new TestDruidCoordinatorConfig.Builder()
+          .withCoordinatorStartDelay(new Duration(1L))
+          .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withLoadQueuePeonRepeatDelay(new Duration("PT0S"))
+          .withLoadQueuePeonType("http")
+          .withCoordinatorKillIgnoreDurationToRetain(false)
+          .build();
+
+      this.executorFactory = new ExecutorFactory(loadImmediately);
+      this.coordinatorInventoryView = autoSyncInventory
+                                      ? clusterInventory
+                                      : new TestServerInventoryView();
+      HttpClient httpClient = new TestSegmentLoadingHttpClient(
+          OBJECT_MAPPER,
+          clusterInventory::getChangeHandlerForHost,
+          executorFactory.create(1, ExecutorFactory.HISTORICAL_LOADER)
+      );
+
+      this.loadQueueTaskMaster = new LoadQueueTaskMaster(
+          null,
+          OBJECT_MAPPER,
+          executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR),
+          executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR),
+          coordinatorConfig,
+          httpClient,
+          null
+      );
+
+      this.jacksonConfigManager = mockConfigManager();
+      setDynamicConfig(dynamicConfig);
+
+      this.lookupCoordinatorManager = EasyMock.createNiceMock(LookupCoordinatorManager.class);
+      mocks.add(jacksonConfigManager);
+      mocks.add(lookupCoordinatorManager);
+    }
+
+    private void setUp() throws Exception
+    {
+      EmittingLogger.registerEmitter(serviceEmitter);
+      historicalInventoryView.setUp();
+      coordinatorInventoryView.setUp();
+      lifecycle.start();
+      executorFactory.setUp();
+      leaderSelector.becomeLeader();
+      EasyMock.replay(mocks.toArray());
+    }
+
+    private void tearDown()
+    {
+      EasyMock.verify(mocks.toArray());
+      executorFactory.tearDown();
+      lifecycle.stop();
+    }
+
+    private void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+    {
+      this.dynamicConfig.set(dynamicConfig);
+    }
+
+    private JacksonConfigManager mockConfigManager()
+    {
+      final JacksonConfigManager jacksonConfigManager
+          = EasyMock.createMock(JacksonConfigManager.class);
+      EasyMock.expect(
+          jacksonConfigManager.watch(
+              EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
+              EasyMock.eq(CoordinatorDynamicConfig.class),
+              EasyMock.anyObject()
+          )
+      ).andReturn(dynamicConfig).anyTimes();
+
+      EasyMock.expect(
+          jacksonConfigManager.watch(
+              EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+              EasyMock.eq(CoordinatorCompactionConfig.class),
+              EasyMock.anyObject()

Review Comment:
   > Suggestion: rather than mocking a config, add a constructor or builder to the config. A class that cannot be constructed (as with our config classes) is very tedious to use in tests. Mocking isn't the answer since, if there are methods that compute values, those methods also must be mocked.
   
   I agree: most of our configs have private fields and no setters or builders, which makes them painful to use in tests.
   
   But here, the configs themselves are not mocked. Only the `JacksonConfigManager` is mocked, and we set expectations on that mock.
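   
   Here is a minimal, dependency-free sketch of that distinction. All names (`ConfigManagerSketch`, `StubConfigManager`, `DynamicConfigSketch`) are hypothetical stand-ins, and the two-argument `watch()` is a simplification, not the real `JacksonConfigManager` API. Only the manager is faked; the config it hands out is a real object held in an `AtomicReference` that the test owns. This is similar to how the simulation's `setDynamicConfig()` updates the reference returned by the mocked `watch()`.
   
   ```java
   import java.util.concurrent.atomic.AtomicReference;
   
   // Hypothetical config value: a real, fully constructed object, not a mock.
   final class DynamicConfigSketch
   {
     private final int maxSegmentsToMove;
   
     DynamicConfigSketch(int maxSegmentsToMove)
     {
       this.maxSegmentsToMove = maxSegmentsToMove;
     }
   
     int getMaxSegmentsToMove()
     {
       return maxSegmentsToMove;
     }
   }
   
   // Hypothetical, simplified watch() contract: the returned reference
   // always holds the latest value of the config for that key.
   interface ConfigManagerSketch
   {
     <T> AtomicReference<T> watch(String key, Class<T> clazz);
   }
   
   // Test stub: only the manager is faked. It hands out a reference that the
   // test owns, so the test can swap in a new config at any point.
   final class StubConfigManager implements ConfigManagerSketch
   {
     final AtomicReference<DynamicConfigSketch> dynamicConfig = new AtomicReference<>();
   
     @Override
     @SuppressWarnings("unchecked")
     public <T> AtomicReference<T> watch(String key, Class<T> clazz)
     {
       if (!DynamicConfigSketch.class.equals(clazz)) {
         throw new IllegalArgumentException("Unexpected config key: " + key);
       }
       // Safe: clazz was just checked to be DynamicConfigSketch.class.
       return (AtomicReference<T>) (AtomicReference<?>) dynamicConfig;
     }
   }
   ```
   
   Because the caller of `watch()` and the stub share one reference, calling `dynamicConfig.set(...)` mid-test is immediately visible to the code under test.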



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974900811


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.
+ */
+public abstract class CoordinatorSimulationBaseTest

Review Comment:
   Agreed, although I think the `CoordinatorSimulation` object itself already (somewhat) fulfills the role of the fixture.
   
   It's easy for a developer to write simulation-based tests without ever having to extend the `BaseTest`. The `BaseTest` just provides a bunch of convenience methods which do nothing but delegate to the simulation itself.
   
   In the current tests, other than action invocations on the simulation object, each test does only the following:
   1. declare inputs to the simulation, which would be different for each test case
   2. extract and map metrics from the simulation, which can be moved into the simulation itself
   3. verifications
   
   I will make the updates for (2), making the `BaseTest` an even thinner layer on top of the simulation. But as there is hardly any common setup required for the tests, I am not sure we need a fixture just yet.
   
   Edit: There is probably some room to put metric value extraction and verification into a fixture. I will see if we can include it in this PR.
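   
   As a rough illustration of the metric extraction that could move into a fixture or into the simulation, the sketch below filters emitted events by metric name and dimension values. `MetricEventSketch` and `MetricExtractor` are hypothetical; the real simulation returns Druid `Event` objects from `getMetricEvents()`, whose accessors are not modeled here.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical, simplified metric event: a metric name, its dimensions and a value.
   final class MetricEventSketch
   {
     final String metric;
     final Map<String, String> dimensions;
     final long value;
   
     MetricEventSketch(String metric, Map<String, String> dimensions, long value)
     {
       this.metric = metric;
       this.dimensions = dimensions;
       this.value = value;
     }
   }
   
   // Hypothetical helper a fixture could own: collect the values of one metric,
   // keeping only events whose dimensions match every given filter.
   final class MetricExtractor
   {
     static List<Long> getValues(
         List<MetricEventSketch> events,
         String metric,
         Map<String, String> dimensionFilters
     )
     {
       final List<Long> values = new ArrayList<>();
       for (MetricEventSketch event : events) {
         if (!event.metric.equals(metric)) {
           continue;
         }
         boolean matches = true;
         for (Map.Entry<String, String> filter : dimensionFilters.entrySet()) {
           if (!filter.getValue().equals(event.dimensions.get(filter.getKey()))) {
             matches = false;
             break;
           }
         }
         if (matches) {
           values.add(event.value);
         }
       }
       return values;
     }
   }
   ```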





[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974880978


##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -973,6 +973,14 @@ public List<? extends CoordinatorDuty> getDuties()
     {
       return duties;
     }
+
+    @Override
+    public String toString()
+    {
+      return "DutiesRunnable{" +
+             "dutiesRunnableAlias='" + dutiesRunnableAlias + '\'' +
+             '}';
+    }

Review Comment:
   Oh, yeah, the `VisibleForTesting` is ubiquitous 😅
   
   It would be good to pull out `DutiesRunnable`. Right now, it seems to use pretty much all the fields of `DruidCoordinator` directly. That's probably why it still lives here, and why it is not a _static_ inner class either.
   
   The preferable way to do this would be for `DruidCoordinator` to expose methods that update the state of `segmentManager` and the other fields that `DutiesRunnable` needs to access, and for the `DutiesRunnable` constructor to take just the `DruidCoordinator` instance. `DruidCoordinator` already exposes similar utility methods, such as `moveSegment()` and `markSegmentAsUnused()`, which the duties themselves use.
   
   Let me know if this approach makes sense. We can get it done in a follow-up PR.
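   
   A minimal sketch of the proposed shape, assuming the coordinator exposes narrow methods for the state the runnable needs. `CoordinatorSketch`, `DutySketch` and `DutiesRunnableSketch` are hypothetical simplifications, not Druid classes; only the `toString()` format mirrors the one added in the diff.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical, heavily simplified coordinator: instead of letting an inner
   // class read its fields, it exposes narrow methods for the state duties need.
   class CoordinatorSketch
   {
     private final List<String> usedSegments = new ArrayList<>();
     private int completedRuns = 0;
   
     void addUsedSegment(String segmentId)
     {
       usedSegments.add(segmentId);
     }
   
     List<String> getUsedSegments()
     {
       return new ArrayList<>(usedSegments);
     }
   
     void markRunCompleted()
     {
       completedRuns++;
     }
   
     int getCompletedRuns()
     {
       return completedRuns;
     }
   }
   
   // Hypothetical duty contract: a duty only sees the coordinator's exposed methods.
   interface DutySketch
   {
     void run(CoordinatorSketch coordinator);
   }
   
   // Top-level replacement for a non-static inner runnable. It receives the
   // coordinator instance and touches coordinator state only via its methods.
   class DutiesRunnableSketch implements Runnable
   {
     private final CoordinatorSketch coordinator;
     private final List<DutySketch> duties;
     private final String dutiesRunnableAlias;
   
     DutiesRunnableSketch(CoordinatorSketch coordinator, List<DutySketch> duties, String alias)
     {
       this.coordinator = coordinator;
       this.duties = duties;
       this.dutiesRunnableAlias = alias;
     }
   
     @Override
     public void run()
     {
       for (DutySketch duty : duties) {
         duty.run(coordinator);
       }
       coordinator.markRunCompleted();
     }
   
     @Override
     public String toString()
     {
       return "DutiesRunnable{dutiesRunnableAlias='" + dutiesRunnableAlias + "'}";
     }
   }
   ```
   
   Since the runnable no longer reaches into private fields, it can be tested on its own against a coordinator built for the test.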





[GitHub] [druid] paul-rogers commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r972514017


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.
+ */
+public abstract class CoordinatorSimulationBaseTest

Review Comment:
   Consider making this a "fixture" rather than a base test. With a fixture, you write a test like so:
   
   ```java
   import org.junit.Before;
   import org.junit.Test;
   
   public class MyTest
   {
     private final CoordinatorSimulationFixture fixture = new CoordinatorSimulationFixture(...);
   
     @Before
     public void setup()
     {
       fixture.setup();
       // My own setup
     }
   
     @Test
     public void myTest()
     {
       fixture.startSimulation(...);
       fixture.doOtherStuff(...);
     }
   }
   ```
   
   The point is that the test class stays simple. The test writer can pull in other dependencies (more fixtures, for example) without hitting Java's single-inheritance limit, which a base test class imposes.
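   
   To illustrate the composition point without pulling in JUnit, here is a hypothetical, dependency-free sketch: two fixtures held as fields of one test class, set up in order and torn down in reverse. In a real test, `setUp()` and `tearDown()` would carry `@Before` and `@After`; every class name here is made up for illustration.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical fixture that owns the lifecycle of a simulation.
   class SimulationFixtureSketch implements AutoCloseable
   {
     private final List<String> log;
     private boolean running;
   
     SimulationFixtureSketch(List<String> log)
     {
       this.log = log;
     }
   
     void setUp()
     {
       running = true;
       log.add("simulation started");
     }
   
     boolean isRunning()
     {
       return running;
     }
   
     @Override
     public void close()
     {
       running = false;
       log.add("simulation stopped");
     }
   }
   
   // Hypothetical second fixture providing some unrelated dependency.
   class TempStorageFixtureSketch implements AutoCloseable
   {
     private final List<String> log;
   
     TempStorageFixtureSketch(List<String> log)
     {
       this.log = log;
     }
   
     void setUp()
     {
       log.add("storage ready");
     }
   
     @Override
     public void close()
     {
       log.add("storage released");
     }
   }
   
   // The test composes both fixtures as fields; a base class would allow only one.
   class MyTestSketch
   {
     final List<String> log = new ArrayList<>();
     final SimulationFixtureSketch simulation = new SimulationFixtureSketch(log);
     final TempStorageFixtureSketch storage = new TempStorageFixtureSketch(log);
   
     void setUp()
     {
       simulation.setUp();
       storage.setUp();
     }
   
     void tearDown()
     {
       // Release in reverse order of setup.
       storage.close();
       simulation.close();
     }
   }
   ```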



##########
core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java:
##########
@@ -37,7 +41,18 @@ public StubServiceEmitter(String service, String host)
   @Override
   public void emit(Event event)
   {
-    events.add(event);
+    if (event instanceof AlertEvent) {
+      final AlertEvent alertEvent = (AlertEvent) event;
+      log.warn(
+          "[%s] [%s] [%s]: %s%n",
+          alertEvent.getSeverity(),
+          alertEvent.getService(),
+          alertEvent.getFeed(),
+          alertEvent.getDescription()

Review Comment:
   Pattern expects four strings and a number; only four strings provided.
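   
   For reference: in `java.util.Formatter` syntax, which `String.format` uses, `%n` is the platform line separator and consumes no argument, so the four `%s` specifiers line up with the four arguments. The check below exercises only `String.format`; it assumes, without verifying, that the logger formats the pattern the same way. `FormatCheck` is a hypothetical helper.
   
   ```java
   // Hypothetical helper reusing the format string from the log.warn(...) call above.
   final class FormatCheck
   {
     static String formatAlert(String severity, String service, String feed, String description)
     {
       // "%n" takes no argument: Formatter replaces it with System.lineSeparator().
       return String.format("[%s] [%s] [%s]: %s%n", severity, service, feed, description);
     }
   }
   ```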



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Coordinator simulation test to verify behaviour of segment balancing.
+ */
+public class SegmentBalancingTest extends CoordinatorSimulationBaseTest
+{
+  private DruidServer historicalT11;
+  private DruidServer historicalT12;
+
+  private final String datasource = DS.WIKI;
+  private final List<DataSegment> segments = Segments.WIKI_10X1D;
+
+  @Override
+  public void setUp()
+  {
+    // Setup historicals for 2 tiers, size 10 GB each
+    historicalT11 = createHistorical(1, Tier.T1, 10_000);
+    historicalT12 = createHistorical(2, Tier.T1, 10_000);
+  }
+
+  @Test
+  public void testBalancingWithSyncedInventory()
+  {
+    testBalancingWithInventorySynced(true);
+  }
+
+  @Test
+  @Ignore("Fix #12881 to enable this test. "
+          + "Current impl requires updated inventory for correct callback behaviour.")
+  public void testBalancingWithUnsyncedInventory()
+  {
+    testBalancingWithInventorySynced(false);
+  }
+
+  private void testBalancingWithInventorySynced(boolean autoSyncInventory)

Review Comment:
   Pretty cool!



##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -483,7 +483,7 @@ public void moveSegment(
         );
       }
 
-      final String toLoadQueueSegPath =
+      final String toLoadQueueSegPath = curator == null ? null :

Review Comment:
   As far as I can tell, none of the arguments here depend on ZK, so we can compute the path even when there is no ZK (that is, when `curator` is `null`).
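   
   A small sketch of the suggestion, assuming the path is just a string built from the load-queue root, the server name and the segment ID: compute it unconditionally and guard only the ZK write. `LoadQueuePaths`, `makePath` and the nullable `zkWriter` callback are hypothetical stand-ins, not the actual `DruidCoordinator` or Curator code.
   
   ```java
   import java.util.function.Consumer;
   
   // Hypothetical helpers: the path is pure string manipulation, so it can be
   // computed whether or not a ZK client is available.
   final class LoadQueuePaths
   {
     static String makePath(String parent, String child)
     {
       // Join with exactly one '/' between parent and child.
       final String base = parent.endsWith("/") ? parent.substring(0, parent.length() - 1) : parent;
       return base + "/" + child;
     }
   
     static String loadQueueSegmentPath(String loadQueueRoot, String serverName, String segmentId)
     {
       return makePath(makePath(loadQueueRoot, serverName), segmentId);
     }
   
     // Only the ZK side effect needs a null check; zkWriter stands in for a ZK client.
     static String queueMove(Consumer<String> zkWriter, String root, String serverName, String segmentId)
     {
       final String path = loadQueueSegmentPath(root, serverName, segmentId);
       if (zkWriter != null) {
         zkWriter.accept(path);
       }
       return path;
     }
   }
   ```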



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java:
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.curator.ZkEnablementConfig;
+import org.apache.druid.curator.discovery.ServiceAnnouncer;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.DirectExecutorService;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.BalancerStrategyFactory;
+import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
+import org.apache.druid.server.coordinator.CachingCostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
+import org.apache.druid.timeline.DataSegment;
+import org.easymock.EasyMock;
+import org.joda.time.Duration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Builder for {@link CoordinatorSimulation}.
+ */
+public class CoordinatorSimulationBuilder
+{
+  private static final long DEFAULT_COORDINATOR_PERIOD = 100L;
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper()
+      .setInjectableValues(
+          new InjectableValues.Std().addValue(
+              DataSegment.PruneSpecsHolder.class,
+              DataSegment.PruneSpecsHolder.DEFAULT
+          )
+      );
+
+  private BalancerStrategyFactory balancerStrategyFactory;
+  private CoordinatorDynamicConfig dynamicConfig =
+      CoordinatorDynamicConfig.builder()
+                              .withUseBatchedSegmentSampler(true)
+                              .build();
+  private List<DruidServer> servers;
+  private List<DataSegment> segments;
+  private final Map<String, List<Rule>> datasourceRules = new HashMap<>();
+  private boolean loadImmediately = false;
+  private boolean autoSyncInventory = true;
+
+  /**
+   * Specifies the balancer strategy to be used.
+   * <p>
+   * Default: "cost" ({@link CostBalancerStrategyFactory})
+   */
+  public CoordinatorSimulationBuilder balancer(BalancerStrategyFactory strategyFactory)
+  {
+    this.balancerStrategyFactory = strategyFactory;
+    return this;
+  }
+
+  public CoordinatorSimulationBuilder servers(List<DruidServer> servers)
+  {
+    this.servers = servers;
+    return this;
+  }
+
+  public CoordinatorSimulationBuilder servers(DruidServer... servers)
+  {
+    return servers(Arrays.asList(servers));
+  }
+
+  public CoordinatorSimulationBuilder segments(List<DataSegment> segments)
+  {
+    this.segments = segments;
+    return this;
+  }
+
+  public CoordinatorSimulationBuilder rules(String datasource, Rule... rules)
+  {
+    this.datasourceRules.put(datasource, Arrays.asList(rules));
+    return this;
+  }
+
+  /**
+   * Specifies whether segments should be loaded as soon as they are queued.
+   * <p>
+   * Default: false
+   */
+  public CoordinatorSimulationBuilder loadSegmentsImmediately(boolean loadImmediately)
+  {
+    this.loadImmediately = loadImmediately;
+    return this;
+  }
+
+  /**
+   * Specifies whether the inventory view maintained by the coordinator
+   * should be auto-synced as soon as any change is made to the cluster.
+   * <p>
+   * Default: true
+   */
+  public CoordinatorSimulationBuilder autoSyncInventory(boolean autoSync)
+  {
+    this.autoSyncInventory = autoSync;
+    return this;
+  }
+
+  /**
+   * Specifies the CoordinatorDynamicConfig to be used in the simulation.
+   * <p>
+   * Default values: Specified in {@link CoordinatorDynamicConfig.Builder}.
+   */
+  public CoordinatorSimulationBuilder dynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+  {
+    this.dynamicConfig = dynamicConfig;
+    return this;
+  }
+
+  public CoordinatorSimulation build()
+  {
+    Preconditions.checkArgument(
+        servers != null && !servers.isEmpty(),
+        "Cannot run simulation for an empty cluster"
+    );
+
+    // Prepare the environment
+    final TestServerInventoryView serverInventoryView = new TestServerInventoryView();
+    servers.forEach(serverInventoryView::addServer);
+
+    final TestSegmentsMetadataManager segmentManager = new TestSegmentsMetadataManager();
+    if (segments != null) {
+      segments.forEach(segmentManager::addSegment);
+    }
+
+    final TestMetadataRuleManager ruleManager = new TestMetadataRuleManager();
+    datasourceRules.forEach(
+        (datasource, rules) ->
+            ruleManager.overrideRule(datasource, rules, null)
+    );
+
+    final Environment env = new Environment(
+        serverInventoryView,
+        segmentManager,
+        ruleManager,
+        dynamicConfig,
+        loadImmediately,
+        autoSyncInventory
+    );
+
+    // Build the coordinator
+    final DruidCoordinator coordinator = new DruidCoordinator(
+        env.coordinatorConfig,
+        null,
+        env.jacksonConfigManager,
+        env.segmentManager,
+        env.coordinatorInventoryView,
+        env.ruleManager,
+        () -> null,
+        env.serviceEmitter,
+        env.executorFactory,
+        null,
+        env.loadQueueTaskMaster,
+        new ServiceAnnouncer.Noop(),
+        null,
+        Collections.emptySet(),
+        null,
+        new CoordinatorCustomDutyGroups(Collections.emptySet()),
+        balancerStrategyFactory != null ? balancerStrategyFactory
+                                        : new CostBalancerStrategyFactory(),
+        env.lookupCoordinatorManager,
+        env.leaderSelector,
+        OBJECT_MAPPER,
+        ZkEnablementConfig.ENABLED
+    );
+
+    return new SimulationImpl(coordinator, env);
+  }
+
+  private BalancerStrategyFactory buildCachingCostBalancerStrategy(Environment env)
+  {
+    try {
+      return new CachingCostBalancerStrategyFactory(
+          env.coordinatorInventoryView,
+          env.lifecycle,
+          new CachingCostBalancerStrategyConfig()
+      );
+    }
+    catch (Exception e) {
+      throw new ISE(e, "Error building balancer strategy");
+    }
+  }
+
+  /**
+   * Implementation of {@link CoordinatorSimulation}.
+   */
+  private static class SimulationImpl implements CoordinatorSimulation,
+      CoordinatorSimulation.CoordinatorState, CoordinatorSimulation.ClusterState
+  {
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private final Environment env;
+    private final DruidCoordinator coordinator;
+
+    private SimulationImpl(DruidCoordinator coordinator, Environment env)
+    {
+      this.env = env;
+      this.coordinator = coordinator;
+    }
+
+    @Override
+    public void start()
+    {
+      if (!running.compareAndSet(false, true)) {
+        throw new ISE("Simulation is already running");
+      }
+
+      try {
+        env.setUp();
+        coordinator.start();
+      }
+      catch (Exception e) {
+        throw new ISE(e, "Exception while running simulation");
+      }
+    }
+
+    @Override
+    public void stop()
+    {
+      coordinator.stop();
+      env.leaderSelector.stopBeingLeader();
+      env.tearDown();
+    }
+
+    @Override
+    public CoordinatorState coordinator()
+    {
+      return this;
+    }
+
+    @Override
+    public ClusterState cluster()
+    {
+      return this;
+    }
+
+    @Override
+    public void runCoordinatorCycle()
+    {
+      verifySimulationRunning();
+      env.serviceEmitter.flush();
+
+      // Invoke historical duties and metadata duties
+      env.executorFactory.coordinatorRunner.finishNextPendingTasks(2);
+    }
+
+    @Override
+    public void syncInventoryView()
+    {
+      verifySimulationRunning();
+      Preconditions.checkState(
+          !env.autoSyncInventory,
+          "Cannot invoke syncInventoryView as simulation is running in auto-sync mode."
+      );
+      env.coordinatorInventoryView.sync(env.historicalInventoryView);
+    }
+
+    @Override
+    public void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+    {
+      env.setDynamicConfig(dynamicConfig);
+    }
+
+    @Override
+    public DruidServer getInventoryView(String serverName)
+    {
+      return env.coordinatorInventoryView.getInventoryValue(serverName);
+    }
+
+    @Override
+    public void loadQueuedSegments()
+    {
+      verifySimulationRunning();
+      Preconditions.checkState(
+          !env.loadImmediately,
+          "Cannot invoke loadQueuedSegments as simulation is running in immediate loading mode."
+      );
+
+      final BlockingExecutorService loadQueueExecutor = env.executorFactory.loadQueueExecutor;
+      while (loadQueueExecutor.hasPendingTasks()) {
+        // Drain all the items from the load queue executor
+        // This sends at most 1 load/drop request to each server
+        loadQueueExecutor.finishAllPendingTasks();
+
+        // Load all the queued segments, handle their responses and execute callbacks
+        int loadedSegments = env.executorFactory.historicalLoader.finishAllPendingTasks();
+        loadQueueExecutor.finishNextPendingTasks(loadedSegments);
+
+        // TODO: sync should happen here?? or should it not??
+
+        env.executorFactory.loadCallbackExecutor.finishAllPendingTasks();
+      }
+    }
+
+    private void verifySimulationRunning()
+    {
+      if (!running.get()) {
+        throw new ISE("Simulation hasn't been started yet.");
+      }
+    }
+
+    @Override
+    public double getLoadPercentage(String datasource)
+    {
+      return coordinator.getLoadStatus().get(datasource);
+    }
+
+    @Override
+    public List<Event> getMetricEvents()
+    {
+      return new ArrayList<>(env.serviceEmitter.getEvents());
+    }
+  }
+
+  /**
+   * Environment for a coordinator simulation.
+   */
+  private static class Environment
+  {
+    private final Lifecycle lifecycle = new Lifecycle("coord-sim");
+
+    // Executors
+    private final ExecutorFactory executorFactory;
+
+    private final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector();
+    private final TestSegmentsMetadataManager segmentManager;
+    private final TestMetadataRuleManager ruleManager;
+    private final TestServerInventoryView historicalInventoryView;
+
+    private final LoadQueueTaskMaster loadQueueTaskMaster;
+    private final StubServiceEmitter serviceEmitter
+        = new StubServiceEmitter("coordinator", "coordinator");
+    private final TestServerInventoryView coordinatorInventoryView;
+
+    private final AtomicReference<CoordinatorDynamicConfig> dynamicConfig = new AtomicReference<>();
+    private final JacksonConfigManager jacksonConfigManager;
+    private final LookupCoordinatorManager lookupCoordinatorManager;
+    private final DruidCoordinatorConfig coordinatorConfig;
+    private final boolean loadImmediately;
+    private final boolean autoSyncInventory;
+
+    private final List<Object> mocks = new ArrayList<>();
+
+    private Environment(
+        TestServerInventoryView clusterInventory,
+        TestSegmentsMetadataManager segmentManager,
+        TestMetadataRuleManager ruleManager,
+        CoordinatorDynamicConfig dynamicConfig,
+        boolean loadImmediately,
+        boolean autoSyncInventory
+    )
+    {
+      this.historicalInventoryView = clusterInventory;
+      this.segmentManager = segmentManager;
+      this.ruleManager = ruleManager;
+      this.loadImmediately = loadImmediately;
+      this.autoSyncInventory = autoSyncInventory;
+
+      this.coordinatorConfig = new TestDruidCoordinatorConfig.Builder()
+          .withCoordinatorStartDelay(new Duration(1L))
+          .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withLoadQueuePeonRepeatDelay(new Duration("PT0S"))
+          .withLoadQueuePeonType("http")
+          .withCoordinatorKillIgnoreDurationToRetain(false)
+          .build();
+
+      this.executorFactory = new ExecutorFactory(loadImmediately);
+      this.coordinatorInventoryView = autoSyncInventory
+                                      ? clusterInventory
+                                      : new TestServerInventoryView();
+      HttpClient httpClient = new TestSegmentLoadingHttpClient(
+          OBJECT_MAPPER,
+          clusterInventory::getChangeHandlerForHost,
+          executorFactory.create(1, ExecutorFactory.HISTORICAL_LOADER)
+      );
+
+      this.loadQueueTaskMaster = new LoadQueueTaskMaster(
+          null,
+          OBJECT_MAPPER,
+          executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR),
+          executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR),
+          coordinatorConfig,
+          httpClient,
+          null
+      );
+
+      this.jacksonConfigManager = mockConfigManager();
+      setDynamicConfig(dynamicConfig);
+
+      this.lookupCoordinatorManager = EasyMock.createNiceMock(LookupCoordinatorManager.class);
+      mocks.add(jacksonConfigManager);
+      mocks.add(lookupCoordinatorManager);
+    }
+
+    private void setUp() throws Exception
+    {
+      EmittingLogger.registerEmitter(serviceEmitter);
+      historicalInventoryView.setUp();
+      coordinatorInventoryView.setUp();
+      lifecycle.start();
+      executorFactory.setUp();
+      leaderSelector.becomeLeader();
+      EasyMock.replay(mocks.toArray());
+    }
+
+    private void tearDown()
+    {
+      EasyMock.verify(mocks.toArray());
+      executorFactory.tearDown();
+      lifecycle.stop();
+    }
+
+    private void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+    {
+      this.dynamicConfig.set(dynamicConfig);
+    }
+
+    private JacksonConfigManager mockConfigManager()
+    {
+      final JacksonConfigManager jacksonConfigManager
+          = EasyMock.createMock(JacksonConfigManager.class);
+      EasyMock.expect(
+          jacksonConfigManager.watch(
+              EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
+              EasyMock.eq(CoordinatorDynamicConfig.class),
+              EasyMock.anyObject()
+          )
+      ).andReturn(dynamicConfig).anyTimes();
+
+      EasyMock.expect(
+          jacksonConfigManager.watch(
+              EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+              EasyMock.eq(CoordinatorCompactionConfig.class),
+              EasyMock.anyObject()

Review Comment:
   Suggestion: rather than mocking a config, add a constructor or builder to the config. A class that cannot be constructed (as with our config classes) is very tedious to use in tests. Mocking isn't the answer since, if there are methods that compute values, those methods also must be mocked.
   
   Another solution is to use Guice. Use the new startup config builder and friends to pass in the set of properties, then ask the injector to create config instances. Going that route is a bit of overkill, but it avoids the need to add constructors: we just use the JSON config process we already have.
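A minimal sketch of the first suggestion (a builder instead of a mock), assuming a hypothetical `ExampleLoadConfig` class whose fields are illustrative and not taken from any Druid config. Because the test gets a real instance, derived values such as `getTotalActionsPerRun()` work without being mocked separately.

```java
/** Hypothetical config class; field names are illustrative only. */
final class ExampleLoadConfig
{
  private final int maxSegmentsToMove;
  private final int replicationThrottleLimit;

  private ExampleLoadConfig(int maxSegmentsToMove, int replicationThrottleLimit)
  {
    this.maxSegmentsToMove = maxSegmentsToMove;
    this.replicationThrottleLimit = replicationThrottleLimit;
  }

  int getMaxSegmentsToMove()
  {
    return maxSegmentsToMove;
  }

  int getReplicationThrottleLimit()
  {
    return replicationThrottleLimit;
  }

  /** A derived value: a mocked config would need this method mocked too. */
  int getTotalActionsPerRun()
  {
    return maxSegmentsToMove + replicationThrottleLimit;
  }

  static Builder builder()
  {
    return new Builder();
  }

  static final class Builder
  {
    // Defaults used when a test does not override a field
    private int maxSegmentsToMove = 5;
    private int replicationThrottleLimit = 10;

    Builder withMaxSegmentsToMove(int value)
    {
      this.maxSegmentsToMove = value;
      return this;
    }

    Builder withReplicationThrottleLimit(int value)
    {
      this.replicationThrottleLimit = value;
      return this;
    }

    ExampleLoadConfig build()
    {
      return new ExampleLoadConfig(maxSegmentsToMove, replicationThrottleLimit);
    }
  }
}
```

A test would then write `ExampleLoadConfig.builder().withMaxSegmentsToMove(100).build()` and pass the result wherever the mock was previously used. The Guice-based alternative is not sketched here.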



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r972564812


##########
core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java:
##########
@@ -37,7 +41,18 @@ public StubServiceEmitter(String service, String host)
   @Override
   public void emit(Event event)
   {
-    events.add(event);
+    if (event instanceof AlertEvent) {
+      final AlertEvent alertEvent = (AlertEvent) event;
+      log.warn(
+          "[%s] [%s] [%s]: %s%n",
+          alertEvent.getSeverity(),
+          alertEvent.getService(),
+          alertEvent.getFeed(),
+          alertEvent.getDescription()

Review Comment:
   The trailing `%n` prints a newline.
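For reference, in `java.util.Formatter` syntax `%n` expands to the platform line separator while each `%s` inserts an argument's string form. The check below assumes the logger applies `String.format`-style semantics to its message; the severity, service and feed values are made up.

```java
final class FormatNewlineDemo
{
  /** Same pattern as the emitter's log call; %n appends System.lineSeparator(). */
  static String formatAlert(String severity, String service, String feed, String description)
  {
    return String.format("[%s] [%s] [%s]: %s%n", severity, service, feed, description);
  }

  public static void main(String[] args)
  {
    System.out.print(formatAlert("anomaly", "coordinator", "alerts", "Segment load failed"));
  }
}
```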





[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r972570643


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.
+ */
+public abstract class CoordinatorSimulationBaseTest

Review Comment:
   I did start off with the fixture pattern but later decided to have a base test, as it avoids every statement beginning with `fixture.`, which starts to feel redundant when every line has it.
   
   The tests that have already been added are unlikely to need to implement other interfaces.
   New tests with conflicting inheritance could always use the same base test as a fixture, I guess.





[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r973087144


##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -483,7 +483,7 @@ public void moveSegment(
         );
       }
 
-      final String toLoadQueueSegPath =
+      final String toLoadQueueSegPath = curator == null ? null :

Review Comment:
   Fixed.





[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974900811


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.
+ */
+public abstract class CoordinatorSimulationBaseTest

Review Comment:
   Agreed. Although, I think the `CoordinatorSimulation` object itself is already (somewhat) fulfilling the role of the fixture.
   
   It's easy for a developer to write simulation-based tests without ever having to extend the `BaseTest`. The `BaseTest` just provides a bunch of convenience methods which do nothing but delegate to the simulation itself.
   
   In the current tests, other than action invocations on the simulation object, each test does only the following:
   1. declare inputs to the simulation, which would be different for each test case
   2. extract and map metrics from the simulation, which can be moved into the simulation itself
   3. verifications
   
   I will make the updates for 2, thus making the `BaseTest` an even thinner layer on top of the simulation. But, as there is hardly any common setup required for the tests, I am not sure we need a fixture just yet.
   
   __Edit: There is probably some room to put metric value extraction and verification into a fixture. I will see if we can include it in this PR.__
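A minimal sketch of what moving metric value extraction into a shared helper might look like. `RecordedMetric` is a hypothetical stand-in for an emitted metric event (not Druid's event class), and the metric and dimension names in the usage are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an emitted metric event. */
final class RecordedMetric
{
  final String metric;
  final Map<String, String> dimensions;
  final long value;

  RecordedMetric(String metric, Map<String, String> dimensions, long value)
  {
    this.metric = metric;
    this.dimensions = dimensions;
    this.value = value;
  }
}

/** Collects metric events so that tests only state what they expect. */
final class MetricsFixture
{
  private final List<RecordedMetric> events = new ArrayList<>();

  void record(RecordedMetric event)
  {
    events.add(event);
  }

  /** Sums the values of one metric, grouped by the value of one dimension (e.g. tier). */
  Map<String, Long> sumByDimension(String metric, String dimension)
  {
    final Map<String, Long> totals = new HashMap<>();
    for (RecordedMetric event : events) {
      if (event.metric.equals(metric) && event.dimensions.containsKey(dimension)) {
        totals.merge(event.dimensions.get(dimension), event.value, Long::sum);
      }
    }
    return totals;
  }
}
```

A verification then reduces to one lookup, e.g. asserting that `sumByDimension("segment/moved/count", "tier").get("hot")` equals the expected count.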





[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974877221


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java:
##########
@@ -92,6 +92,7 @@ private void balanceTier(
   )
   {
 
+    log.info("Balancing segments in tier [%s]", tier);

Review Comment:
   I intend to clean up the logs and add some more useful metrics around balancing/loading as a follow up to these changes.





[GitHub] [druid] kfaraz merged pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz merged PR #13074:
URL: https://github.com/apache/druid/pull/13074




[GitHub] [druid] imply-cheddar commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974863239


##########
core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java:
##########
@@ -37,7 +41,18 @@ public StubServiceEmitter(String service, String host)
   @Override
   public void emit(Event event)
   {
-    events.add(event);
+    if (event instanceof AlertEvent) {
+      final AlertEvent alertEvent = (AlertEvent) event;

Review Comment:
   This seems like it is attempting to use logs to validate that alert events were fired? What's wrong with keeping the AlertEvents in the list? Or maybe have two lists, one for metrics and one for alerts?
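A minimal sketch of the two-list alternative. `Event`, `AlertEvent` and `MetricEvent` below are simplified hypothetical stand-ins for the emitter's event types so that the block compiles on its own; only the routing logic is the point.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for the emitter's event types
interface Event {}

final class AlertEvent implements Event
{
  final String description;

  AlertEvent(String description)
  {
    this.description = description;
  }
}

final class MetricEvent implements Event
{
  final String metric;
  final long value;

  MetricEvent(String metric, long value)
  {
    this.metric = metric;
    this.value = value;
  }
}

/** Keeps alerts and metrics in separate lists so tests can assert on each directly. */
final class TwoListStubEmitter
{
  private final List<AlertEvent> alerts = new ArrayList<>();
  private final List<MetricEvent> metrics = new ArrayList<>();

  void emit(Event event)
  {
    if (event instanceof AlertEvent) {
      alerts.add((AlertEvent) event);
    } else if (event instanceof MetricEvent) {
      metrics.add((MetricEvent) event);
    }
    // Other event types are ignored in this sketch
  }

  List<AlertEvent> getAlerts()
  {
    return Collections.unmodifiableList(alerts);
  }

  List<MetricEvent> getMetrics()
  {
    return Collections.unmodifiableList(metrics);
  }
}
```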



##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -973,6 +973,14 @@ public List<? extends CoordinatorDuty> getDuties()
     {
       return duties;
     }
+
+    @Override
+    public String toString()
+    {
+      return "DutiesRunnable{" +
+             "dutiesRunnableAlias='" + dutiesRunnableAlias + '\'' +
+             '}';
+    }

Review Comment:
   I know this comment isn't about your code, but your addition of the `toString` here made me wonder why `DruidCoordinator's` `toString` reads as "DutiesRunnable".  The class is probably large enough (and already depended upon, see `@VisibleForTesting` annotation peppering this code) that maybe it's just time to promote it to its own class. 



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeCallback;
+import org.apache.druid.server.coordination.DataSegmentChangeHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.joda.time.Duration;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class TestSegmentLoadingHttpClient implements HttpClient
+{
+  private static final HttpResponseHandler.TrafficCop NOOP_TRAFFIC_COP = checkNum -> 0L;
+  private static final DataSegmentChangeCallback NOOP_CALLBACK = () -> {
+  };
+
+  private final ObjectMapper objectMapper;
+  private final Function<String, DataSegmentChangeHandler> hostToHandler;
+
+  private final ListeningScheduledExecutorService executorService;
+
+  public TestSegmentLoadingHttpClient(
+      ObjectMapper objectMapper,
+      Function<String, DataSegmentChangeHandler> hostToHandler,
+      ScheduledExecutorService executorService
+  )
+  {
+    this.objectMapper = objectMapper;
+    this.hostToHandler = hostToHandler;
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+  }
+
+  @Override
+  public <Intermediate, Final> ListenableFuture<Final> go(
+      Request request,
+      HttpResponseHandler<Intermediate, Final> handler
+  )
+  {
+    throw new UnsupportedOperationException();

Review Comment:
   Perhaps overly defensive, but I think this can be a UOE with a message saying that all expected usages go through the 3-argument call. That way, if this does end up getting called at some point, the developer will know which assumption broke.
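A minimal sketch of the suggestion, with the parameter types simplified to `String` and `long` so the block stands alone; in the quoted file the real overloads take a `Request`, an `HttpResponseHandler` and, in the 3-argument form, what appears to be a Joda `Duration`. The class name is hypothetical.

```java
final class TwoArgGuardExample
{
  /** The 2-argument overload: not expected to be used by the simulation. */
  String go(String request, String handler)
  {
    throw new UnsupportedOperationException(
        "Only the 3-argument go(request, handler, timeout) is expected to be called in simulations"
    );
  }

  /** The overload the simulation actually uses (body simplified). */
  String go(String request, String handler, long timeoutMillis)
  {
    return request + " handled by " + handler;
  }
}
```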



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md:
##########
@@ -0,0 +1,141 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Coordinator simulations
+
+The simulation framework allows developers to recreate arbitrary cluster setups and verify coordinator behaviour. Tests
+written using the framework can also help identify performance bottlenecks or potential bugs in the system and even
+compare different balancing strategies.
+
+As opposed to unit tests, simulations are meant to test the coordinator as a whole and verify the interactions of all
+the underlying parts. In that regard, these simulations resemble integration tests more closely.
+
+## Test targets
+
+The primary test target is the `DruidCoordinator` itself. The behaviour of the following entities can also be verified
+using simulations:
+
+- `LoadQueuePeon`, `LoadQueueTaskMaster`
+- All coordinator duties, e.g. `BalanceSegments`, `RunRules`
+- All retention rules
+
+## Capabilities
+
+The framework provides control over the following aspects of the setup:
+
+| Input | Details | Actions |
+|-------|---------|---------|
+|cluster | server name, type, tier, size | add a server, remove a server|
+|segment |datasource, interval, version, partition num, size | add/remove from server, mark used/unused, publish new segments|
+|rules | type (foreverLoad, drop, etc), replica count per tier | set rules for a datasource| 
+|configs |coordinator period, load queue type, load queue size, max segments to balance | set or update a config |
+
+The above actions can be performed at any point after building the simulation. So, you could even recreate scenarios
+where during a coordinator run, a server crashes or the retention rules of a datasource change, and verify the behaviour
+of the coordinator in these situations.
+
+## Design
+
+1. __Execution__: A tight dependency on time durations such as the period of a repeating task or the delay before a
+   scheduled task makes it difficult to reliably reproduce a test scenario. As a result, the tests become flaky. Thus,
+   all the executors required for coordinator operations have been allowed only two possible modes of execution:
+    - __immediate__: Execute tasks on the calling thread itself.
+    - __blocked__: Keep tasks in a queue until explicitly invoked.
+2. __Internal dependencies__: In order to allow realistic reproductions of the coordinator behaviour, none of the
+   internal parts of the coordinator have been mocked in the framework and new tests need not mock anything at all.
+3. __External dependencies__: Since these tests are meant to verify the behaviour of only the coordinator, the
+   interfaces to communicate with external dependencies have been provided as simple in-memory implementations:
+    - communication with metadata store: `SegmentMetadataManager`, `MetadataRuleManager`
+    - communication with historicals: `HttpClient`, `ServerInventoryView`
+4. __Inventory__: The coordinator maintains an inventory view of the cluster state. Simulations can choose from two
+   modes of inventory update - auto and manual. In auto update mode, any change made to the cluster is immediately
+   reflected in the inventory view. In manual update mode, the inventory must be explicitly synchronized with the
+   cluster state.
+
+## Limitations
+
+- The framework does not expose the coordinator HTTP endpoints.
+- It should not be used to verify the absolute values of execution latencies, e.g. the time taken to compute the
+  balancing cost of a segment. But the relative values can still be a good indicator while doing comparisons between,
+  say two balancing strategies.

Review Comment:
   What's wrong with trying to use it to benchmark the execution latencies of different balancing strategies?  
   
   Or, put differently: what's the difference between "verify the absolute values of execution latencies" and "be a good indicator while doing comparisons between, say, two balancing strategies"?
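The two execution modes listed under "Design" in the README diff above (immediate and blocked) can be illustrated with a plain `java.util.concurrent.Executor`. This is a hypothetical minimal sketch, not the PR's `BlockingExecutorService`; it is single-threaded and not thread-safe. In blocked mode tasks wait in a queue until the test drains them, so ordering is decided by the test rather than by timing.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical executor with the two README modes. Not thread-safe. */
final class SimulationExecutor implements Executor
{
  private final boolean executeImmediately;
  private final Queue<Runnable> pending = new ArrayDeque<>();

  SimulationExecutor(boolean executeImmediately)
  {
    this.executeImmediately = executeImmediately;
  }

  @Override
  public void execute(Runnable task)
  {
    if (executeImmediately) {
      // immediate: run on the calling thread
      task.run();
    } else {
      // blocked: park the task until the test invokes it explicitly
      pending.add(task);
    }
  }

  /** Runs queued tasks, including any enqueued while draining; returns how many ran. */
  int finishAllPendingTasks()
  {
    int count = 0;
    Runnable task;
    while ((task = pending.poll()) != null) {
      task.run();
      count++;
    }
    return count;
  }

  int numPendingTasks()
  {
    return pending.size();
  }
}
```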



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java:
##########
@@ -92,6 +92,7 @@ private void balanceTier(
   )
   {
 
+    log.info("Balancing segments in tier [%s]", tier);

Review Comment:
   Is there any more information that can be added to this? Knowing that balancing occurred is useful, but if we can add sizes or anything else that might help when trying to understand what happened, that would make it even more useful.
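One possible shape for a more informative message, as a hedged sketch: the parameters (server count, segment count, total bytes, max segments to move) are hypothetical examples of useful context, not fields `BalanceSegments` is known to have at that point. `Locale.ROOT` keeps the output independent of the JVM locale.

```java
import java.util.Locale;

final class BalanceLogMessage
{
  /** Builds a log line with enough context to interpret a balancing run afterwards. */
  static String describe(String tier, int numServers, int numSegments, long totalBytes, int maxSegmentsToMove)
  {
    return String.format(
        Locale.ROOT,
        "Balancing segments in tier [%s]: [%d] servers, [%d] segments, [%d] bytes, maxSegmentsToMove [%d]",
        tier, numServers, numSegments, totalBytes, maxSegmentsToMove
    );
  }
}
```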



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.

Review Comment:
   Part of me wonders if this comment doesn't (also?) belong on `createDynamicConfig`?
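If the note were also placed on the factory method, it might look like the sketch below. `DynamicConfigSketch`, `SimulationConfigs` and the `createDynamicConfig(int)` signature are hypothetical stand-ins; the real method would build a `CoordinatorDynamicConfig`, whose API is not reproduced here.

```java
/** Hypothetical stand-in for the dynamic config. */
final class DynamicConfigSketch
{
  final boolean useBatchedSegmentSampler;
  final int maxSegmentsToMove;

  DynamicConfigSketch(boolean useBatchedSegmentSampler, int maxSegmentsToMove)
  {
    this.useBatchedSegmentSampler = useBatchedSegmentSampler;
    this.maxSegmentsToMove = maxSegmentsToMove;
  }
}

final class SimulationConfigs
{
  /**
   * Creates a dynamic config for a simulation.
   * <p>
   * The batched segment sampler is always enabled: with random sampling the
   * same segment can be picked more than once, which makes balancing
   * assertions flaky.
   */
  static DynamicConfigSketch createDynamicConfig(int maxSegmentsToMove)
  {
    return new DynamicConfigSketch(true, maxSegmentsToMove);
  }
}
```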



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.
+ */
+public abstract class CoordinatorSimulationBaseTest

Review Comment:
   You could also have the fixture and then also have a BaseTest implemented using the fixture.  Then you are effectively actually using a composable fixture for things (and people can fall back to that if need be), but still don't have to repeat `fixture.` in all of the places.
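A minimal sketch of that composition, with hypothetical names (`SimulationFixture`, `FixtureBackedBaseTest`, `addServer`, `serverCount`) and a trivial in-memory fixture standing in for the real simulation. The fixture owns all behaviour and the base class only delegates, so a test that already extends another class can hold a fixture field instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical composable fixture: owns all state and behaviour. */
final class SimulationFixture
{
  private final List<String> servers = new ArrayList<>();

  void addServer(String name)
  {
    servers.add(name);
  }

  int serverCount()
  {
    return servers.size();
  }
}

/** Hypothetical base test: thin delegation so subclasses can skip the "fixture." prefix. */
abstract class FixtureBackedBaseTest
{
  protected final SimulationFixture fixture = new SimulationFixture();

  protected void addServer(String name)
  {
    fixture.addServer(name);
  }

  protected int serverCount()
  {
    return fixture.serverCount();
  }
}

/** Style 1: extend the base test and call helpers directly. */
final class ExtendingTest extends FixtureBackedBaseTest
{
  int run()
  {
    addServer("historical-1");
    addServer("historical-2");
    return serverCount();
  }
}

/** Style 2: a test with a different superclass composes the fixture instead. */
final class ComposingTest
{
  private final SimulationFixture fixture = new SimulationFixture();

  int run()
  {
    fixture.addServer("historical-1");
    return fixture.serverCount();
  }
}
```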





[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974880978


##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -973,6 +973,14 @@ public List<? extends CoordinatorDuty> getDuties()
     {
       return duties;
     }
+
+    @Override
+    public String toString()
+    {
+      return "DutiesRunnable{" +
+             "dutiesRunnableAlias='" + dutiesRunnableAlias + '\'' +
+             '}';
+    }

Review Comment:
   Oh, yeah, the `VisibleForTesting` is ubiquitous 😅
   
   It would be good to pull out `DutiesRunnable`. Right now, it seems to be directly using pretty much all the fields that `DruidCoordinator` contains. That's probably why it is still hanging around here and why it is not a _static_ inner class either.
   
   The preferable way to do this would be for `DruidCoordinator` to expose a bunch of methods to update the state of segmentManager or other things that `DutiesRunnable` needs to access, and for the `DutiesRunnable` constructor to just take the `DruidCoordinator` instance. `DruidCoordinator` already exposes a few of these utility methods, such as `moveSegment()` or `markSegmentAsUnused()`, which are used by the actual duties themselves.
   
   Let me know if this approach makes sense. We can get it done in a follow-up PR.
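A rough sketch of the proposed shape, with hypothetical names throughout: `CoordinatorState` stands in for the methods `DruidCoordinator` would expose (its `markSegmentAsUnused` is simplified to take a `String` id), and `ExampleDuty` stands in for a coordinator duty. The extracted, top-level runnable depends only on those exposed methods rather than on the coordinator's fields.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical narrow view of the coordinator needed by an extracted runnable. */
interface CoordinatorState
{
  boolean isLeader();

  void markSegmentAsUnused(String segmentId);
}

/** Hypothetical duty: receives the coordinator view instead of touching its fields. */
interface ExampleDuty
{
  void run(CoordinatorState coordinator);
}

/** Top-level (no longer inner) runnable that uses only methods exposed by the coordinator. */
final class ExampleDutiesRunnable implements Runnable
{
  private final CoordinatorState coordinator;
  private final List<ExampleDuty> duties;
  private final String alias;

  ExampleDutiesRunnable(CoordinatorState coordinator, List<ExampleDuty> duties, String alias)
  {
    this.coordinator = coordinator;
    this.duties = new ArrayList<>(duties);
    this.alias = alias;
  }

  @Override
  public void run()
  {
    // Skip the whole cycle if this node is not the leader
    if (!coordinator.isLeader()) {
      return;
    }
    for (ExampleDuty duty : duties) {
      duty.run(coordinator);
    }
  }

  @Override
  public String toString()
  {
    return "DutiesRunnable{dutiesRunnableAlias='" + alias + "'}";
  }
}
```

Tests could then supply a small implementation of `CoordinatorState` instead of reaching into coordinator internals, which may reduce the need for `@VisibleForTesting`.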


[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974876899


##########
core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java:
##########
@@ -37,7 +41,18 @@ public StubServiceEmitter(String service, String host)
   @Override
   public void emit(Event event)
   {
-    events.add(event);
+    if (event instanceof AlertEvent) {
+      final AlertEvent alertEvent = (AlertEvent) event;

Review Comment:
   That makes sense. I did have a dedicated list for alerts during my initial testing but later removed it as I wasn't using it in my tests anymore. Will fix it up.
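   A minimal sketch of the dedicated-alert-list idea, assuming simplified stand-in types instead of Druid's `Event` and `AlertEvent` (whose real APIs are richer and not reproduced here). `StubEvent`, `StubAlert` and `RecordingEmitter` are hypothetical. Alerts are routed to their own list, so tests can assert on them without filtering every emitted event.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an emitted event. */
interface StubEvent
{
  String describe();
}

/** Hypothetical alert event; the emitter stores these in a dedicated list. */
final class StubAlert implements StubEvent
{
  private final String description;

  StubAlert(String description)
  {
    this.description = description;
  }

  @Override
  public String describe()
  {
    return description;
  }
}

/** Records emitted events, keeping alerts separate from everything else. */
final class RecordingEmitter
{
  private final List<StubEvent> events = new ArrayList<>();
  private final List<StubAlert> alerts = new ArrayList<>();

  synchronized void emit(StubEvent event)
  {
    if (event instanceof StubAlert) {
      alerts.add((StubAlert) event);
    } else {
      events.add(event);
    }
  }

  /** Returns a snapshot so callers cannot mutate the recorded alerts. */
  synchronized List<StubAlert> getAlerts()
  {
    return new ArrayList<>(alerts);
  }

  /** Returns a snapshot of the non-alert events. */
  synchronized List<StubEvent> getEvents()
  {
    return new ArrayList<>(events);
  }

  /** Clears both lists, e.g. between simulation runs. */
  synchronized void flush()
  {
    events.clear();
    alerts.clear();
  }
}
```

   Whether alerts should also be kept in the general event list is a judgment call; this sketch keeps the two lists disjoint.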


[GitHub] [druid] kfaraz commented on a diff in pull request #13074: Add test framework to simulate segment loading and balancing

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13074:
URL: https://github.com/apache/druid/pull/13074#discussion_r974900811


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.
+ */
+public abstract class CoordinatorSimulationBaseTest

Review Comment:
   Agreed, although I think the `CoordinatorSimulation` object itself already (somewhat) fulfills the role of the fixture.
   
   It's easy for a developer to write simulation-based tests without ever having to extend the `BaseTest`. The `BaseTest` just provides a bunch of convenience methods which do nothing but delegate to the simulation itself.
   
   In the current tests, apart from invoking actions on the simulation object, each test does only the following:
   1. declare inputs to the simulation, which would be different for each test case
   2. extract and map metrics from the simulation, which can be factored out into the simulation itself
   3. verifications
   
   I will make the update for (2), which makes the `BaseTest` an even thinner layer on top of the simulation.
   But since the tests require hardly any common setup, I am not sure we need a fixture just yet.
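   A minimal sketch of point (2) above, under assumed names: metric extraction lives on the simulation as a default method, and the base test only delegates to it, leaving each test with its inputs and verifications. `Simulation`, `MetricEvent`, `FakeSimulation` and `ThinSimulationBaseTest` are hypothetical and do not reflect the actual `CoordinatorSimulation` interface in this PR; the metric name is illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical emitted metric: name, datasource dimension and value. */
final class MetricEvent
{
  final String metric;
  final String dataSource;
  final long value;

  MetricEvent(String metric, String dataSource, long value)
  {
    this.metric = metric;
    this.dataSource = dataSource;
    this.value = value;
  }
}

/** Hypothetical simulation API with metric extraction moved into it. */
interface Simulation
{
  void runCoordinatorCycle();

  List<MetricEvent> getMetricEvents();

  /** Sums every emitted value of the given metric for one datasource. */
  default long getMetricTotal(String metric, String dataSource)
  {
    long total = 0;
    for (MetricEvent event : getMetricEvents()) {
      if (event.metric.equals(metric) && event.dataSource.equals(dataSource)) {
        total += event.value;
      }
    }
    return total;
  }
}

/** Thin base test: every convenience method just delegates to the simulation. */
abstract class ThinSimulationBaseTest
{
  private Simulation sim;

  void startSimulation(Simulation simulation)
  {
    this.sim = simulation;
  }

  void runCoordinatorCycle()
  {
    sim.runCoordinatorCycle();
  }

  long getValue(String metric, String dataSource)
  {
    return sim.getMetricTotal(metric, dataSource);
  }
}

/** Hypothetical fake that reports a fixed number of assigned segments per cycle. */
final class FakeSimulation implements Simulation
{
  private final List<MetricEvent> events = new ArrayList<>();
  private final int segmentsPerCycle;

  FakeSimulation(int segmentsPerCycle)
  {
    this.segmentsPerCycle = segmentsPerCycle;
  }

  @Override
  public void runCoordinatorCycle()
  {
    events.add(new MetricEvent("segment/assigned/count", "wiki", segmentsPerCycle));
  }

  @Override
  public List<MetricEvent> getMetricEvents()
  {
    return events;
  }
}
```

   With the lookup on the simulation, a test that does not extend the base class can call `getMetricTotal` directly, which matches the point that the base test is optional.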
