You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by am...@apache.org on 2013/06/13 20:11:43 UTC

svn commit: r1492792 - /cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java

Author: amichai
Date: Thu Jun 13 18:11:42 2013
New Revision: 1492792

URL: http://svn.apache.org/r1492792
Log:
DOSGI-190 Fix NodeExistsException and missing endpoint after ZooKeeper is restarted

Modified:
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java?rev=1492792&r1=1492791&r2=1492792&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java Thu Jun 13 18:11:42 2013
@@ -41,6 +41,9 @@ import org.osgi.util.tracker.ServiceTrac
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.zookeeper.KeeperException.NoNodeException;
+import static org.apache.zookeeper.KeeperException.NodeExistsException;
+
 /**
  * Listens for local Endpoints and publishes them to Zookeeper
  */
@@ -101,7 +104,25 @@ public class PublishingEndpointListener 
             String fullPath = path + '/' + endpointKey;
             LOG.debug("Creating ZooKeeper node: {}", fullPath);
             ensurePath(path, zookeeper);
-            zookeeper.create(fullPath, getData(props), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+            createEphemeralNode(fullPath, getData(props));
+        }
+    }
+
+    private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
+        try {
+            zookeeper.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        } catch (NodeExistsException nee) {
+            // This sometimes happens after ZooKeeper is restarted: the ephemeral node that
+            // belonged to our old session has not been deleted yet. Our current session has
+            // to become the owner of the node, otherwise the node gets deleted automatically -
+            // so we delete the stale node and recreate it ourselves.
+            LOG.info("node for endpoint already exists, recreating: {}", fullPath);
+            try {
+                zookeeper.delete(fullPath, -1); // version -1 matches any version of the node
+            } catch (NoNodeException nne) {
+                // race: the stale node vanished before our delete ran - fine, it is gone either way
+            }
+            zookeeper.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         }
     }
 
@@ -147,8 +168,10 @@ public class PublishingEndpointListener 
         for (String part : parts) {
             current.append('/');
             current.append(part);
-            if (zk.exists(current.toString(), false) == null) {
+            try {
                 zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (NodeExistsException nee) {
+                // it's not the first node with this path to ever exist - that's normal
             }
         }
     }