You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/08 03:23:00 UTC

[pinot] branch master updated: [multistage] test restructure (#9726)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f5fc107aa5 [multistage] test restructure (#9726)
f5fc107aa5 is described below

commit f5fc107aa5751c6408d211876213e9fac98707d9
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon Nov 7 19:22:55 2022 -0800

    [multistage] test restructure (#9726)
    
    * restructure planner with mock routing manager factory
    * runtime test restructure with mock instance data manager factory
    * also include operator util refactoring with mock operator factory
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../apache/pinot/query/QueryCompilationTest.java   |   9 +-
 .../pinot/query/QueryEnvironmentTestBase.java      |  79 +++++++++-
 .../pinot/query/QueryEnvironmentTestUtils.java     | 163 --------------------
 .../query/testutils/MockRoutingManagerFactory.java | 165 ++++++++++++++++++++
 .../pinot/query/testutils/QueryTestUtils.java      |  44 ++++++
 .../apache/pinot/query/QueryServerEnclosure.java   | 120 +--------------
 .../query/mailbox/GrpcMailboxServiceTestBase.java  |   4 +-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  46 ++++--
 .../runtime/operator/AggregateOperatorTest.java    |  22 +--
 .../runtime/operator/HashJoinOperatorTest.java     |  47 ++----
 .../query/runtime/operator/OperatorTestUtil.java   |  36 +++--
 .../pinot/query/service/QueryDispatcherTest.java   |   8 +-
 .../pinot/query/service/QueryServerTest.java       |   8 +-
 .../testutils/MockDataBlockOperatorFactory.java    |  74 +++++++++
 .../testutils/MockInstanceDataManagerFactory.java  | 168 +++++++++++++++++++++
 15 files changed, 619 insertions(+), 374 deletions(-)

diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 87b3c940f2..7ece7fce43 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -127,13 +128,13 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
       } else if (!PlannerUtils.isRootStage(e.getKey())) {
         // join stage should have both servers used.
         Assert.assertEquals(
-            e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
-            ImmutableList.of("Server_localhost_1", "Server_localhost_2"));
+            e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toSet()),
+            ImmutableSet.of("Server_localhost_1", "Server_localhost_2"));
       } else {
         // reduce stage should have the reducer instance.
         Assert.assertEquals(
-            e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
-            ImmutableList.of("Server_localhost_3"));
+            e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toSet()),
+            ImmutableSet.of("Server_localhost_3"));
       }
     }
   }
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 799d0ef380..8994b81f57 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -18,26 +18,59 @@
  */
 package org.apache.pinot.query;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.query.catalog.PinotCatalog;
 import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.testutils.MockRoutingManagerFactory;
 import org.apache.pinot.query.type.TypeFactory;
 import org.apache.pinot.query.type.TypeSystem;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 
 
 public class QueryEnvironmentTestBase {
+  public static final Map<String, List<String>> SERVER1_SEGMENTS =
+      ImmutableMap.of("a_REALTIME", ImmutableList.of("a1", "a2"), "b_REALTIME", ImmutableList.of("b1"), "c_OFFLINE",
+          ImmutableList.of("c1"), "d_OFFLINE", ImmutableList.of("d1"));
+  public static final Map<String, List<String>> SERVER2_SEGMENTS =
+      ImmutableMap.of("a_REALTIME", ImmutableList.of("a3"), "c_OFFLINE", ImmutableList.of("c2", "c3"),
+          "d_REALTIME", ImmutableList.of("d2"), "d_OFFLINE", ImmutableList.of("d3"));
+  public static final Schema.SchemaBuilder SCHEMA_BUILDER;
+  public static final Object[][] ROWS = new Object[][]{
+      new Object[]{"foo", "foo", 1},
+      new Object[]{"bar", "bar", 42},
+      new Object[]{"alice", "alice", 1},
+      new Object[]{"bob", "foo", 42},
+      new Object[]{"charlie", "bar", 1},
+  };
+  static {
+    SCHEMA_BUILDER = new Schema.SchemaBuilder()
+        .addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
+        .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
+        .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
+        .addMetric("col3", FieldSpec.DataType.INT, 0)
+        .setSchemaName("defaultSchemaName");
+  }
+
   protected QueryEnvironment _queryEnvironment;
 
   @BeforeClass
   public void setUp() {
     // the port doesn't matter as we are not actually making a server call.
-    RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(1, 2);
-    _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
-        CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())),
-        new WorkerManager("localhost", 3, routingManager));
+    _queryEnvironment = getQueryEnvironment(3, 1, 2, SERVER1_SEGMENTS, SERVER2_SEGMENTS);
   }
 
   @DataProvider(name = "testQueryDataProvider")
@@ -71,4 +104,42 @@ public class QueryEnvironmentTestBase {
             + " b.col2 NOT IN ('alice', 'charlie')"},
     };
   }
