You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to dev@unomi.apache.org by sh...@apache.org on 2017/10/17 15:09:02 UTC

incubator-unomi git commit: UNOMI-130 Use index templates to create new indexes with default mappings instead of Java code - Replace manual index creation for monthly indexes

Repository: incubator-unomi
Updated Branches:
  refs/heads/master 8de0f782b -> 79ba27236


UNOMI-130 Use index templates to create new indexes with default mappings instead of Java code
- Replace manual index creation for monthly indexes

Signed-off-by: Serge Huber <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/79ba2723
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/79ba2723
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/79ba2723

Branch: refs/heads/master
Commit: 79ba27236e3168798ffa234c07d4d7c6beeceb5a
Parents: 8de0f78
Author: Serge Huber <sh...@apache.org>
Authored: Tue Oct 17 17:08:53 2017 +0200
Committer: Serge Huber <sh...@apache.org>
Committed: Tue Oct 17 17:08:53 2017 +0200

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 209 +++++++++----------
 1 file changed, 95 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/79ba2723/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 4b2df2e..1461c33 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -34,6 +34,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
+import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
 import org.elasticsearch.action.bulk.*;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -51,6 +56,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.DistanceUnit;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -128,12 +134,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     private Map<String, String> indexNames;
     private List<String> itemsMonthlyIndexed;
     private Map<String, String> routingByType;
-    private Set<String> existingIndexNames = new TreeSet<String>();
 
     private Integer defaultQueryLimit = 10;
 
-    private Timer timer;
-
     private String bulkProcessorConcurrentRequests = "1";
     private String bulkProcessorBulkActions = "1000";
     private String bulkProcessorBulkSize = "5MB";
@@ -242,8 +245,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     public void start() throws Exception {
 
-        loadPredefinedMappings(bundleContext, false);
-
         // on startup
         new InClassLoaderExecute<Object>() {
             public Object execute(Object... args) throws Exception {
@@ -298,6 +299,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     throw new Exception("Error checking ElasticSearch versions", e);
                 }
 
+                loadPredefinedMappings(bundleContext, false);
+
+                // load predefined mappings and condition dispatchers of any bundles that were started before this one.
+                for (Bundle existingBundle : bundleContext.getBundles()) {
+                    if (existingBundle.getBundleContext() != null) {
+                        loadPredefinedMappings(existingBundle.getBundleContext(), false);
+                    }
+                }
+
                 // @todo is there a better way to detect index existence than to wait for it to startup ?
                 boolean indexExists = false;
                 int tries = 0;
@@ -331,22 +341,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     }
                 }
 
-                client.admin().indices().preparePutTemplate(indexName + "_monthlyindex")
-                        .setTemplate(indexName + "-*")
-                        .setOrder(1)
-                        .setSettings(Settings.builder()
-                                .put(NUMBER_OF_SHARDS, Integer.parseInt(monthlyIndexNumberOfShards))
-                                .put(NUMBER_OF_REPLICAS, Integer.parseInt(monthlyIndexNumberOfReplicas))
-                                .build()).execute().actionGet();
-
-                getMonthlyIndex(new Date(), true);
+                createMonthlyIndexTemplate();
 
                 if (client != null && bulkProcessor == null) {
                     bulkProcessor = getBulkProcessor();
                 }
 
-                refreshExistingIndexNames();
-
                 logger.info("Waiting for GREEN cluster status...");
 
                 client.admin().cluster().prepareHealth()
@@ -359,51 +359,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             }
         }.executeInClassLoader();
 
-
         bundleContext.addBundleListener(this);
 
-        timer = new Timer();
-
-        timer.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                GregorianCalendar gc = new GregorianCalendar();
-                int thisMonth = gc.get(Calendar.MONTH);
-                gc.add(Calendar.DAY_OF_MONTH, 1);
-                if (gc.get(Calendar.MONTH) != thisMonth) {
-                    String monthlyIndex = getMonthlyIndex(gc.getTime(), true);
-                    existingIndexNames.add(monthlyIndex);
-                }
-            }
-        }, 10000L, 24L * 60L * 60L * 1000L);
-
-        // load predefined mappings and condition dispatchers of any bundles that were started before this one.
-        for (Bundle existingBundle : bundleContext.getBundles()) {
-            if (existingBundle.getBundleContext() != null) {
-                loadPredefinedMappings(existingBundle.getBundleContext(), true);
-            }
-        }
-
         logger.info(this.getClass().getName() + " service started successfully.");
     }
 
