You are viewing a plain text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to dev@unomi.apache.org by jk...@apache.org on 2022/05/25 15:30:45 UTC

[unomi] branch master updated: UNOMI-395 : add migration logic to create scope entries from the scopes in existing events (#427)

This is an automated email from the ASF dual-hosted git repository.

jkevan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new da80579b1 UNOMI-395 : add migration logic to create scopes entries from scope i… (#427)
da80579b1 is described below

commit da80579b12fa6ee596ceb97826aec22e8edd5935
Author: jsinovassin <58...@users.noreply.github.com>
AuthorDate: Wed May 25 17:30:40 2022 +0200

    UNOMI-395 : add migration logic to create scopes entries from scope i… (#427)
    
    * UNOMI-395 : add migration logic to create scope entries from the scopes in existing events
    
    * move the inline JSON request bodies into JSON resource files
---
 .../resources/META-INF/cxs/mappings/scope.json     |   3 -
 pom.xml                                            |   2 +
 tools/shell-commands/pom.xml                       |   5 +
 .../unomi/shell/migration/impl/MigrationTo200.java | 191 ++++++++++++++++-----
 .../resources/requestBody/bulkSaveScope.ndjson     |   2 +
 .../requestBody/copyValueScopeToSourceId.json      |   6 +
 .../main/resources/requestBody/scopeMapping.json   |  44 +++++
 .../main/resources/requestBody/searchScope.json    |  16 ++
 .../main/resources/requestBody/updateMapping.json  |  14 ++
 9 files changed, 238 insertions(+), 45 deletions(-)

diff --git a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/scope.json b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/scope.json
index 30234221c..27fa2b384 100644
--- a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/scope.json
+++ b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/scope.json
@@ -18,8 +18,5 @@
     }
   ],
   "properties": {
-    "value": {
-      "type": "text"
-    }
   }
 }
diff --git a/pom.xml b/pom.xml
index 0874955e5..89d6d038b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -513,6 +513,8 @@
                                 <exclude>**/*.mf</exclude>
                                 <!-- test json files -->
                                 <exclude>**/*.json</exclude>
+                                <!-- nd json files -->
+                                <exclude>**/*.ndjson</exclude>
                                 <!-- SSH keys -->
                                 <exclude>**/*.key</exclude>
                                 <!-- For Jenkins, ignore the .repository -->
diff --git a/tools/shell-commands/pom.xml b/tools/shell-commands/pom.xml
index 2f4575506..b92d1c515 100644
--- a/tools/shell-commands/pom.xml
+++ b/tools/shell-commands/pom.xml
@@ -49,6 +49,11 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.json</groupId>
             <artifactId>json</artifactId>
diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
index 94dfd40be..9dc873b6e 100644
--- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
+++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
@@ -16,6 +16,7 @@
  */
 package org.apache.unomi.shell.migration.impl;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -26,14 +27,21 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.apache.karaf.shell.api.console.Session;
 import org.apache.unomi.shell.migration.Migration;
