You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/22 14:35:18 UTC
[03/10] git commit: wip
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/dd1fe969
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/dd1fe969
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/dd1fe969
Branch: refs/heads/rest
Commit: dd1fe969f725fa84317789e08e3228131105c0f6
Parents: d30a266
Author: randgalt <ra...@apache.org>
Authored: Wed Jan 8 16:40:25 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jan 8 16:40:25 2014 -0500
----------------------------------------------------------------------
.../recipes/cache/PathChildrenCache.java | 11 ++
.../curator/x/rest/ConnectionResource.java | 2 +-
.../curator/x/rest/PathCacheRecipeResource.java | 118 ++++++++++---
.../x/rest/entity/PathChildrenCacheEntity.java | 84 ++++++++++
.../entity/PathChildrenCacheEventEntity.java | 84 ++++++++++
.../curator/x/rest/entity/StatEntity.java | 167 +++++++++++++++++++
.../x/rest/system/PathChildrenCacheThing.java | 59 +++++++
.../apache/curator/x/rest/system/ThingType.java | 11 +-
8 files changed, 503 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index d66f7f3..a1844db 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -170,6 +170,17 @@ public class PathChildrenCache implements Closeable
}
/**
+     * Creates a cache by delegating to the five-argument constructor, supplying a
+     * {@link CloseableExecutorService} that wraps a new single-thread executor built from
+     * {@code defaultThreadFactory}.
+     *
+     * @param client the client
+     * @param path path to watch
+     * @param cacheData if true, node contents are cached in addition to the stat
+     * @param dataIsCompressed if true, data in the path is compressed
+     */
+    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed)
+    {
+        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+    }
+
+ /**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
index 6d3b559..4f70111 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -48,7 +48,7 @@ public class ConnectionResource
connectionsManager = contextResolver.getContext(ConnectionsManager.class);
}
- @POST
+ @GET
@Path("{id}/block-on-state-change")
public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id, @QueryParam("state-count") String currentStateCountArg)
{
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
index 1fc7b3d..d953bd6 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
@@ -19,14 +19,22 @@
package org.apache.curator.x.rest;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.rest.entity.LockRequestEntity;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.x.rest.entity.PathChildrenCacheEntity;
+import org.apache.curator.x.rest.entity.PathChildrenCacheEventEntity;
+import org.apache.curator.x.rest.entity.StatEntity;
import org.apache.curator.x.rest.system.Connection;
import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.PathChildrenCacheThing;
import org.apache.curator.x.rest.system.ThingKey;
import org.apache.curator.x.rest.system.ThingType;
-import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@@ -34,10 +42,13 @@ import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
+import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ContextResolver;
-import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Future;
@Path("zookeeper/recipes/path-cache/{connectionId}")
public class PathCacheRecipeResource
@@ -50,9 +61,8 @@ public class PathCacheRecipeResource
}
@POST
- @Path("{path:.*}")
@Produces(MediaType.APPLICATION_JSON)
- public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
+ public Response allocate(@PathParam("connectionId") String connectionId, PathChildrenCacheEntity spec) throws Exception
{
Connection connection = connectionsManager.get(connectionId);
if ( connection == null )
@@ -60,16 +70,22 @@ public class PathCacheRecipeResource
return Response.status(Response.Status.NOT_FOUND).build();
}
- InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
- ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
- connection.putThing(key, mutex);
+ PathChildrenCache cache = new PathChildrenCache(connection.getClient(), spec.getPath(), spec.isCacheData(), spec.isDataIsCompressed());
+ PathChildrenCacheThing cacheThing = new PathChildrenCacheThing(cache);
+ cache.getListenable().addListener(new LocalListener(cacheThing));
+
+ ThingKey<PathChildrenCacheThing> key = new ThingKey<PathChildrenCacheThing>(ThingType.PATH_CACHE);
+ connection.putThing(key, cacheThing);
+
+ PathChildrenCache.StartMode startMode = spec.isBuildInitial() ? PathChildrenCache.StartMode.POST_INITIALIZED_EVENT : PathChildrenCache.StartMode.NORMAL;
+ cache.start(startMode);
return Response.ok(key.getId()).build();
}
@DELETE
@Path("{id}")
- public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
+ public Response delete(@PathParam("connectionId") String connectionId, @PathParam("id") String cacheId) throws IOException
{
Connection connection = connectionsManager.get(connectionId);
if ( connection == null )
@@ -77,23 +93,20 @@ public class PathCacheRecipeResource
return Response.status(Response.Status.NOT_FOUND).build();
}
- InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
- if ( mutex == null )
+ PathChildrenCacheThing cacheThing = connection.removeThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+ if ( cacheThing == null )
{
return Response.status(Response.Status.NOT_FOUND).build();
}
- if ( mutex.isAcquiredInThisProcess() )
- {
- mutex.release();
- }
+ cacheThing.getCache().close();
return Response.ok().build();
}
- @POST
- @Consumes(MediaType.APPLICATION_JSON)
- public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
+ @GET
+ @Path("{id}/block-on-events")
+ public void getEvents(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, @PathParam("id") String cacheId)
{
Connection connection = connectionsManager.get(connectionId);
if ( connection == null )
@@ -102,14 +115,14 @@ public class PathCacheRecipeResource
return;
}
- final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
- if ( mutex == null )
+ final PathChildrenCacheThing cacheThing = connection.removeThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+ if ( cacheThing == null )
{
asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
return;
}
- connectionsManager.getExecutorService().submit
+ Future<?> future = connectionsManager.getExecutorService().submit
(
new Runnable()
{
@@ -118,15 +131,68 @@ public class PathCacheRecipeResource
{
try
{
- boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
- asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
+ List<PathChildrenCacheEvent> events = cacheThing.blockForPendingEvents();
+ List<PathChildrenCacheEventEntity> transformed = Lists.transform(events, toEntity);
+ GenericEntity<List<PathChildrenCacheEventEntity>> entity = new GenericEntity<List<PathChildrenCacheEventEntity>>(transformed){};
+ asyncResponse.resume(Response.ok(entity).build());
}
- catch ( Exception e )
+ catch ( InterruptedException e )
{
- asyncResponse.resume(e);
+ Thread.currentThread().interrupt();
+ asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build());
}
}
}
);
+ connection.putThing(new ThingKey<Future>(ThingType.FUTURE), future);
+ }
+
+    /**
+     * Converts a {@link PathChildrenCacheEvent} into the {@link PathChildrenCacheEventEntity} form.
+     * <p>
+     * The entity's path, data and stat are {@code null} when the event carries no child data; data and
+     * stat are additionally {@code null} when the child data itself has none. Node data is decoded as
+     * UTF-8 text.
+     */
+    private static final Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity> toEntity = new Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity>()
+    {
+        @Override
+        public PathChildrenCacheEventEntity apply(PathChildrenCacheEvent event)
+        {
+            String path = null;
+            String data = null;
+            StatEntity stat = null;
+
+            // fetch the child data once instead of re-reading it for every field
+            org.apache.curator.framework.recipes.cache.ChildData childData = event.getData();
+            if ( childData != null )
+            {
+                path = childData.getPath();
+
+                byte[] bytes = childData.getData();
+                if ( bytes != null )
+                {
+                    // explicit charset: new String(byte[]) would use the JVM's platform default,
+                    // making the result differ from host to host
+                    data = new String(bytes, com.google.common.base.Charsets.UTF_8);
+                }
+
+                org.apache.zookeeper.data.Stat nodeStat = childData.getStat();
+                if ( nodeStat != null )
+                {
+                    stat = new StatEntity
+                    (
+                        nodeStat.getCzxid(),
+                        nodeStat.getMzxid(),
+                        nodeStat.getCtime(),
+                        nodeStat.getMtime(),
+                        nodeStat.getVersion(),
+                        nodeStat.getCversion(),
+                        nodeStat.getAversion(),
+                        nodeStat.getEphemeralOwner(),
+                        nodeStat.getDataLength(),
+                        nodeStat.getNumChildren(),
+                        nodeStat.getPzxid()
+                    );
+                }
+            }
+
+            return new PathChildrenCacheEventEntity
+            (
+                event.getType().name(),
+                path,
+                data,
+                stat
+            );
+        }
+    };
+
+    /**
+     * Cache listener that hands every child event it receives to a {@link PathChildrenCacheThing}.
+     */
+    private static class LocalListener implements PathChildrenCacheListener
+    {
+        private final PathChildrenCacheThing target;
+
+        /**
+         * @param cacheThing the holder that receives each event via {@code addEvent}
+         */
+        public LocalListener(PathChildrenCacheThing cacheThing)
+        {
+            target = cacheThing;
+        }
+
+        @Override
+        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+        {
+            // the client argument is not needed here; only the event is forwarded
+            target.addEvent(event);
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
new file mode 100644
index 0000000..c2c3328
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Bean carrying a path plus the {@code dataIsCompressed}, {@code cacheData} and
+ * {@code buildInitial} flags. All properties are readable and writable through accessors.
+ */
+@XmlRootElement
+public class PathChildrenCacheEntity
+{
+    private String path;
+    private boolean dataIsCompressed;
+    private boolean cacheData;
+    private boolean buildInitial;
+
+    /**
+     * Creates an entity with an empty path and every flag set to {@code false}.
+     */
+    public PathChildrenCacheEntity()
+    {
+        // the boolean fields keep their default value of false
+        path = "";
+    }
+
+    /**
+     * @param path initial path value
+     * @param dataIsCompressed initial value of the dataIsCompressed flag
+     * @param cacheData initial value of the cacheData flag
+     * @param buildInitial initial value of the buildInitial flag
+     */
+    public PathChildrenCacheEntity(final String path, final boolean dataIsCompressed, final boolean cacheData, final boolean buildInitial)
+    {
+        this.buildInitial = buildInitial;
+        this.cacheData = cacheData;
+        this.dataIsCompressed = dataIsCompressed;
+        this.path = path;
+    }
+
+    /** @return the current path value */
+    public String getPath()
+    {
+        return this.path;
+    }
+
+    /** @param path the new path value */
+    public void setPath(final String path)
+    {
+        this.path = path;
+    }
+
+    /** @return the current dataIsCompressed flag */
+    public boolean isDataIsCompressed()
+    {
+        return this.dataIsCompressed;
+    }
+
+    /** @param dataIsCompressed the new dataIsCompressed flag */
+    public void setDataIsCompressed(final boolean dataIsCompressed)
+    {
+        this.dataIsCompressed = dataIsCompressed;
+    }
+
+    /** @return the current cacheData flag */
+    public boolean isCacheData()
+    {
+        return this.cacheData;
+    }
+
+    /** @param cacheData the new cacheData flag */
+    public void setCacheData(final boolean cacheData)
+    {
+        this.cacheData = cacheData;
+    }
+
+    /** @return the current buildInitial flag */
+    public boolean isBuildInitial()
+    {
+        return this.buildInitial;
+    }
+
+    /** @param buildInitial the new buildInitial flag */
+    public void setBuildInitial(final boolean buildInitial)
+    {
+        this.buildInitial = buildInitial;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
new file mode 100644
index 0000000..9b08d32
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Bean describing one cache event: an event type name, a path, the data as a string and a
+ * {@link StatEntity}. All properties are readable and writable through accessors.
+ */
+@XmlRootElement
+public class PathChildrenCacheEventEntity
+{
+    private String eventType;
+    private String path;
+    private String data;
+    private StatEntity stat;
+
+    /**
+     * Creates an entity whose string properties are empty and whose stat is a fresh {@link StatEntity}.
+     */
+    public PathChildrenCacheEventEntity()
+    {
+        eventType = "";
+        path = "";
+        data = "";
+        stat = new StatEntity();
+    }
+
+    /**
+     * @param eventType initial event type name
+     * @param path initial path value
+     * @param data initial data value
+     * @param stat initial stat value
+     */
+    public PathChildrenCacheEventEntity(final String eventType, final String path, final String data, final StatEntity stat)
+    {
+        this.stat = stat;
+        this.data = data;
+        this.path = path;
+        this.eventType = eventType;
+    }
+
+    /** @return the current event type name */
+    public String getEventType()
+    {
+        return this.eventType;
+    }
+
+    /** @param eventType the new event type name */
+    public void setEventType(final String eventType)
+    {
+        this.eventType = eventType;
+    }
+
+    /** @return the current path value */
+    public String getPath()
+    {
+        return this.path;
+    }
+
+    /** @param path the new path value */
+    public void setPath(final String path)
+    {
+        this.path = path;
+    }
+
+    /** @return the current data value */
+    public String getData()
+    {
+        return this.data;
+    }
+
+    /** @param data the new data value */
+    public void setData(final String data)
+    {
+        this.data = data;
+    }
+
+    /** @return the current stat value */
+    public StatEntity getStat()
+    {
+        return this.stat;
+    }
+
+    /** @param stat the new stat value */
+    public void setStat(final StatEntity stat)
+    {
+        this.stat = stat;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
new file mode 100644
index 0000000..ef323ec
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Bean holding eleven numeric stat properties: czxid, mzxid, ctime, mtime, version, cversion,
+ * aversion, ephemeralOwner, dataLength, numChildren and pzxid. Every property has a getter and a setter.
+ */
+@XmlRootElement
+public class StatEntity
+{
+    private long czxid;
+    private long mzxid;
+    private long ctime;
+    private long mtime;
+    private int version;
+    private int cversion;
+    private int aversion;
+    private long ephemeralOwner;
+    private int dataLength;
+    private int numChildren;
+    private long pzxid;
+
+    /**
+     * Creates an entity with every property left at its default value of zero.
+     */
+    public StatEntity()
+    {
+    }
+
+    /**
+     * Creates an entity with every property set from the matching argument.
+     *
+     * @param czxid initial czxid
+     * @param mzxid initial mzxid
+     * @param ctime initial ctime
+     * @param mtime initial mtime
+     * @param version initial version
+     * @param cversion initial cversion
+     * @param aversion initial aversion
+     * @param ephemeralOwner initial ephemeralOwner
+     * @param dataLength initial dataLength
+     * @param numChildren initial numChildren
+     * @param pzxid initial pzxid
+     */
+    public StatEntity(final long czxid, final long mzxid, final long ctime, final long mtime, final int version, final int cversion, final int aversion, final long ephemeralOwner, final int dataLength, final int numChildren, final long pzxid)
+    {
+        // long-valued properties
+        this.czxid = czxid;
+        this.mzxid = mzxid;
+        this.pzxid = pzxid;
+        this.ctime = ctime;
+        this.mtime = mtime;
+        this.ephemeralOwner = ephemeralOwner;
+
+        // int-valued properties
+        this.version = version;
+        this.cversion = cversion;
+        this.aversion = aversion;
+        this.dataLength = dataLength;
+        this.numChildren = numChildren;
+    }
+
+    /** @return the current czxid */
+    public long getCzxid()
+    {
+        return this.czxid;
+    }
+
+    /** @param czxid the new czxid */
+    public void setCzxid(final long czxid)
+    {
+        this.czxid = czxid;
+    }
+
+    /** @return the current mzxid */
+    public long getMzxid()
+    {
+        return this.mzxid;
+    }
+
+    /** @param mzxid the new mzxid */
+    public void setMzxid(final long mzxid)
+    {
+        this.mzxid = mzxid;
+    }
+
+    /** @return the current ctime */
+    public long getCtime()
+    {
+        return this.ctime;
+    }
+
+    /** @param ctime the new ctime */
+    public void setCtime(final long ctime)
+    {
+        this.ctime = ctime;
+    }
+
+    /** @return the current mtime */
+    public long getMtime()
+    {
+        return this.mtime;
+    }
+
+    /** @param mtime the new mtime */
+    public void setMtime(final long mtime)
+    {
+        this.mtime = mtime;
+    }
+
+    /** @return the current version */
+    public int getVersion()
+    {
+        return this.version;
+    }
+
+    /** @param version the new version */
+    public void setVersion(final int version)
+    {
+        this.version = version;
+    }
+
+    /** @return the current cversion */
+    public int getCversion()
+    {
+        return this.cversion;
+    }
+
+    /** @param cversion the new cversion */
+    public void setCversion(final int cversion)
+    {
+        this.cversion = cversion;
+    }
+
+    /** @return the current aversion */
+    public int getAversion()
+    {
+        return this.aversion;
+    }
+
+    /** @param aversion the new aversion */
+    public void setAversion(final int aversion)
+    {
+        this.aversion = aversion;
+    }
+
+    /** @return the current ephemeralOwner */
+    public long getEphemeralOwner()
+    {
+        return this.ephemeralOwner;
+    }
+
+    /** @param ephemeralOwner the new ephemeralOwner */
+    public void setEphemeralOwner(final long ephemeralOwner)
+    {
+        this.ephemeralOwner = ephemeralOwner;
+    }
+
+    /** @return the current dataLength */
+    public int getDataLength()
+    {
+        return this.dataLength;
+    }
+
+    /** @param dataLength the new dataLength */
+    public void setDataLength(final int dataLength)
+    {
+        this.dataLength = dataLength;
+    }
+
+    /** @return the current numChildren */
+    public int getNumChildren()
+    {
+        return this.numChildren;
+    }
+
+    /** @param numChildren the new numChildren */
+    public void setNumChildren(final int numChildren)
+    {
+        this.numChildren = numChildren;
+    }
+
+    /** @return the current pzxid */
+    public long getPzxid()
+    {
+        return this.pzxid;
+    }
+
+    /** @param pzxid the new pzxid */
+    public void setPzxid(final long pzxid)
+    {
+        this.pzxid = pzxid;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
new file mode 100644
index 0000000..1c507aa
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import java.util.List;
+
+/**
+ * Pairs a {@link PathChildrenCache} with a buffer of events that have been added but not yet
+ * handed out. Access to the buffer is guarded by this object's monitor.
+ */
+public class PathChildrenCacheThing
+{
+    private final PathChildrenCache cache;
+    private final List<PathChildrenCacheEvent> pendingEvents = Lists.newLinkedList();
+
+    /**
+     * @param cache the cache this holder wraps
+     */
+    public PathChildrenCacheThing(PathChildrenCache cache)
+    {
+        this.cache = cache;
+    }
+
+    /**
+     * @return the wrapped cache
+     */
+    public PathChildrenCache getCache()
+    {
+        return this.cache;
+    }
+
+    /**
+     * Waits until at least one event is buffered, then returns a copy of all buffered events and
+     * empties the buffer.
+     *
+     * @return the events that were buffered at the time of the call; never empty
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public synchronized List<PathChildrenCacheEvent> blockForPendingEvents() throws InterruptedException
+    {
+        // re-check after every wake-up: the buffer may still be empty
+        while ( pendingEvents.isEmpty() )
+        {
+            this.wait();
+        }
+
+        List<PathChildrenCacheEvent> drained = Lists.newArrayList(pendingEvents);
+        pendingEvents.clear();
+        return drained;
+    }
+
+    /**
+     * Appends an event to the buffer and wakes every thread waiting in {@link #blockForPendingEvents()}.
+     *
+     * @param event the event to buffer
+     */
+    public synchronized void addEvent(PathChildrenCacheEvent event)
+    {
+        pendingEvents.add(event);
+        this.notifyAll();
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
index 13448dd..81251fc 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
@@ -20,7 +20,6 @@
package org.apache.curator.x.rest.system;
import com.google.common.io.Closeables;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import java.util.concurrent.Future;
@@ -41,18 +40,18 @@ public interface ThingType<T>
}
};
- public static ThingType<PathChildrenCache> PATH_CACHE = new ThingType<PathChildrenCache>()
+ public static ThingType<PathChildrenCacheThing> PATH_CACHE = new ThingType<PathChildrenCacheThing>()
{
@Override
- public Class<PathChildrenCache> getThingClass()
+ public Class<PathChildrenCacheThing> getThingClass()
{
- return PathChildrenCache.class;
+ return PathChildrenCacheThing.class;
}
@Override
- public void closeFor(PathChildrenCache cache)
+ public void closeFor(PathChildrenCacheThing cache)
{
- Closeables.closeQuietly(cache);
+ Closeables.closeQuietly(cache.getCache());
}
};