You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by jk...@apache.org on 2022/05/25 15:30:45 UTC
[unomi] branch master updated: UNOMI-395 : add migration logic to create scopes entries from scope i… (#427)
This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new da80579b1 UNOMI-395 : add migration logic to create scopes entries from scope i… (#427)
da80579b1 is described below
commit da80579b12fa6ee596ceb97826aec22e8edd5935
Author: jsinovassin <58...@users.noreply.github.com>
AuthorDate: Wed May 25 17:30:40 2022 +0200
UNOMI-395 : add migration logic to create scopes entries from scope i… (#427)
* UNOMI-395 : add migration logic to create scope entries from the scopes found in existing events
* move the JSON request bodies out of Java strings into JSON resource files
---
.../resources/META-INF/cxs/mappings/scope.json | 3 -
pom.xml | 2 +
tools/shell-commands/pom.xml | 5 +
.../unomi/shell/migration/impl/MigrationTo200.java | 191 ++++++++++++++++-----
.../resources/requestBody/bulkSaveScope.ndjson | 2 +
.../requestBody/copyValueScopeToSourceId.json | 6 +
.../main/resources/requestBody/scopeMapping.json | 44 +++++
.../main/resources/requestBody/searchScope.json | 16 ++
.../main/resources/requestBody/updateMapping.json | 14 ++
9 files changed, 238 insertions(+), 45 deletions(-)
diff --git a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/scope.json b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/scope.json
index 30234221c..27fa2b384 100644
--- a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/scope.json
+++ b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/scope.json
@@ -18,8 +18,5 @@
}
],
"properties": {
- "value": {
- "type": "text"
- }
}
}
diff --git a/pom.xml b/pom.xml
index 0874955e5..89d6d038b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -513,6 +513,8 @@
<exclude>**/*.mf</exclude>
<!-- test json files -->
<exclude>**/*.json</exclude>
+ <!-- nd json files -->
+ <exclude>**/*.ndjson</exclude>
<!-- SSH keys -->
<exclude>**/*.key</exclude>
<!-- For Jenkins, ignore the .repository -->
diff --git a/tools/shell-commands/pom.xml b/tools/shell-commands/pom.xml
index 2f4575506..b92d1c515 100644
--- a/tools/shell-commands/pom.xml
+++ b/tools/shell-commands/pom.xml
@@ -49,6 +49,11 @@
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
index 94dfd40be..9dc873b6e 100644
--- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
+++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
@@ -16,6 +16,7 @@
*/
package org.apache.unomi.shell.migration.impl;
+import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -26,14 +27,21 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.karaf.shell.api.console.Session;
import org.apache.unomi.shell.migration.Migration;
+import org.apache.unomi.shell.migration.utils.ConsoleUtils;
import org.json.JSONObject;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Version;
import org.osgi.service.component.annotations.Component;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
@Component
public class MigrationTo200 implements Migration {
@@ -41,6 +49,7 @@ public class MigrationTo200 implements Migration {
private CloseableHttpClient httpClient;
private Session session;
private String esAddress;
+ private BundleContext bundleContext;
@Override
public Version getFromVersion() {
@@ -54,7 +63,9 @@ public class MigrationTo200 implements Migration {
+    /**
+     * Returns a human-readable summary of what this migration changes.
+     */
@Override
public String getDescription() {
- return "Updates mapping for an index with prefix \"context-event\". Adds the \"sourceId\" field and copies value from the \"scope\" field to it.";
+        // Each concatenated fragment ends with a separator so the sentences do not run together.
+        return "Updates mapping for an index \"event\" with prefix \"context\" by default. Adds the \"sourceId\" field and copies value "
+                + "from the \"scope\" field to it. "
+                + "Creates the scope entries in the index \"scope\" from the existing scopes of the events.";
}
@Override
@@ -62,55 +73,36 @@ public class MigrationTo200 implements Migration {
this.httpClient = httpClient;
this.session = session;
this.esAddress = esAddress;
+ this.bundleContext = bundleContext;
doExecute();
}
private void doExecute() throws IOException {
- try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) {
-
- if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
- JSONObject indicesAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
-
- final Set<String> indices = indicesAsJson.keySet().stream().
- filter(alias -> alias.startsWith("context-event")).
- collect(Collectors.toSet());
-
- for (String indexName : indices) {
- updateMapping(indexName, httpClient);
- }
- }
+ String indexPrefix = ConsoleUtils.askUserWithDefaultAnswer(session, "SOURCE index name (default: context) : ", "context");
+ Set<String> indexes = getEventIndexes(indexPrefix);
+ for (String index : indexes) {
+ updateMapping(index);
}
+ createScopeMapping(indexPrefix);
+ createScopes(getSetOfScopes(indexes), indexPrefix);
}
- private void updateMapping(final String indexName, final CloseableHttpClient httpClient) throws IOException {
+ private void updateMapping(final String indexName) throws IOException {
HttpPut httpPut = new HttpPut(esAddress + "/" + indexName + "/_mapping");
httpPut.addHeader("Accept", "application/json");
httpPut.addHeader("Content-Type", "application/json");
- String request = "{\n" +
- "\"properties\": {\n" +
- " \"sourceId\": {\n" +
- " \"analyzer\": \"folding\",\n" +
- " \"type\": \"text\",\n" +
- " \"fields\": {\n" +
- " \"keyword\": {\n" +
- " \"type\": \"keyword\",\n" +
- " \"ignore_above\": 256\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- "}";
-
- httpPut.setEntity(new StringEntity(request));
+ String requestBody = resourceAsString("requestBody/updateMapping.json");
+
+ httpPut.setEntity(new StringEntity(requestBody));
try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
- if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK
- && responseAsJson.has("acknowledged") && responseAsJson.getBoolean("acknowledged")) {
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK && responseAsJson.has("acknowledged") && responseAsJson
+ .getBoolean("acknowledged")) {
System.out.println("Mapping for index = \"" + indexName + "\" successfully updated.");
copyValueScopeToSourceId(indexName, httpClient);
@@ -126,25 +118,140 @@ public class MigrationTo200 implements Migration {
httpPost.addHeader("Accept", "application/json");
httpPost.addHeader("Content-Type", "application/json");
- String request = "{\n" +
- " \"script\": {\n" +
- " \"source\": \"ctx._source.sourceId = ctx._source.scope\",\n" +
- " \"lang\": \"painless\"\n" +
- " }\n" +
- "}";
+ String requestBody = resourceAsString("requestBody/copyValueScopeToSourceId.json");
- httpPost.setEntity(new StringEntity(request));
+ httpPost.setEntity(new StringEntity(requestBody));
try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
- System.out.println("Copying the \"scope\" field to the \"sourceId\" field for index = \"" + indexName + "\" successfully completed. Total: " +
- responseAsJson.get("total") + ", updated: " + responseAsJson.get("updated") + ".");
+ System.out.println("Copying the \"scope\" field to the \"sourceId\" field for index = \"" + indexName
+ + "\" successfully completed. Total: " + responseAsJson.get("total") + ", updated: " + responseAsJson.get("updated")
+ + ".");
} else {
System.out.println("Copying the \"scope\" field to the \"sourceId\" field for index = \"" + indexName + "\" failed.");
}
}
}
+    /**
+     * Lists the event indexes to migrate.
+     *
+     * <p>Calls {@code GET <esAddress>/_aliases}, treats the top-level keys of the JSON response as index names and
+     * keeps those starting with {@code <indexPrefix>-event}.
+     *
+     * @param indexPrefix the index prefix entered by the user (default {@code context})
+     * @return the matching index names; empty when Elasticsearch does not answer with HTTP 200
+     * @throws IOException if the HTTP call fails
+     */
+    private Set<String> getEventIndexes(String indexPrefix) throws IOException {
+        final String eventIndexPrefix = indexPrefix + "-event";
+        try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) {
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != HttpStatus.SC_OK) {
+                // Report the failure instead of silently returning nothing, so the operator knows no event index was migrated.
+                System.out.println("Listing the indexes with prefix \"" + eventIndexPrefix + "\" has failed. Code: " + statusCode);
+                return Collections.emptySet();
+            }
+            final JSONObject indexesAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
+            return indexesAsJson.keySet().stream()
+                    .filter(indexName -> indexName.startsWith(eventIndexPrefix))
+                    .collect(Collectors.toSet());
+        }
+    }
+
+    /**
+     * Tells whether the {@code <indexPrefix>-scope} index is missing.
+     *
+     * @param indexPrefix the index prefix entered by the user
+     * @return {@code true} if Elasticsearch answers 404, {@code false} if it answers 200
+     * @throws IOException if the HTTP call fails
+     * @throws IllegalStateException for any other status, which does not tell whether the index exists
+     */
+    private boolean scopeIndexNotExists(String indexPrefix) throws IOException {
+        final HttpGet httpGet = new HttpGet(esAddress + "/" + indexPrefix + "-scope");
+        httpGet.addHeader("Accept", "application/json");
+
+        try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                return false;
+            }
+            if (statusCode == HttpStatus.SC_NOT_FOUND) {
+                return true;
+            }
+            // e.g. 401 or 5xx: reading these as "missing" would make the caller try to create an index that may exist.
+            throw new IllegalStateException("Unable to check whether the index " + indexPrefix + "-scope exists. Code: " + statusCode);
+        }
+    }
+
+    /**
+     * Creates the {@code <indexPrefix>-scope} index unless it already exists.
+     *
+     * <p>The index settings (shards, replicas, field limits) are asked interactively and substituted into the
+     * {@code requestBody/scopeMapping.json} template.
+     *
+     * @param indexPrefix the index prefix entered by the user
+     * @throws IOException if the HTTP call fails
+     * @throws IllegalArgumentException if a setting entered by the user is not a non-negative integer
+     * @throws RuntimeException if Elasticsearch refuses to create the index, which stops the migration
+     */
+    private void createScopeMapping(String indexPrefix) throws IOException {
+        final String scopeIndex = indexPrefix + "-scope";
+        if (!scopeIndexNotExists(indexPrefix)) {
+            System.out.println("The scope index already exists. Skipping the creation of this index");
+            return;
+        }
+
+        System.out.println("Creation for index = \"" + scopeIndex + "\" starting.");
+        System.out.println("Specify the following parameters:");
+        String numberOfShards = askNumericSetting("number_of_shards: (default: 3)", "3");
+        String numberOfReplicas = askNumericSetting("number_of_replicas: (default: 0)", "0");
+        String mappingTotalFieldsLimit = askNumericSetting("mapping.total_fields.limit: (default: 1000)", "1000");
+        String maxDocValueFieldsSearch = askNumericSetting("max_docvalue_fields_search: (default: 1000)", "1000");
+
+        final HttpPut httpPut = new HttpPut(esAddress + "/" + scopeIndex);
+        httpPut.addHeader("Accept", "application/json");
+        httpPut.addHeader("Content-Type", "application/json");
+
+        // The template holds unquoted $placeholders, which is why the answers are validated as integers above.
+        String request = resourceAsString("requestBody/scopeMapping.json")
+                .replace("$numberOfShards", numberOfShards)
+                .replace("$numberOfReplicas", numberOfReplicas)
+                .replace("$mappingTotalFieldsLimit", mappingTotalFieldsLimit)
+                .replace("$maxDocValueFieldsSearch", maxDocValueFieldsSearch);
+        httpPut.setEntity(new StringEntity(request));
+
+        try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                System.out.println(scopeIndex + " has been correctly created");
+            } else {
+                System.out.println("Failed to create the index " + scopeIndex + ". Code: " + statusCode);
+                throw new RuntimeException("Can not create the scope index. Stop the execution of the migration.");
+            }
+        }
+    }
+
+    /**
+     * Asks the user for an index setting and checks that it is a non-negative integer. The value is inserted
+     * unquoted into the JSON request body, so anything else would make the request invalid.
+     */
+    private String askNumericSetting(String question, String defaultValue) throws IOException {
+        String answer = ConsoleUtils.askUserWithDefaultAnswer(session, question, defaultValue);
+        String trimmed = answer == null ? "" : answer.trim();
+        if (!trimmed.matches("\\d+")) {
+            throw new IllegalArgumentException("Invalid value \"" + answer + "\" for " + question + " a non-negative integer is expected.");
+        }
+        return trimmed;
+    }
+
+    /**
+     * Indexes one scope document per scope value through the Elasticsearch bulk API.
+     *
+     * <p>Each scope is substituted into the {@code requestBody/bulkSaveScope.ndjson} template (an action line and a
+     * document line), which uses the scope value as document id, item id and metadata id.
+     *
+     * @param scopes      the scope values found in the events; nothing is sent when empty
+     * @param indexPrefix the index prefix entered by the user
+     * @throws IOException if the HTTP call fails
+     */
+    private void createScopes(Set<String> scopes, String indexPrefix) throws IOException {
+        final String scopeIndex = indexPrefix + "-scope";
+        if (scopes.isEmpty()) {
+            // An empty bulk body is rejected by Elasticsearch, so there is nothing to send.
+            System.out.println("No \"scope\" found in the events. Nothing to create into the index " + scopeIndex);
+            return;
+        }
+
+        String saveScopeTemplate = resourceAsString("requestBody/bulkSaveScope.ndjson");
+        if (!saveScopeTemplate.endsWith("\n")) {
+            // Bulk bodies are newline-delimited and must end with a newline; concatenated entries would otherwise merge.
+            saveScopeTemplate += "\n";
+        }
+        final StringBuilder body = new StringBuilder();
+        for (String scope : scopes) {
+            // The template quotes every $scope placeholder; JSONObject.quote re-quotes and escapes the value so
+            // quotes or backslashes in a scope cannot break the JSON lines.
+            body.append(saveScopeTemplate.replace("\"$scope\"", JSONObject.quote(scope)));
+        }
+
+        final HttpPost httpPost = new HttpPost(esAddress + "/" + scopeIndex + "/_bulk");
+        httpPost.addHeader("Accept", "application/json");
+        httpPost.addHeader("Content-Type", "application/x-ndjson");
+        // StringEntity(String) encodes as ISO-8859-1; scope values may contain any character.
+        httpPost.setEntity(new StringEntity(body.toString(), StandardCharsets.UTF_8));
+
+        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != HttpStatus.SC_OK) {
+                System.out.println("Creating the \"scopes\" into the index " + scopeIndex + " has failed. Code: " + statusCode);
+                return;
+            }
+            // The bulk API answers 200 even when some items fail; per-item failures are flagged by "errors".
+            final JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
+            if (responseAsJson.optBoolean("errors", false)) {
+                System.out.println("Creating the \"scopes\" into the index " + scopeIndex + " finished with errors for some scopes");
+            } else {
+                System.out.println("Creating the \"scopes\" into the index " + scopeIndex + " successfully finished");
+            }
+        }
+    }
+
+    /**
+     * Collects the distinct values of the {@code scope} field across the given event indexes.
+     *
+     * <p>Runs the {@code requestBody/searchScope.json} terms aggregation (on {@code scope.keyword}) with a bucket
+     * size large enough to return every scope instead of the 10 returned by default.
+     *
+     * @param indices the event indexes to search; an empty set yields an empty result without any request
+     * @return the distinct scope values; empty if the search failed
+     * @throws IOException if the HTTP call fails
+     */
+    private Set<String> getSetOfScopes(Set<String> indices) throws IOException {
+        if (indices.isEmpty()) {
+            // Joining no index would build "<esAddress>//_search", which no longer targets the event indexes.
+            System.out.println("No event index found. No \"scope\" value to collect.");
+            return new HashSet<>();
+        }
+        final int maxScopeBuckets = 10000; // stays within Elasticsearch's default search.max_buckets limit
+
+        final HttpPost httpPost = new HttpPost(esAddress + "/" + String.join(",", indices) + "/_search");
+        httpPost.addHeader("Accept", "application/json");
+        httpPost.addHeader("Content-Type", "application/json");
+
+        final JSONObject request = new JSONObject(resourceAsString("requestBody/searchScope.json"));
+        // A terms aggregation only returns its 10 most frequent buckets unless "size" is set.
+        request.getJSONObject("aggs").getJSONObject("scopes").getJSONObject("terms").put("size", maxScopeBuckets);
+        httpPost.setEntity(new StringEntity(request.toString()));
+
+        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != HttpStatus.SC_OK) {
+                System.out.println("Getting the \"scope\" values from the event has failed. Code: " + statusCode);
+                return new HashSet<>();
+            }
+            final JSONObject aggregations = new JSONObject(EntityUtils.toString(response.getEntity())).getJSONObject("aggregations");
+            System.out.println("Getting the \"scope\" values from the events successfully finished. " + "Number of scope to create: "
+                    + aggregations.getJSONObject("bucketInfos").get("count").toString());
+            final JSONObject scopesAggregation = aggregations.getJSONObject("scopes");
+            if (scopesAggregation.optLong("sum_other_doc_count", 0L) > 0) {
+                // Documents whose scope did not fit in the returned buckets: those scopes will not be created.
+                System.out.println("Warning: more than " + maxScopeBuckets + " \"scope\" values exist; some will not be created.");
+            }
+            return StreamSupport.stream(scopesAggregation.getJSONArray("buckets").spliterator(), false)
+                    .map(bucket -> ((JSONObject) bucket).getString("key"))
+                    .collect(Collectors.toCollection(HashSet::new));
+        }
+    }
+
+    /**
+     * Reads a resource bundled with this migration as a UTF-8 string.
+     *
+     * @param resource the resource path inside the bundle, e.g. {@code requestBody/updateMapping.json}
+     * @return the resource content
+     * @throws IllegalStateException if the resource is missing from the bundle or cannot be read
+     */
+    protected String resourceAsString(final String resource) {
+        final URL url = bundleContext.getBundle().getResource(resource);
+        if (url == null) {
+            // getResource returns null for a missing entry; fail with the resource name instead of a bare NPE.
+            throw new IllegalStateException("Resource not found in the bundle: " + resource);
+        }
+        try (InputStream stream = url.openStream()) {
+            return IOUtils.toString(stream, StandardCharsets.UTF_8);
+        } catch (final IOException e) {
+            throw new IllegalStateException("Unable to read the resource " + resource, e);
+        }
+    }
}
diff --git a/tools/shell-commands/src/main/resources/requestBody/bulkSaveScope.ndjson b/tools/shell-commands/src/main/resources/requestBody/bulkSaveScope.ndjson
new file mode 100644
index 000000000..32f9bb0bd
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/bulkSaveScope.ndjson
@@ -0,0 +1,2 @@
+{"index": {"_id": "$scope"}}
+{"itemId": "$scope", "itemType": "scope", "metadata": {"id": "$scope" }}
diff --git a/tools/shell-commands/src/main/resources/requestBody/copyValueScopeToSourceId.json b/tools/shell-commands/src/main/resources/requestBody/copyValueScopeToSourceId.json
new file mode 100644
index 000000000..8d9a1ddaa
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/copyValueScopeToSourceId.json
@@ -0,0 +1,6 @@
+{
+ "script": {
+ "source": "ctx._source.sourceId = ctx._source.scope",
+ "lang": "painless"
+ }
+}
diff --git a/tools/shell-commands/src/main/resources/requestBody/scopeMapping.json b/tools/shell-commands/src/main/resources/requestBody/scopeMapping.json
new file mode 100644
index 000000000..b45b0a851
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/scopeMapping.json
@@ -0,0 +1,44 @@
+{
+ "settings": {
+ "index": {
+ "number_of_shards": $numberOfShards,
+ "number_of_replicas": $numberOfReplicas,
+ "mapping.total_fields.limit": $mappingTotalFieldsLimit,
+ "max_docvalue_fields_search": $maxDocValueFieldsSearch
+ },
+ "analysis": {
+ "analyzer": {
+ "folding": {
+ "type": "custom",
+ "tokenizer": "keyword",
+ "filter": [
+ "lowercase",
+ "asciifolding"
+ ]
+ }
+ }
+ }
+ },
+ "mappings": {
+ "dynamic_templates": [
+ {
+ "all": {
+ "match": "*",
+ "match_mapping_type": "string",
+ "mapping": {
+ "type": "text",
+ "analyzer": "folding",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ }
+ ],
+ "properties": {
+ }
+ }
+}
diff --git a/tools/shell-commands/src/main/resources/requestBody/searchScope.json b/tools/shell-commands/src/main/resources/requestBody/searchScope.json
new file mode 100644
index 000000000..8e7732707
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/searchScope.json
@@ -0,0 +1,16 @@
+{
+ "_source": false,
+ "size": 0,
+ "aggs": {
+ "scopes": {
+ "terms": {
+ "field": "scope.keyword"
+ }
+ },
+ "bucketInfos": {
+ "stats_bucket": {
+ "buckets_path": "scopes._count"
+ }
+ }
+ }
+}
diff --git a/tools/shell-commands/src/main/resources/requestBody/updateMapping.json b/tools/shell-commands/src/main/resources/requestBody/updateMapping.json
new file mode 100644
index 000000000..82d12c4d4
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/updateMapping.json
@@ -0,0 +1,14 @@
+{
+ "properties": {
+ "sourceId": {
+ "analyzer": "folding",
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+}