+import org.apache.unomi.shell.migration.utils.ConsoleUtils;
 import org.json.JSONObject;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Version;
 import org.osgi.service.component.annotations.Component;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 @Component
 public class MigrationTo200 implements Migration {
@@ -41,6 +49,7 @@ public class MigrationTo200 implements Migration {
     private CloseableHttpClient httpClient;
     private Session session;
     private String esAddress;
+    private BundleContext bundleContext;
 
     @Override
     public Version getFromVersion() {
@@ -54,7 +63,9 @@ public class MigrationTo200 implements Migration {
 
     @Override
     public String getDescription() {
-        return "Updates mapping for an index with prefix \"context-event\". Adds the \"sourceId\" field and copies value from the \"scope\" field to it.";
+        return "Updates mapping for an index \"event\" with prefix \"context\" by default. Adds the \"sourceId\" field and copies value "
+                + "from the \"scope\" field to it."
+                + "Creates the scope entries in the index \"scope\" from the existing sopes of the events";
     }
 
     @Override
@@ -62,55 +73,36 @@ public class MigrationTo200 implements Migration {
         this.httpClient = httpClient;
         this.session = session;
         this.esAddress = esAddress;
+        this.bundleContext = bundleContext;
 
         doExecute();
     }
 
     private void doExecute() throws IOException {
-        try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) {
-
-            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
-                JSONObject indicesAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
-
-                final Set<String> indices = indicesAsJson.keySet().stream().
-                        filter(alias -> alias.startsWith("context-event")).
-                        collect(Collectors.toSet());
-
-                for (String indexName : indices) {
-                    updateMapping(indexName, httpClient);
-                }
-            }
+        String indexPrefix = ConsoleUtils.askUserWithDefaultAnswer(session, "SOURCE index name (default: context) : ", "context");
+        Set<String> indexes = getEventIndexes(indexPrefix);
+        for (String index : indexes) {
+            updateMapping(index);
         }
+        createScopeMapping(indexPrefix);
+        createScopes(getSetOfScopes(indexes), indexPrefix);
     }
 
-    private void updateMapping(final String indexName, final CloseableHttpClient httpClient) throws IOException {
+    private void updateMapping(final String indexName) throws IOException {
         HttpPut httpPut = new HttpPut(esAddress + "/" + indexName + "/_mapping");
 
         httpPut.addHeader("Accept", "application/json");
         httpPut.addHeader("Content-Type", "application/json");
 
-        String request = "{\n" +
-                "\"properties\": {\n" +
-                " \"sourceId\": {\n" +
-                "  \"analyzer\": \"folding\",\n" +
-                "  \"type\": \"text\",\n" +
-                "  \"fields\": {\n" +
-                "   \"keyword\": {\n" +
-                "    \"type\": \"keyword\",\n" +
-                "    \"ignore_above\": 256\n" +
-                "    }\n" +
-                "   }\n" +
-                "  }\n" +
-                " }\n" +
-                "}";
-
-        httpPut.setEntity(new StringEntity(request));
+        String requestBody = resourceAsString("requestBody/updateMapping.json");
+
+        httpPut.setEntity(new StringEntity(requestBody));
 
         try (CloseableHttpResponse response = httpClient.execute(httpPut)) {
             JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
 
-            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK
-                    && responseAsJson.has("acknowledged") && responseAsJson.getBoolean("acknowledged")) {
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK && responseAsJson.has("acknowledged") && responseAsJson
+                    .getBoolean("acknowledged")) {
                 System.out.println("Mapping for index = \"" + indexName + "\" successfully updated.");
 
                 copyValueScopeToSourceId(indexName, httpClient);
@@ -126,25 +118,140 @@ public class MigrationTo200 implements Migration {
         httpPost.addHeader("Accept", "application/json");
         httpPost.addHeader("Content-Type", "application/json");
 
-        String request = "{\n" +
-                "  \"script\": {\n" +
-                "    \"source\": \"ctx._source.sourceId = ctx._source.scope\",\n" +
-                "    \"lang\": \"painless\"\n" +
-                "  }\n" +
-                "}";
+        String requestBody = resourceAsString("requestBody/copyValueScopeToSourceId.json");
 
-        httpPost.setEntity(new StringEntity(request));
+        httpPost.setEntity(new StringEntity(requestBody));
 
         try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
             JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
 
             if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
-                System.out.println("Copying the \"scope\" field to the \"sourceId\" field for index = \"" + indexName + "\" successfully completed. Total: " +
-                        responseAsJson.get("total") + ", updated: " + responseAsJson.get("updated") + ".");
+                System.out.println("Copying the \"scope\" field to the \"sourceId\" field for index = \"" + indexName
+                        + "\" successfully completed. Total: " + responseAsJson.get("total") + ", updated: " + responseAsJson.get("updated")
+                        + ".");
             } else {
                 System.out.println("Copying the \"scope\" field to the \"sourceId\" field for index = \"" + indexName + "\" failed.");
             }
         }
     }
 
+    private Set<String> getEventIndexes(String indexPrefix) throws IOException {
+        try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) {
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                JSONObject indexesAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
+                return indexesAsJson.keySet().stream().
+                        filter(alias -> alias.startsWith(indexPrefix + "-event")).
+                        collect(Collectors.toSet());
+            }
+        }
+        return Collections.emptySet();
+    }
+
+    private boolean scopeIndexNotExists(String indexPrefix) throws IOException {
+        final HttpGet httpGet = new HttpGet(esAddress + "/" + indexPrefix + "-scope");
+
+        httpGet.addHeader("Accept", "application/json");
+        httpGet.addHeader("Content-Type", "application/json");
+
+        try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+            return response.getStatusLine().getStatusCode() != HttpStatus.SC_OK;
+        }
+    }
+
+    private void createScopeMapping(String indexPrefix) throws IOException {
+
+        if (scopeIndexNotExists(indexPrefix)) {
+            System.out.println("Creation for index = \"" + indexPrefix + "-scope\" starting.");
+            System.out.println("Specify the following parameters:");
+            String numberOfShards = ConsoleUtils.askUserWithDefaultAnswer(session, "number_of_shards: (default: 3)", "3");
+            String numberOfReplicas = ConsoleUtils.askUserWithDefaultAnswer(session, "number_of_replicas: (default: 0)", "0");
+            String mappingTotalFieldsLimit = ConsoleUtils
+                    .askUserWithDefaultAnswer(session, "mapping.total_fields.limit: (default: 1000)", "1000");
+            String maxDocValueFieldsSearch = ConsoleUtils
+                    .askUserWithDefaultAnswer(session, "max_docvalue_fields_search: (default: 1000)", "1000");
+
+            final HttpPut httpPost = new HttpPut(esAddress + "/" + indexPrefix + "-scope");
+
+            httpPost.addHeader("Accept", "application/json");
+            httpPost.addHeader("Content-Type", "application/json");
+
+            String request = resourceAsString("requestBody/scopeMapping.json").replace("$numberOfShards", numberOfShards)
+                    .replace("$numberOfReplicas", numberOfReplicas).replace("$mappingTotalFieldsLimit", mappingTotalFieldsLimit)
+                    .replace("$maxDocValueFieldsSearch", maxDocValueFieldsSearch);
+
+            httpPost.setEntity(new StringEntity(request));
+
+            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+                if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                    System.out.println(indexPrefix + "-scope has been correctly created");
+                } else {
+                    System.out.println(
+                            "Failed to create the index " + indexPrefix + "-scope.Code:" + response.getStatusLine().getStatusCode());
+                    throw new RuntimeException("Can not create the scope index. Stop the execution of the migration.");
+                }
+            }
+        } else {
+            System.out.println("The scope index already exists. Skipping the creation of this index");
+        }
+
+    }
+
+    private void createScopes(Set<String> scopes, String indexPrefix) throws IOException {
+        final StringBuilder body = new StringBuilder();
+        String saveScopeBody = resourceAsString("requestBody/bulkSaveScope.ndjson");
+        scopes.forEach(scope -> body.append(saveScopeBody.replace("$scope", scope)));
+
+        final HttpPost httpPost = new HttpPost(esAddress + "/" + indexPrefix + "-scope/_bulk");
+
+        httpPost.addHeader("Accept", "application/json");
+        httpPost.addHeader("Content-Type", "application/x-ndjson");
+
+        httpPost.setEntity(new StringEntity(body.toString()));
+
+        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                System.out.println("Creating the \"scopes\" into the index " + indexPrefix + "-scope successfully finished");
+            } else {
+                System.out.println("Creating the \"scopes\" into the index " + indexPrefix + "-scope has failed" + response.getStatusLine()
+                        .getStatusCode());
+            }
+        }
+    }
+
+    private Set<String> getSetOfScopes(Set<String> indices) throws IOException {
+        String joinedIndices = String.join(",", indices);
+        final HttpPost httpPost = new HttpPost(esAddress + "/" + joinedIndices + "/_search");
+
+        httpPost.addHeader("Accept", "application/json");
+        httpPost.addHeader("Content-Type", "application/json");
+
+        String request = resourceAsString("requestBody/searchScope.json");
+
+        httpPost.setEntity(new StringEntity(request));
+
+        Set<String> scopes = new HashSet<>();
+        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+            JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                System.out.println("Getting the \"scope\" values from the events successfully finished. " + "Number of scope to create: "
+                        + responseAsJson.getJSONObject("aggregations").getJSONObject("bucketInfos").get("count").toString());
+                scopes = StreamSupport
+                        .stream(responseAsJson.getJSONObject("aggregations").getJSONObject("scopes").getJSONArray("buckets").spliterator(),
+                                false).map(bucketElement -> ((JSONObject) bucketElement).getString("key")).collect(Collectors.toSet());
+            } else {
+                System.out.println(
+                        "Getting the \"scope\" values from the event has failed. Code: " + response.getStatusLine().getStatusCode());
+            }
+        }
+        return scopes;
+    }
+
+    protected String resourceAsString(final String resource) {
+        final URL url = bundleContext.getBundle().getResource(resource);
+        try (InputStream stream = url.openStream()) {
+            return IOUtils.toString(stream, StandardCharsets.UTF_8);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/tools/shell-commands/src/main/resources/requestBody/bulkSaveScope.ndjson b/tools/shell-commands/src/main/resources/requestBody/bulkSaveScope.ndjson
new file mode 100644
index 000000000..32f9bb0bd
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/bulkSaveScope.ndjson
@@ -0,0 +1,2 @@
+{"index": {"_id": "$scope"}}
+{"itemId": "$scope", "itemType": "scope", "metadata": {"id": "$scope" }}
diff --git a/tools/shell-commands/src/main/resources/requestBody/copyValueScopeToSourceId.json b/tools/shell-commands/src/main/resources/requestBody/copyValueScopeToSourceId.json
new file mode 100644
index 000000000..8d9a1ddaa
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/copyValueScopeToSourceId.json
@@ -0,0 +1,6 @@
+{
+  "script": {
+    "source": "ctx._source.sourceId = ctx._source.scope",
+    "lang": "painless"
+  }
+}
diff --git a/tools/shell-commands/src/main/resources/requestBody/scopeMapping.json b/tools/shell-commands/src/main/resources/requestBody/scopeMapping.json
new file mode 100644
index 000000000..b45b0a851
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/scopeMapping.json
@@ -0,0 +1,44 @@
+{
+  "settings": {
+    "index": {
+      "number_of_shards": $numberOfShards,
+      "number_of_replicas": $numberOfReplicas,
+      "mapping.total_fields.limit": $mappingTotalFieldsLimit,
+      "max_docvalue_fields_search": $maxDocValueFieldsSearch
+    },
+    "analysis": {
+      "analyzer": {
+        "folding": {
+          "type": "custom",
+          "tokenizer": "keyword",
+          "filter": [
+            "lowercase",
+            "asciifolding"
+          ]
+        }
+      }
+    }
+  },
+  "mappings": {
+    "dynamic_templates": [
+      {
+        "all": {
+          "match": "*",
+          "match_mapping_type": "string",
+          "mapping": {
+            "type": "text",
+            "analyzer": "folding",
+            "fields": {
+              "keyword": {
+                "type": "keyword",
+                "ignore_above": 256
+              }
+            }
+          }
+        }
+      }
+    ],
+    "properties": {
+    }
+  }
+}
diff --git a/tools/shell-commands/src/main/resources/requestBody/searchScope.json b/tools/shell-commands/src/main/resources/requestBody/searchScope.json
new file mode 100644
index 000000000..8e7732707
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/searchScope.json
@@ -0,0 +1,16 @@
+{
+  "_source": false,
+  "size": 0,
+  "aggs": {
+    "scopes": {
+      "terms": {
+        "field": "scope.keyword"
+      }
+    },
+    "bucketInfos": {
+      "stats_bucket": {
+        "buckets_path": "scopes._count"
+      }
+    }
+  }
+}
diff --git a/tools/shell-commands/src/main/resources/requestBody/updateMapping.json b/tools/shell-commands/src/main/resources/requestBody/updateMapping.json
new file mode 100644
index 000000000..82d12c4d4
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/updateMapping.json
@@ -0,0 +1,14 @@
+{
+  "properties": {
+    "sourceId": {
+      "analyzer": "folding",
+      "type": "text",
+      "fields": {
+        "keyword": {
+          "type": "keyword",
+          "ignore_above": 256
+        }
+      }
+    }
+  }
+}