You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by as...@apache.org on 2022/05/19 10:21:22 UTC

[unomi] 01/01: UNOMI-505 Study replication of existing profileIDs into new alias index

This is an automated email from the ASF dual-hosted git repository.

asi pushed a commit to branch UNOMI-505
in repository https://gitbox.apache.org/repos/asf/unomi.git

commit dd5c75d49fdb57e9f328adc2290aabffbb224fb3
Author: Anatol Sialitski <as...@enonic.com>
AuthorDate: Thu May 19 12:21:00 2022 +0200

    UNOMI-505 Study replication of existing profileIDs into new alias index
---
 .../unomi/shell/migration/impl/MigrationTo200.java | 114 ++++++++++++++++++++-
 1 file changed, 110 insertions(+), 4 deletions(-)

diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
index 94dfd40be..3ea172a4f 100644
--- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
+++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
@@ -18,20 +18,25 @@ package org.apache.unomi.shell.migration.impl;
 
 import org.apache.http.HttpStatus;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.apache.karaf.shell.api.console.Session;
 import org.apache.unomi.shell.migration.Migration;
+import org.json.JSONArray;
 import org.json.JSONObject;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Version;
 import org.osgi.service.component.annotations.Component;
 
 import java.io.IOException;
+import java.net.URI;
+import java.time.Instant;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -63,10 +68,11 @@ public class MigrationTo200 implements Migration {
         this.session = session;
         this.esAddress = esAddress;
 
-        doExecute();
+        doMigrate_Copy_Scope_to_SourceId();
+        doMigrate_Create_ProfileAlias_From_Profile();
     }
 
-    private void doExecute() throws IOException {
+    private void doMigrate_Copy_Scope_to_SourceId() throws IOException {
         try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) {
 
             if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
@@ -77,13 +83,13 @@ public class MigrationTo200 implements Migration {
                         collect(Collectors.toSet());
 
                 for (String indexName : indices) {
-                    updateMapping(indexName, httpClient);
+                    updateMapping(indexName);
                 }
             }
         }
     }
 
-    private void updateMapping(final String indexName, final CloseableHttpClient httpClient) throws IOException {
+    private void updateMapping(final String indexName) throws IOException {
         HttpPut httpPut = new HttpPut(esAddress + "/" + indexName + "/_mapping");
 
         httpPut.addHeader("Accept", "application/json");
@@ -147,4 +153,104 @@ public class MigrationTo200 implements Migration {
         }
     }
 
+    private void doMigrate_Create_ProfileAlias_From_Profile() throws IOException {
+        Instant migrationTime = Instant.now();
+        int initialOffset = 1000;
+        int size = 1000;
+        doProcessResponse(migrationTime, initialOffset, size, null);
+    }
+
+    private void doProcessResponse(Instant migrationTime, int initialOffset, int offset, String scrollId) throws IOException {
+        HttpUriRequest request = scrollId == null
+                ? createSearchRequest(offset)
+                : createSearchRequestWithScrollId(scrollId);
+
+        try (CloseableHttpResponse response = httpClient.execute(request)) {
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
+
+                if (responseAsJson.has("hits")) {
+                    JSONObject hitsObject = responseAsJson.getJSONObject("hits");
+                    if (hitsObject.has("hits")) {
+                        StringBuilder bulkCreateRequest = new StringBuilder();
+                        JSONArray hits = hitsObject.getJSONArray("hits");
+                        for (Object o : hits) {
+                            JSONObject hit = (JSONObject) o;
+                            if (hit.has("_source")) {
+                                JSONObject hitSource = hit.getJSONObject("_source");
+                                if (hitSource.has("itemId")) {
+                                    String itemId = hitSource.getString("itemId");
+                                    bulkCreateRequest.append("{\"create\":{\"_id\":\"").append(itemId).append("\"}}\n").
+                                            append("{\"itemId\": \"").append(itemId).append("\", ").
+                                            append("\"itemType\": \"profileAlias\", ").
+                                            append("\"profileID\": \"").append(itemId).append("\", ").
+                                            append("\"scope\": null, ").
+                                            append("\"clientID\": \"defaultClientId\", ").
+                                            append("\"creationTime\": \"").append(migrationTime.toString()).append("\", ").
+                                            append("\"modifiedTime\": \"").append(migrationTime.toString()).append("\" ").
+                                            append("}\n");
+                                }
+                            }
+                        }
+
+                        httpClient.execute(createProfileAliasRequest(bulkCreateRequest.toString()));
+                    }
+
+                    if (hitsObject.getJSONObject("total").getInt("value") > offset) {
+                        doProcessResponse(migrationTime, initialOffset, offset + initialOffset, responseAsJson.getString("_scroll_id"));
+                    } else {
+                        if (scrollId != null) {
+                            httpClient.execute(createDeleteScrollRequest(scrollId));
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private HttpPost createSearchRequestWithScrollId(final String scrollId) throws IOException {
+        final String requestBody = "{\n" +
+                "  \"scroll_id\": \"" + scrollId + "\",\n" +
+                "  \"scroll\": \"1h\"\n" +
+                "}";
+
+        final HttpPost request = new HttpPost(esAddress + "/_search/scroll");
+
+        request.addHeader("Accept", "application/json");
+        request.addHeader("Content-Type", "application/json");
+        request.setEntity(new StringEntity(requestBody));
+
+        return request;
+    }
+
+    private HttpGet createSearchRequest(int size) {
+        return new HttpGet(esAddress + "/context-profile/_search?&scroll=1h&_source_includes=itemId&size=" + size);
+    }
+
+    private HttpEntityEnclosingRequestBase createDeleteScrollRequest(final String scrollId) throws IOException {
+        final HttpEntityEnclosingRequestBase deleteRequest = new HttpEntityEnclosingRequestBase() {
+            @Override
+            public String getMethod() {
+                return "DELETE";
+            }
+        };
+
+        deleteRequest.setURI(URI.create(esAddress + "/_search/scroll"));
+        deleteRequest.setEntity(new StringEntity("{ \"scroll_id\": \"" + scrollId + "\" }"));
+        deleteRequest.addHeader("Accept", "application/json");
+        deleteRequest.addHeader("Content-Type", "application/json");
+
+        return deleteRequest;
+    }
+
+    private HttpPost createProfileAliasRequest(String bulkRequestAsString) throws IOException {
+        final HttpPost bulkRequest = new HttpPost(esAddress + "/context-profilealias/_bulk");
+
+        bulkRequest.addHeader("Accept", "application/json");
+        bulkRequest.addHeader("Content-Type", "application/json");
+        bulkRequest.setEntity(new StringEntity(bulkRequestAsString));
+
+        return bulkRequest;
+    }
+
 }