You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/04/25 19:45:47 UTC

[pinot] branch master updated: Segment lifecycle event listener support (#10536)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d73e58fab Segment lifecycle event listener support (#10536)
6d73e58fab is described below

commit 6d73e58fabc92287a829e67ee52f66fb1221111f
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Wed Apr 26 01:15:38 2023 +0530

    Segment lifecycle event listener support (#10536)
---
 .../pinot/controller/BaseControllerStarter.java    |  4 +
 .../helix/core/SegmentDeletionManager.java         |  6 ++
 .../PinotSegmentLifecycleEventListener.java        | 30 +++++++
 .../PinotSegmentLifecycleEventListenerManager.java | 96 ++++++++++++++++++++++
 .../lifecycle/SegmentLifecycleEventDetails.java    | 29 +++++++
 .../lifecycle/SegmentLifecycleEventListener.java   | 30 +++++++
 .../lifecycle/SegmentLifecycleEventType.java       | 23 ++++++
 .../impl/SegmentDeletionEventDetails.java          | 48 +++++++++++
 8 files changed, 266 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 381097a1b6..3467bc05e0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -98,6 +98,7 @@ import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
 import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
+import org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
@@ -409,6 +410,9 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     LOGGER.info("Starting Pinot Helix resource manager and connecting to Zookeeper");
     _helixResourceManager.start(_helixParticipantManager);
 
+    // Initialize segment lifecycle event listeners
+    PinotSegmentLifecycleEventListenerManager.getInstance().init(_helixParticipantManager);
+
     LOGGER.info("Starting task resource manager");
     _helixTaskResourceManager =
         new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index ecc7d337e8..ebde702b09 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -45,6 +45,8 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager;
+import org.apache.pinot.core.segment.processing.lifecycle.impl.SegmentDeletionEventDetails;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -166,6 +168,10 @@ public class SegmentDeletionManager {
         propStorePathList.add(segmentPropertyStorePath);
       }
 
+      // Notify all active listeners here
+      PinotSegmentLifecycleEventListenerManager.getInstance()
+          .notifyListeners(new SegmentDeletionEventDetails(tableName, segmentsToDelete));
+
       boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList, AccessOption.PERSISTENT);
       List<String> propStoreFailedSegs = new ArrayList<>(segmentsToDelete.size());
       for (int i = 0; i < deleteSuccessful.length; i++) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListener.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListener.java
new file mode 100644
index 0000000000..c7c510ae2d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListener.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+
+import org.apache.helix.HelixManager;
+
+/** Listener that {@link PinotSegmentLifecycleEventListenerManager} instantiates via reflection and notifies. */
+public interface PinotSegmentLifecycleEventListener {
+  SegmentLifecycleEventType getType(); // event type this listener is registered under; only such events reach it
+  /** Called by the manager right after it instantiates the listener and before the listener is registered. */
+  void init(HelixManager helixManager);
+  /** Called for each event whose type equals {@link #getType()}; the manager catches and logs any Exception. */
+  void onEvent(SegmentLifecycleEventDetails event);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListenerManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListenerManager.java
new file mode 100644
index 0000000000..5269c3f797
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListenerManager.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Singleton that loads annotated segment lifecycle listeners via reflection and dispatches events to them. */
+public class PinotSegmentLifecycleEventListenerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotSegmentLifecycleEventListenerManager.class);
+  private static final PinotSegmentLifecycleEventListenerManager INSTANCE =
+      new PinotSegmentLifecycleEventListenerManager();
+  // Published by init() just before _initialized is set; never modified afterwards.
+  private Map<SegmentLifecycleEventType, List<PinotSegmentLifecycleEventListener>> _eventTypeToListenersMap;
+  private volatile boolean _initialized = false; // volatile: notifyListeners() reads it without holding the lock
+
+  private PinotSegmentLifecycleEventListenerManager() {
+  }
+
+  public static PinotSegmentLifecycleEventListenerManager getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Instantiates, initializes and registers every enabled annotated listener; failures are logged and skipped.
+   */
+  public synchronized void init(HelixManager helixZkManager) {
+    if (_initialized) {
+      LOGGER.warn("Segment lifecycle event listener manager already initialized, skipping it");
+      return;
+    }
+    Map<SegmentLifecycleEventType, List<PinotSegmentLifecycleEventListener>> listenersByType = new HashMap<>();
+    Set<Class<?>> classes =
+        PinotReflectionUtils.getClassesThroughReflection(".*\\.plugin\\.segment\\.lifecycle\\.listener\\..*",
+            SegmentLifecycleEventListener.class);
+    for (Class<?> clazz : classes) {
+      SegmentLifecycleEventListener annotation = clazz.getAnnotation(SegmentLifecycleEventListener.class);
+      if (annotation.enabled()) {
+        try {
+          PinotSegmentLifecycleEventListener listener =
+              (PinotSegmentLifecycleEventListener) clazz.getDeclaredConstructor().newInstance();
+          listener.init(helixZkManager);
+          listenersByType.computeIfAbsent(listener.getType(), type -> new ArrayList<>()).add(listener);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while initializing segment lifecyle event listener : {}, skipping it", clazz,
+              e);
+        }
+      }
+    }
+
+    _eventTypeToListenersMap = listenersByType;
+    _initialized = true;
+  }
+
+  /** Passes the event to each listener registered for its type; one failing listener does not block the others. */
+  public void notifyListeners(SegmentLifecycleEventDetails event) {
+    if (!_initialized) {
+      LOGGER.warn("Segment lifecycle event listener manager not initialized, skipping it");
+      return;
+    }
+
+    List<PinotSegmentLifecycleEventListener> listeners = _eventTypeToListenersMap.get(event.getType());
+    if (listeners != null) {
+      for (PinotSegmentLifecycleEventListener listener : listeners) {
+        try {
+          listener.onEvent(event);
+        } catch (Exception e) {
+          LOGGER.error("Segment lifecycle listener call failed : ", e);
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventDetails.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventDetails.java
new file mode 100644
index 0000000000..c59435455c
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventDetails.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+
+import java.util.List;
+/** Payload handed to {@link PinotSegmentLifecycleEventListener#onEvent} describing one segment lifecycle event. */
+public interface SegmentLifecycleEventDetails {
+  SegmentLifecycleEventType getType(); // selects which registered listeners receive this event
+  /** Segments the event applies to (for DELETION, the list given to SegmentDeletionEventDetails). */
+  List<String> getSegments();
+  /** Table the segments belong to; NOTE(review): name implies a type-suffixed table name, confirm with callers. */
+  String getTableNameWithType();
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventListener.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventListener.java
new file mode 100644
index 0000000000..0a4225e529
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventListener.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+/** Marks a class for reflective discovery by {@link PinotSegmentLifecycleEventListenerManager#init}. */
+@Retention(RetentionPolicy.RUNTIME) // the manager reads this annotation at runtime via Class#getAnnotation
+@Target(ElementType.TYPE)
+public @interface SegmentLifecycleEventListener {
+  boolean enabled() default true; // PinotSegmentLifecycleEventListenerManager skips classes where this is false
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventType.java
new file mode 100644
index 0000000000..9559b9547f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle;
+/** Kinds of segment lifecycle events; used as the key under which listeners are registered and looked up. */
+public enum SegmentLifecycleEventType {
+  DELETION // carried by SegmentDeletionEventDetails, which SegmentDeletionManager sends when deleting segments
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/impl/SegmentDeletionEventDetails.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/impl/SegmentDeletionEventDetails.java
new file mode 100644
index 0000000000..eb839a26a6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/impl/SegmentDeletionEventDetails.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.lifecycle.impl;
+
+import java.util.List;
+import org.apache.pinot.core.segment.processing.lifecycle.SegmentLifecycleEventDetails;
+import org.apache.pinot.core.segment.processing.lifecycle.SegmentLifecycleEventType;
+/** {@link SegmentLifecycleEventDetails} for a DELETION event: a table name plus the list of segments deleted. */
+public class SegmentDeletionEventDetails implements SegmentLifecycleEventDetails {
+  private final String _tableNameWithType; // returned as-is by getTableNameWithType()
+  private final List<String> _segments; // caller's list, stored by reference (no defensive copy)
+
+  public SegmentDeletionEventDetails(String tableName, List<String> segmentsDeleted) {
+    _segments = segmentsDeleted;
+    _tableNameWithType = tableName;
+  }
+
+  @Override
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
+  @Override
+  public List<String> getSegments() {
+    return _segments;
+  }
+
+  @Override
+  public SegmentLifecycleEventType getType() {
+    return SegmentLifecycleEventType.DELETION;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org