You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/01 19:28:12 UTC
[pinot] branch master updated: fix pinot-controller/broker recent
fork timeout (#7490)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9b42278 fix pinot-controller/broker recent fork timeout (#7490)
9b42278 is described below
commit 9b42278690a4ef295fa5f86b73cb4f25ab047db3
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Fri Oct 1 12:27:01 2021 -0700
fix pinot-controller/broker recent fork timeout (#7490)
---
.../routing/segmentpruner/SegmentPrunerTest.java | 11 ++---
.../timeboundary/TimeBoundaryManagerTest.java | 11 ++---
.../api/ConsumingSegmentInfoReaderTest.java | 28 ++++-------
.../controller/api/PinotSegmentsMetadataTest.java | 18 ++-----
.../pinot/controller/api/TableSizeReaderTest.java | 26 ++--------
.../pinot/controller/utils/FakeHttpServer.java | 55 ++++++++++++++++++++++
6 files changed, 82 insertions(+), 67 deletions(-)
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index 4e8d11a..2612bcb 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -37,7 +37,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.parsers.QueryCompiler;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
@@ -66,7 +66,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class SegmentPrunerTest {
+public class SegmentPrunerTest extends ControllerTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
@@ -95,15 +95,14 @@ public class SegmentPrunerTest {
private static final String SDF_QUERY_5 =
"SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530";
- private ZkStarter.ZookeeperInstance _zkInstance;
private ZkClient _zkClient;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
@BeforeClass
public void setUp() {
- _zkInstance = ZkStarter.startLocalZkServer();
+ startZk();
_zkClient =
- new ZkClient(_zkInstance.getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+ new ZkClient(getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
new ZNRecordSerializer());
_propertyStore =
new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient), "/SegmentPrunerTest/PROPERTYSTORE", null);
@@ -112,7 +111,7 @@ public class SegmentPrunerTest {
@AfterClass
public void tearDown() {
_zkClient.close();
- ZkStarter.stopLocalZkServer(_zkInstance);
+ stopZk();
}
@Test
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index 71fcfbd..390bfda 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -32,7 +32,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -53,18 +53,17 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
-public class TimeBoundaryManagerTest {
+public class TimeBoundaryManagerTest extends ControllerTest {
private static final String TIME_COLUMN = "time";
- private ZkStarter.ZookeeperInstance _zkInstance;
private ZkClient _zkClient;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
@BeforeClass
public void setUp() {
- _zkInstance = ZkStarter.startLocalZkServer();
+ startZk();
_zkClient =
- new ZkClient(_zkInstance.getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+ new ZkClient(getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
new ZNRecordSerializer());
_propertyStore =
new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient), "/TimeBoundaryManagerTest/PROPERTYSTORE", null);
@@ -73,7 +72,7 @@ public class TimeBoundaryManagerTest {
@AfterClass
public void tearDown() {
_zkClient.close();
- ZkStarter.stopLocalZkServer(_zkInstance);
+ stopZk();
}
@Test
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
index a019f6b..b83af3d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
@@ -23,10 +23,8 @@ import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.sun.net.httpserver.HttpHandler;
-import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
-import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -43,6 +41,7 @@ import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.controller.utils.FakeHttpServer;
import org.apache.pinot.spi.config.table.TableStatus;
import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -68,12 +67,14 @@ public class ConsumingSegmentInfoReaderTest {
private static final String TABLE_NAME = "myTable_REALTIME";
private static final String SEGMENT_NAME_PARTITION_0 = "table__0__29__12345";
private static final String SEGMENT_NAME_PARTITION_1 = "table__1__32__12345";
+ private static final int TIMEOUT_MSEC = 10000;
+ private static final int EXTENDED_TIMEOUT_FACTOR = 100;
private final Executor _executor = Executors.newFixedThreadPool(1);
private final HttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager();
private PinotHelixResourceManager _helix;
private final Map<String, FakeConsumingInfoServer> _serverMap = new HashMap<>();
- private final int _timeoutMsec = 10000;
+
@BeforeClass
public void setUp()
@@ -116,14 +117,14 @@ public class ConsumingSegmentInfoReaderTest {
FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists
.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
- s4.start(uriPath, createHandler(200, s4._consumerInfos, _timeoutMsec * 1000));
+ s4.start(uriPath, createHandler(200, s4._consumerInfos, TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
_serverMap.put("server4", s4);
}
@AfterClass
public void tearDown() {
for (Map.Entry<String, FakeConsumingInfoServer> fakeServerEntry : _serverMap.entrySet()) {
- fakeServerEntry.getValue()._httpServer.stop(0);
+ fakeServerEntry.getValue().stop();
}
}
@@ -148,23 +149,12 @@ public class ConsumingSegmentInfoReaderTest {
/**
* Server to return fake consuming segment info
*/
- private static class FakeConsumingInfoServer {
- String _endpoint;
- InetSocketAddress _socket = new InetSocketAddress(0);
+ private static class FakeConsumingInfoServer extends FakeHttpServer {
List<SegmentConsumerInfo> _consumerInfos;
- HttpServer _httpServer;
FakeConsumingInfoServer(List<SegmentConsumerInfo> consumerInfos) {
_consumerInfos = consumerInfos;
}
-
- private void start(String path, HttpHandler handler)
- throws IOException {
- _httpServer = HttpServer.create(_socket, 0);
- _httpServer.createContext(path, handler);
- new Thread(() -> _httpServer.start()).start();
- _endpoint = "http://localhost:" + _httpServer.getAddress().getPort();
- }
}
private Map<String, List<String>> subsetOfServerSegments(String... servers) {
@@ -199,7 +189,7 @@ public class ConsumingSegmentInfoReaderTest {
throws InvalidConfigException {
mockSetup(servers, consumingSegments);
ConsumingSegmentInfoReader reader = new ConsumingSegmentInfoReader(_executor, _connectionManager, _helix);
- return reader.getConsumingSegmentsInfo(table, _timeoutMsec);
+ return reader.getConsumingSegmentsInfo(table, TIMEOUT_MSEC);
}
private TableStatus.IngestionStatus testRunnerIngestionStatus(final String[] servers,
@@ -207,7 +197,7 @@ public class ConsumingSegmentInfoReaderTest {
throws InvalidConfigException {
mockSetup(servers, consumingSegments);
ConsumingSegmentInfoReader reader = new ConsumingSegmentInfoReader(_executor, _connectionManager, _helix);
- return reader.getIngestionStatus(table, _timeoutMsec);
+ return reader.getIngestionStatus(table, TIMEOUT_MSEC);
}
private void checkIngestionStatus(final String[] servers, final Set<String> consumingSegments,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java
index ff44fc7..46997a5 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java
@@ -23,10 +23,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.sun.net.httpserver.HttpHandler;
-import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
-import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -38,6 +36,7 @@ import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
+import org.apache.pinot.controller.utils.FakeHttpServer;
import org.apache.pinot.spi.utils.JsonUtils;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -113,7 +112,7 @@ public class PinotSegmentsMetadataTest {
@AfterClass
public void tearDown() {
for (Map.Entry<String, SegmentsServerMock> fakeServerEntry : _serverMap.entrySet()) {
- fakeServerEntry.getValue()._httpServer.stop(0);
+ fakeServerEntry.getValue().stop();
}
}
@@ -179,12 +178,9 @@ public class PinotSegmentsMetadataTest {
Assert.assertEquals(expectedNonResponsiveServers, totalResponses - metadata.size());
}
- public static class SegmentsServerMock {
+ public static class SegmentsServerMock extends FakeHttpServer {
String _segment;
- String _endpoint;
- InetSocketAddress _socket = new InetSocketAddress(0);
String _segmentMetadata;
- HttpServer _httpServer;
public SegmentsServerMock(String segment) {
_segment = segment;
@@ -197,14 +193,6 @@ public class PinotSegmentsMetadataTest {
objectNode.put("segmentName", _segment);
_segmentMetadata = JsonUtils.objectToString(objectNode);
}
-
- private void start(String path, HttpHandler handler)
- throws IOException {
- _httpServer = HttpServer.create(_socket, 0);
- _httpServer.createContext(path, handler);
- new Thread(() -> _httpServer.start()).start();
- _endpoint = "http://localhost:" + _httpServer.getAddress().getPort();
- }
}
public static class MetadataConstants {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
index 31dca74..f83fbe6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
@@ -22,10 +22,8 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
-import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -43,6 +41,7 @@ import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
import org.apache.pinot.common.restlet.resources.TableSizeInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.controller.utils.FakeHttpServer;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.mockito.ArgumentMatchers;
@@ -64,6 +63,7 @@ public class TableSizeReaderTest {
private static final Logger LOGGER = LoggerFactory.getLogger(TableSizeReaderTest.class);
private static final String URI_PATH = "/table/";
private static final int TIMEOUT_MSEC = 10000;
+ private static final int EXTENDED_TIMEOUT_FACTOR = 100;
private final Executor _executor = Executors.newFixedThreadPool(1);
private final HttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager();
@@ -127,7 +127,7 @@ public class TableSizeReaderTest {
// server5 ... timing out server
s = new FakeSizeServer(Arrays.asList("s1", "s3"));
- s.start(URI_PATH, createHandler(200, s._sizes, TIMEOUT_MSEC * 100));
+ s.start(URI_PATH, createHandler(200, s._sizes, TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
_serverMap.put(serverName(counter), s);
counter++;
}
@@ -135,7 +135,7 @@ public class TableSizeReaderTest {
@AfterClass
public void tearDown() {
for (Map.Entry<String, FakeSizeServer> fakeServerEntry : _serverMap.entrySet()) {
- fakeServerEntry.getValue()._httpServer.stop(0);
+ fakeServerEntry.getValue().stop();
}
}
@@ -170,12 +170,9 @@ public class TableSizeReaderTest {
return "server" + index;
}
- private static class FakeSizeServer {
+ private static class FakeSizeServer extends FakeHttpServer {
List<String> _segments;
- String _endpoint;
- InetSocketAddress _socket = new InetSocketAddress(0);
List<SegmentSizeInfo> _sizes = new ArrayList<>();
- HttpServer _httpServer;
FakeSizeServer(List<String> segments) {
_segments = segments;
@@ -193,19 +190,6 @@ public class TableSizeReaderTest {
int index = Integer.parseInt(segment.substring(1));
return 100 + index * 10;
}
-
- private void start(String path, HttpHandler handler)
- throws IOException {
- _httpServer = HttpServer.create(_socket, 0);
- _httpServer.createContext(path, handler);
- new Thread(new Runnable() {
- @Override
- public void run() {
- _httpServer.start();
- }
- }).start();
- _endpoint = "http://localhost:" + _httpServer.getAddress().getPort();
- }
}
private Map<String, List<String>> subsetOfServerSegments(String... servers) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/FakeHttpServer.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/FakeHttpServer.java
new file mode 100644
index 0000000..76054c3
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/FakeHttpServer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.utils;
+
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+
+/**
+ * Fake HTTP Server that encapsulates one fake handler.
+ */
+public class FakeHttpServer {
+ public String _endpoint;
+ public InetSocketAddress _socket = new InetSocketAddress(0);
+ public ExecutorService _executorService;
+ public HttpServer _httpServer;
+
+ public FakeHttpServer() {
+ }
+
+ public void start(String path, HttpHandler handler)
+ throws IOException {
+ _executorService = Executors.newCachedThreadPool();
+ _httpServer = HttpServer.create(_socket, 0);
+ _httpServer.setExecutor(_executorService);
+ _httpServer.createContext(path, handler);
+ new Thread(() -> _httpServer.start()).start();
+ _endpoint = "http://localhost:" + _httpServer.getAddress().getPort();
+ }
+
+ public void stop() {
+ _executorService.shutdown();
+ _httpServer.stop(0);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org