You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by as...@apache.org on 2022/05/19 10:21:21 UTC

[unomi] branch UNOMI-505 created (now dd5c75d49)

This is an automated email from the ASF dual-hosted git repository.

asi pushed a change to branch UNOMI-505
in repository https://gitbox.apache.org/repos/asf/unomi.git


      at dd5c75d49 UNOMI-505 Study replication of existing profileIDs into new alias index

This branch includes the following new commits:

     new dd5c75d49 UNOMI-505 Study replication of existing profileIDs into new alias index

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[unomi] 01/01: UNOMI-505 Study replication of existing profileIDs into new alias index

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asi pushed a commit to branch UNOMI-505
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit dd5c75d49fdb57e9f328adc2290aabffbb224fb3
Author: Anatol Sialitski <as...@enonic.com>
AuthorDate: Thu May 19 12:21:00 2022 +0200

    UNOMI-505 Study replication of existing profileIDs into new alias index
---
 .../unomi/shell/migration/impl/MigrationTo200.java | 114 ++++++++++++++++++++-
 1 file changed, 110 insertions(+), 4 deletions(-)

diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
index 94dfd40be..3ea172a4f 100644
--- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
+++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
@@ -18,20 +18,25 @@ package org.apache.unomi.shell.migration.impl;
 
 import org.apache.http.HttpStatus;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.apache.karaf.shell.api.console.Session;
 import org.apache.unomi.shell.migration.Migration;
+import org.json.JSONArray;
 import org.json.JSONObject;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Version;
 import org.osgi.service.component.annotations.Component;
 
 import java.io.IOException;
+import java.net.URI;
+import java.time.Instant;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -63,10 +68,11 @@ public class MigrationTo200 implements Migration {
         this.session = session;
         this.esAddress = esAddress;
 
-        doExecute();
+        doMigrate_Copy_Scope_to_SourceId();
+        doMigrate_Create_ProfileAlias_From_Profile();
     }
 
-    private void doExecute() throws IOException {
+    private void doMigrate_Copy_Scope_to_SourceId() throws IOException {
         try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) {
 
             if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
@@ -77,13 +83,13 @@ public class MigrationTo200 implements Migration {
                         collect(Collectors.toSet());
 
                 for (String indexName : indices) {
-                    updateMapping(indexName, httpClient);
+                    updateMapping(indexName);
                 }
             }
         }
     }
 
