You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2017/10/17 15:09:02 UTC
incubator-unomi git commit: UNOMI-130 Use index templates to create
new indexes with default mappings instead of Java code - Replace manual index
creation for monthly indexes
Repository: incubator-unomi
Updated Branches:
refs/heads/master 8de0f782b -> 79ba27236
UNOMI-130 Use index templates to create new indexes with default mappings instead of Java code
- Replace manual index creation for monthly indexes
Signed-off-by: Serge Huber <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/79ba2723
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/79ba2723
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/79ba2723
Branch: refs/heads/master
Commit: 79ba27236e3168798ffa234c07d4d7c6beeceb5a
Parents: 8de0f78
Author: Serge Huber <sh...@apache.org>
Authored: Tue Oct 17 17:08:53 2017 +0200
Committer: Serge Huber <sh...@apache.org>
Committed: Tue Oct 17 17:08:53 2017 +0200
----------------------------------------------------------------------
.../ElasticSearchPersistenceServiceImpl.java | 209 +++++++++----------
1 file changed, 95 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/79ba2723/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 4b2df2e..1461c33 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -34,6 +34,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
+import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -51,6 +56,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@@ -128,12 +134,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private Map<String, String> indexNames;
private List<String> itemsMonthlyIndexed;
private Map<String, String> routingByType;
- private Set<String> existingIndexNames = new TreeSet<String>();
private Integer defaultQueryLimit = 10;
- private Timer timer;
-
private String bulkProcessorConcurrentRequests = "1";
private String bulkProcessorBulkActions = "1000";
private String bulkProcessorBulkSize = "5MB";
@@ -242,8 +245,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public void start() throws Exception {
- loadPredefinedMappings(bundleContext, false);
-
// on startup
new InClassLoaderExecute<Object>() {
public Object execute(Object... args) throws Exception {
@@ -298,6 +299,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
throw new Exception("Error checking ElasticSearch versions", e);
}
+ loadPredefinedMappings(bundleContext, false);
+
+ // load predefined mappings and condition dispatchers of any bundles that were started before this one.
+ for (Bundle existingBundle : bundleContext.getBundles()) {
+ if (existingBundle.getBundleContext() != null) {
+ loadPredefinedMappings(existingBundle.getBundleContext(), false);
+ }
+ }
+
// @todo is there a better way to detect index existence than to wait for it to startup ?
boolean indexExists = false;
int tries = 0;
@@ -331,22 +341,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- client.admin().indices().preparePutTemplate(indexName + "_monthlyindex")
- .setTemplate(indexName + "-*")
- .setOrder(1)
- .setSettings(Settings.builder()
- .put(NUMBER_OF_SHARDS, Integer.parseInt(monthlyIndexNumberOfShards))
- .put(NUMBER_OF_REPLICAS, Integer.parseInt(monthlyIndexNumberOfReplicas))
- .build()).execute().actionGet();
-
- getMonthlyIndex(new Date(), true);
+ createMonthlyIndexTemplate();
if (client != null && bulkProcessor == null) {
bulkProcessor = getBulkProcessor();
}
- refreshExistingIndexNames();
-
logger.info("Waiting for GREEN cluster status...");
client.admin().cluster().prepareHealth()
@@ -359,51 +359,11 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}.executeInClassLoader();
-
bundleContext.addBundleListener(this);
- timer = new Timer();
-
- timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- GregorianCalendar gc = new GregorianCalendar();
- int thisMonth = gc.get(Calendar.MONTH);
- gc.add(Calendar.DAY_OF_MONTH, 1);
- if (gc.get(Calendar.MONTH) != thisMonth) {
- String monthlyIndex = getMonthlyIndex(gc.getTime(), true);
- existingIndexNames.add(monthlyIndex);
- }
- }
- }, 10000L, 24L * 60L * 60L * 1000L);
-
- // load predefined mappings and condition dispatchers of any bundles that were started before this one.
- for (Bundle existingBundle : bundleContext.getBundles()) {
- if (existingBundle.getBundleContext() != null) {
- loadPredefinedMappings(existingBundle.getBundleContext(), true);
- }
- }
-
logger.info(this.getClass().getName() + " service started successfully.");
}
- private void refreshExistingIndexNames() {
- new InClassLoaderExecute<Boolean>() {
- protected Boolean execute(Object... args) throws Exception {
- try {
- logger.info("Refreshing existing indices list...");
- IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().execute().get();
- existingIndexNames = new TreeSet<>(indicesStatsResponse.getIndices().keySet());
- } catch (InterruptedException e) {
- throw new Exception("Error retrieving indices stats", e);
- } catch (ExecutionException e) {
- throw new Exception("Error retrieving indices stats", e);
- }
- return true;
- }
- }.catchingExecuteInClassLoader(true);
- }
-
public BulkProcessor getBulkProcessor() {
if (bulkProcessor != null) {
return bulkProcessor;
@@ -502,11 +462,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}.catchingExecuteInClassLoader(true);
- if (timer != null) {
- timer.cancel();
- timer = null;
- }
-
bundleContext.removeBundleListener(this);
}
@@ -543,32 +498,9 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
- private String getMonthlyIndex(Date date) {
- return getMonthlyIndex(date, false);
- }
-
- private String getMonthlyIndex(Date date, boolean checkAndCreate) {
+ private String getMonthlyIndexName(Date date) { // Returns the name of the monthly index for the given date: indexName followed by the formatted date suffix. Only computes the name; it no longer checks for or creates the index.
String d = new SimpleDateFormat("-YYYY-MM").format(date); // NOTE(review): in SimpleDateFormat the letter 'Y' is the week year, while 'y' is the calendar year; the two can differ for dates around New Year - confirm which one is intended here.
String monthlyIndexName = indexName + d;
-
- if (checkAndCreate) {
- IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(monthlyIndexName).execute().actionGet();
- boolean indexExists = indicesExistsResponse.isExists();
- if (!indexExists) {
- logger.info("{} index doesn't exist yet, creating it...", monthlyIndexName);
-
- Map<String, String> indexMappings = new HashMap<String, String>();
- indexMappings.put("_default_", mappings.get("_default_"));
- for (Map.Entry<String, String> entry : mappings.entrySet()) {
- if (itemsMonthlyIndexed.contains(entry.getKey())) {
- indexMappings.put(entry.getKey(), entry.getValue());
- }
- }
-
- internalCreateIndex(monthlyIndexName, indexMappings);
- logger.info("{} index created.", monthlyIndexName);
- }
- }
return monthlyIndexName;
}
@@ -637,7 +569,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
return null;
} else {
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
- (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(dateHint) : indexName);
+ (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexName(dateHint) : indexName);
GetResponse response = client.prepareGet(index, itemType, itemId)
.execute()
@@ -653,7 +585,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
} catch (IndexNotFoundException e) {
- throw new Exception("No index found for itemType=" + clazz.getName() + " itemId=" + itemId, e);
+ // this can happen if we are just testing the existence of the item, it is not always an error.
+ return null;
} catch (Exception ex) {
throw new Exception("Error loading itemType=" + clazz.getName() + " itemId=" + itemId, ex);
}
@@ -675,28 +608,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
String source = ESCustomObjectMapper.getObjectMapper().writeValueAsString(item);
String itemType = item.getItemType();
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
- (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndex(((TimestampedItem) item).getTimeStamp()) : indexName);
+ (itemsMonthlyIndexed.contains(itemType) ? getMonthlyIndexName(((TimestampedItem) item).getTimeStamp()) : indexName);
IndexRequestBuilder indexBuilder = client.prepareIndex(index, itemType, item.getItemId())
.setSource(source);
if (routingByType.containsKey(itemType)) {
indexBuilder = indexBuilder.setRouting(routingByType.get(itemType));
}
- if (!existingIndexNames.contains(index)) {
- // index probably doesn't exist, unless something else has already created it.
- if (itemsMonthlyIndexed.contains(itemType)) {
- Date timeStamp = ((TimestampedItem) item).getTimeStamp();
- if (timeStamp != null) {
- getMonthlyIndex(timeStamp, true);
- } else {
- logger.warn("Missing time stamp on item " + item + " id=" + item.getItemId() + " can't create related monthly index !");
- }
- } else {
- // this is not a timestamped index, should we create it anyway ?
- createIndex(index);
- }
- }
-
try {
if (bulkProcessor == null || !useBatching) {
indexBuilder.execute().actionGet();
@@ -704,9 +622,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
bulkProcessor.add(indexBuilder.request());
}
} catch (IndexNotFoundException e) {
- if (existingIndexNames.contains(index)) {
- existingIndexNames.remove(index);
- }
}
return true;
} catch (IOException e) {
@@ -734,7 +649,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
- (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+ (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName);
if (bulkProcessor == null) {
client.prepareUpdate(index, itemType, itemId).setDoc(source)
@@ -769,7 +684,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
- (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+ (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName);
for (int i = 0; i < scripts.length; i++) {
Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]);
@@ -828,7 +743,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
String itemType = (String) clazz.getField("ITEM_TYPE").get(null);
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
- (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
+ (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndexName(dateHint) : indexName);
Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
@@ -934,6 +849,74 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
}
+
+ public boolean indexTemplateExists(final String templateName) { // Reports whether an index template can be found under templateName.
+ Boolean result = new InClassLoaderExecute<Boolean>() {
+ protected Boolean execute(Object... args) {
+ GetIndexTemplatesResponse getIndexTemplatesResponse = client.admin().indices().prepareGetTemplates(templateName).execute().actionGet();
+ return getIndexTemplatesResponse.getIndexTemplates().size() == 1; // true only when exactly one template is returned for the requested name
+ }
+ }.catchingExecuteInClassLoader(true);
+ if (result == null) { // a missing result is reported as "template does not exist"; presumably the catching variant yields null when execution fails - confirm against InClassLoaderExecute
+ return false;
+ } else {
+ return result;
+ }
+ }
+
+ public boolean removeIndexTemplate(final String templateName) { // Sends a delete request for the index template named templateName and reports whether it was acknowledged.
+ Boolean result = new InClassLoaderExecute<Boolean>() {
+ protected Boolean execute(Object... args) {
+ DeleteIndexTemplateResponse deleteIndexTemplateResponse = client.admin().indices().deleteTemplate(new DeleteIndexTemplateRequest(templateName)).actionGet();
+ return deleteIndexTemplateResponse.isAcknowledged(); // the acknowledgement flag of the delete response is the method's result
+ }
+ }.catchingExecuteInClassLoader(true);
+ if (result == null) { // a missing result is reported as "not removed"; presumably the catching variant yields null when execution fails - confirm against InClassLoaderExecute
+ return false;
+ } else {
+ return result;
+ }
+ }
+
+ public boolean createMonthlyIndexTemplate() { // Registers the index template applied to indices matching indexName + "-*" (settings plus the mappings of monthly-indexed item types) and reports whether the request was acknowledged.
+ Boolean result = new InClassLoaderExecute<Boolean>() {
+ protected Boolean execute(Object... args) {
+ PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest("context-monthly-indices") // NOTE(review): the template name is fixed while the pattern below depends on indexName - confirm this is intended when several index names share one cluster
+ .template(indexName + "-*")
+ .settings("{\n" + // settings JSON: shard/replica counts plus a custom "folding" analyzer
+ " \"index\" : {\n" +
+ " \"number_of_shards\" : " + monthlyIndexNumberOfShards + ",\n" + // NOTE(review): the configured value is concatenated into the JSON as-is, without numeric validation
+ " \"number_of_replicas\" : " + monthlyIndexNumberOfReplicas + "\n" +
+ " },\n" +
+ " \"analysis\": {\n" +
+ " \"analyzer\": {\n" +
+ " \"folding\": {\n" +
+ " \"type\":\"custom\",\n" +
+ " \"tokenizer\": \"keyword\",\n" +
+ " \"filter\": [ \"lowercase\", \"asciifolding\" ]\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ "}\n", XContentType.JSON);
+ Map<String, String> indexMappings = new HashMap<String, String>(); // mappings to embed in the template, keyed by mapping name
+ indexMappings.put("_default_", mappings.get("_default_")); // the "_default_" mapping is always included
+ for (Map.Entry<String, String> entry : mappings.entrySet()) {
+ if (itemsMonthlyIndexed.contains(entry.getKey())) { // keep only mappings whose key is listed in itemsMonthlyIndexed
+ indexMappings.put(entry.getKey(), entry.getValue());
+ }
+ }
+ putIndexTemplateRequest.mappings().putAll(indexMappings);
+ PutIndexTemplateResponse putIndexTemplateResponse = client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet();
+ return putIndexTemplateResponse.isAcknowledged();
+ }
+ }.catchingExecuteInClassLoader(true);
+ if (result == null) { // a missing result is reported as "template not created"; presumably the catching variant yields null when execution fails - confirm against InClassLoaderExecute
+ return false;
+ } else {
+ return result;
+ }
+ }
+
public boolean createIndex(final String indexName) {
Boolean result = new InClassLoaderExecute<Boolean>() {
protected Boolean execute(Object... args) {
@@ -966,7 +949,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
boolean indexExists = indicesExistsResponse.isExists();
if (indexExists) {
client.admin().indices().prepareDelete(indexName).execute().actionGet();
- existingIndexNames.remove(indexName);
}
return indexExists;
}
@@ -994,14 +976,13 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
" }\n" +
" }\n" +
" }\n" +
- "}\n");
+ "}\n", XContentType.JSON);
for (Map.Entry<String, String> entry : mappings.entrySet()) {
- builder.addMapping(entry.getKey(), entry.getValue());
+ builder.addMapping(entry.getKey(), entry.getValue(), XContentType.JSON);
}
builder.execute().actionGet();
- existingIndexNames.add(indexName);
}
@@ -1010,7 +991,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
client.admin().indices()
.preparePutMapping(indexName)
.setType(type)
- .setSource(source)
+ .setSource(source, XContentType.JSON)
.execute().actionGet();
}