-    private void refreshExistingIndexNames() {
-        new InClassLoaderExecute<Boolean>() {
-            protected Boolean execute(Object... args) throws Exception {
-                try {
-                    logger.info("Refreshing existing indices list...");
-                    IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get();
-                    existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet());
-                } catch (InterruptedException e) {
-                    throw new Exception("Error retrieving indices stats", e);
-                } catch (ExecutionException e) {
-                    throw new Exception("Error retrieving indices stats", e);
-                }
-                return true;
-            }
-        }.catchingExecuteInClassLoader(true);
-    }
-
     public BulkProcessor getBulkProcessor() {
         if (bulkProcessor != null) {
             return bulkProcessor;
@@ -502,11 +462,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
             }
         }.catchingExecuteInClassLoader(true);
 
-        if (timer != null) {
-            timer.cancel();
-            timer = null;
-        }
-
         bundleContext.removeBundleListener(this);
     }
 
@@ -543,32 +498,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
-    private String getMonthlyIndex(Date date) {
-        return getMonthlyIndex(date, false);
-    }
-
-    private String getMonthlyIndex(Date date, boolean checkAndCreate) {
+    private String getMonthlyIndexName(Date date) {
         String d = new SimpleDateFormat("-YYYY-MM").format(date);
         String monthlyIndexName = indexName + d;
-
-        if (checkAndCreate) {
-            IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(monthlyIndexName).execute().actionGet();
-            boolean indexExists = indicesExistsResponse.isExists();
-            if (!indexExists) {
-                logger.info("{} index doesn't exist yet, creating it...", monthlyIndexName);
-
-                Map<String, String> indexMappings = new HashMap<String, String>();
-                indexMappings.put("_default_", mappings.get("_default_"));
-                for (Map.Entry<String, String> entry : mappings.entrySet()) {
-                    if (itemsMonthlyIndexed.contains(entry.getKey())) {
-                        indexMappings.put(entry.getKey(), entry.getValue());
-                    }
-                }
-
-                internalCreateIndex(monthlyIndexName, indexMappings);
-                logger.info("{} index created.", monthlyIndexName);
-            }
-        }
         return monthlyIndexName;
     }
 
@@ -637,7 +569,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         return null;
                     } else {
                         String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
-                                (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(dateHint) : indexName);
+                                (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexName(dateHint) : indexName);
 
                         GetResponse response = client.prepareGet(index, itemType, itemId)
                                 .execute()
@@ -653,7 +585,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         }
                     }
                 } catch (IndexNotFoundException e) {
-                    throw new Exception("No index found for itemType=" + clazz.getName() + " itemId=" + itemId, e);
+                    // this can happen if we are just testing the existence of the item, it is not always an error.
+                    return null;
                 } catch (Exception ex) {
                     throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, ex);
                 }
@@ -675,28 +608,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
                     String itemType = item.getItemType();
                     String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
-                            (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(((TimestampedItem) item).getTimeStamp()) : indexName);
+                            (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexName(((TimestampedItem) item).getTimeStamp()) : indexName);
                     IndexRequestBuilder indexBuilder = client.prepareIndex(index, itemType, item.getItemId())
                             .setSource(source);
                     if (routingByType.containsKey(itemType)) {
                         indexBuilder = indexBuilder.setRouting(routingByType.get(itemType));
                     }
 
