You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2023/02/03 16:49:34 UTC
[unomi] 01/01: try to simplify configuration of persistence bundle by replacing blueprint by DS
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch tryReplaceBlueprintByDS
in repository https://gitbox.apache.org/repos/asf/unomi.git
commit 1d38b2ef1e29daff71adccf8110ba2cda9df8f5f
Author: Kevan <ke...@jahia.com>
AuthorDate: Fri Feb 3 17:49:16 2023 +0100
try to simplify configuration of persistence bundle by replacing blueprint by DS
---
persistence-elasticsearch/core/pom.xml | 1 +
.../ElasticSearchPersistenceServiceConf.java | 72 +++
.../ElasticSearchPersistenceServiceImpl.java | 556 ++++++---------------
.../ConditionESQueryBuilderDispatcher.java | 26 +-
.../conditions/ConditionEvaluatorDispatcher.java | 24 +-
.../resources/OSGI-INF/blueprint/blueprint.xml | 183 -------
6 files changed, 247 insertions(+), 615 deletions(-)
diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml
index 8aa4fd7ba..282e37dee 100644
--- a/persistence-elasticsearch/core/pom.xml
+++ b/persistence-elasticsearch/core/pom.xml
@@ -238,6 +238,7 @@
<extensions>true</extensions>
<configuration>
<instructions>
+ <_dsannotations>*</_dsannotations>
<Import-Package>
org.jctools.queues;resolution:=optional,
org.apache.logging.log4j.util.internal;resolution:=optional,
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceConf.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceConf.java
new file mode 100644
index 000000000..04783bfa3
--- /dev/null
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceConf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.persistence.elasticsearch;
+
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+@ObjectClassDefinition(name = "ElasticSearch persistence service config", description = "The configuration for the ElasticSearch persistence service")
+public @interface ElasticSearchPersistenceServiceConf {
+ // Typed configuration handed to ElasticSearchPersistenceServiceImpl.start(...) (its @Activate method); a "default" supplies the value when the property is not configured.
+ String minimalElasticSearchVersion() default "7.0.0"; // startup version check: lower bound, inclusive
+ String maximalElasticSearchVersion() default "8.0.0"; // startup version check: upper bound, exclusive
+ String cluster_name() default "contextElasticSearch"; // NOTE(review): under the OSGi DS name mapping '_' becomes '.', so this should be the "cluster.name" property - confirm
+ String elasticSearchAddresses() default "localhost:9200"; // comma-separated list; entries are trimmed and empty ones dropped
+ String index_prefix() default "context";
+ String username(); // credentials are only configured on the HTTP client when this is not blank
+ String password();
+ boolean sslEnable() default false; // true -> nodes are contacted over "https" instead of "http"
+ boolean sslTrustAllCertificates() default false;
+ boolean throwExceptions() default false;
+ boolean alwaysOverwrite() default true; // used by the save(...)/update(...) overloads that take no explicit overwrite flag
+ String logLevelRestClient() default "ERROR"; // level set on the "org.elasticsearch.client.RestClient" logger; unparsable values fall back to ERROR
+ String fatalIllegalStateErrors(); // comma-separated list; entries are trimmed and empty ones dropped
+
+ String numberOfShards() default "5";
+ String numberOfReplicas() default "0";
+ String indexMappingTotalFieldsLimit() default "1000";
+ String indexMaxDocValueFieldsSearch() default "1000";
+
+ String monthlyIndex_numberOfShards() default "3"; /* Deprecated: use the rollover_* properties instead */
+ String monthlyIndex_numberOfReplicas() default "0"; /* Deprecated: use the rollover_* properties instead */
+ String monthlyIndex_indexMappingTotalFieldsLimit() default "1000"; /* Deprecated: use the rollover_* properties instead */
+ String monthlyIndex_indexMaxDocValueFieldsSearch() default "1000"; /* Deprecated: use the rollover_* properties instead */
+ String monthlyIndex_itemsMonthlyIndexedOverride() default "event,session"; /* Deprecated: use the rollover_* properties instead; comma-separated list (entries are not trimmed) */
+ String rollover_numberOfShards();
+ String rollover_numberOfReplicas();
+ String rollover_indexMappingTotalFieldsLimit();
+ String rollover_indexMaxDocValueFieldsSearch();
+ String rollover_indices(); // comma-separated list (entries are not trimmed); ignored when empty
+ String rollover_maxSize();
+ String rollover_maxAge() default "365d";
+ String rollover_maxDocs();
+
+ int defaultQueryLimit() default 10;
+ int removeByQueryTimeoutInMinutes() default 10;
+ int aggregateQueryBucketSize() default 5000;
+ int clientSocketTimeout() default -1; // only applied to the REST client request config when > 0
+ int aggQueryMaxResponseSizeHttp() default -1;
+ boolean aggQueryThrowOnMissingDocs() default false;
+ boolean useBatchingForSave() default false; // batching flag used by save(...) when the caller does not pass one
+ boolean useBatchingForUpdate() default true;
+ String itemTypeToRefreshPolicy(); // JSON object parsed into a Map<String, WriteRequest.RefreshPolicy>; ignored when empty
+
+ int bulkProcessor_concurrentRequests() default 1; // only applied to the bulk processor when > 1
+ int bulkProcessor_bulkActions() default 1000; // only applied to the bulk processor when > 0
+ String bulkProcessor_bulkSize() default "5MB"; // parsed with ByteSizeValue.parseBytesSizeValue
+ String bulkProcessor_flushInterval() default "5s"; // parsed with TimeValue.parseTimeValue
+ String bulkProcessor_backoffPolicy() default "exponential"; // lower-cased before matching; "nobackoff" is one recognised value
+}
\ No newline at end of file
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 864548f3e..dad8758d4 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -36,9 +36,7 @@ import org.apache.unomi.api.query.NumericRange;
import org.apache.unomi.metrics.MetricAdapter;
import org.apache.unomi.metrics.MetricsService;
import org.apache.unomi.persistence.elasticsearch.conditions.ConditionContextHelper;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder;
import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluator;
import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluatorDispatcher;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.BaseAggregate;
@@ -138,8 +136,9 @@ import org.elasticsearch.search.sort.SortOrder;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
-import org.osgi.framework.ServiceReference;
import org.osgi.framework.SynchronousBundleListener;
+import org.osgi.service.component.annotations.*;
+import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,313 +168,88 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+@Component(immediate = true, service = {PersistenceService.class, SynchronousBundleListener.class})
+@Designate(ocd = ElasticSearchPersistenceServiceConf.class)
@SuppressWarnings("rawtypes")
public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
- public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests";
- public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions";
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
+
public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize";
public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
- public static final String MONTHLY_INDEX_ITEMS_MONTHLY_INDEXED = "monthlyIndex.itemsMonthlyIndexedOverride";
public static final String INDEX_DATE_PREFIX = "date-";
public static final String SEQ_NO = "seq_no";
public static final String PRIMARY_TERM = "primary_term";
-
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
private static final String ROLLOVER_LIFECYCLE_NAME = "unomi-rollover-policy";
- private boolean throwExceptions = false;
- private RestHighLevelClient client;
- private BulkProcessor bulkProcessor;
- private String elasticSearchAddresses;
- private List<String> elasticSearchAddressList = new ArrayList<>();
- private String clusterName;
- private String indexPrefix;
- private String monthlyIndexNumberOfShards;
- private String monthlyIndexNumberOfReplicas;
- private String monthlyIndexMappingTotalFieldsLimit;
- private String monthlyIndexMaxDocValueFieldsSearch;
- private String numberOfShards;
- private String numberOfReplicas;
- private String indexMappingTotalFieldsLimit;
- private String indexMaxDocValueFieldsSearch;
- private String[] fatalIllegalStateErrors;
- private BundleContext bundleContext;
- private Map<String, String> mappings = new HashMap<String, String>();
+
+ private ElasticSearchPersistenceServiceConf conf;
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
- private List<String> itemsMonthlyIndexed;
- private Map<String, String> routingByType;
-
- private Integer defaultQueryLimit = 10;
- private Integer removeByQueryTimeoutInMinutes = 10;
-
- private String bulkProcessorConcurrentRequests = "1";
- private String bulkProcessorBulkActions = "1000";
- private String bulkProcessorBulkSize = "5MB";
- private String bulkProcessorFlushInterval = "5s";
- private String bulkProcessorBackoffPolicy = "exponential";
-
- // Rollover configuration
- private List<String> rolloverIndices;
- private String rolloverMaxSize;
- private String rolloverMaxAge;
- private String rolloverMaxDocs;
- private String rolloverIndexNumberOfShards;
- private String rolloverIndexNumberOfReplicas;
- private String rolloverIndexMappingTotalFieldsLimit;
- private String rolloverIndexMaxDocValueFieldsSearch;
-
- private String minimalElasticSearchVersion = "7.0.0";
- private String maximalElasticSearchVersion = "8.0.0";
-
- // authentication props
- private String username;
- private String password;
- private boolean sslEnable = false;
- private boolean sslTrustAllCertificates = false;
-
- private int aggregateQueryBucketSize = 5000;
-
private MetricsService metricsService;
- private boolean useBatchingForSave = false;
- private boolean useBatchingForUpdate = true;
- private String logLevelRestClient = "ERROR";
- private boolean alwaysOverwrite = true;
- private boolean aggQueryThrowOnMissingDocs = false;
- private Integer aggQueryMaxResponseSizeHttp = null;
- private Integer clientSocketTimeout = null;
- private Map<String, WriteRequest.RefreshPolicy> itemTypeToRefreshPolicy = new HashMap<>();
+ private BundleContext bundleContext;
+ private List<String> itemsMonthlyIndexed;
+ private List<String> rolloverIndices;
+ private Map<String, String> mappings = new HashMap<String, String>();
private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
+ private RestHighLevelClient client;
+ private BulkProcessor bulkProcessor;
+ private List<String> elasticSearchAddressList = new ArrayList<>();
+ private String[] fatalIllegalStateErrors = new String[]{};
+ private Map<String, WriteRequest.RefreshPolicy> itemTypeToRefreshPolicy = new HashMap<>();
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- public void setElasticSearchAddresses(String elasticSearchAddresses) {
- this.elasticSearchAddresses = elasticSearchAddresses;
- String[] elasticSearchAddressesArray = elasticSearchAddresses.split(",");
- elasticSearchAddressList.clear();
- for (String elasticSearchAddress : elasticSearchAddressesArray) {
- elasticSearchAddressList.add(elasticSearchAddress.trim());
- }
- }
-
- public void setItemTypeToRefreshPolicy(String itemTypeToRefreshPolicy) throws IOException {
- if (!itemTypeToRefreshPolicy.isEmpty()) {
- this.itemTypeToRefreshPolicy = new ObjectMapper().readValue(itemTypeToRefreshPolicy,
- new TypeReference<HashMap<String, WriteRequest.RefreshPolicy>>() {
- });
- }
- }
-
- public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
- this.fatalIllegalStateErrors = Arrays.stream(fatalIllegalStateErrors.split(","))
- .map(i -> i.trim()).filter(i -> !i.isEmpty()).toArray(String[]::new);
- }
-
- public void setAggQueryMaxResponseSizeHttp(String aggQueryMaxResponseSizeHttp) {
- if (StringUtils.isNumeric(aggQueryMaxResponseSizeHttp)) {
- this.aggQueryMaxResponseSizeHttp = Integer.parseInt(aggQueryMaxResponseSizeHttp);
- }
- }
-
- public void setIndexPrefix(String indexPrefix) {
- this.indexPrefix = indexPrefix;
- }
-
- @Deprecated
- public void setMonthlyIndexNumberOfShards(String monthlyIndexNumberOfShards) {
- this.monthlyIndexNumberOfShards = monthlyIndexNumberOfShards;
- }
-
- @Deprecated
- public void setMonthlyIndexNumberOfReplicas(String monthlyIndexNumberOfReplicas) {
- this.monthlyIndexNumberOfReplicas = monthlyIndexNumberOfReplicas;
- }
-
- @Deprecated
- public void setMonthlyIndexMappingTotalFieldsLimit(String monthlyIndexMappingTotalFieldsLimit) {
- this.monthlyIndexMappingTotalFieldsLimit = monthlyIndexMappingTotalFieldsLimit;
- }
-
- @Deprecated
- public void setMonthlyIndexMaxDocValueFieldsSearch(String monthlyIndexMaxDocValueFieldsSearch) {
- this.monthlyIndexMaxDocValueFieldsSearch = monthlyIndexMaxDocValueFieldsSearch;
- }
-
- @Deprecated
- public void setItemsMonthlyIndexedOverride(String itemsMonthlyIndexedOverride) {
- this.itemsMonthlyIndexed = StringUtils.isNotEmpty(itemsMonthlyIndexedOverride) ? Arrays.asList(itemsMonthlyIndexedOverride.split(",").clone()) : Collections.emptyList();
- }
-
- public void setNumberOfShards(String numberOfShards) {
- this.numberOfShards = numberOfShards;
- }
-
- public void setNumberOfReplicas(String numberOfReplicas) {
- this.numberOfReplicas = numberOfReplicas;
- }
-
- public void setIndexMappingTotalFieldsLimit(String indexMappingTotalFieldsLimit) {
- this.indexMappingTotalFieldsLimit = indexMappingTotalFieldsLimit;
- }
-
- public void setIndexMaxDocValueFieldsSearch(String indexMaxDocValueFieldsSearch) {
- this.indexMaxDocValueFieldsSearch = indexMaxDocValueFieldsSearch;
- }
-
- public void setDefaultQueryLimit(Integer defaultQueryLimit) {
- this.defaultQueryLimit = defaultQueryLimit;
- }
-
- public void setRoutingByType(Map<String, String> routingByType) {
- this.routingByType = routingByType;
- }
-
+ @Reference
public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher conditionEvaluatorDispatcher) {
this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher;
}
+ @Reference
public void setConditionESQueryBuilderDispatcher(ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher) {
this.conditionESQueryBuilderDispatcher = conditionESQueryBuilderDispatcher;
}
- public void setBulkProcessorConcurrentRequests(String bulkProcessorConcurrentRequests) {
- this.bulkProcessorConcurrentRequests = bulkProcessorConcurrentRequests;
- }
-
- public void setBulkProcessorBulkActions(String bulkProcessorBulkActions) {
- this.bulkProcessorBulkActions = bulkProcessorBulkActions;
- }
-
- public void setBulkProcessorBulkSize(String bulkProcessorBulkSize) {
- this.bulkProcessorBulkSize = bulkProcessorBulkSize;
- }
-
- public void setBulkProcessorFlushInterval(String bulkProcessorFlushInterval) {
- this.bulkProcessorFlushInterval = bulkProcessorFlushInterval;
- }
-
- public void setBulkProcessorBackoffPolicy(String bulkProcessorBackoffPolicy) {
- this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy;
- }
-
- public void setRolloverIndices(String rolloverIndices) {
- this.rolloverIndices = StringUtils.isNotEmpty(rolloverIndices) ? Arrays.asList(rolloverIndices.split(",").clone()) : null;
- }
-
- public void setRolloverMaxSize(String rolloverMaxSize) {
- this.rolloverMaxSize = rolloverMaxSize;
- }
-
- public void setRolloverMaxAge(String rolloverMaxAge) {
- this.rolloverMaxAge = rolloverMaxAge;
- }
-
- public void setRolloverMaxDocs(String rolloverMaxDocs) {
- this.rolloverMaxDocs = rolloverMaxDocs;
- }
-
- public void setRolloverIndexNumberOfShards(String rolloverIndexNumberOfShards) {
- this.rolloverIndexNumberOfShards = rolloverIndexNumberOfShards;
- }
-
- public void setRolloverIndexNumberOfReplicas(String rolloverIndexNumberOfReplicas) {
- this.rolloverIndexNumberOfReplicas = rolloverIndexNumberOfReplicas;
- }
-
- public void setRolloverIndexMappingTotalFieldsLimit(String rolloverIndexMappingTotalFieldsLimit) {
- this.rolloverIndexMappingTotalFieldsLimit = rolloverIndexMappingTotalFieldsLimit;
- }
-
- public void setRolloverIndexMaxDocValueFieldsSearch(String rolloverIndexMaxDocValueFieldsSearch) {
- this.rolloverIndexMaxDocValueFieldsSearch = rolloverIndexMaxDocValueFieldsSearch;
- }
-
- public void setMinimalElasticSearchVersion(String minimalElasticSearchVersion) {
- this.minimalElasticSearchVersion = minimalElasticSearchVersion;
- }
-
- public void setMaximalElasticSearchVersion(String maximalElasticSearchVersion) {
- this.maximalElasticSearchVersion = maximalElasticSearchVersion;
- }
-
- public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
- this.aggregateQueryBucketSize = aggregateQueryBucketSize;
- }
-
- public void setClientSocketTimeout(String clientSocketTimeout) {
- if (StringUtils.isNumeric(clientSocketTimeout)) {
- this.clientSocketTimeout = Integer.parseInt(clientSocketTimeout);
- }
- }
-
+ @Reference
public void setMetricsService(MetricsService metricsService) {
this.metricsService = metricsService;
}
- public void setUseBatchingForSave(boolean useBatchingForSave) {
- this.useBatchingForSave = useBatchingForSave;
- }
-
- public void setUseBatchingForUpdate(boolean useBatchingForUpdate) {
- this.useBatchingForUpdate = useBatchingForUpdate;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public void setSslEnable(boolean sslEnable) {
- this.sslEnable = sslEnable;
- }
-
- public void setSslTrustAllCertificates(boolean sslTrustAllCertificates) {
- this.sslTrustAllCertificates = sslTrustAllCertificates;
- }
-
-
- public void setAggQueryThrowOnMissingDocs(boolean aggQueryThrowOnMissingDocs) {
- this.aggQueryThrowOnMissingDocs = aggQueryThrowOnMissingDocs;
- }
-
- public void setThrowExceptions(boolean throwExceptions) {
- this.throwExceptions = throwExceptions;
- }
-
- public void setAlwaysOverwrite(boolean alwaysOverwrite) {
- this.alwaysOverwrite = alwaysOverwrite;
- }
-
- public void setLogLevelRestClient(String logLevelRestClient) {
- this.logLevelRestClient = logLevelRestClient;
- }
+ @Activate
+ public void start(ElasticSearchPersistenceServiceConf conf, BundleContext bundleContext) throws Exception {
+ this.bundleContext = bundleContext;
+ this.conf = conf;
- public void start() throws Exception {
+ // init some data based on conf
+ if (StringUtils.isNotEmpty(conf.elasticSearchAddresses())) {
+ this.elasticSearchAddressList = Arrays.stream(conf.elasticSearchAddresses().split(",")).map(String::trim).filter(i -> !i.isEmpty()).collect(Collectors.toList());
+ }
+ if (StringUtils.isNotEmpty(conf.itemTypeToRefreshPolicy())){
+ this.itemTypeToRefreshPolicy = new ObjectMapper().readValue(conf.itemTypeToRefreshPolicy(), new TypeReference<HashMap<String, WriteRequest.RefreshPolicy>>() {});
+ }
+ if (StringUtils.isNotEmpty(conf.fatalIllegalStateErrors())) {
+ this.fatalIllegalStateErrors = Arrays.stream(conf.fatalIllegalStateErrors().split(",")).map(String::trim).filter(i -> !i.isEmpty()).toArray(String[]::new);
+ }
+ if (StringUtils.isNotEmpty(conf.monthlyIndex_itemsMonthlyIndexedOverride())) {
+ this.itemsMonthlyIndexed = Arrays.asList(conf.monthlyIndex_itemsMonthlyIndexedOverride().split(",").clone());
+ }
+ if (StringUtils.isNotEmpty(conf.rollover_indices())) {
+ this.rolloverIndices = Arrays.asList(conf.rollover_indices().split(",").clone());
+ }
// Work around to avoid ES Logs regarding the deprecated [ignore_throttled] parameter
try {
- Level lvl = Level.toLevel(logLevelRestClient, Level.ERROR);
+ Level lvl = Level.toLevel(conf.logLevelRestClient(), Level.ERROR);
org.apache.log4j.Logger.getLogger("org.elasticsearch.client.RestClient").setLevel(lvl);
} catch (Exception e) {
// Never fail because of the set of the logger
}
// on startup
- new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
public Object execute(Object... args) throws Exception {
buildClient();
@@ -483,8 +257,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
MainResponse response = client.info(RequestOptions.DEFAULT);
MainResponse.Version version = response.getVersion();
Version clusterVersion = Version.fromString(version.getNumber());
- Version minimalVersion = Version.fromString(minimalElasticSearchVersion);
- Version maximalVersion = Version.fromString(maximalElasticSearchVersion);
+ Version minimalVersion = Version.fromString(conf.minimalElasticSearchVersion());
+ Version maximalVersion = Version.fromString(conf.maximalElasticSearchVersion());
if (clusterVersion.before(minimalVersion) ||
clusterVersion.equals(maximalVersion) ||
clusterVersion.after(maximalVersion)) {
@@ -529,20 +303,20 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
int elasticSearchPort = Integer.parseInt(elasticSearchAddressParts[1]);
// configure authentication
- nodeList.add(new Node(new HttpHost(elasticSearchHostName, elasticSearchPort, sslEnable ? "https" : "http")));
+ nodeList.add(new Node(new HttpHost(elasticSearchHostName, elasticSearchPort, conf.sslEnable() ? "https" : "http")));
}
RestClientBuilder clientBuilder = RestClient.builder(nodeList.toArray(new Node[nodeList.size()]));
- if (clientSocketTimeout != null) {
+ if (conf.clientSocketTimeout() > 0) {
clientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
- requestConfigBuilder.setSocketTimeout(clientSocketTimeout);
+ requestConfigBuilder.setSocketTimeout(conf.clientSocketTimeout());
return requestConfigBuilder;
});
}
clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
- if (sslTrustAllCertificates) {
+ if (conf.sslTrustAllCertificates()) {
try {
final SSLContext sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, new TrustManager[]{new X509TrustManager() {
@@ -565,9 +339,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- if (StringUtils.isNotBlank(username)) {
+ if (StringUtils.isNotBlank(conf.username())) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(conf.username(), conf.password()));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
@@ -575,7 +349,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return httpClientBuilder;
});
- logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index prefix " + indexPrefix + "...");
+ logger.info("Connecting to ElasticSearch persistence backend using cluster name " + conf.cluster_name() + " and index prefix " + conf.index_prefix() + "...");
client = new RestHighLevelClient(clientBuilder);
}
@@ -609,27 +383,23 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
bulkProcessorListener);
- if (bulkProcessorConcurrentRequests != null) {
- int concurrentRequests = Integer.parseInt(bulkProcessorConcurrentRequests);
- if (concurrentRequests > 1) {
- bulkProcessorBuilder.setConcurrentRequests(concurrentRequests);
- }
+ if (conf.bulkProcessor_concurrentRequests() > 1) {
+ bulkProcessorBuilder.setConcurrentRequests(conf.bulkProcessor_concurrentRequests());
}
- if (bulkProcessorBulkActions != null) {
- int bulkActions = Integer.parseInt(bulkProcessorBulkActions);
- bulkProcessorBuilder.setBulkActions(bulkActions);
+ if (conf.bulkProcessor_bulkActions() > 0) {
+ bulkProcessorBuilder.setBulkActions(conf.bulkProcessor_bulkActions());
}
- if (bulkProcessorBulkSize != null) {
- bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkProcessorBulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), BULK_PROCESSOR_BULK_SIZE));
+ if (conf.bulkProcessor_bulkSize() != null) {
+ bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(conf.bulkProcessor_bulkSize(), new ByteSizeValue(5, ByteSizeUnit.MB), BULK_PROCESSOR_BULK_SIZE));
}
- if (bulkProcessorFlushInterval != null) {
- bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(bulkProcessorFlushInterval, null, BULK_PROCESSOR_FLUSH_INTERVAL));
+ if (conf.bulkProcessor_flushInterval() != null) {
+ bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(conf.bulkProcessor_flushInterval(), null, BULK_PROCESSOR_FLUSH_INTERVAL));
} else {
// in ElasticSearch this defaults to null, but we would like to set a value to 5 seconds by default
bulkProcessorBuilder.setFlushInterval(new TimeValue(5, TimeUnit.SECONDS));
}
- if (bulkProcessorBackoffPolicy != null) {
- String backoffPolicyStr = bulkProcessorBackoffPolicy;
+ if (conf.bulkProcessor_backoffPolicy() != null) {
+ String backoffPolicyStr = conf.bulkProcessor_backoffPolicy();
if (backoffPolicyStr != null && backoffPolicyStr.length() > 0) {
backoffPolicyStr = backoffPolicyStr.toLowerCase();
if ("nobackoff".equals(backoffPolicyStr)) {
@@ -661,9 +431,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return bulkProcessor;
}
+ @Deactivate
public void stop() {
-
- new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Object execute(Object... args) throws IOException {
logger.info("Closing ElasticSearch persistence backend...");
if (bulkProcessor != null) {
@@ -683,30 +453,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
bundleContext.removeBundleListener(this);
}
- public void bindConditionEvaluator(ServiceReference<ConditionEvaluator> conditionEvaluatorServiceReference) {
- ConditionEvaluator conditionEvaluator = bundleContext.getService(conditionEvaluatorServiceReference);
- conditionEvaluatorDispatcher.addEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString(), conditionEvaluator);
- }
-
- public void unbindConditionEvaluator(ServiceReference<ConditionEvaluator> conditionEvaluatorServiceReference) {
- if (conditionEvaluatorServiceReference == null) {
- return;
- }
- conditionEvaluatorDispatcher.removeEvaluator(conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId").toString());
- }
-
- public void bindConditionESQueryBuilder(ServiceReference<ConditionESQueryBuilder> conditionESQueryBuilderServiceReference) {
- ConditionESQueryBuilder conditionESQueryBuilder = bundleContext.getService(conditionESQueryBuilderServiceReference);
- conditionESQueryBuilderDispatcher.addQueryBuilder(conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString(), conditionESQueryBuilder);
- }
-
- public void unbindConditionESQueryBuilder(ServiceReference<ConditionESQueryBuilder> conditionESQueryBuilderServiceReference) {
- if (conditionESQueryBuilderServiceReference == null) {
- return;
- }
- conditionESQueryBuilderDispatcher.removeQueryBuilder(conditionESQueryBuilderServiceReference.getProperty("queryBuilderId").toString());
- }
-
@Override
public void bundleChanged(BundleEvent event) {
switch (event.getType()) {
@@ -774,7 +520,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public <T extends Item> PartialList<T> getAllItems(final Class<T> clazz, int offset, int size, String sortBy, String scrollTimeValidity) {
long startTime = System.currentTimeMillis();
try {
- return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, null, scrollTimeValidity);
+ return query(QueryBuilders.matchAllQuery(), sortBy, clazz, offset, size, scrollTimeValidity);
} finally {
if (metricsService != null && metricsService.isActivated()) {
metricsService.updateTimer(this.getClass().getName() + ".getAllItems", startTime);
@@ -805,7 +551,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private <T extends Item> T load(final String itemId, final Class<T> clazz, final String customItemType) {
- return new InClassLoaderExecute<T>(metricsService, this.getClass().getName() + ".loadItem", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ return new InClassLoaderExecute<T>(metricsService, this.getClass().getName() + ".loadItem", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected T execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -818,12 +564,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public T execute(Object... args) throws Exception {
if (customItemType == null) {
- PartialList<T> r = query(QueryBuilders.idsQuery().addIds(itemId), null, clazz, 0, 1, null, null);
+ PartialList<T> r = query(QueryBuilders.idsQuery().addIds(itemId), null, clazz, 0, 1, null);
if (r.size() > 0) {
return r.get(0);
}
} else {
- PartialList<CustomItem> r = query(QueryBuilders.idsQuery().addIds(itemId), null, customItemType, 0, 1, null, null);
+ PartialList<CustomItem> r = query(QueryBuilders.idsQuery().addIds(itemId), null, customItemType, 0, 1, null);
if (r.size() > 0) {
return (T) r.get(0);
}
@@ -875,28 +621,26 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean save(final Item item) {
- return save(item, useBatchingForSave, alwaysOverwrite);
+ return save(item, conf.useBatchingForSave(), conf.alwaysOverwrite());
}
@Override
public boolean save(final Item item, final boolean useBatching) {
- return save(item, useBatching, alwaysOverwrite);
+ return save(item, useBatching, conf.alwaysOverwrite());
}
@Override
public boolean save(final Item item, final Boolean useBatchingOption, final Boolean alwaysOverwriteOption) {
- final boolean useBatching = useBatchingOption == null ? this.useBatchingForSave : useBatchingOption;
- final boolean alwaysOverwrite = alwaysOverwriteOption == null ? this.alwaysOverwrite : alwaysOverwriteOption;
+ final boolean useBatching = useBatchingOption == null ? conf.useBatchingForSave() : useBatchingOption;
+ final boolean alwaysOverwrite = alwaysOverwriteOption == null ? conf.alwaysOverwrite() : alwaysOverwriteOption;
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws Exception {
try {
String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
String itemType = item.getItemType();
- String className = item.getClass().getName();
if (item instanceof CustomItem) {
itemType = ((CustomItem) item).getCustomItemType();
- className = CustomItem.class.getName() + "." + itemType;
}
String itemId = item.getItemId();
String index = item.getSystemMetadata("index") != null ?
@@ -918,10 +662,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- if (routingByType.containsKey(itemType)) {
- indexRequest.routing(routingByType.get(itemType));
- }
-
try {
if (bulkProcessor == null || !useBatching) {
indexRequest.setRefreshPolicy(getRefreshPolicy(itemType));
@@ -965,23 +705,23 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean update(final Item item, final Class clazz, final String propertyName, final Object propertyValue) {
- return update(item, clazz, Collections.singletonMap(propertyName, propertyValue), alwaysOverwrite);
+ return update(item, clazz, Collections.singletonMap(propertyName, propertyValue), conf.alwaysOverwrite());
}
@Override
public boolean update(final Item item, final Class clazz, final Map source) {
- return update(item, clazz, source, alwaysOverwrite);
+ return update(item, clazz, source, conf.alwaysOverwrite());
}
@Override
public boolean update(final Item item, final Class clazz, final Map source, final boolean alwaysOverwrite) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws Exception {
try {
UpdateRequest updateRequest = createUpdateRequest(clazz, item, source, alwaysOverwrite);
- if (bulkProcessor == null || !useBatchingForUpdate) {
+ if (bulkProcessor == null || !conf.useBatchingForUpdate()) {
UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
} else {
@@ -1022,13 +762,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (items.size() == 0)
return new ArrayList<>();
- List<String> result = new InClassLoaderExecute<List<String>>(metricsService, this.getClass().getName() + ".updateItems", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ List<String> result = new InClassLoaderExecute<List<String>>(metricsService, this.getClass().getName() + ".updateItems", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected List<String> execute(Object... args) throws Exception {
long batchRequestStartTime = System.currentTimeMillis();
BulkRequest bulkRequest = new BulkRequest();
items.forEach((item, source) -> {
- UpdateRequest updateRequest = createUpdateRequest(clazz, item, source, alwaysOverwrite);
+ UpdateRequest updateRequest = createUpdateRequest(clazz, item, source, conf.alwaysOverwrite());
bulkRequest.add(updateRequest);
});
@@ -1079,7 +819,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private boolean updateWithQueryAndScript(final Class<?> clazz, final Script[] scripts, final Condition[] conditions) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -1134,7 +874,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean storeScripts(Map<String, String> scripts) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".storeScripts", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".storeScripts", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws Exception {
boolean executedSuccessfully = true;
for (Map.Entry<String, String> script : scripts.entrySet()) {
@@ -1176,7 +916,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean updateWithScript(final Item item, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -1226,7 +966,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private <T extends Item> boolean remove(final String itemId, final Class<T> clazz, String customItemType) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeItem", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeItem", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -1250,7 +990,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -1267,7 +1007,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
// Remove by Query is mostly used for purge and cleaning up old data
// It's mostly used in jobs/timed tasks so we don't really care about long request
// So we increase default timeout of 1min to 10min
- .setTimeout(TimeValue.timeValueMinutes(removeByQueryTimeoutInMinutes));
+ .setTimeout(TimeValue.timeValueMinutes(conf.removeByQueryTimeoutInMinutes()));
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
@@ -1277,7 +1017,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
if (bulkByScrollResponse.isTimedOut()) {
- logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", removeByQueryTimeoutInMinutes, query);
+ logger.warn("Remove by query: timed out because took more than {} minutes for query: {}", conf.removeByQueryTimeoutInMinutes(), query);
}
if ((bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().size() > 0) ||
@@ -1324,7 +1064,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public boolean indexTemplateExists(final String templateName) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws IOException {
IndexTemplatesExistRequest indexTemplatesExistRequest = new IndexTemplatesExistRequest(templateName);
return client.indices().existsTemplate(indexTemplatesExistRequest, RequestOptions.DEFAULT);
@@ -1338,7 +1078,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
public boolean removeIndexTemplate(final String templateName) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws IOException {
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(templateName);
AcknowledgedResponse deleteIndexTemplateResponse = client.indices().deleteTemplate(deleteIndexTemplateRequest, RequestOptions.DEFAULT);
@@ -1353,17 +1093,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
public boolean registerRolloverLifecyclePolicy() {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexLifecyclePolicy", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexLifecyclePolicy", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws IOException {
// Create the lifecycle policy for monthly indices
Map<String, Phase> phases = new HashMap<>();
Map<String, LifecycleAction> hotActions = new HashMap<>();
- final Long maxDocs = StringUtils.isEmpty(rolloverMaxDocs) ? null : Long.parseLong(rolloverMaxDocs);
+ final Long maxDocs = StringUtils.isEmpty(conf.rollover_maxDocs()) ? null : Long.parseLong(conf.rollover_maxDocs());
hotActions.put(
RolloverAction.NAME,
new RolloverAction(
- StringUtils.isEmpty(rolloverMaxSize) ? null : ByteSizeValue.parseBytesSizeValue(rolloverMaxSize, "rollover.maxSize"),
- StringUtils.isEmpty(rolloverMaxAge) ? null : TimeValue.parseTimeValue(rolloverMaxAge, null, "rollover.maxAge"),
+ StringUtils.isEmpty(conf.rollover_maxSize()) ? null : ByteSizeValue.parseBytesSizeValue(conf.rollover_maxSize(), "rollover.maxSize"),
+ StringUtils.isEmpty(conf.rollover_maxAge()) ? null : TimeValue.parseTimeValue(conf.rollover_maxAge(), null, "rollover.maxAge"),
maxDocs
)
);
@@ -1373,7 +1113,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
Map<String, LifecycleAction> deleteActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
phases.put("delete", new Phase("delete", new TimeValue(90, TimeUnit.DAYS), deleteActions));
- LifecyclePolicy policy = new LifecyclePolicy(indexPrefix + "-" + ROLLOVER_LIFECYCLE_NAME, phases);
+ LifecyclePolicy policy = new LifecyclePolicy(conf.index_prefix() + "-" + ROLLOVER_LIFECYCLE_NAME, phases);
PutLifecyclePolicyRequest request = new PutLifecyclePolicyRequest(policy);
org.elasticsearch.client.core.AcknowledgedResponse putLifecyclePolicy = client.indexLifecycle().putLifecyclePolicy(request, RequestOptions.DEFAULT);
return putLifecyclePolicy.isAcknowledged();
@@ -1388,7 +1128,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public boolean createIndex(final String itemType) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws IOException {
String index = getIndex(itemType);
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
@@ -1417,7 +1157,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public boolean removeIndex(final String itemType) {
String index = getIndex(itemType);
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws IOException {
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
@@ -1437,17 +1177,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private void internalCreateRolloverTemplate(String itemName) throws IOException {
- String rolloverAlias = indexPrefix + "-" + itemName;
+ String rolloverAlias = conf.index_prefix() + "-" + itemName;
PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(rolloverAlias + "-rollover-template")
.patterns(Collections.singletonList(getRolloverIndexForQuery(itemName)))
.order(1)
.settings("{\n" +
" \"index\" : {\n" +
- " \"number_of_shards\" : " + StringUtils.defaultIfEmpty(rolloverIndexNumberOfShards, monthlyIndexNumberOfShards) + ",\n" +
- " \"number_of_replicas\" : " + StringUtils.defaultIfEmpty(rolloverIndexNumberOfReplicas, monthlyIndexNumberOfReplicas) + ",\n" +
- " \"mapping.total_fields.limit\" : " + StringUtils.defaultIfEmpty(rolloverIndexMappingTotalFieldsLimit, monthlyIndexMappingTotalFieldsLimit) + ",\n" +
- " \"max_docvalue_fields_search\" : " + StringUtils.defaultIfEmpty(rolloverIndexMaxDocValueFieldsSearch, monthlyIndexMaxDocValueFieldsSearch) + ",\n" +
- " \"lifecycle.name\": \"" + (indexPrefix + "-" + ROLLOVER_LIFECYCLE_NAME) + "\",\n" +
+ " \"number_of_shards\" : " + StringUtils.defaultIfEmpty(conf.rollover_numberOfShards(), conf.monthlyIndex_numberOfShards()) + ",\n" +
+ " \"number_of_replicas\" : " + StringUtils.defaultIfEmpty(conf.rollover_numberOfReplicas(), conf.monthlyIndex_numberOfReplicas()) + ",\n" +
+ " \"mapping.total_fields.limit\" : " + StringUtils.defaultIfEmpty(conf.rollover_indexMappingTotalFieldsLimit(), conf.monthlyIndex_indexMappingTotalFieldsLimit()) + ",\n" +
+ " \"max_docvalue_fields_search\" : " + StringUtils.defaultIfEmpty(conf.rollover_indexMaxDocValueFieldsSearch(), conf.monthlyIndex_indexMaxDocValueFieldsSearch()) + ",\n" +
+ " \"lifecycle.name\": \"" + (conf.index_prefix() + "-" + ROLLOVER_LIFECYCLE_NAME) + "\",\n" +
" \"lifecycle.rollover_alias\": \"" + rolloverAlias + "\"" +
"" +
" },\n" +
@@ -1481,10 +1221,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
createIndexRequest.settings("{\n" +
" \"index\" : {\n" +
- " \"number_of_shards\" : " + numberOfShards + ",\n" +
- " \"number_of_replicas\" : " + numberOfReplicas + ",\n" +
- " \"mapping.total_fields.limit\" : " + indexMappingTotalFieldsLimit + ",\n" +
- " \"max_docvalue_fields_search\" : " + indexMaxDocValueFieldsSearch + "\n" +
+ " \"number_of_shards\" : " + conf.numberOfShards() + ",\n" +
+ " \"number_of_replicas\" : " + conf.numberOfReplicas() + ",\n" +
+ " \"mapping.total_fields.limit\" : " + conf.indexMappingTotalFieldsLimit() + ",\n" +
+ " \"max_docvalue_fields_search\" : " + conf.indexMaxDocValueFieldsSearch() + "\n" +
" },\n" +
" \"analysis\": {\n" +
" \"analyzer\": {\n" +
@@ -1607,7 +1347,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public Map<String, Map<String, Object>> getPropertiesMapping(final String itemType) {
- return new InClassLoaderExecute<Map<String, Map<String, Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ return new InClassLoaderExecute<Map<String, Map<String, Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
@SuppressWarnings("unchecked")
protected Map<String, Map<String, Object>> execute(Object... args) throws Exception {
// Get all mapping for current itemType
@@ -1707,7 +1447,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
public boolean saveQuery(final String queryName, final String query) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveQuery", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
@@ -1742,7 +1482,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean removeQuery(final String queryName) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeQuery", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
@@ -1817,22 +1557,22 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
- return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, null);
+ return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null);
}
@Override
public <T extends Item> PartialList<T> query(final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size, final String scrollTimeValidity) {
- return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, null, scrollTimeValidity);
+ return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, clazz, offset, size, scrollTimeValidity);
}
@Override
public PartialList<CustomItem> queryCustomItem(final Condition query, String sortBy, final String customItemType, final int offset, final int size, final String scrollTimeValidity) {
- return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, customItemType, offset, size, null, scrollTimeValidity);
+ return query(conditionESQueryBuilderDispatcher.getQueryBuilder(query), sortBy, customItemType, offset, size, scrollTimeValidity);
}
@Override
public <T extends Item> PartialList<T> queryFullText(final String fulltext, final Condition query, String sortBy, final Class<T> clazz, final int offset, final int size) {
- return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext)).must(conditionESQueryBuilderDispatcher.getQueryBuilder(query)), sortBy, clazz, offset, size, null, null);
+ return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext)).must(conditionESQueryBuilderDispatcher.getQueryBuilder(query)), sortBy, clazz, offset, size, null);
}
@Override
@@ -1842,22 +1582,22 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> List<T> query(final String fieldName, final String[] fieldValues, String sortBy, final Class<T> clazz) {
- return query(QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues)), sortBy, clazz, 0, -1, getRouting(fieldName, fieldValues, clazz), null).getList();
+ return query(QueryBuilders.termsQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValues)), sortBy, clazz, 0, -1, null).getList();
}
@Override
public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) {
- return query(termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
+ return query(termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, null);
}
@Override
public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
- return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext)).must(termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null);
+ return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext)).must(termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, null);
}
@Override
public <T extends Item> PartialList<T> queryFullText(String fulltext, String sortBy, Class<T> clazz, int offset, int size) {
- return query(QueryBuilders.queryStringQuery(fulltext), sortBy, clazz, offset, size, null, null);
+ return query(QueryBuilders.queryStringQuery(fulltext), sortBy, clazz, offset, size, null);
}
@Override
@@ -1865,7 +1605,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
RangeQueryBuilder builder = QueryBuilders.rangeQuery(fieldName);
builder.from(from);
builder.to(to);
- return query(builder, sortBy, clazz, offset, size, null, null);
+ return query(builder, sortBy, clazz, offset, size, null);
}
@Override
@@ -1886,7 +1626,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private long queryCount(final QueryBuilder filter, final String itemType) {
- return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
@Override
protected Long execute(Object... args) throws IOException {
@@ -1901,16 +1641,16 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}.catchingExecuteInClassLoader(true);
}
- private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing, final String scrollTimeValidity) {
- return query(query, sortBy, clazz, null, offset, size, routing, scrollTimeValidity);
+ private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String scrollTimeValidity) {
+ return query(query, sortBy, clazz, null, offset, size, scrollTimeValidity);
}
- private PartialList<CustomItem> query(final QueryBuilder query, final String sortBy, final String customItemType, final int offset, final int size, final String[] routing, final String scrollTimeValidity) {
- return query(query, sortBy, CustomItem.class, customItemType, offset, size, routing, scrollTimeValidity);
+ private PartialList<CustomItem> query(final QueryBuilder query, final String sortBy, final String customItemType, final int offset, final int size, final String scrollTimeValidity) {
+ return query(query, sortBy, CustomItem.class, customItemType, offset, size, scrollTimeValidity);
}
- private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final String customItemType, final int offset, final int size, final String[] routing, final String scrollTimeValidity) {
- return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".query", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final String customItemType, final int offset, final int size, final String scrollTimeValidity) {
+ return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".query", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
@@ -1929,7 +1669,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
.fetchSource(true)
.seqNoAndPrimaryTerm(true)
.query(query)
- .size(size < 0 ? defaultQueryLimit : size)
+ .size(size < 0 ? conf.defaultQueryLimit() : size)
.from(offset);
if (scrollTimeValidity != null) {
keepAlive = TimeValue.parseTimeValue(scrollTimeValidity, TimeValue.timeValueHours(1), "scrollTimeValidity");
@@ -1937,16 +1677,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
if (size == Integer.MIN_VALUE) {
- searchSourceBuilder.size(defaultQueryLimit);
+ searchSourceBuilder.size(conf.defaultQueryLimit());
} else if (size != -1) {
searchSourceBuilder.size(size);
} else {
// size == -1, use scroll query to retrieve all the results
searchRequest.scroll(keepAlive);
}
- if (routing != null) {
- searchRequest.routing(routing);
- }
if (sortBy != null) {
String[] sortByArray = sortBy.split(",");
for (String sortByElement : sortByArray) {
@@ -2039,7 +1776,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> PartialList<T> continueScrollQuery(final Class<T> clazz, final String scrollIdentifier, final String scrollTimeValidity) {
- return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".continueScrollQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".continueScrollQuery", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
@@ -2080,7 +1817,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public PartialList<CustomItem> continueCustomItemScrollQuery(final String customItemType, final String scrollIdentifier, final String scrollTimeValidity) {
- return new InClassLoaderExecute<PartialList<CustomItem>>(metricsService, this.getClass().getName() + ".continueScrollQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ return new InClassLoaderExecute<PartialList<CustomItem>>(metricsService, this.getClass().getName() + ".continueScrollQuery", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
@Override
protected PartialList<CustomItem> execute(Object... args) throws Exception {
@@ -2125,12 +1862,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Deprecated
@Override
public Map<String, Long> aggregateQuery(Condition filter, BaseAggregate aggregate, String itemType) {
- return aggregateQuery(filter, aggregate, itemType, false, aggregateQueryBucketSize);
+ return aggregateQuery(filter, aggregate, itemType, false, conf.aggregateQueryBucketSize());
}
@Override
public Map<String, Long> aggregateWithOptimizedQuery(Condition filter, BaseAggregate aggregate, String itemType) {
- return aggregateQuery(filter, aggregate, itemType, true, aggregateQueryBucketSize);
+ return aggregateQuery(filter, aggregate, itemType, true, conf.aggregateQueryBucketSize());
}
@Override
@@ -2140,7 +1877,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType,
final boolean optimizedQuery, int queryBucketSize) {
- return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
@Override
protected Map<String, Long> execute(Object... args) throws IOException {
@@ -2253,10 +1990,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
- if (aggQueryMaxResponseSizeHttp != null) {
+ if (conf.aggQueryMaxResponseSizeHttp() > 0) {
builder.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory
- .HeapBufferedResponseConsumerFactory(aggQueryMaxResponseSizeHttp));
+ .HeapBufferedResponseConsumerFactory(conf.aggQueryMaxResponseSizeHttp()));
}
SearchResponse response = client.search(searchRequest, builder.build());
@@ -2282,7 +2019,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
if (aggregations.get("buckets") != null) {
- if (aggQueryThrowOnMissingDocs) {
+ if (conf.aggQueryThrowOnMissingDocs()) {
if (aggregations.get("buckets") instanceof Terms) {
Terms terms = aggregations.get("buckets");
if (terms.getDocCountError() > 0 || terms.getSumOfOtherDocCounts() > 0) {
@@ -2313,18 +2050,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}.catchingExecuteInClassLoader(true);
}
- private <T extends Item> String[] getRouting(String fieldName, String[] fieldValues, Class<T> clazz) {
- String itemType = Item.getItemType(clazz);
- String[] routing = null;
- if (routingByType.containsKey(itemType) && routingByType.get(itemType).equals(fieldName)) {
- routing = fieldValues;
- }
- return routing;
- }
-
@Override
public void refresh() {
- new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refresh", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refresh", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) {
if (bulkProcessor != null) {
bulkProcessor.flush();
@@ -2341,7 +2069,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> void refreshIndex(Class<T> clazz, Date dateHint) {
- new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refreshIndex", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refreshIndex", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
protected Boolean execute(Object... args) {
try {
String itemType = Item.getItemType(clazz);
@@ -2358,7 +2086,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public void purge(final Date date) {
- new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
@Override
protected Object execute(Object... args) throws Exception {
@@ -2394,7 +2122,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public void purge(final String scope) {
- new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".purgeWithScope", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".purgeWithScope", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
@Override
protected Void execute(Object... args) throws IOException {
QueryBuilder query = termQuery("scope", scope);
@@ -2446,7 +2174,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public Map<String, Double> getSingleValuesMetrics(final Condition condition, final String[] metrics, final String field, final String itemType) {
- return new InClassLoaderExecute<Map<String, Double>>(metricsService, this.getClass().getName() + ".getSingleValuesMetrics", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
+ return new InClassLoaderExecute<Map<String, Double>>(metricsService, this.getClass().getName() + ".getSingleValuesMetrics", this.bundleContext, this.fatalIllegalStateErrors, conf.throwExceptions()) {
@Override
protected Map<String, Double> execute(Object... args) throws IOException {
@@ -2582,7 +2310,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private String getAllIndexForQuery() {
- return indexPrefix + "*";
+ return conf.index_prefix() + "*";
}
private String getIndexNameForQuery(String itemType) {
@@ -2590,11 +2318,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private String getRolloverIndexForQuery(String itemType) {
- return indexPrefix + "-" + itemType.toLowerCase() + "-*";
+ return conf.index_prefix() + "-" + itemType.toLowerCase() + "-*";
}
private String getIndex(String indexItemTypePart) {
- return (indexPrefix + "-" + indexItemTypePart).toLowerCase();
+ return (conf.index_prefix() + "-" + indexItemTypePart).toLowerCase();
}
private boolean isItemTypeRollingOver(String itemType) {
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
index 6bc8b5062..85824976f 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
@@ -21,35 +21,39 @@ import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.scripting.ScriptExecutor;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.annotations.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+@Component(service = ConditionESQueryBuilderDispatcher.class)
public class ConditionESQueryBuilderDispatcher {
private static final Logger logger = LoggerFactory.getLogger(ConditionESQueryBuilderDispatcher.class.getName());
- private Map<String, ConditionESQueryBuilder> queryBuilders = new ConcurrentHashMap<>();
+ private final Map<String, ConditionESQueryBuilder> queryBuilders = new ConcurrentHashMap<>();
private ScriptExecutor scriptExecutor;
- public ConditionESQueryBuilderDispatcher() {
- }
-
+ @Reference
public void setScriptExecutor(ScriptExecutor scriptExecutor) {
this.scriptExecutor = scriptExecutor;
}
- public void addQueryBuilder(String name, ConditionESQueryBuilder evaluator) {
- queryBuilders.put(name, evaluator);
+ /**
+  * Binds a {@link ConditionESQueryBuilder} service, keyed by the value of its
+  * {@code queryBuilderId} service property. Called by Declarative Services for every
+  * matching service (dynamic policy, greedy option, multiple cardinality).
+  * A service registered without the {@code queryBuilderId} property cannot be keyed,
+  * so it is logged and ignored instead of failing with a NullPointerException.
+  *
+  * @param conditionESQueryBuilder the query builder service instance to register
+  * @param conditionESQueryBuilderServiceReference the service reference whose properties hold the id
+  */
+ @Reference(service = ConditionESQueryBuilder.class, policy = ReferencePolicy.DYNAMIC, policyOption = ReferencePolicyOption.GREEDY,
+         cardinality = ReferenceCardinality.MULTIPLE, unbind = "unbindConditionESQueryBuilder")
+ public void setConditionESQueryBuilders(ConditionESQueryBuilder conditionESQueryBuilder, ServiceReference<ConditionESQueryBuilder> conditionESQueryBuilderServiceReference) {
+     // getProperty returns null when the key is absent; read it once and check before use.
+     Object queryBuilderId = conditionESQueryBuilderServiceReference.getProperty("queryBuilderId");
+     if (queryBuilderId == null) {
+         logger.warn("Ignoring ConditionESQueryBuilder {} registered without a queryBuilderId service property", conditionESQueryBuilder);
+         return;
+     }
+     this.queryBuilders.put(queryBuilderId.toString(), conditionESQueryBuilder);
}
- public void removeQueryBuilder(String name) {
- queryBuilders.remove(name);
+ /**
+  * Unbinds a {@link ConditionESQueryBuilder} service by removing the entry stored under
+  * the value of its {@code queryBuilderId} service property. A {@code null} reference, or a
+  * reference without that property, is ignored because no entry can have been stored for it.
+  *
+  * @param conditionESQueryBuilderServiceReference the reference of the service going away, may be {@code null}
+  */
+ public void unbindConditionESQueryBuilder(ServiceReference<ConditionESQueryBuilder> conditionESQueryBuilderServiceReference) {
+     if (conditionESQueryBuilderServiceReference == null) {
+         return;
+     }
+     // getProperty returns null when the key is absent; avoid dereferencing it blindly.
+     Object queryBuilderId = conditionESQueryBuilderServiceReference.getProperty("queryBuilderId");
+     if (queryBuilderId == null) {
+         return;
+     }
+     queryBuilders.remove(queryBuilderId.toString());
}
-
public String getQuery(Condition condition) {
return "{\"query\": " + getQueryBuilder(condition).toString() + "}";
}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluatorDispatcher.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluatorDispatcher.java
index cd0bb35f7..2faa3eb97 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluatorDispatcher.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionEvaluatorDispatcher.java
@@ -22,6 +22,8 @@ import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.metrics.MetricAdapter;
import org.apache.unomi.metrics.MetricsService;
import org.apache.unomi.scripting.ScriptExecutor;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.annotations.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,28 +34,36 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* Entry point for condition evaluation. Will dispatch to all evaluators.
*/
+@Component(service = ConditionEvaluatorDispatcher.class)
public class ConditionEvaluatorDispatcher {
private static final Logger logger = LoggerFactory.getLogger(ConditionEvaluatorDispatcher.class.getName());
- private Map<String, ConditionEvaluator> evaluators = new ConcurrentHashMap<>();
+ private final Map<String, ConditionEvaluator> evaluators = new ConcurrentHashMap<>();
private MetricsService metricsService;
private ScriptExecutor scriptExecutor;
+ /**
+  * Binds a {@link ConditionEvaluator} service, keyed by the value of its
+  * {@code conditionEvaluatorId} service property. Called by Declarative Services for every
+  * matching service (dynamic policy, greedy option, multiple cardinality).
+  * A service registered without the {@code conditionEvaluatorId} property cannot be keyed,
+  * so it is logged and ignored instead of failing with a NullPointerException.
+  *
+  * @param conditionEvaluator the evaluator service instance to register
+  * @param conditionEvaluatorServiceReference the service reference whose properties hold the id
+  */
+ @Reference(service = ConditionEvaluator.class, policy = ReferencePolicy.DYNAMIC, policyOption = ReferencePolicyOption.GREEDY,
+         cardinality = ReferenceCardinality.MULTIPLE, unbind = "unbindConditionEvaluator")
+ public void setConditionEvaluators(ConditionEvaluator conditionEvaluator, ServiceReference<ConditionEvaluator> conditionEvaluatorServiceReference) {
+     // getProperty returns null when the key is absent; read it once and check before use.
+     Object conditionEvaluatorId = conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId");
+     if (conditionEvaluatorId == null) {
+         logger.warn("Ignoring ConditionEvaluator {} registered without a conditionEvaluatorId service property", conditionEvaluator);
+         return;
+     }
+     this.evaluators.put(conditionEvaluatorId.toString(), conditionEvaluator);
+ }
+
+ @Reference
public void setMetricsService(MetricsService metricsService) {
this.metricsService = metricsService;
}
+ @Reference
public void setScriptExecutor(ScriptExecutor scriptExecutor) {
this.scriptExecutor = scriptExecutor;
}
- public void addEvaluator(String name, ConditionEvaluator evaluator) {
- evaluators.put(name, evaluator);
- }
-
- public void removeEvaluator(String name) {
- evaluators.remove(name);
+ /**
+  * Unbinds a {@link ConditionEvaluator} service by removing the entry stored under the
+  * value of its {@code conditionEvaluatorId} service property. A {@code null} reference, or a
+  * reference without that property, is ignored because no entry can have been stored for it.
+  *
+  * @param conditionEvaluatorServiceReference the reference of the service going away, may be {@code null}
+  */
+ public void unbindConditionEvaluator(ServiceReference<ConditionEvaluator> conditionEvaluatorServiceReference) {
+     if (conditionEvaluatorServiceReference == null) {
+         return;
+     }
+     // getProperty returns null when the key is absent; avoid dereferencing it blindly.
+     Object conditionEvaluatorId = conditionEvaluatorServiceReference.getProperty("conditionEvaluatorId");
+     if (conditionEvaluatorId == null) {
+         return;
+     }
+     evaluators.remove(conditionEvaluatorId.toString());
}
public boolean eval(Condition condition, Item item) {
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
deleted file mode 100644
index 83d4ecd41..000000000
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ /dev/null
@@ -1,183 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<blueprint xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
- xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
- xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
-
-
-
-
-
- http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0 http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd">
-
- <cm:property-placeholder persistent-id="org.apache.unomi.persistence.elasticsearch"
- update-strategy="reload" placeholder-prefix="${es.">
- <cm:default-properties>
- <cm:property name="cluster.name" value="contextElasticSearch"/>
- <cm:property name="elasticSearchAddresses" value="localhost:9200"/>
- <cm:property name="index.prefix" value="context"/>
- <cm:property name="numberOfShards" value="5"/>
- <cm:property name="numberOfReplicas" value="0"/>
- <cm:property name="indexMappingTotalFieldsLimit" value="1000"/>
- <cm:property name="indexMaxDocValueFieldsSearch" value="1000"/>
- <cm:property name="monthlyIndex.numberOfShards" value="3"/>
- <cm:property name="monthlyIndex.numberOfReplicas" value="0"/>
- <cm:property name="monthlyIndex.indexMappingTotalFieldsLimit" value="1000"/>
- <cm:property name="monthlyIndex.indexMaxDocValueFieldsSearch" value="1000"/>
- <cm:property name="monthlyIndex.itemsMonthlyIndexedOverride" value="event,session"/>
- <cm:property name="defaultQueryLimit" value="10"/>
-
- <cm:property name="bulkProcessor.concurrentRequests" value="1" />
- <cm:property name="bulkProcessor.bulkActions" value="1000" />
- <cm:property name="bulkProcessor.bulkSize" value="5MB" />
- <cm:property name="bulkProcessor.flushInterval" value="5s" />
- <cm:property name="bulkProcessor.backoffPolicy" value="exponential" />
-
- <cm:property name="rollover.indices" value="" />
- <cm:property name="rollover.maxSize" value="" />
- <cm:property name="rollover.maxAge" value="365d" />
- <cm:property name="rollover.maxDocs" value="" />
- <cm:property name="rollover.numberOfShards" value=""/>
- <cm:property name="rollover.numberOfReplicas" value=""/>
- <cm:property name="rollover.indexMappingTotalFieldsLimit" value=""/>
- <cm:property name="rollover.indexMaxDocValueFieldsSearch" value=""/>
-
- <cm:property name="minimalElasticSearchVersion" value="7.0.0" />
- <cm:property name="maximalElasticSearchVersion" value="8.0.0" />
-
- <cm:property name="aggregateQueryBucketSize" value="5000" />
- <cm:property name="clientSocketTimeout" value="" />
- <cm:property name="aggQueryMaxResponseSizeHttp" value="" />
- <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
- <cm:property name="itemTypeToRefreshPolicy" value="" />
- <cm:property name="useBatchingForSave" value="false" />
- <cm:property name="useBatchingForUpdate" value="true" />
-
- <cm:property name="username" value="" />
- <cm:property name="password" value="" />
- <cm:property name="sslEnable" value="false" />
- <cm:property name="sslTrustAllCertificates" value="false" />
- <cm:property name="throwExceptions" value="false" />
- <cm:property name="alwaysOverwrite" value="true" />
- <cm:property name="errorLogLevelRestClient" value="true" />
-
- </cm:default-properties>
- </cm:property-placeholder>
-
- <reference id="metricsService" interface="org.apache.unomi.metrics.MetricsService" />
- <reference id="scriptExecutor" interface="org.apache.unomi.scripting.ScriptExecutor" />
-
- <service id="elasticSearchPersistenceService" ref="elasticSearchPersistenceServiceImpl">
- <interfaces>
- <value>org.apache.unomi.persistence.spi.PersistenceService</value>
- <value>org.osgi.framework.SynchronousBundleListener</value>
- </interfaces>
- </service>
-
- <bean id="conditionESQueryBuilderDispatcher"
- class="org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher">
- <property name="scriptExecutor" ref="scriptExecutor" />
- </bean>
-
- <bean id="conditionEvaluatorDispatcherImpl"
- class="org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluatorDispatcher">
- <property name="metricsService" ref="metricsService" />
- <property name="scriptExecutor" ref="scriptExecutor" />
- </bean>
-
- <bean id="elasticSearchPersistenceServiceImpl"
- class="org.apache.unomi.persistence.elasticsearch.ElasticSearchPersistenceServiceImpl"
- init-method="start"
- destroy-method="stop">
- <property name="bundleContext" ref="blueprintBundleContext"/>
- <property name="conditionEvaluatorDispatcher" ref="conditionEvaluatorDispatcherImpl"/>
- <property name="conditionESQueryBuilderDispatcher" ref="conditionESQueryBuilderDispatcher"/>
- <property name="clusterName" value="${es.cluster.name}"/>
- <property name="indexPrefix" value="${es.index.prefix}"/>
- <property name="monthlyIndexNumberOfShards" value="${es.monthlyIndex.numberOfShards}"/>
- <property name="monthlyIndexNumberOfReplicas" value="${es.monthlyIndex.numberOfReplicas}"/>
- <property name="monthlyIndexMappingTotalFieldsLimit" value="${es.monthlyIndex.indexMappingTotalFieldsLimit}"/>
- <property name="monthlyIndexMaxDocValueFieldsSearch" value="${es.monthlyIndex.indexMaxDocValueFieldsSearch}"/>
- <property name="numberOfShards" value="${es.numberOfShards}"/>
- <property name="numberOfReplicas" value="${es.numberOfReplicas}"/>
- <property name="indexMappingTotalFieldsLimit" value="${es.indexMappingTotalFieldsLimit}"/>
- <property name="indexMaxDocValueFieldsSearch" value="${es.indexMaxDocValueFieldsSearch}"/>
- <property name="elasticSearchAddresses" value="${es.elasticSearchAddresses}"/>
- <property name="fatalIllegalStateErrors" value="${es.fatalIllegalStateErrors}"/>
- <property name="defaultQueryLimit" value="${es.defaultQueryLimit}"/>
- <property name="itemsMonthlyIndexedOverride" value="${es.monthlyIndex.itemsMonthlyIndexedOverride}" />
- <property name="routingByType">
- <map>
- </map>
- </property>
- <property name="bulkProcessorConcurrentRequests" value="${es.bulkProcessor.concurrentRequests}" />
- <property name="bulkProcessorBulkActions" value="${es.bulkProcessor.bulkActions}" />
- <property name="bulkProcessorBulkSize" value="${es.bulkProcessor.bulkSize}" />
- <property name="bulkProcessorFlushInterval" value="${es.bulkProcessor.flushInterval}" />
- <property name="bulkProcessorBackoffPolicy" value="${es.bulkProcessor.backoffPolicy}" />
-
- <property name="rolloverIndices" value="${es.rollover.indices}" />
- <property name="rolloverMaxSize" value="${es.rollover.maxSize}" />
- <property name="rolloverMaxAge" value="${es.rollover.maxAge}" />
- <property name="rolloverMaxDocs" value="${es.rollover.maxDocs}" />
- <property name="rolloverIndexNumberOfShards" value="${es.rollover.numberOfShards}"/>
- <property name="rolloverIndexNumberOfReplicas" value="${es.rollover.numberOfReplicas}"/>
- <property name="rolloverIndexMappingTotalFieldsLimit" value="${es.rollover.indexMappingTotalFieldsLimit}"/>
- <property name="rolloverIndexMaxDocValueFieldsSearch" value="${es.rollover.indexMaxDocValueFieldsSearch}"/>
-
- <property name="minimalElasticSearchVersion" value="${es.minimalElasticSearchVersion}" />
- <property name="maximalElasticSearchVersion" value="${es.maximalElasticSearchVersion}" />
-
- <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" />
- <property name="aggQueryMaxResponseSizeHttp" value="${es.aggQueryMaxResponseSizeHttp}" />
- <property name="aggQueryThrowOnMissingDocs" value="${es.aggQueryThrowOnMissingDocs}" />
- <property name="itemTypeToRefreshPolicy" value="${es.itemTypeToRefreshPolicy}" />
-
- <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" />
-
- <property name="metricsService" ref="metricsService" />
- <property name="useBatchingForSave" value="${es.useBatchingForSave}" />
- <property name="useBatchingForUpdate" value="${es.useBatchingForUpdate}" />
-
- <property name="username" value="${es.username}" />
- <property name="password" value="${es.password}" />
- <property name="sslEnable" value="${es.sslEnable}" />
- <property name="sslTrustAllCertificates" value="${es.sslTrustAllCertificates}" />
- <property name="throwExceptions" value="${es.throwExceptions}" />
- <property name="alwaysOverwrite" value="${es.alwaysOverwrite}" />
- <property name="logLevelRestClient" value="${es.logLevelRestClient}" />
- </bean>
-
- <!-- We use a listener here because using the list directly for listening to proxies coming from the same bundle didn't seem to work -->
- <reference-list id="conditionEvaluators"
- interface="org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluator"
- availability="optional">
- <reference-listener
- bind-method="bindConditionEvaluator" unbind-method="unbindConditionEvaluator" ref="elasticSearchPersistenceServiceImpl"/>
- </reference-list>
-
- <reference-list id="conditionESQueryBuilders"
- interface="org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder"
- availability="optional">
- <reference-listener
- bind-method="bindConditionESQueryBuilder" unbind-method="unbindConditionESQueryBuilder" ref="elasticSearchPersistenceServiceImpl"/>
- </reference-list>
-
-</blueprint>