You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by dg...@apache.org on 2018/02/26 16:48:55 UTC

incubator-unomi git commit: UNOMI-153 add event when updating camel route (remove, add or update) to make sure routes are synchronized in the cluster

Repository: incubator-unomi
Updated Branches:
  refs/heads/master f84401a7a -> bd34ae9e0


UNOMI-153 add event when updating camel route (remove, add or update) to make sure routes are synchronized in the cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/bd34ae9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/bd34ae9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/bd34ae9e

Branch: refs/heads/master
Commit: bd34ae9e0d3e6101a9aeb5a0988b3c0c099bbf97
Parents: f84401a
Author: dgaillard <dg...@jahia.com>
Authored: Mon Feb 26 17:48:46 2018 +0100
Committer: dgaillard <dg...@jahia.com>
Committed: Mon Feb 26 17:48:46 2018 +0100

----------------------------------------------------------------------
 .../unomi/api/services/ClusterService.java      |  8 +++
 .../unomi/router/api/IRouterCamelContext.java   |  4 +-
 extensions/router/router-core/pom.xml           | 10 +++
 .../router/core/context/RouterCamelContext.java | 42 ++++++++---
 .../core/event/UpdateCamelRouteEvent.java       | 47 +++++++++++++
 .../event/UpdateCamelRouteEventHandler.java     | 74 ++++++++++++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml  | 17 +++++
 .../ExportConfigurationServiceImpl.java         |  4 +-
 .../ImportConfigurationServiceImpl.java         |  4 +-
 .../services/services/ClusterServiceImpl.java   |  9 +++
 10 files changed, 204 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
index b851b78..9a0fdfa 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
@@ -19,6 +19,7 @@ package org.apache.unomi.api.services;
 
 import org.apache.unomi.api.ClusterNode;
 
+import java.io.Serializable;
 import java.util.Date;
 import java.util.List;
 
