You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2017/04/13 19:32:19 UTC

[4/7] storm git commit: STORM-1114: Handle race condition in Storm/Trident transactional state when ZK nodes have already been created/deleted

STORM-1114: Handle race condition in Storm/Trident transactional state when ZK nodes have already been created/deleted


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9cf791b8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9cf791b8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9cf791b8

Branch: refs/heads/1.x-branch
Commit: 9cf791b876918bc9eb99757e180d7dfeca55ed87
Parents: d185367
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Apr 13 15:08:16 2017 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Apr 13 15:08:16 2017 -0400

----------------------------------------------------------------------
 .../apache/storm/transactional/state/TransactionalState.java  | 7 +++++++
 .../storm/trident/topology/state/TransactionalState.java      | 7 +++++--
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9cf791b8/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
index f05512e..7662daa 100644
--- a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
+++ b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
@@ -34,8 +34,11 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TransactionalState {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionalState.class);
     CuratorFramework _curator;
     KryoValuesSerializer _ser;
     KryoValuesDeserializer _des;
@@ -117,6 +120,8 @@ public class TransactionalState {
                 TransactionalState.createNode(_curator, path, ser, _zkAcls,
                         CreateMode.PERSISTENT);
             }
+        } catch (KeeperException.NodeExistsException nne){
+            LOG.warn("Node {} already created.", path);
         } catch(Exception e) {
             throw new RuntimeException(e);
         }        
@@ -126,6 +131,8 @@ public class TransactionalState {
         path = "/" + path;
         try {
             _curator.delete().forPath(path);
+        } catch (KeeperException.NoNodeException nne){
+           LOG.warn("Path {} already deleted.");
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/9cf791b8/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index 26ac404..71068dc 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -125,16 +125,19 @@ public class TransactionalState {
                 TransactionalState.createNode(_curator, path, ser, _zkAcls,
                         CreateMode.PERSISTENT);
             }
-            LOG.debug("Set [path = {}] => [data = {}]", path, asString(ser));
+        } catch (KeeperException.NodeExistsException nne){
+            LOG.warn("Node {} already created.", path);
         } catch(Exception e) {
             throw new RuntimeException(e);
-        }        
+        }
     }
     
     public void delete(String path) {
         path = "/" + path;
         try {
             _curator.delete().forPath(path);
+        } catch (KeeperException.NoNodeException nne){
+           LOG.warn("Path {} already deleted.");
         } catch (Exception e) {
             throw new RuntimeException(e);
         }