You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/05/27 15:57:34 UTC
[12/14] git commit: wip
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c8d49ae8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c8d49ae8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c8d49ae8
Branch: refs/heads/curator-rpc
Commit: c8d49ae877c0ea95302c1339563293f31c1fd5db
Parents: 7c99ddb
Author: randgalt <ra...@apache.org>
Authored: Mon May 26 18:02:05 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon May 26 18:02:05 2014 -0500
----------------------------------------------------------------------
curator-x-rpc/pom.xml | 6 +
.../curator/x/rpc/CuratorProjectionServer.java | 5 +-
.../org/apache/curator/x/rpc/RpcManager.java | 9 +
.../curator/x/rpc/idl/event/EventService.java | 36 +++-
.../x/rpc/idl/event/RpcCuratorEvent.java | 60 +++++-
.../x/rpc/idl/event/RpcCuratorEventType.java | 8 +-
.../x/rpc/idl/projection/CreateSpec.java | 4 +-
.../projection/CuratorProjectionService.java | 29 ++-
curator-x-rpc/src/main/thrift/curator.thrift | 6 +-
.../apache/curator/generated/CreateSpec.java | 37 ++--
.../curator/generated/CuratorEventType.java | 60 +++---
.../apache/curator/generated/EventService.java | 188 +++++++++++++++++--
.../org/apache/curator/x/rpc/TestClient.java | 29 ++-
13 files changed, 410 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-rpc/pom.xml b/curator-x-rpc/pom.xml
index 42de982..cf2442c 100644
--- a/curator-x-rpc/pom.xml
+++ b/curator-x-rpc/pom.xml
@@ -20,5 +20,11 @@
<groupId>com.facebook.swift</groupId>
<artifactId>swift-service</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 6fcc6d7..f5d5649 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -31,8 +31,9 @@ public class CuratorProjectionServer
{
public static void main(String[] args)
{
- EventService eventService = new EventService();
- CuratorProjectionService projectionService = new CuratorProjectionService(eventService);
+ RpcManager rpcManager = new RpcManager();
+ EventService eventService = new EventService(rpcManager, 5000); // TODO
+ CuratorProjectionService projectionService = new CuratorProjectionService(rpcManager, eventService);
ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), Lists.<ThriftEventHandler>newArrayList(), projectionService, eventService);
ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8899)); // TODO
server.start();
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
index 783297d..bdc5138 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
@@ -2,6 +2,7 @@ package org.apache.curator.x.rpc;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -56,4 +57,12 @@ public class RpcManager
CuratorEntry entry = projections.remove(id);
return (entry != null) ? entry.client : null;
}
+
+ public void touch(List<String> ids)
+ {
+ for ( String id : ids )
+ {
+ getClient(id);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
index 206a347..3b7cde9 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
@@ -2,13 +2,28 @@ package org.apache.curator.x.rpc.idl.event;
import com.facebook.swift.service.ThriftMethod;
import com.facebook.swift.service.ThriftService;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
+import org.apache.curator.x.rpc.RpcManager;
+import org.apache.curator.x.rpc.idl.projection.CuratorProjection;
+import javax.annotation.Nullable;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
@ThriftService("EventService")
public class EventService
{
private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
+ private final RpcManager rpcManager;
+ private final int pingTimeMs;
+
+ public EventService(RpcManager rpcManager, int pingTimeMs)
+ {
+ this.rpcManager = rpcManager;
+ this.pingTimeMs = pingTimeMs;
+ }
public void addEvent(RpcCuratorEvent event)
{
@@ -16,8 +31,25 @@ public class EventService
}
@ThriftMethod
- public RpcCuratorEvent getNextEvent() throws InterruptedException
+ public RpcCuratorEvent getNextEvent(List<CuratorProjection> projections) throws InterruptedException
{
- return events.take();
+ if ( projections != null )
+ {
+ List<String> ids = Lists.transform
+ (
+ projections,
+ new Function<CuratorProjection, String>()
+ {
+ @Override
+ public String apply(CuratorProjection projection)
+ {
+ return projection.id;
+ }
+ }
+ );
+ rpcManager.touch(ids);
+ }
+ RpcCuratorEvent event = events.poll(pingTimeMs, TimeUnit.MILLISECONDS);
+ return (event != null) ? event : new RpcCuratorEvent();
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
index 6896e89..38a5ec3 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
@@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.x.rpc.idl.projection.CuratorProjection;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -71,7 +72,17 @@ public class RpcCuratorEvent
public RpcCuratorEvent()
{
- throw new UnsupportedOperationException();
+ this.projection = null;
+ this.type = RpcCuratorEventType.PING;
+ this.resultCode = 0;
+ this.path = null;
+ this.context = null;
+ this.stat = null;
+ this.data = null;
+ this.name = null;
+ this.children = null;
+ this.aclList = null;
+ this.watchedEvent = null;
}
public RpcCuratorEvent(CuratorProjection projection, CuratorEvent event)
@@ -89,6 +100,53 @@ public class RpcCuratorEvent
this.watchedEvent = toRpcWatchedEvent(event.getWatchedEvent());
}
+ public RpcCuratorEvent(CuratorProjection projection, ConnectionState newState)
+ {
+ this.projection = projection;
+ this.type = toRpcCuratorEventType(newState);
+ this.resultCode = 0;
+ this.path = null;
+ this.context = null;
+ this.stat = null;
+ this.data = null;
+ this.name = null;
+ this.children = null;
+ this.aclList = null;
+ this.watchedEvent = null;
+ }
+
+ private RpcCuratorEventType toRpcCuratorEventType(ConnectionState state)
+ {
+ switch ( state )
+ {
+ case CONNECTED:
+ {
+ return RpcCuratorEventType.CONNECTION_CONNECTED;
+ }
+
+ case SUSPENDED:
+ {
+ return RpcCuratorEventType.CONNECTION_SUSPENDED;
+ }
+
+ case RECONNECTED:
+ {
+ return RpcCuratorEventType.CONNECTION_RECONNECTED;
+ }
+
+ case LOST:
+ {
+ return RpcCuratorEventType.CONNECTION_LOST;
+ }
+
+ case READ_ONLY:
+ {
+ return RpcCuratorEventType.CONNECTION_READ_ONLY;
+ }
+ }
+ throw new IllegalStateException("Unknown state: " + state);
+ }
+
private RpcCuratorEventType toRpcCuratorEventType(CuratorEventType eventType)
{
switch ( eventType )
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
index f08aa4a..f8d6468 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
@@ -23,6 +23,7 @@ import com.facebook.swift.codec.ThriftEnum;
@ThriftEnum("CuratorEventType")
public enum RpcCuratorEventType
{
+ PING,
CREATE,
DELETE,
EXISTS,
@@ -33,5 +34,10 @@ public enum RpcCuratorEventType
GET_ACL,
SET_ACL,
WATCHED,
- CLOSING
+ CLOSING,
+ CONNECTION_CONNECTED,
+ CONNECTION_SUSPENDED,
+ CONNECTION_RECONNECTED,
+ CONNECTION_LOST,
+ CONNECTION_READ_ONLY
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java
index 8e7acf7..d451b26 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java
@@ -28,7 +28,7 @@ public class CreateSpec
public String path;
@ThriftField(2)
- public String data;
+ public byte[] data;
@ThriftField(3)
public CreateMode mode;
@@ -49,7 +49,7 @@ public class CreateSpec
{
}
- public CreateSpec(String path, String data, CreateMode mode, boolean doAsync, boolean compressed, boolean creatingParentsIfNeeded, boolean withProtection)
+ public CreateSpec(String path, byte[] data, CreateMode mode, boolean doAsync, boolean compressed, boolean creatingParentsIfNeeded, boolean withProtection)
{
this.path = path;
this.data = data;
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
index 34eff59..bca32d9 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -29,6 +30,8 @@ import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.CreateModable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.x.rpc.RpcManager;
import org.apache.curator.x.rpc.idl.event.EventService;
@@ -38,11 +41,12 @@ import java.util.UUID;
@ThriftService("CuratorService")
public class CuratorProjectionService
{
- private final RpcManager rpcManager = new RpcManager();
+ private final RpcManager rpcManager;
private final EventService eventService;
- public CuratorProjectionService(EventService eventService)
+ public CuratorProjectionService(RpcManager rpcManager, EventService eventService)
{
+ this.rpcManager = rpcManager;
this.eventService = eventService;
}
@@ -53,7 +57,19 @@ public class CuratorProjectionService
String id = UUID.randomUUID().toString();
client.start();
rpcManager.add(id, client);
- return new CuratorProjection(id);
+ final CuratorProjection projection = new CuratorProjection(id);
+
+ ConnectionStateListener listener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ eventService.addEvent(new RpcCuratorEvent(projection, newState));
+ }
+ };
+ client.getConnectionStateListenable().addListener(listener);
+
+ return projection;
}
@ThriftMethod
@@ -84,7 +100,10 @@ public class CuratorProjectionService
{
builder = castBuilder(builder, CreateBuilder.class).withProtection();
}
- builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(createSpec.mode));
+ if ( createSpec.mode != null )
+ {
+ builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(createSpec.mode));
+ }
if ( createSpec.doAsync )
{
@@ -99,7 +118,7 @@ public class CuratorProjectionService
builder = castBuilder(builder, Backgroundable.class).inBackground(backgroundCallback);
}
- return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.path, createSpec.data.getBytes()));
+ return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.path, createSpec.data));
}
private org.apache.zookeeper.CreateMode getRealMode(CreateMode mode)
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/thrift/curator.thrift
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift
index b9d5beb..100f456 100644
--- a/curator-x-rpc/src/main/thrift/curator.thrift
+++ b/curator-x-rpc/src/main/thrift/curator.thrift
@@ -8,7 +8,7 @@ enum CreateMode {
}
enum CuratorEventType {
- CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED, CLOSING
+ PING, CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED, CLOSING, CONNECTION_CONNECTED, CONNECTION_SUSPENDED, CONNECTION_RECONNECTED, CONNECTION_LOST, CONNECTION_READ_ONLY
}
enum EventType {
@@ -21,7 +21,7 @@ enum KeeperState {
struct CreateSpec {
1: string path;
- 2: string data;
+ 2: binary data;
3: CreateMode mode;
4: bool doAsync;
5: bool compressed;
@@ -87,5 +87,5 @@ service CuratorService {
}
service EventService {
- CuratorEvent getNextEvent();
+ CuratorEvent getNextEvent(1: list<CuratorProjection> projections);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java
index 67437d3..4488285 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java
@@ -50,7 +50,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
}
public String path; // required
- public String data; // required
+ public ByteBuffer data; // required
/**
*
* @see CreateMode
@@ -153,7 +153,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
tmpMap.put(_Fields.PATH, new org.apache.thrift.meta_data.FieldMetaData("path", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)));
tmpMap.put(_Fields.MODE, new org.apache.thrift.meta_data.FieldMetaData("mode", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, CreateMode.class)));
tmpMap.put(_Fields.DO_ASYNC, new org.apache.thrift.meta_data.FieldMetaData("doAsync", org.apache.thrift.TFieldRequirementType.DEFAULT,
@@ -173,7 +173,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
public CreateSpec(
String path,
- String data,
+ ByteBuffer data,
CreateMode mode,
boolean doAsync,
boolean compressed,
@@ -203,7 +203,8 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
this.path = other.path;
}
if (other.isSetData()) {
- this.data = other.data;
+ this.data = org.apache.thrift.TBaseHelper.copyBinary(other.data);
+;
}
if (other.isSetMode()) {
this.mode = other.mode;
@@ -257,11 +258,21 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
}
}
- public String getData() {
- return this.data;
+ public byte[] getData() {
+ setData(org.apache.thrift.TBaseHelper.rightSize(data));
+ return data == null ? null : data.array();
+ }
+
+ public ByteBuffer bufferForData() {
+ return data;
+ }
+
+ public CreateSpec setData(byte[] data) {
+ setData(data == null ? (ByteBuffer)null : ByteBuffer.wrap(data));
+ return this;
}
- public CreateSpec setData(String data) {
+ public CreateSpec setData(ByteBuffer data) {
this.data = data;
return this;
}
@@ -419,7 +430,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
if (value == null) {
unsetData();
} else {
- setData((String)value);
+ setData((ByteBuffer)value);
}
break;
@@ -712,7 +723,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
if (this.data == null) {
sb.append("null");
} else {
- sb.append(this.data);
+ org.apache.thrift.TBaseHelper.toString(this.data, sb);
}
first = false;
if (!first) sb.append(", ");
@@ -794,7 +805,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
break;
case 2: // DATA
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
- struct.data = iprot.readString();
+ struct.data = iprot.readBinary();
struct.setDataIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
@@ -862,7 +873,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
}
if (struct.data != null) {
oprot.writeFieldBegin(DATA_FIELD_DESC);
- oprot.writeString(struct.data);
+ oprot.writeBinary(struct.data);
oprot.writeFieldEnd();
}
if (struct.mode != null) {
@@ -926,7 +937,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
oprot.writeString(struct.path);
}
if (struct.isSetData()) {
- oprot.writeString(struct.data);
+ oprot.writeBinary(struct.data);
}
if (struct.isSetMode()) {
oprot.writeI32(struct.mode.getValue());
@@ -954,7 +965,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec, CreateSpe
struct.setPathIsSet(true);
}
if (incoming.get(1)) {
- struct.data = iprot.readString();
+ struct.data = iprot.readBinary();
struct.setDataIsSet(true);
}
if (incoming.get(2)) {
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
index 60350fc..ce31158 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
@@ -12,17 +12,23 @@ import java.util.HashMap;
import org.apache.thrift.TEnum;
public enum CuratorEventType implements org.apache.thrift.TEnum {
- CREATE(0),
- DELETE(1),
- EXISTS(2),
- GET_DATA(3),
- SET_DATA(4),
- CHILDREN(5),
- SYNC(6),
- GET_ACL(7),
- SET_ACL(8),
- WATCHED(9),
- CLOSING(10);
+ PING(0),
+ CREATE(1),
+ DELETE(2),
+ EXISTS(3),
+ GET_DATA(4),
+ SET_DATA(5),
+ CHILDREN(6),
+ SYNC(7),
+ GET_ACL(8),
+ SET_ACL(9),
+ WATCHED(10),
+ CLOSING(11),
+ CONNECTION_CONNECTED(12),
+ CONNECTION_SUSPENDED(13),
+ CONNECTION_RECONNECTED(14),
+ CONNECTION_LOST(15),
+ CONNECTION_READ_ONLY(16);
private final int value;
@@ -44,27 +50,39 @@ public enum CuratorEventType implements org.apache.thrift.TEnum {
public static CuratorEventType findByValue(int value) {
switch (value) {
case 0:
- return CREATE;
+ return PING;
case 1:
- return DELETE;
+ return CREATE;
case 2:
- return EXISTS;
+ return DELETE;
case 3:
- return GET_DATA;
+ return EXISTS;
case 4:
- return SET_DATA;
+ return GET_DATA;
case 5:
- return CHILDREN;
+ return SET_DATA;
case 6:
- return SYNC;
+ return CHILDREN;
case 7:
- return GET_ACL;
+ return SYNC;
case 8:
- return SET_ACL;
+ return GET_ACL;
case 9:
- return WATCHED;
+ return SET_ACL;
case 10:
+ return WATCHED;
+ case 11:
return CLOSING;
+ case 12:
+ return CONNECTION_CONNECTED;
+ case 13:
+ return CONNECTION_SUSPENDED;
+ case 14:
+ return CONNECTION_RECONNECTED;
+ case 15:
+ return CONNECTION_LOST;
+ case 16:
+ return CONNECTION_READ_ONLY;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java
index 7c8e294..7b41888 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java
@@ -36,13 +36,13 @@ public class EventService {
public interface Iface {
- public CuratorEvent getNextEvent() throws org.apache.thrift.TException;
+ public CuratorEvent getNextEvent(List<CuratorProjection> projections) throws org.apache.thrift.TException;
}
public interface AsyncIface {
- public void getNextEvent(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void getNextEvent(List<CuratorProjection> projections, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
}
@@ -66,15 +66,16 @@ public class EventService {
super(iprot, oprot);
}
- public CuratorEvent getNextEvent() throws org.apache.thrift.TException
+ public CuratorEvent getNextEvent(List<CuratorProjection> projections) throws org.apache.thrift.TException
{
- send_getNextEvent();
+ send_getNextEvent(projections);
return recv_getNextEvent();
}
- public void send_getNextEvent() throws org.apache.thrift.TException
+ public void send_getNextEvent(List<CuratorProjection> projections) throws org.apache.thrift.TException
{
getNextEvent_args args = new getNextEvent_args();
+ args.setProjections(projections);
sendBase("getNextEvent", args);
}
@@ -106,21 +107,24 @@ public class EventService {
super(protocolFactory, clientManager, transport);
}
- public void getNextEvent(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ public void getNextEvent(List<CuratorProjection> projections, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
- getNextEvent_call method_call = new getNextEvent_call(resultHandler, this, ___protocolFactory, ___transport);
+ getNextEvent_call method_call = new getNextEvent_call(projections, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
public static class getNextEvent_call extends org.apache.thrift.async.TAsyncMethodCall {
- public getNextEvent_call(org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private List<CuratorProjection> projections;
+ public getNextEvent_call(List<CuratorProjection> projections, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
+ this.projections = projections;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getNextEvent", org.apache.thrift.protocol.TMessageType.CALL, 0));
getNextEvent_args args = new getNextEvent_args();
+ args.setProjections(projections);
args.write(prot);
prot.writeMessageEnd();
}
@@ -167,7 +171,7 @@ public class EventService {
public getNextEvent_result getResult(I iface, getNextEvent_args args) throws org.apache.thrift.TException {
getNextEvent_result result = new getNextEvent_result();
- result.success = iface.getNextEvent();
+ result.success = iface.getNextEvent(args.projections);
return result;
}
}
@@ -236,7 +240,7 @@ public class EventService {
}
public void start(I iface, getNextEvent_args args, org.apache.thrift.async.AsyncMethodCallback<CuratorEvent> resultHandler) throws TException {
- iface.getNextEvent(resultHandler);
+ iface.getNextEvent(args.projections,resultHandler);
}
}
@@ -245,6 +249,7 @@ public class EventService {
public static class getNextEvent_args implements org.apache.thrift.TBase<getNextEvent_args, getNextEvent_args._Fields>, java.io.Serializable, Cloneable, Comparable<getNextEvent_args> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getNextEvent_args");
+ private static final org.apache.thrift.protocol.TField PROJECTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("projections", org.apache.thrift.protocol.TType.LIST, (short)1);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -252,10 +257,11 @@ public class EventService {
schemes.put(TupleScheme.class, new getNextEvent_argsTupleSchemeFactory());
}
+ public List<CuratorProjection> projections; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-;
+ PROJECTIONS((short)1, "projections");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -270,6 +276,8 @@ public class EventService {
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
+ case 1: // PROJECTIONS
+ return PROJECTIONS;
default:
return null;
}
@@ -308,9 +316,14 @@ public class EventService {
return _fieldName;
}
}
+
+ // isset id assignments
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.PROJECTIONS, new org.apache.thrift.meta_data.FieldMetaData("projections", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CuratorProjection.class))));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getNextEvent_args.class, metaDataMap);
}
@@ -318,10 +331,24 @@ public class EventService {
public getNextEvent_args() {
}
+ public getNextEvent_args(
+ List<CuratorProjection> projections)
+ {
+ this();
+ this.projections = projections;
+ }
+
/**
* Performs a deep copy on <i>other</i>.
*/
public getNextEvent_args(getNextEvent_args other) {
+ if (other.isSetProjections()) {
+ List<CuratorProjection> __this__projections = new ArrayList<CuratorProjection>(other.projections.size());
+ for (CuratorProjection other_element : other.projections) {
+ __this__projections.add(new CuratorProjection(other_element));
+ }
+ this.projections = __this__projections;
+ }
}
public getNextEvent_args deepCopy() {
@@ -330,15 +357,66 @@ public class EventService {
@Override
public void clear() {
+ this.projections = null;
+ }
+
+ public int getProjectionsSize() {
+ return (this.projections == null) ? 0 : this.projections.size();
+ }
+
+ public java.util.Iterator<CuratorProjection> getProjectionsIterator() {
+ return (this.projections == null) ? null : this.projections.iterator();
+ }
+
+ public void addToProjections(CuratorProjection elem) {
+ if (this.projections == null) {
+ this.projections = new ArrayList<CuratorProjection>();
+ }
+ this.projections.add(elem);
+ }
+
+ public List<CuratorProjection> getProjections() {
+ return this.projections;
+ }
+
+ public getNextEvent_args setProjections(List<CuratorProjection> projections) {
+ this.projections = projections;
+ return this;
+ }
+
+ public void unsetProjections() {
+ this.projections = null;
+ }
+
+ /** Returns true if field projections is set (has been assigned a value) and false otherwise */
+ public boolean isSetProjections() {
+ return this.projections != null;
+ }
+
+ public void setProjectionsIsSet(boolean value) {
+ if (!value) {
+ this.projections = null;
+ }
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
+ case PROJECTIONS:
+ if (value == null) {
+ unsetProjections();
+ } else {
+ setProjections((List<CuratorProjection>)value);
+ }
+ break;
+
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
+ case PROJECTIONS:
+ return getProjections();
+
}
throw new IllegalStateException();
}
@@ -350,6 +428,8 @@ public class EventService {
}
switch (field) {
+ case PROJECTIONS:
+ return isSetProjections();
}
throw new IllegalStateException();
}
@@ -367,6 +447,15 @@ public class EventService {
if (that == null)
return false;
+ boolean this_present_projections = true && this.isSetProjections();
+ boolean that_present_projections = true && that.isSetProjections();
+ if (this_present_projections || that_present_projections) {
+ if (!(this_present_projections && that_present_projections))
+ return false;
+ if (!this.projections.equals(that.projections))
+ return false;
+ }
+
return true;
}
@@ -383,6 +472,16 @@ public class EventService {
int lastComparison = 0;
+ lastComparison = Boolean.valueOf(isSetProjections()).compareTo(other.isSetProjections());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetProjections()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.projections, other.projections);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -403,6 +502,13 @@ public class EventService {
StringBuilder sb = new StringBuilder("getNextEvent_args(");
boolean first = true;
+ sb.append("projections:");
+ if (this.projections == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.projections);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -446,6 +552,25 @@ public class EventService {
break;
}
switch (schemeField.id) {
+ case 1: // PROJECTIONS
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list16 = iprot.readListBegin();
+ struct.projections = new ArrayList<CuratorProjection>(_list16.size);
+ for (int _i17 = 0; _i17 < _list16.size; ++_i17)
+ {
+ CuratorProjection _elem18;
+ _elem18 = new CuratorProjection();
+ _elem18.read(iprot);
+ struct.projections.add(_elem18);
+ }
+ iprot.readListEnd();
+ }
+ struct.setProjectionsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -461,6 +586,18 @@ public class EventService {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.projections != null) {
+ oprot.writeFieldBegin(PROJECTIONS_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.projections.size()));
+ for (CuratorProjection _iter19 : struct.projections)
+ {
+ _iter19.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -478,11 +615,40 @@ public class EventService {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, getNextEvent_args struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetProjections()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetProjections()) {
+ {
+ oprot.writeI32(struct.projections.size());
+ for (CuratorProjection _iter20 : struct.projections)
+ {
+ _iter20.write(oprot);
+ }
+ }
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, getNextEvent_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ {
+ org.apache.thrift.protocol.TList _list21 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.projections = new ArrayList<CuratorProjection>(_list21.size);
+ for (int _i22 = 0; _i22 < _list21.size; ++_i22)
+ {
+ CuratorProjection _elem23;
+ _elem23 = new CuratorProjection();
+ _elem23.read(iprot);
+ struct.projections.add(_elem23);
+ }
+ }
+ struct.setProjectionsIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
index e479031..79cea30 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
@@ -18,21 +18,26 @@
*/
package org.apache.curator.x.rpc;
+import org.apache.curator.generated.CreateSpec;
import org.apache.curator.generated.CuratorEvent;
import org.apache.curator.generated.CuratorProjection;
import org.apache.curator.generated.CuratorProjectionSpec;
import org.apache.curator.generated.CuratorService;
import org.apache.curator.generated.EventService;
+import org.apache.curator.test.TestingServer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
+import java.util.Arrays;
import java.util.concurrent.Executors;
public class TestClient
{
- public static void main(String[] args) throws TException
+ public static void main(String[] args) throws Exception
{
+ new TestingServer(2181);
+
TSocket clientTransport = new TSocket("localhost", 8899);
clientTransport.open();
TProtocol clientProtocol = new TBinaryProtocol(clientTransport);
@@ -43,24 +48,36 @@ public class TestClient
TProtocol eventProtocol = new TBinaryProtocol(eventTransport);
final EventService.Client serviceClient = new EventService.Client(eventProtocol);
+ final CuratorProjection curatorProjection = client.newCuratorProjection(new CuratorProjectionSpec());
+
Executors.newSingleThreadExecutor().submit
- (new Runnable()
+ (
+ new Runnable()
{
@Override
public void run()
{
try
{
- CuratorEvent nextEvent = serviceClient.getNextEvent();
- System.out.println(nextEvent.type);
+ //noinspection InfiniteLoopStatement
+ for(;;)
+ {
+ CuratorEvent nextEvent = serviceClient.getNextEvent(Arrays.asList(curatorProjection));
+ System.out.println(nextEvent.type);
+ }
}
catch ( TException e )
{
e.printStackTrace();
}
}
- });
+ }
+ );
- CuratorProjection curatorProjection = client.newCuratorProjection(new CuratorProjectionSpec());
+ CreateSpec createSpec = new CreateSpec();
+ createSpec.path = "/a/b/c";
+ createSpec.creatingParentsIfNeeded = true;
+ String path = client.create(curatorProjection, createSpec);
+ System.out.println(path);
}
}