You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:43:08 UTC
[020/100] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
index e90b724,0000000..1f7e7a9
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
@@@ -1,717 -1,0 +1,717 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.redis;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.oio.OioServerSocketChannel;
+import io.netty.util.concurrent.Future;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.redis.ByteArrayWrapper;
+import com.gemstone.gemfire.internal.redis.ByteToCommandDecoder;
+import com.gemstone.gemfire.internal.redis.Coder;
+import com.gemstone.gemfire.internal.redis.ExecutionHandlerContext;
+import com.gemstone.gemfire.internal.redis.RedisDataType;
+import com.gemstone.gemfire.internal.redis.RegionProvider;
- import com.gemstone.gemfire.internal.redis.executor.hll.HyperLogLogPlus;
++import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
+
+/**
+ * The GemFireRedisServer is a server that understands the Redis protocol. As
+ * commands are sent to the server, each command is picked up by a thread,
+ * interpreted and then executed and a response is sent back to the client. The
+ * default connection port is 6379 but that can be altered when run through GFSH
+ * or started through the provided static main class.
+ * <p>
+ * Each Redis data type instance is stored in a separate {@link Region} except
+ * for the Strings and HyperLogLogs which are collectively stored in one Region
+ * respectively. That Region along with a meta data region used internally are
+ * protected so the client may not store keys with the name {@link GemFireRedisServer#REDIS_META_DATA_REGION}
+ * or {@link GemFireRedisServer#STRING_REGION}. The default Region type is
+ * {@link RegionShortcut#PARTITION} although this can be changed by specifying the
+ * SystemProperty {@value #DEFAULT_REGION_SYS_PROP_NAME} to a type defined by {@link RegionShortcut}.
+ * If the {@link GemFireRedisServer#NUM_THREADS_SYS_PROP_NAME} system property is set to 0,
+ * one thread per client will be created. Otherwise a worker thread pool of specified size is
+ * used or a default size of 4 * {@link Runtime#availableProcessors()} if the property is not set.
+ * <p>
+ * Setting the AUTH password requires setting the property "redis-password" just as "redis-port"
+ * would be in xml or through GFSH.
+ * <p>
+ * The supported commands are as follows:
+ * <p>
+ * Supported String commands - APPEND, BITCOUNT, BITOP, BITPOS, DECR, DECRBY,
+ * GET, GETBIT, GETRANGE, GETSET, INCR, INCRBY, INCRBYFLOAT, MGET, MSET, MSETNX,
+ * PSETEX, SET, SETBIT, SETEX, SETNX, STRLEN
+ * <p>
+ * Supported List commands - LINDEX, LLEN, LPOP, LPUSH, LPUSHX, LRANGE,
+ * LREM, LSET, LTRIM, RPOP, RPUSH, RPUSHX
+ * <p>
+ * Supported Hash commands - HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT,
+ * HKEYS, HMGET, HMSET, HSETNX, HLEN, HSCAN, HSET, HVALS
+ * <p>
+ * Supported Set commands - SADD, SCARD, SDIFF, SDIFFSTORE, SINTER,
+ * SINTERSTORE, SISMEMBER, SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER,
+ * SCAN, SUNION, SUNIONSTORE
+ * <p>
+ * Supported SortedSet commands - ZADD, ZCARD, ZCOUNT, ZINCRBY, ZLEXCOUNT,
+ * ZRANGE, ZRANGEBYLEX, ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYLEX,
+ * ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE, ZREVRANGEBYSCORE, ZREVRANK,
+ * ZSCAN, ZSCORE
+ * <p>
+ * Supported HyperLogLog commands - PFADD, PFCOUNT, PFMERGE
+ * <p>
+ * Supported Keys commands - DEL, DBSIZE, EXISTS, EXPIRE, EXPIREAT, FLUSHDB, FLUSHALL,
+ * KEYS, PERSIST, PEXPIRE, PEXPIREAT, PTTL, SCAN, TTL
+ * <p>
+ * Supported Transaction commands - DISCARD, EXEC, MULTI
+ * <P>
+ * Supported Server commands - AUTH, ECHO, PING, TIME, QUIT
+ * <p>
+ * <p>
+ * The command executors are not explicitly documented but the functionality
+ * can be found at <a href="http://redis.io/commands">Redis Commands</a>
+ * <p>
+ * Exceptions to the Redis Commands Documents:<p>
+ * <ul>
+ * <li>Any command that removes keys and returns a count of removed
+ * entries will not return a total remove count but rather a count of how
+ * many entries have been removed that existed on the local vm, though
+ * all entries will be removed</li>
+ * <li>Any command that returns a count of newly set members has an
+ * unspecified return value. The command will work just as the Redis protocol
+ * states but the count will not necessary reflect the number set compared
+ * to overridden.</li>
+ * <li>Transactions work just as they would on a Redis instance, they are local
+ * transaction. Transactions cannot be executed on data that is not local to the
+ * executing server, that is on a partitioned region in a different server
+ * instance or on a persistent region that does not have transactions enabled.
+ * Also, you cannot watch or unwatch keys as all keys within a GemFire
+ * transaction are watched by default.</li>
+ * </ul>
+ * @author Vitaliy Gavrilov
+ *
+ */
+
+public class GemFireRedisServer {
+
+ /**
+ * Thread used to start main method
+ */
+ private static Thread mainThread = null;
+
+ /**
+ * The default Redis port as specified by their protocol, {@value #DEFAULT_REDIS_SERVER_PORT}
+ */
+ public static final int DEFAULT_REDIS_SERVER_PORT = 6379;
+
+ /**
+ * The number of threads that will work on handling requests
+ */
+ private final int numWorkerThreads;
+
+ /**
+ * The number of threads that will work socket selectors
+ */
+ private final int numSelectorThreads;
+
+ /**
+ * The actual port being used by the server
+ */
+ private final int serverPort;
+
+ /**
+ * The address to bind to
+ */
+ private final String bindAddress;
+
+ /**
+ * Connection timeout in milliseconds
+ */
+ private static final int connectTimeoutMillis = 1000;
+
+ /**
+ * Temporary constant whether to use old single thread per connection
+ * model for worker group
+ */
+ private boolean singleThreadPerConnection;
+
+ /**
+ * Logging level
+ */
+ private final String logLevel;
+
+ /**
+ * The cache instance pointer on this vm
+ */
+ private Cache cache;
+
+ /**
+ * Channel to be closed when shutting down
+ */
+ private Channel serverChannel;
+
+ /**
+ * Gem logwriter
+ */
+ private LogWriter logger;
+
+ private RegionProvider regionCache;
+
+ private final MetaCacheListener metaListener;
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private final static int numExpirationThreads = 1;
+ private final ScheduledExecutorService expirationExecutor;
+
+ /**
+ * Map of futures to be executed for key expirations
+ */
+ private final ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationFutures;
+
+
+ /**
+ * The field that defines the name of the {@link Region} which holds all of
+ * the strings. The current value of this field is {@value #STRING_REGION}.
+ */
+ public static final String STRING_REGION = "__StRiNgS";
+
+ /**
+ * The field that defines the name of the {@link Region} which holds all of
+ * the HyperLogLogs. The current value of this field is {@value #HLL_REGION}.
+ */
+ public static final String HLL_REGION = "__HlL";
+
+ /**
+ * The field that defines the name of the {@link Region} which holds all of
+ * the Redis meta data. The current value of this field is {@value #REDIS_META_DATA_REGION}.
+ */
+ public static final String REDIS_META_DATA_REGION = "__ReDiS_MeTa_DaTa";
+
+ /**
+ * The system property name used to set the default {@link Region} creation
+ * type. The property name is {@value #DEFAULT_REGION_SYS_PROP_NAME} and the
+ * acceptable values are types defined by {@link RegionShortcut},
+ * i.e. "PARTITION" would be used for {@link RegionShortcut#PARTITION}.
+ */
+ public static final String DEFAULT_REGION_SYS_PROP_NAME = "gemfireredis.regiontype";
+
+ /**
+ * System property name that can be used to set the number of threads to be
+ * used by the GemFireRedisServer
+ */
+ public static final String NUM_THREADS_SYS_PROP_NAME = "gemfireredis.numthreads";
+
+ /**
+ * The actual {@link RegionShortcut} type specified by the system property
+ * {@value #DEFAULT_REGION_SYS_PROP_NAME}.
+ */
+ public final RegionShortcut DEFAULT_REGION_TYPE;
+
+ private boolean shutdown;
+ private boolean started;
+
+ /**
+ * Determine the {@link RegionShortcut} type from a String value.
+ * If the String value doesn't map to a RegionShortcut type then
+ * {@link RegionShortcut#PARTITION} will be used by default.
+ *
+ * @return {@link RegionShortcut}
+ */
+ private static RegionShortcut setRegionType() {
+ String regionType = System.getProperty(DEFAULT_REGION_SYS_PROP_NAME, "PARTITION");
+ RegionShortcut type;
+ try {
+ type = RegionShortcut.valueOf(regionType);
+ } catch (Exception e) {
+ type = RegionShortcut.PARTITION;
+ }
+ return type;
+ }
+
+ /**
+ * Helper method to set the number of worker threads
+ *
+ * @return If the System property {@value #NUM_THREADS_SYS_PROP_NAME} is set then that number
+ * is used, otherwise 4 * # of cores
+ */
+ private int setNumWorkerThreads() {
+ String prop = System.getProperty(NUM_THREADS_SYS_PROP_NAME);
+ int numCores = Runtime.getRuntime().availableProcessors();
+ int def = 4 * numCores;
+ if (prop == null || prop.isEmpty())
+ return def;
+ int threads;
+ try {
+ threads = Integer.parseInt(prop);
+ } catch (NumberFormatException e) {
+ return def;
+ }
+ return threads;
+ }
+
+ /**
+ * Constructor for {@link GemFireRedisServer} that will start the
+ * server on the given port and bind to the first non-loopback address
+ *
+ * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by default
+ */
+ public GemFireRedisServer(int port) {
+ this(null, port, null);
+ }
+
+ /**
+ * Constructor for {@link GemFireRedisServer} that will start the
+ * server and bind to the given address and port
+ *
+ * @param bindAddress The address to which the server will attempt to bind to
+ * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by default if argument is less than or equal to 0
+ */
+ public GemFireRedisServer(String bindAddress, int port) {
+ this(bindAddress, port, null);
+ }
+
+
+ /**
+ * Constructor for {@link GemFireRedisServer} that will start the
+ * server and bind to the given address and port. Keep in mind that the
+ * log level configuration will only be set if a {@link Cache} does not already
+ * exist, if one already exists then setting that property will have no effect.
+ *
+ * @param bindAddress The address to which the server will attempt to bind to
+ * @param port The port the server will bind to, will use {@value #DEFAULT_REDIS_SERVER_PORT} by default if argument is less than or equal to 0
+ * @param logLevel The logging level to be used by GemFire
+ */
+ public GemFireRedisServer(String bindAddress, int port, String logLevel) {
+ if (port <= 0)
+ this.serverPort = DEFAULT_REDIS_SERVER_PORT;
+ else
+ this.serverPort = port;
+ this.bindAddress = bindAddress;
+ this.logLevel = logLevel;
+ this.numWorkerThreads = setNumWorkerThreads();
+ if (this.numWorkerThreads == 0)
+ this.singleThreadPerConnection = true;
+ this.numSelectorThreads = 1;
+ this.metaListener = new MetaCacheListener();
+ this.expirationFutures = new ConcurrentHashMap<ByteArrayWrapper, ScheduledFuture<?>>();
+ this.expirationExecutor = Executors.newScheduledThreadPool(numExpirationThreads, new ThreadFactory() {
+ private final AtomicInteger counter = new AtomicInteger();
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName("GemFireRedis-ScheduledExecutor-" + counter.incrementAndGet());
+ t.setDaemon(true);
+ return t;
+ }
+
+ });
+ this.DEFAULT_REGION_TYPE = setRegionType();
+ this.shutdown = false;
+ this.started = false;
+ }
+
+ /**
+ * Helper method to get the host name to bind to
+ *
+ * @return The InetAddress to bind to
+ * @throws UnknownHostException
+ */
+ private InetAddress getBindAddress() throws UnknownHostException {
+ return this.bindAddress == null || this.bindAddress.isEmpty()
+ ? SocketCreator.getLocalHost()
+ : InetAddress.getByName(this.bindAddress);
+ }
+
+ /**
+ * This is function to call on a {@link GemFireRedisServer} instance
+ * to start it running
+ */
+ public synchronized void start() {
+ if (!started) {
+ try {
+ startGemFire();
+ initializeRedis();
+ startRedisServer();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not start Server", e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Could not start Server", e);
+ }
+ started = true;
+ }
+ }
+
+ /**
+ * Initializes the {@link Cache}, and creates Redis necessities
+ * Region and protects declares that {@link Region} to be protected.
+ * Also, every {@link GemFireRedisServer} will check for entries already in the
+ * meta data Region.
+ */
+ private void startGemFire() {
+ Cache c = GemFireCacheImpl.getInstance();
+ if (c == null) {
+ synchronized (GemFireRedisServer.class) {
+ c = GemFireCacheImpl.getInstance();
+ if (c == null) {
+ CacheFactory cacheFactory = new CacheFactory();
+ if (logLevel != null)
+ cacheFactory.set("log-level", logLevel);
+ c = cacheFactory.create();
+ }
+ }
+ }
+ this.cache = c;
+ this.logger = c.getLogger();
+ }
+
+ private void initializeRedis() {
+ synchronized (this.cache) {
+ RegionFactory<String, RedisDataType> rfMeta = cache.createRegionFactory(RegionShortcut.REPLICATE);
+ rfMeta.addCacheListener(this.metaListener);
+ RegionFactory<ByteArrayWrapper, ByteArrayWrapper> rfString = cache.createRegionFactory(DEFAULT_REGION_TYPE);
+ RegionFactory<ByteArrayWrapper, HyperLogLogPlus> rfHLL = cache.createRegionFactory(DEFAULT_REGION_TYPE);
+ Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion;
+ if ((stringsRegion = this.cache.getRegion(STRING_REGION)) == null)
+ stringsRegion = rfString.create(GemFireRedisServer.STRING_REGION);
+ Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion;
+ if ((hLLRegion = this.cache.getRegion(HLL_REGION)) == null)
+ hLLRegion = rfHLL.create(HLL_REGION);
+ Region<String, RedisDataType> redisMetaData;
+ if ((redisMetaData = this.cache.getRegion(REDIS_META_DATA_REGION)) == null)
+ redisMetaData = rfMeta.create(REDIS_META_DATA_REGION);
+ this.regionCache = new RegionProvider(stringsRegion, hLLRegion, redisMetaData, expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE);
+ redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED);
+ redisMetaData.put(HLL_REGION, RedisDataType.REDIS_PROTECTED);
+ redisMetaData.put(STRING_REGION, RedisDataType.REDIS_PROTECTED);
+ }
+ checkForRegions();
+ }
+
+ private void checkForRegions() {
+ Collection<Entry<String, RedisDataType>> entrySet = this.regionCache.metaEntrySet();
+ for (Entry<String, RedisDataType> entry: entrySet) {
+ String regionName = entry.getKey();
+ RedisDataType type = entry.getValue();
+ Region<?, ?> newRegion = cache.getRegion(regionName);
+ if (newRegion == null && type != RedisDataType.REDIS_STRING && type != RedisDataType.REDIS_HLL && type != RedisDataType.REDIS_PROTECTED) {
+ try {
+ this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(regionName), type);
+ } catch (Exception e) {
+ if (logger.errorEnabled())
+ logger.error(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper method to start the server listening for connections. The
+ * server is bound to the port specified by {@link GemFireRedisServer#serverPort}
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private void startRedisServer() throws IOException, InterruptedException {
+ ThreadFactory selectorThreadFactory = new ThreadFactory() {
+ private final AtomicInteger counter = new AtomicInteger();
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName("GemFireRedisServer-SelectorThread-" + counter.incrementAndGet());
+ t.setDaemon(true);
+ return t;
+ }
+
+ };
+
+ ThreadFactory workerThreadFactory = new ThreadFactory() {
+ private final AtomicInteger counter = new AtomicInteger();
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName("GemFireRedisServer-WorkerThread-" + counter.incrementAndGet());
+ return t;
+ }
+
+ };
+
+ bossGroup = null;
+ workerGroup = null;
+ Class<? extends ServerChannel> socketClass = null;
+ if (singleThreadPerConnection) {
+ bossGroup = new OioEventLoopGroup(Integer.MAX_VALUE, selectorThreadFactory);
+ workerGroup = new OioEventLoopGroup(Integer.MAX_VALUE, workerThreadFactory);
+ socketClass = OioServerSocketChannel.class;
+ } else {
+ bossGroup = new NioEventLoopGroup(this.numSelectorThreads, selectorThreadFactory);
+ workerGroup = new NioEventLoopGroup(this.numWorkerThreads, workerThreadFactory);
+ socketClass = NioServerSocketChannel.class;
+ }
+ InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
+ String pwd = system.getConfig().getRedisPassword();
+ final byte[] pwdB = Coder.stringToBytes(pwd);
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(socketClass)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ if (logger.fineEnabled())
+ logger.fine("GemFireRedisServer-Connection established with " + ch.remoteAddress());
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder());
+ p.addLast(ExecutionHandlerContext.class.getSimpleName(), new ExecutionHandlerContext(ch, cache, regionCache, GemFireRedisServer.this, pwdB));
+ }
+ })
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.SO_RCVBUF, getBufferSize())
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, GemFireRedisServer.connectTimeoutMillis)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+
+ // Bind and start to accept incoming connections.
+ ChannelFuture f = b.bind(new InetSocketAddress(getBindAddress(), serverPort)).sync();
+ if (this.logger.infoEnabled()) {
+ String logMessage = "GemFireRedisServer started {" + getBindAddress() + ":" + serverPort + "}, Selector threads: " + this.numSelectorThreads;
+ if (this.singleThreadPerConnection)
+ logMessage += ", One worker thread per connection";
+ else
+ logMessage += ", Worker threads: " + this.numWorkerThreads;
+ this.logger.info(logMessage);
+ }
+ this.serverChannel = f.channel();
+ }
+
+ /**
+ * Takes an entry event and processes it. If the entry denotes that a
+ * {@link RedisDataType#REDIS_LIST} or {@link RedisDataType#REDIS_SORTEDSET}
+ * was created then this function will call the necessary calls to create the
+ * parameterized queries for those keys.
+ *
+ * @param event EntryEvent from meta data region
+ */
+ private void afterKeyCreate(EntryEvent<String, RedisDataType> event) {
+ if (event.isOriginRemote()) {
+ final String key = (String) event.getKey();
+ final RedisDataType value = event.getNewValue();
+ if (value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL && value != RedisDataType.REDIS_PROTECTED) {
+ try {
+ this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(key), value);
+ } catch (RegionDestroyedException ignore) { // Region already destroyed, ignore
+ }
+ }
+ }
+ }
+
+ /**
+ * When a key is removed then this function will make sure the associated
+ * queries with the key are also removed from each vm to avoid unnecessary
+ * data retention
+ */
+ private void afterKeyDestroy(EntryEvent<String, RedisDataType> event) {
+ if (event.isOriginRemote()) {
+ final String key = (String) event.getKey();
+ final RedisDataType value = event.getOldValue();
+ if (value != null && value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL && value != RedisDataType.REDIS_PROTECTED) {
+ ByteArrayWrapper kW = Coder.stringToByteArrayWrapper(key);
+ Region<?, ?> r = this.regionCache.getRegion(kW);
+ if (r != null) {
+ this.regionCache.removeRegionReferenceLocally(kW, value);
+ }
+ }
+ }
+ }
+
+ private final class MetaCacheListener extends CacheListenerAdapter<String, RedisDataType> {
+
+ @Override
+ public void afterCreate(EntryEvent<String, RedisDataType> event) {
+ afterKeyCreate(event);
+ }
+
+ @Override
+ public void afterDestroy(EntryEvent<String, RedisDataType> event) {
+ afterKeyDestroy(event);
+ }
+ }
+
+ /**
+ * Helper method to get GemFire set socket buffer size,
+ * possibly a default of 32k
+ *
+ * @return Buffer size to use for server
+ */
+ private int getBufferSize() {
+ InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
+ return system.getConfig().getSocketBufferSize();
+ }
+
+ /**
+ * Shutdown method for {@link GemFireRedisServer}. This closes the {@link Cache},
+ * interrupts all execution and forcefully closes all connections.
+ */
+ public synchronized void shutdown() {
+ if (!shutdown) {
+ if (logger.infoEnabled())
+ logger.info("GemFireRedisServer shutting down");
+ ChannelFuture closeFuture = this.serverChannel.closeFuture();
+ Future<?> c = workerGroup.shutdownGracefully();
+ Future<?> c2 = bossGroup.shutdownGracefully();
+ this.serverChannel.close();
+ c.syncUninterruptibly();
+ c2.syncUninterruptibly();
+ this.regionCache.close();
+ if (mainThread != null)
+ mainThread.interrupt();
+ for (ScheduledFuture<?> f : this.expirationFutures.values())
+ f.cancel(true);
+ this.expirationFutures.clear();
+ this.expirationExecutor.shutdownNow();
+ closeFuture.syncUninterruptibly();
+ shutdown = true;
+ }
+ }
+
+ /**
+ * Static main method that allows the {@link GemFireRedisServer} to be
+ * started from the command line. The supported command line arguments are
+ * <p>-port=
+ * <br>-bind-address=
+ * <br>-log-level=
+ *
+ * @param args Command line args
+ */
+ public static void main(String[] args) {
+ int port = DEFAULT_REDIS_SERVER_PORT;
+ String bindAddress = null;
+ String logLevel = null;
+ for (String arg: args) {
+ if (arg.startsWith("-port"))
+ port = getPort(arg);
+ else if (arg.startsWith("-bind-address"))
+ bindAddress = getBindAddress(arg);
+ else if (arg.startsWith("-log-level"))
+ logLevel = getLogLevel(arg);
+ }
+ mainThread = Thread.currentThread();
+ GemFireRedisServer server = new GemFireRedisServer(bindAddress, port, logLevel);
+ server.start();
+ while(true) {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e1) {
+ break;
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ /**
+ * Helper method to parse the port to a number
+ *
+ * @param arg String where the argument is
+ * @return The port number when the correct syntax was used,
+ * otherwise will return {@link #DEFAULT_REDIS_SERVER_PORT}
+ */
+ private static int getPort(String arg) {
+ int port = DEFAULT_REDIS_SERVER_PORT;
+ if (arg != null && arg.length() > 6) {
+ if (arg.startsWith("-port")) {
+ String p = arg.substring(arg.indexOf('=') + 1);
+ p = p.trim();
+ try {
+ port = Integer.parseInt(p);
+ } catch (NumberFormatException e) {
+ System.out.println("Unable to parse port, using default port");
+ }
+ }
+ }
+ return port;
+ }
+
+ /**
+ * Helper method to parse bind address
+ *
+ * @param arg String holding bind address
+ * @return Bind address
+ */
+ private static String getBindAddress(String arg) {
+ String address = null;
+ if (arg != null && arg.length() > 14) {
+ if (arg.startsWith("-bind-address")) {
+ String p = arg.substring(arg.indexOf('=') + 1);
+ address = p.trim();
+ }
+ }
+ return address;
+ }
+
+ /**
+ * Helper method to parse log level
+ *
+ * @param arg String holding log level
+ * @return Log level
+ */
+ private static String getLogLevel(String arg) {
+ String logLevel = null;
+ if (arg != null && arg.length() > 11) {
+ if (arg.startsWith("-log-level")) {
+ String p = arg.substring(arg.indexOf('=') + 1);
+ logLevel = p.trim();
+ }
+ }
+ return logLevel;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache/CacheRegionClearStatsDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache/CacheRegionClearStatsDUnitTest.java
index 7ea3565,0000000..88a32a3
mode 100755,000000..100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/CacheRegionClearStatsDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/CacheRegionClearStatsDUnitTest.java
@@@ -1,248 -1,0 +1,240 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache;
+
+import java.util.Properties;
+
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+/**
+ * verifies the count of clear operation
+ *
+ * @author aingle
+ */
+public class CacheRegionClearStatsDUnitTest extends DistributedTestCase {
+ /** the cache */
+ private static GemFireCacheImpl cache = null;
+
- private static VM server1 = null;
++ private VM server1 = null;
+
+ private static VM client1 = null;
+
+ /** name of the test region */
+ private static final String REGION_NAME = "CacheRegionClearStatsDUnitTest_Region";
+
+ private static final String k1 = "k1";
+
+ private static final String k2 = "k2";
+
+ private static final String client_k1 = "client-k1";
+
+ private static final String client_k2 = "client-k2";
+
+ private static final int clearOp = 2;
+
+ /** constructor */
+ public CacheRegionClearStatsDUnitTest(String name) {
+ super(name);
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+ final Host host = Host.getHost(0);
+ server1 = host.getVM(0);
+ client1 = host.getVM(1);
+ }
+
+ private void createCache(Properties props) throws Exception {
+ DistributedSystem ds = getSystem(props);
+ ds.disconnect();
+ ds = getSystem(props);
+ assertNotNull(ds);
+ cache = (GemFireCacheImpl)CacheFactory.create(ds);
+ assertNotNull(cache);
+ }
+
+ public static void createClientCache(String host, Integer port1)
+ throws Exception {
+ new CacheRegionClearStatsDUnitTest("temp");
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+ new CacheRegionClearStatsDUnitTest("temp").createCache(props);
+ PoolImpl p = (PoolImpl)PoolManager.createFactory().addServer(host,
+ port1.intValue()).setSubscriptionEnabled(false)
+ .setThreadLocalConnections(true).setMinConnections(1).setReadTimeout(
+ 20000).setPingInterval(10000).setRetryAttempts(1)
+ .create("CacheRegionClearStatsDUnitTest");
+
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setPoolName(p.getName());
+
+ RegionAttributes attrs = factory.create();
+ Region region = cache.createRegion(REGION_NAME, attrs);
+ //region.registerInterest("ALL_KEYS");
+ }
+
+ public static Integer createServerCacheDisk() throws Exception {
+ return createCache(DataPolicy.PERSISTENT_REPLICATE);
+ }
+
+ private static Integer createCache(DataPolicy dataPolicy) throws Exception {
+ new CacheRegionClearStatsDUnitTest("temp").createCache(new Properties());
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setDataPolicy(dataPolicy);
+ RegionAttributes attrs = factory.create();
+ cache.createRegion(REGION_NAME, attrs);
+ int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ CacheServer server1 = cache.addCacheServer();
+ server1.setPort(port);
+ server1.setNotifyBySubscription(true);
+ server1.start();
+ return new Integer(server1.getPort());
+ }
+
+ public static Integer createServerCache() throws Exception {
+ return createCache(DataPolicy.REPLICATE);
+ }
+
+ public static void createClientCacheDisk(String host, Integer port1)
+ throws Exception {
+ new CacheRegionClearStatsDUnitTest("temp");
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+ new CacheRegionClearStatsDUnitTest("temp").createCache(props);
+ PoolImpl p = (PoolImpl)PoolManager.createFactory().addServer(host,
+ port1.intValue()).setSubscriptionEnabled(false)
+ .setThreadLocalConnections(true).setMinConnections(1).setReadTimeout(
+ 20000).setPingInterval(10000).setRetryAttempts(1).create(
+ "CacheRegionClearStatsDUnitTest");
+
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setPoolName(p.getName());
+ factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ RegionAttributes attrs = factory.create();
+ Region region = cache.createRegion(REGION_NAME, attrs);
+ //region.registerInterest("ALL_KEYS");
+ }
+ /**
+ * This test does the following (<b> clear stats counter </b>):<br>
+ * 1)Verifies that clear operation count matches with stats count<br>
+ */
+ public void testClearStatsWithNormalRegion(){
- Integer port1 = ((Integer)server1.invoke(
- CacheRegionClearStatsDUnitTest.class, "createServerCache"));
++ Integer port1 = ((Integer)server1.invoke(() -> CacheRegionClearStatsDUnitTest.createServerCache()));
+
- client1.invoke(CacheRegionClearStatsDUnitTest.class,
- "createClientCache", new Object[] {
- NetworkUtils.getServerHostName(server1.getHost()), port1 });
- client1.invoke(CacheRegionClearStatsDUnitTest.class, "put");
++ client1.invoke(() -> CacheRegionClearStatsDUnitTest.createClientCache(
++ NetworkUtils.getServerHostName(server1.getHost()), port1 ));
++ client1.invoke(() -> CacheRegionClearStatsDUnitTest.put());
+
+ try{
+ Thread.sleep(10000);
+ }catch(Exception e){
+ // sleep
+ }
+
- client1.invoke(CacheRegionClearStatsDUnitTest.class,
- "validationClearStat");
++ client1.invoke(() -> CacheRegionClearStatsDUnitTest.validationClearStat());
+
- server1.invoke(CacheRegionClearStatsDUnitTest.class,
- "validationClearStat");
++ server1.invoke(() -> CacheRegionClearStatsDUnitTest.validationClearStat());
+ }
+ /**
+ * This test does the following (<b> clear stats counter when disk involved </b>):<br>
+ * 1)Verifies that clear operation count matches with stats count <br>
+ */
+ public void testClearStatsWithDiskRegion(){
- Integer port1 = ((Integer)server1.invoke(
- CacheRegionClearStatsDUnitTest.class, "createServerCacheDisk"));
++ Integer port1 = ((Integer)server1.invoke(() -> CacheRegionClearStatsDUnitTest.createServerCacheDisk()));
+
- client1.invoke(CacheRegionClearStatsDUnitTest.class,
- "createClientCacheDisk", new Object[] {
- NetworkUtils.getServerHostName(server1.getHost()), port1 });
- client1.invoke(CacheRegionClearStatsDUnitTest.class, "put");
++ client1.invoke(() -> CacheRegionClearStatsDUnitTest.createClientCacheDisk(
++ NetworkUtils.getServerHostName(server1.getHost()), port1 ));
++ client1.invoke(() -> CacheRegionClearStatsDUnitTest.put());
+
+ try{
+ Thread.sleep(10000);
+ }catch(Exception e){
+ // sleep
+ }
+
- client1.invoke(CacheRegionClearStatsDUnitTest.class,
- "validationClearStat");
++ client1.invoke(() -> CacheRegionClearStatsDUnitTest.validationClearStat());
+
- server1.invoke(CacheRegionClearStatsDUnitTest.class,
- "validationClearStat");
++ server1.invoke(() -> CacheRegionClearStatsDUnitTest.validationClearStat());
+ }
+
+ @Override
+ protected final void preTearDown() throws Exception {
- client1.invoke(CacheRegionClearStatsDUnitTest.class, "closeCache");
++ client1.invoke(() -> CacheRegionClearStatsDUnitTest.closeCache());
+ // then close the servers
- server1.invoke(CacheRegionClearStatsDUnitTest.class, "closeCache");
++ server1.invoke(() -> CacheRegionClearStatsDUnitTest.closeCache());
+ }
+
+ public static void closeCache() {
+ if (cache != null && !cache.isClosed()) {
+ cache.close();
+ cache.getDistributedSystem().disconnect();
+ }
+ }
+
+ public static void put() {
+ try {
+ Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ assertNotNull(r1);
+
+ r1.put(k1, client_k1);
+ assertEquals(r1.getEntry(k1).getValue(), client_k1);
+ r1.put(k2, client_k2);
+ assertEquals(r1.getEntry(k2).getValue(), client_k2);
+ try{
+ Thread.sleep(10000);
+ }catch(Exception e){
+ // sleep
+ }
+ r1.clear();
+
+ r1.put(k1, client_k1);
+ assertEquals(r1.getEntry(k1).getValue(), client_k1);
+ r1.put(k2, client_k2);
+ assertEquals(r1.getEntry(k2).getValue(), client_k2);
+ try{
+ Thread.sleep(10000);
+ }catch(Exception e){
+ // sleep
+ }
+ r1.clear();
+ }
+ catch (Exception ex) {
+ Assert.fail("failed while put", ex);
+ }
+ }
+
+ public static void validationClearStat(){
+ assertEquals(cache.getCachePerfStats().getClearCount(), clearOp);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
index 8166318,0000000..8b04ab5
mode 100755,000000..100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
@@@ -1,203 -1,0 +1,203 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.junit.Ignore;
+
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.distributed.internal.DSClock;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+public class ClientServerTimeSyncDUnitTest extends CacheTestCase {
+
+ public ClientServerTimeSyncDUnitTest(String name) {
+ super(name);
+ }
+
+ @Ignore("Bug 52327")
+ public void DISABLED_testClientTimeAdvances() {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0); // Server
+ VM vm1 = host.getVM(1); // Client
+
+ final String regionName = "testRegion";
+ final long TEST_OFFSET = 10000;
+
+ ClientCache cache = null;
+
+ try {
+
+ final int serverPort = (Integer)vm0.invoke(new SerializableCallable("Start server with a region") {
+
+ @Override
+ public Object call() {
+ Cache cache = getCache();
+ cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+ LogWriterUtils.getLogWriter().info("Done creating region, now creating CacheServer");
+ CacheServer server = null;
+ try {
+ server = cache.addCacheServer();
+ server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
+ server.start();
+ } catch (IOException e) {
+ Assert.fail("Starting cache server failed.", e);
+ }
+
+ // now set an artificial time offset for the test
+ system.getClock().setCacheTimeOffset(null, TEST_OFFSET, true);
+
+ LogWriterUtils.getLogWriter().info("Done creating and starting CacheServer on port " + server.getPort());
+ return server.getPort();
+ }
+ });
+
+ final String hostName = NetworkUtils.getServerHostName(vm0.getHost());
+
+ // Start client with proxy region and register interest
+
+ disconnectFromDS();
+ Properties props = new Properties();
+ props.setProperty("locators", "");
+ props = getSystem(props).getProperties();
+ cache = new ClientCacheFactory(props).setPoolSubscriptionEnabled(true)
+ .addPoolServer(hostName, serverPort)
+ .setPoolPingInterval(5000)
+ .create();
+ Region proxyRegion = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
+
+ proxyRegion.registerInterestRegex(".*");
+
+ proxyRegion.put("testkey", "testValue1");
+
+ final DSClock clock = ((GemFireCacheImpl)cache).getSystem().getClock();
+ WaitCriterion wc = new WaitCriterion() {
+ public boolean done() {
+ long clientTimeOffset = clock.getCacheTimeOffset();
+ LogWriterUtils.getLogWriter().info("Client node's new time offset is: " + clientTimeOffset);
+ return clientTimeOffset >= TEST_OFFSET;
+ }
+ public String description() {
+ return "Waiting for cacheTimeOffset to be non-zero. PingOp should have set it to something";
+ }
+ };
+ Wait.waitForCriterion(wc, 60000, 1000, true);
+ } finally {
+ cache.close();
- vm1.invoke(CacheTestCase.class, "disconnectFromDS");
++ vm1.invoke(() -> CacheTestCase.disconnectFromDS());
+ }
+ }
+
+ public void testNothing() {
+ // place-holder to keep dunit runner from barfing
+ }
+
+ @Ignore("not yet implemented")
+ public void DISABLED_testClientTimeSlowsDown() {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0); // Server
+ VM vm1 = host.getVM(1); // Client
+
+ final String regionName = "testRegion";
+ final long TEST_OFFSET = 10000;
+
+ ClientCache cache = null;
+
+ try {
+
+ final int serverPort = (Integer)vm0.invoke(new SerializableCallable("Start server with a region") {
+
+ @Override
+ public Object call() {
+ Cache cache = getCache();
+ cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+ LogWriterUtils.getLogWriter().info("Done creating region, now creating CacheServer");
+ CacheServer server = null;
+ try {
+ server = cache.addCacheServer();
+ server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
+ server.start();
+ } catch (IOException e) {
+ Assert.fail("Starting cache server failed.", e);
+ }
+
+ // now set an artificial time offset for the test
+ system.getClock().setCacheTimeOffset(null, -TEST_OFFSET, true);
+
+ LogWriterUtils.getLogWriter().info("Done creating and starting CacheServer on port " + server.getPort());
+ return server.getPort();
+ }
+ });
+
+ Wait.pause((int)TEST_OFFSET); // let cacheTimeMillis consume the time offset
+
+ final String hostName = NetworkUtils.getServerHostName(vm0.getHost());
+
+ // Start client with proxy region and register interest
+
+ disconnectFromDS();
+ Properties props = new Properties();
+ props.setProperty("locators", "");
+ props = getSystem(props).getProperties();
+ cache = new ClientCacheFactory(props).setPoolSubscriptionEnabled(true)
+ .addPoolServer(hostName, serverPort)
+ .setPoolPingInterval(5000)
+ .create();
+ Region proxyRegion = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
+
+ proxyRegion.registerInterestRegex(".*");
+
+ proxyRegion.put("testkey", "testValue1");
+
+ final DSClock clock = ((GemFireCacheImpl)cache).getSystem().getClock();
+ WaitCriterion wc = new WaitCriterion() {
+ public boolean done() {
+ long clientTimeOffset = clock.getCacheTimeOffset();
+ LogWriterUtils.getLogWriter().info("Client node's new time offset is: " + clientTimeOffset);
+ if (clientTimeOffset >= 0) {
+ return false;
+ }
+ long cacheTime = clock.cacheTimeMillis();
+ return Math.abs(System.currentTimeMillis() - (cacheTime - clientTimeOffset)) < 5;
+ }
+ public String description() {
+ return "Waiting for cacheTimeOffset to be negative and cacheTimeMillis to stabilize";
+ }
+ };
+ Wait.waitForCriterion(wc, 60000, 1000, true);
+ } finally {
+ cache.close();
- vm1.invoke(CacheTestCase.class, "disconnectFromDS");
++ vm1.invoke(() -> CacheTestCase.disconnectFromDS());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java
index b0f3b59,0000000..03368de
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java
@@@ -1,598 -1,0 +1,598 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.NoAvailableLocatorsException;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.management.membership.ClientMembership;
+import com.gemstone.gemfire.management.membership.ClientMembershipEvent;
+import com.gemstone.gemfire.management.membership.ClientMembershipListenerAdapter;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+/**
+ * Tests cases that are particular for the auto connection source
+ * - dynamically discovering servers, locators, handling
+ * locator disappearance, etc.
+ * @author dsmith
+ *
+ */
+public class AutoConnectionSourceDUnitTest extends LocatorTestBase {
+
+ protected static final Object BRIDGE_LISTENER = "BRIDGE_LISTENER";
+ private static final long MAX_WAIT = 60000;
+
+ public void setUp() throws Exception {
+ super.setUp();
+ IgnoredException.addIgnoredException("NoAvailableLocatorsException");
+ }
+
+ public AutoConnectionSourceDUnitTest(String name) {
+ super(name);
+ }
+
+ public void testDiscoverBridgeServers() throws Exception {
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ startLocatorInVM(vm0, locatorPort, "");
+
+ String locators = NetworkUtils.getServerHostName(vm0.getHost())+ "[" + locatorPort + "]";
+
+ startBridgeServerInVM(vm1, null, locators);
+
+ startBridgeClientInVM(vm2, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
+
+ putAndWaitForSuccess(vm2, REGION_NAME, "key", "value");
+
+ Assert.assertEquals("value", getInVM(vm1, "key"));
+ }
+
+ public void testNoLocators() {
+
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+
+ try {
+ startBridgeClientInVM(vm0, null, NetworkUtils.getServerHostName(vm0.getHost()), AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET));
+ putInVM(vm0, "key", "value");
+ fail("Client cache should not have been able to start");
+ } catch(Exception e) {
+ //expected an exception
+ }
+ }
+
+ public void testNoBridgeServer() {
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ startLocatorInVM(vm0, locatorPort, "");
+ try {
+ startBridgeClientInVM(vm1, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
+ putInVM(vm0, "key", "value");
+ fail("Client cache should not have been able to start");
+ } catch(Exception e) {
+ //expected an exception
+ }
+ }
+
+ public void testDynamicallyFindBridgeServer() throws Exception {
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+ int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ startLocatorInVM(vm0, locatorPort, "");
+
+ String locators = NetworkUtils.getServerHostName(vm0.getHost()) + "[" + locatorPort + "]";
+
+ startBridgeServerInVM(vm1, null, locators);
+
+ startBridgeClientInVM(vm2, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
+
+ putAndWaitForSuccess(vm2, REGION_NAME, "key", "value");
+
+ startBridgeServerInVM(vm3, null, locators);
+
+ stopBridgeMemberVM(vm1);
+
+ putAndWaitForSuccess(vm2, REGION_NAME, "key2", "value2");
+
+ Assert.assertEquals("value2", getInVM(vm3, "key2"));
+ }
+
+ public void testDynamicallyFindLocators() throws Exception {
+ final Host host = Host.getHost(0);
+ final String hostName = NetworkUtils.getServerHostName(host);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+ int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+ final int locatorPort0 = ports[0];
+ final int locatorPort1 = ports[1];
+ final int locatorPort3 = ports[2];
+ String locators = getLocatorString(host, new int[] { locatorPort0, locatorPort1, locatorPort3});
+ startLocatorInVM(vm0, locatorPort0, locators);
+
+ startLocatorInVM(vm1, locatorPort1, locators);
+ startBridgeClientInVM(vm2, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort0);
+
+ InetSocketAddress locatorToWaitFor= new InetSocketAddress(hostName, locatorPort1);
+ waitForLocatorDiscovery(vm2, locatorToWaitFor);
+
+ stopLocatorInVM(vm0);
+ startBridgeServerInVM(vm0, null, locators);
+
+ putAndWaitForSuccess(vm2, REGION_NAME, "key", "value");
+ Assert.assertEquals("value", getInVM(vm0, "key"));
+
+ startLocatorInVM(vm3, locatorPort3, locators);
+ stopBridgeMemberVM(vm0);
+ locatorToWaitFor= new InetSocketAddress(hostName, locatorPort3);
+ waitForLocatorDiscovery(vm2, locatorToWaitFor);
+ stopLocatorInVM(vm1);
+ startBridgeServerInVM(vm1, null, locators);
+ putAndWaitForSuccess(vm2, REGION_NAME, "key2", "value2");
+ Assert.assertEquals("value2", getInVM(vm1, "key2"));
+
+ }
+
+ public void testEmbeddedLocator() throws Exception {
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+ int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+ String locators = NetworkUtils.getServerHostName(vm0.getHost()) + "[" + locatorPort + "]";
+
+ startBridgeServerWithEmbeddedLocator(vm0, null, locators, new String[] {REGION_NAME}, CacheServer.DEFAULT_LOAD_PROBE);
+
+ startBridgeClientInVM(vm2, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
+
+ putAndWaitForSuccess(vm2, REGION_NAME, "key", "value");
+
+ Assert.assertEquals("value", getInVM(vm2, "key"));
+ }
+
+ private void waitForLocatorDiscovery(VM vm,
+ final InetSocketAddress locatorToWaitFor) {
+ vm.invoke(new SerializableCallable() {
+ public Object call() throws InterruptedException {
+ MyLocatorCallback callback = (MyLocatorCallback) remoteObjects.get(CALLBACK_KEY);
+
+ boolean discovered = callback.waitForDiscovery(locatorToWaitFor, MAX_WAIT);
+ Assert.assertTrue("Waited " + MAX_WAIT + " for " + locatorToWaitFor
+ + " to be discovered on client. List is now: "
+ + callback.getDiscovered(), discovered);
+ return null;
+ }
+ });
+ }
+
+ public void testServerGroups() throws Exception {
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+ int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ startLocatorInVM(vm0, locatorPort, "");
+
+ String locators = NetworkUtils.getServerHostName(vm0.getHost()) + "[" + locatorPort + "]";
+
+ startBridgeServerInVM(vm1, new String[] {"group1", "group2"} , locators, new String[] {"A", "B"});
+ startBridgeServerInVM(vm2, new String[] {"group2", "group3"}, locators, new String[] {"B", "C"});
+
+
+ startBridgeClientInVM(vm3, "group1", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort, new String [] {"A", "B", "C"});
+ putAndWaitForSuccess(vm3, "A", "key", "value");
+ Assert.assertEquals("value", getInVM(vm1, "A", "key"));
+ try {
+ putInVM(vm3, "C", "key2", "value2");
+ fail("Should not have been able to find Region C on the server");
+ } catch(Exception expected) {}
+
+ stopBridgeMemberVM(vm3);
+
+ startBridgeClientInVM(vm3, "group3", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort, new String [] {"A", "B", "C"});
+ try {
+ putInVM(vm3, "A", "key3", "value");
+ fail("Should not have been able to find Region A on the server");
+ } catch(Exception expected) {}
+ putInVM(vm3, "C", "key4", "value");
+ Assert.assertEquals("value", getInVM(vm2, "C", "key4"));
+
+ stopBridgeMemberVM(vm3);
+
+ startBridgeClientInVM(vm3, "group2", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort, new String [] {"A", "B", "C"});
+ putInVM(vm3, "B", "key5", "value");
+ Assert.assertEquals("value", getInVM(vm1, "B", "key5"));
+ Assert.assertEquals("value", getInVM(vm2, "B", "key5"));
+
+ stopBridgeMemberVM(vm1);
+ putInVM(vm3, "B", "key6", "value");
+ Assert.assertEquals("value", getInVM(vm2, "B", "key6"));
+ startBridgeServerInVM(vm1, new String[] {"group1", "group2"} , locators, new String[] {"A", "B"});
+ stopBridgeMemberVM(vm2);
+
+ putInVM(vm3, "B", "key7", "value");
+ Assert.assertEquals("value", getInVM(vm1, "B", "key7"));
+ }
+
- public void testTwoServersInSameVM() {
++ public void testTwoServersInSameVM() throws Exception {
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+// VM vm3 = host.getVM(3);
+
+ int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+ startLocatorInVM(vm0, locatorPort, "");
+
+ final String locators = NetworkUtils.getServerHostName(vm0.getHost()) + "[" + locatorPort + "]";
+
+ final int serverPort1 =startBridgeServerInVM(vm1, new String[] {"group1"}, locators);
+ final int serverPort2 =addCacheServerInVM(vm1, new String[] {"group2"});
+
+ startBridgeClientInVM(vm2, "group2", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
+
+ checkEndpoints(vm2, new int[] {serverPort2});
+
+ stopBridgeMemberVM(vm2);
+
+ startBridgeClientInVM(vm2, "group1", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
+
+ checkEndpoints(vm2, new int[] {serverPort1});
+ }
+
+ public void testClientMembershipListener() throws Exception {
+ final Host host = Host.getHost(0);
+ VM locatorVM = host.getVM(0);
+ VM bridge1VM = host.getVM(1);
+ VM bridge2VM = host.getVM(2);
+ VM clientVM = host.getVM(3);
+ int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ startLocatorInVM(locatorVM, locatorPort, "");
+ String locators = NetworkUtils.getServerHostName(locatorVM.getHost()) + "[" + locatorPort + "]";
+
+ //start a bridge server with a listener
+ addBridgeListener(bridge1VM);
+ int serverPort1 = startBridgeServerInVM(bridge1VM, null, locators);
+
+ //start a bridge client with a listener
+ addBridgeListener(clientVM);
+ startBridgeClientInVM(clientVM, null, NetworkUtils.getServerHostName(locatorVM.getHost()), locatorPort);
+ // wait for client to connect
+ checkEndpoints(clientVM, new int[] {serverPort1});
+
+ //make sure the client and bridge server both noticed each other
+ waitForJoin(bridge1VM);
+ MyListener serverListener = getBridgeListener(bridge1VM);
+ Assert.assertEquals(0, serverListener.getCrashes());
+ Assert.assertEquals(0, serverListener.getDepartures());
+ Assert.assertEquals(1, serverListener.getJoins());
+ resetBridgeListener(bridge1VM);
+
+ waitForJoin(clientVM);
+ MyListener clientListener= getBridgeListener(clientVM);
+ Assert.assertEquals(0, clientListener.getCrashes());
+ Assert.assertEquals(0, clientListener.getDepartures());
+ Assert.assertEquals(1, clientListener.getJoins());
+ resetBridgeListener(clientVM);
+
+ checkEndpoints(clientVM, new int[] {serverPort1});
+
+ //start another bridge server and make sure it is detected by the client
+ int serverPort2 = startBridgeServerInVM(bridge2VM, null, locators);
+
+ checkEndpoints(clientVM, new int[] {serverPort1, serverPort2});
+ serverListener = getBridgeListener(bridge1VM);
+ Assert.assertEquals(0, serverListener.getCrashes());
+ Assert.assertEquals(0, serverListener.getDepartures());
+ Assert.assertEquals(0, serverListener.getJoins());
+ resetBridgeListener(bridge1VM);
+ waitForJoin(clientVM);
+ clientListener= getBridgeListener(clientVM);
+ Assert.assertEquals(0, clientListener.getCrashes());
+ Assert.assertEquals(0, clientListener.getDepartures());
+ Assert.assertEquals(1, clientListener.getJoins());
+ resetBridgeListener(clientVM);
+
+ //stop the second bridge server and make sure it is detected by the client
+ stopBridgeMemberVM(bridge2VM);
+
+ checkEndpoints(clientVM, new int[] {serverPort1});
+ serverListener = getBridgeListener(bridge1VM);
+ Assert.assertEquals(0, serverListener.getCrashes());
+ Assert.assertEquals(0, serverListener.getDepartures());
+ Assert.assertEquals(0, serverListener.getJoins());
+ resetBridgeListener(bridge1VM);
+ waitForCrash(clientVM);
+ clientListener= getBridgeListener(clientVM);
+ Assert.assertEquals(1, clientListener.getCrashes());
+ Assert.assertEquals(0, clientListener.getDepartures());
+ Assert.assertEquals(0, clientListener.getJoins());
+ resetBridgeListener(clientVM);
+
+ //stop the client and make sure the bridge server notices
+ stopBridgeMemberVM(clientVM);
+ waitForDeparture(bridge1VM);
+ serverListener = getBridgeListener(bridge1VM);
+ Assert.assertEquals(0, serverListener.getCrashes());
+ Assert.assertEquals(1, serverListener.getDepartures());
+ Assert.assertEquals(0, serverListener.getJoins());
+ }
+
+ protected Object getInVM(VM vm, final Serializable key) {
+ return getInVM(vm, REGION_NAME, key);
+ }
+
+ protected Object getInVM(VM vm, final String regionName, final Serializable key) {
+ return vm.invoke(new SerializableCallable("Get in VM") {
+ public Object call() throws Exception {
+ Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
+ Region region = cache.getRegion(regionName);
+ return region.get(key);
+ }
+ });
+ }
+
+ protected void putAndWaitForSuccess(VM vm, final String regionName, final Serializable key, final Serializable value) throws InterruptedException
+ {
+ long endTime = System.currentTimeMillis() + MAX_WAIT;
+ long remaining = MAX_WAIT;
+ int i = 0;
+ while(true) {
+ try {
+ System.err.println("Attempt: " + (i++));
+ putInVM(vm, regionName, key, value);
+ break;
+ } catch (NoAvailableLocatorsException | com.gemstone.gemfire.test.dunit.RMIException e) {
+ if( !(e instanceof NoAvailableLocatorsException)
+ && !(e.getCause() instanceof NoAvailableServersException)) {
+ throw e;
+ }
+ if(remaining <= 0) {
+ throw e;
+ }
+ Wait.pause(100);
+ remaining = endTime - System.currentTimeMillis();
+ }
+ }
+ }
+
+ protected void putInVM(VM vm, final Serializable key, final Serializable value) {
+ putInVM(vm, REGION_NAME, key, value);
+ }
+
+
+
+ protected void putInVM(VM vm, final String regionName, final Serializable key, final Serializable value) {
+ vm.invoke(new SerializableCallable("Put in VM") {
+ public Object call() throws Exception {
+ Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
+ Region region = cache.getRegion(regionName);
+ return region.put(key, value);
+ }
+ });
+ }
+
+ /**
+ * Assert that there is one endpoint with the given host in port
+ * on the client vm.
+ * @param vm - the vm the client is running in
+ * @param expectedPorts - The server ports we expect the client to be connected to.
+ */
+ protected void checkEndpoints(VM vm, final int[] expectedPorts) {
+ vm.invoke(new SerializableRunnable("Check endpoint") {
+ public void run() {
+ PoolImpl pool = (PoolImpl) PoolManager.find(POOL_NAME);
+ int retryCount = 50;
+ List/*<ServerLocation>*/ endpoints;
+ HashSet actualEndpointPorts;
+ HashSet expectedEndpointPorts = new HashSet();
+ for(int i = 0; i < expectedPorts.length; i++) {
+ expectedEndpointPorts.add(new Integer(expectedPorts[i]));
+ }
+ do {
+ endpoints = pool.getCurrentServers();
+ actualEndpointPorts = new HashSet();
+ for(Iterator itr = endpoints.iterator(); itr.hasNext();) {
+ ServerLocation sl = (ServerLocation)itr.next();
+ actualEndpointPorts.add(new Integer(sl.getPort()));
+ }
+ if (expectedEndpointPorts.size() == actualEndpointPorts.size()) {
+ break;
+ } else {
+ Wait.pause(100);
+ }
+ } while(retryCount-- > 0);
+ Assert.assertEquals(expectedEndpointPorts, actualEndpointPorts);
+ }
+ });
+ }
+
+ protected void addBridgeListener(VM vm) {
+ vm.invoke(new SerializableRunnable("Add membership listener") {
+ public void run() {
+ MyListener listener = new MyListener();
+ ClientMembership.registerClientMembershipListener(listener);
+ remoteObjects.put(BRIDGE_LISTENER, listener);
+ }
+ });
+ }
+
+ protected void resetBridgeListener(VM vm) {
+ vm.invoke(new SerializableRunnable("Add membership listener") {
+ public void run() {
+ MyListener listener = (MyListener) remoteObjects.get(BRIDGE_LISTENER);
+ listener.reset();
+ }
+ });
+ }
+
+ private MyListener getBridgeListener(VM vm) {
+ return (MyListener) vm.invoke(new SerializableCallable("Add membership listener") {
+ public Object call() {
+ return remoteObjects.get(BRIDGE_LISTENER);
+ }
+ });
+ }
+
+ private void waitForJoin(VM vm) {
+ vm.invoke(new SerializableRunnable() {
+ public void run() {
+ MyListener listener = (MyListener) remoteObjects.get(BRIDGE_LISTENER);
+ synchronized(listener) {
+ long end = System.currentTimeMillis() + 10000;
+ while (listener.joins == 0) {
+ try {
+ long remaining = end - System.currentTimeMillis();
+ if(remaining < 0) {
+ break;
+ }
+ listener.wait(remaining);
+ } catch (InterruptedException e) {
+ fail("interrupted");
+ }
+ }
+ }
+ }
+ });
+ }
+
+ private void waitForCrash(VM vm) {
+ vm.invoke(new SerializableRunnable() {
+ public void run() {
+ MyListener listener = (MyListener) remoteObjects.get(BRIDGE_LISTENER);
+ synchronized(listener) {
+ long end = System.currentTimeMillis() + 10000;
+ while (listener.crashes== 0) {
+ try {
+ long remaining = end - System.currentTimeMillis();
+ if(remaining < 0) {
+ break;
+ }
+ listener.wait(remaining);
+ } catch (InterruptedException e) {
+ fail("interrupted");
+ }
+ }
+ }
+ }
+ });
+ }
+
+ private void waitForDeparture(VM vm) {
+ vm.invoke(new SerializableRunnable() {
+ public void run() {
+ MyListener listener = (MyListener) remoteObjects.get(BRIDGE_LISTENER);
+ synchronized(listener) {
+ long end = System.currentTimeMillis() + 10000;
+ while (listener.departures == 0) {
+ try {
+ long remaining = end - System.currentTimeMillis();
+ if(remaining < 0) {
+ break;
+ }
+ listener.wait(remaining);
+ } catch (InterruptedException e) {
+ fail("interrupted");
+ }
+ }
+ }
+ }
+ });
+ }
+
+ public static class MyListener extends ClientMembershipListenerAdapter implements Serializable {
+ protected int crashes = 0;
+ protected int joins = 0;
+ protected int departures= 0;
+
+ @Override
+ public synchronized void memberCrashed(ClientMembershipEvent event) {
+ crashes++;
+ notifyAll();
+ }
+
+ public synchronized void reset() {
+ crashes = 0;
+ joins = 0;
+ departures = 0;
+ }
+
+ @Override
+ public synchronized void memberJoined(ClientMembershipEvent event) {
+ joins++;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void memberLeft(ClientMembershipEvent event) {
+ departures++;
+ notifyAll();
+ }
+
+ public synchronized int getCrashes() {
+ return crashes;
+ }
+
+ public synchronized int getJoins() {
+ return joins;
+ }
+
+ public synchronized int getDepartures() {
+ return departures;
+ }
+ }
+}