You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/20 17:51:14 UTC

[1/3] storm git commit: STORM-3230: Add in sync if key not found

Repository: storm
Updated Branches:
  refs/heads/master f72ef8331 -> b2a1a5872


STORM-3230: Add in sync if key not found


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/83907136
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/83907136
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/83907136

Branch: refs/heads/master
Commit: 839071367754338b83096e9d1f908f859e9d5493
Parents: 6d0f2eb
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 19 10:05:23 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Sep 19 10:05:23 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/cluster/StormClusterStateImpl.java  | 11 ++++++++++-
 .../security/auth/workertoken/WorkerTokenAuthorizer.java |  9 ++++++---
 2 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/83907136/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 69d25f4..1468868 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -840,7 +840,14 @@ public class StormClusterStateImpl implements IStormClusterState {
     @Override
     public PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion) {
         String path = ClusterUtils.secretKeysPath(type, topologyId, keyVersion);
-        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), PrivateWorkerKey.class);
+        byte[] data = stateStorage.get_data(path, false);
+        if (data == null) {
+            LOG.debug("Could not find entry at {} will sync to see if that fixes it", path);
+            //We didn't find it, but there are races, so we want to check again after a sync
+            stateStorage.sync_path(path);
+            data = stateStorage.get_data(path, false);
+        }
+        return ClusterUtils.maybeDeserialize(data, PrivateWorkerKey.class);
     }
 
     @Override
@@ -879,6 +886,7 @@ public class StormClusterStateImpl implements IStormClusterState {
                         PrivateWorkerKey key =
                             ClusterUtils.maybeDeserialize(stateStorage.get_data(fullPath, false), PrivateWorkerKey.class);
                         if (Time.currentTimeMillis() > key.get_expirationTimeMillis()) {
+                            LOG.debug("Removing expired worker key {}", fullPath);
                             stateStorage.delete_node(fullPath);
                         }
                     } catch (RuntimeException e) {
@@ -903,6 +911,7 @@ public class StormClusterStateImpl implements IStormClusterState {
         for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) {
             String path = ClusterUtils.secretKeysPath(type, topologyId);
             try {
+                LOG.debug("Removing worker keys under {}", path);
                 stateStorage.delete_node(path);
             } catch (RuntimeException e) {
                 //This should never happen because only the primary nimbus is active, but just in case

http://git-wip-us.apache.org/repos/asf/storm/blob/83907136/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
index 7144c79..3919cb7 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
@@ -95,9 +95,12 @@ public class WorkerTokenAuthorizer implements PasswordProvider {
             throw new IllegalArgumentException("Token is not valid, token has expired.");
         }
 
-        PrivateWorkerKey key = keyCache.getUnchecked(deser);
-        if (key == null) {
-            throw new IllegalArgumentException("Token is not valid, private key not found.");
+        PrivateWorkerKey key;
+        try {
+            key = keyCache.getUnchecked(deser);
+        } catch (CacheLoader.InvalidCacheLoadException e) {
+            //This happens when the cache has a null returned to it.
+            throw new IllegalArgumentException("Token is not valid, private key not found.", e);
         }
 
         if (key.is_set_expirationTimeMillis() && key.get_expirationTimeMillis() <= Time.currentTimeMillis()) {


[3/3] storm git commit: Merge branch 'STORM-3230' of https://github.com/revans2/incubator-storm into STORM-3230

Posted by bo...@apache.org.
Merge branch 'STORM-3230' of https://github.com/revans2/incubator-storm into STORM-3230

STORM-3230: Add in sync if key not found

This closes #2843


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b2a1a587
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b2a1a587
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b2a1a587

Branch: refs/heads/master
Commit: b2a1a5872bf78d516417b326f06f85774b0c4d48
Parents: f72ef83 f89349c
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu Sep 20 12:27:41 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 20 12:27:41 2018 -0500

----------------------------------------------------------------------
 .../apache/storm/cluster/StormClusterStateImpl.java    | 13 +++++++++++--
 .../auth/workertoken/WorkerTokenAuthorizer.java        | 10 +++++++---
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b2a1a587/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
----------------------------------------------------------------------


[2/3] storm git commit: STORM-3230: Addressed review comments

Posted by bo...@apache.org.
STORM-3230:  Addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f89349ca
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f89349ca
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f89349ca

Branch: refs/heads/master
Commit: f89349ca83662d088f02a3ea1382109982d8cf06
Parents: 8390713
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 19 13:54:12 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Sep 19 13:54:12 2018 -0500

----------------------------------------------------------------------
 .../jvm/org/apache/storm/cluster/StormClusterStateImpl.java    | 6 +++---
 .../storm/security/auth/workertoken/WorkerTokenAuthorizer.java | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f89349ca/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 1468868..644f465 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -856,7 +856,7 @@ public class StormClusterStateImpl implements IStormClusterState {
         stateStorage.mkdirs(ClusterUtils.SECRET_KEYS_SUBTREE, defaultAcls);
         List<ACL> secretAcls = context.getZkSecretAcls(type);
         String path = ClusterUtils.secretKeysPath(type, topologyId, keyVersion);
-        LOG.debug("Storing private key for {} connecting to a {} at {} with ACL {}\n\n", topologyId, type, path, secretAcls);
+        LOG.info("Storing private key for {} connecting to a {} at {} with ACL {}", topologyId, type, path, secretAcls);
         stateStorage.set_data(path, Utils.serialize(key), secretAcls);
     }
 
@@ -886,7 +886,7 @@ public class StormClusterStateImpl implements IStormClusterState {
                         PrivateWorkerKey key =
                             ClusterUtils.maybeDeserialize(stateStorage.get_data(fullPath, false), PrivateWorkerKey.class);
                         if (Time.currentTimeMillis() > key.get_expirationTimeMillis()) {
-                            LOG.debug("Removing expired worker key {}", fullPath);
+                            LOG.info("Removing expired worker key {}", fullPath);
                             stateStorage.delete_node(fullPath);
                         }
                     } catch (RuntimeException e) {
@@ -911,7 +911,7 @@ public class StormClusterStateImpl implements IStormClusterState {
         for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) {
             String path = ClusterUtils.secretKeysPath(type, topologyId);
             try {
-                LOG.debug("Removing worker keys under {}", path);
+                LOG.info("Removing worker keys under {}", path);
                 stateStorage.delete_node(path);
             } catch (RuntimeException e) {
                 //This should never happen because only the primary nimbus is active, but just in case

http://git-wip-us.apache.org/repos/asf/storm/blob/f89349ca/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
index 3919cb7..fcc5eeb 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
@@ -99,7 +99,8 @@ public class WorkerTokenAuthorizer implements PasswordProvider {
         try {
             key = keyCache.getUnchecked(deser);
         } catch (CacheLoader.InvalidCacheLoadException e) {
-            //This happens when the cache has a null returned to it.
+            //This happens when the key is not found, the cache loader returns a null and this exception is thrown.
+            // because the cache cannot store a null.
             throw new IllegalArgumentException("Token is not valid, private key not found.", e);
         }