Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/08/10 14:42:40 UTC

[GitHub] [ignite] NSAmelchev opened a new pull request, #10189: IGNITE-17502 Tasks to sent the snapshot files are not ordered

NSAmelchev opened a new pull request, #10189:
URL: https://github.com/apache/ignite/pull/10189

   Thank you for submitting the pull request to Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ was changed and _WHY_, rather than _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary`, where `XXXX` is the JIRA issue number.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` is attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask for advice in the _#ignite_ channel on http://asf.slack.com.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] xtern commented on a diff in pull request #10189: IGNITE-17502 Tasks to sent the snapshot files are not ordered

Posted by GitBox <gi...@apache.org>.
xtern commented on code in PR #10189:
URL: https://github.com/apache/ignite/pull/10189#discussion_r944210268


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java:
##########
@@ -91,9 +92,10 @@ public Map<Integer, Set<Integer>> parts() {
         Map<Integer, Set<Integer>> res = new HashMap<>();
 
         for (Map.Entry<Integer, int[]> e : parts.entrySet()) {
-            res.put(e.getKey(), e.getValue().length == 0 ? null : Arrays.stream(e.getValue())
-                .boxed()
-                .collect(Collectors.toSet()));
+            if (F.isEmpty(e.getValue()))

Review Comment:
   Why can't we filter them in the constructor?
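
A minimal sketch of what constructor-side filtering could look like. `PartsRequestSketch` is a hypothetical stand-in, not the actual `SnapshotFilesRequestMessage`, and the constructor signature is an assumption. The idea is that groups with no partitions are dropped once on construction, so the accessor can convert every remaining entry without an emptiness check.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for a request message; not the real Ignite class. */
final class PartsRequestSketch {
    /** Group id -> partition ids, kept as primitive arrays as in the diff above. */
    private final Map<Integer, int[]> parts = new HashMap<>();

    /** Drops groups with a null or empty partition set once, at construction time. */
    PartsRequestSketch(Map<Integer, Set<Integer>> requested) {
        for (Map.Entry<Integer, Set<Integer>> e : requested.entrySet()) {
            Set<Integer> ids = e.getValue();

            if (ids == null || ids.isEmpty())
                continue;

            parts.put(e.getKey(), ids.stream().mapToInt(Integer::intValue).toArray());
        }
    }

    /** Converts unconditionally: the constructor guarantees no entry is empty. */
    Map<Integer, Set<Integer>> parts() {
        Map<Integer, Set<Integer>> res = new HashMap<>();

        for (Map.Entry<Integer, int[]> e : parts.entrySet()) {
            Set<Integer> ids = new TreeSet<>();

            for (int p : e.getValue())
                ids.add(p);

            res.put(e.getKey(), ids);
        }

        return res;
    }

    public static void main(String[] args) {
        Map<Integer, Set<Integer>> req = new HashMap<>();

        req.put(1, new TreeSet<>(Arrays.asList(0, 2)));
        req.put(2, new TreeSet<>()); // Empty group: filtered out by the constructor.

        System.out.println(new PartsRequestSketch(req).parts()); // prints {1=[0, 2]}
    }
}
```

With this shape, callers of `parts()` never see a `null` or empty set, which is what lets the consumer iterate `e.getValue()` directly.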





[GitHub] [ignite] NSAmelchev commented on a diff in pull request #10189: IGNITE-17502 Tasks to sent the snapshot files are not ordered

Posted by GitBox <gi...@apache.org>.
NSAmelchev commented on code in PR #10189:
URL: https://github.com/apache/ignite/pull/10189#discussion_r944343519


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java:
##########
@@ -72,70 +74,65 @@ public SnapshotResponseRemoteFutureTask(
             return false;
 
         try {
-            List<GroupPartitionId> handled = new ArrayList<>();
+            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
 
-            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
-                ofNullable(e.getValue()).orElse(Collections.emptySet())
-                    .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), p)));
-            }
+            Function<GroupPartitionId, SnapshotMetadata> findMeta = pair -> {
+                for (SnapshotMetadata meta : metas) {
+                    Map<Integer, Set<Integer>> parts0 = meta.partitions();
 
-            snpSndr.init(handled.size());
+                    if (F.isEmpty(parts0))
+                        continue;
 
-            File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName, snpPath);
+                    Set<Integer> locParts = parts0.get(pair.getGroupId());
 
-            List<CompletableFuture<Void>> futs = new ArrayList<>();
-            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
+                    if (locParts != null && locParts.contains(pair.getPartitionId()))
+                        return meta;
+                }
 
-            for (SnapshotMetadata meta : metas) {
-                Map<Integer, Set<Integer>> parts0 = meta.partitions();
+                return null;
+            };
 
-                if (F.isEmpty(parts0))
-                    continue;
+            Map<GroupPartitionId, SnapshotMetadata> partsToSend = new HashMap<>();
 
-                handled.removeIf(gp -> {
-                    if (ofNullable(parts0.get(gp.getGroupId()))
-                        .orElse(Collections.emptySet())
-                        .contains(gp.getPartitionId())
-                    ) {
-                        futs.add(CompletableFuture.runAsync(() -> {
-                            if (err.get() != null)
-                                return;
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+                e.getValue().forEach(p -> partsToSend.computeIfAbsent(new GroupPartitionId(e.getKey(), p), findMeta));
 
-                            File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
-                                gp.getGroupId());
+            if (partsToSend.containsValue(null)) {
+                Collection<GroupPartitionId> missed = F.viewReadOnly(partsToSend.entrySet(), Map.Entry::getKey,
+                    e -> e.getValue() == null);
 
-                            if (cacheDir == null) {
-                                throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
-                                    ", pair=" + gp + ']');
-                            }
+                err.compareAndSet(null, new IgniteException("Snapshot partitions missed on local node " +
+                    "[snpName=" + snpName + ", missed=" + missed + ']'));

Review Comment:
   > I assume that we should also stop executing this method at this point.
   
   Done, thanks
   





[GitHub] [ignite] xtern commented on a diff in pull request #10189: IGNITE-17502 Tasks to sent the snapshot files are not ordered

Posted by GitBox <gi...@apache.org>.
xtern commented on code in PR #10189:
URL: https://github.com/apache/ignite/pull/10189#discussion_r944281616


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java:
##########
@@ -72,70 +74,65 @@ public SnapshotResponseRemoteFutureTask(
             return false;
 
         try {
-            List<GroupPartitionId> handled = new ArrayList<>();
+            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
 
-            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
-                ofNullable(e.getValue()).orElse(Collections.emptySet())
-                    .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), p)));
-            }
+            Function<GroupPartitionId, SnapshotMetadata> findMeta = pair -> {
+                for (SnapshotMetadata meta : metas) {
+                    Map<Integer, Set<Integer>> parts0 = meta.partitions();
 
-            snpSndr.init(handled.size());
+                    if (F.isEmpty(parts0))
+                        continue;
 
-            File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName, snpPath);
+                    Set<Integer> locParts = parts0.get(pair.getGroupId());
 
-            List<CompletableFuture<Void>> futs = new ArrayList<>();
-            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
+                    if (locParts != null && locParts.contains(pair.getPartitionId()))
+                        return meta;
+                }
 
-            for (SnapshotMetadata meta : metas) {
-                Map<Integer, Set<Integer>> parts0 = meta.partitions();
+                return null;
+            };
 
-                if (F.isEmpty(parts0))
-                    continue;
+            Map<GroupPartitionId, SnapshotMetadata> partsToSend = new HashMap<>();
 
-                handled.removeIf(gp -> {
-                    if (ofNullable(parts0.get(gp.getGroupId()))
-                        .orElse(Collections.emptySet())
-                        .contains(gp.getPartitionId())
-                    ) {
-                        futs.add(CompletableFuture.runAsync(() -> {
-                            if (err.get() != null)
-                                return;
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+                e.getValue().forEach(p -> partsToSend.computeIfAbsent(new GroupPartitionId(e.getKey(), p), findMeta));
 
-                            File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
-                                gp.getGroupId());
+            if (partsToSend.containsValue(null)) {
+                Collection<GroupPartitionId> missed = F.viewReadOnly(partsToSend.entrySet(), Map.Entry::getKey,
+                    e -> e.getValue() == null);
 
-                            if (cacheDir == null) {
-                                throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
-                                    ", pair=" + gp + ']');
-                            }
+                err.compareAndSet(null, new IgniteException("Snapshot partitions missed on local node " +
+                    "[snpName=" + snpName + ", missed=" + missed + ']'));

Review Comment:
   Additionally, we can move this check higher and remove redundant data re-scan.
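
One way to read this suggestion, as a sketch only: detect a missing partition at the moment its lookup fails, instead of scanning the finished map afterwards. Note that `Map.computeIfAbsent` records no mapping when the mapping function returns `null`, so a later `containsValue(null)` check on a `HashMap` filled that way would not see the missing partitions. The class, the `"group:partition"` string keys, and the `findOwner` function below are hypothetical simplifications, not Ignite types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

/** Hypothetical illustration of resolving and validating in one pass. */
final class SinglePassResolveSketch {
    /**
     * Resolves every requested (group, partition) pair to its owner.
     * Pairs without an owner are appended to 'missed' during the same pass,
     * so no second scan over the result is needed.
     */
    static Map<String, String> resolve(
        Map<Integer, Set<Integer>> requested,
        Function<String, String> findOwner,
        List<String> missed
    ) {
        Map<String, String> resolved = new LinkedHashMap<>();

        for (Map.Entry<Integer, Set<Integer>> e : requested.entrySet()) {
            for (Integer p : e.getValue()) {
                String key = e.getKey() + ":" + p;
                String owner = findOwner.apply(key);

                if (owner == null)
                    missed.add(key);
                else
                    resolved.put(key, owner);
            }
        }

        return resolved;
    }

    public static void main(String[] args) {
        Map<Integer, Set<Integer>> req = new TreeMap<>();

        req.put(1, new TreeSet<>(Arrays.asList(0, 1)));

        List<String> missed = new ArrayList<>();

        // Only partition 0 of group 1 has a known owner in this toy lookup.
        Map<String, String> res = resolve(req, k -> k.equals("1:0") ? "metaA" : null, missed);

        System.out.println(res + " missed=" + missed);
    }
}
```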





[GitHub] [ignite] NSAmelchev commented on a diff in pull request #10189: IGNITE-17502 Tasks to sent the snapshot files are not ordered

Posted by GitBox <gi...@apache.org>.
NSAmelchev commented on code in PR #10189:
URL: https://github.com/apache/ignite/pull/10189#discussion_r944343283


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java:
##########
@@ -91,9 +92,10 @@ public Map<Integer, Set<Integer>> parts() {
         Map<Integer, Set<Integer>> res = new HashMap<>();
 
         for (Map.Entry<Integer, int[]> e : parts.entrySet()) {
-            res.put(e.getKey(), e.getValue().length == 0 ? null : Arrays.stream(e.getValue())
-                .boxed()
-                .collect(Collectors.toSet()));
+            if (F.isEmpty(e.getValue()))

Review Comment:
   Done, thanks





[GitHub] [ignite] xtern commented on a diff in pull request #10189: IGNITE-17502 Tasks to sent the snapshot files are not ordered

Posted by GitBox <gi...@apache.org>.
xtern commented on code in PR #10189:
URL: https://github.com/apache/ignite/pull/10189#discussion_r944280285


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java:
##########
@@ -72,70 +74,65 @@ public SnapshotResponseRemoteFutureTask(
             return false;
 
         try {
-            List<GroupPartitionId> handled = new ArrayList<>();
+            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
 
-            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
-                ofNullable(e.getValue()).orElse(Collections.emptySet())
-                    .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), p)));
-            }
+            Function<GroupPartitionId, SnapshotMetadata> findMeta = pair -> {
+                for (SnapshotMetadata meta : metas) {
+                    Map<Integer, Set<Integer>> parts0 = meta.partitions();
 
-            snpSndr.init(handled.size());
+                    if (F.isEmpty(parts0))
+                        continue;
 
-            File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName, snpPath);
+                    Set<Integer> locParts = parts0.get(pair.getGroupId());
 
-            List<CompletableFuture<Void>> futs = new ArrayList<>();
-            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
+                    if (locParts != null && locParts.contains(pair.getPartitionId()))
+                        return meta;
+                }
 
-            for (SnapshotMetadata meta : metas) {
-                Map<Integer, Set<Integer>> parts0 = meta.partitions();
+                return null;
+            };
 
-                if (F.isEmpty(parts0))
-                    continue;
+            Map<GroupPartitionId, SnapshotMetadata> partsToSend = new HashMap<>();
 
-                handled.removeIf(gp -> {
-                    if (ofNullable(parts0.get(gp.getGroupId()))
-                        .orElse(Collections.emptySet())
-                        .contains(gp.getPartitionId())
-                    ) {
-                        futs.add(CompletableFuture.runAsync(() -> {
-                            if (err.get() != null)
-                                return;
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+                e.getValue().forEach(p -> partsToSend.computeIfAbsent(new GroupPartitionId(e.getKey(), p), findMeta));
 
-                            File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
-                                gp.getGroupId());
+            if (partsToSend.containsValue(null)) {
+                Collection<GroupPartitionId> missed = F.viewReadOnly(partsToSend.entrySet(), Map.Entry::getKey,
+                    e -> e.getValue() == null);
 
-                            if (cacheDir == null) {
-                                throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
-                                    ", pair=" + gp + ']');
-                            }
+                err.compareAndSet(null, new IgniteException("Snapshot partitions missed on local node " +
+                    "[snpName=" + snpName + ", missed=" + missed + ']'));

Review Comment:
   I assume that we should also stop executing this method at this point.
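
For illustration, a minimal sketch of the fail-fast shape being asked for: record the first error with `AtomicReference.compareAndSet` and return right away, so that no files are sent for a request that cannot be fully served. `FailFastSketch`, its `send` method, and the string partition keys are hypothetical, and `IllegalStateException` stands in for `IgniteException`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of stopping a task once an error is recorded. */
final class FailFastSketch {
    /** First error wins; later failures do not overwrite it. */
    private final AtomicReference<Throwable> err = new AtomicReference<>();

    /** Number of partitions "sent"; stands in for the real file transfer. */
    private int sent;

    /** Returns false without sending anything if some partitions are missing. */
    boolean send(List<String> missed, List<String> toSend) {
        if (!missed.isEmpty()) {
            err.compareAndSet(null, new IllegalStateException(
                "Snapshot partitions missed on local node [missed=" + missed + ']'));

            return false; // Stop here: nothing below runs for a failed request.
        }

        sent += toSend.size();

        return true;
    }

    Throwable error() {
        return err.get();
    }

    int sent() {
        return sent;
    }

    public static void main(String[] args) {
        FailFastSketch task = new FailFastSketch();

        boolean ok = task.send(Arrays.asList("1:1"), Arrays.asList("1:0"));

        System.out.println(ok + " sent=" + task.sent() + " err=" + task.error());

        FailFastSketch clean = new FailFastSketch();

        System.out.println(clean.send(Collections.<String>emptyList(), Arrays.asList("1:0")));
    }
}
```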





[GitHub] [ignite] NSAmelchev merged pull request #10189: IGNITE-17502 Tasks to sent the snapshot files are not ordered

Posted by GitBox <gi...@apache.org>.
NSAmelchev merged PR #10189:
URL: https://github.com/apache/ignite/pull/10189

