You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/08 03:23:00 UTC
[pinot] branch master updated: [multistage] test restructure (#9726)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f5fc107aa5 [multistage] test restructure (#9726)
f5fc107aa5 is described below
commit f5fc107aa5751c6408d211876213e9fac98707d9
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon Nov 7 19:22:55 2022 -0800
[multistage] test restructure (#9726)
* restructure planner with mock routing manager factory
* runtime test restructure with mock instance data manager factory
* also include operator util refactoring with mock operator factory
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../apache/pinot/query/QueryCompilationTest.java | 9 +-
.../pinot/query/QueryEnvironmentTestBase.java | 79 +++++++++-
.../pinot/query/QueryEnvironmentTestUtils.java | 163 --------------------
.../query/testutils/MockRoutingManagerFactory.java | 165 ++++++++++++++++++++
.../pinot/query/testutils/QueryTestUtils.java | 44 ++++++
.../apache/pinot/query/QueryServerEnclosure.java | 120 +--------------
.../query/mailbox/GrpcMailboxServiceTestBase.java | 4 +-
.../pinot/query/runtime/QueryRunnerTestBase.java | 46 ++++--
.../runtime/operator/AggregateOperatorTest.java | 22 +--
.../runtime/operator/HashJoinOperatorTest.java | 47 ++----
.../query/runtime/operator/OperatorTestUtil.java | 36 +++--
.../pinot/query/service/QueryDispatcherTest.java | 8 +-
.../pinot/query/service/QueryServerTest.java | 8 +-
.../testutils/MockDataBlockOperatorFactory.java | 74 +++++++++
.../testutils/MockInstanceDataManagerFactory.java | 168 +++++++++++++++++++++
15 files changed, 619 insertions(+), 374 deletions(-)
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 87b3c940f2..7ece7fce43 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -127,13 +128,13 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
} else if (!PlannerUtils.isRootStage(e.getKey())) {
// join stage should have both servers used.
Assert.assertEquals(
- e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
- ImmutableList.of("Server_localhost_1", "Server_localhost_2"));
+ e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toSet()),
+ ImmutableSet.of("Server_localhost_1", "Server_localhost_2"));
} else {
// reduce stage should have the reducer instance.
Assert.assertEquals(
- e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
- ImmutableList.of("Server_localhost_3"));
+ e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toSet()),
+ ImmutableSet.of("Server_localhost_3"));
}
}
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 799d0ef380..8994b81f57 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -18,26 +18,59 @@
*/
package org.apache.pinot.query;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.testutils.MockRoutingManagerFactory;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.type.TypeSystem;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
public class QueryEnvironmentTestBase {
+ public static final Map<String, List<String>> SERVER1_SEGMENTS =
+ ImmutableMap.of("a_REALTIME", ImmutableList.of("a1", "a2"), "b_REALTIME", ImmutableList.of("b1"), "c_OFFLINE",
+ ImmutableList.of("c1"), "d_OFFLINE", ImmutableList.of("d1"));
+ public static final Map<String, List<String>> SERVER2_SEGMENTS =
+ ImmutableMap.of("a_REALTIME", ImmutableList.of("a3"), "c_OFFLINE", ImmutableList.of("c2", "c3"),
+ "d_REALTIME", ImmutableList.of("d2"), "d_OFFLINE", ImmutableList.of("d3"));
+ public static final Schema.SchemaBuilder SCHEMA_BUILDER;
+ public static final Object[][] ROWS = new Object[][]{
+ new Object[]{"foo", "foo", 1},
+ new Object[]{"bar", "bar", 42},
+ new Object[]{"alice", "alice", 1},
+ new Object[]{"bob", "foo", 42},
+ new Object[]{"charlie", "bar", 1},
+ };
+ static {
+ SCHEMA_BUILDER = new Schema.SchemaBuilder()
+ .addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
+ .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
+ .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
+ .addMetric("col3", FieldSpec.DataType.INT, 0)
+ .setSchemaName("defaultSchemaName");
+ }
+
protected QueryEnvironment _queryEnvironment;
@BeforeClass
public void setUp() {
// the port doesn't matter as we are not actually making a server call.
- RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(1, 2);
- _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
- CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())),
- new WorkerManager("localhost", 3, routingManager));
+ _queryEnvironment = getQueryEnvironment(3, 1, 2, SERVER1_SEGMENTS, SERVER2_SEGMENTS);
}
@DataProvider(name = "testQueryDataProvider")
@@ -71,4 +104,42 @@ public class QueryEnvironmentTestBase {
+ " b.col2 NOT IN ('alice', 'charlie')"},
};
}
+
+ public static List<GenericRow> buildRows(String tableName) {
+ List<GenericRow> rows = new ArrayList<>(ROWS.length);
+ for (int i = 0; i < ROWS.length; i++) {
+ GenericRow row = new GenericRow();
+ row.putValue("col1", ROWS[i][0]);
+ row.putValue("col2", ROWS[i][1]);
+ row.putValue("col3", ROWS[i][2]);
+ row.putValue("ts", TableType.OFFLINE.equals(TableNameBuilder.getTableTypeFromTableName(tableName))
+ ? System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2) : System.currentTimeMillis());
+ rows.add(row);
+ }
+ return rows;
+ }
+
+ public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2,
+ Map<String, List<String>> segmentMap1, Map<String, List<String>> segmentMap2) {
+ MockRoutingManagerFactory factory = new MockRoutingManagerFactory(port1, port2)
+ .registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
+ .registerTable(SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME")
+ .registerTable(SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
+ .registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d");
+ for (Map.Entry<String, List<String>> entry : segmentMap1.entrySet()) {
+ for (String segment : entry.getValue()) {
+ factory.registerSegment(port1, entry.getKey(), segment);
+ }
+ }
+ for (Map.Entry<String, List<String>> entry : segmentMap2.entrySet()) {
+ for (String segment : entry.getValue()) {
+ factory.registerSegment(port2, entry.getKey(), segment);
+ }
+ }
+ RoutingManager routingManager = factory.buildRoutingManager();
+ TableCache tableCache = factory.buildTableCache();
+ return new QueryEnvironment(new TypeFactory(new TypeSystem()),
+ CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
+ new WorkerManager("localhost", reducerPort, routingManager));
+ }
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
deleted file mode 100644
index 6e2776d6c7..0000000000
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.calcite.jdbc.CalciteSchemaBuilder;
-import org.apache.pinot.common.config.provider.TableCache;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.routing.RoutingManager;
-import org.apache.pinot.core.routing.RoutingTable;
-import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.query.catalog.PinotCatalog;
-import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.routing.WorkerInstance;
-import org.apache.pinot.query.routing.WorkerManager;
-import org.apache.pinot.query.type.TypeFactory;
-import org.apache.pinot.query.type.TypeSystem;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Base query environment test that provides a bunch of mock tables / schemas so that
- * we can run a simple query planning, produce stages / metadata for other components to test.
- */
-public class QueryEnvironmentTestUtils {
- public static final Schema.SchemaBuilder SCHEMA_BUILDER;
- public static final Map<String, List<String>> SERVER1_SEGMENTS =
- ImmutableMap.of("a", Lists.newArrayList("a1", "a2"), "b", Lists.newArrayList("b1"), "c",
- Lists.newArrayList("c1"), "d_O", Lists.newArrayList("d1"));
- public static final Map<String, List<String>> SERVER2_SEGMENTS =
- ImmutableMap.of("a", Lists.newArrayList("a3"), "c", Lists.newArrayList("c2", "c3"),
- "d_R", Lists.newArrayList("d2"), "d_O", Lists.newArrayList("d3"));
-
- public static final Map<String, String> TABLE_NAME_MAP;
- public static final Map<String, Schema> SCHEMA_NAME_MAP;
-
- static {
- SCHEMA_BUILDER = new Schema.SchemaBuilder()
- .addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
- .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
- .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
- .addMetric("col3", FieldSpec.DataType.INT, 0)
- .setSchemaName("defaultSchemaName");
- SCHEMA_NAME_MAP = ImmutableMap.of(
- "a", SCHEMA_BUILDER.setSchemaName("a").build(),
- "b", SCHEMA_BUILDER.setSchemaName("b").build(),
- "c", SCHEMA_BUILDER.setSchemaName("c").build(),
- "d", SCHEMA_BUILDER.setSchemaName("d").build());
- TABLE_NAME_MAP = ImmutableMap.of("a_REALTIME", "a", "b_REALTIME", "b", "c_OFFLINE", "c",
- "d_OFFLINE", "d", "d_REALTIME", "d");
- }
-
- private QueryEnvironmentTestUtils() {
- // do not instantiate.
- }
-
- public static TableCache mockTableCache() {
- TableCache mock = mock(TableCache.class);
- when(mock.getTableNameMap()).thenReturn(TABLE_NAME_MAP);
- when(mock.getSchema(anyString())).thenAnswer(invocationOnMock -> {
- String schemaName = invocationOnMock.getArgument(0);
- return SCHEMA_NAME_MAP.get(schemaName);
- });
- return mock;
- }
-
- public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2) {
- RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(port1, port2);
- return new QueryEnvironment(new TypeFactory(new TypeSystem()),
- CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())),
- new WorkerManager("localhost", reducerPort, routingManager));
- }
-
- public static RoutingManager getMockRoutingManager(int port1, int port2) {
- String server1 = String.format("localhost_%d", port1);
- String server2 = String.format("localhost_%d", port2);
- // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
- // this is only use for test identifier purpose.
- ServerInstance host1 = new WorkerInstance("localhost", port1, port1, port1, port1);
- ServerInstance host2 = new WorkerInstance("localhost", port2, port2, port2, port2);
-
- RoutingTable rtA = mock(RoutingTable.class);
- when(rtA.getServerInstanceToSegmentsMap()).thenReturn(
- ImmutableMap.of(host1, SERVER1_SEGMENTS.get("a"), host2, SERVER2_SEGMENTS.get("a")));
- RoutingTable rtB = mock(RoutingTable.class);
- when(rtB.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host1, SERVER1_SEGMENTS.get("b")));
- RoutingTable rtC = mock(RoutingTable.class);
- when(rtC.getServerInstanceToSegmentsMap()).thenReturn(
- ImmutableMap.of(host1, SERVER1_SEGMENTS.get("c"), host2, SERVER2_SEGMENTS.get("c")));
-
- // hybrid table
- RoutingTable rtDOffline = mock(RoutingTable.class);
- RoutingTable rtDRealtime = mock(RoutingTable.class);
- when(rtDOffline.getServerInstanceToSegmentsMap()).thenReturn(
- ImmutableMap.of(host1, SERVER1_SEGMENTS.get("d_O"), host2, SERVER2_SEGMENTS.get("d_O")));
- when(rtDRealtime.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host2, SERVER2_SEGMENTS.get("d_R")));
- Map<String, RoutingTable> mockRoutingTableMap = ImmutableMap.of("a", rtA, "b", rtB, "c", rtC,
- "d_OFFLINE", rtDOffline, "d_REALTIME", rtDRealtime);
-
- RoutingManager mock = mock(RoutingManager.class);
- when(mock.getRoutingTable(any())).thenAnswer(invocation -> {
- BrokerRequest brokerRequest = invocation.getArgument(0);
- String tableName = brokerRequest.getPinotQuery().getDataSource().getTableName();
- return mockRoutingTableMap.getOrDefault(tableName,
- mockRoutingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
- });
- when(mock.getEnabledServerInstanceMap()).thenReturn(ImmutableMap.of(server1, host1, server2, host2));
- when(mock.getTimeBoundaryInfo(anyString())).thenAnswer(invocation -> {
- String offlineTableName = invocation.getArgument(0);
- return "d_OFFLINE".equals(offlineTableName) ? new TimeBoundaryInfo("ts",
- String.valueOf(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1))) : null;
- });
- return mock;
- }
-
- public static int getTestStageByServerCount(QueryPlan queryPlan, int serverCount) {
- List<Integer> stageIds = queryPlan.getStageMetadataMap().entrySet().stream()
- .filter(e -> !e.getKey().equals(0) && e.getValue().getServerInstances().size() == serverCount)
- .map(Map.Entry::getKey).collect(Collectors.toList());
- return stageIds.size() > 0 ? stageIds.get(0) : -1;
- }
-
- public static int getAvailablePort() {
- try {
- try (ServerSocket socket = new ServerSocket(0)) {
- return socket.getLocalPort();
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to find an available port to use", e);
- }
- }
-}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
new file mode 100644
index 0000000000..9d2d5e1a22
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.testutils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.routing.WorkerInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Builder-style factory that produces a fake {@link RoutingManager} and a mock {@link TableCache} for tests.
+ */
+public class MockRoutingManagerFactory {
+ private static final String TIME_BOUNDARY_COLUMN = "ts";
+ private static final String HOST_NAME = "localhost";
+
+ private final HashMap<String, String> _tableNameMap;
+ private final Map<String, Schema> _schemaMap;
+
+ private final Map<String, ServerInstance> _serverInstances;
+ private final Map<String, RoutingTable> _routingTableMap;
+ private final List<String> _hybridTables;
+
+ private final Map<String, Map<ServerInstance, List<String>>> _tableServerSegmentMap;
+
+ public MockRoutingManagerFactory(int... ports) {
+ _hybridTables = new ArrayList<>();
+ _serverInstances = new HashMap<>();
+ _schemaMap = new HashMap<>();
+ _tableNameMap = new HashMap<>();
+ _routingTableMap = new HashMap<>();
+
+ _tableServerSegmentMap = new HashMap<>();
+ for (int port : ports) {
+ _serverInstances.put(toHostname(port), new WorkerInstance(HOST_NAME, port, port, port, port));
+ }
+ }
+
+ public MockRoutingManagerFactory registerTable(Schema schema, String tableName) {
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+ if (tableType == null) {
+ registerTableNameWithType(schema, TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
+ registerTableNameWithType(schema, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
+ _hybridTables.add(tableName);
+ } else {
+ registerTableNameWithType(schema, TableNameBuilder.forType(tableType).tableNameWithType(tableName));
+ }
+ return this;
+ }
+
+ public MockRoutingManagerFactory registerSegment(int insertToServerPort, String tableNameWithType,
+ String segmentName) {
+ Map<ServerInstance, List<String>> serverSegmentMap =
+ _tableServerSegmentMap.getOrDefault(tableNameWithType, new HashMap<>());
+ ServerInstance serverInstance = _serverInstances.get(toHostname(insertToServerPort));
+
+ List<String> sSegments = serverSegmentMap.getOrDefault(serverInstance, new ArrayList<>());
+ sSegments.add(segmentName);
+ serverSegmentMap.put(serverInstance, sSegments);
+ _tableServerSegmentMap.put(tableNameWithType, serverSegmentMap);
+ return this;
+ }
+
+ public RoutingManager buildRoutingManager() {
+ // create all the fake routing tables
+ _routingTableMap.clear();
+ for (Map.Entry<String, Map<ServerInstance, List<String>>> tableEntry : _tableServerSegmentMap.entrySet()) {
+ String tableNameWithType = tableEntry.getKey();
+ RoutingTable fakeRoutingTable = new RoutingTable(tableEntry.getValue(), Collections.emptyList(), 0);
+ _routingTableMap.put(tableNameWithType, fakeRoutingTable);
+ }
+ return new FakeRoutingManager(_routingTableMap, _serverInstances, _hybridTables);
+ }
+
+ public TableCache buildTableCache() {
+ TableCache mock = mock(TableCache.class);
+ when(mock.getTableNameMap()).thenReturn(_tableNameMap);
+ when(mock.getSchema(anyString())).thenAnswer(invocationOnMock -> {
+ String schemaName = invocationOnMock.getArgument(0);
+ return _schemaMap.get(schemaName);
+ });
+ return mock;
+ }
+
+ private static String toHostname(int port) {
+ return String.format("%s_%d", HOST_NAME, port);
+ }
+
+ private void registerTableNameWithType(Schema schema, String tableNameWithType) {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+ _tableNameMap.put(tableNameWithType, rawTableName);
+ _schemaMap.put(rawTableName, schema);
+ _schemaMap.put(tableNameWithType, schema);
+ }
+
+ private static class FakeRoutingManager implements RoutingManager {
+ private final Map<String, RoutingTable> _routingTableMap;
+ private final Map<String, ServerInstance> _serverInstances;
+ private final List<String> _hybridTables;
+
+ public FakeRoutingManager(Map<String, RoutingTable> routingTableMap, Map<String, ServerInstance> serverInstances,
+ List<String> hybridTables) {
+ _routingTableMap = routingTableMap;
+ _serverInstances = serverInstances;
+ _hybridTables = hybridTables;
+ }
+
+ @Override
+ public Map<String, ServerInstance> getEnabledServerInstanceMap() {
+ return _serverInstances;
+ }
+
+ @Override
+ public RoutingTable getRoutingTable(BrokerRequest brokerRequest) {
+ String tableName = brokerRequest.getPinotQuery().getDataSource().getTableName();
+ return _routingTableMap.getOrDefault(tableName,
+ _routingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
+ }
+
+ @Override
+ public boolean routingExists(String tableNameWithType) {
+ return _routingTableMap.containsKey(tableNameWithType);
+ }
+
+ @Override
+ public TimeBoundaryInfo getTimeBoundaryInfo(String tableName) {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ return _hybridTables.contains(rawTableName) ? new TimeBoundaryInfo(TIME_BOUNDARY_COLUMN,
+ String.valueOf(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1))) : null;
+ }
+ }
+}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/QueryTestUtils.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/QueryTestUtils.java
new file mode 100644
index 0000000000..6fb22b24ac
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/QueryTestUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.testutils;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+
+public class QueryTestUtils {
+
+ private QueryTestUtils() {
+ // do not instantiate.
+ }
+
+ /**
+ * Acquire an available port. The acquired port is subject to a race condition - a different process/thread may
+ * bind to the same port immediately after this method returns and before the intended process/thread binds to it.
+ */
+ public static synchronized int getAvailablePort() {
+ try {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find an available port to use", e);
+ }
+ }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index d9ae5150bc..9481c4db61 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -18,16 +18,10 @@
*/
package org.apache.pinot.query;
-import java.io.File;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -35,31 +29,18 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.service.QueryConfig;
-import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
+import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -78,41 +59,23 @@ import static org.mockito.Mockito.when;
* multi-stage query communication.
*/
public class QueryServerEnclosure {
- private static final int NUM_ROWS = 5;
private static final int DEFAULT_EXECUTOR_THREAD_NUM = 5;
- private static final String[] STRING_FIELD_LIST = new String[]{"foo", "bar", "alice", "bob", "charlie"};
- private static final int[] INT_FIELD_LIST = new int[]{1, 42};
private static final String TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE/";
private static final String SCHEMAS_PREFIX = "/SCHEMAS/";
private final ExecutorService _testExecutor;
private final int _queryRunnerPort;
private final Map<String, Object> _runnerConfig = new HashMap<>();
- private final Map<String, List<ImmutableSegment>> _segmentMap = new HashMap<>();
- private final Map<String, List<GenericRow>> _rowsMap = new HashMap<>();
private final InstanceDataManager _instanceDataManager;
- private final Map<String, TableDataManager> _tableDataManagers = new HashMap<>();
- private final Map<String, File> _indexDirs;
private final HelixManager _helixManager;
private QueryRunner _queryRunner;
- public QueryServerEnclosure(Map<String, File> indexDirs, Map<String, List<String>> segments) {
- _indexDirs = indexDirs;
+ public QueryServerEnclosure(MockInstanceDataManagerFactory factory) {
try {
- for (Map.Entry<String, List<String>> entry : segments.entrySet()) {
- String tableName = entry.getKey();
- File indexDir = indexDirs.get(entry.getKey());
- FileUtils.deleteQuietly(indexDir);
- List<ImmutableSegment> segmentList = new ArrayList<>();
- for (String segmentName : entry.getValue()) {
- segmentList.add(buildSegment(indexDir, tableName, segmentName));
- }
- _segmentMap.put(tableName, segmentList);
- }
- _instanceDataManager = mockInstanceDataManager();
- _helixManager = mockHelixManager();
- _queryRunnerPort = QueryEnvironmentTestUtils.getAvailablePort();
+ _instanceDataManager = factory.buildInstanceDataManager();
+ _helixManager = mockHelixManager(factory.buildSchemaMap());
+ _queryRunnerPort = QueryTestUtils.getAvailablePort();
_runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _queryRunnerPort);
_runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME,
String.format("Server_%s", QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME));
@@ -124,7 +87,7 @@ public class QueryServerEnclosure {
}
}
- private HelixManager mockHelixManager() {
+ private HelixManager mockHelixManager(Map<String, Schema> schemaMap) {
ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore = mock(ZkHelixPropertyStore.class);
when(zkHelixPropertyStore.get(anyString(), any(), anyInt())).thenAnswer(invocationOnMock -> {
String path = invocationOnMock.getArgument(0);
@@ -133,7 +96,7 @@ public class QueryServerEnclosure {
return null;
} else if (path.startsWith(SCHEMAS_PREFIX)) {
String tableName = TableNameBuilder.extractRawTableName(path.substring(SCHEMAS_PREFIX.length()));
- return SchemaUtils.toZNRecord(QueryEnvironmentTestUtils.SCHEMA_NAME_MAP.get(tableName));
+ return SchemaUtils.toZNRecord(schemaMap.get(tableName));
} else {
return null;
}
@@ -147,67 +110,6 @@ public class QueryServerEnclosure {
return mock(ServerMetrics.class);
}
- private InstanceDataManager mockInstanceDataManager() {
- InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
- for (Map.Entry<String, List<ImmutableSegment>> e : _segmentMap.entrySet()) {
- TableDataManager tableDataManager = mockTableDataManager(e.getValue());
- _tableDataManagers.put(e.getKey(), tableDataManager);
- }
- for (Map.Entry<String, TableDataManager> e : _tableDataManagers.entrySet()) {
- when(instanceDataManager.getTableDataManager(matches(String.format("%s.*", e.getKey())))).thenReturn(
- e.getValue());
- }
- return instanceDataManager;
- }
-
- private TableDataManager mockTableDataManager(List<ImmutableSegment> segmentList) {
- List<SegmentDataManager> tableSegmentDataManagers =
- segmentList.stream().map(ImmutableSegmentDataManager::new).collect(Collectors.toList());
- TableDataManager tableDataManager = mock(TableDataManager.class);
- when(tableDataManager.acquireSegments(any(), any())).thenReturn(tableSegmentDataManagers);
- return tableDataManager;
- }
-
- private static List<GenericRow> buildRows(String tableName) {
- List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
- for (int i = 0; i < NUM_ROWS; i++) {
- GenericRow row = new GenericRow();
- row.putValue("col1", STRING_FIELD_LIST[i % STRING_FIELD_LIST.length]);
- row.putValue("col2", STRING_FIELD_LIST[i % (STRING_FIELD_LIST.length - 2)]);
- row.putValue("col3", INT_FIELD_LIST[i % INT_FIELD_LIST.length]);
- row.putValue("ts", tableName.endsWith("_O")
- ? System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2) : System.currentTimeMillis());
- rows.add(row);
- }
- return rows;
- }
-
- private ImmutableSegment buildSegment(File indexDir, String tableName, String segmentName)
- throws Exception {
- List<GenericRow> rows = buildRows(tableName);
- _rowsMap.putIfAbsent(tableName, new ArrayList<>());
- _rowsMap.get(tableName).addAll(rows);
-
- TableConfig tableConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName("ts").build();
- Schema schema = QueryEnvironmentTestUtils.SCHEMA_BUILDER.setSchemaName(tableName).build();
- SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
- config.setOutDir(indexDir.getPath());
- config.setTableName(tableName);
- config.setSegmentName(segmentName);
-
- SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
- try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
- driver.init(config, recordReader);
- }
- driver.build();
- return ImmutableSegmentLoader.load(new File(indexDir, segmentName), ReadMode.mmap);
- }
-
- public Map<String, List<GenericRow>> getRowsMap() {
- return _rowsMap;
- }
-
public int getPort() {
return _queryRunnerPort;
}
@@ -222,12 +124,6 @@ public class QueryServerEnclosure {
public void shutDown() {
_queryRunner.shutDown();
- for (Map.Entry<String, List<ImmutableSegment>> e : _segmentMap.entrySet()) {
- for (ImmutableSegment segment : e.getValue()) {
- segment.destroy();
- }
- FileUtils.deleteQuietly(_indexDirs.get(e.getKey()));
- }
}
public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
index 60704e1c96..51ef9606cb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
@@ -20,8 +20,8 @@ package org.apache.pinot.query.mailbox;
import java.util.Collections;
import java.util.TreeMap;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -37,7 +37,7 @@ public abstract class GrpcMailboxServiceTestBase {
PinotConfiguration extraConfig = new PinotConfiguration(Collections.singletonMap(
QueryConfig.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, 4_000_000));
for (int i = 0; i < MAILBOX_TEST_SIZE; i++) {
- int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
+ int availablePort = QueryTestUtils.getAvailablePort();
GrpcMailboxService grpcMailboxService = new GrpcMailboxService("localhost", availablePort, extraConfig);
grpcMailboxService.start();
_mailboxServices.put(availablePort, grpcMailboxService);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 916a12ff1a..290ac2615d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.runtime;
-import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -35,12 +34,14 @@ import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
+import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -85,7 +86,7 @@ public class QueryRunnerTestBase extends QueryTestSet {
protected void addTableToH2(List<String> tables)
throws SQLException {
- Schema schema = QueryEnvironmentTestUtils.SCHEMA_BUILDER.build();
+ Schema schema = QueryEnvironmentTestBase.SCHEMA_BUILDER.build();
List<String> h2FieldNamesAndTypes = toH2FieldNamesAndTypes(schema);
for (String tableName : tables) {
// create table
@@ -97,7 +98,7 @@ public class QueryRunnerTestBase extends QueryTestSet {
protected void addDataToH2(Map<String, List<GenericRow>> rowsMap)
throws SQLException {
- Schema schema = QueryEnvironmentTestUtils.SCHEMA_BUILDER.build();
+ Schema schema = QueryEnvironmentTestBase.SCHEMA_BUILDER.build();
List<String> h2FieldNamesAndTypes = toH2FieldNamesAndTypes(schema);
for (Map.Entry<String, List<GenericRow>> entry : rowsMap.entrySet()) {
String tableName = entry.getKey();
@@ -127,20 +128,35 @@ public class QueryRunnerTestBase extends QueryTestSet {
public void setUp()
throws Exception {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
- QueryServerEnclosure server1 = new QueryServerEnclosure(
- ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c", INDEX_DIR_S1_C, "d_O", INDEX_DIR_S1_D),
- QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
- QueryServerEnclosure server2 = new QueryServerEnclosure(
- ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C, "d_R", INDEX_DIR_S2_D, "d_O", INDEX_DIR_S1_D),
- QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
+ MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1")
+ .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
+ .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME")
+ .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
+ .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("d").build(), "d")
+ .addSegment("a_REALTIME", QueryEnvironmentTestBase.buildRows("a_REALTIME"))
+ .addSegment("a_REALTIME", QueryEnvironmentTestBase.buildRows("a_REALTIME"))
+ .addSegment("b_REALTIME", QueryEnvironmentTestBase.buildRows("b_REALTIME"))
+ .addSegment("c_OFFLINE", QueryEnvironmentTestBase.buildRows("c_OFFLINE"))
+ .addSegment("d_OFFLINE", QueryEnvironmentTestBase.buildRows("d_OFFLINE"));
+ MockInstanceDataManagerFactory factory2 = new MockInstanceDataManagerFactory("server1")
+ .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
+ .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
+ .registerTable(QueryEnvironmentTestBase.SCHEMA_BUILDER.setSchemaName("d").build(), "d")
+ .addSegment("a_REALTIME", QueryEnvironmentTestBase.buildRows("a_REALTIME"))
+ .addSegment("c_OFFLINE", QueryEnvironmentTestBase.buildRows("c_OFFLINE"))
+ .addSegment("c_OFFLINE", QueryEnvironmentTestBase.buildRows("c_OFFLINE"))
+ .addSegment("d_OFFLINE", QueryEnvironmentTestBase.buildRows("d_OFFLINE"))
+ .addSegment("d_REALTIME", QueryEnvironmentTestBase.buildRows("d_REALTIME"));
+ QueryServerEnclosure server1 = new QueryServerEnclosure(factory1);
+ QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
// Setting up H2 for validation
setH2Connection();
addTableToH2(Arrays.asList("a", "b", "c", "d"));
- addDataToH2(server1.getRowsMap());
- addDataToH2(server2.getRowsMap());
+ addDataToH2(factory1.buildTableRowsMap());
+ addDataToH2(factory2.buildTableRowsMap());
- _reducerGrpcPort = QueryEnvironmentTestUtils.getAvailablePort();
+ _reducerGrpcPort = QueryTestUtils.getAvailablePort();
_reducerHostname = String.format("Broker_%s", QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME);
Map<String, Object> reducerConfig = new HashMap<>();
reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
@@ -148,8 +164,8 @@ public class QueryRunnerTestBase extends QueryTestSet {
_mailboxService = new GrpcMailboxService(_reducerHostname, _reducerGrpcPort, new PinotConfiguration(reducerConfig));
_mailboxService.start();
- _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, server1.getPort(),
- server2.getPort());
+ _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort, server1.getPort(),
+ server2.getPort(), factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap());
server1.start();
server2.start();
// this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index d467ba8112..fc782c5c57 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -21,40 +21,26 @@ package org.apache.pinot.query.runtime.operator;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.sql.SqlKind;
-import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.spi.data.FieldSpec;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.when;
public class AggregateOperatorTest {
- @Mock
- Operator<TransferableBlock> _upstreamOperator;
-
- @BeforeMethod
- public void setup() {
- MockitoAnnotations.initMocks(this);
- }
@Test
public void testGroupByAggregateWithHashCollision() {
- // "Aa" and "BB" have same hash code in java.
- List<Object[]> rows = Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"});
- when(_upstreamOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rows))
- .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
+ BaseOperator<TransferableBlock> upstreamOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
// Create an aggregation call with sum for first column and group by second column.
RexExpression.FunctionCall agg = new RexExpression.FunctionCall(SqlKind.SUM, FieldSpec.DataType.INT, "SUM",
Arrays.asList(new RexExpression.InputRef(0)));
AggregateOperator sum0GroupBy1 =
- new AggregateOperator(_upstreamOperator, OperatorTestUtil.TEST_DATA_SCHEMA, Arrays.asList(agg),
- Arrays.asList(new RexExpression.InputRef(1)));
+ new AggregateOperator(upstreamOperator, OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1),
+ Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)));
TransferableBlock result = sum0GroupBy1.getNextBlock();
while (result.isNoOpBlock()) {
result = sum0GroupBy1.getNextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 9975cd57ff..65bb2192c2 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -23,18 +23,14 @@ import java.util.Arrays;
import java.util.List;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.when;
public class HashJoinOperatorTest {
@@ -43,32 +39,17 @@ public class HashJoinOperatorTest {
FieldSelectionKeySelector rightSelect = new FieldSelectionKeySelector(rightIdx);
return new JoinNode.JoinKeys(leftSelect, rightSelect);
}
- @Mock
- Operator<TransferableBlock> _leftOperator;
-
- @Mock
- Operator<TransferableBlock> _rightOperator;
-
- @BeforeMethod
- public void setup() {
- MockitoAnnotations.initMocks(this);
- }
@Test
public void testHashJoinKeyCollisionInnerJoin() {
- // "Aa" and "BB" have same hash code in java.
- List<Object[]> rows = Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"});
- when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rows))
- .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
- when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rows))
- .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
-
+ BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
+ BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
List<RexExpression> joinClauses = new ArrayList<>();
DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.STRING
});
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+ HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
TransferableBlock result = join.nextBlock();
@@ -89,19 +70,15 @@ public class HashJoinOperatorTest {
@Test
public void testInnerJoin() {
- List<Object[]> leftRows = Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"});
- when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(leftRows))
- .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
- List<Object[]> rightRows = Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"});
- when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rightRows))
- .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
+ BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
+ BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
List<RexExpression> joinClauses = new ArrayList<>();
DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.STRING
});
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+ HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
TransferableBlock result = join.nextBlock();
@@ -118,19 +95,15 @@ public class HashJoinOperatorTest {
@Test
public void testLeftJoin() {
- List<Object[]> leftRows = Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"});
- when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(leftRows))
- .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
- List<Object[]> rightRows = Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"});
- when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.getRowDataBlock(rightRows))
- .thenReturn(OperatorTestUtil.getEndOfStreamRowBlock());
+ BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
+ BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
List<RexExpression> joinClauses = new ArrayList<>();
DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.STRING
});
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+ HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.LEFT);
TransferableBlock result = join.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index aaa190a3d4..86b65a1686 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -18,33 +18,43 @@
*/
package org.apache.pinot.query.runtime.operator;
+import java.util.Arrays;
import java.util.List;
-import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
public class OperatorTestUtil {
- private OperatorTestUtil() {
- }
+ // simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java.
+ private static final List<List<Object[]>> SIMPLE_KV_DATA_ROWS = Arrays.asList(
+ Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}),
+ Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"}));
+ private static final MockDataBlockOperatorFactory MOCK_OPERATOR_FACTORY;
- public static final DataSchema TEST_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"},
+ public static final DataSchema SIMPLE_KV_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
- public static TransferableBlock getEndOfStreamRowBlock() {
- return getEndOfStreamRowBlockWithSchema();
+ public static final String OP_1 = "op1";
+ public static final String OP_2 = "op2";
+
+ static {
+ MOCK_OPERATOR_FACTORY = new MockDataBlockOperatorFactory()
+ .registerOperator(OP_1, SIMPLE_KV_DATA_SCHEMA)
+ .registerOperator(OP_2, SIMPLE_KV_DATA_SCHEMA)
+ .addRows(OP_1, SIMPLE_KV_DATA_ROWS.get(0))
+ .addRows(OP_2, SIMPLE_KV_DATA_ROWS.get(1));
}
- public static TransferableBlock getEndOfStreamRowBlockWithSchema() {
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+ private OperatorTestUtil() {
}
- public static TransferableBlock getRowDataBlock(List<Object[]> rows) {
- return getRowDataBlockWithSchema(rows, TEST_DATA_SCHEMA);
+ public static BaseOperator<TransferableBlock> getOperator(String operatorName) {
+ return MOCK_OPERATOR_FACTORY.buildMockOperator(operatorName);
}
- public static TransferableBlock getRowDataBlockWithSchema(List<Object[]> rows, DataSchema schema) {
- return new TransferableBlock(rows, schema, BaseDataBlock.Type.ROW);
+ public static DataSchema getDataSchema(String operatorName) {
+ return MOCK_OPERATOR_FACTORY.getDataSchema(operatorName);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
index 34e52f56bb..673858ab00 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
@@ -24,11 +24,12 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.testutils.QueryTestUtils;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -49,7 +50,7 @@ public class QueryDispatcherTest extends QueryTestSet {
throws Exception {
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
- int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
+ int availablePort = QueryTestUtils.getAvailablePort();
QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
QueryServer queryServer = new QueryServer(availablePort, queryRunner);
queryServer.start();
@@ -60,7 +61,8 @@ public class QueryDispatcherTest extends QueryTestSet {
List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
// reducer port doesn't matter, we are testing the worker instance not GRPC.
- _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(1, portList.get(0), portList.get(1));
+ _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, portList.get(0), portList.get(1),
+ QueryEnvironmentTestBase.SERVER1_SEGMENTS, QueryEnvironmentTestBase.SERVER2_SEGMENTS);
}
@AfterClass
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index 7202ab8d91..066591dbd3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -31,7 +31,7 @@ import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
@@ -39,6 +39,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
+import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.util.TestUtils;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -63,7 +64,7 @@ public class QueryServerTest extends QueryTestSet {
throws Exception {
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
- int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
+ int availablePort = QueryTestUtils.getAvailablePort();
QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
QueryServer queryServer = new QueryServer(availablePort, queryRunner);
queryServer.start();
@@ -78,7 +79,8 @@ public class QueryServerTest extends QueryTestSet {
List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
// reducer port doesn't matter, we are testing the worker instance not GRPC.
- _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(1, portList.get(0), portList.get(1));
+ _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, portList.get(0), portList.get(1),
+ QueryEnvironmentTestBase.SERVER1_SEGMENTS, QueryEnvironmentTestBase.SERVER2_SEGMENTS);
}
@AfterClass
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
new file mode 100644
index 0000000000..a85ae59e6d
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.testutils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+/**
+ * Test helper that builds Mockito-backed {@link BaseOperator} instances which replay pre-registered rows.
+ *
+ * <p>Operators are identified by name. {@link #registerOperator} records the {@link DataSchema} used to wrap an
+ * operator's rows, and every {@link #addRows} call appends one batch of rows. A mock built by
+ * {@link #buildMockOperator} answers {@code nextBlock()} with one {@link TransferableBlock} per batch, in insertion
+ * order, and with the end-of-stream block from {@link TransferableBlockUtils} once all batches have been served.
+ */
+public class MockDataBlockOperatorFactory {
+  // operator name -> ordered list of row batches; each batch becomes one data block.
+  protected final Map<String, List<List<Object[]>>> _rowsMap;
+  // operator name -> schema used when wrapping that operator's batches into blocks.
+  protected final Map<String, DataSchema> _operatorSchemaMap;
+
+  public MockDataBlockOperatorFactory() {
+    _rowsMap = new HashMap<>();
+    _operatorSchemaMap = new HashMap<>();
+  }
+
+  /**
+   * Registers (or overwrites) the result schema for the given operator name.
+   *
+   * @param operatorName name the operator is later built and queried by
+   * @param resultSchema schema attached to every data block the operator produces
+   * @return this factory, for chaining
+   */
+  public MockDataBlockOperatorFactory registerOperator(String operatorName, DataSchema resultSchema) {
+    _operatorSchemaMap.put(operatorName, resultSchema);
+    return this;
+  }
+
+  /**
+   * Appends one batch of rows for the given operator; each call adds one more block to its output.
+   *
+   * @param operatorName name of the operator the rows belong to
+   * @param rows rows making up a single data block
+   * @return this factory, for chaining
+   */
+  public MockDataBlockOperatorFactory addRows(String operatorName, List<Object[]> rows) {
+    _rowsMap.computeIfAbsent(operatorName, name -> new ArrayList<>()).add(rows);
+    return this;
+  }
+
+  /**
+   * Builds a mock operator whose {@code nextBlock()} replays the batches added for {@code operatorName}.
+   *
+   * <p>Every call returns a fresh mock with its own position, so the same name can be built several times. An
+   * operator that has no batches produces the end-of-stream block on the first call.
+   *
+   * @param operatorName name of a previously registered operator
+   * @return a Mockito mock of {@link BaseOperator} with {@code nextBlock()} stubbed
+   */
+  @SuppressWarnings("unchecked")
+  public BaseOperator<TransferableBlock> buildMockOperator(String operatorName) {
+    BaseOperator<TransferableBlock> operator = Mockito.mock(BaseOperator.class);
+    Mockito.when(operator.nextBlock()).thenAnswer(new Answer<Object>() {
+      // Index of the next batch to serve; state is per built operator.
+      private int _invocationCount = 0;
+
+      @Override
+      public Object answer(InvocationOnMock invocation) {
+        // Looked up on every call so that batches added after the operator was built are still served.
+        List<List<Object[]>> batches = _rowsMap.get(operatorName);
+        // An operator that never received rows has no entry: treat it as an empty stream rather than throwing NPE.
+        if (batches == null || _invocationCount >= batches.size()) {
+          return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+        }
+        return new TransferableBlock(batches.get(_invocationCount++), _operatorSchemaMap.get(operatorName),
+            BaseDataBlock.Type.ROW);
+      }
+    });
+    return operator;
+  }
+
+  /**
+   * Returns the schema registered for the given operator name, or {@code null} if none was registered.
+   */
+  public DataSchema getDataSchema(String operatorName) {
+    return _operatorSchemaMap.get(operatorName);
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
new file mode 100644
index 0000000000..396b2e65ab
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.testutils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.matches;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Test utility that builds real immutable segments from in-memory rows and exposes them through a mocked
+ * {@link InstanceDataManager}.
+ *
+ * <p>Usage: {@link #registerTable(Schema, String)} first, then {@link #addSegment(String, List)} one or more
+ * times, then {@link #buildInstanceDataManager()}.
+ */
+public class MockInstanceDataManagerFactory {
+  private static final String DATA_DIR_PREFIX = "MockInstanceDataDir";
+
+  // Raw table name -> all rows added across every segment of that table.
+  private final Map<String, List<GenericRow>> _tableRowsMap;
+  // Table name with type -> segments built for that table.
+  private final Map<String, List<ImmutableSegment>> _tableSegmentMap;
+  // Table name with type -> names of the segments built for that table.
+  private final Map<String, List<String>> _tableSegmentNameMap;
+  // Table name with type -> directory the table's segments are written to.
+  private final Map<String, File> _serverTableDataDirMap;
+  // Schema lookup, keyed by both the raw table name and the table name with type.
+  private final Map<String, Schema> _schemaMap;
+  private final String _serverName;
+
+  public MockInstanceDataManagerFactory(String serverName) {
+    _serverName = serverName;
+    _serverTableDataDirMap = new HashMap<>();
+    _tableSegmentMap = new HashMap<>();
+    _tableSegmentNameMap = new HashMap<>();
+    _tableRowsMap = new HashMap<>();
+    _schemaMap = new HashMap<>();
+  }
+
+  /**
+   * Registers a table. When {@code tableName} carries no type suffix, both the OFFLINE and the REALTIME variant
+   * are registered with the same schema.
+   *
+   * @param schema schema used to build segments of this table
+   * @param tableName table name, with or without type suffix
+   * @return this factory, for chaining
+   */
+  public MockInstanceDataManagerFactory registerTable(Schema schema, String tableName) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType == null) {
+      registerTableNameWithType(schema, TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
+      registerTableNameWithType(schema, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
+    } else {
+      registerTableNameWithType(schema, tableName);
+    }
+    return this;
+  }
+
+  /**
+   * Builds a new segment (with a random UUID in its name) from {@code rows} and records it for the given table.
+   *
+   * @param tableNameWithType table name including the type suffix; must have been registered beforehand
+   * @param rows rows to index into the new segment
+   * @return this factory, for chaining
+   * @throws IllegalStateException if the table has not been registered
+   */
+  public MockInstanceDataManagerFactory addSegment(String tableNameWithType, List<GenericRow> rows) {
+    File tableDataDir = _serverTableDataDirMap.get(tableNameWithType);
+    if (tableDataDir == null) {
+      // Fail with a clear message rather than on a null schema / data dir deep inside segment generation.
+      throw new IllegalStateException(
+          String.format("Table: %s is not registered, call registerTable() before addSegment()", tableNameWithType));
+    }
+    String segmentName = String.format("%s_%s", tableNameWithType, UUID.randomUUID());
+    ImmutableSegment segment = buildSegment(tableNameWithType, tableDataDir, segmentName, rows);
+
+    _tableSegmentMap.computeIfAbsent(tableNameWithType, k -> new ArrayList<>()).add(segment);
+    _tableSegmentNameMap.computeIfAbsent(tableNameWithType, k -> new ArrayList<>()).add(segment.getSegmentName());
+
+    // Rows are tracked per raw table name, so OFFLINE and REALTIME rows of one table end up in the same list.
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    _tableRowsMap.computeIfAbsent(rawTableName, k -> new ArrayList<>()).addAll(rows);
+    return this;
+  }
+
+  /**
+   * Creates a mocked {@link InstanceDataManager} that returns a mocked {@link TableDataManager} for every table
+   * that has at least one segment.
+   */
+  public InstanceDataManager buildInstanceDataManager() {
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    for (Map.Entry<String, List<ImmutableSegment>> e : _tableSegmentMap.entrySet()) {
+      // Finish stubbing the table mock before opening the instance stub; Mockito rejects nested stubbing.
+      TableDataManager tableDataManager = mockTableDataManager(e.getValue());
+      // The stub is keyed by the regex "<tableNameWithType>.*" rather than by exact name.
+      when(instanceDataManager.getTableDataManager(matches(String.format("%s.*", e.getKey())))).thenReturn(
+          tableDataManager);
+    }
+    return instanceDataManager;
+  }
+
+  /**
+   * Returns the live schema map, keyed by both the raw table name and the table name with type.
+   */
+  public Map<String, Schema> buildSchemaMap() {
+    return _schemaMap;
+  }
+
+  /**
+   * Returns the live map from raw table name to all rows added for that table.
+   */
+  public Map<String, List<GenericRow>> buildTableRowsMap() {
+    return _tableRowsMap;
+  }
+
+  /**
+   * Returns the live map from table name with type to the names of its segments.
+   */
+  public Map<String, List<String>> buildTableSegmentNameMap() {
+    return _tableSegmentNameMap;
+  }
+
+  /**
+   * Mocks a {@link TableDataManager} whose {@code acquireSegments} returns all given segments, whatever the
+   * arguments.
+   */
+  private TableDataManager mockTableDataManager(List<ImmutableSegment> segmentList) {
+    List<SegmentDataManager> tableSegmentDataManagers =
+        segmentList.stream().map(ImmutableSegmentDataManager::new).collect(Collectors.toList());
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+    when(tableDataManager.acquireSegments(any(), any())).thenReturn(tableSegmentDataManagers);
+    return tableDataManager;
+  }
+
+  /**
+   * Generates a segment from {@code rows} under {@code indexDir} and loads it back as an immutable segment.
+   *
+   * @throws RuntimeException if segment generation or loading fails
+   */
+  private ImmutableSegment buildSegment(String tableNameWithType, File indexDir, String segmentName,
+      List<GenericRow> rows) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    // TODO: plugin table config constructor
+    // NOTE: the time column name is hard-coded to "ts" for every table.
+    TableConfig tableConfig =
+        new TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts").build();
+    Schema schema = _schemaMap.get(tableNameWithType);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(indexDir.getPath());
+    config.setTableName(tableNameWithType);
+    config.setSegmentName(segmentName);
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+      return ImmutableSegmentLoader.load(new File(indexDir, segmentName), ReadMode.mmap);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to construct immutable segment from records", e);
+    }
+  }
+
+  /**
+   * Records the data directory and schema for one typed table, deleting any existing directory of the same name.
+   */
+  private void registerTableNameWithType(Schema schema, String tableNameWithType) {
+    File tableDataDir = new File(FileUtils.getTempDirectory(),
+        String.format("%s_%s_%s", DATA_DIR_PREFIX, _serverName, tableNameWithType));
+    FileUtils.deleteQuietly(tableDataDir);
+    _serverTableDataDirMap.put(tableNameWithType, tableDataDir);
+    // Register under both keys so lookups by raw name and by name with type both resolve.
+    _schemaMap.put(TableNameBuilder.extractRawTableName(tableNameWithType), schema);
+    _schemaMap.put(tableNameWithType, schema);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org