You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/03/16 07:39:23 UTC
[incubator-pinot] branch master updated: Fix bug #6671:
RealtimeTableDataManager shuts down SegmentBuildTimeLeaseExtender for all
tables in the host (#6682)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4ec38f7 Fix bug #6671: RealtimeTableDataManager shuts down SegmentBuildTimeLeaseExtender for all tables in the host (#6682)
4ec38f7 is described below
commit 4ec38f79315d4017e5e2ac45e8989fa7fc4584fa
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Tue Mar 16 00:38:58 2021 -0700
Fix bug #6671: RealtimeTableDataManager shuts down SegmentBuildTimeLeaseExtender for all tables in the host (#6682)
Co-authored-by: Jiapeng Tao <ji...@jiatao-mn1.linkedin.biz>
---
.../realtime/LLRealtimeSegmentDataManager.java | 2 +-
.../manager/realtime/RealtimeTableDataManager.java | 2 +-
.../realtime/SegmentBuildTimeLeaseExtender.java | 59 +++++++++++++++-------
.../realtime/LLRealtimeSegmentDataManagerTest.java | 37 ++++++++++++--
.../starter/helix/HelixInstanceDataManager.java | 8 ++-
5 files changed, 83 insertions(+), 25 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 82a86d2..89a7210 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1115,7 +1115,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_serverMetrics = serverMetrics;
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getServerInstance();
- _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
+ _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
_protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 73d30e1..7417c3a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -122,7 +122,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
@Override
protected void doInit() {
- _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType);
+ _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType);
File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
try {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index 69d7e80..725dc95 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -18,8 +18,8 @@
*/
package org.apache.pinot.core.data.manager.realtime;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@@ -45,42 +45,63 @@ public class SegmentBuildTimeLeaseExtender {
private static final int EXTRA_TIME_SECONDS = 120;
// Retransmit lease request 10% before lease expires.
private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 / 10);
- private static Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
- private static final Map<String, SegmentBuildTimeLeaseExtender> INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
+ private static final Map<String, SegmentBuildTimeLeaseExtender> TABLE_TO_LEASE_EXTENDER = new ConcurrentHashMap<>();
+ private static ScheduledExecutorService _executor;
- private ScheduledExecutorService _executor;
private final Map<String, Future> _segmentToFutureMap = new ConcurrentHashMap<>();
private final String _instanceId;
+ private final String _tableNameWithType;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
- public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String instanceId) {
- return INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
+ public static void initExecutor() {
+ _executor = new ScheduledThreadPoolExecutor(1);
}
- public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId,
- ServerMetrics serverMetrics, String tableNameWithType) {
- SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
- if (leaseExtender != null) {
- LOGGER.warn("Instance already exists");
- } else {
- leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType);
- INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender);
+ public static void shutdownExecutor() {
+ if (_executor != null) {
+ _executor.shutdownNow();
+ _executor = null;
}
- return leaseExtender;
+ }
+
+ @VisibleForTesting
+ public static boolean isExecutorShutdown() {
+ return _executor == null;
+ }
+
+ public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String tableNameWithType) {
+ return TABLE_TO_LEASE_EXTENDER.get(tableNameWithType);
+ }
+
+ public static SegmentBuildTimeLeaseExtender getOrCreate(final String instanceId,
+ ServerMetrics serverMetrics, String tableNameWithType) {
+ return TABLE_TO_LEASE_EXTENDER.compute(tableNameWithType, (k, v) -> {
+ if (v == null) {
+ return new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType);
+ } else {
+ LOGGER.warn("Lease extender for Table: {} already exists", tableNameWithType);
+ return v;
+ }
+ });
}
private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics, String tableNameWithType) {
_instanceId = instanceId;
+ _tableNameWithType = tableNameWithType;
_protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, tableNameWithType);
- _executor = new ScheduledThreadPoolExecutor(1);
}
public void shutDown() {
- if (_executor != null) {
- _executor.shutdownNow();
- _executor = null;
+ for (Map.Entry<String, Future> entry : _segmentToFutureMap.entrySet()) {
+ Future future = entry.getValue();
+ boolean cancelled = future.cancel(true);
+ if (!cancelled) {
+ LOGGER.warn("Task could not be cancelled for {}" + entry.getKey());
+ }
}
_segmentToFutureMap.clear();
+ TABLE_TO_LEASE_EXTENDER.remove(_tableNameWithType);
}
/**
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 6da00df..39cf466 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -36,7 +38,11 @@ import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
+import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
+import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
@@ -58,6 +64,8 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -128,9 +136,9 @@ public class LLRealtimeSegmentDataManagerTest {
return JsonUtils.stringToObject(_tableConfigJson, TableConfig.class);
}
- private RealtimeTableDataManager createTableDataManager() {
+ private RealtimeTableDataManager createTableDataManager(TableConfig tableConfig) {
final String instanceId = "server-1";
- SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), _tableName);
+ SegmentBuildTimeLeaseExtender.getOrCreate(instanceId, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), tableConfig.getTableName());
RealtimeTableDataManager tableDataManager = mock(RealtimeTableDataManager.class);
when(tableDataManager.getServerInstance()).thenReturn(instanceId);
RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
@@ -154,7 +162,7 @@ public class LLRealtimeSegmentDataManagerTest {
LLCRealtimeSegmentZKMetadata segmentZKMetadata = createZkMetadata();
TableConfig tableConfig = createTableConfig();
InstanceZKMetadata instanceZKMetadata = new InstanceZKMetadata();
- RealtimeTableDataManager tableDataManager = createTableDataManager();
+ RealtimeTableDataManager tableDataManager = createTableDataManager(tableConfig);
String resourceDir = _segmentDir;
LLCSegmentName llcSegmentName = new LLCSegmentName(_segmentNameStr);
_partitionGroupIdToSemaphoreMap.putIfAbsent(_partitionGroupId, new Semaphore(1));
@@ -169,11 +177,13 @@ public class LLRealtimeSegmentDataManagerTest {
@BeforeClass
public void setUp() {
_segmentDirFile.deleteOnExit();
+ SegmentBuildTimeLeaseExtender.initExecutor();
}
@AfterClass
public void tearDown() {
FileUtils.deleteQuietly(_segmentDirFile);
+ SegmentBuildTimeLeaseExtender.shutdownExecutor();
}
@Test
@@ -765,6 +775,27 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertEquals(secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore().availablePermits(), 1);
}
+ @Test
+ public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor()
+ throws Exception {
+ TableConfig tableConfig = createTableConfig();
+ tableConfig.setUpsertConfig(null);
+ ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+ when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+
+ TableDataManagerConfig tableDataManagerConfig = mock(TableDataManagerConfig.class);
+ when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("REALTIME");
+ when(tableDataManagerConfig.getTableName()).thenReturn(tableConfig.getTableName());
+ when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+
+ TableDataManager tableDataManager = TableDataManagerProvider
+ .getTableDataManager(tableDataManagerConfig, "testInstance", propertyStore, mock(ServerMetrics.class),
+ mock(HelixManager.class));
+ tableDataManager.start();
+ tableDataManager.shutDown();
+ Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
+ }
+
public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
public Field _state;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index deb3a6a..30ba125 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -41,6 +41,7 @@ import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -91,9 +92,12 @@ public class HelixInstanceDataManager implements InstanceDataManager {
Preconditions.checkState(instanceSegmentTarDir.mkdirs());
}
+ // Initialize segment build time lease extender executor
+ SegmentBuildTimeLeaseExtender.initExecutor();
+ LOGGER.info("Initialized segment build time lease extender executor");
+
// Initialize the table data manager provider
TableDataManagerProvider.init(_instanceDataManagerConfig);
-
LOGGER.info("Initialized Helix instance data manager");
}
@@ -108,6 +112,8 @@ public class HelixInstanceDataManager implements InstanceDataManager {
for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
tableDataManager.shutDown();
}
+ SegmentBuildTimeLeaseExtender.shutdownExecutor();
+ LOGGER.info("Segment build time lease extender executor shut down");
LOGGER.info("Helix instance data manager shut down");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org