Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:43:08 UTC

[020/100] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
index e90b724,0000000..1f7e7a9
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
@@@ -1,717 -1,0 +1,717 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.redis;
 +
 +import io.netty.bootstrap.ServerBootstrap;
 +import io.netty.buffer.PooledByteBufAllocator;
 +import io.netty.channel.Channel;
 +import io.netty.channel.ChannelFuture;
 +import io.netty.channel.ChannelInitializer;
 +import io.netty.channel.ChannelOption;
 +import io.netty.channel.ChannelPipeline;
 +import io.netty.channel.EventLoopGroup;
 +import io.netty.channel.ServerChannel;
 +import io.netty.channel.nio.NioEventLoopGroup;
 +import io.netty.channel.oio.OioEventLoopGroup;
 +import io.netty.channel.socket.SocketChannel;
 +import io.netty.channel.socket.nio.NioServerSocketChannel;
 +import io.netty.channel.socket.oio.OioServerSocketChannel;
 +import io.netty.util.concurrent.Future;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.net.UnknownHostException;
 +import java.util.Collection;
 +import java.util.Map.Entry;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.ThreadFactory;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import com.gemstone.gemfire.LogWriter;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheFactory;
 +import com.gemstone.gemfire.cache.EntryEvent;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionDestroyedException;
 +import com.gemstone.gemfire.cache.RegionFactory;
 +import com.gemstone.gemfire.cache.RegionShortcut;
 +import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
 +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 +import com.gemstone.gemfire.internal.SocketCreator;
 +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 +import com.gemstone.gemfire.internal.redis.ByteArrayWrapper;
 +import com.gemstone.gemfire.internal.redis.ByteToCommandDecoder;
 +import com.gemstone.gemfire.internal.redis.Coder;
 +import com.gemstone.gemfire.internal.redis.ExecutionHandlerContext;
 +import com.gemstone.gemfire.internal.redis.RedisDataType;
 +import com.gemstone.gemfire.internal.redis.RegionProvider;
- import com.gemstone.gemfire.internal.redis.executor.hll.HyperLogLogPlus;
++import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
 +
 +/**
 + * The GemFireRedisServer is a server that understands the Redis protocol. As
 + * commands are sent to the server, each command is picked up by a thread,
 + * interpreted and then executed and a response is sent back to the client. The
 + * default connection port is 6379 but that can be altered when run through GFSH
 + * or started through the provided static main class.
 + * <p>
 + * Each Redis data type instance is stored in a separate {@link Region}, except
 + * for Strings and HyperLogLogs, which are each stored collectively in a single
 + * Region. Those Regions, along with a meta data Region used internally, are
 + * protected so the client may not store keys with the names {@link GemFireRedisServer#REDIS_META_DATA_REGION}
 + * or {@link GemFireRedisServer#STRING_REGION}. The default Region type is
 + * {@link RegionShortcut#PARTITION}, although this can be changed by setting the
 + * system property {@value #DEFAULT_REGION_SYS_PROP_NAME} to a type defined by {@link RegionShortcut}.
 + * If the {@link GemFireRedisServer#NUM_THREADS_SYS_PROP_NAME} system property is set to 0,
 + * one thread per client will be created. Otherwise a worker thread pool of the specified size is
 + * used, with a default size of 4 * {@link Runtime#availableProcessors()} if the property is not set.
 + * <p>
 + * Setting the AUTH password requires setting the property "redis-password", just as
 + * "redis-port" would be set, either in xml or through GFSH.
 + * <p>
 + * The supported commands are as follows:
 + * <p>
 + * Supported String commands - APPEND, BITCOUNT, BITOP, BITPOS, DECR, DECRBY, 
 + * GET, GETBIT, GETRANGE, GETSET, INCR, INCRBY, INCRBYFLOAT, MGET, MSET, MSETNX,
 + * PSETEX, SET, SETBIT, SETEX, SETNX, STRLEN
 + * <p>
 + * Supported List commands - LINDEX, LLEN, LPOP, LPUSH, LPUSHX, LRANGE,
 + * LREM, LSET, LTRIM, RPOP, RPUSH, RPUSHX
 + * <p>
 + * Supported Hash commands - HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT,
 + * HKEYS, HMGET, HMSET, HSETNX, HLEN, HSCAN, HSET, HVALS
 + * <p>
 + * Supported Set commands - SADD, SCARD, SDIFF, SDIFFSTORE, SINTER,
 + * SINTERSTORE, SISMEMBER, SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER, 
 + * SCAN, SUNION, SUNIONSTORE
 + * <p>
 + * Supported SortedSet commands - ZADD, ZCARD, ZCOUNT, ZINCRBY, ZLEXCOUNT,
 + * ZRANGE, ZRANGEBYLEX, ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYLEX,
 + * ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE, ZREVRANGEBYSCORE, ZREVRANK,
 + * ZSCAN, ZSCORE
 + * <p>
 + * Supported HyperLogLog commands - PFADD, PFCOUNT, PFMERGE
 + * <p>
 + * Supported Keys commands - DEL, DBSIZE, EXISTS, EXPIRE, EXPIREAT, FLUSHDB, FLUSHALL, 
 + * KEYS, PERSIST, PEXPIRE, PEXPIREAT, PTTL, SCAN, TTL
 + * <p>
 + * Supported Transaction commands - DISCARD, EXEC, MULTI
 + * <P>
 + * Supported Server commands - AUTH, ECHO, PING, TIME, QUIT
 + * <p>
 + * <p>
 + * The command executors are not explicitly documented but the functionality
 + * can be found at <a href="http://redis.io/commands">Redis Commands</a>
 + * <p>
 + * Exceptions to the Redis commands documentation:<p>
 + * <ul>
 + * <li>Any command that removes keys and returns a count of removed
 + * entries will not return a total remove count but rather a count of how
 + * many of the removed entries existed on the local vm, though
 + * all entries will be removed</li>
 + * <li>Any command that returns a count of newly set members has an
 + * unspecified return value. The command will work just as the Redis protocol
 + * states, but the count will not necessarily reflect the number newly set compared
 + * to overridden.</li>
 + * <li>Transactions work just as they would on a Redis instance; they are local
 + * transactions. Transactions cannot be executed on data that is not local to the
 + * executing server, that is, on a partitioned region in a different server
 + * instance or on a persistent region that does not have transactions enabled.
 + * Also, you cannot watch or unwatch keys, as all keys within a GemFire
 + * transaction are watched by default.</li>
 + * </ul>
 + * @author Vitaliy Gavrilov
 + *
 + */
 +
 +public class GemFireRedisServer {
 +
 +  /**
 +   * Thread used to start main method
 +   */
 +  private static Thread mainThread = null;
 +
 +  /**
 +   * The default Redis port as specified by the Redis protocol, {@value #DEFAULT_REDIS_SERVER_PORT}
 +   */
 +  public static final int DEFAULT_REDIS_SERVER_PORT = 6379;
 +
 +  /**
 +   * The number of threads that will work on handling requests
 +   */
 +  private final int numWorkerThreads;
 +
 +  /**
 +   * The number of threads that will service the socket selectors
 +   */
 +  private final int numSelectorThreads;
 +
 +  /**
 +   * The actual port being used by the server
 +   */
 +  private final int serverPort;
 +
 +  /**
 +   * The address to bind to
 +   */
 +  private final String bindAddress;
 +
 +  /**
 +   * Connection timeout in milliseconds
 +   */
 +  private static final int connectTimeoutMillis = 1000;
 +
 +  /**
 +   * Temporary flag indicating whether to use the old single-thread-per-connection
 +   * model for the worker group
 +   */
 +  private boolean singleThreadPerConnection;
 +
 +  /**
 +   * Logging level
 +   */
 +  private final String logLevel;
 +
 +  /**
 +   * The cache instance pointer on this vm
 +   */
 +  private Cache cache;
 +
 +  /**
 +   * Channel to be closed when shutting down
 +   */
 +  private Channel serverChannel;
 +
 +  /**
 +   * Gem logwriter
 +   */
 +  private LogWriter logger;
 +
 +  private RegionProvider regionCache;
 +
 +  private final MetaCacheListener metaListener;
 +
 +  private EventLoopGroup bossGroup;
 +  private EventLoopGroup workerGroup;
 +  private final static int numExpirationThreads = 1;
 +  private final ScheduledExecutorService expirationExecutor;
 +
 +  /**
 +   * Map of futures to be executed for key expirations
 +   */
 +  private final ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationFutures;
 +
 +
 +  /**
 +   * The field that defines the name of the {@link Region} which holds all of
 +   * the strings. The current value of this field is {@value #STRING_REGION}.
 +   */
 +  public static final String STRING_REGION = "__StRiNgS";
 +
 +  /**
 +   * The field that defines the name of the {@link Region} which holds all of
 +   * the HyperLogLogs. The current value of this field is {@value #HLL_REGION}.
 +   */
 +  public static final String HLL_REGION = "__HlL";
 +
 +  /**
 +   * The field that defines the name of the {@link Region} which holds all of
 +   * the Redis meta data. The current value of this field is {@value #REDIS_META_DATA_REGION}.
 +   */
 +  public static final String REDIS_META_DATA_REGION = "__ReDiS_MeTa_DaTa";
 +
 +  /**
 +   * The system property name used to set the default {@link Region} creation
 +   * type. The property name is {@value #DEFAULT_REGION_SYS_PROP_NAME} and the
 +   * acceptable values are types defined by {@link RegionShortcut}, 
 +   * i.e. "PARTITION" would be used for {@link RegionShortcut#PARTITION}.
 +   */
 +  public static final String DEFAULT_REGION_SYS_PROP_NAME = "gemfireredis.regiontype";
 +
 +  /**
 +   * System property name that can be used to set the number of threads to be
 +   * used by the GemFireRedisServer
 +   */
 +  public static final String NUM_THREADS_SYS_PROP_NAME = "gemfireredis.numthreads";
 +
 +  /**
 +   * The actual {@link RegionShortcut} type specified by the system property
 +   * {@value #DEFAULT_REGION_SYS_PROP_NAME}.
 +   */
 +  public final RegionShortcut DEFAULT_REGION_TYPE;
 +
 +  private boolean shutdown;
 +  private boolean started;
 +
 +  /**
 +   * Determine the {@link RegionShortcut} type from a String value.
 +   * If the String value doesn't map to a RegionShortcut type then 
 +   * {@link RegionShortcut#PARTITION} will be used by default.
 +   * 
 +   * @return {@link RegionShortcut}
 +   */
 +  private static RegionShortcut setRegionType() {
 +    String regionType = System.getProperty(DEFAULT_REGION_SYS_PROP_NAME, "PARTITION");
 +    RegionShortcut type;
 +    try {
 +      type = RegionShortcut.valueOf(regionType);
 +    } catch (Exception e) {
 +      type = RegionShortcut.PARTITION;
 +    }
 +    return type;
 +  }
 +
 +  /**
 +   * Helper method to set the number of worker threads
 +   * 
 +   * @return If the System property {@value #NUM_THREADS_SYS_PROP_NAME} is set then that number
 +   * is used, otherwise 4 * # of cores
 +   */
 +  private int setNumWorkerThreads() {
 +    String prop = System.getProperty(NUM_THREADS_SYS_PROP_NAME);
 +    int numCores = Runtime.getRuntime().availableProcessors();
 +    int def = 4 * numCores;
 +    if (prop == null || prop.isEmpty())
 +      return def;
 +    int threads;
 +    try {
 +      threads = Integer.parseInt(prop);
 +    } catch (NumberFormatException e) {
 +      return def;
 +    }
 +    return threads;
 +  }
 +
 +  /**
 +   * Constructor for {@link GemFireRedisServer} that will configure the
 +   * server to listen on the given port and bind to the first non-loopback address
 +   * 
 +   * @param port The port the server will bind to; {@value #DEFAULT_REDIS_SERVER_PORT} is used if the argument is less than or equal to 0
 +   */
 +  public GemFireRedisServer(int port) {
 +    this(null, port, null);
 +  }
 +
 +  /**
 +   * Constructor for {@link GemFireRedisServer} that will configure the
 +   * server to bind to the given address and port
 +   * 
 +   * @param bindAddress The address to which the server will attempt to bind
 +   * @param port The port the server will bind to; {@value #DEFAULT_REDIS_SERVER_PORT} is used if the argument is less than or equal to 0
 +   */
 +  public GemFireRedisServer(String bindAddress, int port) {
 +    this(bindAddress, port, null);
 +  }
 +
 +
 +  /**
 +   * Constructor for {@link GemFireRedisServer} that will configure the
 +   * server to bind to the given address and port. Keep in mind that the
 +   * log level configuration will only be set if a {@link Cache} does not already
 +   * exist; if one already exists, setting that property will have no effect.
 +   * 
 +   * @param bindAddress The address to which the server will attempt to bind
 +   * @param port The port the server will bind to; {@value #DEFAULT_REDIS_SERVER_PORT} is used if the argument is less than or equal to 0
 +   * @param logLevel The logging level to be used by GemFire
 +   */
 +  public GemFireRedisServer(String bindAddress, int port, String logLevel) {
 +    if (port <= 0)
 +      this.serverPort = DEFAULT_REDIS_SERVER_PORT;
 +    else
 +      this.serverPort = port;
 +    this.bindAddress = bindAddress;
 +    this.logLevel = logLevel;
 +    this.numWorkerThreads = setNumWorkerThreads();
 +    if (this.numWorkerThreads == 0)
 +      this.singleThreadPerConnection = true;
 +    this.numSelectorThreads = 1;
 +    this.metaListener = new MetaCacheListener();
 +    this.expirationFutures = new ConcurrentHashMap<ByteArrayWrapper, ScheduledFuture<?>>();
 +    this.expirationExecutor = Executors.newScheduledThreadPool(numExpirationThreads, new ThreadFactory() {
 +      private final AtomicInteger counter = new AtomicInteger();
 +      @Override
 +      public Thread newThread(Runnable r) {
 +        Thread t = new Thread(r);
 +        t.setName("GemFireRedis-ScheduledExecutor-" + counter.incrementAndGet());
 +        t.setDaemon(true);
 +        return t;
 +      }
 +
 +    });
 +    this.DEFAULT_REGION_TYPE = setRegionType();
 +    this.shutdown = false;
 +    this.started = false;
 +  }
 +
 +  /**
 +   * Helper method to get the host name to bind to
 +   * 
 +   * @return The InetAddress to bind to
 +   * @throws UnknownHostException
 +   */
 +  private InetAddress getBindAddress() throws UnknownHostException {
 +    return this.bindAddress == null || this.bindAddress.isEmpty()
 +        ? SocketCreator.getLocalHost()
 +            : InetAddress.getByName(this.bindAddress);
 +  }
 +
 +  /**
 +   * This is the function to call on a {@link GemFireRedisServer} instance
 +   * to start it running
 +   */
 +  public synchronized void start() {
 +    if (!started) {
 +      try {
 +        startGemFire();
 +        initializeRedis();
 +        startRedisServer();
 +      } catch (IOException e) {
 +        throw new RuntimeException("Could not start Server", e);
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException("Could not start Server", e);
 +      }
 +      started = true;
 +    }
 +  }
 +
 +  /**
 +   * Initializes the {@link Cache} and creates the Regions Redis needs,
 +   * declaring those Regions to be protected.
 +   * Also, every {@link GemFireRedisServer} will check for entries already in the 
 +   * meta data Region.
 +   */
 +  private void startGemFire() {
 +    Cache c = GemFireCacheImpl.getInstance();
 +    if (c == null) {
 +      synchronized (GemFireRedisServer.class) {
 +        c = GemFireCacheImpl.getInstance();
 +        if (c == null) {
 +          CacheFactory cacheFactory = new CacheFactory();
 +          if (logLevel != null)
 +            cacheFactory.set("log-level", logLevel);
 +          c = cacheFactory.create();
 +        }
 +      }
 +    }
 +    this.cache = c;
 +    this.logger = c.getLogger();
 +  }
 +
 +  private void initializeRedis() {
 +    synchronized (this.cache) {
 +      RegionFactory<String, RedisDataType> rfMeta = cache.createRegionFactory(RegionShortcut.REPLICATE);
 +      rfMeta.addCacheListener(this.metaListener);
 +      RegionFactory<ByteArrayWrapper, ByteArrayWrapper> rfString = cache.createRegionFactory(DEFAULT_REGION_TYPE);
 +      RegionFactory<ByteArrayWrapper, HyperLogLogPlus> rfHLL = cache.createRegionFactory(DEFAULT_REGION_TYPE);
 +      Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion;
 +      if ((stringsRegion = this.cache.getRegion(STRING_REGION)) == null)
 +        stringsRegion = rfString.create(GemFireRedisServer.STRING_REGION);
 +      Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion;
 +      if ((hLLRegion = this.cache.getRegion(HLL_REGION)) == null)
 +        hLLRegion = rfHLL.create(HLL_REGION);
 +      Region<String, RedisDataType> redisMetaData;
 +      if ((redisMetaData = this.cache.getRegion(REDIS_META_DATA_REGION)) == null)
 +        redisMetaData = rfMeta.create(REDIS_META_DATA_REGION);
 +      this.regionCache = new RegionProvider(stringsRegion, hLLRegion, redisMetaData, expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE);
 +      redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED);
 +      redisMetaData.put(HLL_REGION, RedisDataType.REDIS_PROTECTED);
 +      redisMetaData.put(STRING_REGION, RedisDataType.REDIS_PROTECTED);
 +    }
 +    checkForRegions();
 +  }
 +
 +  private void checkForRegions() {
 +    Collection<Entry<String, RedisDataType>> entrySet = this.regionCache.metaEntrySet();
 +    for (Entry<String, RedisDataType> entry: entrySet) {
 +      String regionName = entry.getKey();
 +      RedisDataType type = entry.getValue();
 +      Region<?, ?> newRegion = cache.getRegion(regionName);
 +      if (newRegion == null && type != RedisDataType.REDIS_STRING && type != RedisDataType.REDIS_HLL && type != RedisDataType.REDIS_PROTECTED) {
 +        try {
 +          this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(regionName), type);
 +        } catch (Exception e) {
 +          if (logger.errorEnabled())
 +            logger.error(e);
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Helper method to start the server listening for connections. The
 +   * server is bound to the port specified by {@link GemFireRedisServer#serverPort}
 +   * 
 +   * @throws IOException
 +   * @throws InterruptedException
 +   */
 +  private void startRedisServer() throws IOException, InterruptedException {
 +    ThreadFactory selectorThreadFactory = new ThreadFactory() {
 +      private final AtomicInteger counter = new AtomicInteger();
 +      @Override
 +      public Thread newThread(Runnable r) {
 +        Thread t = new Thread(r);
 +        t.setName("GemFireRedisServer-SelectorThread-" + counter.incrementAndGet());
 +        t.setDaemon(true);
 +        return t;
 +      }
 +
 +    };
 +
 +    ThreadFactory workerThreadFactory = new ThreadFactory() {
 +      private final AtomicInteger counter = new AtomicInteger();
 +      @Override
 +      public Thread newThread(Runnable r) {
 +        Thread t = new Thread(r);
 +        t.setName("GemFireRedisServer-WorkerThread-" + counter.incrementAndGet());
 +        return t;
 +      }
 +
 +    };
 +
 +    bossGroup = null;
 +    workerGroup = null;
 +    Class<? extends ServerChannel> socketClass = null;
 +    if (singleThreadPerConnection) {
 +      bossGroup = new OioEventLoopGroup(Integer.MAX_VALUE, selectorThreadFactory);
 +      workerGroup = new OioEventLoopGroup(Integer.MAX_VALUE, workerThreadFactory);
 +      socketClass = OioServerSocketChannel.class;
 +    } else {
 +      bossGroup = new NioEventLoopGroup(this.numSelectorThreads, selectorThreadFactory);
 +      workerGroup = new NioEventLoopGroup(this.numWorkerThreads, workerThreadFactory);
 +      socketClass = NioServerSocketChannel.class;
 +    }
 +    InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
 +    String pwd = system.getConfig().getRedisPassword();
 +    final byte[] pwdB = Coder.stringToBytes(pwd);
 +    ServerBootstrap b = new ServerBootstrap();
 +    b.group(bossGroup, workerGroup)
 +    .channel(socketClass)
 +    .childHandler(new ChannelInitializer<SocketChannel>() {
 +      @Override
 +      public void initChannel(SocketChannel ch) throws Exception {
 +        if (logger.fineEnabled())
 +          logger.fine("GemFireRedisServer-Connection established with " + ch.remoteAddress());
 +        ChannelPipeline p = ch.pipeline();
 +        p.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder());
 +        p.addLast(ExecutionHandlerContext.class.getSimpleName(), new ExecutionHandlerContext(ch, cache, regionCache, GemFireRedisServer.this, pwdB));
 +      }
 +    })
 +    .option(ChannelOption.SO_REUSEADDR, true)
 +    .option(ChannelOption.SO_RCVBUF, getBufferSize())
 +    .childOption(ChannelOption.SO_KEEPALIVE, true)
 +    .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, GemFireRedisServer.connectTimeoutMillis)
 +    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
 +
 +    // Bind and start to accept incoming connections.
 +    ChannelFuture f = b.bind(new InetSocketAddress(getBindAddress(), serverPort)).sync();
 +    if (this.logger.infoEnabled()) {
 +      String logMessage = "GemFireRedisServer started {" + getBindAddress() + ":" + serverPort + "}, Selector threads: " + this.numSelectorThreads;
 +      if (this.singleThreadPerConnection)
 +        logMessage += ", One worker thread per connection";
 +      else
 +        logMessage += ", Worker threads: " + this.numWorkerThreads;
 +      this.logger.info(logMessage);
 +    }
 +    this.serverChannel = f.channel();
 +  }
 +
 +  /**
 +   * Takes an entry event and processes it. If the entry denotes that a
 +   * {@link RedisDataType#REDIS_LIST} or {@link RedisDataType#REDIS_SORTEDSET}
 +   * was created then this function will make the necessary calls to create the
 +   * parameterized queries for those keys.
 +   * 
 +   * @param event EntryEvent from meta data region
 +   */
 +  private void afterKeyCreate(EntryEvent<String, RedisDataType> event) {
 +    if (event.isOriginRemote()) {
 +      final String key = (String) event.getKey();
 +      final RedisDataType value = event.getNewValue();
 +      if (value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL && value != RedisDataType.REDIS_PROTECTED) {
 +        try {
 +          this.regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(key), value);
 +        } catch (RegionDestroyedException ignore) { // Region already destroyed, ignore
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * When a key is removed, this function makes sure the queries associated
 +   * with the key are also removed from each vm to avoid unnecessary
 +   * data retention
 +   */
 +  private void afterKeyDestroy(EntryEvent<String, RedisDataType> event) {
 +    if (event.isOriginRemote()) {
 +      final String key = (String) event.getKey();
 +      final RedisDataType value = event.getOldValue();
 +      if (value != null && value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL && value != RedisDataType.REDIS_PROTECTED) {
 +        ByteArrayWrapper kW = Coder.stringToByteArrayWrapper(key);
 +        Region<?, ?> r = this.regionCache.getRegion(kW);
 +        if (r != null) { 
 +          this.regionCache.removeRegionReferenceLocally(kW, value);
 +        }
 +      }
 +    }
 +  }
 +
 +  private final class MetaCacheListener extends CacheListenerAdapter<String, RedisDataType> {
 +
 +    @Override
 +    public void afterCreate(EntryEvent<String, RedisDataType> event) {
 +      afterKeyCreate(event);
 +    }
 +
 +    @Override
 +    public void afterDestroy(EntryEvent<String, RedisDataType> event) {
 +      afterKeyDestroy(event);
 +    }
 +  }
 +
 +  /**
 +   * Helper method to get the socket buffer size configured in GemFire,
 +   * which defaults to 32k
 +   * 
 +   * @return Buffer size to use for server
 +   */
 +  private int getBufferSize() {
 +    InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
 +    return system.getConfig().getSocketBufferSize();
 +  }
 +
 +  /**
 +   * Shutdown method for {@link GemFireRedisServer}. This closes the {@link Cache},
 +   * interrupts all execution and forcefully closes all connections.
 +   */
 +  public synchronized void shutdown() {
 +    if (!shutdown) {
 +      if (logger.infoEnabled())
 +        logger.info("GemFireRedisServer shutting down");
 +      ChannelFuture closeFuture = this.serverChannel.closeFuture();
 +      Future<?> c = workerGroup.shutdownGracefully();
 +      Future<?> c2 = bossGroup.shutdownGracefully();
 +      this.serverChannel.close();
 +      c.syncUninterruptibly();
 +      c2.syncUninterruptibly();
 +      this.regionCache.close();
 +      if (mainThread != null)
 +        mainThread.interrupt();
 +      for (ScheduledFuture<?> f : this.expirationFutures.values())
 +        f.cancel(true);
 +      this.expirationFutures.clear();
 +      this.expirationExecutor.shutdownNow();
 +      closeFuture.syncUninterruptibly();
 +      shutdown = true;
 +    }
 +  }
 +
 +  /**
 +   * Static main method that allows the {@link GemFireRedisServer} to be 
 +   * started from the command line. The supported command line arguments are
 +   * <p>-port=
 +   * <br>-bind-address=
 +   * <br>-log-level=
 +   * 
 +   * @param args Command line args
 +   */
 +  public static void main(String[] args) {
 +    int port = DEFAULT_REDIS_SERVER_PORT;
 +    String bindAddress = null;
 +    String logLevel = null;
 +    for (String arg: args) {
 +      if (arg.startsWith("-port"))
 +        port = getPort(arg);
 +      else if (arg.startsWith("-bind-address"))
 +        bindAddress = getBindAddress(arg);
 +      else if (arg.startsWith("-log-level"))
 +        logLevel = getLogLevel(arg);
 +    }
 +    mainThread = Thread.currentThread();
 +    GemFireRedisServer server = new GemFireRedisServer(bindAddress, port, logLevel);
 +    server.start();
 +    while(true) {
 +      try {
 +        Thread.sleep(Long.MAX_VALUE);
 +      } catch (InterruptedException e1) { 
 +        break;
 +      } catch (Exception e) {
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Helper method to parse the port to a number
 +   * 
 +   * @param arg String containing the port argument
 +   * @return The port number when the correct syntax was used, 
 +   * otherwise will return {@link #DEFAULT_REDIS_SERVER_PORT}
 +   */
 +  private static int getPort(String arg) {
 +    int port = DEFAULT_REDIS_SERVER_PORT;
 +    if (arg != null && arg.length() > 6) {
 +      if (arg.startsWith("-port")) {
 +        String p = arg.substring(arg.indexOf('=') + 1);
 +        p = p.trim();
 +        try {
 +          port = Integer.parseInt(p);
 +        } catch (NumberFormatException e) {
 +          System.out.println("Unable to parse port, using default port");
 +        }
 +      }
 +    }
 +    return port;
 +  }
 +
 +  /**
 +   * Helper method to parse bind address
 +   * 
 +   * @param arg String holding bind address
 +   * @return Bind address
 +   */
 +  private static String getBindAddress(String arg) {
 +    String address = null;
 +    if (arg != null && arg.length() > 14) {
 +      if (arg.startsWith("-bind-address")) {
 +        String p = arg.substring(arg.indexOf('=') + 1);
 +        address = p.trim();
 +      }
 +    }
 +    return address;
 +  }
 +
 +  /**
 +   * Helper method to parse log level
 +   * 
 +   * @param arg String holding log level
 +   * @return Log level
 +   */
 +  private static String getLogLevel(String arg) {
 +    String logLevel = null;
 +    if (arg != null && arg.length() > 11) {
 +      if (arg.startsWith("-log-level")) {
 +        String p = arg.substring(arg.indexOf('=') + 1);
 +        logLevel = p.trim();
 +      }
 +    }
 +    return logLevel;
 +  }
 +
 +}
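
The class Javadoc above notes that the server can be started through GFSH, through the provided static main class, or by constructing it directly. Below is a minimal sketch of the programmatic path, using only the constructors, the gemfireredis.* system properties and the start()/shutdown() methods shown in this file; the launcher class name, the "localhost" bind address and the "info" log level are illustrative choices, not part of the commit.

import com.gemstone.gemfire.redis.GemFireRedisServer;

public class RedisServerLauncher {
  public static void main(String[] args) {
    // Optional tuning via the system properties documented in the class Javadoc.
    System.setProperty("gemfireredis.regiontype", "PARTITION"); // any RegionShortcut name
    System.setProperty("gemfireredis.numthreads", "8");         // 0 means one thread per connection

    // Bind to localhost on the default Redis port; a null or empty bind
    // address falls back to the first non-loopback address.
    GemFireRedisServer server = new GemFireRedisServer("localhost", 6379, "info");
    server.start(); // creates or reuses the Cache, creates the Regions, starts Netty

    // Like the provided static main class, keep the launching thread alive
    // until the process is interrupted, then shut the server down.
    try {
      Thread.sleep(Long.MAX_VALUE);
    } catch (InterruptedException e) {
      server.shutdown(); // closes client connections and the worker groups
    }
  }
}

Once started, any standard Redis client can connect to the configured bind address and port.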

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache/CacheRegionClearStatsDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache/CacheRegionClearStatsDUnitTest.java
index 7ea3565,0000000..88a32a3
mode 100755,000000..100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/CacheRegionClearStatsDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/CacheRegionClearStatsDUnitTest.java
@@@ -1,248 -1,0 +1,240 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache;
 +
 +import java.util.Properties;
 +
 +import com.gemstone.gemfire.cache.client.PoolManager;
 +import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 +import com.gemstone.gemfire.cache.server.CacheServer;
 +import com.gemstone.gemfire.distributed.DistributedSystem;
 +import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 +import com.gemstone.gemfire.internal.AvailablePort;
 +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 +import com.gemstone.gemfire.test.dunit.Assert;
 +import com.gemstone.gemfire.test.dunit.DistributedTestCase;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.NetworkUtils;
 +import com.gemstone.gemfire.test.dunit.VM;
 +/**
 + * Verifies the count of clear operations
 + *  
 + * @author aingle
 + */
 +public class CacheRegionClearStatsDUnitTest extends DistributedTestCase {
 +  /** the cache */
 +  private static GemFireCacheImpl cache = null;
 +
-   private static VM server1 = null;
++  private VM server1 = null;
 +
 +  private static VM client1 = null;
 +
 +  /** name of the test region */
 +  private static final String REGION_NAME = "CacheRegionClearStatsDUnitTest_Region";
 +
 +  private static final String k1 = "k1";
 +
 +  private static final String k2 = "k2";
 +
 +  private static final String client_k1 = "client-k1";
 +
 +  private static final String client_k2 = "client-k2";
 +  
 +  private static final int clearOp = 2;
 +  
 +  /** constructor */
 +  public CacheRegionClearStatsDUnitTest(String name) {
 +    super(name);
 +  }
 +
 +  public void setUp() throws Exception {
 +    super.setUp();
 +    final Host host = Host.getHost(0);
 +    server1 = host.getVM(0);
 +    client1 = host.getVM(1);
 +  }
 +
 +  private void createCache(Properties props) throws Exception {
 +    DistributedSystem ds = getSystem(props);
 +    ds.disconnect();
 +    ds = getSystem(props);
 +    assertNotNull(ds);
 +    cache = (GemFireCacheImpl)CacheFactory.create(ds);
 +    assertNotNull(cache);
 +  }
 +
 +  public static void createClientCache(String host, Integer port1)
 +      throws Exception {
 +    new CacheRegionClearStatsDUnitTest("temp");
 +    Properties props = new Properties();
 +    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
 +    props.setProperty(DistributionConfig.LOCATORS_NAME, "");
 +    new CacheRegionClearStatsDUnitTest("temp").createCache(props);
 +    PoolImpl p = (PoolImpl)PoolManager.createFactory().addServer(host,
 +        port1.intValue()).setSubscriptionEnabled(false)
 +        .setThreadLocalConnections(true).setMinConnections(1).setReadTimeout(
 +            20000).setPingInterval(10000).setRetryAttempts(1)
 +        .create("CacheRegionClearStatsDUnitTest");
 +
 +    AttributesFactory factory = new AttributesFactory();
 +    factory.setScope(Scope.DISTRIBUTED_ACK);
 +    factory.setPoolName(p.getName());
 +
 +    RegionAttributes attrs = factory.create();
 +    Region region = cache.createRegion(REGION_NAME, attrs);
 +    //region.registerInterest("ALL_KEYS");
 +  }
 +
 +  public static Integer createServerCacheDisk() throws Exception {
 +    return createCache(DataPolicy.PERSISTENT_REPLICATE);
 +  }
 +
 +  private static Integer createCache(DataPolicy dataPolicy) throws Exception {
 +    new CacheRegionClearStatsDUnitTest("temp").createCache(new Properties());
 +    AttributesFactory factory = new AttributesFactory();
 +    factory.setScope(Scope.DISTRIBUTED_ACK);
 +    factory.setDataPolicy(dataPolicy);
 +    RegionAttributes attrs = factory.create();
 +    cache.createRegion(REGION_NAME, attrs);
 +    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
 +    CacheServer server1 = cache.addCacheServer();
 +    server1.setPort(port);
 +    server1.setNotifyBySubscription(true);
 +    server1.start();
 +    return new Integer(server1.getPort());
 +  }
 +
 +  public static Integer createServerCache() throws Exception {
 +    return createCache(DataPolicy.REPLICATE);
 +  }
 +
 +  public static void createClientCacheDisk(String host, Integer port1)
 +      throws Exception {
 +    new CacheRegionClearStatsDUnitTest("temp");
 +    Properties props = new Properties();
 +    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
 +    props.setProperty(DistributionConfig.LOCATORS_NAME, "");
 +    new CacheRegionClearStatsDUnitTest("temp").createCache(props);
 +    PoolImpl p = (PoolImpl)PoolManager.createFactory().addServer(host,
 +        port1.intValue()).setSubscriptionEnabled(false)
 +        .setThreadLocalConnections(true).setMinConnections(1).setReadTimeout(
 +            20000).setPingInterval(10000).setRetryAttempts(1).create(
 +            "CacheRegionClearStatsDUnitTest");
 +
 +    AttributesFactory factory = new AttributesFactory();
 +    factory.setScope(Scope.DISTRIBUTED_ACK);
 +    factory.setPoolName(p.getName());
 +    factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
 +    RegionAttributes attrs = factory.create();
 +    Region region = cache.createRegion(REGION_NAME, attrs);
 +    //region.registerInterest("ALL_KEYS");
 +  }
 +  /**
 +   * This test does the following (<b> clear stats counter </b>):<br>
 +   * 1) Verifies that the clear operation count matches the stats count<br>
 +   */
 +  public void testClearStatsWithNormalRegion(){
-     Integer port1 = ((Integer)server1.invoke(
-         CacheRegionClearStatsDUnitTest.class, "createServerCache"));
++    Integer port1 = ((Integer)server1.invoke(() -> CacheRegionClearStatsDUnitTest.createServerCache()));
 +
-     client1.invoke(CacheRegionClearStatsDUnitTest.class,
-         "createClientCache", new Object[] {
-             NetworkUtils.getServerHostName(server1.getHost()), port1 });
-     client1.invoke(CacheRegionClearStatsDUnitTest.class, "put");
++    client1.invoke(() -> CacheRegionClearStatsDUnitTest.createClientCache(
++            NetworkUtils.getServerHostName(server1.getHost()), port1 ));
++    client1.invoke(() -> CacheRegionClearStatsDUnitTest.put());
 +    
 +    try{
 +      Thread.sleep(10000);
 +    }catch(Exception e){
 +      // sleep 
 +    }
 +    
-     client1.invoke(CacheRegionClearStatsDUnitTest.class,
-         "validationClearStat");
++    client1.invoke(() -> CacheRegionClearStatsDUnitTest.validationClearStat());
 +    
-     server1.invoke(CacheRegionClearStatsDUnitTest.class,
-     "validationClearStat");
++    server1.invoke(() -> CacheRegionClearStatsDUnitTest.validationClearStat());
 +  }
 +  /**
 +   * This test does the following (<b> clear stats counter when disk involved </b>):<br>
 +   * 1) Verifies that the clear operation count matches the stats count<br>
 +   */
 +  public void testClearStatsWithDiskRegion(){
-     Integer port1 = ((Integer)server1.invoke(
-         CacheRegionClearStatsDUnitTest.class, "createServerCacheDisk"));
++    Integer port1 = ((Integer)server1.invoke(() -> CacheRegionClearStatsDUnitTest.createServerCacheDisk()));
 +
-     client1.invoke(CacheRegionClearStatsDUnitTest.class,
-         "createClientCacheDisk", new Object[] {
-             NetworkUtils.getServerHostName(server1.getHost()), port1 });
-     client1.invoke(CacheRegionClearStatsDUnitTest.class, "put");
++    client1.invoke(() -> CacheRegionClearStatsDUnitTest.createClientCacheDisk(
++            NetworkUtils.getServerHostName(server1.getHost()), port1 ));
++    client1.invoke(() -> CacheRegionClearStatsDUnitTest.put());
 +    
 +    try{
 +      Thread.sleep(10000);
 +    }catch(Exception e){
 +      // sleep 
 +    }
 +    
-     client1.invoke(CacheRegionClearStatsDUnitTest.class,
-         "validationClearStat");
++    client1.invoke(() -> CacheRegionClearStatsDUnitTest.validationClearStat());
 +    
-     server1.invoke(CacheRegionClearStatsDUnitTest.class,
-     "validationClearStat");
++    server1.invoke(() -> CacheRegionClearStatsDUnitTest.validationClearStat());
 +  }
 +  
 +  @Override
 +  protected final void preTearDown() throws Exception {
-     client1.invoke(CacheRegionClearStatsDUnitTest.class, "closeCache");
++    client1.invoke(() -> CacheRegionClearStatsDUnitTest.closeCache());
 +    // then close the servers
-     server1.invoke(CacheRegionClearStatsDUnitTest.class, "closeCache");
++    server1.invoke(() -> CacheRegionClearStatsDUnitTest.closeCache());
 +  }
 +
 +  public static void closeCache() {
 +    if (cache != null && !cache.isClosed()) {
 +      cache.close();
 +      cache.getDistributedSystem().disconnect();
 +    }
 +  }
 +  
 +  public static void put() {
 +    try {
 +      Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
 +      assertNotNull(r1);
 +
 +      r1.put(k1, client_k1);
 +      assertEquals(r1.getEntry(k1).getValue(), client_k1);
 +      r1.put(k2, client_k2);
 +      assertEquals(r1.getEntry(k2).getValue(), client_k2);
 +      try{
 +        Thread.sleep(10000);
 +      }catch(Exception e){
 +        // sleep 
 +      }
 +      r1.clear();
 +      
 +      r1.put(k1, client_k1);
 +      assertEquals(r1.getEntry(k1).getValue(), client_k1);
 +      r1.put(k2, client_k2);
 +      assertEquals(r1.getEntry(k2).getValue(), client_k2);
 +      try{
 +        Thread.sleep(10000);
 +      }catch(Exception e){
 +        // sleep 
 +      }
 +      r1.clear();
 +    }
 +    catch (Exception ex) {
 +      Assert.fail("failed while put", ex);
 +    }
 +  }
 +  
 +  public static void validationClearStat(){
 +    assertEquals(cache.getCachePerfStats().getClearCount(), clearOp);
 +  }
 +  
 +  
 +}
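
The hunks above replace DUnit's reflective invocations, where the target method is named by a String, with serializable lambdas that call the static helpers directly. Here is a small sketch of the two styles, assuming only the VM.invoke overloads exercised in this diff; the sketch class and its helper method are hypothetical.

import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.VM;

public class InvokeStyleSketch {

  // A static helper of the kind the tests above run in a remote VM.
  public static Integer createServerCache() throws Exception {
    return Integer.valueOf(0); // placeholder body
  }

  public void runInRemoteVm() {
    VM server = Host.getHost(0).getVM(0);

    // Old style (removed in this diff): the method is resolved reflectively
    // from its String name at runtime.
    //   Integer port = (Integer) server.invoke(InvokeStyleSketch.class, "createServerCache");

    // New style (added in this diff): a serializable lambda references the
    // method directly, so the call is checked at compile time.
    Integer port = (Integer) server.invoke(() -> InvokeStyleSketch.createServerCache());
  }
}

The cast to Integer mirrors the updated test code above.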

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
index 8166318,0000000..8b04ab5
mode 100755,000000..100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ClientServerTimeSyncDUnitTest.java
@@@ -1,203 -1,0 +1,203 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache;
 +
 +import java.io.IOException;
 +import java.util.Properties;
 +
 +import org.junit.Ignore;
 +
 +import com.gemstone.gemfire.cache.client.ClientCache;
 +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
 +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
 +import com.gemstone.gemfire.cache.server.CacheServer;
 +import com.gemstone.gemfire.cache30.CacheTestCase;
 +import com.gemstone.gemfire.distributed.internal.DSClock;
 +import com.gemstone.gemfire.internal.AvailablePortHelper;
 +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 +import com.gemstone.gemfire.test.dunit.Assert;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 +import com.gemstone.gemfire.test.dunit.NetworkUtils;
 +import com.gemstone.gemfire.test.dunit.SerializableCallable;
 +import com.gemstone.gemfire.test.dunit.VM;
 +import com.gemstone.gemfire.test.dunit.Wait;
 +import com.gemstone.gemfire.test.dunit.WaitCriterion;
 +
 +public class ClientServerTimeSyncDUnitTest extends CacheTestCase {
 +
 +  public ClientServerTimeSyncDUnitTest(String name) {
 +    super(name);
 +  }
 +
 +  @Ignore("Bug 52327")
 +  public void DISABLED_testClientTimeAdvances() {
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0); // Server
 +    VM vm1 = host.getVM(1); // Client
 +
 +    final String regionName = "testRegion";
 +    final long TEST_OFFSET = 10000;
 +
 +    ClientCache cache = null;
 +    
 +    try {
 +
 +      final int serverPort = (Integer)vm0.invoke(new SerializableCallable("Start server with a region") {
 +        
 +        @Override
 +        public Object call() {
 +          Cache cache = getCache();
 +          cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
 +          LogWriterUtils.getLogWriter().info("Done creating region, now creating CacheServer");
 +          CacheServer server = null;
 +          try {
 +            server = cache.addCacheServer();
 +            server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
 +            server.start();
 +          } catch (IOException e) {
 +            Assert.fail("Starting cache server failed.", e);
 +          }
 +  
 +          // now set an artificial time offset for the test
 +          system.getClock().setCacheTimeOffset(null, TEST_OFFSET, true);
 +          
 +          LogWriterUtils.getLogWriter().info("Done creating and starting CacheServer on port " + server.getPort());
 +          return server.getPort();
 +        }
 +      });
 +      
 +      final String hostName = NetworkUtils.getServerHostName(vm0.getHost());
 +  
 +      // Start client with proxy region and register interest
 +        
 +      disconnectFromDS();
 +      Properties props = new Properties();
 +      props.setProperty("locators", "");
 +      props = getSystem(props).getProperties();
 +      cache = new ClientCacheFactory(props).setPoolSubscriptionEnabled(true)
 +          .addPoolServer(hostName, serverPort)
 +          .setPoolPingInterval(5000)
 +          .create();
 +      Region proxyRegion = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
 +  
 +      proxyRegion.registerInterestRegex(".*");
 +  
 +      proxyRegion.put("testkey", "testValue1");
 +          
 +      final DSClock clock = ((GemFireCacheImpl)cache).getSystem().getClock();
 +      WaitCriterion wc = new WaitCriterion() {
 +        public boolean done() {
 +          long clientTimeOffset = clock.getCacheTimeOffset();
 +          LogWriterUtils.getLogWriter().info("Client node's new time offset is: " + clientTimeOffset);
 +          return clientTimeOffset >= TEST_OFFSET;
 +        }
 +        public String description() {
 +          return "Waiting for cacheTimeOffset to be non-zero.  PingOp should have set it to something";
 +        }
 +      };
 +      Wait.waitForCriterion(wc, 60000, 1000, true);
 +    } finally {
 +      cache.close();
-       vm1.invoke(CacheTestCase.class, "disconnectFromDS");
++      vm1.invoke(() -> CacheTestCase.disconnectFromDS());
 +    }
 +  }
 +  
 +  public void testNothing() {
 +    // place-holder to keep dunit runner from barfing
 +  }
 +
 +  @Ignore("not yet implemented")
 +  public void DISABLED_testClientTimeSlowsDown() {
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0); // Server
 +    VM vm1 = host.getVM(1); // Client
 +
 +    final String regionName = "testRegion";
 +    final long TEST_OFFSET = 10000;
 +
 +    ClientCache cache = null;
 +    
 +    try {
 +
 +      final int serverPort = (Integer)vm0.invoke(new SerializableCallable("Start server with a region") {
 +        
 +        @Override
 +        public Object call() {
 +          Cache cache = getCache();
 +          cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
 +          LogWriterUtils.getLogWriter().info("Done creating region, now creating CacheServer");
 +          CacheServer server = null;
 +          try {
 +            server = cache.addCacheServer();
 +            server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
 +            server.start();
 +          } catch (IOException e) {
 +            Assert.fail("Starting cache server failed.", e);
 +          }
 +  
 +          // now set an artificial time offset for the test
 +          system.getClock().setCacheTimeOffset(null, -TEST_OFFSET, true);
 +          
 +          LogWriterUtils.getLogWriter().info("Done creating and starting CacheServer on port " + server.getPort());
 +          return server.getPort();
 +        }
 +      });
 +      
 +      Wait.pause((int)TEST_OFFSET);  // let cacheTimeMillis consume the time offset
 +      
 +      final String hostName = NetworkUtils.getServerHostName(vm0.getHost());
 +  
 +      // Start client with proxy region and register interest
 +        
 +      disconnectFromDS();
 +      Properties props = new Properties();
 +      props.setProperty("locators", "");
 +      props = getSystem(props).getProperties();
 +      cache = new ClientCacheFactory(props).setPoolSubscriptionEnabled(true)
 +          .addPoolServer(hostName, serverPort)
 +          .setPoolPingInterval(5000)
 +          .create();
 +      Region proxyRegion = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
 +  
 +      proxyRegion.registerInterestRegex(".*");
 +  
 +      proxyRegion.put("testkey", "testValue1");
 +          
 +      final DSClock clock = ((GemFireCacheImpl)cache).getSystem().getClock();
 +      WaitCriterion wc = new WaitCriterion() {
 +        public boolean done() {
 +          long clientTimeOffset = clock.getCacheTimeOffset();
 +          LogWriterUtils.getLogWriter().info("Client node's new time offset is: " + clientTimeOffset);
 +          if (clientTimeOffset >= 0) {
 +            return false;
 +          }
 +          long cacheTime = clock.cacheTimeMillis();
 +          return Math.abs(System.currentTimeMillis() - (cacheTime - clientTimeOffset)) < 5;
 +        }
 +        public String description() {
 +          return "Waiting for cacheTimeOffset to be negative and cacheTimeMillis to stabilize";
 +        }
 +      };
 +      Wait.waitForCriterion(wc, 60000, 1000, true);
 +    } finally {
 +      cache.close();
-       vm1.invoke(CacheTestCase.class, "disconnectFromDS");
++      vm1.invoke(() -> CacheTestCase.disconnectFromDS());
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java
index b0f3b59,0000000..03368de
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java
@@@ -1,598 -1,0 +1,598 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.client.internal;
 +
 +import java.io.Serializable;
 +import java.net.InetSocketAddress;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +
 +import junit.framework.Assert;
 +
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.client.NoAvailableLocatorsException;
 +import com.gemstone.gemfire.cache.client.NoAvailableServersException;
 +import com.gemstone.gemfire.cache.client.PoolManager;
 +import com.gemstone.gemfire.cache.server.CacheServer;
 +import com.gemstone.gemfire.distributed.internal.ServerLocation;
 +import com.gemstone.gemfire.internal.AvailablePort;
 +import com.gemstone.gemfire.internal.AvailablePortHelper;
 +import com.gemstone.gemfire.management.membership.ClientMembership;
 +import com.gemstone.gemfire.management.membership.ClientMembershipEvent;
 +import com.gemstone.gemfire.management.membership.ClientMembershipListenerAdapter;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.IgnoredException;
 +import com.gemstone.gemfire.test.dunit.NetworkUtils;
 +import com.gemstone.gemfire.test.dunit.SerializableCallable;
 +import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 +import com.gemstone.gemfire.test.dunit.VM;
 +import com.gemstone.gemfire.test.dunit.Wait;
 +
 +/**
 + * Tests cases that are particular to the auto connection source:
 + * dynamically discovering servers and locators, handling
 + * locator disappearance, etc.
 + * @author dsmith
 + *
 + */
 +public class AutoConnectionSourceDUnitTest extends LocatorTestBase {
 +  
 +  protected static final Object BRIDGE_LISTENER = "BRIDGE_LISTENER";
 +  private static final long MAX_WAIT = 60000;
 +  
 +  public void setUp() throws Exception {
 +    super.setUp();
 +    IgnoredException.addIgnoredException("NoAvailableLocatorsException");
 +  }
 +
 +  public AutoConnectionSourceDUnitTest(String name) {
 +    super(name);
 +  }
 +  
 +  public void testDiscoverBridgeServers() throws Exception {
 +    final Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    VM vm2 = host.getVM(2);
 +    
 +    int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
 +    startLocatorInVM(vm0, locatorPort, "");
 +    
 +    String locators = NetworkUtils.getServerHostName(vm0.getHost())+ "[" + locatorPort + "]";
 +    
 +    startBridgeServerInVM(vm1, null, locators);
 +
 +    startBridgeClientInVM(vm2, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
 +
 +    putAndWaitForSuccess(vm2, REGION_NAME, "key", "value");
 +    
 +    Assert.assertEquals("value", getInVM(vm1, "key"));
 +  }
 +
 +  public void testNoLocators() {
 +    
 +    final Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    
 +    try {
 +      startBridgeClientInVM(vm0, null, NetworkUtils.getServerHostName(vm0.getHost()), AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET));
 +      putInVM(vm0, "key", "value");
 +      fail("Client cache should not have been able to start");
 +    } catch(Exception e) {
 +      //expected an exception
 +    }
 +  }
 +  
 +  public void testNoBridgeServer() {
 +    final Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    
 +    int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
 +    startLocatorInVM(vm0, locatorPort, "");
 +    try { 
 +      startBridgeClientInVM(vm1, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
 +      putInVM(vm0, "key", "value");
 +      fail("Client cache should not have been able to start");
 +    } catch(Exception e) {
 +      //expected an exception
 +    }
 +  }
 +  
 +  public void testDynamicallyFindBridgeServer() throws Exception  {
 +    final Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    VM vm2 = host.getVM(2);
 +    VM vm3 = host.getVM(3);
 +    
 +    int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
 +    startLocatorInVM(vm0, locatorPort, "");
 +    
 +    String locators = NetworkUtils.getServerHostName(vm0.getHost()) + "[" + locatorPort + "]";
 +    
 +    startBridgeServerInVM(vm1, null, locators);
 +    
 +    startBridgeClientInVM(vm2, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
 +    
 +    putAndWaitForSuccess(vm2, REGION_NAME, "key", "value");
 +    
 +    startBridgeServerInVM(vm3, null, locators);
 +    
 +    stopBridgeMemberVM(vm1);
 +    
 +    putAndWaitForSuccess(vm2, REGION_NAME, "key2", "value2");
 +    
 +    Assert.assertEquals("value2", getInVM(vm3, "key2"));
 +  }
 +  
 +  public void testDynamicallyFindLocators() throws Exception {
 +    final Host host = Host.getHost(0);
 +    final String hostName = NetworkUtils.getServerHostName(host);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    VM vm2 = host.getVM(2);
 +    VM vm3 = host.getVM(3);
 +    
 +    int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3);
 +    
 +    final int locatorPort0 = ports[0];
 +    final int locatorPort1 = ports[1];
 +    final int locatorPort3 = ports[2];
 +    String locators = getLocatorString(host, new int[] { locatorPort0, locatorPort1, locatorPort3});
 +    startLocatorInVM(vm0, locatorPort0, locators);
 +    
 +    startLocatorInVM(vm1, locatorPort1, locators);
 +    startBridgeClientInVM(vm2, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort0);
 +    
 +    InetSocketAddress locatorToWaitFor= new InetSocketAddress(hostName, locatorPort1);
 +    waitForLocatorDiscovery(vm2, locatorToWaitFor);
 +    
 +    stopLocatorInVM(vm0);
 +    startBridgeServerInVM(vm0, null, locators);
 +    
 +    putAndWaitForSuccess(vm2, REGION_NAME, "key", "value");
 +    Assert.assertEquals("value", getInVM(vm0, "key"));
 +    
 +    startLocatorInVM(vm3, locatorPort3, locators);
 +    stopBridgeMemberVM(vm0);
 +    locatorToWaitFor= new InetSocketAddress(hostName, locatorPort3);
 +    waitForLocatorDiscovery(vm2, locatorToWaitFor);
 +    stopLocatorInVM(vm1);
 +    startBridgeServerInVM(vm1, null, locators);
 +    putAndWaitForSuccess(vm2, REGION_NAME, "key2", "value2");
 +    Assert.assertEquals("value2", getInVM(vm1, "key2"));
 +    
 +  }
 +  
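 +  /**
 +   * A client should be able to discover a cache server through a locator that is
 +   * embedded in that same server.
 +   */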
 +  public void testEmbeddedLocator() throws Exception  {
 +    final Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm2 = host.getVM(2);
 +    
 +    int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
 +    
 +    String locators = NetworkUtils.getServerHostName(vm0.getHost()) + "[" + locatorPort + "]";
 +    
 +    startBridgeServerWithEmbeddedLocator(vm0, null, locators, new String[] {REGION_NAME}, CacheServer.DEFAULT_LOAD_PROBE);
 +    
 +    startBridgeClientInVM(vm2, null, NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
 +    
 +    putAndWaitForSuccess(vm2, REGION_NAME, "key", "value");
 +    
 +    Assert.assertEquals("value", getInVM(vm2, "key"));
 +  }
 +
 +  private void waitForLocatorDiscovery(VM vm,
 +      final InetSocketAddress locatorToWaitFor) {
 +    vm.invoke(new SerializableCallable() {
 +      public Object call() throws InterruptedException {
 +        MyLocatorCallback callback = (MyLocatorCallback) remoteObjects.get(CALLBACK_KEY);
 +        
 +        boolean discovered = callback.waitForDiscovery(locatorToWaitFor, MAX_WAIT);
 +        Assert.assertTrue("Waited " + MAX_WAIT + " for " + locatorToWaitFor
 +            + " to be discovered on client. List is now: "
 +            + callback.getDiscovered(), discovered);
 +        return null;
 +      }
 +    });
 +  }
 +  
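 +  /**
 +   * A client configured with a server group should only be routed to servers in that
 +   * group, and should only see the regions hosted by those servers.
 +   */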
 +  public void testServerGroups() throws Exception {
 +    final Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    VM vm2 = host.getVM(2);
 +    VM vm3 = host.getVM(3);
 +    
 +    int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
 +    startLocatorInVM(vm0, locatorPort, "");
 +    
 +    String locators = NetworkUtils.getServerHostName(vm0.getHost()) + "[" + locatorPort + "]";
 +    
 +    startBridgeServerInVM(vm1, new String[] {"group1", "group2"} , locators, new String[] {"A", "B"});
 +    startBridgeServerInVM(vm2, new String[] {"group2", "group3"}, locators, new String[] {"B", "C"});
 +
 +    
 +    startBridgeClientInVM(vm3, "group1", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort, new String [] {"A", "B", "C"});
 +    putAndWaitForSuccess(vm3, "A", "key", "value");
 +    Assert.assertEquals("value", getInVM(vm1, "A", "key"));
 +    try {
 +      putInVM(vm3, "C", "key2", "value2");
 +      fail("Should not have been able to find Region C on the server");
 +    } catch(Exception expected) {}
 +    
 +    stopBridgeMemberVM(vm3);
 +    
 +    startBridgeClientInVM(vm3, "group3", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort, new String [] {"A", "B", "C"});
 +    try {
 +      putInVM(vm3, "A", "key3", "value");
 +      fail("Should not have been able to find Region A on the server");
 +    } catch(Exception expected) {}
 +    putInVM(vm3, "C", "key4", "value");
 +    Assert.assertEquals("value", getInVM(vm2, "C", "key4"));
 +    
 +    stopBridgeMemberVM(vm3);
 +    
 +    startBridgeClientInVM(vm3, "group2", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort, new String [] {"A", "B", "C"});
 +    putInVM(vm3, "B", "key5", "value");
 +    Assert.assertEquals("value", getInVM(vm1, "B", "key5"));
 +    Assert.assertEquals("value", getInVM(vm2, "B", "key5"));
 +    
 +    stopBridgeMemberVM(vm1);
 +    putInVM(vm3, "B", "key6", "value");
 +    Assert.assertEquals("value", getInVM(vm2, "B", "key6"));
 +    startBridgeServerInVM(vm1, new String[] {"group1", "group2"} , locators, new String[] {"A", "B"});
 +    stopBridgeMemberVM(vm2);
 +    
 +    putInVM(vm3, "B", "key7", "value");
 +    Assert.assertEquals("value", getInVM(vm1, "B", "key7"));
 +  }
 +  
-   public void testTwoServersInSameVM() {
++  public void testTwoServersInSameVM() throws Exception {
 +    final Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    VM vm2 = host.getVM(2);
 +    
 +    int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
 +    
 +    startLocatorInVM(vm0, locatorPort, "");
 +    
 +    final String locators = NetworkUtils.getServerHostName(vm0.getHost()) + "[" + locatorPort + "]";
 +    
 +    final int serverPort1 = startBridgeServerInVM(vm1, new String[] {"group1"}, locators);
 +    final int serverPort2 = addCacheServerInVM(vm1, new String[] {"group2"});
 +    
 +    startBridgeClientInVM(vm2, "group2", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
 +    
 +    checkEndpoints(vm2, new int[] {serverPort2});
 +    
 +    stopBridgeMemberVM(vm2);
 +
 +    startBridgeClientInVM(vm2, "group1", NetworkUtils.getServerHostName(vm0.getHost()), locatorPort);
 +    
 +    checkEndpoints(vm2, new int[] {serverPort1});
 +  }
 +  
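 +  /**
 +   * Client membership listeners registered on both the client and the server should
 +   * observe joins, departures, and crashes as servers and clients start and stop.
 +   */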
 +  public void testClientMembershipListener() throws Exception {
 +    final Host host = Host.getHost(0);
 +    VM locatorVM = host.getVM(0);
 +    VM bridge1VM = host.getVM(1);
 +    VM bridge2VM = host.getVM(2);
 +    VM clientVM = host.getVM(3);
 +    int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
 +    startLocatorInVM(locatorVM, locatorPort, "");
 +    String locators = NetworkUtils.getServerHostName(locatorVM.getHost()) + "[" + locatorPort + "]";
 +
 +    //start a bridge server with a listener
 +    addBridgeListener(bridge1VM);
 +    int serverPort1 = startBridgeServerInVM(bridge1VM, null, locators);
 +
 +    //start a bridge client with a listener
 +    addBridgeListener(clientVM);
 +    startBridgeClientInVM(clientVM, null, NetworkUtils.getServerHostName(locatorVM.getHost()), locatorPort);
 +    // wait for client to connect
 +    checkEndpoints(clientVM, new int[] {serverPort1});
 +    
 +    //make sure the client and bridge server both noticed each other
 +    waitForJoin(bridge1VM);
 +    MyListener serverListener = getBridgeListener(bridge1VM);
 +    Assert.assertEquals(0, serverListener.getCrashes());
 +    Assert.assertEquals(0, serverListener.getDepartures());
 +    Assert.assertEquals(1, serverListener.getJoins());
 +    resetBridgeListener(bridge1VM);
 +    
 +    waitForJoin(clientVM);
 +    MyListener clientListener = getBridgeListener(clientVM);
 +    Assert.assertEquals(0, clientListener.getCrashes());
 +    Assert.assertEquals(0, clientListener.getDepartures());
 +    Assert.assertEquals(1, clientListener.getJoins());
 +    resetBridgeListener(clientVM);
 +    
 +    checkEndpoints(clientVM, new int[] {serverPort1});
 +
 +    //start another bridge server and make sure it is detected by the client
 +    int serverPort2 = startBridgeServerInVM(bridge2VM, null, locators);
 +    
 +    checkEndpoints(clientVM, new int[] {serverPort1, serverPort2});
 +    serverListener = getBridgeListener(bridge1VM);
 +    Assert.assertEquals(0, serverListener.getCrashes());
 +    Assert.assertEquals(0, serverListener.getDepartures());
 +    Assert.assertEquals(0, serverListener.getJoins());
 +    resetBridgeListener(bridge1VM);
 +    waitForJoin(clientVM);
 +    clientListener = getBridgeListener(clientVM);
 +    Assert.assertEquals(0, clientListener.getCrashes());
 +    Assert.assertEquals(0, clientListener.getDepartures());
 +    Assert.assertEquals(1, clientListener.getJoins());
 +    resetBridgeListener(clientVM);
 +    
 +    //stop the second bridge server and make sure it is detected by the client
 +    stopBridgeMemberVM(bridge2VM);
 +    
 +    checkEndpoints(clientVM, new int[] {serverPort1});
 +    serverListener = getBridgeListener(bridge1VM);
 +    Assert.assertEquals(0, serverListener.getCrashes());
 +    Assert.assertEquals(0, serverListener.getDepartures());
 +    Assert.assertEquals(0, serverListener.getJoins());
 +    resetBridgeListener(bridge1VM);
 +    waitForCrash(clientVM);
 +    clientListener = getBridgeListener(clientVM);
 +    Assert.assertEquals(1, clientListener.getCrashes());
 +    Assert.assertEquals(0, clientListener.getDepartures());
 +    Assert.assertEquals(0, clientListener.getJoins());
 +    resetBridgeListener(clientVM);
 +    
 +    //stop the client and make sure the bridge server notices
 +    stopBridgeMemberVM(clientVM);
 +    waitForDeparture(bridge1VM);
 +    serverListener = getBridgeListener(bridge1VM);
 +    Assert.assertEquals(0, serverListener.getCrashes());
 +    Assert.assertEquals(1, serverListener.getDepartures());
 +    Assert.assertEquals(0, serverListener.getJoins());
 +  }
 +
 +  protected Object getInVM(VM vm, final Serializable key) {
 +    return getInVM(vm, REGION_NAME, key);
 +  }
 +  
 +  protected Object getInVM(VM vm, final String regionName, final Serializable key) {
 +    return vm.invoke(new SerializableCallable("Get in VM") {
 +      public Object call() throws Exception {
 +        Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
 +        Region region = cache.getRegion(regionName);
 +        return region.get(key);
 +      }
 +    });
 +  }
 +  
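 +  /**
 +   * Repeatedly attempt the put until the client has discovered a server, retrying on
 +   * NoAvailableLocatorsException / NoAvailableServersException for up to MAX_WAIT
 +   * milliseconds, since server discovery through the locator is asynchronous.
 +   */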
 +  protected void putAndWaitForSuccess(VM vm, final String regionName, final Serializable key, final Serializable value) throws InterruptedException {
 +    long endTime = System.currentTimeMillis() + MAX_WAIT;
 +    long remaining = MAX_WAIT;
 +    int i = 0;
 +    while(true) {
 +      try {
 +        System.err.println("Attempt: " + (i++));
 +        putInVM(vm, regionName, key, value);
 +        break;
 +      } catch (NoAvailableLocatorsException | com.gemstone.gemfire.test.dunit.RMIException e) {
 +        if( !(e instanceof NoAvailableLocatorsException)
 +            && !(e.getCause() instanceof NoAvailableServersException)) {
 +          throw e;
 +        }
 +        if(remaining <= 0) {
 +          throw e;
 +        }
 +        Wait.pause(100);
 +        remaining = endTime - System.currentTimeMillis();
 +      }
 +    }
 +  }
 +
 +  protected void putInVM(VM vm, final Serializable key, final Serializable value) {
 +    putInVM(vm, REGION_NAME, key, value);
 +  }
 +  
 +  protected void putInVM(VM vm,  final String regionName, final Serializable key, final Serializable value) {
 +    vm.invoke(new SerializableCallable("Put in VM") {
 +      public Object call() throws Exception {
 +        Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
 +        Region region = cache.getRegion(regionName);
 +        return region.put(key, value);
 +      }
 +    });
 +  }
 +  
 +  /**
 +   * Assert that the pool in the client VM is connected to exactly the
 +   * expected set of server ports.
 +   * @param vm the VM the client is running in
 +   * @param expectedPorts the server ports we expect the client to be connected to
 +   */
 +  protected void checkEndpoints(VM vm, final int[] expectedPorts) {
 +    vm.invoke(new SerializableRunnable("Check endpoint") {
 +      public void run() {
 +        PoolImpl pool = (PoolImpl) PoolManager.find(POOL_NAME);
 +        int retryCount = 50;
 +        List/*<ServerLocation>*/ endpoints;
 +        HashSet actualEndpointPorts;
 +        HashSet expectedEndpointPorts = new HashSet();
 +        for(int i = 0; i < expectedPorts.length; i++) {
 +          expectedEndpointPorts.add(new Integer(expectedPorts[i]));
 +        }
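 +        // Endpoint discovery is asynchronous, so poll the pool until it reports the
 +        // expected number of servers (up to 50 retries, 100 ms apart).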
 +        do {
 +          endpoints = pool.getCurrentServers();
 +          actualEndpointPorts = new HashSet();
 +          for(Iterator itr = endpoints.iterator(); itr.hasNext();) {
 +            ServerLocation sl = (ServerLocation)itr.next();
 +            actualEndpointPorts.add(new Integer(sl.getPort()));
 +          }
 +          if (expectedEndpointPorts.size() == actualEndpointPorts.size()) {
 +            break;
 +          } else {
 +            Wait.pause(100);
 +          }
 +        } while(retryCount-- > 0);
 +        Assert.assertEquals(expectedEndpointPorts, actualEndpointPorts);
 +      }
 +    });
 +  }
 +  
 +  protected void addBridgeListener(VM vm) {
 +    vm.invoke(new SerializableRunnable("Add membership listener") {
 +      public void run() {
 +        MyListener listener = new MyListener();
 +        ClientMembership.registerClientMembershipListener(listener);
 +        remoteObjects.put(BRIDGE_LISTENER, listener);
 +      }
 +    });
 +  }
 +  
 +  protected void resetBridgeListener(VM vm) {
 +    vm.invoke(new SerializableRunnable("Reset membership listener") {
 +      public void run() {
 +        MyListener listener = (MyListener) remoteObjects.get(BRIDGE_LISTENER);
 +        listener.reset();
 +      }
 +    });
 +  }
 +  
 +  private MyListener getBridgeListener(VM vm) {
 +    return (MyListener) vm.invoke(new SerializableCallable("Get membership listener") {
 +      public Object call() {
 +        return remoteObjects.get(BRIDGE_LISTENER);
 +      }
 +    });
 +  }
 +  
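 +  /**
 +   * Block (for up to 10 seconds) until the membership listener in the given VM has
 +   * recorded at least one join. waitForCrash and waitForDeparture below do the same
 +   * for crashes and departures.
 +   */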
 +  private void waitForJoin(VM vm) {
 +    vm.invoke(new SerializableRunnable() {
 +      public void run() {
 +        MyListener listener = (MyListener) remoteObjects.get(BRIDGE_LISTENER);
 +        synchronized(listener) {
 +          long end = System.currentTimeMillis() + 10000;
 +          while (listener.joins == 0) {
 +            try {
 +              long remaining = end - System.currentTimeMillis();
 +              if(remaining <= 0) {
 +                break;
 +              }
 +              listener.wait(remaining);
 +            } catch (InterruptedException e) {
 +              fail("interrupted");
 +            }
 +          }
 +        }
 +      }
 +    });
 +  }
 +  
 +  private void waitForCrash(VM vm) {
 +    vm.invoke(new SerializableRunnable() {
 +      public void run() {
 +        MyListener listener = (MyListener) remoteObjects.get(BRIDGE_LISTENER);
 +        synchronized(listener) {
 +          long end = System.currentTimeMillis() + 10000;
 +          while (listener.crashes == 0) {
 +            try {
 +              long remaining = end - System.currentTimeMillis();
 +              if(remaining <= 0) {
 +                break;
 +              }
 +              listener.wait(remaining);
 +            } catch (InterruptedException e) {
 +              fail("interrupted");
 +            }
 +          }
 +        }
 +      }
 +    });
 +  }
 +  
 +  private void waitForDeparture(VM vm) {
 +    vm.invoke(new SerializableRunnable() {
 +      public void run() {
 +        MyListener listener = (MyListener) remoteObjects.get(BRIDGE_LISTENER);
 +        synchronized(listener) {
 +          long end = System.currentTimeMillis() + 10000;
 +          while (listener.departures == 0) {
 +            try {
 +              long remaining = end - System.currentTimeMillis();
 +              if(remaining <= 0) {
 +                break;
 +              }
 +              listener.wait(remaining);
 +            } catch (InterruptedException e) {
 +              fail("interrupted");
 +            }
 +          }
 +        }
 +      }
 +    });
 +  }
 +  
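 +  /**
 +   * Membership listener that counts joins, departures and crashes, and notifies any
 +   * threads waiting on it whenever one of the counters changes.
 +   */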
 +  public static class MyListener extends ClientMembershipListenerAdapter implements Serializable {
 +    protected int crashes = 0;
 +    protected int joins = 0;
 +    protected int departures = 0;
 +
 +    @Override
 +    public synchronized void memberCrashed(ClientMembershipEvent event) {
 +      crashes++;
 +      notifyAll();
 +    }
 +
 +    public synchronized void reset() {
 +      crashes = 0;
 +      joins = 0;
 +      departures = 0;
 +    }
 +
 +    @Override
 +    public synchronized void memberJoined(ClientMembershipEvent event) {
 +      joins++;
 +      notifyAll();
 +    }
 +
 +    @Override
 +    public synchronized void memberLeft(ClientMembershipEvent event) {
 +      departures++;
 +      notifyAll();
 +    }
 +
 +    public synchronized int getCrashes() {
 +      return crashes;
 +    }
 +
 +    public synchronized int getJoins() {
 +      return joins;
 +    }
 +
 +    public synchronized int getDepartures() {
 +      return departures;
 +    }
 +  }
 +}