You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2016/11/21 16:40:07 UTC
incubator-unomi git commit: UNOMI-63 Use ElasticSearch BulkProcessing
to perform segment updates - Track index creation through internal memory
structure
Repository: incubator-unomi
Updated Branches:
refs/heads/feature-UNOMI-28-ES2X 62d11ded0 -> 661daeea5
UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates
- Track index creation through internal memory structure
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/661daeea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/661daeea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/661daeea
Branch: refs/heads/feature-UNOMI-28-ES2X
Commit: 661daeea5951fe7219508e710609a965a3192989
Parents: 62d11de
Author: Serge Huber <sh...@apache.org>
Authored: Mon Nov 21 17:39:59 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Mon Nov 21 17:39:59 2016 +0100
----------------------------------------------------------------------
.../ElasticSearchPersistenceServiceImpl.java | 60 +++++++++++++++-----
1 file changed, 45 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/661daeea/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index b954b75..733b3a9 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -411,14 +411,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
bulkProcessor = getBulkProcessor();
}
- try {
- IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get();
- existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet());
- } catch (InterruptedException e) {
- logger.error("Error retrieving indices stats", e);
- } catch (ExecutionException e) {
- logger.error("Error retrieving indices stats", e);
- }
+ refreshExistingIndexNames();
logger.info("Waiting for index creation to complete...");
@@ -457,7 +450,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
int thisMonth = gc.get(Calendar.MONTH);
gc.add(Calendar.DAY_OF_MONTH, 1);
if (gc.get(Calendar.MONTH) != thisMonth) {
- getMonthlyIndex(gc.getTime(), true);
+ String monthlyIndex = getMonthlyIndex(gc.getTime(), true);
+ existingIndexNames.add(monthlyIndex);
}
}
}, 10000L, 24L * 60L * 60L * 1000L);
@@ -465,6 +459,23 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
logger.info(this.getClass().getName() + " service started successfully.");
}
+ private void refreshExistingIndexNames() {
+ new InClassLoaderExecute<Boolean>() {
+ protected Boolean execute(Object... args) {
+ try {
+ logger.info("Refreshing existing indices list...");
+ IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get();
+ existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet());
+ } catch (InterruptedException e) {
+ logger.error("Error retrieving indices stats", e);
+ } catch (ExecutionException e) {
+ logger.error("Error retrieving indices stats", e);
+ }
+ return true;
+ }
+ }.executeInClassLoader();
+ }
+
public BulkProcessor getBulkProcessor() {
if (bulkProcessor != null) {
return bulkProcessor;
@@ -490,7 +501,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
BulkRequest request,
Throwable failure) {
logger.error("After Bulk (failure)", failure);
- // we could add index creation here in the case of index seperation by dates.
}
});
if (bulkProcessorName != null && bulkProcessorName.length() > 0) {
@@ -554,7 +564,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
protected Object execute(Object... args) {
logger.info("Closing ElasticSearch persistence backend...");
if (bulkProcessor != null) {
- bulkProcessor.close();
+ try {
+ bulkProcessor.awaitClose(2, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ logger.error("Error waiting for bulk operations to flush !", e);
+ }
}
node.close();
return null;
@@ -730,17 +744,27 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
if (routingByType.containsKey(itemType)) {
indexBuilder = indexBuilder.setRouting(routingByType.get(itemType));
}
- try {
- indexBuilder.execute().actionGet();
- } catch (IndexNotFoundException e) {
+
+ if (!existingIndexNames.contains(index)) {
+ // index probably doesn't exist, unless something else has already created it.
if (itemsMonthlyIndexed.contains(itemType)) {
Date timeStamp = ((TimestampedItem) item).getTimeStamp();
if (timeStamp != null) {
getMonthlyIndex(timeStamp, true);
- indexBuilder.execute().actionGet();
} else {
logger.warn("Missing time stamp on item " + item + " id=" + item.getItemId() + " can't create related monthly index !");
}
+ } else {
+ // this is not a timestamped index, should we create it anyway ?
+ createIndex(index);
+ }
+ }
+
+ try {
+ indexBuilder.execute().actionGet();
+ } catch (IndexNotFoundException e) {
+ if (existingIndexNames.contains(index)) {
+ existingIndexNames.remove(index);
}
}
return true;
@@ -887,6 +911,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
boolean indexExists = indicesExistsResponse.isExists();
if (indexExists) {
client.admin().indices().prepareDelete(indexName).execute().actionGet();
+ existingIndexNames.remove(indexName);
}
return indexExists;
}
@@ -919,6 +944,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
builder.execute().actionGet();
+ existingIndexNames.add(indexName);
+
}
@@ -1417,6 +1444,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public void refresh() {
new InClassLoaderExecute<Boolean>() {
protected Boolean execute(Object... args) {
+ if (bulkProcessor != null) {
+ bulkProcessor.flush();
+ }
client.admin().indices().refresh(Requests.refreshRequest()).actionGet();
return true;
}