You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/01 19:28:12 UTC

[pinot] branch master updated: fix pinot-controller/broker recent fork timeout (#7490)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b42278  fix pinot-controller/broker recent fork timeout (#7490)
9b42278 is described below

commit 9b42278690a4ef295fa5f86b73cb4f25ab047db3
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Fri Oct 1 12:27:01 2021 -0700

    fix pinot-controller/broker recent fork timeout (#7490)
---
 .../routing/segmentpruner/SegmentPrunerTest.java   | 11 ++---
 .../timeboundary/TimeBoundaryManagerTest.java      | 11 ++---
 .../api/ConsumingSegmentInfoReaderTest.java        | 28 ++++-------
 .../controller/api/PinotSegmentsMetadataTest.java  | 18 ++-----
 .../pinot/controller/api/TableSizeReaderTest.java  | 26 ++--------
 .../pinot/controller/utils/FakeHttpServer.java     | 55 ++++++++++++++++++++++
 6 files changed, 82 insertions(+), 67 deletions(-)

diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index 4e8d11a..2612bcb 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -37,7 +37,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.parsers.QueryCompiler;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
@@ -66,7 +66,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 
-public class SegmentPrunerTest {
+public class SegmentPrunerTest extends ControllerTest {
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
   private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
@@ -95,15 +95,14 @@ public class SegmentPrunerTest {
   private static final String SDF_QUERY_5 =
       "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530";
 
-  private ZkStarter.ZookeeperInstance _zkInstance;
   private ZkClient _zkClient;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
 
   @BeforeClass
   public void setUp() {
-    _zkInstance = ZkStarter.startLocalZkServer();
+    startZk();
     _zkClient =
-        new ZkClient(_zkInstance.getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+        new ZkClient(getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
             new ZNRecordSerializer());
     _propertyStore =
         new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient), "/SegmentPrunerTest/PROPERTYSTORE", null);
@@ -112,7 +111,7 @@ public class SegmentPrunerTest {
   @AfterClass
   public void tearDown() {
     _zkClient.close();
-    ZkStarter.stopLocalZkServer(_zkInstance);
+    stopZk();
   }
 
   @Test
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index 71fcfbd..390bfda 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -32,7 +32,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -53,18 +53,17 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 
 
-public class TimeBoundaryManagerTest {
+public class TimeBoundaryManagerTest extends ControllerTest {
   private static final String TIME_COLUMN = "time";
 
-  private ZkStarter.ZookeeperInstance _zkInstance;
   private ZkClient _zkClient;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
 
   @BeforeClass
   public void setUp() {
-    _zkInstance = ZkStarter.startLocalZkServer();
+    startZk();
     _zkClient =
-        new ZkClient(_zkInstance.getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+        new ZkClient(getZkUrl(), ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
             new ZNRecordSerializer());
     _propertyStore =
         new ZkHelixPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient), "/TimeBoundaryManagerTest/PROPERTYSTORE", null);
@@ -73,7 +72,7 @@ public class TimeBoundaryManagerTest {
   @AfterClass
   public void tearDown() {
     _zkClient.close();
-    ZkStarter.stopLocalZkServer(_zkInstance);
+    stopZk();
   }
 
   @Test
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
index a019f6b..b83af3d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderTest.java
@@ -23,10 +23,8 @@ import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.sun.net.httpserver.HttpHandler;
-import com.sun.net.httpserver.HttpServer;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,6 +41,7 @@ import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.controller.utils.FakeHttpServer;
 import org.apache.pinot.spi.config.table.TableStatus;
 import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -68,12 +67,14 @@ public class ConsumingSegmentInfoReaderTest {
   private static final String TABLE_NAME = "myTable_REALTIME";
   private static final String SEGMENT_NAME_PARTITION_0 = "table__0__29__12345";
   private static final String SEGMENT_NAME_PARTITION_1 = "table__1__32__12345";
+  private static final int TIMEOUT_MSEC = 10000;
+  private static final int EXTENDED_TIMEOUT_FACTOR = 100;
 
   private final Executor _executor = Executors.newFixedThreadPool(1);
   private final HttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager();
   private PinotHelixResourceManager _helix;
   private final Map<String, FakeConsumingInfoServer> _serverMap = new HashMap<>();
-  private final int _timeoutMsec = 10000;
+
 
   @BeforeClass
   public void setUp()
@@ -116,14 +117,14 @@ public class ConsumingSegmentInfoReaderTest {
     FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists
         .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
             new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
-    s4.start(uriPath, createHandler(200, s4._consumerInfos, _timeoutMsec * 1000));
+    s4.start(uriPath, createHandler(200, s4._consumerInfos, TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
     _serverMap.put("server4", s4);
   }
 
   @AfterClass
   public void tearDown() {
     for (Map.Entry<String, FakeConsumingInfoServer> fakeServerEntry : _serverMap.entrySet()) {
-      fakeServerEntry.getValue()._httpServer.stop(0);
+      fakeServerEntry.getValue().stop();
     }
   }
 
@@ -148,23 +149,12 @@ public class ConsumingSegmentInfoReaderTest {
   /**
    * Server to return fake consuming segment info
    */
-  private static class FakeConsumingInfoServer {
-    String _endpoint;
-    InetSocketAddress _socket = new InetSocketAddress(0);
+  private static class FakeConsumingInfoServer extends FakeHttpServer {
     List<SegmentConsumerInfo> _consumerInfos;
-    HttpServer _httpServer;
 
     FakeConsumingInfoServer(List<SegmentConsumerInfo> consumerInfos) {
       _consumerInfos = consumerInfos;
     }
-
-    private void start(String path, HttpHandler handler)
-        throws IOException {
-      _httpServer = HttpServer.create(_socket, 0);
-      _httpServer.createContext(path, handler);
-      new Thread(() -> _httpServer.start()).start();
-      _endpoint = "http://localhost:" + _httpServer.getAddress().getPort();
-    }
   }
 
   private Map<String, List<String>> subsetOfServerSegments(String... servers) {
@@ -199,7 +189,7 @@ public class ConsumingSegmentInfoReaderTest {
       throws InvalidConfigException {
     mockSetup(servers, consumingSegments);
     ConsumingSegmentInfoReader reader = new ConsumingSegmentInfoReader(_executor, _connectionManager, _helix);
-    return reader.getConsumingSegmentsInfo(table, _timeoutMsec);
+    return reader.getConsumingSegmentsInfo(table, TIMEOUT_MSEC);
   }
 
   private TableStatus.IngestionStatus testRunnerIngestionStatus(final String[] servers,
@@ -207,7 +197,7 @@ public class ConsumingSegmentInfoReaderTest {
       throws InvalidConfigException {
     mockSetup(servers, consumingSegments);
     ConsumingSegmentInfoReader reader = new ConsumingSegmentInfoReader(_executor, _connectionManager, _helix);
-    return reader.getIngestionStatus(table, _timeoutMsec);
+    return reader.getIngestionStatus(table, TIMEOUT_MSEC);
   }
 
   private void checkIngestionStatus(final String[] servers, final Set<String> consumingSegments,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java
index ff44fc7..46997a5 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentsMetadataTest.java
@@ -23,10 +23,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.sun.net.httpserver.HttpHandler;
-import com.sun.net.httpserver.HttpServer;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,6 +36,7 @@ import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
+import org.apache.pinot.controller.utils.FakeHttpServer;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -113,7 +112,7 @@ public class PinotSegmentsMetadataTest {
   @AfterClass
   public void tearDown() {
     for (Map.Entry<String, SegmentsServerMock> fakeServerEntry : _serverMap.entrySet()) {
-      fakeServerEntry.getValue()._httpServer.stop(0);
+      fakeServerEntry.getValue().stop();
     }
   }
 
@@ -179,12 +178,9 @@ public class PinotSegmentsMetadataTest {
     Assert.assertEquals(expectedNonResponsiveServers, totalResponses - metadata.size());
   }
 
-  public static class SegmentsServerMock {
+  public static class SegmentsServerMock extends FakeHttpServer {
     String _segment;
-    String _endpoint;
-    InetSocketAddress _socket = new InetSocketAddress(0);
     String _segmentMetadata;
-    HttpServer _httpServer;
 
     public SegmentsServerMock(String segment) {
       _segment = segment;
@@ -197,14 +193,6 @@ public class PinotSegmentsMetadataTest {
       objectNode.put("segmentName", _segment);
       _segmentMetadata = JsonUtils.objectToString(objectNode);
     }
-
-    private void start(String path, HttpHandler handler)
-        throws IOException {
-      _httpServer = HttpServer.create(_socket, 0);
-      _httpServer.createContext(path, handler);
-      new Thread(() -> _httpServer.start()).start();
-      _endpoint = "http://localhost:" + _httpServer.getAddress().getPort();
-    }
   }
 
   public static class MetadataConstants {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
index 31dca74..f83fbe6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
@@ -22,10 +22,8 @@ import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
-import com.sun.net.httpserver.HttpServer;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -43,6 +41,7 @@ import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
 import org.apache.pinot.common.restlet.resources.TableSizeInfo;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.controller.utils.FakeHttpServer;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.mockito.ArgumentMatchers;
@@ -64,6 +63,7 @@ public class TableSizeReaderTest {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSizeReaderTest.class);
   private static final String URI_PATH = "/table/";
   private static final int TIMEOUT_MSEC = 10000;
+  private static final int EXTENDED_TIMEOUT_FACTOR = 100;
 
   private final Executor _executor = Executors.newFixedThreadPool(1);
   private final HttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager();
@@ -127,7 +127,7 @@ public class TableSizeReaderTest {
 
     // server5 ... timing out server
     s = new FakeSizeServer(Arrays.asList("s1", "s3"));
-    s.start(URI_PATH, createHandler(200, s._sizes, TIMEOUT_MSEC * 100));
+    s.start(URI_PATH, createHandler(200, s._sizes, TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
     _serverMap.put(serverName(counter), s);
     counter++;
   }
@@ -135,7 +135,7 @@ public class TableSizeReaderTest {
   @AfterClass
   public void tearDown() {
     for (Map.Entry<String, FakeSizeServer> fakeServerEntry : _serverMap.entrySet()) {
-      fakeServerEntry.getValue()._httpServer.stop(0);
+      fakeServerEntry.getValue().stop();
     }
   }
 
@@ -170,12 +170,9 @@ public class TableSizeReaderTest {
     return "server" + index;
   }
 
-  private static class FakeSizeServer {
+  private static class FakeSizeServer extends FakeHttpServer {
     List<String> _segments;
-    String _endpoint;
-    InetSocketAddress _socket = new InetSocketAddress(0);
     List<SegmentSizeInfo> _sizes = new ArrayList<>();
-    HttpServer _httpServer;
 
     FakeSizeServer(List<String> segments) {
       _segments = segments;
@@ -193,19 +190,6 @@ public class TableSizeReaderTest {
       int index = Integer.parseInt(segment.substring(1));
       return 100 + index * 10;
     }
-
-    private void start(String path, HttpHandler handler)
-        throws IOException {
-      _httpServer = HttpServer.create(_socket, 0);
-      _httpServer.createContext(path, handler);
-      new Thread(new Runnable() {
-        @Override
-        public void run() {
-          _httpServer.start();
-        }
-      }).start();
-      _endpoint = "http://localhost:" + _httpServer.getAddress().getPort();
-    }
   }
 
   private Map<String, List<String>> subsetOfServerSegments(String... servers) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/FakeHttpServer.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/FakeHttpServer.java
new file mode 100644
index 0000000..76054c3
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/FakeHttpServer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.utils;
+
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+
+/**
+ * Fake HTTP server for tests: serves a single handler on an ephemeral port and exposes the base URL in _endpoint.
+ */
+public class FakeHttpServer {
+  public String _endpoint;
+  public InetSocketAddress _socket = new InetSocketAddress(0);
+  public ExecutorService _executorService;
+  public HttpServer _httpServer;
+
+  /** Serves {@code handler} at {@code path}; exchanges run on a cached pool so a slow one cannot stall the rest. */
+  public void start(String path, HttpHandler handler)
+      throws IOException {
+    _httpServer = HttpServer.create(_socket, 0);
+    _executorService = Executors.newCachedThreadPool();
+    _httpServer.setExecutor(_executorService);
+    _httpServer.createContext(path, handler);
+    _httpServer.start(); // non-blocking: HttpServer runs its dispatcher on its own background thread
+    _endpoint = "http://localhost:" + _httpServer.getAddress().getPort();
+  }
+
+  public void stop() {
+    if (_httpServer != null) { // no-op when start() was never called or failed before binding
+      _httpServer.stop(0); // close the listener first so nothing is dispatched to a dead pool
+      _executorService.shutdownNow(); // interrupt handlers still in flight; shutdown() would leave them running
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org