You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2020/11/11 09:20:53 UTC
[unomi] 07/17: Unomi - 387: Stop Unomi in case of an error from which
it cannot recover (#186)
This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch unomi-1.5.x
in repository https://gitbox.apache.org/repos/asf/unomi.git
commit d9e4a717f955a8e9640e3b9e2e9696a491079295
Author: liatiusim <62...@users.noreply.github.com>
AuthorDate: Tue Oct 13 17:56:44 2020 +0200
Unomi - 387: Stop Unomi in case of an error from which it cannot recover (#186)
* Stop Unomi if reactor stopped
* Use system bundle stop instead of system exit, define fatal errors in configuration
Co-authored-by: Shir Bromberg <sb...@yotpo.com>
(cherry picked from commit a6999de528eafccd43076c622fca74d3807877db)
---
.../main/resources/etc/custom.system.properties | 1 +
.../ElasticSearchPersistenceServiceImpl.java | 90 +++++++++++++++-------
.../resources/OSGI-INF/blueprint/blueprint.xml | 1 +
.../org.apache.unomi.persistence.elasticsearch.cfg | 1 +
4 files changed, 64 insertions(+), 29 deletions(-)
diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index dd58502..f55d9cf 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -72,6 +72,7 @@ org.apache.unomi.elasticsearch.cluster.name=${env:UNOMI_ELASTICSEARCH_CLUSTERNAM
# hostA:9200,hostB:9200
# Note: the port number must be repeated for each host.
org.apache.unomi.elasticsearch.addresses=${env:UNOMI_ELASTICSEARCH_ADDRESSES:-localhost:9200}
+org.apache.unomi.elasticsearch.fatalIllegalStateErrors=${env:UNOMI_ELASTICSEARCH_FATAL_STATE_ERRORS:-}
org.apache.unomi.elasticsearch.index.prefix=${env:UNOMI_ELASTICSEARCH_INDEXPREFIX:-context}
org.apache.unomi.elasticsearch.monthlyIndex.nbShards=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_SHARDS:-5}
org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_REPLICAS:-0}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index b86b368..530395b 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -148,6 +148,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private String numberOfReplicas;
private String indexMappingTotalFieldsLimit;
private String indexMaxDocValueFieldsSearch;
+ private String[] fatalIllegalStateErrors;
private BundleContext bundleContext;
private Map<String, String> mappings = new HashMap<String, String>();
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
@@ -201,6 +202,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
+ /**
+ * Parses the configured list of fatal IllegalStateException message fragments.
+ * The resulting array is handed to every InClassLoaderExecute instance, which
+ * compares it against the messages of IllegalStateExceptions raised during execution.
+ *
+ * @param fatalIllegalStateErrors comma-separated message fragments; each entry is
+ * trimmed and empty entries are dropped. A null value yields an empty array.
+ */
+ public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
+ if (fatalIllegalStateErrors == null) {
+ // No configuration at all: behave as if the list was empty instead of failing on split()
+ this.fatalIllegalStateErrors = new String[0];
+ return;
+ }
+ this.fatalIllegalStateErrors = Arrays.stream(fatalIllegalStateErrors.split(","))
+ .map(String::trim).filter(fragment -> !fragment.isEmpty()).toArray(String[]::new);
+ }
+
public void setIndexPrefix(String indexPrefix) {
this.indexPrefix = indexPrefix;
}
@@ -333,7 +339,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public void start() throws Exception {
// on startup
- new InClassLoaderExecute<Object>(null, null) {
+ new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors) {
public Object execute(Object... args) throws Exception {
bulkProcessorConcurrentRequests = System.getProperty(BULK_PROCESSOR_CONCURRENT_REQUESTS, bulkProcessorConcurrentRequests);
@@ -532,7 +538,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public void stop() {
- new InClassLoaderExecute<Object>(null, null) {
+ new InClassLoaderExecute<Object>(null, null, this.bundleContext, this.fatalIllegalStateErrors) {
protected Object execute(Object... args) throws IOException {
logger.info("Closing ElasticSearch persistence backend...");
if (bulkProcessor != null) {
@@ -662,7 +668,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> T load(final String itemId, final Date dateHint, final Class<T> clazz) {
- return new InClassLoaderExecute<T>(metricsService, this.getClass().getName() + ".loadItem") {
+ return new InClassLoaderExecute<T>(metricsService, this.getClass().getName() + ".loadItem", this.bundleContext, this.fatalIllegalStateErrors) {
protected T execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -720,7 +726,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean save(final Item item, final boolean useBatching) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
@@ -766,7 +772,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -792,7 +798,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -849,7 +855,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -881,7 +887,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> boolean remove(final String itemId, final Class<T> clazz) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeItem") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeItem", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -902,7 +908,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
public <T extends Item> boolean removeByQuery(final Condition query, final Class<T> clazz) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeByQuery", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
@@ -967,7 +973,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public boolean indexTemplateExists(final String templateName) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".indexTemplateExists", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
IndexTemplatesExistRequest indexTemplatesExistRequest = new IndexTemplatesExistRequest(templateName);
return client.indices().existsTemplate(indexTemplatesExistRequest, RequestOptions.DEFAULT);
@@ -981,7 +987,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
public boolean removeIndexTemplate(final String templateName) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndexTemplate", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(templateName);
AcknowledgedResponse deleteIndexTemplateResponse = client.indices().deleteTemplate(deleteIndexTemplateRequest, RequestOptions.DEFAULT);
@@ -996,7 +1002,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
public boolean createMonthlyIndexTemplate() {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createMonthlyIndexTemplate", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
boolean executedSuccessfully = true;
for (String itemName : itemsMonthlyIndexed) {
@@ -1037,7 +1043,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public boolean createIndex(final String itemType) {
String index = getIndex(itemType);
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".createIndex", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
@@ -1058,7 +1064,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public boolean removeIndex(final String itemType) {
String index = getIndex(itemType);
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeIndex", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws IOException {
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
boolean indexExists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
@@ -1129,7 +1135,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public Map<String, Map<String, Object>> getPropertiesMapping(final String itemType) {
- return new InClassLoaderExecute<Map<String, Map<String, Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping") {
+ return new InClassLoaderExecute<Map<String, Map<String, Object>>>(metricsService, this.getClass().getName() + ".getPropertiesMapping", this.bundleContext, this.fatalIllegalStateErrors) {
@SuppressWarnings("unchecked")
protected Map<String, Map<String, Object>> execute(Object... args) throws Exception {
// Get all mapping for current itemType
@@ -1226,7 +1232,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
public boolean saveQuery(final String queryName, final String query) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveQuery") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveQuery", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
@@ -1261,7 +1267,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public boolean removeQuery(final String queryName) {
- Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeQuery") {
+ Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".removeQuery", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
//Index the query = register it in the percolator
try {
@@ -1382,7 +1388,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private long queryCount(final QueryBuilder filter, final String itemType) {
- return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount") {
+ return new InClassLoaderExecute<Long>(metricsService, this.getClass().getName() + ".queryCount", this.bundleContext, this.fatalIllegalStateErrors) {
@Override
protected Long execute(Object... args) throws IOException {
@@ -1398,7 +1404,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
private <T extends Item> PartialList<T> query(final QueryBuilder query, final String sortBy, final Class<T> clazz, final int offset, final int size, final String[] routing, final String scrollTimeValidity) {
- return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".query") {
+ return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".query", this.bundleContext, this.fatalIllegalStateErrors) {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
@@ -1524,7 +1530,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> PartialList<T> continueScrollQuery(final Class<T> clazz, final String scrollIdentifier, final String scrollTimeValidity) {
- return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".continueScrollQuery") {
+ return new InClassLoaderExecute<PartialList<T>>(metricsService, this.getClass().getName() + ".continueScrollQuery", this.bundleContext, this.fatalIllegalStateErrors) {
@Override
protected PartialList<T> execute(Object... args) throws Exception {
@@ -1580,7 +1586,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType,
final boolean optimizedQuery) {
- return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery") {
+ return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery", this.bundleContext, this.fatalIllegalStateErrors) {
@Override
protected Map<String, Long> execute(Object... args) throws IOException {
@@ -1741,7 +1747,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public void refresh() {
- new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refresh") {
+ new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refresh", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) {
if (bulkProcessor != null) {
bulkProcessor.flush();
@@ -1758,7 +1764,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public <T extends Item> void refreshIndex(Class<T> clazz, Date dateHint){
- new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refreshIndex") {
+ new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".refreshIndex", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) {
try {
String itemType = Item.getItemType(clazz);
@@ -1776,7 +1782,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public void purge(final Date date) {
- new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate") {
+ new InClassLoaderExecute<Object>(metricsService, this.getClass().getName() + ".purgeWithDate", this.bundleContext, this.fatalIllegalStateErrors) {
@Override
protected Object execute(Object... args) throws Exception {
@@ -1812,7 +1818,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public void purge(final String scope) {
- new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".purgeWithScope") {
+ new InClassLoaderExecute<Void>(metricsService, this.getClass().getName() + ".purgeWithScope", this.bundleContext, this.fatalIllegalStateErrors) {
@Override
protected Void execute(Object... args) throws IOException {
QueryBuilder query = termQuery("scope", scope);
@@ -1864,7 +1870,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
@Override
public Map<String, Double> getSingleValuesMetrics(final Condition condition, final String[] metrics, final String field, final String itemType) {
- return new InClassLoaderExecute<Map<String, Double>>(metricsService, this.getClass().getName() + ".getSingleValuesMetrics") {
+ return new InClassLoaderExecute<Map<String, Double>>(metricsService, this.getClass().getName() + ".getSingleValuesMetrics", this.bundleContext, this.fatalIllegalStateErrors) {
@Override
protected Map<String, Double> execute(Object... args) throws IOException {
@@ -1934,10 +1940,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private String timerName;
private MetricsService metricsService;
+ private BundleContext bundleContext;
+ private String[] fatalIllegalStateErrors; // Errors that if occur - stop the application
- public InClassLoaderExecute(MetricsService metricsService, String timerName) {
+ /**
+ * Creates an executor wrapper.
+ *
+ * @param metricsService metrics service stored for use by this wrapper (callers in this class pass null in start() and stop())
+ * @param timerName name of the timer passed to executeInClassLoader (may be null)
+ * @param bundleContext bundle context used to stop the system bundle on a fatal state error
+ * @param fatalIllegalStateErrors IllegalStateException message fragments considered fatal;
+ * null is treated as "no fatal errors configured"
+ */
+ public InClassLoaderExecute(MetricsService metricsService, String timerName, BundleContext bundleContext, String[] fatalIllegalStateErrors) {
this.timerName = timerName;
this.metricsService = metricsService;
+ this.bundleContext = bundleContext;
+ // Normalize null to an empty array: the error handler streams over this array and a
+ // NullPointerException raised there would hide the original error.
+ this.fatalIllegalStateErrors = fatalIllegalStateErrors != null ? fatalIllegalStateErrors : new String[0];
}
protected abstract T execute(Object... args) throws Exception;
@@ -1961,12 +1971,34 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
try {
return executeInClassLoader(timerName, args);
} catch (Throwable t) {
- if (logError) {
- logger.error("Error while executing in class loader", t);
+ Throwable tTemp = t;
+ // Go over the stack trace and check if there were any fatal state errors
+ while (tTemp != null) {
+ if (tTemp instanceof IllegalStateException && Arrays.stream(this.fatalIllegalStateErrors).anyMatch(tTemp.getMessage()::contains)) {
+ handleFatalStateError(); // Stop application
+ return null;
+ }
+ tTemp = tTemp.getCause();
}
+ handleError(t, logError);
}
return null;
}
+
+ /**
+ * Reports a non-fatal execution error.
+ *
+ * @param t the error caught while executing in the class loader
+ * @param logError when false the error is dropped silently; when true it is logged at error level
+ */
+ private void handleError(Throwable t, boolean logError) {
+ if (!logError) {
+ return;
+ }
+ logger.error("Error while executing in class loader", t);
+ }
+
+ /**
+ * Stops the application after a fatal state error has been detected.
+ * First asks the OSGi framework to stop by stopping bundle 0 (the system bundle);
+ * if that attempt fails for any reason, the failure is logged and the JVM is
+ * terminated with exit status -1.
+ */
+ private void handleFatalStateError() {
+ logger.error("Fatal state error occurred - stopping application");
+ try {
+ this.bundleContext.getBundle(0).stop();
+ } catch (Throwable tInner) { // Stopping system bundle failed - force exit
+ // Record why the stop failed before exiting; otherwise the cause is lost
+ logger.error("Stopping system bundle failed - forcing exit", tInner);
+ System.exit(-1);
+ }
+ }
}
private <T extends Item> boolean isCacheActiveForClass(String className) {
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 04bd53b..5888c26 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -104,6 +104,7 @@
<property name="indexMappingTotalFieldsLimit" value="${es.indexMappingTotalFieldsLimit}"/>
<property name="indexMaxDocValueFieldsSearch" value="${es.indexMaxDocValueFieldsSearch}"/>
<property name="elasticSearchAddresses" value="${es.elasticSearchAddresses}"/>
+ <property name="fatalIllegalStateErrors" value="${es.fatalIllegalStateErrors}"/>
<property name="defaultQueryLimit" value="${es.defaultQueryLimit}"/>
<property name="itemsMonthlyIndexedOverride" value="${es.monthlyIndex.itemsMonthlyIndexedOverride}" />
<property name="routingByType">
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ffecb4a..c6205ed 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -20,6 +20,7 @@ cluster.name=${org.apache.unomi.elasticsearch.cluster.name:-contextElasticSearch
# hostA:9200,hostB:9200
# Note: the port number must be repeated for each host.
elasticSearchAddresses=${org.apache.unomi.elasticsearch.addresses:-localhost:9200}
+fatalIllegalStateErrors=${org.apache.unomi.elasticsearch.fatalIllegalStateErrors:-}
index.prefix=${org.apache.unomi.elasticsearch.index.prefix:-context}
monthlyIndex.numberOfShards=${org.apache.unomi.elasticsearch.monthlyIndex.nbShards:-5}
monthlyIndex.numberOfReplicas=${org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas:-0}