You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/04/25 19:45:47 UTC
[pinot] branch master updated: Segment lifecycle event listener support (#10536)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6d73e58fab Segment lifecycle event listener support (#10536)
6d73e58fab is described below
commit 6d73e58fabc92287a829e67ee52f66fb1221111f
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Wed Apr 26 01:15:38 2023 +0530
Segment lifecycle event listener support (#10536)
---
.../pinot/controller/BaseControllerStarter.java | 4 +
.../helix/core/SegmentDeletionManager.java | 6 ++
.../PinotSegmentLifecycleEventListener.java | 30 +++++++
.../PinotSegmentLifecycleEventListenerManager.java | 96 ++++++++++++++++++++++
.../lifecycle/SegmentLifecycleEventDetails.java | 29 +++++++
.../lifecycle/SegmentLifecycleEventListener.java | 30 +++++++
.../lifecycle/SegmentLifecycleEventType.java | 23 ++++++
.../impl/SegmentDeletionEventDetails.java | 48 +++++++++++
8 files changed, 266 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 381097a1b6..3467bc05e0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -98,6 +98,7 @@ import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.periodictask.PeriodicTask;
import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
+import org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
@@ -409,6 +410,9 @@ public abstract class BaseControllerStarter implements ServiceStartable {
LOGGER.info("Starting Pinot Helix resource manager and connecting to Zookeeper");
_helixResourceManager.start(_helixParticipantManager);
+ // Initialize segment lifecycle event listeners
+ PinotSegmentLifecycleEventListenerManager.getInstance().init(_helixParticipantManager);
+
LOGGER.info("Starting task resource manager");
_helixTaskResourceManager =
new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index ecc7d337e8..ebde702b09 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -45,6 +45,8 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager;
+import org.apache.pinot.core.segment.processing.lifecycle.impl.SegmentDeletionEventDetails;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.filesystem.PinotFS;
@@ -166,6 +168,10 @@ public class SegmentDeletionManager {
propStorePathList.add(segmentPropertyStorePath);
}
+ // Notify all active listeners here
+ PinotSegmentLifecycleEventListenerManager.getInstance()
+ .notifyListeners(new SegmentDeletionEventDetails(tableName, segmentsToDelete));
+
boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList, AccessOption.PERSISTENT);
List<String> propStoreFailedSegs = new ArrayList<>(segmentsToDelete.size());
for (int i = 0; i < deleteSuccessful.length; i++) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListener.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListener.java
new file mode 100644
index 0000000000..c7c510ae2d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListener.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+
+import org.apache.helix.HelixManager;
+
+
+/**
+ * Plugin contract for components that want to be told about segment lifecycle events.
+ *
+ * <p>{@link PinotSegmentLifecycleEventListenerManager} creates each enabled implementation reflectively, calls
+ * {@link #init(HelixManager)} on it, and then registers it under the type reported by {@link #getType()}.
+ */
+public interface PinotSegmentLifecycleEventListener {
+
+  /**
+   * Prepares the listener for use. The manager calls this right after creating the instance and before asking for
+   * its type; if an {@link Exception} is thrown the manager logs it and skips this listener.
+   *
+   * @param helixManager the Helix manager that was passed to the listener manager's own {@code init}
+   */
+  void init(HelixManager helixManager);
+
+  /**
+   * @return the event type this listener is registered under; only events reporting the same type reach
+   *         {@link #onEvent(SegmentLifecycleEventDetails)}
+   */
+  SegmentLifecycleEventType getType();
+
+  /**
+   * Handles a single event. The manager catches and logs any {@link Exception} thrown from here and carries on with
+   * the remaining listeners.
+   *
+   * @param event details of the event being delivered
+   */
+  void onEvent(SegmentLifecycleEventDetails event);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListenerManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListenerManager.java
new file mode 100644
index 0000000000..5269c3f797
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListenerManager.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Singleton registry that discovers {@link PinotSegmentLifecycleEventListener} plugins and forwards
+ * {@link SegmentLifecycleEventDetails} to the listeners registered for the event's type.
+ *
+ * <p>{@link #init(HelixManager)} has to complete before {@link #notifyListeners(SegmentLifecycleEventDetails)} does
+ * anything; events published earlier are dropped with a warning. {@code init} is synchronized, while
+ * {@code notifyListeners} is lock-free and relies on the volatile {@code _initialized} flag for visibility.
+ */
+public class PinotSegmentLifecycleEventListenerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotSegmentLifecycleEventListenerManager.class);
+  private static final PinotSegmentLifecycleEventListenerManager INSTANCE =
+      new PinotSegmentLifecycleEventListenerManager();
+
+  // Assigned exactly once in init(), fully populated, right before _initialized is set; never mutated afterwards.
+  private Map<SegmentLifecycleEventType, List<PinotSegmentLifecycleEventListener>> _eventTypeToListenersMap;
+
+  // volatile because notifyListeners() reads it without taking the lock held by init(). The volatile write at the
+  // end of init() also makes the map assigned just before it visible to any thread that observes 'true'.
+  private volatile boolean _initialized = false;
+
+  private PinotSegmentLifecycleEventListenerManager() {
+  }
+
+  /**
+   * @return the single shared manager instance
+   */
+  public static PinotSegmentLifecycleEventListenerManager getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Discovers, instantiates and initializes all enabled listeners. Calling it again after a successful run only logs
+   * a warning.
+   *
+   * <p>A listener whose construction, {@code init} or {@code getType} throws an {@link Exception} is logged and
+   * skipped; the remaining listeners are still registered.
+   *
+   * @param helixZkManager Helix manager handed to every listener's {@code init}
+   */
+  public synchronized void init(HelixManager helixZkManager) {
+    if (_initialized) {
+      LOGGER.warn("Segment lifecycle event listener manager already initialized, skipping it");
+      return;
+    }
+
+    // Build into a local map so the field never exposes a partially populated registry.
+    Map<SegmentLifecycleEventType, List<PinotSegmentLifecycleEventListener>> listenersByType = new HashMap<>();
+    Set<Class<?>> classes =
+        PinotReflectionUtils.getClassesThroughReflection(".*\\.plugin\\.segment\\.lifecycle\\.listener\\..*",
+            SegmentLifecycleEventListener.class);
+    for (Class<?> clazz : classes) {
+      SegmentLifecycleEventListener annotation = clazz.getAnnotation(SegmentLifecycleEventListener.class);
+      // Class#getAnnotation returns null when the annotation is not present on this class. Guard against it so a
+      // single unexpected scan result cannot abort initialization with a NullPointerException.
+      if (annotation == null || !annotation.enabled()) {
+        continue;
+      }
+      try {
+        // Class#newInstance is deprecated and rethrows checked constructor exceptions unwrapped; go through the
+        // no-arg constructor instead. Failures still end up in the catch block below.
+        PinotSegmentLifecycleEventListener listener =
+            (PinotSegmentLifecycleEventListener) clazz.getDeclaredConstructor().newInstance();
+        listener.init(helixZkManager);
+        listenersByType.computeIfAbsent(listener.getType(), type -> new ArrayList<>()).add(listener);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while initializing segment lifecycle event listener : {}, skipping it", clazz,
+            e);
+      }
+    }
+
+    // Order matters: publish the map first, then flip the volatile flag.
+    _eventTypeToListenersMap = listenersByType;
+    _initialized = true;
+  }
+
+  /**
+   * Delivers the event to every listener registered for {@code event.getType()}, in registration order. An
+   * {@link Exception} thrown by one listener is logged and does not prevent delivery to the others. If the manager
+   * has not been initialized yet the event is dropped with a warning.
+   *
+   * @param event the event to deliver
+   */
+  public void notifyListeners(SegmentLifecycleEventDetails event) {
+    if (!_initialized) {
+      LOGGER.warn("Segment lifecycle event listener manager not initialized, skipping it");
+      return;
+    }
+
+    List<PinotSegmentLifecycleEventListener> listeners = _eventTypeToListenersMap.get(event.getType());
+    if (listeners == null) {
+      // Nobody registered for this event type.
+      return;
+    }
+    for (PinotSegmentLifecycleEventListener listener : listeners) {
+      try {
+        listener.onEvent(event);
+      } catch (Exception e) {
+        LOGGER.error("Segment lifecycle listener call failed : ", e);
+      }
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventDetails.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventDetails.java
new file mode 100644
index 0000000000..c59435455c
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventDetails.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+
+import java.util.List;
+
+/**
+ * Payload passed to {@link PinotSegmentLifecycleEventListener#onEvent(SegmentLifecycleEventDetails)}.
+ */
+public interface SegmentLifecycleEventDetails {
+
+  /**
+   * @return the kind of event; {@link PinotSegmentLifecycleEventListenerManager} uses it to pick the listeners to
+   *         notify
+   */
+  SegmentLifecycleEventType getType();
+
+  /**
+   * @return the table the event refers to (presumably the type-suffixed table name, going by the method name;
+   *         confirm against the code that builds the event)
+   */
+  String getTableNameWithType();
+
+  /**
+   * @return the segments the event applies to
+   */
+  List<String> getSegments();
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventListener.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventListener.java
new file mode 100644
index 0000000000..0a4225e529
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventListener.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a class as a segment lifecycle event listener plugin.
+ *
+ * <p>{@link PinotSegmentLifecycleEventListenerManager} scans with the pattern
+ * {@code .*\.plugin\.segment\.lifecycle\.listener\..*} for classes carrying this annotation and instantiates those
+ * for which {@link #enabled()} is {@code true}. The annotation is retained at runtime so it can be read
+ * reflectively, and it may only be placed on types.
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SegmentLifecycleEventListener {
+
+  /**
+   * @return whether the annotated listener should be loaded; {@code true} unless stated otherwise
+   */
+  boolean enabled() default true;
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventType.java
new file mode 100644
index 0000000000..9559b9547f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+
+/**
+ * Kinds of segment lifecycle events a {@link PinotSegmentLifecycleEventListener} can register for.
+ */
+public enum SegmentLifecycleEventType {
+  /** Reported by {@code SegmentDeletionEventDetails}. */
+  DELETION
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/impl/SegmentDeletionEventDetails.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/impl/SegmentDeletionEventDetails.java
new file mode 100644
index 0000000000..eb839a26a6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/impl/SegmentDeletionEventDetails.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle.impl;
+
+import java.util.List;
+import org.apache.pinot.core.segment.processing.lifecycle.SegmentLifecycleEventDetails;
+import org.apache.pinot.core.segment.processing.lifecycle.SegmentLifecycleEventType;
+
+/**
+ * {@link SegmentLifecycleEventDetails} describing a {@link SegmentLifecycleEventType#DELETION} event.
+ *
+ * <p>The instance keeps the exact references given to the constructor; the segment list is not copied, so later
+ * changes to that list by the caller are visible through {@link #getSegments()}.
+ */
+public class SegmentDeletionEventDetails implements SegmentLifecycleEventDetails {
+  private final String _tableName;
+  private final List<String> _segmentsDeleted;
+
+  /**
+   * @param tableName table the segments belong to; returned unchanged by {@link #getTableNameWithType()}
+   * @param segmentsDeleted segments covered by this deletion event; stored as-is
+   */
+  public SegmentDeletionEventDetails(String tableName, List<String> segmentsDeleted) {
+    _segmentsDeleted = segmentsDeleted;
+    _tableName = tableName;
+  }
+
+  /**
+   * @return always {@link SegmentLifecycleEventType#DELETION}
+   */
+  @Override
+  public SegmentLifecycleEventType getType() {
+    return SegmentLifecycleEventType.DELETION;
+  }
+
+  /**
+   * @return the table name supplied at construction time
+   */
+  @Override
+  public String getTableNameWithType() {
+    return _tableName;
+  }
+
+  /**
+   * @return the same list instance that was supplied at construction time
+   */
+  @Override
+  public List<String> getSegments() {
+    return _segmentsDeleted;
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org