+
+  public static List<GenericRow> buildRows(String tableName) {
+    List<GenericRow> rows = new ArrayList<>(ROWS.length);
+    for (int i = 0; i < ROWS.length; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue("col1", ROWS[i][0]);
+      row.putValue("col2", ROWS[i][1]);
+      row.putValue("col3", ROWS[i][2]);
+      row.putValue("ts", TableType.OFFLINE.equals(TableNameBuilder.getTableTypeFromTableName(tableName))
+          ? System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2) : System.currentTimeMillis());
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2,
+      Map<String, List<String>> segmentMap1, Map<String, List<String>> segmentMap2) {
+    MockRoutingManagerFactory factory = new MockRoutingManagerFactory(port1, port2)
+        .registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
+        .registerTable(SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME")
+        .registerTable(SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
+        .registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d");
+    for (Map.Entry<String, List<String>> entry : segmentMap1.entrySet()) {
+      for (String segment : entry.getValue()) {
+        factory.registerSegment(port1, entry.getKey(), segment);
+      }
+    }
+    for (Map.Entry<String, List<String>> entry : segmentMap2.entrySet()) {
+      for (String segment : entry.getValue()) {
+        factory.registerSegment(port2, entry.getKey(), segment);
+      }
+    }
+    RoutingManager routingManager = factory.buildRoutingManager();
+    TableCache tableCache = factory.buildTableCache();
+    return new QueryEnvironment(new TypeFactory(new TypeSystem()),
+        CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
+        new WorkerManager("localhost", reducerPort, routingManager));
+  }
 }
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
deleted file mode 100644
index 6e2776d6c7..0000000000
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.calcite.jdbc.CalciteSchemaBuilder;
-import org.apache.pinot.common.config.provider.TableCache;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.routing.RoutingManager;
-import org.apache.pinot.core.routing.RoutingTable;
-import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.query.catalog.PinotCatalog;
-import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.routing.WorkerInstance;
-import org.apache.pinot.query.routing.WorkerManager;
-import org.apache.pinot.query.type.TypeFactory;
-import org.apache.pinot.query.type.TypeSystem;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Base query environment test that provides a bunch of mock tables / schemas so that
- * we can run a simple query planning, produce stages / metadata for other components to test.
- */
-public class QueryEnvironmentTestUtils {
-  public static final Schema.SchemaBuilder SCHEMA_BUILDER;
-  public static final Map<String, List<String>> SERVER1_SEGMENTS =
-      ImmutableMap.of("a", Lists.newArrayList("a1", "a2"), "b", Lists.newArrayList("b1"), "c",
-          Lists.newArrayList("c1"), "d_O", Lists.newArrayList("d1"));
-  public static final Map<String, List<String>> SERVER2_SEGMENTS =
-      ImmutableMap.of("a", Lists.newArrayList("a3"), "c", Lists.newArrayList("c2", "c3"),
-          "d_R", Lists.newArrayList("d2"), "d_O", Lists.newArrayList("d3"));
-
-  public static final Map<String, String> TABLE_NAME_MAP;
-  public static final Map<String, Schema> SCHEMA_NAME_MAP;
-
-  static {
-    SCHEMA_BUILDER = new Schema.SchemaBuilder()
-        .addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
-        .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
-        .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
-        .addMetric("col3", FieldSpec.DataType.INT, 0)
-        .setSchemaName("defaultSchemaName");
-    SCHEMA_NAME_MAP = ImmutableMap.of(
-        "a", SCHEMA_BUILDER.setSchemaName("a").build(),
-        "b", SCHEMA_BUILDER.setSchemaName("b").build(),
-        "c", SCHEMA_BUILDER.setSchemaName("c").build(),
-        "d", SCHEMA_BUILDER.setSchemaName("d").build());
-    TABLE_NAME_MAP = ImmutableMap.of("a_REALTIME", "a", "b_REALTIME", "b", "c_OFFLINE", "c",
-        "d_OFFLINE", "d", "d_REALTIME", "d");
-  }
-
-  private QueryEnvironmentTestUtils() {
-    // do not instantiate.
-  }
-
-  public static TableCache mockTableCache() {
-    TableCache mock = mock(TableCache.class);
-    when(mock.getTableNameMap()).thenReturn(TABLE_NAME_MAP);
-    when(mock.getSchema(anyString())).thenAnswer(invocationOnMock -> {
-      String schemaName = invocationOnMock.getArgument(0);
-      return SCHEMA_NAME_MAP.get(schemaName);
-    });
-    return mock;
-  }
-
-  public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2) {
-    RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(port1, port2);
-    return new QueryEnvironment(new TypeFactory(new TypeSystem()),
-        CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())),
-        new WorkerManager("localhost", reducerPort, routingManager));
-  }
-
-  public static RoutingManager getMockRoutingManager(int port1, int port2) {
-    String server1 = String.format("localhost_%d", port1);
-    String server2 = String.format("localhost_%d", port2);
-    // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
-    // this is only use for test identifier purpose.
-    ServerInstance host1 = new WorkerInstance("localhost", port1, port1, port1, port1);
-    ServerInstance host2 = new WorkerInstance("localhost", port2, port2, port2, port2);
-
-    RoutingTable rtA = mock(RoutingTable.class);
-    when(rtA.getServerInstanceToSegmentsMap()).thenReturn(
-        ImmutableMap.of(host1, SERVER1_SEGMENTS.get("a"), host2, SERVER2_SEGMENTS.get("a")));
-    RoutingTable rtB = mock(RoutingTable.class);
-    when(rtB.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host1, SERVER1_SEGMENTS.get("b")));
-    RoutingTable rtC = mock(RoutingTable.class);
-    when(rtC.getServerInstanceToSegmentsMap()).thenReturn(
-        ImmutableMap.of(host1, SERVER1_SEGMENTS.get("c"), host2, SERVER2_SEGMENTS.get("c")));
-
-    // hybrid table
-    RoutingTable rtDOffline = mock(RoutingTable.class);
-    RoutingTable rtDRealtime = mock(RoutingTable.class);
-    when(rtDOffline.getServerInstanceToSegmentsMap()).thenReturn(
-        ImmutableMap.of(host1, SERVER1_SEGMENTS.get("d_O"), host2, SERVER2_SEGMENTS.get("d_O")));
-    when(rtDRealtime.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host2, SERVER2_SEGMENTS.get("d_R")));
-    Map<String, RoutingTable> mockRoutingTableMap = ImmutableMap.of("a", rtA, "b", rtB, "c", rtC,
-        "d_OFFLINE", rtDOffline, "d_REALTIME", rtDRealtime);
-
-    RoutingManager mock = mock(RoutingManager.class);
-    when(mock.getRoutingTable(any())).thenAnswer(invocation -> {
-      BrokerRequest brokerRequest = invocation.getArgument(0);
-      String tableName = brokerRequest.getPinotQuery().getDataSource().getTableName();
-      return mockRoutingTableMap.getOrDefault(tableName,
-          mockRoutingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
-    });
-    when(mock.getEnabledServerInstanceMap()).thenReturn(ImmutableMap.of(server1, host1, server2, host2));
-    when(mock.getTimeBoundaryInfo(anyString())).thenAnswer(invocation -> {
-      String offlineTableName = invocation.getArgument(0);
-      return "d_OFFLINE".equals(offlineTableName) ? new TimeBoundaryInfo("ts",
-          String.valueOf(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1))) : null;
-    });
-    return mock;
-  }
-
-  public static int getTestStageByServerCount(QueryPlan queryPlan, int serverCount) {
-    List<Integer> stageIds = queryPlan.getStageMetadataMap().entrySet().stream()
-        .filter(e -> !e.getKey().equals(0) && e.getValue().getServerInstances().size() == serverCount)
-        .map(Map.Entry::getKey).collect(Collectors.toList());
-    return stageIds.size() > 0 ? stageIds.get(0) : -1;
-  }
-
-  public static int getAvailablePort() {
-    try {
-      try (ServerSocket socket = new ServerSocket(0)) {
-        return socket.getLocalPort();
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to find an available port to use", e);
-    }
-  }
-}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
new file mode 100644
index 0000000000..9d2d5e1a22
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.testutils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.routing.WorkerInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Builder-style factory for generating a mock {@link RoutingManager} and a matching mock {@link TableCache}.
+ */
+public class MockRoutingManagerFactory {
+  private static final String TIME_BOUNDARY_COLUMN = "ts";
+  private static final String HOST_NAME = "localhost";
+
+  private final HashMap<String, String> _tableNameMap;
+  private final Map<String, Schema> _schemaMap;
+
+  private final Map<String, ServerInstance> _serverInstances;
+  private final Map<String, RoutingTable> _routingTableMap;
+  private final List<String> _hybridTables;
+
+  private final Map<String, Map<ServerInstance, List<String>>> _tableServerSegmentMap;
+
+  public MockRoutingManagerFactory(int... ports) {
+    _hybridTables = new ArrayList<>();
+    _serverInstances = new HashMap<>();
+    _schemaMap = new HashMap<>();
+    _tableNameMap = new HashMap<>();
+    _routingTableMap = new HashMap<>();
+
+    _tableServerSegmentMap = new HashMap<>();
+    for (int port : ports) {
+      _serverInstances.put(toHostname(port), new WorkerInstance(HOST_NAME, port, port, port, port));
+    }
+  }
+
+  public MockRoutingManagerFactory registerTable(Schema schema, String tableName) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType == null) {
+      registerTableNameWithType(schema, TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
+      registerTableNameWithType(schema, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
+      _hybridTables.add(tableName);
+    } else {
+      registerTableNameWithType(schema, TableNameBuilder.forType(tableType).tableNameWithType(tableName));
+    }
+    return this;
+  }
+
+  public MockRoutingManagerFactory registerSegment(int insertToServerPort, String tableNameWithType,
+      String segmentName) {
+    Map<ServerInstance, List<String>> serverSegmentMap =
+        _tableServerSegmentMap.getOrDefault(tableNameWithType, new HashMap<>());
+    ServerInstance serverInstance = _serverInstances.get(toHostname(insertToServerPort));
+
+    List<String> sSegments = serverSegmentMap.getOrDefault(serverInstance, new ArrayList<>());
+    sSegments.add(segmentName);
+    serverSegmentMap.put(serverInstance, sSegments);
+    _tableServerSegmentMap.put(tableNameWithType, serverSegmentMap);
+    return this;
+  }
+
+  public RoutingManager buildRoutingManager() {
+    // create all the fake routing tables
+    _routingTableMap.clear();
+    for (Map.Entry<String, Map<ServerInstance, List<String>>> tableEntry : _tableServerSegmentMap.entrySet()) {
+      String tableNameWithType = tableEntry.getKey();
+      RoutingTable fakeRoutingTable = new RoutingTable(tableEntry.getValue(), Collections.emptyList(), 0);
+      _routingTableMap.put(tableNameWithType, fakeRoutingTable);
+    }
+    return new FakeRoutingManager(_routingTableMap, _serverInstances, _hybridTables);
+  }
+
+  public TableCache buildTableCache() {
+    TableCache mock = mock(TableCache.class);
+    when(mock.getTableNameMap()).thenReturn(_tableNameMap);
+    when(mock.getSchema(anyString())).thenAnswer(invocationOnMock -> {
+      String schemaName = invocationOnMock.getArgument(0);
+      return _schemaMap.get(schemaName);
+    });
+    return mock;
+  }
+
+  private static String toHostname(int port) {
+    return String.format("%s_%d", HOST_NAME, port);
+  }
+
+  private void registerTableNameWithType(Schema schema, String tableNameWithType) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    _tableNameMap.put(tableNameWithType, rawTableName);
+    _schemaMap.put(rawTableName, schema);
+    _schemaMap.put(tableNameWithType, schema);
+  }
+
+  private static class FakeRoutingManager implements RoutingManager {
+    private final Map<String, RoutingTable> _routingTableMap;
+    private final Map<String, ServerInstance> _serverInstances;
+    private final List<String> _hybridTables;
+
+    public FakeRoutingManager(Map<String, RoutingTable> routingTableMap, Map<String, ServerInstance> serverInstances,
+        List<String> hybridTables) {
+      _routingTableMap = routingTableMap;
+      _serverInstances = serverInstances;
+      _hybridTables = hybridTables;
+    }
+
+    @Override
+    public Map<String, ServerInstance> getEnabledServerInstanceMap() {
+      return _serverInstances;
+    }
+
+    @Override
+    public RoutingTable getRoutingTable(BrokerRequest brokerRequest) {
+      String tableName = brokerRequest.getPinotQuery().getDataSource().getTableName();
+      return _routingTableMap.getOrDefault(tableName,
+          _routingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
+    }
+
+    @Override
+    public boolean routingExists(String tableNameWithType) {
+      return _routingTableMap.containsKey(tableNameWithType);
+    }
+
+    @Override
+    public TimeBoundaryInfo getTimeBoundaryInfo(String tableName) {
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      return _hybridTables.contains(rawTableName) ? new TimeBoundaryInfo(TIME_BOUNDARY_COLUMN,
+          String.valueOf(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1))) : null;
+    }
+  }
+}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/QueryTestUtils.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/QueryTestUtils.java
new file mode 100644
index 0000000000..6fb22b24ac
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/QueryTestUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.testutils;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+
+public class QueryTestUtils {
+
+  private QueryTestUtils() {
+    // do not instantiate.
+  }
+
+  /**
+   * Acquire an available port. The acquired port is subject to a race condition: a different process/thread may
+   * bind to the same port immediately after this method returns and before the intended process/thread binds to it.
+   */
+  public static synchronized int getAvailablePort() {
+    try {
+      try (ServerSocket socket = new ServerSocket(0)) {
+        return socket.getLocalPort();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to find an available port to use", e);
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index d9ae5150bc..9481c4db61 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -18,16 +18,10 @@
  */
 package org.apache.pinot.query;
 
-import java.io.File;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -35,31 +29,18 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.service.QueryConfig;
-import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
+import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.matches;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -78,41 +59,23 @@ import static org.mockito.Mockito.when;
  * multi-stage query communication.
  */
 public class QueryServerEnclosure {
-  private static final int NUM_ROWS = 5;
   private static final int DEFAULT_EXECUTOR_THREAD_NUM = 5;
-  private static final String[] STRING_FIELD_LIST = new String[]{"foo", "bar", "alice", "bob", "charlie"};
-  private static final int[] INT_FIELD_LIST = new int[]{1, 42};
   private static final String TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE/";
   private static final String SCHEMAS_PREFIX = "/SCHEMAS/";
 
   private final ExecutorService _testExecutor;
   private final int _queryRunnerPort;
   private final Map<String, Object> _runnerConfig = new HashMap<>();
-  private final Map<String, List<ImmutableSegment>> _segmentMap = new HashMap<>();
-  private final Map<String, List<GenericRow>> _rowsMap = new HashMap<>();
   private final InstanceDataManager _instanceDataManager;
-  private final Map<String, TableDataManager> _tableDataManagers = new HashMap<>();
-  private final Map<String, File> _indexDirs;
   private final HelixManager _helixManager;
 
   private QueryRunner _queryRunner;
 
-  public QueryServerEnclosure(Map<String, File> indexDirs, Map<String, List<String>> segments) {
-    _indexDirs = indexDirs;
+  public QueryServerEnclosure(MockInstanceDataManagerFactory factory) {
     try {
-      for (Map.Entry<String, List<String>> entry : segments.entrySet()) {
-        String tableName = entry.getKey();
-        File indexDir = indexDirs.get(entry.getKey());
-        FileUtils.deleteQuietly(indexDir);
-        List<ImmutableSegment> segmentList = new ArrayList<>();
-        for (String segmentName : entry.getValue()) {
-          segmentList.add(buildSegment(indexDir, tableName, segmentName));
-        }
-        _segmentMap.put(tableName, segmentList);
-      }
-      _instanceDataManager = mockInstanceDataManager();
-      _helixManager = mockHelixManager();
-      _queryRunnerPort = QueryEnvironmentTestUtils.getAvailablePort();
+      _instanceDataManager = factory.buildInstanceDataManager();
+      _helixManager = mockHelixManager(factory.buildSchemaMap());
+      _queryRunnerPort = QueryTestUtils.getAvailablePort();
       _runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _queryRunnerPort);
       _runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME,
           String.format("Server_%s", QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME));
@@ -124,7 +87,7 @@ public class QueryServerEnclosure {
     }
   }
 
-  private HelixManager mockHelixManager() {
+  private HelixManager mockHelixManager(Map<String, Schema> schemaMap) {
     ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore = mock(ZkHelixPropertyStore.class);
     when(zkHelixPropertyStore.get(anyString(), any(), anyInt())).thenAnswer(invocationOnMock -> {
       String path = invocationOnMock.getArgument(0);
@@ -133,7 +96,7 @@ public class QueryServerEnclosure {
         return null;
       } else if (path.startsWith(SCHEMAS_PREFIX)) {
         String tableName = TableNameBuilder.extractRawTableName(path.substring(SCHEMAS_PREFIX.length()));
-        return SchemaUtils.toZNRecord(QueryEnvironmentTestUtils.SCHEMA_NAME_MAP.get(tableName));
+        return SchemaUtils.toZNRecord(schemaMap.get(tableName));
       } else {
         return null;
       }
@@ -147,67 +110,6 @@ public class QueryServerEnclosure {
     return mock(ServerMetrics.class);
   }
 
-  private InstanceDataManager mockInstanceDataManager() {
-    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
-    for (Map.Entry<String, List<ImmutableSegment>> e : _segmentMap.entrySet()) {
-      TableDataManager tableDataManager = mockTableDataManager(e.getValue());
-      _tableDataManagers.put(e.getKey(), tableDataManager);
-    }
-    for (Map.Entry<String, TableDataManager> e : _tableDataManagers.entrySet()) {
-      when(instanceDataManager.getTableDataManager(matches(String.format("%s.*", e.getKey())))).thenReturn(
-          e.getValue());
-    }
-    return instanceDataManager;
-  }
-
-  private TableDataManager mockTableDataManager(List<ImmutableSegment> segmentList) {
-    List<SegmentDataManager> tableSegmentDataManagers =
-        segmentList.stream().map(ImmutableSegmentDataManager::new).collect(Collectors.toList());
-    TableDataManager tableDataManager = mock(TableDataManager.class);
-    when(tableDataManager.acquireSegments(any(), any())).thenReturn(tableSegmentDataManagers);
-    return tableDataManager;
-  }
-
-  private static List<GenericRow> buildRows(String tableName) {
-    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      GenericRow row = new GenericRow();
-      row.putValue("col1", STRING_FIELD_LIST[i % STRING_FIELD_LIST.length]);
-      row.putValue("col2", STRING_FIELD_LIST[i % (STRING_FIELD_LIST.length - 2)]);
-      row.putValue("col3", INT_FIELD_LIST[i % INT_FIELD_LIST.length]);
-      row.putValue("ts", tableName.endsWith("_O")
-          ? System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2) : System.currentTimeMillis());
-      rows.add(row);
-    }
-    return rows;
-  }
-
-  private ImmutableSegment buildSegment(File indexDir, String tableName, String segmentName)
-      throws Exception {
-    List<GenericRow> rows = buildRows(tableName);
-    _rowsMap.putIfAbsent(tableName, new ArrayList<>());
-    _rowsMap.get(tableName).addAll(rows);
-
-    TableConfig tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName("ts").build();
-    Schema schema = QueryEnvironmentTestUtils.SCHEMA_BUILDER.setSchemaName(tableName).build();
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
-    config.setOutDir(indexDir.getPath());
-    config.setTableName(tableName);
-    config.setSegmentName(segmentName);
-
-    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
-      driver.init(config, recordReader);
-    }
-    driver.build();
-    return ImmutableSegmentLoader.load(new File(indexDir, segmentName), ReadMode.mmap);
-  }
-
-  public Map<String, List<GenericRow>> getRowsMap() {
-    return _rowsMap;
-  }
-
   public int getPort() {
     return _queryRunnerPort;
   }
@@ -222,12 +124,6 @@ public class QueryServerEnclosure {
 
   public void shutDown() {
     _queryRunner.shutDown();
-    for (Map.Entry<String, List<ImmutableSegment>> e : _segmentMap.entrySet()) {
-      for (ImmutableSegment segment : e.getValue()) {
-        segment.destroy();
-      }
-      FileUtils.deleteQuietly(_indexDirs.get(e.getKey()));
-    }
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
index 60704e1c96..51ef9606cb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
@@ -20,8 +20,8 @@ package org.apache.pinot.query.mailbox;
 
 import java.util.Collections;
 import java.util.TreeMap;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
 import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -37,7 +37,7 @@ public abstract class GrpcMailboxServiceTestBase {
     PinotConfiguration extraConfig = new PinotConfiguration(Collections.singletonMap(
         QueryConfig.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, 4_000_000));
     for (int i = 0; i < MAILBOX_TEST_SIZE; i++) {
-      int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
+      int availablePort = QueryTestUtils.getAvailablePort();
       GrpcMailboxService grpcMailboxService = new GrpcMailboxService("localhost", availablePort, extraConfig);
       grpcMailboxService.start();
       _mailboxServices.put(availablePort, grpcMailboxService);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 916a12ff1a..290ac2615d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.query.runtime;
 
-import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -35,12 +34,14 @@ import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.QueryTestSet;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
+import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -85,7 +86,7 @@ public class QueryRunnerTestBase extends QueryTestSet {
 
   protected void addTableToH2(List<String> tables)
       throws SQLException {
-    Schema schema = QueryEnvironmentTestUtils.SCHEMA_BUILDER.build();
+    Schema schema = QueryEnvironmentTestBase.SCHEMA_BUILDER.build();
     List<String> h2FieldNamesAndTypes = toH2FieldNamesAndTypes(schema);
     for (String tableName : tables) {
       // create table
@@ -97,7 +98,7 @@ public class QueryRunnerTestBase extends QueryTestSet {
 
   protected void addDataToH2(Map<String, List<GenericRow>> rowsMap)
       throws SQLException {
-    Schema schema = QueryEnvironmentTestUtils.SCHEMA_BUILDER.build();
+    Schema schema = QueryEnvironmentTestBase.SCHEMA_BUILDER.build();
     List<String> h2FieldNamesAndTypes = toH2FieldNamesAndTypes(schema);
     for (Map.Entry<String, List<GenericRow>> entry : rowsMap.entrySet()) {
       String tableName = entry.getKey();
@@ -127,20 +128,35 @@ public class QueryRunnerTestBase extends QueryTestSet {
   public void setUp()
       throws Exception {
     DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
-    QueryServerEnclosure server1 = new QueryServerEnclosure(
-        ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c", INDEX_DIR_S1_C, "d_O", INDEX_DIR_S1_D),
-        QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
-    QueryServerEnclosure server2 = new QueryServerEnclosure(
-        ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C, "d_R", INDEX_DIR_S2_D, "d_O", INDEX_DIR_S1_D),
-        QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
+    MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1")
+        .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
+        .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME")
+        .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
+        .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("d").build(), "d")
+        .addSegment("a_REALTIME", QueryEnvironmentTestBase.buildRows("a_REALTIME"))
+        .addSegment("a_REALTIME", QueryEnvironmentTestBase.buildRows("a_REALTIME"))
+        .addSegment("b_REALTIME", QueryEnvironmentTestBase.buildRows("b_REALTIME"))
+        .addSegment("c_OFFLINE", QueryEnvironmentTestBase.buildRows("c_OFFLINE"))
+        .addSegment("d_OFFLINE", QueryEnvironmentTestBase.buildRows("d_OFFLINE"));
+    MockInstanceDataManagerFactory factory2 = new MockInstanceDataManagerFactory("server2")
+        .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
+        .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
+        .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("d").build(), "d")
+        .addSegment("a_REALTIME", QueryEnvironmentTestBase.buildRows("a_REALTIME"))
+        .addSegment("c_OFFLINE", QueryEnvironmentTestBase.buildRows("c_OFFLINE"))
+        .addSegment("c_OFFLINE", QueryEnvironmentTestBase.buildRows("c_OFFLINE"))
+        .addSegment("d_OFFLINE", QueryEnvironmentTestBase.buildRows("d_OFFLINE"))
+        .addSegment("d_REALTIME", QueryEnvironmentTestBase.buildRows("d_REALTIME"));
+    QueryServerEnclosure server1 = new QueryServerEnclosure(factory1);
+    QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
 
     // Setting up H2 for validation
     setH2Connection();
     addTableToH2(Arrays.asList("a", "b", "c", "d"));
-    addDataToH2(server1.getRowsMap());
-    addDataToH2(server2.getRowsMap());
+    addDataToH2(factory1.buildTableRowsMap());
+    addDataToH2(factory2.buildTableRowsMap());
 
-    _reducerGrpcPort = QueryEnvironmentTestUtils.getAvailablePort();
+    _reducerGrpcPort = QueryTestUtils.getAvailablePort();
     _reducerHostname = String.format("Broker_%s", QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME);
     Map<String, Object> reducerConfig = new HashMap<>();
     reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
@@ -148,8 +164,8 @@ public class QueryRunnerTestBase extends QueryTestSet {
     _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerGrpcPort, new PinotConfiguration(reducerConfig));
     _mailboxService.start();
 
-    _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, server1.getPort(),
-        server2.getPort());
+    _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort, server1.getPort(),
+        server2.getPort(), factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap());
     server1.start();
     server2.start();
     // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index d467ba8112..fc782c5c57 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -21,40 +21,26 @@ package org.apache.pinot.query.runtime.operator;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.when;
 
 
 public class AggregateOperatorTest {
-  @Mock
-  Operator<TransferableBlock> _upstreamOperator;
-
-  @BeforeMethod
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-  }
 
   @Test
   public void testGroupByAggregateWithHashCollision() {
-    // "Aa" and "BB" have same hash code in java.
-    List<Object[]> rows = Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"});
-    when(_upstreamOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rows))
-        .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
+    BaseOperator<TransferableBlock> upstreamOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
     // Create an aggregation call with sum for first column and group by second column.
     RexExpression.FunctionCall agg = new RexExpression.FunctionCall(SqlKind.SUM, FieldSpec.DataType.INT, "SUM",
         Arrays.asList(new RexExpression.InputRef(0)));
     AggregateOperator sum0GroupBy1 =
-        new AggregateOperator(_upstreamOperator, OperatorTestUtil.TEST_DATA_SCHEMA, Arrays.asList(agg),
-            Arrays.asList(new RexExpression.InputRef(1)));
+        new AggregateOperator(upstreamOperator, OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1),
+            Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)));
     TransferableBlock result = sum0GroupBy1.getNextBlock();
     while (result.isNoOpBlock()) {
       result = sum0GroupBy1.getNextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 9975cd57ff..65bb2192c2 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -23,18 +23,14 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.when;
 
 
 public class HashJoinOperatorTest {
@@ -43,32 +39,17 @@ public class HashJoinOperatorTest {
     FieldSelectionKeySelector rightSelect = new FieldSelectionKeySelector(rightIdx);
     return new JoinNode.JoinKeys(leftSelect, rightSelect);
   }
-  @Mock
-  Operator<TransferableBlock> _leftOperator;
-
-  @Mock
-  Operator<TransferableBlock> _rightOperator;
-
-  @BeforeMethod
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-  }
 
   @Test
   public void testHashJoinKeyCollisionInnerJoin() {
-    // "Aa" and "BB" have same hash code in java.
-    List<Object[]> rows = Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"});
-    when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rows))
-        .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
-    when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rows))
-        .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
-
+    BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
+    BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
     List<RexExpression> joinClauses = new ArrayList<>();
     DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+    HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
 
     TransferableBlock result = join.nextBlock();
@@ -89,19 +70,15 @@ public class HashJoinOperatorTest {
 
   @Test
   public void testInnerJoin() {
-    List<Object[]> leftRows = Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"});
-    when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(leftRows))
-        .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
-    List<Object[]> rightRows = Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"});
-    when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rightRows))
-        .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
+    BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
+    BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
 
     List<RexExpression> joinClauses = new ArrayList<>();
     DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+    HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
 
     TransferableBlock result = join.nextBlock();
@@ -118,19 +95,15 @@ public class HashJoinOperatorTest {
 
   @Test
   public void testLeftJoin() {
-    List<Object[]> leftRows = Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"});
-    when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(leftRows))
-        .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
-    List<Object[]> rightRows = Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"});
-    when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rightRows))
-        .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
+    BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
+    BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
 
     List<RexExpression> joinClauses = new ArrayList<>();
     DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+    HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.LEFT);
 
     TransferableBlock result = join.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index aaa190a3d4..86b65a1686 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -18,33 +18,43 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import java.util.Arrays;
 import java.util.List;
-import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
 
 
 public class OperatorTestUtil {
-  private OperatorTestUtil() {
-  }
+  // Simple key-value collision schema/data test set: "Aa" and "BB" have the same hash code in Java.
+  private static final List<List<Object[]>> SIMPLE_KV_DATA_ROWS = Arrays.asList(
+      Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}),
+      Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"}));
+  private static final MockDataBlockOperatorFactory MOCK_OPERATOR_FACTORY;
 
-  public static final DataSchema TEST_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"},
+  public static final DataSchema SIMPLE_KV_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"},
       new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
 
-  public static TransferableBlock getEndOfStreamRowBlock() {
-    return getEndOfStreamRowBlockWithSchema();
+  public static final String OP_1 = "op1";
+  public static final String OP_2 = "op2";
+
+  static {
+    MOCK_OPERATOR_FACTORY = new MockDataBlockOperatorFactory()
+        .registerOperator(OP_1, SIMPLE_KV_DATA_SCHEMA)
+        .registerOperator(OP_2, SIMPLE_KV_DATA_SCHEMA)
+        .addRows(OP_1, SIMPLE_KV_DATA_ROWS.get(0))
+        .addRows(OP_2, SIMPLE_KV_DATA_ROWS.get(1));
   }
 
-  public static TransferableBlock getEndOfStreamRowBlockWithSchema() {
-    return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+  private OperatorTestUtil() {
   }
 
-  public static TransferableBlock getRowDataBlock(List<Object[]> rows) {
-    return getRowDataBlockWithSchema(rows, TEST_DATA_SCHEMA);
+  public static BaseOperator<TransferableBlock> getOperator(String operatorName) {
+    return MOCK_OPERATOR_FACTORY.buildMockOperator(operatorName);
   }
 
-  public static TransferableBlock getRowDataBlockWithSchema(List<Object[]> rows, DataSchema schema) {
-    return new TransferableBlock(rows, schema, BaseDataBlock.Type.ROW);
+  public static DataSchema getDataSchema(String operatorName) {
+    return MOCK_OPERATOR_FACTORY.getDataSchema(operatorName);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
index 34e52f56bb..673858ab00 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
@@ -24,11 +24,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryTestSet;
 import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -49,7 +50,7 @@ public class QueryDispatcherTest extends QueryTestSet {
       throws Exception {
 
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
-      int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
+      int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
       QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer.start();
@@ -60,7 +61,8 @@ public class QueryDispatcherTest extends QueryTestSet {
     List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
 
     // reducer port doesn't matter, we are testing the worker instance not GRPC.
-    _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(1, portList.get(0), portList.get(1));
+    _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, portList.get(0), portList.get(1),
+        QueryEnvironmentTestBase.SERVER1_SEGMENTS, QueryEnvironmentTestBase.SERVER2_SEGMENTS);
   }
 
   @AfterClass
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index 7202ab8d91..066591dbd3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -31,7 +31,7 @@ import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryTestSet;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
@@ -39,6 +39,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
+import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.util.TestUtils;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -63,7 +64,7 @@ public class QueryServerTest extends QueryTestSet {
       throws Exception {
 
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
-      int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
+      int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
       QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer.start();
@@ -78,7 +79,8 @@ public class QueryServerTest extends QueryTestSet {
     List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
 
     // reducer port doesn't matter, we are testing the worker instance not GRPC.
-    _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(1, portList.get(0), portList.get(1));
+    _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, portList.get(0), portList.get(1),
+        QueryEnvironmentTestBase.SERVER1_SEGMENTS, QueryEnvironmentTestBase.SERVER2_SEGMENTS);
   }
 
   @AfterClass
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
new file mode 100644
index 0000000000..a85ae59e6d
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.testutils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+/**
+ * Test utility that builds Mockito-mocked {@link BaseOperator}s which replay pre-registered row batches.
+ *
+ * <p>Register a schema per operator name with {@link #registerOperator}, queue row batches with
+ * {@link #addRows}, then call {@link #buildMockOperator} to obtain a mock that serves those batches.
+ */
+public class MockDataBlockOperatorFactory {
+  // Operator name -> row batches in insertion order; each batch is served as one ROW-type block.
+  protected final Map<String, List<List<Object[]>>> _rowsMap;
+  // Operator name -> schema attached to every block built for that operator.
+  protected final Map<String, DataSchema> _operatorSchemaMap;
+
+  public MockDataBlockOperatorFactory() {
+    _rowsMap = new HashMap<>();
+    _operatorSchemaMap = new HashMap<>();
+  }
+
+  /**
+   * Associates {@code resultSchema} with {@code operatorName}, replacing any earlier registration.
+   *
+   * @return this factory, for call chaining
+   */
+  public MockDataBlockOperatorFactory registerOperator(String operatorName, DataSchema resultSchema) {
+    _operatorSchemaMap.put(operatorName, resultSchema);
+    return this;
+  }
+
+  /**
+   * Appends {@code rows} as the next batch served by mocks built for {@code operatorName}.
+   *
+   * @return this factory, for call chaining
+   */
+  public MockDataBlockOperatorFactory addRows(String operatorName, List<Object[]> rows) {
+    _rowsMap.computeIfAbsent(operatorName, name -> new ArrayList<>()).add(rows);
+    return this;
+  }
+
+  /**
+   * Builds a mock whose {@code nextBlock()} returns the batches of {@code operatorName} one per call, in
+   * insertion order, and an end-of-stream block on every call after the last batch. If no batch was ever
+   * added for the name, the very first call already returns end-of-stream.
+   *
+   * <p>Each mock keeps its own cursor, so several mocks built for the same name replay independently.
+   */
+  @SuppressWarnings("unchecked")
+  public BaseOperator<TransferableBlock> buildMockOperator(String operatorName) {
+    BaseOperator<TransferableBlock> operator = Mockito.mock(BaseOperator.class);
+    Mockito.when(operator.nextBlock()).thenAnswer(new Answer<Object>() {
+      private int _invocationCount = 0;
+
+      @Override
+      public Object answer(InvocationOnMock invocation) {
+        // Resolved on every call so batches queued after the mock was built are still served.
+        List<List<Object[]>> batches = _rowsMap.get(operatorName);
+        if (batches == null || _invocationCount >= batches.size()) {
+          return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+        }
+        return new TransferableBlock(batches.get(_invocationCount++), _operatorSchemaMap.get(operatorName),
+            BaseDataBlock.Type.ROW);
+      }
+    });
+    return operator;
+  }
+
+  /** Returns the schema registered for {@code operatorName}, or {@code null} if there is none. */
+  public DataSchema getDataSchema(String operatorName) {
+    return _operatorSchemaMap.get(operatorName);
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
new file mode 100644
index 0000000000..396b2e65ab
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.testutils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.matches;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class MockInstanceDataManagerFactory {
+  private static final String DATA_DIR_PREFIX = "MockInstanceDataDir";
+
+  private final Map<String, List<GenericRow>> _tableRowsMap;
+  private final Map<String, List<ImmutableSegment>> _tableSegmentMap;
+  private final Map<String, List<String>> _tableSegmentNameMap;
+  private final Map<String, File> _serverTableDataDirMap;
+  private final Map<String, Schema> _schemaMap;
+  private String _serverName;
+
+  public MockInstanceDataManagerFactory(String serverName) {
+    _serverName = serverName;
+    _serverTableDataDirMap = new HashMap<>();
+    _tableSegmentMap = new HashMap<>();
+    _tableSegmentNameMap = new HashMap<>();
+    _tableRowsMap = new HashMap<>();
+    _schemaMap = new HashMap<>();
+  }
+
+  public MockInstanceDataManagerFactory registerTable(Schema schema, String tableName) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType == null) {
+      registerTableNameWithType(schema, TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
+      registerTableNameWithType(schema, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
+    } else {
+      registerTableNameWithType(schema, tableName);
+    }
+    return this;
+  }
+
+  public MockInstanceDataManagerFactory addSegment(String tableNameWithType, List<GenericRow> rows) {
+    String segmentName = String.format("%s_%s", tableNameWithType, UUID.randomUUID());
+    File tableDataDir = _serverTableDataDirMap.get(tableNameWithType);
+    ImmutableSegment segment = buildSegment(tableNameWithType, tableDataDir, segmentName, rows);
+
+    List<ImmutableSegment> segmentList = _tableSegmentMap.getOrDefault(tableNameWithType, new ArrayList<>());
+    segmentList.add(segment);
+    _tableSegmentMap.put(tableNameWithType, segmentList);
+
+    List<String> segmentNameList = _tableSegmentNameMap.getOrDefault(tableNameWithType, new ArrayList<>());
+    segmentNameList.add(segment.getSegmentName());
+    _tableSegmentNameMap.put(tableNameWithType, segmentNameList);
+
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    List<GenericRow> tableRows = _tableRowsMap.getOrDefault(rawTableName, new ArrayList<>());
+    tableRows.addAll(rows);
+    _tableRowsMap.put(rawTableName, tableRows);
+    return this;
+  }
+
+  public InstanceDataManager buildInstanceDataManager() {
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    Map<String, TableDataManager> tableDataManagers = new HashMap<>();
+    for (Map.Entry<String, List<ImmutableSegment>> e : _tableSegmentMap.entrySet()) {
+      TableDataManager tableDataManager = mockTableDataManager(e.getValue());
+      tableDataManagers.put(e.getKey(), tableDataManager);
+    }
+    for (Map.Entry<String, TableDataManager> e : tableDataManagers.entrySet()) {
+      when(instanceDataManager.getTableDataManager(matches(String.format("%s.*", e.getKey())))).thenReturn(
+          e.getValue());
+    }
+    return instanceDataManager;
+  }
+
+  public Map<String, Schema> buildSchemaMap() {
+    return _schemaMap;
+  }
+
+  public Map<String, List<GenericRow>> buildTableRowsMap() {
+    return _tableRowsMap;
+  }
+
+  public Map<String, List<String>> buildTableSegmentNameMap() {
+    return _tableSegmentNameMap;
+  }
+
+  private TableDataManager mockTableDataManager(List<ImmutableSegment> segmentList) {
+    List<SegmentDataManager> tableSegmentDataManagers =
+        segmentList.stream().map(ImmutableSegmentDataManager::new).collect(Collectors.toList());
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+    when(tableDataManager.acquireSegments(any(), any())).thenReturn(tableSegmentDataManagers);
+    return tableDataManager;
+  }
+
+  private ImmutableSegment buildSegment(String tableNameWithType, File indexDir, String segmentName,
+      List<GenericRow> rows) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    // TODO: plugin table config constructor
+    TableConfig tableConfig = new TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts")
+        .build();
+    Schema schema = _schemaMap.get(tableNameWithType);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(indexDir.getPath());
+    config.setTableName(tableNameWithType);
+    config.setSegmentName(segmentName);
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+      return ImmutableSegmentLoader.load(new File(indexDir, segmentName), ReadMode.mmap);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to construct immutable segment from records", e);
+    }
+  }
+
+  private void registerTableNameWithType(Schema schema, String tableNameWithType) {
+    File tableDataDir = new File(FileUtils.getTempDirectory(),
+        String.format("%s_%s_%s", DATA_DIR_PREFIX, _serverName, tableNameWithType));
+    FileUtils.deleteQuietly(tableDataDir);
+    _serverTableDataDirMap.put(tableNameWithType, tableDataDir);
+    _schemaMap.put(TableNameBuilder.extractRawTableName(tableNameWithType), schema);
+    _schemaMap.put(tableNameWithType, schema);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org