You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by as...@apache.org on 2022/05/19 10:21:22 UTC
[unomi] 01/01: UNOMI-505 Study replication of existing profileIDs into new alias index
This is an automated email from the ASF dual-hosted git repository.
asi pushed a commit to branch UNOMI-505
in repository https://gitbox.apache.org/repos/asf/unomi.git
commit dd5c75d49fdb57e9f328adc2290aabffbb224fb3
Author: Anatol Sialitski <as...@enonic.com>
AuthorDate: Thu May 19 12:21:00 2022 +0200
UNOMI-505 Study replication of existing profileIDs into new alias index
---
.../unomi/shell/migration/impl/MigrationTo200.java | 114 ++++++++++++++++++++-
1 file changed, 110 insertions(+), 4 deletions(-)
diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
index 94dfd40be..3ea172a4f 100644
--- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
+++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
@@ -18,20 +18,25 @@ package org.apache.unomi.shell.migration.impl;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.karaf.shell.api.console.Session;
import org.apache.unomi.shell.migration.Migration;
+import org.json.JSONArray;
import org.json.JSONObject;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Version;
import org.osgi.service.component.annotations.Component;
import java.io.IOException;
+import java.net.URI;
+import java.time.Instant;
import java.util.Set;
import java.util.stream.Collectors;
@@ -63,10 +68,11 @@ public class MigrationTo200 implements Migration {
this.session = session;
this.esAddress = esAddress;
- doExecute();
+ doMigrate_Copy_Scope_to_SourceId();
+ doMigrate_Create_ProfileAlias_From_Profile();
}
- private void doExecute() throws IOException {
+ private void doMigrate_Copy_Scope_to_SourceId() throws IOException {
try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) {
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
@@ -77,13 +83,13 @@ public class MigrationTo200 implements Migration {
collect(Collectors.toSet());
for (String indexName : indices) {
- updateMapping(indexName, httpClient);
+ updateMapping(indexName);
}
}
}
}
- private void updateMapping(final String indexName, final CloseableHttpClient httpClient) throws IOException {
+ private void updateMapping(final String indexName) throws IOException {
HttpPut httpPut = new HttpPut(esAddress + "/" + indexName + "/_mapping");
httpPut.addHeader("Accept", "application/json");
@@ -147,4 +153,104 @@ public class MigrationTo200 implements Migration {
}
}
+ private void doMigrate_Create_ProfileAlias_From_Profile() throws IOException {
+ Instant migrationTime = Instant.now();
+ int initialOffset = 1000;
+ int size = 1000;
+ doProcessResponse(migrationTime, initialOffset, size, null);
+ }
+
+ private void doProcessResponse(Instant migrationTime, int initialOffset, int offset, String scrollId) throws IOException {
+ HttpUriRequest request = scrollId == null
+ ? createSearchRequest(offset)
+ : createSearchRequestWithScrollId(scrollId);
+
+ try (CloseableHttpResponse response = httpClient.execute(request)) {
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity()));
+
+ if (responseAsJson.has("hits")) {
+ JSONObject hitsObject = responseAsJson.getJSONObject("hits");
+ if (hitsObject.has("hits")) {
+ StringBuilder bulkCreateRequest = new StringBuilder();
+ JSONArray hits = hitsObject.getJSONArray("hits");
+ for (Object o : hits) {
+ JSONObject hit = (JSONObject) o;
+ if (hit.has("_source")) {
+ JSONObject hitSource = hit.getJSONObject("_source");
+ if (hitSource.has("itemId")) {
+ String itemId = hitSource.getString("itemId");
+ bulkCreateRequest.append("{\"create\":{\"_id\":\"").append(itemId).append("\"}}\n").
+ append("{\"itemId\": \"").append(itemId).append("\", ").
+ append("\"itemType\": \"profileAlias\", ").
+ append("\"profileID\": \"").append(itemId).append("\", ").
+ append("\"scope\": null, ").
+ append("\"clientID\": \"defaultClientId\", ").
+ append("\"creationTime\": \"").append(migrationTime.toString()).append("\", ").
+ append("\"modifiedTime\": \"").append(migrationTime.toString()).append("\" ").
+ append("}\n");
+ }
+ }
+ }
+
+ httpClient.execute(createProfileAliasRequest(bulkCreateRequest.toString()));
+ }
+
+ if (hitsObject.getJSONObject("total").getInt("value") > offset) {
+ doProcessResponse(migrationTime, initialOffset, offset + initialOffset, responseAsJson.getString("_scroll_id"));
+ } else {
+ if (scrollId != null) {
+ httpClient.execute(createDeleteScrollRequest(scrollId));
+ }
+ }
+ }
+ }
+ }
+ }
+
+    /**
+     * Builds the POST request to {@code /_search/scroll} for the given scroll ID,
+     * asking for a {@code 1h} scroll value.
+     *
+     * @param scrollId scroll ID placed in the request body
+     * @return the prepared request with JSON {@code Accept} and {@code Content-Type} headers
+     * @throws IOException if the request body cannot be encoded
+     */
+    private HttpPost createSearchRequestWithScrollId(final String scrollId) throws IOException {
+        final StringBuilder body = new StringBuilder("{\n");
+        body.append(" \"scroll_id\": \"").append(scrollId).append("\",\n");
+        body.append(" \"scroll\": \"1h\"\n");
+        body.append("}");
+
+        final HttpPost scrollRequest = new HttpPost(esAddress + "/_search/scroll");
+        scrollRequest.addHeader("Accept", "application/json");
+        scrollRequest.addHeader("Content-Type", "application/json");
+        scrollRequest.setEntity(new StringEntity(body.toString()));
+        return scrollRequest;
+    }
+
+ private HttpGet createSearchRequest(int size) {
+ return new HttpGet(esAddress + "/context-profile/_search?&scroll=1h&_source_includes=itemId&size=" + size);
+ }
+
+    /**
+     * Builds a DELETE request to {@code /_search/scroll} whose JSON body names the
+     * scroll ID. An anonymous {@link HttpEntityEnclosingRequestBase} subclass is used
+     * so the DELETE request can carry an entity.
+     *
+     * @param scrollId scroll ID placed in the request body
+     * @return the prepared request with JSON {@code Accept} and {@code Content-Type} headers
+     * @throws IOException if the request body cannot be encoded
+     */
+    private HttpEntityEnclosingRequestBase createDeleteScrollRequest(final String scrollId) throws IOException {
+        final String clearScrollBody = "{ \"scroll_id\": \"" + scrollId + "\" }";
+
+        final HttpEntityEnclosingRequestBase clearScroll = new HttpEntityEnclosingRequestBase() {
+            @Override
+            public String getMethod() {
+                return "DELETE";
+            }
+        };
+        clearScroll.setURI(URI.create(esAddress + "/_search/scroll"));
+        clearScroll.setEntity(new StringEntity(clearScrollBody));
+        clearScroll.addHeader("Accept", "application/json");
+        clearScroll.addHeader("Content-Type", "application/json");
+        return clearScroll;
+    }
+
+    /**
+     * Builds the POST request to the {@code context-profilealias/_bulk} endpoint that
+     * carries the given bulk body.
+     *
+     * @param bulkRequestAsString newline-delimited bulk body to send
+     * @return the prepared request with JSON {@code Accept} and {@code Content-Type} headers
+     * @throws IOException kept in the signature for existing callers
+     */
+    private HttpPost createProfileAliasRequest(String bulkRequestAsString) throws IOException {
+        final HttpPost bulkRequest = new HttpPost(esAddress + "/context-profilealias/_bulk");
+
+        bulkRequest.addHeader("Accept", "application/json");
+        bulkRequest.addHeader("Content-Type", "application/json");
+        // StringEntity(String) encodes as ISO-8859-1, which corrupts profile IDs containing
+        // characters outside Latin-1; encode the body explicitly as UTF-8 instead.
+        bulkRequest.setEntity(new StringEntity(bulkRequestAsString, java.nio.charset.StandardCharsets.UTF_8));
+
+        return bulkRequest;
+    }
+
}