You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/03/30 15:05:08 UTC
[storm] branch 2.1.x-branch updated: [STORM-3609] fix ClassCastException when credentials are updated for ICredentialsListener spout/bolt instances
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/2.1.x-branch by this push:
new 8276d6e [STORM-3609] fix ClassCastException when credentials are updated for ICredentialsListener spout/bolt instances
8276d6e is described below
commit 8276d6ebbccac1bd1ec5fe1ea99536211b8a5f77
Author: Meng Li (Ethan) <et...@gmail.com>
AuthorDate: Thu Mar 26 16:19:08 2020 -0500
[STORM-3609] fix ClassCastException when credentials are updated for ICredentialsListener spout/bolt instances
---
storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java | 4 +++-
.../src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java | 4 +++-
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 4fbf1b3..dd62e6a 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -28,6 +28,7 @@ import org.apache.storm.daemon.metrics.BuiltinMetrics;
import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
+import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.hooks.info.BoltExecuteInfo;
import org.apache.storm.messaging.IConnection;
@@ -218,7 +219,8 @@ public class BoltExecutor extends Executor {
} else if (Constants.CREDENTIALS_CHANGED_STREAM_ID.equals(streamId)) {
Object taskObject = idToTask.get(taskId - idToTaskBase).getTaskObject();
if (taskObject instanceof ICredentialsListener) {
- ((ICredentialsListener) taskObject).setCredentials((Map<String, String>) tuple.getValue(0));
+ Credentials creds = (Credentials) tuple.getValue(0);
+ ((ICredentialsListener) taskObject).setCredentials(creds == null ? null : creds.get_creds());
}
} else {
IBolt boltObject = (IBolt) idToTask.get(taskId - idToTaskBase).getTaskObject();
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index c10ab2e..cb2af7f 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -30,6 +30,7 @@ import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.generated.Credentials;
import org.apache.storm.hooks.info.SpoutAckInfo;
import org.apache.storm.hooks.info.SpoutFailInfo;
import org.apache.storm.policy.IWaitStrategy;
@@ -300,7 +301,8 @@ public class SpoutExecutor extends Executor {
} else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
if (spoutObj instanceof ICredentialsListener) {
- ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
+ Credentials creds = (Credentials) tuple.getValue(0);
+ ((ICredentialsListener) spoutObj).setCredentials(creds == null ? null : creds.get_creds());
}
} else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
Long id = (Long) tuple.getValue(0);