Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2020/06/26 14:20:14 UTC

[GitHub] [storm] kishorvpatil opened a new pull request #3295: STORM-3660: Remove use of queues for updating credentials

kishorvpatil opened a new pull request #3295:
URL: https://github.com/apache/storm/pull/3295


   ## What is the purpose of the change
   - This change eliminates the use of the ___credentials_ stream and the corresponding control message used to invoke the _ICredentialsListener_ method on a component. Instead, each Executor now has its own flag, which the worker sets whenever the credentials change to signal that a refresh is needed.
   
   ## How was the change tested
   
   After launching a topology and waiting for the worker tokens to be refreshed by Nimbus, I saw the following entries in the worker log, suggesting that each executor received the newer worker tokens:
   ```
   2020-06-25 18:07:43.288 o.a.s.s.a.ClientAuthUtils main-EventThread [INFO] Replaced WorkerToken for service type NIMBUS
   2020-06-25 18:07:43.289 o.a.s.s.a.ClientAuthUtils main-EventThread [INFO] Replaced WorkerToken for service type DRPC
   2020-06-25 18:07:43.289 o.a.s.s.a.ClientAuthUtils main-EventThread [INFO] Replaced WorkerToken for service type SUPERVISOR
   2020-06-25 18:07:43.289 o.a.s.e.Executor Thread-18-split-executor[10, 10] [INFO] The credentials are being updated [10, 10].
   2020-06-25 18:07:43.289 o.a.s.e.Executor Thread-25-count-executor[3, 3] [INFO] The credentials are being updated [3, 3].
   2020-06-25 18:07:43.294 o.a.s.e.Executor Thread-29-split-executor[9, 9] [INFO] The credentials are being updated [9, 9].
   2020-06-25 18:07:43.296 o.a.s.e.Executor Thread-20-count-executor[6, 6] [INFO] The credentials are being updated [6, 6].
   2020-06-25 18:07:43.298 o.a.s.e.Executor Thread-26-__acker-executor[1, 1] [INFO] The credentials are being updated [1, 1].
   2020-06-25 18:07:43.298 o.a.s.e.Executor Thread-19-split-executor[8, 8] [INFO] The credentials are being updated [8, 8].
   2020-06-25 18:07:43.309 o.a.s.e.Executor Thread-28-spout-executor[11, 11] [INFO] The credentials are being updated [11, 11].
   2020-06-25 18:07:43.309 o.a.s.e.Executor Thread-21-count-executor[4, 4] [INFO] The credentials are being updated [4, 4].
   2020-06-25 18:07:43.309 o.a.s.e.Executor Thread-16-spout-executor[14, 14] [INFO] The credentials are being updated [14, 14].
   2020-06-25 18:07:43.309 o.a.s.e.Executor Thread-17-spout-executor[12, 12] [INFO] The credentials are being updated [12, 12].
   2020-06-25 18:07:43.311 o.a.s.e.Executor Thread-27-spout-executor[13, 13] [INFO] The credentials are being updated [13, 13].
   2020-06-25 18:07:43.316 o.a.s.e.Executor Thread-23-split-executor[7, 7] [INFO] The credentials are being updated [7, 7].
   2020-06-25 18:07:43.316 o.a.s.e.Executor Thread-24-count-executor[5, 5] [INFO] The credentials are being updated [5, 5].
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [storm] Ethanlm commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r452390529



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -124,6 +126,7 @@
     protected int idToTaskBase;
     protected String hostname;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    private AtomicBoolean needToRefreshCreds = new AtomicBoolean(true);

Review comment:
       The initial value should be `false` to avoid unnecessary update at the beginning.







[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r448381948



##########
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
##########
@@ -415,13 +414,13 @@ public void updateBlobUpdates() throws IOException {
 
     public void checkCredentialsChanged() {
         Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
-        if (!ObjectUtils.equals(newCreds, credentialsAtom.get())) {
+        if (!ObjectUtils.equals(newCreds, this.workerState.credentialsAtom.get())) {
             // This does not have to be atomic, worst case we update when one is not needed
             ClientAuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
             for (IRunningExecutor executor : executorsAtom.get()) {
                 executor.credentialsChanged(newCreds);
             }
-            credentialsAtom.set(newCreds);
+            this.workerState.credentialsAtom.set(newCreds);

Review comment:
       Moved this up, before the `for` loop.
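
       For illustration, the reordering can be sketched roughly as below. This is a simplified stand-in, not the Storm code: `WorkerCredsSketch` and `executorFlags` are hypothetical names, the credentials are modeled as a plain `Map<String, String>`, and `java.util.Objects.equals` is used in place of `ObjectUtils.equals`. The point is only that the shared reference is updated before any executor is flagged, so an executor that reacts to its flag reads the new value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the worker-side check; not Storm's Worker class.
class WorkerCredsSketch {
    // Shared reference that executors read when they refresh.
    final AtomicReference<Map<String, String>> credentialsAtom = new AtomicReference<>();
    // One flag per (hypothetical) executor.
    final List<AtomicBoolean> executorFlags = new ArrayList<>();

    // Returns true if a change was detected and the executors were flagged.
    boolean checkCredentialsChanged(Map<String, String> newCreds) {
        if (Objects.equals(newCreds, credentialsAtom.get())) {
            return false;
        }
        // Store the new credentials first ...
        credentialsAtom.set(newCreds);
        // ... then flag the executors, so a flagged executor never reads stale credentials.
        for (AtomicBoolean flag : executorFlags) {
            flag.set(true);
        }
        return true;
    }
}
```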







[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r448384044



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception {
 
     @Override
     public void accept(Object event) {
+        if (this.needToRefreshCreds) {
+            this.needToRefreshCreds = false;

Review comment:
       The worker's credentials refresh thread could set _needToRefreshCreds_ to _true_ at any time after the `if` condition is evaluated. If there are consecutive updates, the executor will either execute _setCredentials_ twice or execute it only once with the latest credentials. Either way there is no harm and no deadlock.







[GitHub] [storm] Ethanlm commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r452389342



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -124,6 +126,7 @@
     protected int idToTaskBase;
     protected String hostname;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    private boolean needToRefreshCreds = true;

Review comment:
       If only simple `get()` and `set()` methods are used in `AtomicBoolean`, it is basically the same as using `volatile boolean`. 
   https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/master/jdk/src/share/classes/java/util/concurrent/atomic/AtomicBoolean.java#L85-L87 (although the implementation uses `volatile int`)
   
   unless methods like `compareAndSet` and `getAndSet` are required. 
   
   But anyway, using `AtomicBoolean` is easier to understand.
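
   A small sketch of the distinction, under stated assumptions: `FlagComparisonSketch` and its method names are hypothetical and are not part of the PR. With only reads and writes, the two fields behave alike in terms of visibility. The difference appears when the flag is tested and cleared, which is one atomic step with `compareAndSet` and two separate steps with a `volatile` field.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical comparison of the two flag styles; not code from the PR.
class FlagComparisonSketch {
    // Writes by one thread are visible to later reads by other threads.
    private volatile boolean volatileFlag = false;
    // Same visibility for get()/set(), plus atomic read-modify-write methods.
    private final AtomicBoolean atomicFlag = new AtomicBoolean(false);

    void raise() {
        volatileFlag = true;
        atomicFlag.set(true);
    }

    // Read, then write: another thread's write can land between the two steps.
    boolean consumeVolatile() {
        if (volatileFlag) {
            volatileFlag = false;
            return true;
        }
        return false;
    }

    // One atomic step: returns true only for the caller that flips true to false.
    boolean consumeAtomic() {
        return atomicFlag.compareAndSet(true, false);
    }
}
```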
   







[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r448381511



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -124,6 +126,7 @@
     protected int idToTaskBase;
     protected String hostname;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    protected boolean needToRefreshCreds = true;

Review comment:
       Addressed

##########
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
##########
@@ -152,6 +153,7 @@
     private final AtomicLong nextLoadUpdate = new AtomicLong(0);
     private final boolean trySerializeLocal;
     private final Collection<IAutoCredentials> autoCredentials;
+    AtomicReference<Credentials> credentialsAtom;

Review comment:
       Addressed







[GitHub] [storm] Ethanlm commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r452389342



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -124,6 +126,7 @@
     protected int idToTaskBase;
     protected String hostname;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    private boolean needToRefreshCreds = true;

Review comment:
       If only simple `get()` and `set()` methods are used in `AtomicBoolean`, it is basically the same as using `volatile boolean` unless methods like `compareAndSet` and `getAndSet` are required. 
   https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/master/jdk/src/share/classes/java/util/concurrent/atomic/AtomicBoolean.java#L85-L87 (although the implementation uses `volatile int`)
   
   
   But anyway, using `AtomicBoolean` is easier to understand.
   







[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r446379664



##########
File path: storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
##########
@@ -60,16 +60,7 @@ public ExecutorStats renderStats() {
 
     @Override
     public void credentialsChanged(Credentials credentials) {
-        TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials),
-                                        Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID,
-                                        Constants.CREDENTIALS_CHANGED_STREAM_ID);
-        AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
-        try {
-            executor.getReceiveQueue().publish(addressedTuple);
-            executor.getReceiveQueue().flush();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
+        executor.needToRefreshCreds = true;

Review comment:
       Since _SpoutExecutor_ and _BoltExecutor_ use the initial credentials during init, the _accept_ method does not need to worry about them.

##########
File path: storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
##########
@@ -60,16 +60,7 @@ public ExecutorStats renderStats() {
 
     @Override
     public void credentialsChanged(Credentials credentials) {
-        TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials),
-                                        Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID,
-                                        Constants.CREDENTIALS_CHANGED_STREAM_ID);
-        AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
-        try {
-            executor.getReceiveQueue().publish(addressedTuple);
-            executor.getReceiveQueue().flush();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
+        executor.needToRefreshCreds = true;

Review comment:
       e.g. https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L144-L146
   







[GitHub] [storm] Ethanlm commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r452389342



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -124,6 +126,7 @@
     protected int idToTaskBase;
     protected String hostname;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    private boolean needToRefreshCreds = true;

Review comment:
       If only simple `get()` and `set()` methods are used in `AtomicBoolean`, it is basically the same as using `volatile boolean`. 
   https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/master/jdk/src/share/classes/java/util/concurrent/atomic/AtomicBoolean.java#L85-L87
   
   unless methods like `compareAndSet` and `getAndSet` are required. 
   
   But anyway, using `AtomicBoolean` is easier to understand.
   







[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r448387076



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception {
 
     @Override
     public void accept(Object event) {
+        if (this.needToRefreshCreds) {

Review comment:
       Moved this to a separate method, which is called from within `BoltExecutor` and `SpoutExecutor` at the beginning of each iteration of the async loop. This ensures that credentials are updated on the executor even under back pressure or when there are no tuples to process. For backward compatibility, and in keeping with the agreement that _setCredentials_ on the user's _ICredentialsListener_ implementation is invoked from the executor thread, delays cannot be avoided if the executor thread is stuck in the _execute_ or _nextTuple_ methods. Otherwise, the credentials update should be picked up on the next iteration.
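
       As a rough illustration of checking the flag at the start of each iteration, consider the sketch below. It is not the Storm implementation: `AsyncLoopSketch`, `runOneIteration`, `refreshCount`, and `tuplesProcessed` are hypothetical, and the counter stands in for calling _setCredentials_ on the listeners. It only shows that the refresh happens even when an iteration has no tuples to process.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of one executor's loop body; not Storm's BoltExecutor or SpoutExecutor.
class AsyncLoopSketch {
    private final AtomicBoolean needToRefreshCreds = new AtomicBoolean(false);
    int refreshCount = 0;     // stand-in for invoking setCredentials on listeners
    int tuplesProcessed = 0;  // stand-in for real tuple handling

    // Called from another thread when credentials change.
    void setNeedToRefreshCreds() {
        needToRefreshCreds.set(true);
    }

    // One iteration: the flag is checked first, independent of how many tuples are waiting.
    void runOneIteration(int availableTuples) {
        if (needToRefreshCreds.getAndSet(false)) {
            refreshCount++;
        }
        tuplesProcessed += availableTuples;
    }
}
```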







[GitHub] [storm] bipinprasad commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r446255031



##########
File path: storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
##########
@@ -60,16 +60,7 @@ public ExecutorStats renderStats() {
 
     @Override
     public void credentialsChanged(Credentials credentials) {
-        TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials),
-                                        Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID,
-                                        Constants.CREDENTIALS_CHANGED_STREAM_ID);
-        AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
-        try {
-            executor.getReceiveQueue().publish(addressedTuple);
-            executor.getReceiveQueue().flush();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
+        executor.needToRefreshCreds = true;

Review comment:
       Minor: Do this method's name and signature need to change, since it sets a flag and does not use the supplied parameter?







[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r452306573



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -124,6 +126,7 @@
     protected int idToTaskBase;
     protected String hostname;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    private boolean needToRefreshCreds = true;

Review comment:
       Since multiple threads change this flag, switching to _AtomicBoolean_.







[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r453078872



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -124,6 +126,7 @@
     protected int idToTaskBase;
     protected String hostname;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    private AtomicBoolean needToRefreshCreds = new AtomicBoolean(true);

Review comment:
       Fixed







[GitHub] [storm] agresch commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r446236425



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception {
 
     @Override
     public void accept(Object event) {
+        if (this.needToRefreshCreds) {
+            LOG.info("The credentials are being updated {}.", executorId);
+            Credentials creds = this.workerData.getCredentials();
+            idToTask.stream().map(Task::getTaskObject).filter(taskObject -> taskObject instanceof ICredentialsListener)
+                    .forEach(taskObject -> {
+                        ((ICredentialsListener) taskObject).setCredentials(creds == null ? null : creds.get_creds());
+                    });
+            this.needToRefreshCreds = false;

Review comment:
       I think we should first set `this.needToRefreshCreds` to false and then update the creds. Imagine credentials were uploaded twice in a row, and this code loaded the first creds and then the thread paused. It would then clear the update flag and miss the second credentials upload.
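
       A minimal sketch of the suggested ordering, with assumptions called out: `CredsFlagSketch`, `latestCreds`, and `applied` are hypothetical names, credentials are modeled as a `String`, and `getAndSet(false)` is one possible way to clear the flag before reading. If a second update arrives after the read, the flag is already `true` again, so the next call picks it up instead of losing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an executor; not Storm's Executor class.
class CredsFlagSketch {
    private final AtomicBoolean needToRefreshCreds = new AtomicBoolean(false);
    private final AtomicReference<String> latestCreds = new AtomicReference<>("initial");
    // Touched only by the executor thread in this sketch.
    final List<String> applied = new ArrayList<>();

    // Worker side: publish the credentials first, then raise the flag.
    void credentialsChanged(String creds) {
        latestCreds.set(creds);
        needToRefreshCreds.set(true);
    }

    // Executor side: clear the flag BEFORE reading the credentials.
    boolean updateCredsIfRequired() {
        if (needToRefreshCreds.getAndSet(false)) {
            applied.add(latestCreds.get());
            return true;
        }
        return false;
    }
}
```

       With two back-to-back updates before the executor runs, this applies the latest credentials once, which matches the "no harm" outcome described elsewhere in the thread.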







[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r446401963



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception {
 
     @Override
     public void accept(Object event) {
+        if (this.needToRefreshCreds) {
+            LOG.info("The credentials are being updated {}.", executorId);
+            Credentials creds = this.workerData.getCredentials();
+            idToTask.stream().map(Task::getTaskObject).filter(taskObject -> taskObject instanceof ICredentialsListener)
+                    .forEach(taskObject -> {
+                        ((ICredentialsListener) taskObject).setCredentials(creds == null ? null : creds.get_creds());
+                    });
+            this.needToRefreshCreds = false;

Review comment:
       Addressed.







[GitHub] [storm] Ethanlm merged pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
Ethanlm merged pull request #3295:
URL: https://github.com/apache/storm/pull/3295


   





[GitHub] [storm] agresch commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r450241225



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -291,6 +294,22 @@ public void accept(Object event) {
         }
     }
 
+    public void setNeedToRefreshCreds() {
+        this.needToRefreshCreds = true;
+    }
+
+    protected void updateExecCredsIfRequired() {
+        if (this.needToRefreshCreds) {
+            this.needToRefreshCreds = false;
+            LOG.info("The credentials are being updated {}.", executorId);

Review comment:
       I think info should be fine.  







[GitHub] [storm] Ethanlm commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r448434269



##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -291,6 +294,22 @@ public void accept(Object event) {
         }
     }
 
+    public void setNeedToRefreshCreds() {
+        this.needToRefreshCreds = true;
+    }
+
+    protected void updateExecCredsIfRequired() {
+        if (this.needToRefreshCreds) {
+            this.needToRefreshCreds = false;
+            LOG.info("The credentials are being updated {}.", executorId);

Review comment:
       Maybe we should change it to be `DEBUG` since this is on a critical path (even though this `method` might not be called very frequently)

##########
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##########
@@ -124,6 +126,7 @@
     protected int idToTaskBase;
     protected String hostname;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    private boolean needToRefreshCreds = true;

Review comment:
       This needs to be `volatile` to make sure that changes from other threads can be seen immediately.



