You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by GitBox <gi...@apache.org> on 2022/06/16 10:57:52 UTC

[GitHub] [unomi] sergehuber commented on a diff in pull request #440: UNOMI-505 Study replication of existing profileIDs into new alias index

sergehuber commented on code in PR #440:
URL: https://github.com/apache/unomi/pull/440#discussion_r898957679


##########
tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java:
##########
@@ -246,6 +253,109 @@ private Set<String> getSetOfScopes(Set<String> indices) throws IOException {
         return scopes;
     }
 
+    private void createProfileAliasDocumentsFromProfile() throws IOException {
+        System.out.println("Migration \"Create profileAlias from profile\" started");
+        Instant migrationTime = Instant.now();
+        int initialOffset = 1000;
+        int size = 1000;
+        doProcessResponse(migrationTime, initialOffset, size, null);
+        System.out.println("Migration \"Create profileAlias from profile\" completed.");
+    }
+
+    private void doProcessResponse(Instant migrationTime, int initialOffset, int offset, String scrollId) throws IOException {

Review Comment:
   I think we should rename this method to something like `processProfiles`, because it is not processing a response here.



##########
tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java:
##########
@@ -246,6 +253,109 @@ private Set<String> getSetOfScopes(Set<String> indices) throws IOException {
         return scopes;
     }
 
+    private void createProfileAliasDocumentsFromProfile() throws IOException {
+        System.out.println("Migration \"Create profileAlias from profile\" started");
+        Instant migrationTime = Instant.now();
+        int initialOffset = 1000;
+        int size = 1000;
+        doProcessResponse(migrationTime, initialOffset, size, null);
+        System.out.println("Migration \"Create profileAlias from profile\" completed.");
+    }
+
+    private void doProcessResponse(Instant migrationTime, int initialOffset, int offset, String scrollId) throws IOException {
+        HttpUriRequest request = scrollId == null
+                ? createSearchRequest(offset)
+                : createSearchRequestWithScrollId(scrollId);
+
+        try (CloseableHttpResponse response = httpClient.execute(request)) {
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
+
+                if (responseAsJson.has("hits")) {
+                    JSONObject hitsObject = responseAsJson.getJSONObject("hits");
+                    if (hitsObject.has("hits")) {
+                        StringBuilder bulkCreateRequest = new StringBuilder();
+                        JSONArray hits = hitsObject.getJSONArray("hits");
+                        for (Object o : hits) {
+                            JSONObject hit = (JSONObject) o;
+                            if (hit.has("_source")) {
+                                JSONObject hitSource = hit.getJSONObject("_source");
+                                if (hitSource.has("itemId")) {
+                                    String itemId = hitSource.getString("itemId");
+                                    String bulkSaveProfileAliases = resourceAsString("requestBody/bulkSaveProfileAliases.ndjson");
+                                    bulkCreateRequest.append(bulkSaveProfileAliases.
+                                            replace("$itemId", itemId).
+                                            replace("$migrationTime", migrationTime.toString()));
+                                }
+                            }
+                        }
+
+                        CloseableHttpResponse bulkResponse = httpClient.execute(createProfileAliasRequest(bulkCreateRequest.toString()));
+                        if (bulkResponse != null) {
+                            bulkResponse.close();
+                        }
+                    }
+
+                    if (hitsObject.getJSONObject("total").getInt("value") > offset) {
+                        doProcessResponse(migrationTime, initialOffset, offset + initialOffset, responseAsJson.getString("_scroll_id"));

Review Comment:
   I don't think we should use a recursive call here. We are already using a scroll query, so we should simply keep continuing the scroll query in a loop until all profiles have been processed. Also, a recursive call will be very problematic if the data contains millions of profiles, which can happen: each batch of 1,000 profiles adds another stack frame, so the recursion depth could reach thousands of calls and risk a `StackOverflowError`.



##########
tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java:
##########
@@ -246,6 +253,109 @@ private Set<String> getSetOfScopes(Set<String> indices) throws IOException {
         return scopes;
     }
 
+    private void createProfileAliasDocumentsFromProfile() throws IOException {
+        System.out.println("Migration \"Create profileAlias from profile\" started");
+        Instant migrationTime = Instant.now();
+        int initialOffset = 1000;
+        int size = 1000;
+        doProcessResponse(migrationTime, initialOffset, size, null);
+        System.out.println("Migration \"Create profileAlias from profile\" completed.");
+    }
+
+    private void doProcessResponse(Instant migrationTime, int initialOffset, int offset, String scrollId) throws IOException {
+        HttpUriRequest request = scrollId == null
+                ? createSearchRequest(offset)
+                : createSearchRequestWithScrollId(scrollId);
+
+        try (CloseableHttpResponse response = httpClient.execute(request)) {
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
+
+                if (responseAsJson.has("hits")) {
+                    JSONObject hitsObject = responseAsJson.getJSONObject("hits");
+                    if (hitsObject.has("hits")) {
+                        StringBuilder bulkCreateRequest = new StringBuilder();
+                        JSONArray hits = hitsObject.getJSONArray("hits");
+                        for (Object o : hits) {
+                            JSONObject hit = (JSONObject) o;
+                            if (hit.has("_source")) {
+                                JSONObject hitSource = hit.getJSONObject("_source");
+                                if (hitSource.has("itemId")) {
+                                    String itemId = hitSource.getString("itemId");
+                                    String bulkSaveProfileAliases = resourceAsString("requestBody/bulkSaveProfileAliases.ndjson");
+                                    bulkCreateRequest.append(bulkSaveProfileAliases.
+                                            replace("$itemId", itemId).
+                                            replace("$migrationTime", migrationTime.toString()));
+                                }
+                            }
+                        }
+
+                        CloseableHttpResponse bulkResponse = httpClient.execute(createProfileAliasRequest(bulkCreateRequest.toString()));
+                        if (bulkResponse != null) {
+                            bulkResponse.close();
+                        }
+                    }
+
+                    if (hitsObject.getJSONObject("total").getInt("value") > offset) {
+                        doProcessResponse(migrationTime, initialOffset, offset + initialOffset, responseAsJson.getString("_scroll_id"));

Review Comment:
   Also, continuing the scroll query will make handling the result count easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@unomi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org