Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2020/06/29 15:23:58 UTC

[GitHub] [storm] Ethanlm commented on a change in pull request #3297: STORM-3662: Use Pacemaker if cluster has set up uses it

Ethanlm commented on a change in pull request #3297:
URL: https://github.com/apache/storm/pull/3297#discussion_r447054327



##########
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
##########
@@ -215,8 +215,11 @@ private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormCl
                 }
             });
 
+        Integer execHeartBeatFreqSecs = workerState.stormClusterState.isPacemakerStateStore()
+            ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)

Review comment:
       We might want to update the comment at https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/Config.java#L1678-L1684
   since this config is no longer deprecated: it is used when Pacemaker is in use.
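To show why the config is live again, here is a minimal, self-contained sketch of the selection in the hunk above. It assumes a hypothetical `ClusterState` interface that models only `isPacemakerStateStore()`, and it assumes the key string behind `Config.TASK_HEARTBEAT_FREQUENCY_SECS`. The diff excerpt cuts off before the `else` branch, so the sketch takes that value as a caller-supplied `fallbackSecs` and does not guess what the real branch reads.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the actual Worker.loadWorker code.
class HeartbeatFrequencySketch {
    /** Hypothetical stand-in for IStormClusterState, reduced to the flag added in this PR. */
    interface ClusterState {
        boolean isPacemakerStateStore();
    }

    /** Assumed to match the key behind Config.TASK_HEARTBEAT_FREQUENCY_SECS. */
    static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";

    /**
     * Returns the configured task heartbeat frequency when Pacemaker is the state store,
     * otherwise the caller-supplied fallback (the real else branch is elided in the diff).
     */
    static Integer execHeartBeatFreqSecs(ClusterState state, Map<String, Object> conf, Integer fallbackSecs) {
        return state.isPacemakerStateStore()
            ? (Integer) conf.get(TASK_HEARTBEAT_FREQUENCY_SECS)
            : fallbackSecs;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TASK_HEARTBEAT_FREQUENCY_SECS, 3);
        System.out.println(execHeartBeatFreqSecs(() -> true, conf, 30));  // 3: Pacemaker in use, config is read
        System.out.println(execHeartBeatFreqSecs(() -> false, conf, 30)); // 30: config is ignored
    }
}
```

Because the config value is read only on the Pacemaker path, a Javadoc that still labels it deprecated would mislead anyone running Pacemaker.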

##########
File path: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
##########
@@ -74,6 +74,13 @@
      */
     boolean isAssignmentsBackendSynchronized();
 
+    /**
+     * Flag to indicate if the Pacameker is backup store.

Review comment:
       Do you mean `backend store`? 
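If `backend store` is what was meant, the Javadoc could read roughly as below. This is a sketch only: `StateStorage`, `PacemakerStorage`, `ZkStorage` and `forStorage` are hypothetical stand-ins, and the `instanceof` check is just one plausible way to derive the flag, not necessarily what this PR implements.

```java
// Hypothetical sketch of the corrected Javadoc; the storage types are illustrative stand-ins.
class PacemakerFlagSketch {
    /** Hypothetical marker for a cluster state storage backend. */
    interface StateStorage { }

    /** Hypothetical Pacemaker-backed storage. */
    static final class PacemakerStorage implements StateStorage { }

    /** Hypothetical ZooKeeper-backed storage. */
    static final class ZkStorage implements StateStorage { }

    interface ClusterState {
        /**
         * Flag to indicate whether Pacemaker is the backend store.
         *
         * @return true if Pacemaker is the backend state store, false otherwise
         */
        boolean isPacemakerStateStore();
    }

    /** One possible implementation: inspect the type of the configured storage. */
    static ClusterState forStorage(StateStorage storage) {
        return () -> storage instanceof PacemakerStorage;
    }

    public static void main(String[] args) {
        System.out.println(forStorage(new PacemakerStorage()).isPacemakerStateStore()); // true
        System.out.println(forStorage(new ZkStorage()).isPacemakerStateStore());        // false
    }
}
```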

##########
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
##########
@@ -361,7 +364,11 @@ public void doHeartBeat() throws IOException {
         state.setWorkerHeartBeat(lsWorkerHeartbeat);
         state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
         // it shouldn't take supervisor 120 seconds between listing dir and reading it
-        heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+
+        if (!workerState.stormClusterState.isPacemakerStateStore()) {
+            LOG.debug("If pacemaker is not used, send supervisor");

Review comment:
       We should delete `If`: this message is only logged when Pacemaker is not being used.
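A minimal sketch of the branch with the reworded message follows. It assumes that the elided body of the `if` calls the old `heartbeatToMasterIfLocalbeatFail` path, modeled here as a hypothetical `fallbackHeartbeat` `Runnable`. Debug output is collected in a list instead of `LOG.debug` so the sketch stays dependency-free, and the exact message wording is only a suggestion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the doHeartBeat branch; not the actual Worker code.
class HeartbeatFallbackSketch {
    /** Collected debug messages, standing in for LOG.debug in this sketch. */
    final List<String> debugLog = new ArrayList<>();

    /**
     * Sends the fallback heartbeat only when Pacemaker is not the state store.
     * Assumes the elided if-body invokes the old heartbeatToMasterIfLocalbeatFail call.
     */
    void maybeSendFallbackHeartbeat(boolean pacemakerInUse, Runnable fallbackHeartbeat) {
        if (!pacemakerInUse) {
            // No "If": reaching this line already means Pacemaker is not in use.
            debugLog.add("Pacemaker is not used, sending heartbeat to supervisor");
            fallbackHeartbeat.run();
        }
    }

    public static void main(String[] args) {
        HeartbeatFallbackSketch sketch = new HeartbeatFallbackSketch();
        int[] sent = {0};
        sketch.maybeSendFallbackHeartbeat(true, () -> sent[0]++);  // skipped: Pacemaker in use
        sketch.maybeSendFallbackHeartbeat(false, () -> sent[0]++); // sent and logged once
        System.out.println(sent[0] + " " + sketch.debugLog);
    }
}
```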

##########
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
##########
@@ -377,15 +378,31 @@ public StormTimer getUserTimer() {
     public SmartThread makeTransferThread() {
         return workerTransfer.makeTransferThread();
     }
-    
+
+    public void suicideIfLocalAssignmentsChanged(Assignment assignment) {
+        if (assignment != null) {
+            Set<List<Long>> assignedExecutors = new HashSet<>(readWorkerExecutors(assignmentId, port, assignment));
+            if (!localExecutors.equals(assignedExecutors)) {
+                LOG.info("Found conflicting assignments. We shouldn't be alive!"
+                         + " Assigned: " + assignedExecutors + ", Current: "
+                         + localExecutors);
+                if (!ConfigUtils.isLocalMode(conf)) {
+                    suicideCallback.run();
+                } else {
+                    LOG.info("Local worker tried to commit suicide!");
+                }
+            }
+        }
+    }
+
     public void refreshConnections() {
         Assignment assignment = null;
         try {
             assignment = getLocalAssignment(stormClusterState, topologyId);
         } catch (Exception e) {
             LOG.warn("Failed to read assignment. This should only happen when topology is shutting down.", e);
         }
-
+        suicideIfLocalAssignmentsChanged(assignment);

Review comment:
       We can remove this from this PR, since it is already being addressed in your other PR: https://github.com/apache/storm/pull/3291
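For whichever PR keeps it, here is a dependency-free sketch of the check in `suicideIfLocalAssignmentsChanged`. It assumes executors are `[start, end]` task-id pairs held as `List<Long>`, as the `Set<List<Long>>` in the hunk suggests. The `Assignment` and `readWorkerExecutors` step is replaced by a plain list, and local mode and the suicide callback are passed in explicitly. `AssignmentCheckSketch` and its parameters are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the actual WorkerState code.
class AssignmentCheckSketch {
    /**
     * Returns true if the suicide callback was invoked. Comparing as sets makes the
     * check independent of the order in which executors appear in the assignment.
     */
    static boolean suicideIfAssignmentChanged(Set<List<Long>> localExecutors,
                                              List<List<Long>> assigned,
                                              boolean localMode,
                                              Runnable suicideCallback) {
        if (assigned == null) {
            return false; // assignment could not be read; nothing to compare
        }
        Set<List<Long>> assignedExecutors = new HashSet<>(assigned);
        if (localExecutors.equals(assignedExecutors)) {
            return false;
        }
        System.out.println("Found conflicting assignments. Assigned: " + assignedExecutors
                           + ", Current: " + localExecutors);
        if (localMode) {
            System.out.println("Local worker tried to commit suicide!");
            return false;
        }
        suicideCallback.run();
        return true;
    }

    public static void main(String[] args) {
        Set<List<Long>> local = new HashSet<>(Arrays.asList(Arrays.asList(1L, 1L), Arrays.asList(2L, 3L)));
        Runnable suicide = () -> System.out.println("suicide callback invoked");
        // Same executors in a different order: no conflict, callback does not run.
        System.out.println(suicideIfAssignmentChanged(local,
            Arrays.asList(Arrays.asList(2L, 3L), Arrays.asList(1L, 1L)), false, suicide));
        // Executor [2, 3] is no longer assigned here: conflict, callback runs.
        System.out.println(suicideIfAssignmentChanged(local,
            Arrays.asList(Arrays.asList(1L, 1L)), false, suicide));
    }
}
```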




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org