-    private void updateMapping(final String indexName, final CloseableHttpClient httpClient) throws IOException {
+    private void updateMapping(final String indexName) throws IOException {
         HttpPut httpPut = new HttpPut(esAddress + "/" + indexName + "/_mapping");
 
         httpPut.addHeader("Accept", "application/json");
@@ -147,4 +153,104 @@ public class MigrationTo200 implements Migration {
         }
     }
 
+    /** Seeds the profile alias index from the existing profiles, 1000 per page, all stamped with one timestamp. */
+    private void doMigrate_Create_ProfileAlias_From_Profile() throws IOException {
+        final int pageSize = 1000; // serves as both the first page size and the offset step
+        // A null scroll id makes doProcessResponse start with a fresh search.
+        doProcessResponse(Instant.now(), pageSize, pageSize, null);
+    }
+
+    /**
+     * Pages through all profiles (a fresh search when scrollId is null, otherwise the given scroll) and
+     * bulk-creates one profileAlias document per returned itemId. Paging ends on a non-200 reply, a reply
+     * without "hits", or once hits.total.value no longer exceeds offset; a known scroll is then deleted.
+     * @param migrationTime written as creationTime and modifiedTime of every alias
+     * @param initialOffset step added to offset after each page
+     * @param offset        size of the first search, afterwards the running count of requested hits
+     * @param scrollId      scroll to resume, or null to start a new search
+     */
+    private void doProcessResponse(Instant migrationTime, int initialOffset, int offset, String scrollId) throws IOException {
+        final String timestamp = JSONObject.quote(migrationTime.toString());
+        for (boolean morePages = true; morePages; offset += initialOffset) { // loop, not recursion: flat stack, no response left open
+            HttpUriRequest request = scrollId == null ? createSearchRequest(offset) : createSearchRequestWithScrollId(scrollId);
+            try (CloseableHttpResponse response = httpClient.execute(request)) {
+                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                    break;
+                }
+                JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
+                JSONObject hitsObject = responseAsJson.optJSONObject("hits");
+                if (hitsObject == null) {
+                    break;
+                }
+                scrollId = responseAsJson.optString("_scroll_id", scrollId); // latest id, also the one to delete
+                StringBuilder bulkCreateRequest = new StringBuilder();
+                JSONArray hits = hitsObject.optJSONArray("hits");
+                for (int i = 0; hits != null && i < hits.length(); i++) {
+                    JSONObject hitSource = hits.getJSONObject(i).optJSONObject("_source");
+                    if (hitSource != null && hitSource.has("itemId")) {
+                        String quotedId = JSONObject.quote(hitSource.getString("itemId")); // quoted and JSON-escaped
+                        bulkCreateRequest.append("{\"create\":{\"_id\":").append(quotedId).append("}}\n").
+                                append("{\"itemId\": ").append(quotedId).append(", \"itemType\": \"profileAlias\", \"profileID\": ").append(quotedId).
+                                append(", \"scope\": null, \"clientID\": \"defaultClientId\", \"creationTime\": ").append(timestamp).
+                                append(", \"modifiedTime\": ").append(timestamp).append(" }\n");
+                    }
+                }
+                if (bulkCreateRequest.length() > 0) { // nothing to create: skip the _bulk call
+                    try (CloseableHttpResponse bulkResponse = httpClient.execute(createProfileAliasRequest(bulkCreateRequest.toString()))) {
+                        EntityUtils.consume(bulkResponse.getEntity()); // drain so the connection can be reused
+                    }
+                }
+                morePages = hitsObject.getJSONObject("total").getInt("value") > offset;
+            }
+        }
+        if (scrollId != null) {
+            httpClient.execute(createDeleteScrollRequest(scrollId)).close(); // close the response instead of leaking it
+        }
+    }
+
+    /**
+     * Builds the POST to /_search/scroll that asks for the next page of an open scroll.
+     * The JSON body names the scroll via "scroll_id" and carries "scroll": "1h".
+     *
+     * @param scrollId identifier of the scroll to continue; inserted into the body as-is
+     */
+    private HttpPost createSearchRequestWithScrollId(final String scrollId) throws IOException {
+        final HttpPost scrollPage = new HttpPost(esAddress + "/_search/scroll");
+        scrollPage.addHeader("Accept", "application/json");
+        scrollPage.addHeader("Content-Type", "application/json");
+        final String payload = "{\n  \"scroll_id\": \"" + scrollId + "\",\n  \"scroll\": \"1h\"\n}";
+        scrollPage.setEntity(new StringEntity(payload));
+        return scrollPage;
+    }
+
+    private HttpGet createSearchRequest(int size) { // first page: GET with scroll=1h, _source limited to itemId, "size" hits
+        return new HttpGet(esAddress + "/context-profile/_search?&scroll=1h&_source_includes=itemId&size=" + size);
+    }
+
+    /** Builds a DELETE on /_search/scroll whose JSON body names the scroll via "scroll_id". */
+    private HttpEntityEnclosingRequestBase createDeleteScrollRequest(final String scrollId) throws IOException {
+        // An anonymous entity-enclosing request is used so the DELETE can carry a body.
+        final HttpEntityEnclosingRequestBase clearScroll = new HttpEntityEnclosingRequestBase() {
+            @Override
+            public String getMethod() {
+                return "DELETE";
+            }
+        };
+        clearScroll.setURI(URI.create(esAddress + "/_search/scroll"));
+        clearScroll.setEntity(new StringEntity("{ \"scroll_id\": \"" + scrollId + "\" }"));
+        clearScroll.addHeader("Accept", "application/json");
+        clearScroll.addHeader("Content-Type", "application/json");
+        return clearScroll;
+    }
+
+    /** Builds the POST to context-profilealias/_bulk that carries the given bulk body. */
+    private HttpPost createProfileAliasRequest(String bulkRequestAsString) throws IOException {
+        final HttpPost bulkRequest = new HttpPost(esAddress + "/context-profilealias/_bulk");
+        bulkRequest.addHeader("Accept", "application/json");
+        bulkRequest.addHeader("Content-Type", "application/json");
+        // Encode explicitly as UTF-8: StringEntity(String) falls back to ISO-8859-1 and mangles other characters.
+        bulkRequest.setEntity(new StringEntity(bulkRequestAsString, "UTF-8"));
+        return bulkRequest;
+    }
+
 }