You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/02/02 17:18:23 UTC
[1/4] storm git commit: STORM-2321 Handle blobstore zk key deletion
in KeySequenceNumber
Repository: storm
Updated Branches:
refs/heads/1.x-branch 90ce07ac6 -> 85ac6b82b
STORM-2321 Handle blobstore zk key deletion in KeySequenceNumber
* If NoNodeException is thrown in getKeySequenceNumber, treat it as KeyNotFoundException
* Change callers to handle KeyNotFoundException accordingly
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3f1e5cf9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3f1e5cf9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3f1e5cf9
Branch: refs/heads/1.x-branch
Commit: 3f1e5cf95c47ae9068d03acb52e2903dac99644c
Parents: 2a01dbc
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jan 31 17:00:31 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jan 31 17:23:35 2017 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 6 +++++-
.../apache/storm/blobstore/BlobStoreUtils.java | 6 +++++-
.../storm/blobstore/BlobSynchronizer.java | 13 +++++++++---
.../storm/blobstore/KeySequenceNumber.java | 21 ++++++++++++++------
.../storm/blobstore/LocalFsBlobStore.java | 4 +++-
5 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index a05dabd..d9d71a1 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1295,7 +1295,11 @@
(.deleteBlob blob-store key nimbus-subject))
(log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys)
(doseq [key locally-available-active-keys]
- (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf)))))
+ (try
+ (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf))
+ (catch KeyNotFoundException _
+ ; invalid key, remove it from blobstore
+ (.deleteBlob blob-store key nimbus-subject))))))
(defn- get-errors [storm-cluster-state storm-id component-id]
(->> (.errors storm-cluster-state storm-id component-id)
http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index 0cac61f..71d286c 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -30,6 +30,7 @@ import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ZookeeperAuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -237,7 +238,7 @@ public class BlobStoreUtils {
LOG.debug("StateInfo for update {}", stateInfo);
Set<NimbusInfo> nimbusInfoList = getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
- for (NimbusInfo nimbusInfo:nimbusInfoList) {
+ for (NimbusInfo nimbusInfo : nimbusInfoList) {
if (nimbusInfo.getHost().equals(nimbusDetails.getHost())) {
isListContainsCurrentNimbusInfo = true;
break;
@@ -248,6 +249,9 @@ public class BlobStoreUtils {
LOG.debug("Updating state inside zookeeper for an update");
createStateInZookeeper(conf, key, nimbusDetails);
}
+ } catch (KeeperException.NoNodeException | KeyNotFoundException e) {
+ //race condition with a delete
+ return;
} catch (Exception exp) {
throw new RuntimeException(exp);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
index bd34c32..c6647ac 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.blobstore;
+import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
@@ -78,9 +79,15 @@ public class BlobSynchronizer {
LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload);
for (String key : keySetToDownload) {
- Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
- if(BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
- BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
+ try {
+ Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
+ if (BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
+ BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
+ }
+ } catch (KeyNotFoundException e) {
+ LOG.debug("Detected deletion for the key {} - deleting the blob instead", key);
+ // race condition with a delete, delete the blob in key instead
+ blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject());
}
}
if (zkClient !=null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
index 2425761..adbd4c4 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
@@ -18,10 +18,12 @@
package org.apache.storm.blobstore;
+import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.Utils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,12 +132,12 @@ public class KeySequenceNumber {
this.nimbusInfo = nimbusInfo;
}
- public synchronized int getKeySequenceNumber(Map conf) {
+ public synchronized int getKeySequenceNumber(Map conf) throws KeyNotFoundException {
TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
try {
// Key has not been created yet and it is the first time it is being created
- if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+ if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key);
zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key,
@@ -148,7 +150,7 @@ public class KeySequenceNumber {
// if all go down which is unlikely. Hence there might be a need to update the blob if all go down.
List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList);
- if(stateInfoList.isEmpty()) {
+ if (stateInfoList.isEmpty()) {
return getMaxSequenceNumber(zkClient);
}
@@ -156,7 +158,7 @@ public class KeySequenceNumber {
// In all other cases check for the latest update sequence of the blob on the nimbus
// and assign the appropriate number. Check if all are have same sequence number,
// if not assign the highest sequence number.
- for (String stateInfo:stateInfoList) {
+ for (String stateInfo : stateInfoList) {
sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo)
.getSequenceNumber()));
}
@@ -195,15 +197,22 @@ public class KeySequenceNumber {
return sequenceNumbers.first() + 1;
}
}
+
+ // Normal create update sync scenario returns the greatest sequence number in the set
+ return sequenceNumbers.last();
+ } catch (KeeperException.NoNodeException e) {
+ // there's a race condition with a delete: either blobstore or blobstoremaxsequence
+ // this should be thrown to the caller to indicate that the key is invalid now
+ throw new KeyNotFoundException(key);
} catch(Exception e) {
+ // in other case, just set this to 0 to trigger re-sync later
LOG.error("Exception {}", e);
+ return INITIAL_SEQUENCE_NUMBER - 1;
} finally {
if (zkClient != null) {
zkClient.close();
}
}
- // Normal create update sync scenario returns the greatest sequence number in the set
- return sequenceNumbers.last();
}
private boolean checkIfStateContainsCurrentNimbusHost(List<String> stateInfoList, NimbusInfo nimbusInfo) {
http://git-wip-us.apache.org/repos/asf/storm/blob/3f1e5cf9/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index 345d591..648f5cd 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -281,7 +281,7 @@ public class LocalFsBlobStore extends BlobStore {
}
//This additional check and download is for nimbus high availability in case you have more than one nimbus
- public synchronized boolean checkForBlobOrDownload(String key) {
+ public synchronized boolean checkForBlobOrDownload(String key) throws KeyNotFoundException {
boolean checkBlobDownload = false;
try {
List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
@@ -295,6 +295,8 @@ public class LocalFsBlobStore extends BlobStore {
}
}
}
+ } catch (KeyNotFoundException e) {
+ throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
[3/4] storm git commit: Added STORM-2321 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-2321 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/52ac6b68
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/52ac6b68
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/52ac6b68
Branch: refs/heads/1.x-branch
Commit: 52ac6b6822492a76479d58b3d02f0a75a0d4e617
Parents: 90ce07a
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 2 09:17:40 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 2 09:17:40 2017 -0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/52ac6b68/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d4f627..6bcdcf5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2321: Handle blobstore zk key deletion in KeySequenceNumber
* STORM-2336: Close Localizer and AsyncLocalizer when supervisor is shutting down
* STORM-2335: Fix broken Topology visualization with empty ':transferred' in executor stats
* STORM-2331: Emitting from JavaScript should work when not anchoring.
[4/4] storm git commit: Merge branch 'STORM-2321' into 1.x-branch
Posted by sr...@apache.org.
Merge branch 'STORM-2321' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85ac6b82
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85ac6b82
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85ac6b82
Branch: refs/heads/1.x-branch
Commit: 85ac6b82bae03f7cef26666243ff1911c9e182cf
Parents: 52ac6b6 c276396
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 2 09:18:08 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 2 09:18:08 2017 -0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 6 +++++-
.../apache/storm/blobstore/BlobStoreUtils.java | 6 +++++-
.../storm/blobstore/BlobSynchronizer.java | 13 +++++++++---
.../storm/blobstore/KeySequenceNumber.java | 21 ++++++++++++++------
.../storm/blobstore/LocalFsBlobStore.java | 4 +++-
5 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
[2/4] storm git commit: Merge branch 'STORM-2321-1.x' of
https://github.com/HeartSaVioR/storm into STORM-2321
Posted by sr...@apache.org.
Merge branch 'STORM-2321-1.x' of https://github.com/HeartSaVioR/storm into STORM-2321
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2763960
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2763960
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2763960
Branch: refs/heads/1.x-branch
Commit: c276396046fed73070692970b6abd3c1653eea44
Parents: 90ce07a 3f1e5cf
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 2 09:09:02 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 2 09:09:02 2017 -0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 6 +++++-
.../apache/storm/blobstore/BlobStoreUtils.java | 6 +++++-
.../storm/blobstore/BlobSynchronizer.java | 13 +++++++++---
.../storm/blobstore/KeySequenceNumber.java | 21 ++++++++++++++------
.../storm/blobstore/LocalFsBlobStore.java | 4 +++-
5 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------