You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/02/02 17:18:23 UTC

[1/4] storm git commit: STORM-2321 Handle blobstore zk key deletion in KeySequenceNumber

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 90ce07ac6 -> 85ac6b82b


STORM-2321 Handle blobstore zk key deletion in KeySequenceNumber

* If NoNodeException is thrown in getKeySequenceNumber, treat it as KeyNotFoundException
* Change callers to handle KeyNotFoundException accordingly


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3f1e5cf9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3f1e5cf9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3f1e5cf9

Branch: refs/heads/1.x-branch
Commit: 3f1e5cf95c47ae9068d03acb52e2903dac99644c
Parents: 2a01dbc
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jan 31 17:00:31 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jan 31 17:23:35 2017 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  6 +++++-
 .../apache/storm/blobstore/BlobStoreUtils.java  |  6 +++++-
 .../storm/blobstore/BlobSynchronizer.java       | 13 +++++++++---
 .../storm/blobstore/KeySequenceNumber.java      | 21 ++++++++++++++------
 .../storm/blobstore/LocalFsBlobStore.java       |  4 +++-
 5 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index a05dabd..d9d71a1 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1295,7 +1295,11 @@
       (.deleteBlob blob-store key nimbus-subject))
     (log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys)
     (doseq [key locally-available-active-keys]
-      (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf)))))
+      (try
+        (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf))
+        (catch KeyNotFoundException _
+          ; invalid key, remove it from blobstore
+          (.deleteBlob blob-store key nimbus-subject))))))
 
 (defn- get-errors [storm-cluster-state storm-id component-id]
   (->> (.errors storm-cluster-state storm-id component-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index 0cac61f..71d286c 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -30,6 +30,7 @@ import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -237,7 +238,7 @@ public class BlobStoreUtils {
             LOG.debug("StateInfo for update {}", stateInfo);
             Set<NimbusInfo> nimbusInfoList = getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
 
-            for (NimbusInfo nimbusInfo:nimbusInfoList) {
+            for (NimbusInfo nimbusInfo : nimbusInfoList) {
                 if (nimbusInfo.getHost().equals(nimbusDetails.getHost())) {
                     isListContainsCurrentNimbusInfo = true;
                     break;
@@ -248,6 +249,9 @@ public class BlobStoreUtils {
                 LOG.debug("Updating state inside zookeeper for an update");
                 createStateInZookeeper(conf, key, nimbusDetails);
             }
+        } catch (KeeperException.NoNodeException | KeyNotFoundException e) {
+            //race condition with a delete
+            return;
         } catch (Exception exp) {
             throw new RuntimeException(exp);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
index bd34c32..c6647ac 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.blobstore;
 
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.slf4j.Logger;
@@ -78,9 +79,15 @@ public class BlobSynchronizer {
             LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload);
 
             for (String key : keySetToDownload) {
-                Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
-                if(BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
-                    BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
+                try {
+                    Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
+                    if (BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
+                        BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
+                    }
+                } catch (KeyNotFoundException e) {
+                    LOG.debug("Detected deletion for the key {} - deleting the blob instead", key);
+                    // race condition with a delete: delete this key's blob from the blob store instead
+                    blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject());
                 }
             }
             if (zkClient !=null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
index 2425761..adbd4c4 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
@@ -18,10 +18,12 @@
 
 package org.apache.storm.blobstore;
 
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.Utils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,12 +132,12 @@ public class KeySequenceNumber {
         this.nimbusInfo = nimbusInfo;
     }
 
-    public synchronized int getKeySequenceNumber(Map conf) {
+    public synchronized int getKeySequenceNumber(Map conf) throws KeyNotFoundException {
         TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
         CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
         try {
             // Key has not been created yet and it is the first time it is being created
-            if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
                 zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                         .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key);
                 zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key,
@@ -148,7 +150,7 @@ public class KeySequenceNumber {
             // if all go down which is unlikely. Hence there might be a need to update the blob if all go down.
             List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
             LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList);
-            if(stateInfoList.isEmpty()) {
+            if (stateInfoList.isEmpty()) {
                 return getMaxSequenceNumber(zkClient);
             }
 
@@ -156,7 +158,7 @@ public class KeySequenceNumber {
             // In all other cases check for the latest update sequence of the blob on the nimbus
             // and assign the appropriate number. Check if all are have same sequence number,
             // if not assign the highest sequence number.
-            for (String stateInfo:stateInfoList) {
+            for (String stateInfo : stateInfoList) {
                 sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo)
                         .getSequenceNumber()));
             }
@@ -195,15 +197,22 @@ public class KeySequenceNumber {
                     return sequenceNumbers.first() + 1;
                 }
             }
+
+            // Normal create update sync scenario returns the greatest sequence number in the set
+            return sequenceNumbers.last();
+        } catch (KeeperException.NoNodeException e) {
+            // race condition with a delete: a node for this key was removed from either the blobstore or the blobstoremaxsequence subtree
+            // rethrow as KeyNotFoundException so the caller knows the key is no longer valid
+            throw new KeyNotFoundException(key);
         } catch(Exception e) {
+            // in any other case, return INITIAL_SEQUENCE_NUMBER - 1 to trigger a re-sync later
             LOG.error("Exception {}", e);
+            return INITIAL_SEQUENCE_NUMBER - 1;
         } finally {
             if (zkClient != null) {
                 zkClient.close();
             }
         }
-        // Normal create update sync scenario returns the greatest sequence number in the set
-        return sequenceNumbers.last();
     }
 
     private boolean checkIfStateContainsCurrentNimbusHost(List<String> stateInfoList, NimbusInfo nimbusInfo) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index 345d591..648f5cd 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -281,7 +281,7 @@ public class LocalFsBlobStore extends BlobStore {
     }
 
     //This additional check and download is for nimbus high availability in case you have more than one nimbus
-    public synchronized boolean checkForBlobOrDownload(String key) {
+    public synchronized boolean checkForBlobOrDownload(String key) throws KeyNotFoundException {
         boolean checkBlobDownload = false;
         try {
             List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
@@ -295,6 +295,8 @@ public class LocalFsBlobStore extends BlobStore {
                     }
                 }
             }
+        } catch (KeyNotFoundException e) {
+            throw e;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }


[3/4] storm git commit: Added STORM-2321 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-2321 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/52ac6b68
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/52ac6b68
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/52ac6b68

Branch: refs/heads/1.x-branch
Commit: 52ac6b6822492a76479d58b3d02f0a75a0d4e617
Parents: 90ce07a
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 2 09:17:40 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 2 09:17:40 2017 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/52ac6b68/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d4f627..6bcdcf5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2321: Handle blobstore zk key deletion in KeySequenceNumber
  * STORM-2336: Close Localizer and AsyncLocalizer when supervisor is shutting down
  * STORM-2335: Fix broken Topology visualization with empty ':transferred' in executor stats
  * STORM-2331: Emitting from JavaScript should work when not anchoring.


[4/4] storm git commit: Merge branch 'STORM-2321' into 1.x-branch

Posted by sr...@apache.org.
Merge branch 'STORM-2321' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85ac6b82
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85ac6b82
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85ac6b82

Branch: refs/heads/1.x-branch
Commit: 85ac6b82bae03f7cef26666243ff1911c9e182cf
Parents: 52ac6b6 c276396
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 2 09:18:08 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 2 09:18:08 2017 -0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  6 +++++-
 .../apache/storm/blobstore/BlobStoreUtils.java  |  6 +++++-
 .../storm/blobstore/BlobSynchronizer.java       | 13 +++++++++---
 .../storm/blobstore/KeySequenceNumber.java      | 21 ++++++++++++++------
 .../storm/blobstore/LocalFsBlobStore.java       |  4 +++-
 5 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: Merge branch 'STORM-2321-1.x' of https://github.com/HeartSaVioR/storm into STORM-2321

Posted by sr...@apache.org.
Merge branch 'STORM-2321-1.x' of https://github.com/HeartSaVioR/storm into STORM-2321


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2763960
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2763960
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2763960

Branch: refs/heads/1.x-branch
Commit: c276396046fed73070692970b6abd3c1653eea44
Parents: 90ce07a 3f1e5cf
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 2 09:09:02 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 2 09:09:02 2017 -0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  6 +++++-
 .../apache/storm/blobstore/BlobStoreUtils.java  |  6 +++++-
 .../storm/blobstore/BlobSynchronizer.java       | 13 +++++++++---
 .../storm/blobstore/KeySequenceNumber.java      | 21 ++++++++++++++------
 .../storm/blobstore/LocalFsBlobStore.java       |  4 +++-
 5 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------