@@ -49,4 +50,11 @@ public interface ClusterService {
      */
     void purge(final String scope);
 
+    /**
+     * Sends an event to the nodes of the cluster.
+     * The parameter is typed as {@link Serializable} so that this API does not depend on any clustering framework.
+     *
+     * @param event the event to send; the implementation casts it to {@link org.apache.karaf.cellar.core.event.Event}
+     */
+    void sendEvent(Serializable event);
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
index d2d3249..8775b43 100644
--- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
+++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
@@ -21,7 +21,7 @@ package org.apache.unomi.router.api;
  */
 public interface IRouterCamelContext {
 
-    void killExistingRoute(String routeId) throws Exception;
+    void killExistingRoute(String routeId, boolean fireEvent) throws Exception;
 
-    void updateProfileReaderRoute(Object configuration) throws Exception;
+    void updateProfileReaderRoute(Object configuration, boolean fireEvent) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/pom.xml b/extensions/router/router-core/pom.xml
index 42e10dc..d393e3b 100644
--- a/extensions/router/router-core/pom.xml
+++ b/extensions/router/router-core/pom.xml
@@ -137,6 +137,16 @@
             <version>0.10.1.0</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.cellar</groupId>
+            <artifactId>org.apache.karaf.cellar.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.cellar</groupId>
+            <artifactId>org.apache.karaf.cellar.config</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index 3b18803..03b2e04 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -21,6 +21,7 @@ import org.apache.camel.Route;
 import org.apache.camel.component.jackson.JacksonDataFormat;
 import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.unomi.api.services.ClusterService;
 import org.apache.unomi.api.services.ConfigSharingService;
 import org.apache.unomi.api.services.ProfileService;
 import org.apache.unomi.persistence.spi.PersistenceService;
@@ -30,6 +31,7 @@ import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.RouterConstants;
 import org.apache.unomi.router.api.services.ImportExportConfigurationService;
 import org.apache.unomi.router.api.services.ProfileExportService;
+import org.apache.unomi.router.core.event.UpdateCamelRouteEvent;
 import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
 import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
@@ -70,6 +72,7 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
     private String allowedEndpoints;
     private BundleContext bundleContext;
     private ConfigSharingService configSharingService;
+    private ClusterService clusterService;
 
     public void setExecHistorySize(String execHistorySize) {
         this.execHistorySize = execHistorySize;
@@ -87,6 +90,10 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
         this.configSharingService = configSharingService;
     }
 
+    public void setClusterService(ClusterService clusterService) { // setter for the service used to send route-change events to the cluster
+        this.clusterService = clusterService;
+    }
+
     public void initCamelContext() throws Exception {
         logger.info("Initialize Camel Context...");
 
@@ -160,10 +167,9 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
         exportConfigurationService.setRouterCamelContext(this);
 
         logger.info("Camel Context {} initialized successfully.");
-
     }
 
-    public void killExistingRoute(String routeId) throws Exception {
+    public void killExistingRoute(String routeId, boolean fireEvent) throws Exception {
         //Active routes
         Route route = camelContext.getRoute(routeId);
         if (route != null) {
@@ -172,18 +178,24 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
                 camelContext.removeRouteDefinition(routeDefinition);
             }
         }
+
+        if (fireEvent) {
+            UpdateCamelRouteEvent event = new UpdateCamelRouteEvent("org.apache.unomi.router.event.remove");
+            event.setRouteId(routeId);
+            clusterService.sendEvent(event);
+        }
     }
 
-    public void updateProfileReaderRoute(Object configuration) throws Exception {
+    public void updateProfileReaderRoute(Object configuration, boolean fireEvent) throws Exception {
         if (configuration instanceof ImportConfiguration) {
-            updateProfileImportReaderRoute((ImportConfiguration) configuration);
+            updateProfileImportReaderRoute((ImportConfiguration) configuration, fireEvent);
         } else {
-            updateProfileExportReaderRoute((ExportConfiguration) configuration);
+            updateProfileExportReaderRoute((ExportConfiguration) configuration, fireEvent);
         }
     }
 
-    private void updateProfileImportReaderRoute(ImportConfiguration importConfiguration) throws Exception {
-        killExistingRoute(importConfiguration.getItemId());
+    private void updateProfileImportReaderRoute(ImportConfiguration importConfiguration, boolean fireEvent) throws Exception {
+        killExistingRoute(importConfiguration.getItemId(), false);
         //Handle transforming an import config oneshot <--> recurrent
         if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())) {
             ProfileImportFromSourceRouteBuilder builder = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType);
@@ -194,11 +206,17 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
             builder.setJacksonDataFormat(jacksonDataFormat);
             builder.setContext(camelContext);
             camelContext.addRoutes(builder);
+
+            if (fireEvent) {
+                UpdateCamelRouteEvent event = new UpdateCamelRouteEvent("org.apache.unomi.router.event.import");
+                event.setConfiguration(importConfiguration);
+                clusterService.sendEvent(event);
+            }
         }
     }
 
-    private void updateProfileExportReaderRoute(ExportConfiguration exportConfiguration) throws Exception {
-        killExistingRoute(exportConfiguration.getItemId());
+    private void updateProfileExportReaderRoute(ExportConfiguration exportConfiguration, boolean fireEvent) throws Exception {
+        killExistingRoute(exportConfiguration.getItemId(), false);
         //Handle transforming an import config oneshot <--> recurrent
         if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType())) {
             ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType);
@@ -209,6 +227,12 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
             profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
             profileExportCollectRouteBuilder.setContext(camelContext);
             camelContext.addRoutes(profileExportCollectRouteBuilder);
+
+            if (fireEvent) {
+                UpdateCamelRouteEvent event = new UpdateCamelRouteEvent("org.apache.unomi.router.event.export");
+                event.setConfiguration(exportConfiguration);
+                clusterService.sendEvent(event);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java
new file mode 100644
index 0000000..7e1dc81
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.event;
+
+import org.apache.karaf.cellar.core.event.Event;
+
+/** Cluster event describing a Camel route change: carries the id of a route to remove, or a configuration whose route must be (re)built.
+ * @author dgaillard
+ */
+public class UpdateCamelRouteEvent extends Event {
+    private String routeId; // id of the route to remove; set by the sender for "remove" events
+    private Object configuration; // import/export configuration; NOTE(review): typed Object, presumably must be serializable to travel with the event - confirm
+
+    public UpdateCamelRouteEvent(String id) { // id is the event id; the handler compares it to "...event.remove", "...event.import" or "...event.export"
+        super(id);
+    }
+
+    public String getRouteId() { // may be null when the event carries a configuration instead
+        return routeId;
+    }
+
+    public void setRouteId(String routeId) {
+        this.routeId = routeId;
+    }
+
+    public Object getConfiguration() { // may be null when the event carries a route id instead
+        return configuration;
+    }
+
+    public void setConfiguration(Object configuration) {
+        this.configuration = configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
new file mode 100644
index 0000000..6760f4c
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.event;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.karaf.cellar.config.Constants;
+import org.apache.karaf.cellar.core.CellarSupport;
+import org.apache.karaf.cellar.core.control.Switch;
+import org.apache.karaf.cellar.core.event.EventHandler;
+import org.apache.karaf.cellar.core.event.EventType;
+import org.apache.unomi.router.core.context.RouterCamelContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Handles {@link UpdateCamelRouteEvent}s received from the cluster by applying the same route removal or update on the local {@link RouterCamelContext}.
+ * @author dgaillard
+ */
+public class UpdateCamelRouteEventHandler extends CellarSupport implements EventHandler<UpdateCamelRouteEvent> {
+    private static final Logger logger = LoggerFactory.getLogger(UpdateCamelRouteEventHandler.class.getName());
+
+    private RouterCamelContext routerCamelContext; // local Camel context wrapper the received changes are applied to
+
+    @Override
+    public void handle(UpdateCamelRouteEvent event) { // applies the event locally; failures are logged, never rethrown
+        logger.debug("Handle event");
+        if (isAllowed(event.getSourceGroup(), Constants.CATEGORY, event.getId(), EventType.INBOUND)) { // events that are not allowed are silently ignored
+            logger.debug("Event is allowed");
+            // skip events that originated on this node: the sender applies the change locally before sending the event
+            if (event.getSourceNode() != null && event.getSourceNode().getId().equalsIgnoreCase(clusterManager.getNode().getId())) {
+                logger.debug("Cluster event is local (coming from local synchronizer or listener)");
+                return;
+            }
+
+            try {
+                logger.debug("Event id is {}", event.getId());
+                if (event.getId().equals("org.apache.unomi.router.event.remove") && StringUtils.isNotBlank(event.getRouteId())) {
+                    routerCamelContext.killExistingRoute(event.getRouteId(), false); // fireEvent=false: do not re-broadcast a change received from the cluster
+                } else if ((event.getId().equals("org.apache.unomi.router.event.import") || event.getId().equals("org.apache.unomi.router.event.export")) && event.getConfiguration() != null) {
+                    routerCamelContext.updateProfileReaderRoute(event.getConfiguration(), false); // fireEvent=false: same reason as above
+                } // any other event id, or an event without payload, is ignored
+            } catch (Exception e) {
+                logger.error("Error when executing event", e);
+            }
+        }
+    }
+
+    @Override
+    public Class<UpdateCamelRouteEvent> getType() { // event class this handler is responsible for
+        return UpdateCamelRouteEvent.class;
+    }
+
+    @Override
+    public Switch getSwitch() {
+        return null; // NOTE(review): no switch is provided; confirm nothing in Cellar dereferences it for handlers registered as "managed"
+    }
+
+    public void setRouterCamelContext(RouterCamelContext routerCamelContext) { // setter used for dependency injection
+        this.routerCamelContext = routerCamelContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 3f81b5a..d61d64f 100644
--- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -112,12 +112,25 @@
         <property name="persistenceService" ref="persistenceService"/>
         <property name="profileExportService" ref="profileExportService"/>
         <property name="profileService" ref="profileService"/>
+        <property name="clusterService" ref="clusterService" />
     </bean>
 
     <bean id="collectProfileBean" class="org.apache.unomi.router.core.bean.CollectProfileBean">
         <property name="persistenceService" ref="persistenceService"/>
     </bean>
 
+    <bean id="updateCamelRouteEventHandler" class="org.apache.unomi.router.core.event.UpdateCamelRouteEventHandler">
+        <property name="configurationAdmin" ref="osgiConfigurationAdmin"/>
+        <property name="clusterManager" ref="karafCellarClusterManager"/>
+        <property name="groupManager" ref="karafCellarGroupManager"/>
+        <property name="routerCamelContext" ref="camelContext"/>
+    </bean>
+    <service ref="updateCamelRouteEventHandler" interface="org.apache.karaf.cellar.core.event.EventHandler">
+        <service-properties>
+            <entry key="managed" value="true"/>
+        </service-properties>
+    </service>
+
     <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService" />
     <reference id="profileImportService" interface="org.apache.unomi.router.api.services.ProfileImportService"/>
     <reference id="profileExportService" interface="org.apache.unomi.router.api.services.ProfileExportService"/>
@@ -126,5 +139,9 @@
     <reference id="segmentService" interface="org.apache.unomi.api.services.SegmentService"/>
     <reference id="importConfigurationService" interface="org.apache.unomi.router.api.services.ImportExportConfigurationService" filter="(configDiscriminator=IMPORT)"/>
     <reference id="exportConfigurationService" interface="org.apache.unomi.router.api.services.ImportExportConfigurationService" filter="(configDiscriminator=EXPORT)"/>
+    <reference id="clusterService" interface="org.apache.unomi.api.services.ClusterService" />
+    <reference id="karafCellarGroupManager" interface="org.apache.karaf.cellar.core.GroupManager" />
+    <reference id="osgiConfigurationAdmin" interface="org.osgi.service.cm.ConfigurationAdmin"/>
+    <reference id="karafCellarClusterManager" interface="org.apache.karaf.cellar.core.ClusterManager" />
 
 </blueprint>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
index 101c2f3..2378717 100644
--- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
+++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
@@ -53,7 +53,7 @@ public class ExportConfigurationServiceImpl extends AbstractConfigurationService
         }
         if(updateRunningRoute) {
             try {
-                routerCamelContext.updateProfileReaderRoute(exportConfiguration);
+                routerCamelContext.updateProfileReaderRoute(exportConfiguration, true);
             } catch (Exception e) {
                 logger.error("Error when trying to save/update running Apache Camel Route: {}", exportConfiguration.getItemId());
             }
@@ -65,7 +65,7 @@ public class ExportConfigurationServiceImpl extends AbstractConfigurationService
     @Override
     public void delete(String configId) {
         try {
-            routerCamelContext.killExistingRoute(configId);
+            routerCamelContext.killExistingRoute(configId, true);
         } catch (Exception e) {
             logger.error("Error when trying to delete running Apache Camel Route: {}", configId);
         }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
----------------------------------------------------------------------
diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
index 0813f05..364ea73 100644
--- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
+++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
@@ -53,7 +53,7 @@ public class ImportConfigurationServiceImpl extends AbstractConfigurationService
         }
         if(updateRunningRoute) {
             try {
-                routerCamelContext.updateProfileReaderRoute(importConfiguration);
+                routerCamelContext.updateProfileReaderRoute(importConfiguration, true);
             } catch (Exception e) {
                 logger.error("Error when trying to save/update running Apache Camel Route: {}", importConfiguration.getItemId());
             }
@@ -65,7 +65,7 @@ public class ImportConfigurationServiceImpl extends AbstractConfigurationService
     @Override
     public void delete(String configId) {
         try {
-            routerCamelContext.killExistingRoute(configId);
+            routerCamelContext.killExistingRoute(configId, true);
         } catch (Exception e) {
             logger.error("Error when trying to delete running Apache Camel Route: {}", configId);
         }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java
index 6bc0cdc..163812d 100644
--- a/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.karaf.cellar.config.ClusterConfigurationEvent;
 import org.apache.karaf.cellar.config.Constants;
 import org.apache.karaf.cellar.core.*;
 import org.apache.karaf.cellar.core.control.SwitchStatus;
+import org.apache.karaf.cellar.core.event.Event;
 import org.apache.karaf.cellar.core.event.EventProducer;
 import org.apache.karaf.cellar.core.event.EventType;
 import org.apache.unomi.api.ClusterNode;
@@ -224,6 +225,14 @@ public class ClusterServiceImpl implements ClusterService {
         persistenceService.purge(scope);
     }
 
+    @Override
+    public void sendEvent(Serializable eventObject) { // stamps the event with this node's group and identity, then hands it to the event producer
+        Event event = (Event) eventObject; // throws ClassCastException if the argument is not a Cellar Event
+        event.setSourceGroup(group);
+        event.setSourceNode(karafCellarClusterManager.getNode()); // source node lets receivers recognise events sent by themselves
+        karafCellarEventProducer.produce(event);
+    }
+
     /**
      * Check if a configuration is allowed.
      *