You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2017/01/09 16:22:21 UTC
[47/50] [abbrv] incubator-unomi git commit: - DMF-1133 Error when
displaying an empty list in MF : fixed issue for real this time :) - Some
configuration file cleanup - Changed inactive user purging from 30 days to
180 days
- DMF-1133 Error when displaying an empty list in MF : fixed issue for real this time :)
- Some configuration file cleanup
- Changed inactive user purging from 30 days to 180 days
Signed-off-by: Serge Huber <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/a5b7b156
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/a5b7b156
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/a5b7b156
Branch: refs/heads/master
Commit: a5b7b15650afd5b9086eedfce8fa406c8095d580
Parents: b812349
Author: Serge Huber <sh...@apache.org>
Authored: Fri Jan 6 11:15:28 2017 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Fri Jan 6 11:15:28 2017 +0100
----------------------------------------------------------------------
README.md | 56 +--
.../unomi/api/services/ClusterService.java | 1 -
kar/src/main/feature/feature.xml | 4 +-
package/pom.xml | 13 +-
persistence-elasticsearch/core/pom.xml | 20 -
.../ElasticSearchPersistenceServiceImpl.java | 397 +++++--------------
.../resources/OSGI-INF/blueprint/blueprint.xml | 21 +-
.../core/src/main/resources/elasticsearch.yml | 110 -----
.../core/src/main/resources/hazelcast.xml | 219 ----------
...g.apache.unomi.persistence.elasticsearch.cfg | 10 +-
.../persistence/spi/PersistenceService.java | 8 +
pom.xml | 2 -
services/pom.xml | 22 +
.../services/services/ClusterServiceImpl.java | 304 ++++++++++++++
.../resources/OSGI-INF/blueprint/blueprint.xml | 45 ++-
services/src/main/resources/hazelcast.xml | 219 ++++++++++
.../main/resources/org.apache.unomi.cluster.cfg | 20 +
.../resources/org.apache.unomi.services.cfg | 2 +-
src/site/markdown/clustering.md | 32 +-
src/site/markdown/configuration.md | 27 +-
20 files changed, 739 insertions(+), 793 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index b73efa4..fd6e1d7 100644
--- a/README.md
+++ b/README.md
@@ -116,16 +116,9 @@ with the following contents:
cluster.name=contextElasticSearch
index.name=context
- elasticSearchConfig=file:${karaf.etc}/elasticsearch.yml
And replace the cluster.name parameter here by your cluster name.
-
-You can also put an elasticsearch configuration file in $MY_KARAF_HOME/etc/elasticsearch.yml ,
-and put any standard Elasticsearch configuration options in this last file.
-
-If you want your context server to be a client only on a cluster of elasticsearch nodes, just set the node.data property
-to false.
-
+
Secured events configuration
---------------------------
@@ -260,23 +253,16 @@ servers on the same network, and enable the discovery protocol in $MY_KARAF_HOME
All nodes on the same network, sharing the same cluster name will be part of the same cluster.
-###Recommended configurations
-
-It is recommended to have one node dedicated to the context server, where the other nodes take care of the
-Elasticsearch persistence. The node dedicated to the context server will have node.data set to false.
-
#### 2 nodes configuration
One node dedicated to context server, 1 node for elasticsearch storage.
Node A :
- node.data=true
numberOfReplicas=0
monthlyIndex.numberOfReplicas=0
Node B :
- node.data=false
numberOfReplicas=0
monthlyIndex.numberOfReplicas=0
@@ -285,39 +271,19 @@ One node dedicated to context server, 2 nodes for elasticsearch storage with fau
Node A :
- node.data=false
numberOfReplicas=1
monthlyIndex.numberOfReplicas=1
Node B :
- node.data=true
numberOfReplicas=1
monthlyIndex.numberOfReplicas=1
Node C :
- node.data=true
numberOfReplicas=1
monthlyIndex.numberOfReplicas=1
-### Specific configuration
-If multicast is not allowed on your network, you'll need to switch to unicast protocol and manually configure the server IPs. This can be
-done by disabling the elasticsearch automatic discovery in $MY_KARAF_HOME/etc/org.apache.unomi.persistence.elasticsearch.cfg :
-
- discovery.zen.ping.multicast.enabled=false
-
-
-And then set the property discovery.zen.ping.unicast.hosts in $MY_KARAF_HOME/etc/elasticsearch.yml files :
-
-
- discovery.zen.ping.unicast.hosts: [\u2018192.168.0.1:9300', \u2018192.168.0.2:9300']
-
-
-More information and configuration options can be found at :
-[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html](https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html)
-
-
JDK Selection on Mac OS X
-------------------------
@@ -415,20 +381,6 @@ Of course any ports listed here are the default ports configured in each server,
Step 2 : Adjust the Context Server IP filtering
-By default the Context Server limits to connections to port 9200 and 9300 to the following IP ranges
-
- - localhost
- - 127.0.0.1
- - ::1
- - the current subnet (i.e., 192.168.1.0-192.168.1.255)
-
-(this is done using a custom plugin for Elasticsearch, that you may find here :
-https://git-wip-us.apache.org/repos/asf/incubator-unomi/context-server/persistence-elasticsearch/plugins/security)
-
-You can adjust this setting by using the following setting in the $MY_KARAF_HOME/etc/elasticsearch.yml file :
-
- security.ipranges: localhost,127.0.0.1,::1,10.0.1.0-10.0.1.255
-
Step 3 : Follow industry recommended best practices for securing Elasticsearch
You may find more valuable recommendations here :
@@ -463,9 +415,3 @@ To upload the site to the Apache website, simply run after the above command has
This operation takes a little bit of time, so don't interrupt it even if you're waiting for a while for it to complete
(usually takes about 16 minutes !)
-
-Todo
-----
-
-- Look at possible integration with newsletter management systems such as MailChimp, for example to synchronize profile data with collected info.
-- Integrate with machine learning implementations such as Prediction.io or Apache Mahout
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
index 37fcd37..b851b78 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
@@ -25,7 +25,6 @@ import java.util.List;
/**
* A service to access information about the context server's cluster.
*
- * TODO: rename to something less specific like ContextRuntimeService?
*/
public interface ClusterService {
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/kar/src/main/feature/feature.xml
----------------------------------------------------------------------
diff --git a/kar/src/main/feature/feature.xml b/kar/src/main/feature/feature.xml
index 8e6ddb2..fa3ef1f 100644
--- a/kar/src/main/feature/feature.xml
+++ b/kar/src/main/feature/feature.xml
@@ -29,10 +29,10 @@
<configfile finalname="/etc/org.apache.unomi.plugins.request.cfg">mvn:org.apache.unomi/unomi-plugins-request/${project.version}/cfg/requestcfg</configfile>
<configfile finalname="/etc/org.apache.unomi.services.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/servicescfg</configfile>
<configfile finalname="/etc/org.apache.unomi.thirdparty.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/thirdpartycfg</configfile>
+ <configfile finalname="/etc/org.apache.unomi.cluster.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/clustercfg</configfile>
+ <configfile finalname="/etc/hazelcast.xml">mvn:org.apache.unomi/unomi-services/${project.version}/xml/hazelcastconfig</configfile>
<configfile finalname="/etc/org.apache.unomi.privacy.cfg">mvn:org.apache.unomi/cxs-privacy-extension-services/${project.version}/cfg/privacycfg</configfile>
<configfile finalname="/etc/org.apache.unomi.geonames.cfg">mvn:org.apache.unomi/cxs-geonames-services/${project.version}/cfg/geonamescfg</configfile>
- <configfile finalname="/etc/elasticsearch.yml">mvn:org.apache.unomi/unomi-persistence-elasticsearch-core/${project.version}/yml/elasticsearchconfig</configfile>
- <configfile finalname="/etc/hazelcast.xml">mvn:org.apache.unomi/unomi-persistence-elasticsearch-core/${project.version}/xml/hazelcastconfig</configfile>
<bundle start-level="75">mvn:commons-io/commons-io/2.4</bundle>
<bundle start-level="75">mvn:com.fasterxml.jackson.core/jackson-core/${version.jackson.core}</bundle>
<bundle start-level="75">mvn:com.fasterxml.jackson.core/jackson-databind/${version.jackson.core}</bundle>
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/package/pom.xml
----------------------------------------------------------------------
diff --git a/package/pom.xml b/package/pom.xml
index 104723e..fd630fd 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -215,18 +215,7 @@
</artifactItem>
<artifactItem>
<groupId>org.apache.unomi</groupId>
- <artifactId>unomi-persistence-elasticsearch-core</artifactId>
- <version>${project.version}</version>
- <classifier>elasticsearchconfig</classifier>
- <type>yml</type>
- <outputDirectory>
- ${project.build.directory}/assembly/etc
- </outputDirectory>
- <destFileName>elasticsearch.yml</destFileName>
- </artifactItem>
- <artifactItem>
- <groupId>org.apache.unomi</groupId>
- <artifactId>unomi-persistence-elasticsearch-core</artifactId>
+ <artifactId>unomi-services</artifactId>
<version>${project.version}</version>
<classifier>hazelcastconfig</classifier>
<type>xml</type>
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/pom.xml
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml
index f4b2a24..ad96cfb 100644
--- a/persistence-elasticsearch/core/pom.xml
+++ b/persistence-elasticsearch/core/pom.xml
@@ -54,16 +54,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>org.apache.karaf.cellar.core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>org.apache.karaf.cellar.config</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
@@ -255,16 +245,6 @@
<type>cfg</type>
<classifier>elasticsearchcfg</classifier>
</artifact>
- <artifact>
- <file>src/main/resources/elasticsearch.yml</file>
- <type>yml</type>
- <classifier>elasticsearchconfig</classifier>
- </artifact>
- <artifact>
- <file>src/main/resources/hazelcast.xml</file>
- <type>xml</type>
- <classifier>hazelcastconfig</classifier>
- </artifact>
</artifacts>
</configuration>
</execution>
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index f98854b..7abcbd5 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -18,13 +18,6 @@
package org.apache.unomi.persistence.elasticsearch;
import org.apache.commons.lang3.StringUtils;
-import org.apache.karaf.cellar.config.ClusterConfigurationEvent;
-import org.apache.karaf.cellar.config.Constants;
-import org.apache.karaf.cellar.core.*;
-import org.apache.karaf.cellar.core.control.SwitchStatus;
-import org.apache.karaf.cellar.core.event.EventProducer;
-import org.apache.karaf.cellar.core.event.EventType;
-import org.apache.unomi.api.ClusterNode;
import org.apache.unomi.api.Item;
import org.apache.unomi.api.PartialList;
import org.apache.unomi.api.TimestampedItem;
@@ -32,11 +25,12 @@ import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.query.DateRange;
import org.apache.unomi.api.query.IpRange;
import org.apache.unomi.api.query.NumericRange;
-import org.apache.unomi.api.services.ClusterService;
import org.apache.unomi.persistence.elasticsearch.conditions.*;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.*;
+import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
@@ -83,22 +77,13 @@ import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.osgi.framework.*;
-import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.*;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.lang.management.ManagementFactory;
-import java.lang.management.OperatingSystemMXBean;
-import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
-import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.text.ParseException;
@@ -108,7 +93,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("rawtypes")
-public class ElasticSearchPersistenceServiceImpl implements PersistenceService, ClusterService, SynchronousBundleListener {
+public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
@@ -116,18 +101,17 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public static final String CONTEXTSERVER_PORT = "contextserver.port";
public static final String CONTEXTSERVER_SECURE_ADDRESS = "contextserver.secureAddress";
public static final String CONTEXTSERVER_SECURE_PORT = "contextserver.securePort";
+
public static final String NUMBER_OF_SHARDS = "number_of_shards";
public static final String NUMBER_OF_REPLICAS = "number_of_replicas";
public static final String CLUSTER_NAME = "cluster.name";
+
public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name";
public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests";
public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions";
public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize";
public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
- public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION = "org.apache.unoni.nodes";
- public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS = "publicEndpoints";
- public static final String KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS = "secureEndpoints";
private Client client;
private BulkProcessor bulkProcessor;
@@ -137,19 +121,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private String monthlyIndexNumberOfReplicas;
private String numberOfShards;
private String numberOfReplicas;
- private String elasticSearchConfig = null;
private BundleContext bundleContext;
private Map<String, String> mappings = new HashMap<String, String>();
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
- private ClusterManager karafCellarClusterManager;
- private EventProducer karafCellarEventProducer;
- private GroupManager karafCellarGroupManager;
- private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME;
- private ConfigurationAdmin osgiConfigurationAdmin;
- private String karafJMXUsername = "karaf";
- private String karafJMXPassword = "karaf";
- private int karafJMXPort = 1099;
private Map<String,String> indexNames;
private List<String> itemsMonthlyIndexed;
@@ -171,6 +146,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private String bulkProcessorFlushInterval = "5s";
private String bulkProcessorBackoffPolicy = "exponential";
+ private String minimalElasticSearchVersion = "5.0.0";
+ private String maximalElasticSearchVersion = "5.2.0";
+
private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
public void setBundleContext(BundleContext bundleContext) {
@@ -233,10 +211,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
this.routingByType = routingByType;
}
- public void setElasticSearchConfig(String elasticSearchConfig) {
- this.elasticSearchConfig = elasticSearchConfig;
- }
-
public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher conditionEvaluatorDispatcher) {
this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher;
}
@@ -269,45 +243,21 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy;
}
- public void setKarafCellarClusterManager(ClusterManager karafCellarClusterManager) {
- this.karafCellarClusterManager = karafCellarClusterManager;
+ public void setMinimalElasticSearchVersion(String minimalElasticSearchVersion) {
+ this.minimalElasticSearchVersion = minimalElasticSearchVersion;
}
- public void setKarafCellarEventProducer(EventProducer karafCellarEventProducer) {
- this.karafCellarEventProducer = karafCellarEventProducer;
+ public void setMaximalElasticSearchVersion(String maximalElasticSearchVersion) {
+ this.maximalElasticSearchVersion = maximalElasticSearchVersion;
}
- public void setKarafCellarGroupManager(GroupManager karafCellarGroupManager) {
- this.karafCellarGroupManager = karafCellarGroupManager;
- }
-
- public void setKarafCellarGroupName(String karafCellarGroupName) {
- this.karafCellarGroupName = karafCellarGroupName;
- }
-
- public void setOsgiConfigurationAdmin(ConfigurationAdmin osgiConfigurationAdmin) {
- this.osgiConfigurationAdmin = osgiConfigurationAdmin;
- }
-
- public void setKarafJMXUsername(String karafJMXUsername) {
- this.karafJMXUsername = karafJMXUsername;
- }
-
- public void setKarafJMXPassword(String karafJMXPassword) {
- this.karafJMXPassword = karafJMXPassword;
- }
-
- public void setKarafJMXPort(int karafJMXPort) {
- this.karafJMXPort = karafJMXPort;
- }
-
- public void start() {
+ public void start() throws Exception {
loadPredefinedMappings(bundleContext, false);
// on startup
new InClassLoaderExecute<Object>() {
- public Object execute(Object... args) {
+ public Object execute(Object... args) throws Exception {
logger.info("Connecting to ElasticSearch persistence backend using cluster name " + clusterName + " and index name " + indexName + "...");
address = System.getProperty(CONTEXTSERVER_ADDRESS, address);
@@ -315,51 +265,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
secureAddress = System.getProperty(CONTEXTSERVER_SECURE_ADDRESS, secureAddress);
securePort = System.getProperty(CONTEXTSERVER_SECURE_PORT, securePort);
- if (karafCellarEventProducer != null && karafCellarClusterManager != null) {
-
- boolean setupConfigOk = true;
- Group group = karafCellarGroupManager.findGroupByName(karafCellarGroupName);
- if (setupConfigOk && group == null) {
- logger.error("Cluster group " + karafCellarGroupName + " doesn't exist");
- setupConfigOk = false;
- }
-
- // check if the producer is ON
- if (setupConfigOk && karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
- logger.error("Cluster event producer is OFF");
- setupConfigOk = false;
- }
-
- // check if the config pid is allowed
- if (setupConfigOk && !isClusterConfigPIDAllowed(group, Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, EventType.OUTBOUND)) {
- logger.error("Configuration PID " + KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster group " + karafCellarGroupName);
- setupConfigOk = false;
- }
-
- if (setupConfigOk) {
- Map<String, Properties> configurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
- org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
- Properties karafCellarClusterNodeConfiguration = configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
- if (karafCellarClusterNodeConfiguration == null) {
- karafCellarClusterNodeConfiguration = new Properties();
- }
- String publicEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + address + ":" + port);
- String secureEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS, thisKarafNode.getId() + "=" + secureAddress + ":" + securePort);
- String[] publicEndpointsArray = publicEndpointsPropValue.split(",");
- Set<String> publicEndpoints = new TreeSet<String>(Arrays.asList(publicEndpointsArray));
- String[] secureEndpointsArray = secureEndpointsPropValue.split(",");
- Set<String> secureEndpoints = new TreeSet<String>(Arrays.asList(secureEndpointsArray));
- publicEndpoints.add(thisKarafNode.getId() + "=" + address + ":" + port);
- secureEndpoints.add(thisKarafNode.getId() + "=" + secureAddress + ":" + securePort);
- karafCellarClusterNodeConfiguration.setProperty(KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, StringUtils.join(publicEndpoints, ","));
- karafCellarClusterNodeConfiguration.setProperty(KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS, StringUtils.join(secureEndpoints, ","));
- configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarClusterNodeConfiguration);
- ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
- clusterConfigurationEvent.setSourceGroup(group);
- karafCellarEventProducer.produce(clusterConfigurationEvent);
- }
- }
-
bulkProcessorName = System.getProperty(BULK_PROCESSOR_NAME, bulkProcessorName);
bulkProcessorConcurrentRequests = System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS, bulkProcessorConcurrentRequests);
bulkProcessorBulkActions = System.getProperty(BULK_PROCESSOR_BULK_ACTIONS, bulkProcessorBulkActions);
@@ -373,7 +278,29 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
client = new PreBuiltTransportClient(transportSettings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(address), 9300));
} catch (UnknownHostException e) {
- logger.error("Error resolving address " + address + " ElasticSearch transport client not connected, using internal client instead", e);
+ String message = "Error resolving address " + address + " ElasticSearch transport client not connected";
+ throw new Exception(message, e);
+ }
+
+ // let's now check the versions of all the nodes in the cluster, to make sure they are as expected.
+ try {
+ NodesInfoResponse nodesInfoResponse = client.admin().cluster().prepareNodesInfo()
+ .all().execute().get();
+
+ org.elasticsearch.Version minimalVersion = org.elasticsearch.Version.fromString(minimalElasticSearchVersion);
+ org.elasticsearch.Version maximalVersion = org.elasticsearch.Version.fromString(maximalElasticSearchVersion);
+ for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
+ org.elasticsearch.Version version = nodeInfo.getVersion();
+ if (version.before(minimalVersion) ||
+ version.equals(maximalVersion) ||
+ version.after(maximalVersion)) {
+ throw new Exception("ElasticSearch version on node " + nodeInfo.getHostname() + " is not within [" + minimalVersion + "," + maximalVersion + "), aborting startup !");
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new Exception("Error checking ElasticSearch versions", e);
+ } catch (ExecutionException e) {
+ throw new Exception("Error checking ElasticSearch versions", e);
}
// @todo is there a better way to detect index existence than to wait for it to startup ?
@@ -433,7 +360,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
logger.info("Cluster status is GREEN");
- return null;
+ return true;
}
}.executeInClassLoader();
@@ -467,19 +394,19 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private void refreshExistingIndexNames() {
new InClassLoaderExecute<Boolean>() {
- protected Boolean execute(Object... args) {
+ protected Boolean execute(Object... args) throws Exception {
try {
logger.info("Refreshing existing indices list...");
IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get();
existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet());
} catch (InterruptedException e) {
- logger.error("Error retrieving indices stats", e);
+ throw new Exception("Error retrieving indices stats", e);
} catch (ExecutionException e) {
- logger.error("Error retrieving indices stats", e);
+ throw new Exception("Error retrieving indices stats", e);
}
return true;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
public BulkProcessor getBulkProcessor() {
@@ -581,7 +508,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return null;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
if (timer != null) {
timer.cancel();
@@ -706,7 +633,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> T load(final String itemId, final Date dateHint, final Class<T> clazz) {
return new InClassLoaderExecute<T>() {
- protected T execute(Object... args) {
+ protected T execute(Object... args) throws Exception {
try {
String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
@@ -733,15 +660,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
} catch (IndexNotFoundException e) {
- logger.debug("No index found for itemType=" + clazz.getName() + " itemId=" + itemId, e);
+ throw new Exception("No index found for itemType=" + clazz.getName() + " itemId=" + itemId, e);
} catch (IllegalAccessException e) {
- logger.error("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, e);
+ throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, e);
} catch (Exception t) {
- logger.error("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, t);
+ throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, t);
}
- return null;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@@ -754,7 +680,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public boolean save(final Item item, final boolean useBatching) {
return new InClassLoaderExecute<Boolean>() {
- protected Boolean execute(Object... args) {
+ protected Boolean execute(Object... args) throws Exception {
try {
String source = CustomObjectMapper.getObjectMapper().writeValueAsString(item);
String itemType = item.getItemType();
@@ -794,11 +720,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return true;
} catch (IOException e) {
- logger.error("Error saving item " + item, e);
+ throw new Exception("Error saving item " + item, e);
}
- return false;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@@ -810,7 +735,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) {
return new InClassLoaderExecute<Boolean>() {
- protected Boolean execute(Object... args) {
+ protected Boolean execute(Object... args) throws Exception {
try {
String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
@@ -827,21 +752,20 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return true;
} catch (IndexNotFoundException e) {
- logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
+ throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
} catch (NoSuchFieldException e) {
- logger.error("Error updating item " + itemId, e);
+ throw new Exception("Error updating item " + itemId, e);
} catch (IllegalAccessException e) {
- logger.error("Error updating item " + itemId, e);
+ throw new Exception("Error updating item " + itemId, e);
}
- return false;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@Override
public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
return new InClassLoaderExecute<Boolean>() {
- protected Boolean execute(Object... args) {
+ protected Boolean execute(Object... args) throws Exception {
try {
String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
@@ -859,21 +783,20 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return true;
} catch (IndexNotFoundException e) {
- logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
+ throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
} catch (NoSuchFieldException e) {
- logger.error("Error updating item " + itemId, e);
+ throw new Exception("Error updating item " + itemId, e);
} catch (IllegalAccessException e) {
- logger.error("Error updating item " + itemId, e);
+ throw new Exception("Error updating item " + itemId, e);
}
- return false;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@Override
public <T extends Item> boolean remove(final String itemId, final Class<T> clazz) {
return new InClassLoaderExecute<Boolean>() {
- protected Boolean execute(Object... args) {
+ protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
@@ -882,16 +805,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
.execute().actionGet();
return true;
} catch (Exception e) {
- logger.error("Cannot remove", e);
+ throw new Exception("Cannot remove", e);
}
- return false;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) {
return new InClassLoaderExecute<Boolean>() {
- protected Boolean execute(Object... args) {
+ protected Boolean execute(Object... args) throws Exception {
try {
String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
@@ -932,11 +854,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return true;
} catch (Exception e) {
- logger.error("Cannot remove by query", e);
+ throw new Exception("Cannot remove by query", e);
}
- return false;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
public boolean createIndex(final String indexName) {
@@ -956,7 +877,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return !indexExists;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
public boolean removeIndex(final String indexName) {
@@ -970,7 +891,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return indexExists;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
private void internalCreateIndex(String indexName, Map<String,String> mappings) {
@@ -1029,7 +950,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public Map<String, Map<String, Object>> getPropertiesMapping(final String itemType) {
return new InClassLoaderExecute<Map<String, Map<String, Object>>>() {
@SuppressWarnings("unchecked")
- protected Map<String, Map<String, Object>> execute(Object... args) {
+ protected Map<String, Map<String, Object>> execute(Object... args) throws Exception {
GetMappingsResponse getMappingsResponse = client.admin().indices().prepareGetMappings().setTypes(itemType).execute().actionGet();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getMappingsResponse.getMappings();
Map<String, Map<String, Object>> propertyMap = new HashMap<>();
@@ -1054,11 +975,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
} catch (IOException e) {
- logger.error("Cannot get mapping", e);
+ throw new Exception("Cannot get mapping", e);
}
return propertyMap;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
public Map<String, Object> getPropertyMapping(String property, String itemType) {
@@ -1089,6 +1010,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private String getPropertyNameWithData(String name, String itemType) {
Map<String,Object> propertyMapping = getPropertyMapping(name,itemType);
+ if (propertyMapping == null) {
+ return null;
+ }
if (propertyMapping != null
&& "text".equals(propertyMapping.get("type"))
&& propertyMapping.containsKey("fields")
@@ -1100,7 +1024,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public boolean saveQuery(final String queryName, final String query) {
return new InClassLoaderExecute<Boolean>() {
- protected Boolean execute(Object... args) {
+ protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
logger.info("Saving query : " + queryName);
@@ -1110,11 +1034,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
.execute().actionGet();
return true;
} catch (Exception e) {
- logger.error("Cannot save query", e);
+ throw new Exception("Cannot save query", e);
}
- return false;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@Override
@@ -1129,7 +1052,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean removeQuery(final String queryName) {
return new InClassLoaderExecute<Boolean>() {
- protected Boolean execute(Object... args) {
+ protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
client.prepareDelete(indexName, ".percolator", queryName)
@@ -1137,11 +1060,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
.execute().actionGet();
return true;
} catch (Exception e) {
- logger.error("Cannot delete query", e);
+ throw new Exception("Cannot delete query", e);
}
- return false;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@Override
@@ -1238,14 +1160,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
.actionGet();
return response.getHits().getTotalHits();
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing, final String scrollTimeValidity) {
return new InClassLoaderExecute<PartialList<T>>() {
@Override
- protected PartialList<T> execute(Object... args) {
+ protected PartialList<T> execute(Object... args) throws Exception {
List<T> results = new ArrayList<T>();
String scrollIdentifier = null;
long totalHits = 0;
@@ -1348,7 +1270,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
} catch (Exception t) {
- logger.error("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);
+ throw new Exception("Error loading itemType=" + clazz.getName() + " query=" + query + " sortBy=" + sortBy, t);
}
PartialList<T> result = new PartialList<T>(results, offset, size, totalHits);
@@ -1358,7 +1280,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return result;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@Override
@@ -1366,7 +1288,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return new InClassLoaderExecute<PartialList<T>>() {
@Override
- protected PartialList<T> execute(Object... args) {
+ protected PartialList<T> execute(Object... args) throws Exception {
List<T> results = new ArrayList<T>();
long totalHits = 0;
try {
@@ -1391,11 +1313,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return result;
} catch (Exception t) {
- logger.error("Error continuing scrolling query for itemType=" + clazz.getName() + " scrollIdentifier=" + scrollIdentifier + " scrollTimeValidity=" + scrollTimeValidity, t);
+ throw new Exception("Error continuing scrolling query for itemType=" + clazz.getName() + " scrollIdentifier=" + scrollIdentifier + " scrollTimeValidity=" + scrollTimeValidity, t);
}
- return null;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@Override
@@ -1520,7 +1441,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return results;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
private <T extends Item> String getItemType(Class<T> clazz) {
@@ -1544,105 +1465,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
- @Override
- public List<ClusterNode> getClusterNodes() {
- return new InClassLoaderExecute<List<ClusterNode>>() {
-
- @Override
- protected List<ClusterNode> execute(Object... args) {
- Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, ClusterNode>();
-
- Set<org.apache.karaf.cellar.core.Node> karafCellarNodes = karafCellarClusterManager.listNodes();
- org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
- Map<String, Properties> clusterConfigurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
- Properties karafCellarClusterNodeConfiguration = clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
- Map<String, String> publicNodeEndpoints = new TreeMap<>();
- Map<String, String> secureNodeEndpoints = new TreeMap<>();
- if (karafCellarClusterNodeConfiguration != null) {
- String publicEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + address + ":" + port);
- String secureEndpointsPropValue = karafCellarClusterNodeConfiguration.getProperty(KARAF_CLUSTER_CONFIGURATION_SECURE_ENDPOINTS, thisKarafNode.getId() + "=" + secureAddress + ":" + securePort);
- String[] publicEndpointsArray = publicEndpointsPropValue.split(",");
- Set<String> publicEndpoints = new TreeSet<String>(Arrays.asList(publicEndpointsArray));
- for (String endpoint : publicEndpoints) {
- String[] endpointParts = endpoint.split("=");
- publicNodeEndpoints.put(endpointParts[0], endpointParts[1]);
- }
- String[] secureEndpointsArray = secureEndpointsPropValue.split(",");
- Set<String> secureEndpoints = new TreeSet<String>(Arrays.asList(secureEndpointsArray));
- for (String endpoint : secureEndpoints) {
- String[] endpointParts = endpoint.split("=");
- secureNodeEndpoints.put(endpointParts[0], endpointParts[1]);
- }
- }
- for (org.apache.karaf.cellar.core.Node karafCellarNode : karafCellarNodes) {
- ClusterNode clusterNode = new ClusterNode();
- clusterNode.setHostName(karafCellarNode.getHost());
- String publicEndpoint = publicNodeEndpoints.get(karafCellarNode.getId());
- if (publicEndpoint != null) {
- String[] publicEndpointParts = publicEndpoint.split(":");
- clusterNode.setHostAddress(publicEndpointParts[0]);
- clusterNode.setPublicPort(Integer.parseInt(publicEndpointParts[1]));
- }
- String secureEndpoint = secureNodeEndpoints.get(karafCellarNode.getId());
- if (secureEndpoint != null) {
- String[] secureEndpointParts = secureEndpoint.split(":");
- clusterNode.setSecureHostAddress(secureEndpointParts[0]);
- clusterNode.setSecurePort(Integer.parseInt(secureEndpointParts[1]));
- clusterNode.setMaster(false);
- clusterNode.setData(false);
- }
- try {
- // now let's connect to remote JMX service to retrieve information from the runtime and operating system MX beans
- JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"+karafCellarNode.getHost() + ":"+karafJMXPort+"/karaf-root");
- Map<String,Object> environment=new HashMap<String,Object>();
- if (karafJMXUsername != null && karafJMXPassword != null) {
- environment.put(JMXConnector.CREDENTIALS,new String[]{karafJMXUsername,karafJMXPassword});
- }
- JMXConnector jmxc = JMXConnectorFactory.connect(url, environment);
- MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
- final RuntimeMXBean remoteRuntime = ManagementFactory.newPlatformMXBeanProxy(mbsc, ManagementFactory.RUNTIME_MXBEAN_NAME, RuntimeMXBean.class);
- clusterNode.setUptime(remoteRuntime.getUptime());
- ObjectName operatingSystemMXBeanName = new ObjectName(ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME);
- Double processCpuLoad = null;
- Double systemCpuLoad = null;
- try {
- processCpuLoad = (Double) mbsc.getAttribute(operatingSystemMXBeanName, "ProcessCpuLoad");
- } catch (MBeanException e) {
- e.printStackTrace();
- } catch (AttributeNotFoundException e) {
- e.printStackTrace();
- }
- try {
- systemCpuLoad = (Double) mbsc.getAttribute(operatingSystemMXBeanName, "SystemCpuLoad");
- } catch (MBeanException e) {
- e.printStackTrace();
- } catch (AttributeNotFoundException e) {
- e.printStackTrace();
- }
- final OperatingSystemMXBean remoteOperatingSystemMXBean = ManagementFactory.newPlatformMXBeanProxy(mbsc, ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME, OperatingSystemMXBean.class);
- clusterNode.setLoadAverage(new double[] { remoteOperatingSystemMXBean.getSystemLoadAverage()});
- if (systemCpuLoad != null) {
- clusterNode.setCpuLoad(systemCpuLoad);
- }
-
- } catch (MalformedURLException e) {
- logger.error("Error connecting to remote JMX server", e);
- } catch (IOException e) {
- logger.error("Error retrieving remote JMX data", e);
- } catch (MalformedObjectNameException e) {
- logger.error("Error retrieving remote JMX data", e);
- } catch (InstanceNotFoundException e) {
- logger.error("Error retrieving remote JMX data", e);
- } catch (ReflectionException e) {
- logger.error("Error retrieving remote JMX data", e);
- }
- clusterNodes.put(karafCellarNode.getId(), clusterNode);
- }
-
- return new ArrayList<ClusterNode>(clusterNodes.values());
- }
- }.executeInClassLoader();
- }
@Override
public void refresh() {
@@ -1654,7 +1476,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
client.admin().indices().refresh(Requests.refreshRequest()).actionGet();
return true;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@@ -1663,7 +1485,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public void purge(final Date date) {
new InClassLoaderExecute<Object>() {
@Override
- protected Object execute(Object... args) {
+ protected Object execute(Object... args) throws Exception {
IndicesStatsResponse statsResponse = client.admin().indices().prepareStats(indexName + "-*")
.setIndexing(false)
.setGet(false)
@@ -1689,7 +1511,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
toDelete.add(currentIndexName);
}
} catch (ParseException e) {
- logger.error("Cannot parse index name " + currentIndexName, e);
+ throw new Exception("Cannot parse index name " + currentIndexName, e);
}
}
}
@@ -1698,7 +1520,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return null;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@Override
@@ -1743,7 +1565,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return null;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
@Override
@@ -1794,7 +1616,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
return results;
}
- }.executeInClassLoader();
+ }.catchingExecuteInClassLoader(true);
}
private String getIndexNameForQuery(String itemType) {
@@ -1804,9 +1626,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public abstract static class InClassLoaderExecute<T> {
- protected abstract T execute(Object... args);
+ protected abstract T execute(Object... args) throws Exception;
- public T executeInClassLoader(Object... args) {
+ public T executeInClassLoader(Object... args) throws Exception {
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
@@ -1815,6 +1637,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
Thread.currentThread().setContextClassLoader(tccl);
}
}
+
+ public T catchingExecuteInClassLoader( boolean logError, Object... args) {
+ try {
+ return executeInClassLoader(args);
+ } catch (Exception e) {
+ logger.error("Error while executing in class loader", e);
+ }
+ return null;
+ }
}
private String getConfig(Map<String,String> settings, String key,
@@ -1825,21 +1656,5 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return defaultValue;
}
- /**
- * Check if a configuration is allowed.
- *
- * @param group the cluster group.
- * @param category the configuration category constant.
- * @param pid the configuration PID.
- * @param type the cluster event type.
- * @return true if the cluster event type is allowed, false else.
- */
- public boolean isClusterConfigPIDAllowed(Group group, String category, String pid, EventType type) {
- CellarSupport support = new CellarSupport();
- support.setClusterManager(this.karafCellarClusterManager);
- support.setGroupManager(this.karafCellarGroupManager);
- support.setConfigurationAdmin(this.osgiConfigurationAdmin);
- return support.isAllowed(group, category, pid, type);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index c929009..1135ef5 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -32,7 +32,6 @@
<cm:default-properties>
<cm:property name="cluster.name" value="contextElasticSearch"/>
<cm:property name="index.name" value="context"/>
- <cm:property name="elasticSearchConfig" value="file:${karaf.etc}/elasticsearch.yml"/>
<cm:property name="numberOfShards" value="5"/>
<cm:property name="numberOfReplicas" value="0"/>
<cm:property name="monthlyIndex.numberOfShards" value="3"/>
@@ -46,10 +45,9 @@
<cm:property name="bulkProcessor.flushInterval" value="5s" />
<cm:property name="bulkProcessor.backoffPolicy" value="exponential" />
- <cm:property name="cluster.group" value="default" />
- <cm:property name="cluster.jmxUsername" value="karaf" />
- <cm:property name="cluster.jmxPassword" value="karaf" />
- <cm:property name="cluster.jmxPort" value="1099" />
+ <cm:property name="minimalElasticSearchVersion" value="5.0.0" />
+ <cm:property name="maximalElasticSearchVersion" value="5.2.0" />
+
</cm:default-properties>
</cm:property-placeholder>
@@ -71,7 +69,6 @@
<service id="elasticSearchPersistenceService" ref="elasticSearchPersistenceServiceImpl">
<interfaces>
<value>org.apache.unomi.persistence.spi.PersistenceService</value>
- <value>org.apache.unomi.api.services.ClusterService</value>
</interfaces>
</service>
@@ -101,7 +98,6 @@
<property name="secureAddress" value="${web.contextserver.secureAddress}"/>
<property name="securePort" value="${web.contextserver.securePort}"/>
<property name="defaultQueryLimit" value="${es.defaultQueryLimit}"/>
- <property name="elasticSearchConfig" value="${es.elasticSearchConfig}"/>
<property name="itemsMonthlyIndexed">
<list>
<value>event</value>
@@ -123,14 +119,9 @@
<property name="bulkProcessorBulkSize" value="${es.bulkProcessor.bulkSize}" />
<property name="bulkProcessorFlushInterval" value="${es.bulkProcessor.flushInterval}" />
<property name="bulkProcessorBackoffPolicy" value="${es.bulkProcessor.backoffPolicy}" />
- <property name="karafCellarClusterManager" ref="karafCellarClusterManager" />
- <property name="karafCellarEventProducer" ref="karafCellarEventProducer" />
- <property name="karafCellarGroupManager" ref="karafCellarGroupManager" />
- <property name="karafCellarGroupName" value="${es.cluster.group}" />
- <property name="osgiConfigurationAdmin" ref="osgiConfigurationAdmin" />
- <property name="karafJMXUsername" value="${es.cluster.jmxUsername}" />
- <property name="karafJMXPassword" value="${es.cluster.jmxPassword}" />
- <property name="karafJMXPort" value="${es.cluster.jmxPort}" />
+
+ <property name="minimalElasticSearchVersion" value="${es.minimalElasticSearchVersion}" />
+ <property name="maximalElasticSearchVersion" value="${es.maximalElasticSearchVersion}" />
</bean>
<!-- We use a listener here because using the list directly for listening to proxies coming from the same bundle didn't seem to work -->
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/resources/elasticsearch.yml
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/resources/elasticsearch.yml b/persistence-elasticsearch/core/src/main/resources/elasticsearch.yml
deleted file mode 100644
index fe0e52f..0000000
--- a/persistence-elasticsearch/core/src/main/resources/elasticsearch.yml
+++ /dev/null
@@ -1,110 +0,0 @@
-# ======================== Elasticsearch Configuration =========================
-#
-# NOTE: Elasticsearch comes with reasonable defaults for most settings.
-# Before you set out to tweak and tune the configuration, make sure you
-# understand what are you trying to accomplish and the consequences.
-#
-# The primary way of configuring a node is via this file. This template lists
-# the most important settings you may want to configure for a production cluster.
-#
-# Please see the documentation for further information on configuration options:
-# <http://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html>
-#
-# ---------------------------------- Cluster -----------------------------------
-#
-# Use a descriptive name for your cluster:
-#
-# cluster.name: my-application
-#
-# ------------------------------------ Node ------------------------------------
-#
-# Use a descriptive name for the node:
-#
-# node.name: node-1
-#
-# Add custom attributes to the node:
-#
-# node.rack: r1
-#
-# ----------------------------------- Paths ------------------------------------
-#
-# Path to directory where to store the data (separate multiple locations by comma):
-#
-# path.data: /path/to/data
-#
-# Path to log files:
-#
-# path.logs: /path/to/logs
-#
-# ----------------------------------- Memory -----------------------------------
-#
-# Lock the memory on startup:
-#
-# bootstrap.mlockall: true
-#
-# Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory
-# available on the system and that the owner of the process is allowed to use this limit.
-#
-# Elasticsearch performs poorly when the system is swapping the memory.
-#
-# ---------------------------------- Network -----------------------------------
-#
-# Set the bind address to a specific IP (IPv4 or IPv6):
-#
-# network.host: 192.168.0.1
-#
-# Set a custom port for HTTP:
-#
-# http.port: 9200
-#
-# For more information, see the documentation at:
-# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html>
-#
-# --------------------------------- Discovery ----------------------------------
-#
-# Pass an initial list of hosts to perform discovery when new node is started:
-# The default list of hosts is ["127.0.0.1", "[::1]"]
-#
-# discovery.zen.ping.unicast.hosts: ["host1", "host2"]
-#
-# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
-#
-# discovery.zen.minimum_master_nodes: 3
-#
-# For more information, see the documentation at:
-# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html>
-#
-# ---------------------------------- Gateway -----------------------------------
-#
-# Block initial recovery after a full cluster restart until N nodes are started:
-#
-# gateway.recover_after_nodes: 3
-#
-# For more information, see the documentation at:
-# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-gateway.html>
-#
-# ---------------------------------- Various -----------------------------------
-#
-# Disable starting multiple nodes on a single system:
-#
-# node.max_local_storage_nodes: 1
-#
-# Require explicit names when deleting indices:
-#
-# action.destructive_requires_name: true
-#
-# --------------------------- Apache Unomi specific ---------------------------
-#
-threadpool.index.queue_size: 1000
-script.engine.groovy.inline.update: on
-index.percolator.allow_unmapped_fields: true
-
-# Require explicit index creation
-action.auto_create_index: false
-
-# Protect against accidental close/delete operations
-# on all indices. You can still close/delete individual
-# indices
-#action.disable_close_all_indices: true
-#action.disable_delete_all_indices: true
-#action.disable_shutdown: true
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/resources/hazelcast.xml
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/resources/hazelcast.xml b/persistence-elasticsearch/core/src/main/resources/hazelcast.xml
deleted file mode 100644
index 0fc6f5d..0000000
--- a/persistence-elasticsearch/core/src/main/resources/hazelcast.xml
+++ /dev/null
@@ -1,219 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.2.xsd"
- xmlns="http://www.hazelcast.com/schema/config"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <group>
- <name>cellar</name>
- <password>pass</password>
- </group>
- <management-center enabled="false">http://localhost:8080/mancenter</management-center>
- <network>
- <port auto-increment="true" port-count="100">5701</port>
- <outbound-ports>
- <!--
- Allowed port range when connecting to other nodes.
- 0 or * means use system provided port.
- -->
- <ports>0</ports>
- </outbound-ports>
- <join>
- <multicast enabled="false">
- <multicast-group>224.2.2.3</multicast-group>
- <multicast-port>54327</multicast-port>
- </multicast>
- <tcp-ip enabled="true">
- <interface>127.0.0.1</interface>
- </tcp-ip>
- <aws enabled="false">
- <access-key>my-access-key</access-key>
- <secret-key>my-secret-key</secret-key>
- <!--optional, default is us-east-1 -->
- <region>us-west-1</region>
- <!--optional, default is ec2.amazonaws.com. If set, region shouldn't be set as it will override this property -->
- <host-header>ec2.amazonaws.com</host-header>
- <!-- optional, only instances belonging to this group will be discovered, default will try all running instances -->
- <security-group-name>hazelcast-sg</security-group-name>
- <tag-key>type</tag-key>
- <tag-value>hz-nodes</tag-value>
- </aws>
- </join>
- <interfaces enabled="false">
- <interface>10.10.1.*</interface>
- </interfaces>
- <ssl enabled="false"/>
- <socket-interceptor enabled="false"/>
- <symmetric-encryption enabled="false">
- <!--
- encryption algorithm such as
- DES/ECB/PKCS5Padding,
- PBEWithMD5AndDES,
- AES/CBC/PKCS5Padding,
- Blowfish,
- DESede
- -->
- <algorithm>PBEWithMD5AndDES</algorithm>
- <!-- salt value to use when generating the secret key -->
- <salt>thesalt</salt>
- <!-- pass phrase to use when generating the secret key -->
- <password>thepass</password>
- <!-- iteration count to use when generating the secret key -->
- <iteration-count>19</iteration-count>
- </symmetric-encryption>
- </network>
- <partition-group enabled="false"/>
- <executor-service>
- <pool-size>16</pool-size>
- <!-- Queue capacity. 0 means Integer.MAX_VALUE -->
- <queue-capacity>0</queue-capacity>
- </executor-service>
- <queue name="default">
- <!--
- Maximum size of the queue. When a JVM's local queue size reaches the maximum,
- all put/offer operations will get blocked until the queue size
- of the JVM goes down below the maximum.
- Any integer between 0 and Integer.MAX_VALUE. 0 means
- Integer.MAX_VALUE. Default is 0.
- -->
- <max-size>0</max-size>
- <!--
- Number of backups. If 1 is set as the backup-count for example,
- then all entries of the map will be copied to another JVM for
- fail-safety. 0 means no backup.
- -->
- <backup-count>1</backup-count>
- <!--
- Number of async backups. 0 means no backup.
- -->
- <async-backup-count>0</async-backup-count>
- <empty-queue-ttl>-1</empty-queue-ttl>
- </queue>
-
- <map name="default">
- <!--
- Data type that will be used for storing recordMap.
- Possible values:
- BINARY (default): keys and values will be stored as binary data
- OBJECT : values will be stored in their object forms
- OFFHEAP : values will be stored in non-heap region of JVM
- -->
- <in-memory-format>BINARY</in-memory-format>
- <!--
- Number of backups. If 1 is set as the backup-count for example,
- then all entries of the map will be copied to another JVM for
- fail-safety. 0 means no backup.
- -->
- <backup-count>1</backup-count>
- <!--
- Number of async backups. 0 means no backup.
- -->
- <async-backup-count>0</async-backup-count>
- <!--
- Maximum number of seconds for each entry to stay in the map. Entries that are
- older than <time-to-live-seconds> and not updated for <time-to-live-seconds>
- will get automatically evicted from the map.
- Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
- -->
- <time-to-live-seconds>0</time-to-live-seconds>
- <!--
- Maximum number of seconds for each entry to stay idle in the map. Entries that are
- idle(not touched) for more than <max-idle-seconds> will get
- automatically evicted from the map. Entry is touched if get, put or containsKey is called.
- Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
- -->
- <max-idle-seconds>0</max-idle-seconds>
- <!--
- Valid values are:
- NONE (no eviction),
- LRU (Least Recently Used),
- LFU (Least Frequently Used).
- NONE is the default.
- -->
- <eviction-policy>NONE</eviction-policy>
- <!--
- Maximum size of the map. When max size is reached,
- map is evicted based on the policy defined.
- Any integer between 0 and Integer.MAX_VALUE. 0 means
- Integer.MAX_VALUE. Default is 0.
- -->
- <max-size policy="PER_NODE">0</max-size>
- <!--
- When max. size is reached, specified percentage of
- the map will be evicted. Any integer between 0 and 100.
- If 25 is set for example, 25% of the entries will
- get evicted.
- -->
- <eviction-percentage>25</eviction-percentage>
- <!--
- While recovering from split-brain (network partitioning),
- map entries in the small cluster will merge into the bigger cluster
- based on the policy set here. When an entry merge into the
- cluster, there might an existing entry with the same key already.
- Values of these entries might be different for that same key.
- Which value should be set for the key? Conflict is resolved by
- the policy set here. Default policy is PutIfAbsentMapMergePolicy
-
- There are built-in merge policies such as
- com.hazelcast.map.merge.PassThroughMergePolicy; entry will be added if there is no existing entry for the key.
- com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be added if the merging entry doesn't exist in the cluster.
- com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins.
- com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins.
- -->
- <merge-policy>com.hazelcast.map.merge.PassThroughMergePolicy</merge-policy>
- </map>
-
- <multimap name="default">
- <backup-count>1</backup-count>
- <value-collection-type>SET</value-collection-type>
- </multimap>
-
- <multimap name="default">
- <backup-count>1</backup-count>
- <value-collection-type>SET</value-collection-type>
- </multimap>
-
- <list name="default">
- <backup-count>1</backup-count>
- </list>
-
- <set name="default">
- <backup-count>1</backup-count>
- </set>
-
- <jobtracker name="default">
- <max-thread-size>0</max-thread-size>
- <!-- Queue size 0 means number of partitions * 2 -->
- <queue-size>0</queue-size>
- <retry-count>0</retry-count>
- <chunk-size>1000</chunk-size>
- <communicate-stats>true</communicate-stats>
- <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
- </jobtracker>
-
- <semaphore name="default">
- <initial-permits>0</initial-permits>
- <backup-count>1</backup-count>
- <async-backup-count>0</async-backup-count>
- </semaphore>
-
- <serialization>
- <portable-version>0</portable-version>
- </serialization>
-
- <services enable-defaults="true" />
-</hazelcast>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ddcbed5..55a24ea 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -17,14 +17,10 @@
cluster.name=contextElasticSearch
index.name=context
-elasticSearchConfig=file:${karaf.etc}/elasticsearch.yml
monthlyIndex.numberOfShards=3
monthlyIndex.numberOfReplicas=0
numberOfShards=5
numberOfReplicas=0
-node.data=true
-discovery.zen.ping.multicast.enabled=false
-#discovery.zen.ping.unicast.hosts=["192.168.0.1:9300", "192.168.0.2:9300"]
defaultQueryLimit=10
# The following settings control the behavior of the BulkProcessor API. You can find more information about these
@@ -37,7 +33,5 @@ bulkProcessor.bulkSize=5MB
bulkProcessor.flushInterval=5s
bulkProcessor.backoffPolicy=exponential
-cluster.group=default
-cluster.jmxUsername=karaf
-cluster.jmxPassword=karaf
-cluster.jmxPort=1099
\ No newline at end of file
+minimalElasticSearchVersion=5.0.0
+maximalElasticSearchVersion=5.2.0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
----------------------------------------------------------------------
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 75c0a3b..a6b175f 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -452,4 +452,12 @@ public interface PersistenceService {
* @return {@code true} if the operation was successful, {@code false} otherwise
*/
boolean removeIndex(final String indexName);
+
+ /**
+ * Removes all data associated with the provided scope.
+ *
+ * @param scope the scope for which we want to remove data
+ */
+ void purge(final String scope);
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1aba4e5..6de2441 100644
--- a/pom.xml
+++ b/pom.xml
@@ -427,8 +427,6 @@
<exclude>**/src/main/webapp/WEB-INF/beans.xml</exclude>
<!-- Web application robots.txt file -->
<exclude>**/src/main/webapp/robots.txt</exclude>
- <!-- Elastic search configuration files with (mostly) default configuration -->
- <exclude>**/src/main/resources/elasticsearch.yml</exclude>
<!-- ignore generated log files -->
<exclude>**/*.log</exclude>
</excludes>
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a5b7b156/services/pom.xml
----------------------------------------------------------------------
diff --git a/services/pom.xml b/services/pom.xml
index 7a58cd7..5188aea 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -112,6 +112,16 @@
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.cellar</groupId>
+ <artifactId>org.apache.karaf.cellar.core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.karaf.cellar</groupId>
+ <artifactId>org.apache.karaf.cellar.config</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
@@ -156,6 +166,18 @@
<type>cfg</type>
<classifier>thirdpartycfg</classifier>
</artifact>
+ <artifact>
+ <file>
+ src/main/resources/org.apache.unomi.cluster.cfg
+ </file>
+ <type>cfg</type>
+ <classifier>clustercfg</classifier>
+ </artifact>
+ <artifact>
+ <file>src/main/resources/hazelcast.xml</file>
+ <type>xml</type>
+ <classifier>hazelcastconfig</classifier>
+ </artifact>
</artifacts>
</configuration>
</execution>