You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2017/04/13 19:32:21 UTC
[6/7] storm git commit: STORM-1114: Handle race condition in
Storm/Trident transactional state when ZK nodes have already been
created/deleted
STORM-1114: Handle race condition in Storm/Trident transactional state when ZK nodes have already been created/deleted
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/adb14ccb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/adb14ccb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/adb14ccb
Branch: refs/heads/1.0.x-branch
Commit: adb14ccbeb6f446eff9b4d71323ab8451b4b45d5
Parents: 047166d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Apr 13 15:31:08 2017 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Apr 13 15:31:08 2017 -0400
----------------------------------------------------------------------
.../transactional/state/TransactionalState.java | 22 ++++++++++++++++----
.../topology/state/TransactionalState.java | 7 +++++++
2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/adb14ccb/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
index f05512e..bec5f66 100644
--- a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
+++ b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
@@ -34,8 +34,11 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TransactionalState {
+ public static final Logger LOG = LoggerFactory.getLogger(TransactionalState.class);
CuratorFramework _curator;
KryoValuesSerializer _ser;
KryoValuesDeserializer _des;
@@ -83,11 +86,17 @@ public class TransactionalState {
}
}
- protected static String forPath(PathAndBytesable<String> builder,
+ protected static void forPath(PathAndBytesable<String> builder,
String path, byte[] data) throws Exception {
- return (data == null)
- ? builder.forPath(path)
- : builder.forPath(path, data);
+ try {
+ if (data == null) {
+ builder.forPath(path);
+ } else {
+ builder.forPath(path, data);
+ }
+ } catch (KeeperException.NodeExistsException e){
+ LOG.info("Path {} already exists.", path);
+ }
}
protected static void createNode(CuratorFramework curator, String path,
@@ -117,6 +126,8 @@ public class TransactionalState {
TransactionalState.createNode(_curator, path, ser, _zkAcls,
CreateMode.PERSISTENT);
}
+ } catch (KeeperException.NodeExistsException nee) {
+ LOG.warn("Path {} already exists.", path);
} catch(Exception e) {
throw new RuntimeException(e);
}
@@ -126,6 +137,9 @@ public class TransactionalState {
path = "/" + path;
try {
_curator.delete().forPath(path);
+ } catch (KeeperException.NoNodeException nne){
+ // node was already deleted
+ LOG.info("Path {} has already been deleted.", path);
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/adb14ccb/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index 906906d..5061590 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -37,8 +37,11 @@ import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TransactionalState {
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionalState.class);
CuratorFramework _curator;
List<ACL> _zkAcls = null;
@@ -115,6 +118,8 @@ public class TransactionalState {
TransactionalState.createNode(_curator, path, ser, _zkAcls,
CreateMode.PERSISTENT);
}
+ } catch (KeeperException.NodeExistsException nne){
+ LOG.warn("Node {} already created.", path);
} catch(Exception e) {
throw new RuntimeException(e);
}
@@ -124,6 +129,8 @@ public class TransactionalState {
path = "/" + path;
try {
_curator.delete().forPath(path);
+ } catch (KeeperException.NoNodeException nne){
+ LOG.warn("Path {} already deleted.");
} catch (Exception e) {
throw new RuntimeException(e);
}