-                    if (!existingIndexNames.contains(index)) {
-                        // index probably doesn't exist, unless something else has already created it.
-                        if (itemsMonthlyIndexed.contains(itemType)) {
-                            Date timeStamp = ((TimestampedItem) item).getTimeStamp();
-                            if (timeStamp != null) {
-                                getMonthlyIndex(timeStamp, true);
-                            } else {
-                                logger.warn("Missing time stamp on item " + item + " id=" + item.getItemId() + " can't create related monthly index !");
-                            }
-                        } else {
-                            // this is not a timestamped index, should we create it anyway ?
-                            createIndex(index);
-                        }
-                    }
-
                     try {
                         if (bulkProcessor == null || !useBatching) {
                             indexBuilder.execute().actionGet();
@@ -704,9 +622,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             bulkProcessor.add(indexBuilder.request());
                         }
                     } catch (IndexNotFoundException e) {
-                        if (existingIndexNames.contains(index)) {
-                            existingIndexNames.remove(index);
-                        }
                     }
                     return true;
                 } catch (IOException e) {
@@ -734,7 +649,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
 
                     String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
-                            (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+                            (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName);
 
                     if (bulkProcessor == null) {
                         client.prepareUpdate(index, itemType, itemId).setDoc(source)
@@ -769,7 +684,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
 
                     String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
-                            (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+                            (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName);
 
                     for (int i = 0; i < scripts.length; i++) {
                         Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]);
@@ -828,7 +743,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
 
                     String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
-                            (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+                            (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName);
 
                     Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
 
@@ -934,6 +849,74 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         }
     }
 
+
+    public boolean indexTemplateExists(final String templateName) {
+        Boolean result = new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                GetIndexTemplatesResponse getIndexTemplatesResponse = client.admin().indices().prepareGetTemplates(templateName).execute().actionGet();
+                return getIndexTemplatesResponse.getIndexTemplates().size() == 1;
+            }
+        }.catchingExecuteInClassLoader(true);
+        if (result == null) {
+            return false;
+        } else {
+            return result;
+        }
+    }
+
+    public boolean removeIndexTemplate(final String templateName) {
+        Boolean result = new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                DeleteIndexTemplateResponse deleteIndexTemplateResponse = client.admin().indices().deleteTemplate(new DeleteIndexTemplateRequest(templateName)).actionGet();
+                return deleteIndexTemplateResponse.isAcknowledged();
+            }
+        }.catchingExecuteInClassLoader(true);
+        if (result == null) {
+            return false;
+        } else {
+            return result;
+        }
+    }
+
+    public boolean createMonthlyIndexTemplate() {
+        Boolean result = new InClassLoaderExecute<Boolean>() {
+            protected Boolean execute(Object... args) {
+                PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest("context-monthly-indices")
+                        .template(indexName + "-*")
+                        .settings("{\n" +
+                                "    \"index\" : {\n" +
+                                "        \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" +
+                                "        \"number_of_replicas\" : " + monthlyIndexNumberOfReplicas + "\n" +
+                                "    },\n" +
+                                "    \"analysis\": {\n" +
+                                "      \"analyzer\": {\n" +
+                                "        \"folding\": {\n" +
+                                "          \"type\":\"custom\",\n" +
+                                "          \"tokenizer\": \"keyword\",\n" +
+                                "          \"filter\":  [ \"lowercase\", \"asciifolding\" ]\n" +
+                                "        }\n" +
+                                "      }\n" +
+                                "    }\n" +
+                                "}\n", XContentType.JSON);
+                Map<String, String> indexMappings = new HashMap<String, String>();
+                indexMappings.put("_default_", mappings.get("_default_"));
+                for (Map.Entry<String, String> entry : mappings.entrySet()) {
+                    if (itemsMonthlyIndexed.contains(entry.getKey())) {
+                        indexMappings.put(entry.getKey(), entry.getValue());
+                    }
+                }
+                putIndexTemplateRequest.mappings().putAll(indexMappings);
+                PutIndexTemplateResponse putIndexTemplateResponse = client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet();
+                return putIndexTemplateResponse.isAcknowledged();
+            }
+        }.catchingExecuteInClassLoader(true);
+        if (result == null) {
+            return false;
+        } else {
+            return result;
+        }
+    }
+
     public boolean createIndex(final String indexName) {
         Boolean result = new InClassLoaderExecute<Boolean>() {
             protected Boolean execute(Object... args) {
@@ -966,7 +949,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 boolean indexExists = indicesExistsResponse.isExists();
                 if (indexExists) {
                     client.admin().indices().prepareDelete(indexName).execute().actionGet();
-                    existingIndexNames.remove(indexName);
                 }
                 return indexExists;
             }
@@ -994,14 +976,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         "        }\n" +
                         "      }\n" +
                         "    }\n" +
-                        "}\n");
+                        "}\n", XContentType.JSON);
 
         for (Map.Entry<String, String> entry : mappings.entrySet()) {
-            builder.addMapping(entry.getKey(), entry.getValue());
+            builder.addMapping(entry.getKey(), entry.getValue(), XContentType.JSON);
         }
 
         builder.execute().actionGet();
-        existingIndexNames.add(indexName);
 
     }
 
@@ -1010,7 +991,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         client.admin().indices()
                 .preparePutMapping(indexName)
                 .setType(type)
-                .setSource(source)
+                .setSource(source, XContentType.JSON)
                 .execute().actionGet();
     }