You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2016/11/21 16:40:07 UTC

incubator-unomi git commit: UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates - Track index creation through internal memory structure

Repository: incubator-unomi
Updated Branches:
  refs/heads/feature-UNOMI-28-ES2X 62d11ded0 -> 661daeea5


UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates
- Track index creation through internal memory structure


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/661daeea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/661daeea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/661daeea

Branch: refs/heads/feature-UNOMI-28-ES2X
Commit: 661daeea5951fe7219508e710609a965a3192989
Parents: 62d11de
Author: Serge Huber <sh...@apache.org>
Authored: Mon Nov 21 17:39:59 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Mon Nov 21 17:39:59 2016 +0100

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 60 +++++++++++++++-----
 1 file changed, 45 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/661daeea/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index b954b75..733b3a9 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -411,14 +411,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     bulkProcessor = getBulkProcessor();
                 }
 
-                try {
-                    IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get();
-                    existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet());
-                } catch (InterruptedException e) {
-                    logger.error("Error retrieving indices stats", e);
-                } catch (ExecutionException e) {
-                    logger.error("Error retrieving indices stats", e);
-                }
+                refreshExistingIndexNames();
 
                 logger.info("Waiting for index creation to complete...");
 
@@ -457,7 +450,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 int thisMonth = gc.get(Calendar.MONTH);
                 gc.add(Calendar.DAY_OF_MONTH, 1);
                 if (gc.get(Calendar.MONTH) != thisMonth) {
-                    getMonthlyIndex(gc.getTime(), true);
+                    String monthlyIndex = getMonthlyIndex(gc.getTime(), true);
+                    existingIndexNames.add(monthlyIndex);
                 }
             }
         }, 10000L, 24L * 60L * 60L * 1000L);
@@ -465,6 +459,23 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         logger.info(this.getClass().getName() + " service started successfully.");
     }
 
+    private void refreshExistingIndexNames() {
+        new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                try {
+                    logger.info("Refreshing existing indices list...");
+                    IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get();
+                    existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet());
+                } catch (InterruptedException e) {
+                    logger.error("Error retrieving indices stats", e);
+                } catch (ExecutionException e) {
+                    logger.error("Error retrieving indices stats", e);
+                }
+                return true;
+            }
+        }.executeInClassLoader();
+    }
+
     public BulkProcessor getBulkProcessor() {
         if (bulkProcessor != null) {
             return bulkProcessor;
@@ -490,7 +501,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                                           BulkRequest request,
                                           Throwable failure) {
                         logger.error("After Bulk (failure)", failure);
-                        // we could add index creation here in the case of index seperation by dates.
                     }
                 });
         if (bulkProcessorName != null && bulkProcessorName.length() > 0) {
@@ -554,7 +564,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             protected Object execute(Object... args) {
                 logger.info("Closing ElasticSearch persistence backend...");
                 if (bulkProcessor != null) {
-                    bulkProcessor.close();
+                    try {
+                        bulkProcessor.awaitClose(2, TimeUnit.MINUTES);
+                    } catch (InterruptedException e) {
+                        logger.error("Error waiting for bulk operations to flush !", e);
+                    }
                 }
                 node.close();
                 return null;
@@ -730,17 +744,27 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     if (routingByType.containsKey(itemType)) {
                         indexBuilder = indexBuilder.setRouting(routingByType.get(itemType));
                     }
-                    try {
-                        indexBuilder.execute().actionGet();
-                    } catch (IndexNotFoundException e) {
+
+                    if (!existingIndexNames.contains(index)) {
+                        // index probably doesn't exist, unless something else has already created it.
                         if (itemsMonthlyIndexed.contains(itemType)) {
                             Date timeStamp = ((TimestampedItem) item).getTimeStamp();
                             if (timeStamp != null) {
                                 getMonthlyIndex(timeStamp, true);
-                                indexBuilder.execute().actionGet();
                             } else {
                                 logger.warn("Missing time stamp on item " + item + " id=" + item.getItemId() + " can't create related monthly index !");
                             }
+                        } else {
+                            // this is not a timestamped index, should we create it anyway ?
+                            createIndex(index);
+                        }
+                    }
+
+                    try {
+                        indexBuilder.execute().actionGet();
+                    } catch (IndexNotFoundException e) {
+                        if (existingIndexNames.contains(index)) {
+                            existingIndexNames.remove(index);
                         }
                     }
                     return true;
@@ -887,6 +911,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 boolean indexExists = indicesExistsResponse.isExists();
                 if (indexExists) {
                     client.admin().indices().prepareDelete(indexName).execute().actionGet();
+                    existingIndexNames.remove(indexName);
                 }
                 return indexExists;
             }
@@ -919,6 +944,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
 
         builder.execute().actionGet();
+        existingIndexNames.add(indexName);
+
     }
 
 
@@ -1417,6 +1444,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     public void refresh() {
         new InClassLoaderExecute<Boolean>() {
             protected Boolean execute(Object... args) {
+                if (bulkProcessor != null) {
+                    bulkProcessor.flush();
+                }
                 client.admin().indices().refresh(Requests.refreshRequest()).actionGet();
                 return true;
             }