You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/03/31 08:44:24 UTC
[unomi] branch master updated: UNOMI-758 : async save of export config (#601)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new e6e01b3ad UNOMI-758 : async save of export config (#601)
e6e01b3ad is described below
commit e6e01b3ad9bb2642dea11b2d27fed024e7089114
Author: David Griffon <dg...@jahia.com>
AuthorDate: Fri Mar 31 10:44:18 2023 +0200
UNOMI-758 : async save of export config (#601)
* UNOMI-758 : async save of export config
* small cleanup
* small cleanup
* small cleanup
* Perf improvements for oneshot exports
---------
Co-authored-by: Kevan <ke...@jahia.com>
---
.../unomi/router/api/IRouterCamelContext.java | 4 +-
.../apache/unomi/router/api/RouterConstants.java | 4 +
.../org/apache/unomi/router/api/RouterUtils.java | 4 +
.../services/ImportExportConfigurationService.java | 16 +--
.../unomi/router/core/bean/CollectProfileBean.java | 1 +
.../router/core/context/RouterCamelContext.java | 147 +++++++++++++--------
.../router/core/event/UpdateCamelRouteEvent.java | 9 --
.../core/event/UpdateCamelRouteEventHandler.java | 6 +-
.../processor/ExportRouteCompletionProcessor.java | 9 +-
.../route/ProfileExportCollectRouteBuilder.java | 18 +--
.../route/ProfileExportProducerRouteBuilder.java | 6 +-
.../resources/OSGI-INF/blueprint/blueprint.xml | 3 +-
.../services/AbstractConfigurationServiceImpl.java | 83 ------------
.../router/services/AbstractCustomServiceImpl.java | 76 -----------
.../services/ExportConfigurationServiceImpl.java | 43 +++---
.../services/ImportConfigurationServiceImpl.java | 38 +++---
.../router/services/ProfileExportServiceImpl.java | 54 ++++++--
.../router/services/ProfileImportServiceImpl.java | 9 +-
.../resources/OSGI-INF/blueprint/blueprint.xml | 23 +---
.../test/java/org/apache/unomi/itests/BaseIT.java | 4 +-
.../unomi/itests/ProfileImportRankingIT.java | 2 +-
21 files changed, 235 insertions(+), 324 deletions(-)
diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
index d35c3d4a0..5ec1adb57 100644
--- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
+++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
@@ -23,7 +23,9 @@ public interface IRouterCamelContext {
void killExistingRoute(String routeId, boolean fireEvent) throws Exception;
- void updateProfileReaderRoute(Object configuration, boolean fireEvent) throws Exception;
+ void updateProfileImportReaderRoute(String configId, boolean fireEvent) throws Exception;
+
+ void updateProfileExportReaderRoute(String configId, boolean fireEvent) throws Exception;
void setTracing(boolean tracing);
}
diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
index 3b04703ca..5ef19fe44 100644
--- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
+++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
@@ -20,6 +20,10 @@ package org.apache.unomi.router.api;
* Created by amidani on 13/06/2017.
*/
public interface RouterConstants {
+ enum CONFIG_CAMEL_REFRESH {
+ UPDATED,
+ REMOVED
+ }
String CONFIG_TYPE_NOBROKER = "nobroker";
String CONFIG_TYPE_KAFKA = "kafka";
diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterUtils.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterUtils.java
index 464f90839..a535206c9 100644
--- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterUtils.java
+++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterUtils.java
@@ -18,6 +18,7 @@ package org.apache.unomi.router.api;
import org.apache.unomi.api.PropertyType;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -27,6 +28,9 @@ import java.util.Map;
public class RouterUtils {
public static ImportExportConfiguration addExecutionEntry(ImportExportConfiguration configuration, Map execution, int executionsHistorySize) {
+ if (configuration.getExecutions() == null) {
+ configuration.setExecutions(new ArrayList<>());
+ }
if (configuration.getExecutions().size() >= executionsHistorySize) {
int oldestExecIndex = 0;
long oldestExecDate = (Long) configuration.getExecutions().get(0).get(RouterConstants.KEY_EXECS_DATE);
diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
index dd561e745..edb103cc5 100644
--- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
+++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
@@ -17,10 +17,11 @@
package org.apache.unomi.router.api.services;
import org.apache.unomi.router.api.ExportConfiguration;
-import org.apache.unomi.router.api.IRouterCamelContext;
import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
import java.util.List;
+import java.util.Map;
/**
* A service to access and operate on {@link ImportConfiguration}s / {@link ExportConfiguration}s.
@@ -60,15 +61,8 @@ public interface ImportExportConfigurationService<T> {
void delete(String configId);
/**
- * Set the router camel context to share
- *
- * @param routerCamelContext the router Camel context to use for all operations
- */
- void setRouterCamelContext(IRouterCamelContext routerCamelContext);
-
- /**
- * Retrieve the configured router camel context
- * @return the configured instance, or null if not configured
+ * Used by camel route system to get the latest changes on configs and reflect changes on camel routes if necessary
+ * @return map of configId per operation to be done in camel
*/
- IRouterCamelContext getRouterCamelContext();
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> consumeConfigsToBeRefresh();
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
index 452501974..1ea03eb3a 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
@@ -29,6 +29,7 @@ public class CollectProfileBean {
private PersistenceService persistenceService;
public List<Profile> extractProfileBySegment(String segment) {
+ // TODO: UNOMI-759 avoid loading all profiles in RAM here
return persistenceService.query("segments", segment,null, Profile.class);
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index fdcebfa24..4ad24e2d1 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -24,6 +24,7 @@ import org.apache.camel.model.RouteDefinition;
import org.apache.unomi.api.services.ClusterService;
import org.apache.unomi.api.services.ConfigSharingService;
import org.apache.unomi.api.services.ProfileService;
+import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ExportConfiguration;
import org.apache.unomi.router.api.IRouterCamelContext;
@@ -37,20 +38,23 @@ import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
import org.apache.unomi.router.core.route.*;
-import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.BundleEvent;
-import org.osgi.framework.SynchronousBundleListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
/**
* Created by amidani on 04/05/2017.
*/
-public class RouterCamelContext implements SynchronousBundleListener, IRouterCamelContext {
+public class RouterCamelContext implements IRouterCamelContext {
private Logger logger = LoggerFactory.getLogger(RouterCamelContext.class.getName());
private CamelContext camelContext;
@@ -74,6 +78,11 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
private ConfigSharingService configSharingService;
private ClusterService clusterService;
+ // TODO UNOMI-572: when fixing UNOMI-572 please remove the usage of the custom ScheduledExecutorService and re-introduce the Unomi Scheduler Service
+ private ScheduledExecutorService scheduler;
+ private Integer configsRefreshInterval = 1000;
+ private ScheduledFuture<?> scheduledFuture;
+
public static String EVENT_ID_REMOVE = "org.apache.unomi.router.event.remove";
public static String EVENT_ID_IMPORT = "org.apache.unomi.router.event.import";
public static String EVENT_ID_EXPORT = "org.apache.unomi.router.event.export";
@@ -102,12 +111,71 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
camelContext.setTracing(true);
}
- public void initCamelContext() throws Exception {
+ public void init() throws Exception {
logger.info("Initialize Camel Context...");
+ scheduler = Executors.newSingleThreadScheduledExecutor();
configSharingService.setProperty(RouterConstants.IMPORT_ONESHOT_UPLOAD_DIR, uploadDir);
configSharingService.setProperty(RouterConstants.KEY_HISTORY_SIZE, execHistorySize);
+ initCamel();
+
+ initTimers();
+ logger.info("Camel Context initialized successfully.");
+ }
+
+ public void destroy() throws Exception {
+ scheduledFuture.cancel(true);
+ if (scheduler != null) {
+ scheduler.shutdown();
+ }
+ //This is to shutdown Camel context
+ //(will stop all routes/components/endpoints etc and clear internal state/cache)
+ this.camelContext.stop();
+ logger.info("Camel context for profile import is shutdown.");
+ }
+
+ private void initTimers() {
+ TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> importConfigsToRefresh = importConfigurationService.consumeConfigsToBeRefresh();
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> exportConfigsToRefresh = exportConfigurationService.consumeConfigsToBeRefresh();
+
+ for (Map.Entry<String, RouterConstants.CONFIG_CAMEL_REFRESH> importConfigToRefresh : importConfigsToRefresh.entrySet()) {
+ try {
+ if (importConfigToRefresh.getValue().equals(RouterConstants.CONFIG_CAMEL_REFRESH.UPDATED)) {
+ updateProfileImportReaderRoute(importConfigToRefresh.getKey(), true);
+ } else if (importConfigToRefresh.getValue().equals(RouterConstants.CONFIG_CAMEL_REFRESH.REMOVED)) {
+ killExistingRoute(importConfigToRefresh.getKey(), true);
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected error while refreshing(" + importConfigToRefresh.getValue() + ") camel route: " + importConfigToRefresh.getKey(), e);
+ }
+ }
+
+
+ for (Map.Entry<String, RouterConstants.CONFIG_CAMEL_REFRESH> exportConfigToRefresh : exportConfigsToRefresh.entrySet()) {
+ try {
+ if (exportConfigToRefresh.getValue().equals(RouterConstants.CONFIG_CAMEL_REFRESH.UPDATED)) {
+ updateProfileExportReaderRoute(exportConfigToRefresh.getKey(), true);
+ } else if (exportConfigToRefresh.getValue().equals(RouterConstants.CONFIG_CAMEL_REFRESH.REMOVED)) {
+ killExistingRoute(exportConfigToRefresh.getKey(), true);
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected error while refreshing(" + exportConfigToRefresh.getValue() + ") camel route: " + exportConfigToRefresh.getKey(), e);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected error while refreshing import/export camel routes", e);
+ }
+ }
+ };
+ scheduledFuture = scheduler.scheduleWithFixedDelay(task, 0, configsRefreshInterval, TimeUnit.MILLISECONDS);
+ }
+
+ private void initCamel() throws Exception {
camelContext = new OsgiDefaultCamelContext(bundleContext);
//--IMPORT ROUTES
@@ -142,7 +210,7 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
//Profiles collect
ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType);
- profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
+ profileExportCollectRouteBuilder.setExportConfigurationList(exportConfigurationService.getAll());
profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
@@ -160,21 +228,6 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
camelContext.addRoutes(profileExportProducerRouteBuilder);
camelContext.start();
-
- logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
-
- processBundleStartup(bundleContext);
- for (Bundle bundle : bundleContext.getBundles()) {
- if (bundle.getBundleContext() != null) {
- processBundleStartup(bundle.getBundleContext());
- }
- }
- bundleContext.addBundleListener(this);
-
- importConfigurationService.setRouterCamelContext(this);
- exportConfigurationService.setRouterCamelContext(this);
-
- logger.info("Camel Context {} initialized successfully.");
}
public void killExistingRoute(String routeId, boolean fireEvent) throws Exception {
@@ -194,17 +247,15 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
}
}
- public void updateProfileReaderRoute(Object configuration, boolean fireEvent) throws Exception {
- if (configuration instanceof ImportConfiguration) {
- updateProfileImportReaderRoute((ImportConfiguration) configuration, fireEvent);
- } else {
- updateProfileExportReaderRoute((ExportConfiguration) configuration, fireEvent);
+ public void updateProfileImportReaderRoute(String configId, boolean fireEvent) throws Exception {
+ killExistingRoute(configId, false);
+
+ ImportConfiguration importConfiguration = importConfigurationService.load(configId);
+ if (importConfiguration == null) {
+ logger.warn("Cannot update profile import reader route, config: {} not found", configId);
+ return;
}
- }
- private void updateProfileImportReaderRoute(ImportConfiguration importConfiguration, boolean fireEvent) throws Exception {
- killExistingRoute(importConfiguration.getItemId(), false);
- //Handle transforming an import config oneshot <--> recurrent
if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())) {
ProfileImportFromSourceRouteBuilder builder = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType);
builder.setImportConfigurationList(Arrays.asList(importConfiguration));
@@ -217,19 +268,23 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
if (fireEvent) {
UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_IMPORT);
- event.setConfiguration(importConfiguration);
clusterService.sendEvent(event);
}
}
}
- private void updateProfileExportReaderRoute(ExportConfiguration exportConfiguration, boolean fireEvent) throws Exception {
- killExistingRoute(exportConfiguration.getItemId(), false);
- //Handle transforming an import config oneshot <--> recurrent
+ public void updateProfileExportReaderRoute(String configId, boolean fireEvent) throws Exception {
+ killExistingRoute(configId, false);
+
+ ExportConfiguration exportConfiguration = exportConfigurationService.load(configId);
+ if (exportConfiguration == null) {
+ logger.warn("Cannot update profile export reader route, config: {} not found", configId);
+ return;
+ }
+
if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType())) {
ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType);
- profileExportCollectRouteBuilder.setExportConfigurationList(Arrays.asList(exportConfiguration));
- profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
+ profileExportCollectRouteBuilder.setExportConfigurationList(Collections.singletonList(exportConfiguration));
profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
@@ -238,7 +293,6 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
if (fireEvent) {
UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_EXPORT);
- event.setConfiguration(exportConfiguration);
clusterService.sendEvent(event);
}
}
@@ -303,23 +357,4 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam
public void setAllowedEndpoints(String allowedEndpoints) {
this.allowedEndpoints = allowedEndpoints;
}
-
- public void preDestroy() throws Exception {
- bundleContext.removeBundleListener(this);
- //This is to shutdown Camel context
- //(will stop all routes/components/endpoints etc and clear internal state/cache)
- this.camelContext.stop();
- logger.info("Camel context for profile import is shutdown.");
- }
-
- private void processBundleStartup(BundleContext bundleContext) {
- if (bundleContext == null) {
- return;
- }
- }
-
- @Override
- public void bundleChanged(BundleEvent bundleEvent) {
-
- }
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java
index 7e1dc81d2..2f3d2cb3f 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java
@@ -23,7 +23,6 @@ import org.apache.karaf.cellar.core.event.Event;
*/
public class UpdateCamelRouteEvent extends Event {
private String routeId;
- private Object configuration;
public UpdateCamelRouteEvent(String id) {
super(id);
@@ -36,12 +35,4 @@ public class UpdateCamelRouteEvent extends Event {
public void setRouteId(String routeId) {
this.routeId = routeId;
}
-
- public Object getConfiguration() {
- return configuration;
- }
-
- public void setConfiguration(Object configuration) {
- this.configuration = configuration;
- }
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
index c75207273..91cd09df1 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
@@ -49,8 +49,10 @@ public class UpdateCamelRouteEventHandler extends CellarSupport implements Event
logger.debug("Event id is {}", event.getId());
if (event.getId().equals(RouterCamelContext.EVENT_ID_REMOVE) && StringUtils.isNotBlank(event.getRouteId())) {
routerCamelContext.killExistingRoute(event.getRouteId(), false);
- } else if ((event.getId().equals(RouterCamelContext.EVENT_ID_IMPORT) || event.getId().equals(RouterCamelContext.EVENT_ID_EXPORT)) && event.getConfiguration() != null) {
- routerCamelContext.updateProfileReaderRoute(event.getConfiguration(), false);
+ } else if ((event.getId().equals(RouterCamelContext.EVENT_ID_IMPORT))) {
+ routerCamelContext.updateProfileImportReaderRoute(event.getRouteId(), false);
+ } else if (event.getId().equals(RouterCamelContext.EVENT_ID_EXPORT)) {
+ routerCamelContext.updateProfileExportReaderRoute(event.getRouteId(), false);
}
} catch (Exception e) {
logger.error("Error when executing event", e);
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
index 309c7c2a2..414e1c00f 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
@@ -40,14 +40,17 @@ public class ExportRouteCompletionProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
- ExportConfiguration exportConfig = (ExportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_EXPORT_CONFIG);
+ // We load the conf from ES because we are going to increment the execution number
+ ExportConfiguration exportConfiguration = exportConfigurationService.load(exchange.getFromRouteId());
+ if (exportConfiguration == null) {
+ logger.warn("Unable to complete export, config cannot not found: {}", exchange.getFromRouteId());
+ return;
+ }
Map execution = new HashMap();
execution.put(RouterConstants.KEY_EXECS_DATE, ((Date) exchange.getProperty("CamelCreatedTimestamp")).getTime());
execution.put(RouterConstants.KEY_EXECS_EXTRACTED, exchange.getProperty("CamelSplitSize"));
- ExportConfiguration exportConfiguration = exportConfigurationService.load(exportConfig.getItemId());
-
exportConfiguration = (ExportConfiguration) RouterUtils.addExecutionEntry(exportConfiguration, execution, executionsHistorySize);
exportConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_SUCCESS);
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
index bff0d6d46..e87ec50be 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
@@ -39,7 +39,6 @@ public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder
private static final Logger logger = LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class);
private List<ExportConfiguration> exportConfigurationList;
- private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService;
private PersistenceService persistenceService;
public ProfileExportCollectRouteBuilder(Map<String, String> kafkaProps, String configType) {
@@ -48,16 +47,16 @@ public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder
@Override
public void configure() throws Exception {
- logger.info("Configure Recurrent Route 'Export :: Collect Data'");
-
- if (exportConfigurationList == null) {
- exportConfigurationList = exportConfigurationService.getAll();
+ if (exportConfigurationList == null || exportConfigurationList.isEmpty()) {
+ // Nothing to configure
+ return;
}
+ logger.info("Configure Recurrent Route 'Export :: Collect Data'");
+
CollectProfileBean collectProfileBean = new CollectProfileBean();
collectProfileBean.setPersistenceService(persistenceService);
-
//Loop on multiple export configuration
for (final ExportConfiguration exportConfiguration : exportConfigurationList) {
if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType()) &&
@@ -74,7 +73,7 @@ public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder
.autoStartup(exportConfiguration.isActive())
.bean(collectProfileBean, "extractProfileBySegment(" + exportConfiguration.getProperties().get("segment") + ")")
.split(body())
- .marshal(jacksonDataFormat)
+ .marshal(jacksonDataFormat) // TODO: UNOMI-759 avoid unnecessary marshalling
.convertBodyTo(String.class)
.setHeader(RouterConstants.HEADER_EXPORT_CONFIG, constant(exportConfiguration))
.log(LoggingLevel.DEBUG, "BODY : ${body}");
@@ -99,12 +98,7 @@ public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder
this.exportConfigurationList = exportConfigurationList;
}
- public void setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration> exportConfigurationService) {
- this.exportConfigurationService = exportConfigurationService;
- }
-
public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
}
-
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
index 9f25c3bb9..86288019f 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
@@ -59,10 +59,8 @@ public class ProfileExportProducerRouteBuilder extends RouterAbstractRouteBuilde
rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
}
- LineBuildProcessor processor = new LineBuildProcessor(profileExportService);
-
- rtDef.unmarshal(jacksonDataFormat)
- .process(processor)
+ rtDef.unmarshal(jacksonDataFormat) // TODO: UNOMI-759 avoid unnecessary marshalling
+ .process(new LineBuildProcessor(profileExportService))
.aggregate(constant(true), new StringLinesAggregationStrategy())
.completionPredicate(exchangeProperty("CamelSplitSize").isEqualTo(exchangeProperty("CamelAggregatedSize")))
.eagerCheckCompletion()
diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index d61d64f2d..d7b7a36c0 100644
--- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -82,7 +82,7 @@
<bean id="camelContext" class="org.apache.unomi.router.core.context.RouterCamelContext"
- init-method="initCamelContext" destroy-method="preDestroy">
+ init-method="init" destroy-method="destroy">
<property name="configType" value="${router.config.type}"/>
<property name="allowedEndpoints" value="${config.allowedEndpoints}"/>
<property name="uploadDir" value="${import.oneshot.uploadDir}"/>
@@ -114,6 +114,7 @@
<property name="profileService" ref="profileService"/>
<property name="clusterService" ref="clusterService" />
</bean>
+ <service id="camelContextOSGI" ref="camelContext" interface="org.apache.unomi.router.api.IRouterCamelContext"/>
<bean id="collectProfileBean" class="org.apache.unomi.router.core.bean.CollectProfileBean">
<property name="persistenceService" ref="persistenceService"/>
diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/AbstractConfigurationServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/AbstractConfigurationServiceImpl.java
deleted file mode 100644
index e7c5d67af..000000000
--- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/AbstractConfigurationServiceImpl.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.router.services;
-
-import org.apache.unomi.persistence.spi.PersistenceService;
-import org.apache.unomi.router.api.IRouterCamelContext;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.BundleEvent;
-import org.osgi.framework.SynchronousBundleListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by amidani on 26/06/2017.
- */
-public abstract class AbstractConfigurationServiceImpl implements SynchronousBundleListener {
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationServiceImpl.class.getName());
-
- protected BundleContext bundleContext;
- protected PersistenceService persistenceService;
- protected IRouterCamelContext routerCamelContext;
-
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
- public void setPersistenceService(PersistenceService persistenceService) {
- this.persistenceService = persistenceService;
- }
-
- public void setRouterCamelContext(IRouterCamelContext routerCamelContext) {
- this.routerCamelContext = routerCamelContext;
- }
-
- public IRouterCamelContext getRouterCamelContext() {
- return routerCamelContext;
- }
-
- public void postConstruct() {
- logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
-
- processBundleStartup(bundleContext);
- for (Bundle bundle : bundleContext.getBundles()) {
- if (bundle.getBundleContext() != null && bundle.getBundleId() != bundleContext.getBundle().getBundleId()) {
- processBundleStartup(bundle.getBundleContext());
- }
- }
- bundleContext.addBundleListener(this);
- logger.info("Configuration service initialized.");
- }
-
- public void preDestroy() {
- bundleContext.removeBundleListener(this);
- logger.info("Configuration service shutdown.");
- }
-
- @Override
- public void bundleChanged(BundleEvent bundleEvent) {
-
- }
-
- private void processBundleStartup(BundleContext bundleContext) {
- if (bundleContext == null) {
- return;
- }
- }
-}
diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/AbstractCustomServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/AbstractCustomServiceImpl.java
deleted file mode 100644
index dc06fff61..000000000
--- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/AbstractCustomServiceImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.router.services;
-
-import org.apache.unomi.persistence.spi.PersistenceService;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.BundleEvent;
-import org.osgi.framework.SynchronousBundleListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by amidani on 30/06/2017.
- */
-public class AbstractCustomServiceImpl implements SynchronousBundleListener {
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractCustomServiceImpl.class);
-
- protected PersistenceService persistenceService;
- protected BundleContext bundleContext;
-
- public void setPersistenceService(PersistenceService persistenceService) {
- this.persistenceService = persistenceService;
- }
-
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
- public void postConstruct() {
- logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
-
- processBundleStartup(bundleContext);
- for (Bundle bundle : bundleContext.getBundles()) {
- if (bundle.getBundleContext() != null && bundle.getBundleId() != bundleContext.getBundle().getBundleId()) {
- processBundleStartup(bundle.getBundleContext());
- }
- }
- bundleContext.addBundleListener(this);
- logger.info("Import configuration service initialized.");
- }
-
- public void preDestroy() {
- bundleContext.removeBundleListener(this);
- logger.info("Import configuration service shutdown.");
- }
-
- private void processBundleStartup(BundleContext bundleContext) {
- if (bundleContext == null) {
- return;
- }
- }
-
- private void processBundleStop(BundleContext bundleContext) {
- }
-
- @Override
- public void bundleChanged(BundleEvent bundleEvent) {
-
- }
-}
diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
index 23787170e..d1b8d8f75 100644
--- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
+++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
@@ -16,22 +16,33 @@
*/
package org.apache.unomi.router.services;
+import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ExportConfiguration;
-import org.apache.unomi.router.api.IRouterCamelContext;
+import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
/**
+ * Service to manage Configuration of Item to export
* Created by amidani on 28/04/2017.
*/
-public class ExportConfigurationServiceImpl extends AbstractConfigurationServiceImpl implements ImportExportConfigurationService<ExportConfiguration> {
+public class ExportConfigurationServiceImpl implements ImportExportConfigurationService<ExportConfiguration> {
private static final Logger logger = LoggerFactory.getLogger(ExportConfigurationServiceImpl.class.getName());
+
+ private PersistenceService persistenceService;
+
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ }
+
+ private final Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> camelConfigsToRefresh = new ConcurrentHashMap<>();
+
public ExportConfigurationServiceImpl() {
logger.info("Initializing export configuration service...");
}
@@ -51,29 +62,25 @@ public class ExportConfigurationServiceImpl extends AbstractConfigurationService
if (exportConfiguration.getItemId() == null) {
exportConfiguration.setItemId(UUID.randomUUID().toString());
}
- if(updateRunningRoute) {
- try {
- routerCamelContext.updateProfileReaderRoute(exportConfiguration, true);
- } catch (Exception e) {
- logger.error("Error when trying to save/update running Apache Camel Route: {}", exportConfiguration.getItemId());
- }
- }
persistenceService.save(exportConfiguration);
+
+ if (updateRunningRoute) {
+ camelConfigsToRefresh.put(exportConfiguration.getItemId(), RouterConstants.CONFIG_CAMEL_REFRESH.UPDATED);
+ }
+
return persistenceService.load(exportConfiguration.getItemId(), ExportConfiguration.class);
}
@Override
public void delete(String configId) {
- try {
- routerCamelContext.killExistingRoute(configId, true);
- } catch (Exception e) {
- logger.error("Error when trying to delete running Apache Camel Route: {}", configId);
- }
persistenceService.remove(configId, ExportConfiguration.class);
+ camelConfigsToRefresh.put(configId, RouterConstants.CONFIG_CAMEL_REFRESH.REMOVED);
}
@Override
- public void setRouterCamelContext(IRouterCamelContext routerCamelContext) {
- super.setRouterCamelContext(routerCamelContext);
+ public Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> consumeConfigsToBeRefresh() {
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> result = new HashMap<>(camelConfigsToRefresh);
+ camelConfigsToRefresh.clear();
+ return result;
}
}
diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
index 364ea73fc..a12d2991f 100644
--- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
+++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
@@ -16,22 +16,32 @@
*/
package org.apache.unomi.router.services;
-import org.apache.unomi.router.api.IRouterCamelContext;
+import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
/**
+ * Service to manage Configuration of object to import
* Created by amidani on 28/04/2017.
*/
-public class ImportConfigurationServiceImpl extends AbstractConfigurationServiceImpl implements ImportExportConfigurationService<ImportConfiguration> {
+public class ImportConfigurationServiceImpl implements ImportExportConfigurationService<ImportConfiguration> {
private static final Logger logger = LoggerFactory.getLogger(ImportConfigurationServiceImpl.class.getName());
+ private PersistenceService persistenceService;
+
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ }
+
+ private final Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> camelConfigsToRefresh = new ConcurrentHashMap<>();
+
public ImportConfigurationServiceImpl() {
logger.info("Initializing import configuration service...");
}
@@ -51,12 +61,8 @@ public class ImportConfigurationServiceImpl extends AbstractConfigurationService
if (importConfiguration.getItemId() == null) {
importConfiguration.setItemId(UUID.randomUUID().toString());
}
- if(updateRunningRoute) {
- try {
- routerCamelContext.updateProfileReaderRoute(importConfiguration, true);
- } catch (Exception e) {
- logger.error("Error when trying to save/update running Apache Camel Route: {}", importConfiguration.getItemId());
- }
+ if (updateRunningRoute) {
+ camelConfigsToRefresh.put(importConfiguration.getItemId(), RouterConstants.CONFIG_CAMEL_REFRESH.UPDATED);
}
persistenceService.save(importConfiguration);
return persistenceService.load(importConfiguration.getItemId(), ImportConfiguration.class);
@@ -64,16 +70,14 @@ public class ImportConfigurationServiceImpl extends AbstractConfigurationService
@Override
public void delete(String configId) {
- try {
- routerCamelContext.killExistingRoute(configId, true);
- } catch (Exception e) {
- logger.error("Error when trying to delete running Apache Camel Route: {}", configId);
- }
persistenceService.remove(configId, ImportConfiguration.class);
+ camelConfigsToRefresh.put(configId, RouterConstants.CONFIG_CAMEL_REFRESH.REMOVED);
}
@Override
- public void setRouterCamelContext(IRouterCamelContext routerCamelContext) {
- super.setRouterCamelContext(routerCamelContext);
+ public Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> consumeConfigsToBeRefresh() {
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> result = new HashMap<>(camelConfigsToRefresh);
+ camelConfigsToRefresh.clear();
+ return result;
}
}
diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java
index 1b35e963d..c3197b61e 100644
--- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java
+++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java
@@ -18,9 +18,13 @@ package org.apache.unomi.router.services;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.api.PartialList;
import org.apache.unomi.api.Profile;
import org.apache.unomi.api.PropertyType;
+import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.services.ConfigSharingService;
+import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ExportConfiguration;
import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.RouterUtils;
@@ -33,24 +37,51 @@ import java.util.*;
/**
* Created by amidani on 30/06/2017.
*/
-public class ProfileExportServiceImpl extends AbstractCustomServiceImpl implements ProfileExportService {
+public class ProfileExportServiceImpl implements ProfileExportService {
private static final Logger logger = LoggerFactory.getLogger(ProfileExportServiceImpl.class.getName());
+
+ private PersistenceService persistenceService;
+ private DefinitionsService definitionsService;
private ConfigSharingService configSharingService;
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ }
+
+ public void setDefinitionsService(DefinitionsService definitionsService) {
+ this.definitionsService = definitionsService;
+ }
+
+ public void setConfigSharingService(ConfigSharingService configSharingService) {
+ this.configSharingService = configSharingService;
+ }
+
public String extractProfilesBySegment(ExportConfiguration exportConfiguration) {
- List<Profile> profileList = persistenceService.query("segments", (String) exportConfiguration.getProperty("segment"), null, Profile.class);
+ Collection<PropertyType> propertiesDef = persistenceService.query("target", "profiles", null, PropertyType.class);
+
+ Condition segmentCondition = new Condition();
+ segmentCondition.setConditionType(definitionsService.getConditionType("profileSegmentCondition"));
+ segmentCondition.setParameter("segments", Collections.singletonList((String) exportConfiguration.getProperty("segment")));
+ segmentCondition.setParameter("matchType", "in");
+
StringBuilder csvContent = new StringBuilder();
- for (Profile profile : profileList) {
- csvContent.append(convertProfileToCSVLine(profile, exportConfiguration));
- csvContent.append(RouterUtils.getCharFromLineSeparator(exportConfiguration.getLineSeparator()));
+ PartialList<Profile> profiles = persistenceService.query(segmentCondition, null, Profile.class, 0, 1000, "10m");
+ int counter = 0;
+ while (profiles != null && profiles.getList().size() > 0) {
+ List<Profile> scrolledProfiles = profiles.getList();
+ for (Profile profile : scrolledProfiles) {
+ csvContent.append(convertProfileToCSVLine(profile, exportConfiguration, propertiesDef));
+ csvContent.append(RouterUtils.getCharFromLineSeparator(exportConfiguration.getLineSeparator()));
+ }
+ counter += scrolledProfiles.size();
+ profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity());
}
- logger.debug("Exporting {} extracted profiles.", profileList.size());
Map execution = new HashMap();
execution.put(RouterConstants.KEY_EXECS_DATE, new Date().getTime());
- execution.put(RouterConstants.KEY_EXECS_EXTRACTED, profileList.size());
+ execution.put(RouterConstants.KEY_EXECS_EXTRACTED, counter);
exportConfiguration = (ExportConfiguration) RouterUtils.addExecutionEntry(exportConfiguration, execution, Integer.parseInt((String) configSharingService.getProperty(RouterConstants.KEY_HISTORY_SIZE)));
persistenceService.save(exportConfiguration);
@@ -59,7 +90,12 @@ public class ProfileExportServiceImpl extends AbstractCustomServiceImpl implemen
}
public String convertProfileToCSVLine(Profile profile, ExportConfiguration exportConfiguration) {
+ // TODO: UNOMI-759 querying this everytimes
Collection<PropertyType> propertiesDef = persistenceService.query("target", "profiles", null, PropertyType.class);
+ return convertProfileToCSVLine(profile, exportConfiguration, propertiesDef);
+ }
+
+ public String convertProfileToCSVLine(Profile profile, ExportConfiguration exportConfiguration, Collection<PropertyType> propertiesDef) {
Map<String, String> mapping = (Map<String, String>) exportConfiguration.getProperty("mapping");
String lineToWrite = "";
for (int i = 0; i < mapping.size(); i++) {
@@ -104,8 +140,4 @@ public class ProfileExportServiceImpl extends AbstractCustomServiceImpl implemen
return lineToWrite;
}
- public void setConfigSharingService(ConfigSharingService configSharingService) {
- this.configSharingService = configSharingService;
- }
-
}
diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileImportServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileImportServiceImpl.java
index 86431034f..578aee22c 100644
--- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileImportServiceImpl.java
+++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileImportServiceImpl.java
@@ -18,6 +18,7 @@ package org.apache.unomi.router.services;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.unomi.api.Profile;
+import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ProfileToImport;
import org.apache.unomi.router.api.services.ProfileImportService;
import org.slf4j.Logger;
@@ -29,10 +30,16 @@ import java.util.List;
/**
* Created by amidani on 18/05/2017.
*/
-public class ProfileImportServiceImpl extends AbstractCustomServiceImpl implements ProfileImportService {
+public class ProfileImportServiceImpl implements ProfileImportService {
private static final Logger logger = LoggerFactory.getLogger(ProfileImportServiceImpl.class.getName());
+ private PersistenceService persistenceService;
+
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ }
+
public boolean saveMergeDeleteImportedProfile(ProfileToImport profileToImport) throws InvocationTargetException, IllegalAccessException {
logger.debug("Importing profile with ID : {}", profileToImport.getItemId());
Profile existingProfile = new Profile();
diff --git a/extensions/router/router-service/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-service/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index c00554cfa..8b187cad6 100644
--- a/extensions/router/router-service/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/extensions/router/router-service/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -22,30 +22,25 @@
<reference id="persistenceService" interface="org.apache.unomi.persistence.spi.PersistenceService"/>
<reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService"/>
+ <reference id="definitionsService" interface="org.apache.unomi.api.services.DefinitionsService"/>
- <bean id="importConfigurationServiceImpl" class="org.apache.unomi.router.services.ImportConfigurationServiceImpl"
- init-method="postConstruct" destroy-method="preDestroy">
+ <bean id="importConfigurationServiceImpl" class="org.apache.unomi.router.services.ImportConfigurationServiceImpl">
<property name="persistenceService" ref="persistenceService"/>
- <property name="bundleContext" ref="blueprintBundleContext"/>
</bean>
<service id="importConfigurationService" ref="importConfigurationServiceImpl">
<interfaces>
<value>org.apache.unomi.router.api.services.ImportExportConfigurationService</value>
- <value>org.osgi.framework.SynchronousBundleListener</value>
</interfaces>
<service-properties>
<entry key="configDiscriminator" value="IMPORT"/>
</service-properties>
</service>
- <bean id="exportConfigurationServiceImpl" class="org.apache.unomi.router.services.ExportConfigurationServiceImpl"
- init-method="postConstruct" destroy-method="preDestroy">
+ <bean id="exportConfigurationServiceImpl" class="org.apache.unomi.router.services.ExportConfigurationServiceImpl">
<property name="persistenceService" ref="persistenceService"/>
- <property name="bundleContext" ref="blueprintBundleContext"/>
</bean>
<service id="exportConfigurationService" ref="exportConfigurationServiceImpl">
<interfaces>
- <value>org.osgi.framework.SynchronousBundleListener</value>
<value>org.apache.unomi.router.api.services.ImportExportConfigurationService</value>
</interfaces>
<service-properties>
@@ -53,29 +48,23 @@
</service-properties>
</service>
- <bean id="profileImportServiceImpl" class="org.apache.unomi.router.services.ProfileImportServiceImpl"
- init-method="postConstruct" destroy-method="preDestroy">
+ <bean id="profileImportServiceImpl" class="org.apache.unomi.router.services.ProfileImportServiceImpl">
<property name="persistenceService" ref="persistenceService"/>
- <property name="bundleContext" ref="blueprintBundleContext"/>
</bean>
<service id="profileImportService" ref="profileImportServiceImpl">
<interfaces>
- <value>org.osgi.framework.SynchronousBundleListener</value>
<value>org.apache.unomi.router.api.services.ProfileImportService</value>
</interfaces>
</service>
- <bean id="profileExportServiceImpl" class="org.apache.unomi.router.services.ProfileExportServiceImpl"
- init-method="postConstruct" destroy-method="preDestroy">
+ <bean id="profileExportServiceImpl" class="org.apache.unomi.router.services.ProfileExportServiceImpl">
<property name="persistenceService" ref="persistenceService"/>
<property name="configSharingService" ref="configSharingService" />
- <property name="bundleContext" ref="blueprintBundleContext"/>
+ <property name="definitionsService" ref="definitionsService" />
</bean>
<service id="profileExportService" ref="profileExportServiceImpl">
<interfaces>
- <value>org.osgi.framework.SynchronousBundleListener</value>
<value>org.apache.unomi.router.api.services.ProfileExportService</value>
</interfaces>
</service>
-
</blueprint>
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index f62d32d8a..7e70e40ab 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -52,6 +52,7 @@ import org.apache.unomi.lifecycle.BundleWatcher;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ExportConfiguration;
+import org.apache.unomi.router.api.IRouterCamelContext;
import org.apache.unomi.router.api.ImportConfiguration;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.apache.unomi.schema.api.SchemaService;
@@ -150,6 +151,7 @@ public abstract class BaseIT extends KarafTestSupport {
protected PatchService patchService;
protected ImportExportConfigurationService<ImportConfiguration> importConfigurationService;
protected ImportExportConfigurationService<ExportConfiguration> exportConfigurationService;
+ protected IRouterCamelContext routerCamelContext;
protected UserListService userListService;
protected TopicService topicService;
@@ -194,9 +196,9 @@ public abstract class BaseIT extends KarafTestSupport {
patchService = getOsgiService(PatchService.class, 600000);
userListService = getOsgiService(UserListService.class, 600000);
topicService = getOsgiService(TopicService.class, 600000);
- patchService = getOsgiService(PatchService.class, 600000);
importConfigurationService = getOsgiService(ImportExportConfigurationService.class, "(configDiscriminator=IMPORT)", 600000);
exportConfigurationService = getOsgiService(ImportExportConfigurationService.class, "(configDiscriminator=EXPORT)", 600000);
+ routerCamelContext = getOsgiService(IRouterCamelContext.class, 600000);
// init httpClient
httpClient = initHttpClient();
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
index 2b321b26e..18c7e8128 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
@@ -50,7 +50,7 @@ public class ProfileImportRankingIT extends BaseIT {
@Test
public void testImportRanking() throws InterruptedException {
- importConfigurationService.getRouterCamelContext().setTracing(true);
+ routerCamelContext.setTracing(true);
/*** Create Missing Properties ***/
PropertyType propertyTypeUciId = new PropertyType(new Metadata("integration", "uciId", "UCI ID", "UCI ID"));