Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/20 16:35:27 UTC

[GitHub] [ignite-3] sanpwc commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

sanpwc commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1053470125


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();

Review Comment:
   Generally, it's better to initialize state in one place only, so please move `= Collections.emptySet()` to the constructor.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -137,7 +156,7 @@ public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameter
         Objects.requireNonNull(distributionZoneCfg, "Distribution zone configuration is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());

Review Comment:
   Here and in other places, please use withCause in order not to regenerate traceId.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();
+
+    /** Watch listener id to unregister the watch listener on {@link DistributionZoneManager#stop()}. */
+    private Long watchListenerId;

Review Comment:
   Same as above, this variable should be thread-safe. It's possible that you will set and read it from different threads.
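
A minimal sketch of one way to address this, under stated assumptions: instead of copying the id into a plain `Long` from a future callback, keep the registration future in a `volatile` field and chain the unregistration on it in `stop()`. `WatchRegistry` and `ZoneManagerSketch` are hypothetical stand-ins for illustration only; they are not the MetaStorage or DistributionZoneManager API.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a component that registers watches asynchronously. */
class WatchRegistry {
    private final AtomicLong ids = new AtomicLong();
    private final Set<Long> active = ConcurrentHashMap.newKeySet();

    /** Registers a watch on another thread and completes with its id. */
    CompletableFuture<Long> registerWatch() {
        return CompletableFuture.supplyAsync(() -> {
            long id = ids.incrementAndGet();
            active.add(id);
            return id;
        });
    }

    /** Removes the watch with the given id. */
    CompletableFuture<Void> unregisterWatch(long id) {
        active.remove(id);
        return CompletableFuture.completedFuture(null);
    }

    int activeCount() {
        return active.size();
    }
}

/** Hypothetical manager that never stores the raw id in a non-thread-safe field. */
class ZoneManagerSketch {
    private final WatchRegistry registry;

    /** Written in start(), read in stop(); volatile makes the write visible to other threads. */
    private volatile CompletableFuture<Long> watchListenerFut;

    ZoneManagerSketch(WatchRegistry registry) {
        this.registry = registry;
    }

    void start() {
        watchListenerFut = registry.registerWatch();
    }

    /** Unregisters once the registration has completed, without blocking the caller. */
    CompletableFuture<Void> stop() {
        CompletableFuture<Long> fut = watchListenerFut;

        if (fut == null) {
            return CompletableFuture.completedFuture(null);
        }

        return fut.thenCompose(registry::unregisterWatch);
    }
}
```

With this shape, `stop()` still unregisters correctly if it is called before the registration future has completed, because the unregistration is chained on that future.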



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();

Review Comment:
   Same as above. It's not valid to call get() on futures in such cases.
   There's no point in going further with the review until you fix it.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(UNEXPECTED_ERR, e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            logicalTopology = ByteUtils.fromBytes(vaultEntry.value());
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetaStorage(zoneId, toBytes(logicalTopology), vaultAppliedRevision);
+                    });
+        }
+    }
+
+    /**
+     * Registers {@link WatchListener} which updates data nodes of distribution zones on logical topology changing event.
+     */
+    private void registerMetaStorageWatchListener() {
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+                    @Override
+                    public boolean onUpdate(@NotNull WatchEvent evt) {
+                        if (!busyLock.enterBusy()) {
+                            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                        }
+
+                        try {
+                            assert evt.single();
+
+                            Entry newEntry = evt.entryEvent().newEntry();
+
+                            Set<String> newLogicalTopology = ByteUtils.fromBytes(newEntry.value());
+
+                            List<String> removedNodes =
+                                    logicalTopology.stream().filter(node -> !newLogicalTopology.contains(node)).collect(toList());
+
+                            List<String> addedNodes =
+                                    newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList());
+
+                            logicalTopology = newLogicalTopology;
+
+                            zonesConfiguration.distributionZones().value().namedListKeys()
+                                    .forEach(zoneName -> {
+                                        DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName);
+
+                                        int autoAdjust = zoneCfg.dataNodesAutoAdjust().value();
+                                        int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                        int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                        Integer zoneId = zoneCfg.zoneId().value();
+
+                                        if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+                                            //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
+                                            saveDataNodesToMetaStorage(
+                                                    zoneId, newEntry.value(), newEntry.revision()
+                                            );
+                                        } else {
+                                            if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+
+                                            if (!removedNodes.isEmpty() && autoAdjustScaleDown != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+                                        }
+                                    });
+
+                            return true;
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+
+                    @Override
+                    public void onError(@NotNull Throwable e) {
+                        LOG.warn("Unable to process logical topology event", e);
+                    }
+                })
+                .thenAccept(id -> watchListenerId = id);
+    }
+
+    /**
+     * Returns applied vault revision.
+     *
+     * @return Applied vault revision.
+     */
+    private long vaultAppliedRevision() {
+        try {
+            return vaultMgr.get(APPLIED_REV)
+                    .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                    .get();

Review Comment:
   We should never ever call get() or join() on futures in such cases. This is an important one.
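
As a rough illustration of the non-blocking alternative, the sketch below returns futures and composes them with `thenApply` and `thenCombine` instead of calling `get()`. `AsyncStore`, `NonBlockingInit`, and the key names are hypothetical stand-ins assumed for this example; they are not the VaultManager API, and the real fix in the PR may look different.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical async key-value store; a missing key completes with null. */
class AsyncStore {
    private final Map<String, byte[]> data = new ConcurrentHashMap<>();

    void put(String key, byte[] value) {
        data.put(key, value);
    }

    CompletableFuture<byte[]> get(String key) {
        return CompletableFuture.supplyAsync(() -> data.get(key));
    }
}

class NonBlockingInit {
    /** Returns the applied revision as a future (0 if absent); no get() or join() here. */
    static CompletableFuture<Long> appliedRevision(AsyncStore store) {
        return store.get("applied_revision")
                .thenApply(bytes -> bytes == null ? 0L : ByteBuffer.wrap(bytes).getLong());
    }

    /** Combines both lookups once they complete and describes the action that would follow. */
    static CompletableFuture<String> initDataNodes(AsyncStore store) {
        return appliedRevision(store).thenCombine(store.get("topology"), (rev, topology) ->
                topology == null
                        ? "skip"
                        : "save " + new String(topology, StandardCharsets.UTF_8) + " at revision " + rev);
    }
}
```

The caller receives a `CompletableFuture` and decides how to chain further work, so no thread is parked waiting for the store to respond.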



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();

Review Comment:
   This variable should be thread-safe. It's possible that you will initialize it in one thread (the node startup thread) and read it in the watch thread, in which case visibility is not guaranteed.
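
One possible shape for this, as a sketch only: a `volatile` reference to an immutable snapshot, assigned in the constructor and replaced as a whole on each update. `TopologyHolder` is a hypothetical class used for illustration. It assumes a single writer at a time; if several threads could read, compare, and write concurrently, stronger synchronization would be needed.

```java
import java.util.Set;

/** Hypothetical holder for the last seen logical topology. */
class TopologyHolder {
    /** Volatile reference to an immutable set: a write is visible to subsequent reads in other threads. */
    private volatile Set<String> logicalTopology;

    TopologyHolder() {
        // Initial state is assigned in one place, the constructor.
        logicalTopology = Set.of();
    }

    /** Replaces the snapshot with an immutable copy of the new topology. */
    void update(Set<String> newTopology) {
        logicalTopology = Set.copyOf(newTopology);
    }

    /** Returns the current immutable snapshot. */
    Set<String> current() {
        return logicalTopology;
    }
}
```

Readers always see either the old or the new snapshot in full, never a partially updated set, because the set itself is never mutated after publication.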


