You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/21 20:45:32 UTC

[1/2] geode git commit: GEODE-2632: refactor code to use InternalCache instead of GemFireCacheImpl

Repository: geode
Updated Branches:
  refs/heads/develop 0862174c3 -> 363e50d21


http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java
index 8435c4c..5dc7bb0 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java
@@ -25,9 +25,9 @@ import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.internal.DefaultQueryService;
 import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
 import org.apache.geode.internal.cache.FilterProfile;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.logging.LogService;
 
 /**
@@ -44,35 +44,34 @@ public class CqServiceVsdStats {
   private static final StatisticsType _type;
 
   /** Name of the created CQs statistic */
-  protected static final String CQS_CREATED = "numCqsCreated";
+  private static final String CQS_CREATED = "numCqsCreated";
 
   /** Name of the active CQs statistic */
-  protected static final String CQS_ACTIVE = "numCqsActive";
+  private static final String CQS_ACTIVE = "numCqsActive";
 
   /** Name of the stopped CQs statistic */
-  protected static final String CQS_STOPPED = "numCqsStopped";
+  private static final String CQS_STOPPED = "numCqsStopped";
 
   /** Name of the closed CQs statistic */
-  protected static final String CQS_CLOSED = "numCqsClosed";
+  private static final String CQS_CLOSED = "numCqsClosed";
 
   /** Name of the client's CQs statistic */
-  protected static final String CQS_ON_CLIENT = "numCqsOnClient";
+  private static final String CQS_ON_CLIENT = "numCqsOnClient";
 
   /** Number of clients with CQs statistic */
-  protected static final String CLIENTS_WITH_CQS = "numClientsWithCqs";
-
+  private static final String CLIENTS_WITH_CQS = "numClientsWithCqs";
 
   /** CQ query execution time. */
-  protected static final String CQ_QUERY_EXECUTION_TIME = "cqQueryExecutionTime";
+  private static final String CQ_QUERY_EXECUTION_TIME = "cqQueryExecutionTime";
 
   /** CQ query execution in progress */
-  protected static final String CQ_QUERY_EXECUTION_IN_PROGRESS = "cqQueryExecutionInProgress";
+  private static final String CQ_QUERY_EXECUTION_IN_PROGRESS = "cqQueryExecutionInProgress";
 
   /** Completed CQ query executions */
-  protected static final String CQ_QUERY_EXECUTIONS_COMPLETED = "cqQueryExecutionsCompleted";
+  private static final String CQ_QUERY_EXECUTIONS_COMPLETED = "cqQueryExecutionsCompleted";
 
   /** Unique CQs, number of different CQ queries */
-  protected static final String UNIQUE_CQ_QUERY = "numUniqueCqQuery";
+  private static final String UNIQUE_CQ_QUERY = "numUniqueCqQuery";
 
   /** Id of the CQs created statistic */
   private static final int _numCqsCreatedId;
@@ -104,7 +103,7 @@ public class CqServiceVsdStats {
   /** Id for unique CQs, difference in CQ queries */
   private static final int _numUniqueCqQuery;
 
-  /**
+  /*
    * Static initializer to create and initialize the <code>StatisticsType</code>
    */
   static {
@@ -140,7 +139,6 @@ public class CqServiceVsdStats {
     _cqQueryExecutionsCompletedId = _type.nameToId(CQ_QUERY_EXECUTIONS_COMPLETED);
     _cqQueryExecutionInProgressId = _type.nameToId(CQ_QUERY_EXECUTION_IN_PROGRESS);
     _numUniqueCqQuery = _type.nameToId(UNIQUE_CQ_QUERY);
-
   }
 
   /** The <code>Statistics</code> instance to which most behavior is delegated */
@@ -152,12 +150,10 @@ public class CqServiceVsdStats {
    * @param factory The <code>StatisticsFactory</code> which creates the <code>Statistics</code>
    *        instance
    */
-  public CqServiceVsdStats(StatisticsFactory factory) {
+  CqServiceVsdStats(StatisticsFactory factory) {
     this._stats = factory.createAtomicStatistics(_type, "CqServiceStats");
   }
 
-  // /////////////////// Instance Methods /////////////////////
-
   /**
    * Closes the <code>HARegionQueueStats</code>.
    */
@@ -170,14 +166,14 @@ public class CqServiceVsdStats {
    * 
    * @return the current value of the "numCqsCreated" stat
    */
-  public long getNumCqsCreated() {
+  long getNumCqsCreated() {
     return this._stats.getLong(_numCqsCreatedId);
   }
 
   /**
    * Increments the "numCqsCreated" stat by 1.
    */
-  public void incCqsCreated() {
+  void incCqsCreated() {
     this._stats.incLong(_numCqsCreatedId, 1);
   }
 
@@ -186,21 +182,21 @@ public class CqServiceVsdStats {
    * 
    * @return the current value of the "numCqsActive" stat
    */
-  public long getNumCqsActive() {
+  long getNumCqsActive() {
     return this._stats.getLong(_numCqsActiveId);
   }
 
   /**
    * Increments the "numCqsActive" stat by 1.
    */
-  public void incCqsActive() {
+  void incCqsActive() {
     this._stats.incLong(_numCqsActiveId, 1);
   }
 
   /**
    * Decrements the "numCqsActive" stat by 1.
    */
-  public void decCqsActive() {
+  void decCqsActive() {
     this._stats.incLong(_numCqsActiveId, -1);
   }
 
@@ -209,21 +205,21 @@ public class CqServiceVsdStats {
    * 
    * @return the current value of the "numCqsStopped" stat
    */
-  public long getNumCqsStopped() {
+  long getNumCqsStopped() {
     return this._stats.getLong(_numCqsStoppedId);
   }
 
   /**
    * Increments the "numCqsStopped" stat by 1.
    */
-  public void incCqsStopped() {
+  void incCqsStopped() {
     this._stats.incLong(_numCqsStoppedId, 1);
   }
 
   /**
    * Decrements the "numCqsStopped" stat by 1.
    */
-  public void decCqsStopped() {
+  void decCqsStopped() {
     this._stats.incLong(_numCqsStoppedId, -1);
   }
 
@@ -232,14 +228,14 @@ public class CqServiceVsdStats {
    * 
    * @return the current value of the "numCqsClosed" stat
    */
-  public long getNumCqsClosed() {
+  long getNumCqsClosed() {
     return this._stats.getLong(_numCqsClosedId);
   }
 
   /**
    * Increments the "numCqsClosed" stat by 1.
    */
-  public void incCqsClosed() {
+  void incCqsClosed() {
     this._stats.incLong(_numCqsClosedId, 1);
   }
 
@@ -248,21 +244,21 @@ public class CqServiceVsdStats {
    * 
    * @return the current value of the "numCqsOnClient" stat
    */
-  public long getNumCqsOnClient() {
+  long getNumCqsOnClient() {
     return this._stats.getLong(_numCqsOnClientId);
   }
 
   /**
    * Increments the "numCqsOnClient" stat by 1.
    */
-  public void incCqsOnClient() {
+  void incCqsOnClient() {
     this._stats.incLong(_numCqsOnClientId, 1);
   }
 
   /**
    * Decrements the "numCqsOnClient" stat by 1.
    */
-  public void decCqsOnClient() {
+  void decCqsOnClient() {
     this._stats.incLong(_numCqsOnClientId, -1);
   }
 
@@ -278,21 +274,21 @@ public class CqServiceVsdStats {
   /**
    * Increments the "numClientsWithCqs" stat by 1.
    */
-  public void incClientsWithCqs() {
+  void incClientsWithCqs() {
     this._stats.incLong(_numClientsWithCqsId, 1);
   }
 
   /**
    * Decrements the "numCqsOnClient" stat by 1.
    */
-  public void decClientsWithCqs() {
+  void decClientsWithCqs() {
     this._stats.incLong(_numClientsWithCqsId, -1);
   }
 
   /**
    * Start the CQ Query Execution time.
    */
-  public long startCqQueryExecution() {
+  long startCqQueryExecution() {
     this._stats.incInt(_cqQueryExecutionInProgressId, 1);
     return NanoTimer.getTime();
   }
@@ -302,7 +298,7 @@ public class CqServiceVsdStats {
    * 
    * @param start long time value.
    */
-  public void endCqQueryExecution(long start) {
+  void endCqQueryExecution(long start) {
     long ts = NanoTimer.getTime();
     this._stats.incLong(_cqQueryExecutionTimeId, ts - start);
     this._stats.incInt(_cqQueryExecutionInProgressId, -1);
@@ -321,14 +317,14 @@ public class CqServiceVsdStats {
   /**
    * Increments number of Unique queries.
    */
-  public void incUniqueCqQuery() {
+  void incUniqueCqQuery() {
     this._stats.incInt(_numUniqueCqQuery, 1);
   }
 
   /**
    * Decrements number of unique Queries.
    */
-  public void decUniqueCqQuery() {
+  void decUniqueCqQuery() {
     this._stats.incInt(_numUniqueCqQuery, -1);
   }
 
@@ -338,11 +334,8 @@ public class CqServiceVsdStats {
    * tests.
    * <p>
    * Returns the number of CQs (active + suspended) on the given region.
-   * 
-   * @param regionName
    */
-  public long numCqsOnRegion(String regionName) {
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+  public long numCqsOnRegion(final InternalCache cache, String regionName) {
     if (cache == null) {
       return 0;
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQImpl.java
index ec6e984..c484105 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQImpl.java
@@ -21,24 +21,18 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializable;
 import org.apache.geode.DataSerializer;
-import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EvictionAction;
-import org.apache.geode.cache.client.internal.UserAttributes;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesMutator;
 import org.apache.geode.cache.query.CqClosedException;
 import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.CqExistsException;
-import org.apache.geode.cache.query.CqListener;
 import org.apache.geode.cache.query.CqResults;
 import org.apache.geode.cache.query.Query;
 import org.apache.geode.cache.query.QueryException;
@@ -49,6 +43,7 @@ import org.apache.geode.cache.query.internal.CompiledRegion;
 import org.apache.geode.cache.query.internal.CompiledSelect;
 import org.apache.geode.cache.query.internal.CqStateImpl;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.Token;
@@ -58,7 +53,6 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.i18n.StringId;
 
 public class ServerCQImpl extends CqQueryImpl implements DataSerializable, ServerCQ {
   private static final Logger logger = LogService.getLogger();
@@ -84,7 +78,7 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
   public volatile boolean cqResultKeysInitialized = false;
 
   /** Boolean flag to see if the CQ is on Partitioned Region */
-  public volatile boolean isPR = false;
+  volatile boolean isPR = false;
 
   private ClientProxyMembershipID clientProxyId = null;
 
@@ -92,7 +86,6 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
 
   private String serverCqName;
 
-
   /** identifier assigned to this query for FilterRoutingInfos */
   private Long filterID;
 
@@ -106,21 +99,11 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
     // For deserialization
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#getFilterID()
-   */
   @Override
   public Long getFilterID() {
     return this.filterID;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#setFilterID(java.lang.Long)
-   */
   @Override
   public void setFilterID(Long filterID) {
     this.filterID = filterID;
@@ -142,19 +125,12 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
 
     CacheClientProxy clientProxy = null;
     this.clientProxyId = p_clientProxyId;
-    // servConnection = serverSideConnection;
 
     if (p_ccn != null) {
       this.ccn = p_ccn;
       clientProxy = p_ccn.getClientProxy(p_clientProxyId, true);
     }
 
-    /*
-     * try { initCq(); } catch (CqExistsException cqe) { // Should not happen. throw new
-     * CqException(LocalizedStrings.CqQueryImpl_UNABLE_TO_CREATE_CQ_0_ERROR__1.toLocalizedString(new
-     * Object[] { cqName, cqe.getMessage()})); }
-     */
-
     validateCq();
 
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -228,13 +204,11 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
       throw new CqException(errMsg);
     }
 
-    // checkAndSetCqOnRegion();
-
     // Can be null by the time we are here
     if (clientProxy != null) {
       clientProxy.incCqCount();
       if (clientProxy.hasOneCq()) {
-        cqService.stats.incClientsWithCqs();
+        cqService.stats().incClientsWithCqs();
       }
       if (isDebugEnabled) {
         logger.debug("Added CQ to the base region: {} With key as: {}", cqBaseRegion.getFullPath(),
@@ -307,7 +281,6 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
    * query.
    * 
    * @return String modified query.
-   * @throws CqException
    */
   private Query constructServerSideQuery() throws QueryException {
     GemFireCacheImpl cache = (GemFireCacheImpl) cqService.getCache();
@@ -328,7 +301,6 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
    * Returns if the passed key is part of the CQs result set. This method needs to be called once
    * the CQ result key caching is completed (cqResultsCacheInitialized is true).
    * 
-   * @param key
    * @return true if key is in the Results Cache.
    */
   public boolean isPartOfCqResult(Object key) {
@@ -352,27 +324,18 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#addToCqResultKeys(java.lang.Object)
-   */
   @Override
   public void addToCqResultKeys(Object key) {
     if (!CqServiceProvider.MAINTAIN_KEYS) {
       return;
     }
 
-    // this.logger.fine("Adding key to Results Cache For CQ :" +
-    // this.cqName + " key :" + key);
     if (this.cqResultKeys != null) {
       synchronized (this.cqResultKeys) {
         this.cqResultKeys.put(key, TOKEN);
         if (!this.cqResultKeysInitialized) {
           // This key could be coming after add, destroy.
           // Remove this from destroy queue.
-          // this.logger.fine("Removing key from Destroy Cache For CQ :" +
-          // this.cqName + " key :" + key);
           if (this.destroysWhileCqResultsInProgress != null) {
             this.destroysWhileCqResultsInProgress.remove(key);
           }
@@ -381,21 +344,11 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
     }
   }
 
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqQuery2#removeFromCqResultKeys(java.lang.Object,
-   * boolean)
-   */
   @Override
   public void removeFromCqResultKeys(Object key, boolean isTokenMode) {
     if (!CqServiceProvider.MAINTAIN_KEYS) {
       return;
     }
-    // this.logger.fine("Removing key from Results Cache For CQ :" +
-    // this.cqName + " key :" + key);
     if (this.cqResultKeys != null) {
       synchronized (this.cqResultKeys) {
         if (isTokenMode && this.cqResultKeys.get(key) != Token.DESTROYED) {
@@ -403,8 +356,6 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
         }
         this.cqResultKeys.remove(key);
         if (!this.cqResultKeysInitialized) {
-          // this.logger.fine("Adding key to Destroy Cache For CQ :" +
-          // this.cqName + " key :" + key);
           if (this.destroysWhileCqResultsInProgress != null) {
             this.destroysWhileCqResultsInProgress.add(key);
           }
@@ -415,10 +366,8 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
 
   /**
    * Marks the key as destroyed in the CQ Results key cache.
-   * 
-   * @param key
    */
-  public void markAsDestroyedInCqResultKeys(Object key) {
+  void markAsDestroyedInCqResultKeys(Object key) {
     if (!CqServiceProvider.MAINTAIN_KEYS) {
       return;
     }
@@ -439,12 +388,6 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
     }
   }
 
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#setCqResultsCacheInitialized()
-   */
   @Override
   public void setCqResultsCacheInitialized() {
     if (CqServiceProvider.MAINTAIN_KEYS) {
@@ -466,13 +409,6 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqQuery2#isOldValueRequiredForQueryProcessing(
-   * java.lang.Object)
-   */
   @Override
   public boolean isOldValueRequiredForQueryProcessing(Object key) {
     if (this.cqResultKeysInitialized && this.isPartOfCqResult(key)) {
@@ -484,18 +420,11 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
   /**
    * Closes the Query. On Client side, sends the cq close request to server. On Server side, takes
    * care of repository cleanup.
-   * 
-   * @throws CqException
    */
   public void close() throws CqClosedException, CqException {
     close(true);
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#close(boolean)
-   */
   @Override
   public void close(boolean sendRequestToServer) throws CqClosedException, CqException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -523,9 +452,9 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
 
       // Stat update.
       if (stateBeforeClosing == CqStateImpl.RUNNING) {
-        cqService.stats.decCqsActive();
+        cqService.stats().decCqsActive();
       } else if (stateBeforeClosing == CqStateImpl.STOPPED) {
-        cqService.stats.decCqsStopped();
+        cqService.stats().decCqsStopped();
       }
 
       // Clean-up the CQ Results Cache.
@@ -537,8 +466,8 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
 
       // Set the state to close, and update stats
       this.cqState.setState(CqStateImpl.CLOSED);
-      cqService.stats.incCqsClosed();
-      cqService.stats.decCqsOnClient();
+      cqService.stats().incCqsClosed();
+      cqService.stats().decCqsOnClient();
       if (this.stats != null)
         this.stats.close();
     }
@@ -564,9 +493,8 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
 
   /**
    * Clears the resource used by CQ.
-   * 
-   * @throws CqException
    */
+  @Override
   protected void cleanup() throws CqException {
     // CqBaseRegion
     try {
@@ -575,7 +503,7 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
         CacheClientProxy clientProxy = ccn.getClientProxy(clientProxyId);
         clientProxy.decCqCount();
         if (clientProxy.hasNoCq()) {
-          cqService.stats.decClientsWithCqs();
+          cqService.stats().decClientsWithCqs();
         }
       }
     } catch (Exception ex) {
@@ -587,16 +515,9 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
   }
 
   /**
-   * @param serverCqName The serverCqName to set.
-   */
-  public void setServerCqName(String serverCqName) {
-
-    this.serverCqName = serverCqName;
-  }
-
-  /**
    * Stop or pause executing the query.
    */
+  @Override
   public void stop() throws CqClosedException, CqException {
     boolean isStopped = false;
     synchronized (this.cqState) {
@@ -613,18 +534,16 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
 
       // Change state and stats on the client side
       this.cqState.setState(CqStateImpl.STOPPED);
-      this.cqService.stats.incCqsStopped();
-      this.cqService.stats.decCqsActive();
+      this.cqService.stats().incCqsStopped();
+      this.cqService.stats().decCqsActive();
       if (logger.isDebugEnabled()) {
         logger.debug("Successfully stopped the CQ. {}", cqName);
       }
     }
   }
 
-  /* DataSerializableFixedID methods ---------------------------------------- */
-
+  @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    // this.cqName = DataSerializer.readString(in);
     synchronized (cqState) {
       this.cqState.setState(DataSerializer.readInteger(in));
     }
@@ -633,23 +552,14 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
     this.filterID = in.readLong();
   }
 
-  /*
-   * public int getDSFID() { return CQ_QUERY; }
-   */
-
+  @Override
   public void toData(DataOutput out) throws IOException {
-    // DataSerializer.writeString(this.cqName, out);
     DataSerializer.writeInteger(this.cqState.getState(), out);
     DataSerializer.writeBoolean(this.isDurable, out);
     DataSerializer.writeString(this.queryString, out);
     out.writeLong(this.filterID);
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#isPR()
-   */
   @Override
   public boolean isPR() {
     return isPR;
@@ -676,5 +586,4 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
     throw new IllegalStateException("Execute cannot be called on a CQ on the server");
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
index bcf9806..9bddbc7 100644
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
@@ -27,7 +27,6 @@ import org.apache.geode.cache.query.internal.DefaultQueryService;
 import org.apache.geode.cache.query.internal.cq.CqService;
 import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.internal.DistributionStats;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
@@ -85,8 +84,7 @@ public class ExecuteCQ extends BaseCQCommand {
     ServerCQ cqQuery = null;
 
     try {
-      qService =
-          (DefaultQueryService) ((GemFireCacheImpl) crHelper.getCache()).getLocalQueryService();
+      qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService();
 
       // Authorization check
       AuthorizeRequest authzRequest = servConn.getAuthzRequest();

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
index f333b4b..de61445 100755
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
@@ -28,7 +28,6 @@ import org.apache.geode.cache.query.internal.cq.CqServiceImpl;
 import org.apache.geode.cache.query.internal.cq.CqServiceProvider;
 import org.apache.geode.cache.query.internal.cq.ServerCQImpl;
 import org.apache.geode.distributed.internal.DistributionStats;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
@@ -111,8 +110,7 @@ public class ExecuteCQ61 extends BaseCQCommand {
     ServerCQImpl cqQuery = null;
 
     try {
-      qService =
-          (DefaultQueryService) ((GemFireCacheImpl) crHelper.getCache()).getLocalQueryService();
+      qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService();
 
       // Authorization check
       AuthorizeRequest authzRequest = servConn.getAuthzRequest();

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
index eac9ed3..a2d201d 100755
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.internal.DefaultQueryService;
 import org.apache.geode.cache.query.internal.cq.CqService;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
@@ -64,8 +63,7 @@ public class GetDurableCQs extends BaseCQCommand {
     CqService cqServiceForExec = null;
 
     try {
-      qService =
-          (DefaultQueryService) ((GemFireCacheImpl) crHelper.getCache()).getLocalQueryService();
+      qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService();
 
       this.securityService.authorizeClusterRead();
 

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java
index 7ace0e8..f4cd706 100644
--- a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java
@@ -14,20 +14,15 @@
  */
 package org.apache.geode.cache.query.cq.dunit;
 
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
 import static org.junit.Assert.*;
 
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
 import java.util.Collection;
 
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.CqServiceStatistics;
 import org.apache.geode.cache.query.CqStatistics;
 import org.apache.geode.cache.query.QueryService;
@@ -40,7 +35,7 @@ import org.apache.geode.cache.query.internal.cq.CqServiceImpl;
 import org.apache.geode.cache.query.internal.cq.CqServiceVsdStats;
 import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
 import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.cache30.CacheTestCase;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.LogWriterUtils;
@@ -48,27 +43,26 @@ import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
 
 /**
- * This class tests the ContiunousQuery mechanism in GemFire. This includes the test with different
+ * This class tests the ContinuousQuery mechanism in GemFire. This includes the test with different
  * data activities.
- *
  */
 @Category(DistributedTest.class)
 public class CqStatsDUnitTest extends JUnit4CacheTestCase {
 
+  // TODO: delete this use of CqQueryDUnitTest
   private CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest();
 
-  public CqStatsDUnitTest() {
-    super();
-  }
-
   @Override
   public final void postSetUp() throws Exception {
     // avoid IllegalStateException from HandShake by connecting all vms to
     // system before creating pool
     getSystem();
     Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
+      @Override
       public void run() {
         getSystem();
       }
@@ -81,6 +75,7 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
   public void validateCQStats(VM vm, final String cqName, final int creates, final int updates,
       final int deletes, final int totalEvents, final int cqListenerInvocations) {
     vm.invoke(new CacheSerializableRunnable("Validate CQs") {
+      @Override
       public void run2() throws CacheException {
         LogWriterUtils.getLogWriter().info("### Validating CQ Stats. ### " + cqName);
         // Get CQ Service.
@@ -161,6 +156,7 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
       final int stopped, final int closed, final int cqsOnClient, final int cqsOnRegion,
       final int clientsWithCqs) {
     vm.invoke(new CacheSerializableRunnable("Validate CQ Service Stats") {
+      @Override
       public void run2() throws CacheException {
         LogWriterUtils.getLogWriter().info("### Validating CQ Service Stats. ### ");
         // Get CQ Service.
@@ -176,7 +172,7 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
         CqServiceVsdStats cqServiceVsdStats = null;
         try {
           cqServiceVsdStats =
-              ((CqServiceImpl) ((DefaultQueryService) qService).getCqService()).stats;
+              ((CqServiceImpl) ((DefaultQueryService) qService).getCqService()).stats();
         } catch (CqException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
@@ -185,12 +181,14 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
           fail("Failed to get CQ Service Stats");
         }
 
-        getCache().getLogger().info("#### CQ Service stats: " + " CQs created: "
-            + cqServiceStats.numCqsCreated() + " CQs active: " + cqServiceStats.numCqsActive()
-            + " CQs stopped: " + cqServiceStats.numCqsStopped() + " CQs closed: "
-            + cqServiceStats.numCqsClosed() + " CQs on Client: " + cqServiceStats.numCqsOnClient()
-            + " CQs on region /root/regionA : " + cqServiceVsdStats.numCqsOnRegion("/root/regionA")
-            + " Clients with CQs: " + cqServiceVsdStats.getNumClientsWithCqs());
+        getCache().getLogger()
+            .info("#### CQ Service stats: " + " CQs created: " + cqServiceStats.numCqsCreated()
+                + " CQs active: " + cqServiceStats.numCqsActive() + " CQs stopped: "
+                + cqServiceStats.numCqsStopped() + " CQs closed: " + cqServiceStats.numCqsClosed()
+                + " CQs on Client: " + cqServiceStats.numCqsOnClient()
+                + " CQs on region /root/regionA : "
+                + cqServiceVsdStats.numCqsOnRegion(GemFireCacheImpl.getInstance(), "/root/regionA")
+                + " Clients with CQs: " + cqServiceVsdStats.getNumClientsWithCqs());
 
 
         // Check for created count.
@@ -223,7 +221,7 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
         // Check for CQs on region.
         if (cqsOnRegion != CqQueryDUnitTest.noTest) {
           assertEquals("Number of CQs on region /root/regionA mismatch", cqsOnRegion,
-              cqServiceVsdStats.numCqsOnRegion("/root/regionA"));
+              cqServiceVsdStats.numCqsOnRegion(GemFireCacheImpl.getInstance(), "/root/regionA"));
         }
 
         // Check for clients with CQs count.

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
index d6068f1..c03bb8b 100644
--- a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
@@ -14,21 +14,16 @@
  */
 package org.apache.geode.cache.query.cq.dunit;
 
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
 import static org.junit.Assert.*;
 
-import org.apache.geode.distributed.*;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
+import java.util.Collection;
+import java.util.Properties;
 
-import java.util.*;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.CqServiceStatistics;
 import org.apache.geode.cache.query.CqStatistics;
 import org.apache.geode.cache.query.QueryService;
@@ -41,7 +36,8 @@ import org.apache.geode.cache.query.internal.cq.CqServiceImpl;
 import org.apache.geode.cache.query.internal.cq.CqServiceVsdStats;
 import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
 import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.cache30.CacheTestCase;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.LogWriterUtils;
@@ -49,21 +45,19 @@ import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
 
 /**
- * This class tests the ContiunousQuery mechanism in GemFire. This includes the test with different
+ * This class tests the ContinuousQuery mechanism in GemFire. This includes the test with different
  * data activities.
- *
  */
 @Category(DistributedTest.class)
 public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
 
+  // TODO: delete this use of CqQueryUsingPoolDUnitTest
   private CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest();
 
-  public CqStatsUsingPoolDUnitTest() {
-    super();
-  }
-
   @Override
   public Properties getDistributedSystemProperties() {
     Properties result = super.getDistributedSystemProperties();
@@ -77,6 +71,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
     // system before creating pool
     getSystem();
     Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
+      @Override
       public void run() {
         getSystem();
       }
@@ -89,6 +84,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
   private void validateCQStats(VM vm, final String cqName, final int creates, final int updates,
       final int deletes, final int totalEvents, final int cqListenerInvocations) {
     vm.invoke(new CacheSerializableRunnable("Validate CQs") {
+      @Override
       public void run2() throws CacheException {
         LogWriterUtils.getLogWriter().info("### Validating CQ Stats. ### " + cqName);
         // Get CQ Service.
@@ -169,6 +165,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
       final int stopped, final int closed, final int cqsOnClient, final int cqsOnRegion,
       final int clientsWithCqs) {
     vm.invoke(new CacheSerializableRunnable("Validate CQ Service Stats") {
+      @Override
       public void run2() throws CacheException {
         LogWriterUtils.getLogWriter().info("### Validating CQ Service Stats. ### ");
         // Get CQ Service.
@@ -184,7 +181,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
         CqServiceVsdStats cqServiceVsdStats = null;
         try {
           cqServiceVsdStats =
-              ((CqServiceImpl) ((DefaultQueryService) qService).getCqService()).stats;
+              ((CqServiceImpl) ((DefaultQueryService) qService).getCqService()).stats();
         } catch (CqException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
@@ -193,12 +190,14 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
           fail("Failed to get CQ Service Stats");
         }
 
-        getCache().getLogger().info("#### CQ Service stats: " + " CQs created: "
-            + cqServiceStats.numCqsCreated() + " CQs active: " + cqServiceStats.numCqsActive()
-            + " CQs stopped: " + cqServiceStats.numCqsStopped() + " CQs closed: "
-            + cqServiceStats.numCqsClosed() + " CQs on Client: " + cqServiceStats.numCqsOnClient()
-            + " CQs on region /root/regionA : " + cqServiceVsdStats.numCqsOnRegion("/root/regionA")
-            + " Clients with CQs: " + cqServiceVsdStats.getNumClientsWithCqs());
+        getCache().getLogger()
+            .info("#### CQ Service stats: " + " CQs created: " + cqServiceStats.numCqsCreated()
+                + " CQs active: " + cqServiceStats.numCqsActive() + " CQs stopped: "
+                + cqServiceStats.numCqsStopped() + " CQs closed: " + cqServiceStats.numCqsClosed()
+                + " CQs on Client: " + cqServiceStats.numCqsOnClient()
+                + " CQs on region /root/regionA : "
+                + cqServiceVsdStats.numCqsOnRegion(GemFireCacheImpl.getInstance(), "/root/regionA")
+                + " Clients with CQs: " + cqServiceVsdStats.getNumClientsWithCqs());
 
 
         // Check for created count.
@@ -231,7 +230,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
         // Check for CQs on region.
         if (cqsOnRegion != CqQueryUsingPoolDUnitTest.noTest) {
           assertEquals("Number of CQs on region /root/regionA mismatch", cqsOnRegion,
-              cqServiceVsdStats.numCqsOnRegion("/root/regionA"));
+              cqServiceVsdStats.numCqsOnRegion(GemFireCacheImpl.getInstance(), "/root/regionA"));
         }
 
         // Check for clients with CQs count.

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
index 66c4c0a..5dd0d24 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
@@ -15,10 +15,8 @@
 
 package org.apache.geode.cache.lucene.internal.distributed;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.Logger;
@@ -27,7 +25,7 @@ import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.lucene.LuceneQuery;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.logging.LogService;
 
 /**
@@ -44,16 +42,15 @@ import org.apache.geode.internal.logging.LogService;
  */
 public class TopEntriesFunctionCollector
     implements ResultCollector<TopEntriesCollector, TopEntries> {
-  // Use this instance to perform reduce operation
-  final CollectorManager<TopEntriesCollector> manager;
+  private static final Logger logger = LogService.getLogger();
 
-  final String id;
+  // Use this instance to perform reduce operation
+  private final CollectorManager<TopEntriesCollector> manager;
 
-  // Instance of gemfire cache to check status and other utility methods
-  final private GemFireCacheImpl cache;
-  private static final Logger logger = LogService.getLogger();
+  private final String id;
 
   private final Collection<TopEntriesCollector> subResults = new ArrayList<>();
+
   private TopEntriesCollector mergedResults;
 
   public TopEntriesFunctionCollector() {
@@ -65,8 +62,7 @@ public class TopEntriesFunctionCollector
   }
 
   public TopEntriesFunctionCollector(LuceneFunctionContext<TopEntriesCollector> context,
-      GemFireCacheImpl cache) {
-    this.cache = cache;
+      InternalCache cache) {
     id = cache == null ? String.valueOf(this.hashCode()) : cache.getName();
 
     int limit = context == null ? 0 : context.getLimit();
@@ -115,4 +111,8 @@ public class TopEntriesFunctionCollector
       subResults.add(resultOfSingleExecution);
     }
   }
+
+  String id() {
+    return this.id;
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index 5313ced..6690850 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.cache.lucene.internal.distributed;
 
 import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.DEFAULT_FIELD;
@@ -41,6 +40,7 @@ import org.apache.geode.cache.lucene.internal.StringQueryProvider;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.repository.IndexResultCollector;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
 import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
@@ -58,30 +58,30 @@ import org.mockito.Mockito;
 @Category(UnitTest.class)
 public class LuceneQueryFunctionJUnitTest {
 
-  String regionPath = "/region";
-  String indexName = "index";
-  final EntryScore<String> r1_1 = new EntryScore<String>("key-1-1", .5f);
-  final EntryScore<String> r1_2 = new EntryScore<String>("key-1-2", .4f);
-  final EntryScore<String> r1_3 = new EntryScore<String>("key-1-3", .3f);
-  final EntryScore<String> r2_1 = new EntryScore<String>("key-2-1", .45f);
-  final EntryScore<String> r2_2 = new EntryScore<String>("key-2-2", .35f);
-
-  InternalRegionFunctionContext mockContext;
-  ResultSender<TopEntriesCollector> mockResultSender;
-  Region<Object, Object> mockRegion;
-
-  RepositoryManager mockRepoManager;
-  IndexRepository mockRepository1;
-  IndexRepository mockRepository2;
-  IndexResultCollector mockCollector;
-  InternalLuceneService mockService;
-  LuceneIndexImpl mockIndex;
-  LuceneIndexStats mockStats;
-
-  ArrayList<IndexRepository> repos;
-  LuceneFunctionContext<IndexResultCollector> searchArgs;
-  LuceneQueryProvider queryProvider;
-  Query query;
+  private String regionPath = "/region";
+
+  private final EntryScore<String> r1_1 = new EntryScore<>("key-1-1", .5f);
+  private final EntryScore<String> r1_2 = new EntryScore<>("key-1-2", .4f);
+  private final EntryScore<String> r1_3 = new EntryScore<>("key-1-3", .3f);
+  private final EntryScore<String> r2_1 = new EntryScore<>("key-2-1", .45f);
+  private final EntryScore<String> r2_2 = new EntryScore<>("key-2-2", .35f);
+
+  private InternalRegionFunctionContext mockContext;
+  private ResultSender<TopEntriesCollector> mockResultSender;
+  private Region<Object, Object> mockRegion;
+
+  private RepositoryManager mockRepoManager;
+  private IndexRepository mockRepository1;
+  private IndexRepository mockRepository2;
+  private IndexResultCollector mockCollector;
+  private InternalLuceneService mockService;
+  private LuceneIndexImpl mockIndex;
+  private LuceneIndexStats mockStats;
+
+  private ArrayList<IndexRepository> repos;
+  private LuceneFunctionContext<IndexResultCollector> searchArgs;
+  private LuceneQueryProvider queryProvider;
+  private Query query;
 
   private InternalCache mockCache;
 
@@ -120,7 +120,7 @@ public class LuceneQueryFunctionJUnitTest {
 
     List<EntryScore> hits = result.getEntries().getHits();
     assertEquals(5, hits.size());
-    TopEntriesJUnitTest.verifyResultOrder(result.getEntries().getHits(), r1_1, r2_1, r1_2, r2_2,
+    LuceneTestUtilities.verifyResultOrder(result.getEntries().getHits(), r1_1, r2_1, r1_2, r2_2,
         r1_3);
   }
 
@@ -161,7 +161,7 @@ public class LuceneQueryFunctionJUnitTest {
 
     List<EntryScore> hits = result.getEntries().getHits();
     assertEquals(3, hits.size());
-    TopEntriesJUnitTest.verifyResultOrder(result.getEntries().getHits(), r1_1, r2_1, r1_2);
+    LuceneTestUtilities.verifyResultOrder(result.getEntries().getHits(), r1_1, r2_1, r1_2);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
index 3bfebdf..5767390 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
@@ -26,21 +26,22 @@ import org.junit.experimental.categories.Category;
 import org.apache.geode.CopyHelper;
 import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollectorManager.ListScanner;
+import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
 public class TopEntriesCollectorJUnitTest {
 
-  private EntryScore<String> r1_1 = new EntryScore<String>("1-1", .9f);
-  private EntryScore<String> r1_2 = new EntryScore<String>("1-2", .7f);
-  private EntryScore<String> r1_3 = new EntryScore<String>("1-3", .5f);
+  private EntryScore<String> r1_1 = new EntryScore<>("1-1", .9f);
+  private EntryScore<String> r1_2 = new EntryScore<>("1-2", .7f);
+  private EntryScore<String> r1_3 = new EntryScore<>("1-3", .5f);
 
-  private EntryScore<String> r2_1 = new EntryScore<String>("2-1", .85f);
-  private EntryScore<String> r2_2 = new EntryScore<String>("2-2", .65f);
+  private EntryScore<String> r2_1 = new EntryScore<>("2-1", .85f);
+  private EntryScore<String> r2_2 = new EntryScore<>("2-2", .65f);
 
-  private EntryScore<String> r3_1 = new EntryScore<String>("3-1", .8f);
-  private EntryScore<String> r3_2 = new EntryScore<String>("3-2", .6f);
-  private EntryScore<String> r3_3 = new EntryScore<String>("3-3", .4f);
+  private EntryScore<String> r3_1 = new EntryScore<>("3-1", .8f);
+  private EntryScore<String> r3_2 = new EntryScore<>("3-2", .6f);
+  private EntryScore<String> r3_3 = new EntryScore<>("3-3", .4f);
 
   private TopEntriesCollectorManager manager;
 
@@ -72,7 +73,7 @@ public class TopEntriesCollectorJUnitTest {
 
     TopEntriesCollector hits = manager.reduce(collectors);
     assertEquals(8, hits.getEntries().getHits().size());
-    TopEntriesJUnitTest.verifyResultOrder(hits.getEntries().getHits(), r1_1, r2_1, r3_1, r1_2, r2_2,
+    LuceneTestUtilities.verifyResultOrder(hits.getEntries().getHits(), r1_1, r2_1, r3_1, r1_2, r2_2,
         r3_2, r1_3, r3_3);
 
     // input collector should not change
@@ -116,7 +117,7 @@ public class TopEntriesCollectorJUnitTest {
     c1.collect(r1_3.getKey(), r1_3.getScore());
 
     assertEquals(3, c1.getEntries().getHits().size());
-    TopEntriesJUnitTest.verifyResultOrder(c1.getEntries().getHits(), r1_1, r1_2, r1_3);
+    LuceneTestUtilities.verifyResultOrder(c1.getEntries().getHits(), r1_1, r1_2, r1_3);
 
     ListScanner scanner = new ListScanner(c1.getEntries().getHits());
     assertTrue(scanner.hasNext());
@@ -131,6 +132,6 @@ public class TopEntriesCollectorJUnitTest {
     assertFalse(scanner.hasNext());
 
     assertEquals(3, c1.getEntries().getHits().size());
-    TopEntriesJUnitTest.verifyResultOrder(c1.getEntries().getHits(), r1_1, r1_2, r1_3);
+    LuceneTestUtilities.verifyResultOrder(c1.getEntries().getHits(), r1_1, r1_2, r1_3);
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
index bf08877..5fd9e2d 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.cache.lucene.internal.distributed;
 
 import static org.junit.Assert.*;
@@ -20,9 +19,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
 
 import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -31,25 +28,26 @@ import org.junit.experimental.categories.Category;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.cache.execute.FunctionException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
 public class TopEntriesFunctionCollectorJUnitTest {
-  EntryScore<String> r1_1;
-  EntryScore<String> r1_2;
-  EntryScore<String> r2_1;
-  EntryScore<String> r2_2;
-  TopEntriesCollector result1, result2;
+
+  private EntryScore<String> r1_1;
+  private EntryScore<String> r1_2;
+  private EntryScore<String> r2_1;
+  private EntryScore<String> r2_2;
+  private TopEntriesCollector result1;
+  private TopEntriesCollector result2;
 
   @Before
   public void initializeCommonObjects() {
-    r1_1 = new EntryScore<String>("3", .9f);
-    r1_2 = new EntryScore<String>("1", .8f);
-    r2_1 = new EntryScore<String>("2", 0.85f);
-    r2_2 = new EntryScore<String>("4", 0.1f);
+    r1_1 = new EntryScore<>("3", .9f);
+    r1_2 = new EntryScore<>("1", .8f);
+    r2_1 = new EntryScore<>("2", 0.85f);
+    r2_2 = new EntryScore<>("4", 0.1f);
 
     result1 = new TopEntriesCollector(null);
     result1.collect(r1_1);
@@ -73,13 +71,9 @@ public class TopEntriesFunctionCollectorJUnitTest {
     collector.addResult(null, result1);
     collector.addResult(null, result2);
 
-    final CountDownLatch insideThread = new CountDownLatch(1);
-    final CountDownLatch resultReceived = new CountDownLatch(1);
-
-    final AtomicReference<TopEntries> result = new AtomicReference<>();
     TopEntries merged = collector.getResult(1, TimeUnit.SECONDS);
     assertEquals(4, merged.size());
-    TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
+    LuceneTestUtilities.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
   }
 
   @Test
@@ -95,7 +89,7 @@ public class TopEntriesFunctionCollectorJUnitTest {
     TopEntries merged = collector.getResult();
     Assert.assertNotNull(merged);
     assertEquals(3, merged.size());
-    TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2);
+    LuceneTestUtilities.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2);
   }
 
   @Test
@@ -108,7 +102,7 @@ public class TopEntriesFunctionCollectorJUnitTest {
     TopEntries merged = collector.getResult();
     Assert.assertNotNull(merged);
     assertEquals(4, merged.size());
-    TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
+    LuceneTestUtilities.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
   }
 
   @Test
@@ -121,12 +115,12 @@ public class TopEntriesFunctionCollectorJUnitTest {
     TopEntries merged = collector.getResult();
     Assert.assertNotNull(merged);
     assertEquals(4, merged.size());
-    TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
+    LuceneTestUtilities.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
 
     merged = collector.getResult();
     Assert.assertNotNull(merged);
     assertEquals(4, merged.size());
-    TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
+    LuceneTestUtilities.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
   }
 
   @Test
@@ -167,7 +161,7 @@ public class TopEntriesFunctionCollectorJUnitTest {
     TopEntries merged = collector.getResult();
     Assert.assertNotNull(merged);
     assertEquals(2, merged.size());
-    TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r2_1, r2_2);
+    LuceneTestUtilities.verifyResultOrder(merged.getHits(), r2_1, r2_2);
   }
 
   @Test(expected = RuntimeException.class)
@@ -184,10 +178,10 @@ public class TopEntriesFunctionCollectorJUnitTest {
 
   @Test
   public void testCollectorName() {
-    GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class);
+    InternalCache mockCache = mock(InternalCache.class);
     Mockito.doReturn("server").when(mockCache).getName();
 
     TopEntriesFunctionCollector function = new TopEntriesFunctionCollector(null, mockCache);
-    assertEquals("server", function.id);
+    assertEquals("server", function.id());
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
index fcfebbc..e21ac7f 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
@@ -16,9 +16,6 @@ package org.apache.geode.cache.lucene.internal.distributed;
 
 import static org.junit.Assert.*;
 
-import java.util.Collection;
-import java.util.Iterator;
-
 import org.jmock.Mockery;
 import org.jmock.lib.concurrent.Synchroniser;
 import org.jmock.lib.legacy.ClassImposteriser;
@@ -30,6 +27,7 @@ import org.junit.experimental.categories.Category;
 import org.apache.geode.CopyHelper;
 import org.apache.geode.cache.lucene.LuceneQueryFactory;
 import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
+import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -44,34 +42,34 @@ public class TopEntriesJUnitTest {
 
   @Test
   public void testPopulateTopEntries() {
-    TopEntries<String> hits = new TopEntries<String>();
+    TopEntries<String> hits = new TopEntries<>();
     hits.addHit(r1_1);
     hits.addHit(r2_1);
     hits.addHit(r1_2);
     hits.addHit(r2_2);
 
     assertEquals(4, hits.size());
-    verifyResultOrder(hits.getHits(), r1_1, r2_1, r1_2, r2_2);
+    LuceneTestUtilities.verifyResultOrder(hits.getHits(), r1_1, r2_1, r1_2, r2_2);
   }
 
   @Test
   public void putSameScoreEntries() {
-    TopEntries<String> hits = new TopEntries<String>();
-    EntryScore<String> r1 = new EntryScore<String>("1", .8f);
-    EntryScore<String> r2 = new EntryScore<String>("2", .8f);
+    TopEntries<String> hits = new TopEntries<>();
+    EntryScore<String> r1 = new EntryScore<>("1", .8f);
+    EntryScore<String> r2 = new EntryScore<>("2", .8f);
     hits.addHit(r1);
     hits.addHit(r2);
 
     assertEquals(2, hits.size());
-    verifyResultOrder(hits.getHits(), r1, r2);
+    LuceneTestUtilities.verifyResultOrder(hits.getHits(), r1, r2);
   }
 
   @Test
   public void testInitialization() {
-    TopEntries<String> hits = new TopEntries<String>();
+    TopEntries<String> hits = new TopEntries<>();
     assertEquals(LuceneQueryFactory.DEFAULT_LIMIT, hits.getLimit());
 
-    hits = new TopEntries<String>(123);
+    hits = new TopEntries<>(123);
     assertEquals(123, hits.getLimit());
   }
 
@@ -82,47 +80,33 @@ public class TopEntriesJUnitTest {
 
   @Test
   public void enforceLimit() throws Exception {
-    TopEntries<String> hits = new TopEntries<String>(3);
+    TopEntries<String> hits = new TopEntries<>(3);
     hits.addHit(r1_1);
     hits.addHit(r2_1);
     hits.addHit(r1_2);
     hits.addHit(r2_2);
 
     assertEquals(3, hits.size());
-    verifyResultOrder(hits.getHits(), r1_1, r2_1, r1_2);
+    LuceneTestUtilities.verifyResultOrder(hits.getHits(), r1_1, r2_1, r1_2);
   }
 
   @Test
   public void testSerialization() {
     LuceneServiceImpl.registerDataSerializables();
-    TopEntries<String> hits = new TopEntries<String>(3);
+    TopEntries<String> hits = new TopEntries<>(3);
 
     TopEntries<String> copy = CopyHelper.deepCopy(hits);
     assertEquals(3, copy.getLimit());
     assertEquals(0, copy.getHits().size());
 
-    hits = new TopEntries<String>(3);
+    hits = new TopEntries<>(3);
     hits.addHit(r1_1);
     hits.addHit(r2_1);
     hits.addHit(r1_2);
 
     copy = CopyHelper.deepCopy(hits);
     assertEquals(3, copy.size());
-    verifyResultOrder(copy.getHits(), r1_1, r2_1, r1_2);
-  }
-
-  // TODO: extract to lucene test util class
-  public static void verifyResultOrder(Collection<EntryScore<String>> list,
-      EntryScore<String>... expectedEntries) {
-    Iterator<EntryScore<String>> iter = list.iterator();
-    for (EntryScore expectedEntry : expectedEntries) {
-      if (!iter.hasNext()) {
-        fail();
-      }
-      EntryScore toVerify = iter.next();
-      assertEquals(expectedEntry.getKey(), toVerify.getKey());
-      assertEquals(expectedEntry.getScore(), toVerify.getScore(), .0f);
-    }
+    LuceneTestUtilities.verifyResultOrder(copy.getHits(), r1_1, r2_1, r1_2);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
index 5563112..17f4dea 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
@@ -18,8 +18,10 @@ import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -44,6 +46,7 @@ import org.apache.geode.cache.lucene.LuceneService;
 import org.apache.geode.cache.lucene.LuceneServiceProvider;
 import org.apache.geode.cache.lucene.internal.LuceneIndexForPartitionedRegion;
 import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
+import org.apache.geode.cache.lucene.internal.distributed.EntryScore;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.test.dunit.VM;
@@ -82,6 +85,19 @@ public class LuceneTestUtilities {
   public static String Quarter3 = "Q3";
   public static String Quarter4 = "Q4";
 
+  public static void verifyResultOrder(Collection<EntryScore<String>> list,
+      EntryScore<String>... expectedEntries) {
+    Iterator<EntryScore<String>> iter = list.iterator();
+    for (EntryScore expectedEntry : expectedEntries) {
+      if (!iter.hasNext()) {
+        fail();
+      }
+      EntryScore toVerify = iter.next();
+      assertEquals(expectedEntry.getKey(), toVerify.getKey());
+      assertEquals(expectedEntry.getScore(), toVerify.getScore(), .0f);
+    }
+  }
+
   public static class IntRangeQueryProvider implements LuceneQueryProvider {
     String fieldName;
     int lowerValue;


[2/2] geode git commit: GEODE-2632: refactor code to use InternalCache instead of GemFireCacheImpl

Posted by kl...@apache.org.
GEODE-2632: refactor code to use InternalCache instead of GemFireCacheImpl

* minor cleanup also


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/363e50d2
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/363e50d2
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/363e50d2

Branch: refs/heads/develop
Commit: 363e50d213d763f4cca6e0744b206941a4f2d52c
Parents: 0862174
Author: Kirk Lund <kl...@apache.org>
Authored: Wed Apr 19 14:41:42 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Fri Apr 21 13:45:22 2017 -0700

----------------------------------------------------------------------
 .../query/internal/cq/CqServiceProvider.java    |  22 +-
 .../query/internal/cq/spi/CqServiceFactory.java |   8 +-
 .../cache/query/internal/cq/ClientCQImpl.java   |  95 +--
 .../cache/query/internal/cq/CqQueryImpl.java    |  91 ++-
 .../query/internal/cq/CqServiceFactoryImpl.java |  17 +-
 .../cache/query/internal/cq/CqServiceImpl.java  | 673 ++++---------------
 .../internal/cq/CqServiceStatisticsImpl.java    |  21 +-
 .../query/internal/cq/CqServiceVsdStats.java    |  73 +-
 .../cache/query/internal/cq/ServerCQImpl.java   | 121 +---
 .../cache/tier/sockets/command/ExecuteCQ.java   |   4 +-
 .../cache/tier/sockets/command/ExecuteCQ61.java |   4 +-
 .../tier/sockets/command/GetDurableCQs.java     |   4 +-
 .../cache/query/cq/dunit/CqStatsDUnitTest.java  |  44 +-
 .../cq/dunit/CqStatsUsingPoolDUnitTest.java     |  47 +-
 .../TopEntriesFunctionCollector.java            |  22 +-
 .../LuceneQueryFunctionJUnitTest.java           |  54 +-
 .../TopEntriesCollectorJUnitTest.java           |  23 +-
 .../TopEntriesFunctionCollectorJUnitTest.java   |  48 +-
 .../distributed/TopEntriesJUnitTest.java        |  44 +-
 .../cache/lucene/test/LuceneTestUtilities.java  |  16 +
 20 files changed, 442 insertions(+), 989 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
index cded9c3..90fbf4b 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
@@ -16,7 +16,7 @@ package org.apache.geode.cache.query.internal.cq;
 
 import org.apache.geode.cache.query.internal.cq.spi.CqServiceFactory;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 
 import java.io.DataInput;
 import java.io.IOException;
@@ -26,17 +26,19 @@ import java.util.ServiceLoader;
 public class CqServiceProvider {
 
   private static final CqServiceFactory factory;
-  // System property to maintain the CQ event references for optimizing the updates.
-  // This will allows to run the CQ query only once during update events.
+
+  /**
+   * System property to maintain the CQ event references for optimizing the updates. This will allow
+   * running the CQ query only once during update events.
+   */
   public static boolean MAINTAIN_KEYS = Boolean
-      .valueOf(System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS", "true"))
-      .booleanValue();
+      .valueOf(System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS", "true"));
+
   /**
    * A debug flag used for testing vMotion during CQ registration
    */
   public static boolean VMOTION_DURING_CQ_REGISTRATION_FLAG = false;
 
-
   static {
     ServiceLoader<CqServiceFactory> loader = ServiceLoader.load(CqServiceFactory.class);
     Iterator<CqServiceFactory> itr = loader.iterator();
@@ -48,8 +50,7 @@ public class CqServiceProvider {
     }
   }
 
-  public static CqService create(GemFireCacheImpl cache) {
-
+  public static CqService create(InternalCache cache) {
     if (factory == null) {
       return new MissingCqService();
     }
@@ -63,10 +64,7 @@ public class CqServiceProvider {
     } else {
       return factory.readCqQuery(in);
     }
-
   }
 
-  private CqServiceProvider() {
-
-  }
+  private CqServiceProvider() {}
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
index 68ebbd5..2b8a47e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
@@ -19,16 +19,16 @@ import java.io.IOException;
 
 import org.apache.geode.cache.query.internal.cq.CqService;
 import org.apache.geode.cache.query.internal.cq.ServerCQ;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 
 public interface CqServiceFactory {
 
-  public void initialize();
+  void initialize();
 
   /**
    * Create a new CqService for the given cache
    */
-  public CqService create(GemFireCacheImpl cache);
+  CqService create(InternalCache cache);
 
-  public ServerCQ readCqQuery(DataInput in) throws ClassNotFoundException, IOException;
+  ServerCQ readCqQuery(DataInput in) throws ClassNotFoundException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
index 00a0aa5..111bf84 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
@@ -35,7 +35,7 @@ import org.apache.geode.cache.query.CqResults;
 import org.apache.geode.cache.query.CqStatusListener;
 import org.apache.geode.cache.query.RegionNotFoundException;
 import org.apache.geode.cache.query.internal.CqStateImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
@@ -57,7 +57,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
    */
   private volatile ConcurrentLinkedQueue<CqEventImpl> queuedEvents = null;
 
-  public final Object queuedEventsSynchObject = new Object();
+  final Object queuedEventsSynchObject = new Object();
 
   private boolean connected = false;
 
@@ -73,22 +73,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
     return this.cqName;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#getCQProxy()
-   */
-  public ServerCQProxyImpl getCQProxy() {
+  ServerCQProxyImpl getCQProxy() {
     return this.cqProxy;
   }
 
   /**
    * Initializes the connection using the pool from the client region. Also sets the cqBaseRegion
    * value of this CQ.
-   * 
-   * @throws CqException
    */
-  public void initConnectionProxy() throws CqException, RegionNotFoundException {
+  private void initConnectionProxy() throws CqException, RegionNotFoundException {
     cqBaseRegion = (LocalRegion) cqService.getCache().getRegion(regionName);
     // Check if the region exists on the local cache.
     // In the current implementation of 5.1 the Server Connection is (ConnectionProxyImpl)
@@ -113,17 +106,9 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
       throw new CqException(
           "Unable to get the connection pool. The Region does not have a pool configured.");
     }
-
-    // if (proxy == null) {
-    // throw new
-    // CqException(LocalizedStrings.CqQueryImpl_UNABLE_TO_GET_THE_CONNECTIONPROXY_THE_REGION_MAY_NOT_HAVE_A_BRIDGEWRITER_OR_BRIDGECLIENT_INSTALLED_ON_IT.toLocalizedString());
-    // } else if(!proxy.getEstablishCallbackConnection()){
-    // throw new
-    // CqException(LocalizedStrings.CqQueryImpl_THE_ESTABLISHCALLBACKCONNECTION_ON_BRIDGEWRITER_CLIENT_INSTALLED_ON_REGION_0_IS_SET_TO_FALSE
-    // .toLocalizedString(regionName));
-    // }
   }
 
+  @Override
   public void close() throws CqClosedException, CqException {
     this.close(true);
   }
@@ -182,15 +167,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
       if (cqProxy == null || !sendRequestToServer || isClosed) {
         // Stat update.
         if (stateBeforeClosing == CqStateImpl.RUNNING) {
-          cqService.stats.decCqsActive();
+          cqService.stats().decCqsActive();
         } else if (stateBeforeClosing == CqStateImpl.STOPPED) {
-          cqService.stats.decCqsStopped();
+          cqService.stats().decCqsStopped();
         }
 
         // Set the state to close, and update stats
         this.cqState.setState(CqStateImpl.CLOSED);
-        cqService.stats.incCqsClosed();
-        cqService.stats.decCqsOnClient();
+        cqService.stats().incCqsClosed();
+        cqService.stats().decCqsOnClient();
         if (this.stats != null)
           this.stats.close();
       } else {
@@ -201,7 +186,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
         if (exception != null) {
           throw new CqException(
               LocalizedStrings.CqQueryImpl_FAILED_TO_CLOSE_THE_CQ_CQNAME_0_ERROR_FROM_LAST_ENDPOINT_1
-                  .toLocalizedString(new Object[] {this.cqName, exception.getLocalizedMessage()}),
+                  .toLocalizedString(this.cqName, exception.getLocalizedMessage()),
               exception.getCause());
         } else {
           throw new CqException(
@@ -261,31 +246,28 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
 
   /**
    * Clears the resource used by CQ.
-   * 
-   * @throws CqException
    */
+  @Override
   protected void cleanup() throws CqException {
     this.cqService.removeFromBaseRegionToCqNameMap(this.regionName, this.getServerCqName());
   }
 
+  @Override
   public CqAttributes getCqAttributes() {
     return cqAttributes;
   }
 
-
-
   /**
    * @return Returns the cqListeners.
    */
   public CqListener[] getCqListeners() {
-
     return cqAttributes.getCqListeners();
   }
 
-
   /**
    * Start or resume executing the query.
    */
+  @Override
   public void execute() throws CqClosedException, RegionNotFoundException, CqException {
     executeCqOnRedundantsAndPrimary(false);
   }
@@ -293,7 +275,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
   /**
    * Start or resume executing the query. Gets or updates the CQ results and returns them.
    */
-  public CqResults executeWithInitialResults()
+  @Override
+  public <E> CqResults<E> executeWithInitialResults()
       throws CqClosedException, RegionNotFoundException, CqException {
 
     synchronized (queuedEventsSynchObject) {
@@ -320,16 +303,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
     CqResults initialResults;
     try {
       initialResults = (CqResults) executeCqOnRedundantsAndPrimary(true);
-    } catch (CqClosedException e) {
-      queuedEvents = null;
-      throw e;
-    } catch (RegionNotFoundException e) {
-      queuedEvents = null;
-      throw e;
-    } catch (CqException e) {
-      queuedEvents = null;
-      throw e;
-    } catch (RuntimeException e) {
+    } catch (RegionNotFoundException | CqException | RuntimeException e) {
       queuedEvents = null;
       throw e;
     }
@@ -343,6 +317,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
         if (!this.queuedEvents.isEmpty()) {
           try {
             Runnable r = new Runnable() {
+              @Override
               public void run() {
                 Object[] eventArray = null;
                 if (CqQueryImpl.testHook != null) {
@@ -395,7 +370,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
    * @param executeWithInitialResults boolean
    * @return Object SelectResults in case of executeWithInitialResults
    */
-  public Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults)
+  private Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults)
       throws CqClosedException, RegionNotFoundException, CqException {
 
     Object initialResults = null;
@@ -461,8 +436,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
           } else {
             String errMsg =
                 LocalizedStrings.CqQueryImpl_FAILED_TO_EXECUTE_THE_CQ_CQNAME_0_QUERY_STRING_IS_1_ERROR_FROM_LAST_SERVER_2
-                    .toLocalizedString(
-                        new Object[] {this.cqName, this.queryString, ex.getLocalizedMessage()});
+                    .toLocalizedString(this.cqName, this.queryString, ex.getLocalizedMessage());
             if (logger.isDebugEnabled()) {
               logger.debug(errMsg, ex);
             }
@@ -498,8 +472,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
       }
     }
     // Update CQ-base region for book keeping.
-    this.cqService.stats.incCqsActive();
-    this.cqService.stats.decCqsStopped();
+    this.cqService.stats().incCqsActive();
+    this.cqService.stats().decCqsStopped();
     return initialResults;
   }
 
@@ -509,23 +483,22 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
    * @return true if shutdown in progress else false.
    */
   private boolean shutdownInProgress() {
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    InternalCache cache = cqService.getInternalCache();
     if (cache == null || cache.isClosed()) {
       return true; // bail, things are shutting down
     }
 
-
     String reason = cqProxy.getPool().getCancelCriterion().cancelInProgress();
     if (reason != null) {
       return true;
     }
     return false;
-
   }
 
   /**
    * Stop or pause executing the query.
    */
+  @Override
   public void stop() throws CqClosedException, CqException {
     boolean isStopped = false;
     synchronized (this.cqState) {
@@ -558,8 +531,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
       if (cqProxy == null || isStopped) {
         // Change state and stats on the client side
         this.cqState.setState(CqStateImpl.STOPPED);
-        this.cqService.stats.incCqsStopped();
-        this.cqService.stats.decCqsActive();
+        this.cqService.stats().incCqsStopped();
+        this.cqService.stats().decCqsActive();
         if (logger.isDebugEnabled()) {
           logger.debug("Successfully stopped the CQ. {}", cqName);
         }
@@ -568,7 +541,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
         if (exception != null) {
           throw new CqException(
               LocalizedStrings.CqQueryImpl_FAILED_TO_STOP_THE_CQ_CQNAME_0_ERROR_FROM_LAST_SERVER_1
-                  .toLocalizedString(new Object[] {this.cqName, exception.getLocalizedMessage()}),
+                  .toLocalizedString(this.cqName, exception.getLocalizedMessage()),
               exception.getCause());
         } else {
           throw new CqException(
@@ -579,24 +552,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
     }
   }
 
+  @Override
   public CqAttributesMutator getCqAttributesMutator() {
     return (CqAttributesMutator) this.cqAttributes;
   }
 
-
-  public ConcurrentLinkedQueue<CqEventImpl> getQueuedEvents() {
+  ConcurrentLinkedQueue<CqEventImpl> getQueuedEvents() {
     return this.queuedEvents;
   }
 
-
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqQuery2#setProxyCache(org.apache.geode.cache.
-   * client.internal.ProxyCache)
-   */
   @Override
   public void setProxyCache(ProxyCache proxyCache) {
     this.proxyCache = proxyCache;
@@ -612,7 +576,6 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
 
   @Override
   public void createOn(Connection conn, boolean isDurable) {
-
     byte regionDataPolicyOrdinal = getCqBaseRegion() == null ? (byte) 0
         : getCqBaseRegion().getAttributes().getDataPolicy().ordinal;
 
@@ -620,6 +583,4 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
     this.cqProxy.createOn(getName(), conn, getQueryString(), state, isDurable,
         regionDataPolicyOrdinal);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
index 22b4137..07e3171 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
@@ -21,11 +21,9 @@ import java.util.Set;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.StatisticsFactory;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.query.CqClosedException;
 import org.apache.geode.cache.query.CqEvent;
 import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqExistsException;
 import org.apache.geode.cache.query.CqState;
 import org.apache.geode.cache.query.CqStatistics;
 import org.apache.geode.cache.query.Query;
@@ -38,7 +36,7 @@ import org.apache.geode.cache.query.internal.CqStateImpl;
 import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.QueryExecutionContext;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.InternalLogWriter;
@@ -58,13 +56,13 @@ public abstract class CqQueryImpl implements InternalCqQuery {
 
   protected String queryString;
 
-  protected static final Object TOKEN = new Object();
+  static final Object TOKEN = new Object();
 
-  protected LocalRegion cqBaseRegion;
+  LocalRegion cqBaseRegion;
 
   protected Query query = null;
 
-  protected InternalLogWriter securityLogWriter;
+  InternalLogWriter securityLogWriter;
 
   protected CqServiceImpl cqService;
 
@@ -72,14 +70,14 @@ public abstract class CqQueryImpl implements InternalCqQuery {
 
   protected boolean isDurable = false;
 
-  // Stats counters
-  protected CqStatisticsImpl cqStats;
+  /** Stats counters */
+  private CqStatisticsImpl cqStats;
 
   protected CqQueryVsdStats stats;
 
   protected final CqStateImpl cqState = new CqStateImpl();
 
-  protected ExecutionContext queryExecutionContext = null;
+  private ExecutionContext queryExecutionContext = null;
 
   public static TestHook testHook = null;
 
@@ -100,6 +98,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
   /**
    * returns CQ name
    */
+  @Override
   public String getName() {
     return this.cqName;
   }
@@ -109,6 +108,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
     this.cqName = cqName;
   }
 
+  @Override
   public void setCqService(CqService cqService) {
     this.cqService = (CqServiceImpl) cqService;
   }
@@ -121,25 +121,24 @@ public abstract class CqQueryImpl implements InternalCqQuery {
     return this.regionName;
   }
 
-  public void updateCqCreateStats() {
+  void updateCqCreateStats() {
     // Initialize the VSD statistics
     StatisticsFactory factory = cqService.getCache().getDistributedSystem();
     this.stats = new CqQueryVsdStats(factory, getServerCqName());
     this.cqStats = new CqStatisticsImpl(this);
 
     // Update statistics with CQ creation.
-    this.cqService.stats.incCqsStopped();
-    this.cqService.stats.incCqsCreated();
-    this.cqService.stats.incCqsOnClient();
+    this.cqService.stats().incCqsStopped();
+    this.cqService.stats().incCqsCreated();
+    this.cqService.stats().incCqsOnClient();
   }
 
   /**
    * Validates the CQ. Checks for cq constraints. Also sets the base region name.
    */
-  public void validateCq() {
-    Cache cache = cqService.getCache();
-    DefaultQuery locQuery =
-        (DefaultQuery) ((GemFireCacheImpl) cache).getLocalQueryService().newQuery(this.queryString);
+  void validateCq() {
+    InternalCache cache = cqService.getInternalCache();
+    DefaultQuery locQuery = (DefaultQuery) cache.getLocalQueryService().newQuery(this.queryString);
     this.query = locQuery;
     // assert locQuery != null;
 
@@ -221,10 +220,8 @@ public abstract class CqQueryImpl implements InternalCqQuery {
 
   /**
    * Removes the CQ from CQ repository.
-   * 
-   * @throws CqException
    */
-  protected void removeFromCqMap() throws CqException {
+  void removeFromCqMap() throws CqException {
     try {
       cqService.removeCq(this.getServerCqName());
     } catch (Exception ex) {
@@ -243,6 +240,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
   /**
    * Returns the QueryString of this CQ.
    */
+  @Override
   public String getQueryString() {
     return queryString;
   }
@@ -252,23 +250,16 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return the Query for the query string
    */
+  @Override
   public Query getQuery() {
     return query;
   }
 
-
-  /**
-   * @see org.apache.geode.cache.query.CqQuery#getStatistics()
-   */
+  @Override
   public CqStatistics getStatistics() {
     return cqStats;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#getCqBaseRegion()
-   */
   @Override
   public LocalRegion getCqBaseRegion() {
     return this.cqBaseRegion;
@@ -279,11 +270,12 @@ public abstract class CqQueryImpl implements InternalCqQuery {
   /**
    * @return Returns the Region name on which this cq is created.
    */
-  public String getBaseRegionName() {
+  String getBaseRegionName() {
 
     return this.regionName;
   }
 
+  @Override
   public abstract String getServerCqName();
 
   /**
@@ -291,15 +283,11 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return STOPPED RUNNING or CLOSED
    */
+  @Override
   public CqState getState() {
     return this.cqState;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#setCqState(int)
-   */
   @Override
   public void setCqState(int state) {
     if (this.isClosed()) {
@@ -309,18 +297,13 @@ public abstract class CqQueryImpl implements InternalCqQuery {
 
     synchronized (cqState) {
       if (state == CqStateImpl.RUNNING) {
-        if (this.isRunning()) {
-          // throw new
-          // IllegalStateException(LocalizedStrings.CqQueryImpl_CQ_IS_NOT_IN_RUNNING_STATE_STOP_CQ_DOES_NOT_APPLY_CQNAME_0
-          // .toLocalizedString(this.cqName));
-        }
         this.cqState.setState(CqStateImpl.RUNNING);
-        this.cqService.stats.incCqsActive();
-        this.cqService.stats.decCqsStopped();
+        this.cqService.stats().incCqsActive();
+        this.cqService.stats().decCqsStopped();
       } else if (state == CqStateImpl.STOPPED) {
         this.cqState.setState(CqStateImpl.STOPPED);
-        this.cqService.stats.incCqsStopped();
-        this.cqService.stats.decCqsActive();
+        this.cqService.stats().incCqsStopped();
+        this.cqService.stats().decCqsActive();
       } else if (state == CqStateImpl.CLOSING) {
         this.cqState.setState(state);
       }
@@ -332,7 +315,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @param cqEvent object
    */
-  public void updateStats(CqEvent cqEvent) {
+  void updateStats(CqEvent cqEvent) {
     this.stats.updateStats(cqEvent); // Stats for VSD
   }
 
@@ -341,15 +324,17 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return true if running, false otherwise
    */
+  @Override
   public boolean isRunning() {
     return this.cqState.isRunning();
   }
 
   /**
-   * Return true if the CQ is in Sstopped state
+   * Return true if the CQ is in stopped state
    * 
    * @return true if stopped, false otherwise
    */
+  @Override
   public boolean isStopped() {
     return this.cqState.isStopped();
   }
@@ -359,6 +344,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return true if closed, false otherwise
    */
+  @Override
   public boolean isClosed() {
     return this.cqState.isClosed();
   }
@@ -377,6 +363,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return true if durable, false otherwise
    */
+  @Override
   public boolean isDurable() {
     return this.isDurable;
   }
@@ -391,22 +378,22 @@ public abstract class CqQueryImpl implements InternalCqQuery {
     return stats;
   }
 
-  public ExecutionContext getQueryExecutionContext() {
+  ExecutionContext getQueryExecutionContext() {
     return queryExecutionContext;
   }
 
-  public void setQueryExecutionContext(ExecutionContext queryExecutionContext) {
+  private void setQueryExecutionContext(ExecutionContext queryExecutionContext) {
     this.queryExecutionContext = queryExecutionContext;
   }
 
   /** Test Hook */
   public interface TestHook {
-    public void pauseUntilReady();
+    void pauseUntilReady();
 
-    public void ready();
+    void ready();
 
-    public int numQueuedEvents();
+    int numQueuedEvents();
 
-    public void setEventCount(int count);
+    void setEventCount(int count);
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
index db90632..9cc2eea 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
@@ -22,7 +22,7 @@ import java.util.Map;
 
 import org.apache.geode.cache.query.internal.cq.spi.CqServiceFactory;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.CommandInitializer;
@@ -36,14 +36,13 @@ import org.apache.geode.internal.cache.tier.sockets.command.StopCQ;
 
 public class CqServiceFactoryImpl implements CqServiceFactory {
 
+  @Override
   public void initialize() {
-    {
-      Map<Version, Command> versions = new HashMap<Version, Command>();
-      versions.put(Version.GFE_57, ExecuteCQ.getCommand());
-      versions.put(Version.GFE_61, ExecuteCQ61.getCommand());
-      CommandInitializer.registerCommand(MessageType.EXECUTECQ_MSG_TYPE, versions);
-      CommandInitializer.registerCommand(MessageType.EXECUTECQ_WITH_IR_MSG_TYPE, versions);
-    }
+    Map<Version, Command> versions = new HashMap<>();
+    versions.put(Version.GFE_57, ExecuteCQ.getCommand());
+    versions.put(Version.GFE_61, ExecuteCQ61.getCommand());
+    CommandInitializer.registerCommand(MessageType.EXECUTECQ_MSG_TYPE, versions);
+    CommandInitializer.registerCommand(MessageType.EXECUTECQ_WITH_IR_MSG_TYPE, versions);
 
     CommandInitializer.registerCommand(MessageType.GETCQSTATS_MSG_TYPE,
         Collections.singletonMap(Version.GFE_57, GetCQStats.getCommand()));
@@ -58,7 +57,7 @@ public class CqServiceFactoryImpl implements CqServiceFactory {
   }
 
   @Override
-  public CqService create(GemFireCacheImpl cache) {
+  public CqService create(InternalCache cache) {
     return new CqServiceImpl(cache);
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
index f1ca832..570c06c 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
@@ -14,19 +14,63 @@
  */
 package org.apache.geode.cache.query.internal.cq;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.InvalidDeltaException;
 import org.apache.geode.StatisticsFactory;
 import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.cache.client.Pool;
-import org.apache.geode.cache.client.internal.*;
-import org.apache.geode.cache.query.*;
-import org.apache.geode.cache.query.internal.*;
+import org.apache.geode.cache.client.internal.GetEventValueOp;
+import org.apache.geode.cache.client.internal.InternalPool;
+import org.apache.geode.cache.client.internal.QueueManager;
+import org.apache.geode.cache.client.internal.ServerCQProxyImpl;
+import org.apache.geode.cache.client.internal.UserAttributes;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqClosedException;
+import org.apache.geode.cache.query.CqException;
+import org.apache.geode.cache.query.CqExistsException;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.CqServiceStatistics;
+import org.apache.geode.cache.query.CqStatusListener;
+import org.apache.geode.cache.query.QueryException;
+import org.apache.geode.cache.query.QueryInvalidException;
+import org.apache.geode.cache.query.RegionNotFoundException;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.internal.CompiledSelect;
+import org.apache.geode.cache.query.internal.CqQueryVsdStats;
+import org.apache.geode.cache.query.internal.CqStateImpl;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.FilterProfile;
+import org.apache.geode.internal.cache.FilterRoutingInfo;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
@@ -35,57 +79,43 @@ import org.apache.geode.internal.cache.tier.sockets.Part;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.logging.log4j.Logger;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
- * @since GemFire 5.5
- *
- *        Implements the CqService functionality.
- * 
- */
-/**
+ * Implements the CqService functionality.
  *
+ * @since GemFire 5.5
  */
 public final class CqServiceImpl implements CqService {
   private static final Logger logger = LogService.getLogger();
 
-  private static final Integer MESSAGE_TYPE_LOCAL_CREATE =
-      Integer.valueOf(MessageType.LOCAL_CREATE);
-  private static final Integer MESSAGE_TYPE_LOCAL_UPDATE =
-      Integer.valueOf(MessageType.LOCAL_UPDATE);
-  private static final Integer MESSAGE_TYPE_LOCAL_DESTROY =
-      Integer.valueOf(MessageType.LOCAL_DESTROY);
-  private static final Integer MESSAGE_TYPE_EXCEPTION = Integer.valueOf(MessageType.EXCEPTION);
+  private static final Integer MESSAGE_TYPE_LOCAL_CREATE = MessageType.LOCAL_CREATE;
+  private static final Integer MESSAGE_TYPE_LOCAL_UPDATE = MessageType.LOCAL_UPDATE;
+  private static final Integer MESSAGE_TYPE_LOCAL_DESTROY = MessageType.LOCAL_DESTROY;
+  private static final Integer MESSAGE_TYPE_EXCEPTION = MessageType.EXCEPTION;
 
   /**
    * System property to evaluate the query even though the initial results are not required when cq
    * is executed using the execute() method.
    */
-  public static boolean EXECUTE_QUERY_DURING_INIT = Boolean
-      .valueOf(System
-          .getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true"))
-      .booleanValue();
+  public static boolean EXECUTE_QUERY_DURING_INIT = Boolean.valueOf(System
+      .getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true"));
 
   private static final String CQ_NAME_PREFIX = "GfCq";
 
-  private final Cache cache;
+  private final InternalCache cache;
 
   /**
    * Manages cq pools to determine if a status of connect or disconnect needs to be sent out
    */
-  private final HashMap<String, Boolean> cqPoolsConnected = new HashMap<String, Boolean>();
-
+  private final HashMap<String, Boolean> cqPoolsConnected = new HashMap<>();
 
   /**
    * Manages CQ objects. uses serverCqName as key and CqQueryImpl as value
    * 
-   * @guarded.By cqQueryMapLock
+   * GuardedBy cqQueryMapLock
    */
-  private volatile HashMap<String, CqQueryImpl> cqQueryMap = new HashMap<String, CqQueryImpl>();
+  private volatile HashMap<String, CqQueryImpl> cqQueryMap = new HashMap<>();
+
   private final Object cqQueryMapLock = new Object();
 
   private volatile boolean isRunning = false;
@@ -93,36 +123,21 @@ public final class CqServiceImpl implements CqService {
   /**
    * Used by client when multiuser-authentication is true.
    */
-  private final HashMap<String, UserAttributes> cqNameToUserAttributesMap =
-      new HashMap<String, UserAttributes>();
-
-  // private boolean isServer = true;
-
-  /*
-   * // Map to manage CQ to satisfied CQ events (keys) for optimizing updates. private final HashMap
-   * cqToCqEventKeysMap = CqService.MAINTAIN_KEYS ? new HashMap() : null;
-   */
+  private final HashMap<String, UserAttributes> cqNameToUserAttributesMap = new HashMap<>();
 
   // Map to manage the similar CQs (having same query - performance optimization).
   // With query as key and Set of CQs as values.
   private final ConcurrentHashMap matchingCqMap;
 
   // CQ Service statistics
-  public final CqServiceStatisticsImpl cqServiceStats;
-  public final CqServiceVsdStats stats;
+  private final CqServiceStatisticsImpl cqServiceStats;
+  private final CqServiceVsdStats stats;
 
   // CQ identifier, also used in auto generated CQ names
   private volatile long cqId = 1;
 
-  /**
-   * Used to synchronize access to CQs in the repository
-   */
-  final Object cqSync = new Object();
-
   /* This is to manage region to CQs map, client side book keeping. */
-  private HashMap<String, ArrayList<String>> baseRegionToCqNameMap =
-      new HashMap<String, ArrayList<String>>();
-
+  private HashMap<String, ArrayList<String>> baseRegionToCqNameMap = new HashMap<>();
 
   /**
    * Access and modification to the contents of this map do not necessarily need to be lock
@@ -135,33 +150,24 @@ public final class CqServiceImpl implements CqService {
 
   /**
    * Constructor.
-   * 
-   * @param c The cache used for the service
+   *
+   * @param cache The cache used for the service
    */
-  public CqServiceImpl(final Cache c) {
-    if (c == null) {
+  public CqServiceImpl(final InternalCache cache) {
+    if (cache == null) {
       throw new IllegalStateException(LocalizedStrings.CqService_CACHE_IS_NULL.toLocalizedString());
     }
-    GemFireCacheImpl gfc = (GemFireCacheImpl) c;
-    gfc.getCancelCriterion().checkCancelInProgress(null);
-
-    this.cache = gfc;
+    cache.getCancelCriterion().checkCancelInProgress(null);
 
+    this.cache = cache;
 
     // Initialize the Map which maintains the matching cqs.
     this.matchingCqMap = new ConcurrentHashMap<String, HashSet<String>>();
 
     // Initialize the VSD statistics
-    StatisticsFactory factory = cache.getDistributedSystem();
+    StatisticsFactory factory = this.cache.getDistributedSystem();
     this.stats = new CqServiceVsdStats(factory);
     this.cqServiceStats = new CqServiceStatisticsImpl(this);
-
-    // final LoggingThreadGroup group =
-    // LoggingThreadGroup.createThreadGroup("CqExecutor Threads", logger);
-
-    // if (this.cache.getCacheServers().isEmpty()) {
-    // isServer = false;
-    // }
   }
 
   /**
@@ -171,13 +177,14 @@ public final class CqServiceImpl implements CqService {
     return this.cache;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#newCq(java.lang.String,
-   * java.lang.String, org.apache.geode.cache.query.CqAttributes,
-   * org.apache.geode.cache.client.internal.ServerCQProxy, boolean)
-   */
+  public InternalCache getInternalCache() {
+    return this.cache;
+  }
+
+  public CqServiceVsdStats stats() {
+    return this.stats;
+  }
+
   @Override
   public synchronized ClientCQ newCq(String cqName, String queryString, CqAttributes cqAttributes,
       InternalPool pool, boolean isDurable)
@@ -242,22 +249,15 @@ public final class CqServiceImpl implements CqService {
     return cQuery;
   }
 
-
   /**
    * Executes the given CqQuery, if the CqQuery for that name is not there it registers the one and
    * executes. This is called on the Server.
    * 
-   * @param cqName
-   * @param queryString
-   * @param cqState
-   * @param clientProxyId
-   * @param ccn
    * @param manageEmptyRegions whether to update the 6.1 emptyRegions map held in the CCN
    * @param regionDataPolicy the data policy of the region associated with the query. This is only
    *        needed if manageEmptyRegions is true.
    * @param emptyRegionsMap map of empty regions.
    * @throws IllegalStateException if this is called at client side.
-   * @throws CqException
    */
   @Override
   public synchronized ServerCQ executeCq(String cqName, String queryString, int cqState,
@@ -271,7 +271,7 @@ public final class CqServiceImpl implements CqService {
     }
 
     String serverCqName = constructServerCqName(cqName, clientProxyId);
-    ServerCQImpl cQuery = null;
+    ServerCQImpl cQuery;
 
     // If this CQ is not yet registered in Server, register CQ.
     if (!isCqExists(serverCqName)) {
@@ -292,7 +292,6 @@ public final class CqServiceImpl implements CqService {
         logger.info(LocalizedMessage.create(
             LocalizedStrings.CqService_EXCEPTION_WHILE_REGISTERING_CQ_ON_SERVER_CQNAME___0,
             cQuery.getName()));
-        cQuery = null;
         throw cqe;
       }
 
@@ -308,6 +307,7 @@ public final class CqServiceImpl implements CqService {
     return cQuery;
   }
 
+  @Override
   public void resumeCQ(int cqState, ServerCQ cQuery) {
     // Initialize the state of CQ.
     if (((CqStateImpl) cQuery.getState()).getState() != cqState) {
@@ -324,25 +324,10 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-  /*
-   * public void addToCqEventKeysMap(CqQuery cq){ if (cqToCqEventKeysMap != null) { synchronized
-   * (cqToCqEventKeysMap){ String serverCqName = ((CqQueryImpl)cq).getServerCqName(); if
-   * (!cqToCqEventKeysMap.containsKey(serverCqName)){ cqToCqEventKeysMap.put(serverCqName, new
-   * HashSet()); if (_logger.isDebugEnabled()) {
-   * _logger.debug("CQ Event key maintenance for CQ, CqName: " + serverCqName + " is Enabled." +
-   * " key maintenance map size is: " + cqToCqEventKeysMap.size()); } } } // synchronized } }
-   */
-
-  public boolean hasCq() {
-    HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
-    return (cqMap.size() > 0);
-  }
-
-
   /**
    * Adds the given CQ and cqQuery object into the CQ map.
    */
-  public void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException {
+  void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException {
     // On server side cqName will be server side cqName.
     String sCqName = cq.getServerCqName();
     if (logger.isDebugEnabled()) {
@@ -355,7 +340,7 @@ public final class CqServiceImpl implements CqService {
               .toLocalizedString(sCqName));
     }
     synchronized (cqQueryMapLock) {
-      HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<String, CqQueryImpl>(cqQueryMap);
+      HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap);
       try {
         tmpCqQueryMap.put(sCqName, cq);
       } catch (Exception ex) {
@@ -377,66 +362,34 @@ public final class CqServiceImpl implements CqService {
   /**
    * Removes given CQ from the cqMap..
    */
-  public void removeCq(String cqName) {
+  void removeCq(String cqName) {
     // On server side cqName will be server side cqName.
     synchronized (cqQueryMapLock) {
-      HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<String, CqQueryImpl>(cqQueryMap);
+      HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap);
       tmpCqQueryMap.remove(cqName);
       this.cqNameToUserAttributesMap.remove(cqName);
       cqQueryMap = tmpCqQueryMap;
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#getClientCqFromServer(org.apache.geode.
-   * internal.cache.tier.sockets.ClientProxyMembershipID, java.lang.String)
-   */
   @Override
   public CqQuery getClientCqFromServer(ClientProxyMembershipID clientProxyId, String clientCqName) {
     // On server side cqName will be server side cqName.
     HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
-    return (CqQuery) cqMap.get(this.constructServerCqName(clientCqName, clientProxyId));
+    return cqMap.get(this.constructServerCqName(clientCqName, clientProxyId));
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getCq(java.lang.String)
-   */
   @Override
   public InternalCqQuery getCq(String cqName) {
     // On server side cqName will be server side cqName.
-    return (InternalCqQuery) cqQueryMap.get(cqName);
+    return cqQueryMap.get(cqName);
   }
 
-  /**
-   * Clears the CQ Query Map.
-   */
-  public void clearCqQueryMap() {
-    // On server side cqName will be server side cqName.
-    synchronized (cqQueryMapLock) {
-      cqQueryMap = new HashMap<String, CqQueryImpl>();
-    }
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getAllCqs()
-   */
   @Override
   public Collection<? extends InternalCqQuery> getAllCqs() {
     return cqQueryMap.values();
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getAllCqs(java.lang.String)
-   */
   @Override
   public Collection<? extends InternalCqQuery> getAllCqs(final String regionName)
       throws CqException {
@@ -445,7 +398,7 @@ public final class CqServiceImpl implements CqService {
           LocalizedStrings.CqService_NULL_ARGUMENT_0.toLocalizedString("regionName"));
     }
 
-    String[] cqNames = null;
+    String[] cqNames;
 
     synchronized (this.baseRegionToCqNameMap) {
       ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
@@ -456,7 +409,7 @@ public final class CqServiceImpl implements CqService {
       cqs.toArray(cqNames);
     }
 
-    ArrayList<InternalCqQuery> cQueryList = new ArrayList<InternalCqQuery>();
+    ArrayList<InternalCqQuery> cQueryList = new ArrayList<>();
     for (int cqCnt = 0; cqCnt < cqNames.length; cqCnt++) {
       InternalCqQuery cq = getCq(cqNames[cqCnt]);
       if (cq != null) {
@@ -467,34 +420,16 @@ public final class CqServiceImpl implements CqService {
     return cQueryList;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#executeAllClientCqs()
-   */
   @Override
   public synchronized void executeAllClientCqs() throws CqException {
     executeCqs(this.getAllCqs());
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#executeAllRegionCqs(java.lang.String)
-   */
   @Override
   public synchronized void executeAllRegionCqs(final String regionName) throws CqException {
     executeCqs(getAllCqs(regionName));
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#executeCqs(org.apache.geode.cache.query
-   * .CqQuery[])
-   */
   @Override
   public synchronized void executeCqs(Collection<? extends InternalCqQuery> cqs)
       throws CqException {
@@ -503,53 +438,31 @@ public final class CqServiceImpl implements CqService {
     }
     String cqName = null;
     for (InternalCqQuery internalCq : cqs) {
-      CqQuery cq = (CqQuery) internalCq;
+      CqQuery cq = internalCq;
       if (!cq.isClosed() && cq.isStopped()) {
         try {
           cqName = cq.getName();
           cq.execute();
-        } catch (QueryException qe) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName,
-                qe.getMessage());
-          }
-        } catch (CqClosedException cce) {
+        } catch (QueryException | CqClosedException e) {
           if (logger.isDebugEnabled()) {
             logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName,
-                cce.getMessage());
+                e.getMessage());
           }
         }
       }
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#stopAllClientCqs()
-   */
   @Override
   public synchronized void stopAllClientCqs() throws CqException {
     stopCqs(this.getAllCqs());
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#stopAllRegionCqs(java.lang.String)
-   */
   @Override
   public synchronized void stopAllRegionCqs(final String regionName) throws CqException {
     stopCqs(this.getAllCqs(regionName));
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#stopCqs(org.apache.geode.cache.query.
-   * CqQuery[])
-   */
   @Override
   public synchronized void stopCqs(Collection<? extends InternalCqQuery> cqs) throws CqException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -567,29 +480,20 @@ public final class CqServiceImpl implements CqService {
 
     String cqName = null;
     for (InternalCqQuery internalCqQuery : cqs) {
-      CqQuery cq = (CqQuery) internalCqQuery;
+      CqQuery cq = internalCqQuery;
       if (!cq.isClosed() && cq.isRunning()) {
         try {
           cqName = cq.getName();
           cq.stop();
-        } catch (QueryException qe) {
-          if (isDebugEnabled) {
-            logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, qe.getMessage());
-          }
-        } catch (CqClosedException cce) {
+        } catch (QueryException | CqClosedException e) {
           if (isDebugEnabled) {
-            logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, cce.getMessage());
+            logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, e.getMessage());
           }
         }
       }
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeCqs(java.lang.String)
-   */
   @Override
   public void closeCqs(final String regionName) throws CqException {
     Collection<? extends InternalCqQuery> cqs = this.getAllCqs(regionName);
@@ -603,8 +507,8 @@ public final class CqServiceImpl implements CqService {
             // invoked on the server
             cq.close(false);
           } else {
-            // @todo grid: if regionName has a pool check its keepAlive
-            boolean keepAlive = ((GemFireCacheImpl) this.cache).keepDurableSubscriptionsAlive();
+            // TODO: grid: if regionName has a pool check its keepAlive
+            boolean keepAlive = this.cache.keepDurableSubscriptionsAlive();
             if (cq.isDurable() && keepAlive) {
               logger.warn(LocalizedMessage.create(
                   LocalizedStrings.CqService_NOT_SENDING_CQ_CLOSE_TO_THE_SERVER_AS_IT_IS_A_DURABLE_CQ));
@@ -614,14 +518,9 @@ public final class CqServiceImpl implements CqService {
             }
           }
 
-        } catch (QueryException qe) {
+        } catch (QueryException | CqClosedException e) {
           if (logger.isDebugEnabled()) {
-            logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, qe.getMessage());
-          }
-        } catch (CqClosedException cce) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName,
-                cce.getMessage());
+            logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, e.getMessage());
           }
         }
       }
@@ -630,10 +529,6 @@ public final class CqServiceImpl implements CqService {
 
   /**
    * Called directly on server side.
-   * 
-   * @param cqName
-   * @param clientId
-   * @throws CqException
    */
   @Override
   public void stopCq(String cqName, ClientProxyMembershipID clientId) throws CqException {
@@ -650,8 +545,6 @@ public final class CqServiceImpl implements CqService {
     try {
       HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
       if (!cqMap.containsKey(serverCqName)) {
-        // throw new
-        // CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_STOP_THE_SPECIFIED_CQ_0.toLocalizedString(serverCqName));
         /*
          * gregp 052808: We should silently fail here instead of throwing error. This is to deal
          * with races in recovery
@@ -689,15 +582,8 @@ public final class CqServiceImpl implements CqService {
     }
     // Send stop message to peers.
     cQuery.getCqBaseRegion().getFilterProfile().stopCq(cQuery);
-
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeCq(java.lang.String,
-   * org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID)
-   */
   @Override
   public void closeCq(String cqName, ClientProxyMembershipID clientProxyId) throws CqException {
     String serverCqName = cqName;
@@ -713,9 +599,6 @@ public final class CqServiceImpl implements CqService {
     try {
       HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
       if (!cqMap.containsKey(serverCqName)) {
-        // throw new
-        // CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_CLOSE_THE_SPECIFIED_CQ_0
-        // .toLocalizedString(serverCqName));
         /*
          * gregp 052808: We should silently fail here instead of throwing error. This is to deal
          * with races in recovery
@@ -791,12 +674,6 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeAllCqs(boolean)
-   */
   @Override
   public void closeAllCqs(boolean clientInitiated) {
     closeAllCqs(clientInitiated, getAllCqs());
@@ -807,21 +684,13 @@ public final class CqServiceImpl implements CqService {
    * CqQuerys created by other VMs are unaffected.
    */
   private void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs) {
-    closeAllCqs(clientInitiated, cqs,
-        ((GemFireCacheImpl) this.cache).keepDurableSubscriptionsAlive());
+    closeAllCqs(clientInitiated, cqs, this.cache.keepDurableSubscriptionsAlive());
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeAllCqs(boolean,
-   * org.apache.geode.cache.query.CqQuery[], boolean)
-   */
   @Override
   public void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs,
       boolean keepAlive) {
 
-    // CqQuery[] cqs = getAllCqs();
     if (cqs != null) {
       String cqName = null;
       if (logger.isDebugEnabled()) {
@@ -830,7 +699,6 @@ public final class CqServiceImpl implements CqService {
       for (InternalCqQuery cQuery : cqs) {
         try {
           cqName = cQuery.getName();
-          // boolean keepAlive = ((GemFireCache)this.cache).keepDurableSubscriptionsAlive();
 
           if (isServer()) {
             cQuery.close(false);
@@ -847,47 +715,26 @@ public final class CqServiceImpl implements CqService {
               }
             }
           }
-        } catch (QueryException cqe) {
+        } catch (QueryException | CqClosedException e) {
           if (!isRunning()) {
             // Not cache shutdown
             logger
                 .warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1,
-                    new Object[] {cqName, cqe.getMessage()}));
+                    new Object[] {cqName, e.getMessage()}));
           }
           if (logger.isDebugEnabled()) {
-            logger.debug(cqe.getMessage(), cqe);
-          }
-        } catch (CqClosedException cqe) {
-          if (!isRunning()) {
-            // Not cache shutdown
-            logger
-                .warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1,
-                    new Object[] {cqName, cqe.getMessage()}));
-          }
-          if (logger.isDebugEnabled()) {
-            logger.debug(cqe.getMessage(), cqe);
+            logger.debug(e.getMessage(), e);
           }
         }
       }
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getCqStatistics()
-   */
   @Override
   public CqServiceStatistics getCqStatistics() {
     return cqServiceStats;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeClientCqs(org.apache.geode.
-   * internal.cache.tier.sockets.ClientProxyMembershipID)
-   */
   @Override
   public void closeClientCqs(ClientProxyMembershipID clientProxyId) throws CqException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -899,30 +746,19 @@ public final class CqServiceImpl implements CqService {
       CqQueryImpl cQuery = (CqQueryImpl) cq;
       try {
         cQuery.close(false);
-      } catch (QueryException qe) {
+      } catch (QueryException | CqClosedException e) {
         if (isDebugEnabled) {
           logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
-              qe.getMessage());
-        }
-      } catch (CqClosedException cce) {
-        if (isDebugEnabled) {
-          logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
-              cce.getMessage());
+              e.getMessage());
         }
       }
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getAllClientCqs(org.apache.geode.
-   * internal.cache.tier.sockets.ClientProxyMembershipID)
-   */
   @Override
   public List<ServerCQ> getAllClientCqs(ClientProxyMembershipID clientProxyId) {
     Collection<? extends InternalCqQuery> cqs = getAllCqs();
-    ArrayList<ServerCQ> clientCqs = new ArrayList<ServerCQ>();
+    ArrayList<ServerCQ> clientCqs = new ArrayList<>();
 
     for (InternalCqQuery cq : cqs) {
       ServerCQImpl cQuery = (ServerCQImpl) cq;
@@ -934,23 +770,16 @@ public final class CqServiceImpl implements CqService {
     return clientCqs;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#getAllDurableClientCqs(org.apache.geode
-   * .internal.cache.tier.sockets.ClientProxyMembershipID)
-   */
   @Override
   public List<String> getAllDurableClientCqs(ClientProxyMembershipID clientProxyId)
       throws CqException {
     if (clientProxyId == null) {
       throw new CqException(
           LocalizedStrings.CqService_UNABLE_TO_RETRIEVE_DURABLE_CQS_FOR_CLIENT_PROXY_ID
-              .toLocalizedString(clientProxyId));
+              .toLocalizedString(null));
     }
     List<ServerCQ> cqs = getAllClientCqs(clientProxyId);
-    ArrayList<String> durableClientCqs = new ArrayList<String>();
+    ArrayList<String> durableClientCqs = new ArrayList<>();
 
     for (ServerCQ cq : cqs) {
       ServerCQImpl cQuery = (ServerCQImpl) cq;
@@ -966,9 +795,6 @@ public final class CqServiceImpl implements CqService {
 
   /**
    * Server side method. Closes non-durable CQs for the given client proxy id.
-   * 
-   * @param clientProxyId
-   * @throws CqException
    */
   @Override
   public void closeNonDurableClientCqs(ClientProxyMembershipID clientProxyId) throws CqException {
@@ -983,15 +809,10 @@ public final class CqServiceImpl implements CqService {
         if (!cQuery.isDurable()) {
           cQuery.close(false);
         }
-      } catch (QueryException qe) {
-        if (isDebugEnabled) {
-          logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
-              qe.getMessage());
-        }
-      } catch (CqClosedException cce) {
+      } catch (QueryException | CqClosedException e) {
         if (isDebugEnabled) {
           logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
-              cce.getMessage());
+              e.getMessage());
         }
       }
     }
@@ -1028,6 +849,7 @@ public final class CqServiceImpl implements CqService {
     return this.isRunning;
   }
 
+  @Override
   public void start() {
     this.isRunning = true;
   }
@@ -1035,9 +857,10 @@ public final class CqServiceImpl implements CqService {
   /**
    * @return Returns the serverCqName.
    */
+  @Override
   public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) {
-    ConcurrentHashMap<ClientProxyMembershipID, String> cache = serverCqNameCache
-        .computeIfAbsent(cqName, key -> new ConcurrentHashMap<ClientProxyMembershipID, String>());
+    ConcurrentHashMap<ClientProxyMembershipID, String> cache =
+        serverCqNameCache.computeIfAbsent(cqName, key -> new ConcurrentHashMap<>());
 
     String cName = cache.get(clientProxyId);
     if (null == cName) {
@@ -1065,7 +888,7 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-  /*
+  /**
    * Checks if CQ with the given name already exists.
    * 
    * @param cqName name of the CQ.
@@ -1073,17 +896,15 @@ public final class CqServiceImpl implements CqService {
    * @return true if exists else false.
    */
   private synchronized boolean isCqExists(String cqName) {
-    boolean status = false;
     HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
-    status = cqMap.containsKey(cqName);
-    return status;
+    return cqMap.containsKey(cqName);
   }
 
-  /*
+  /**
    * Generates a name for CQ. Checks if CQ with that name already exists if so generates a new
    * cqName.
    */
-  public synchronized String generateCqName() {
+  private synchronized String generateCqName() {
     while (true) {
       String cqName = CQ_NAME_PREFIX + (cqId++);
       if (!isCqExists(cqName)) {
@@ -1092,18 +913,9 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#dispatchCqListeners(java.util.HashMap,
-   * int, java.lang.Object, java.lang.Object, byte[],
-   * org.apache.geode.cache.client.internal.QueueManager, org.apache.geode.internal.cache.EventID)
-   */
   @Override
   public void dispatchCqListeners(HashMap<String, Integer> cqs, int messageType, Object key,
       Object value, byte[] delta, QueueManager qManager, EventID eventId) {
-    ClientCQImpl cQuery = null;
     Object[] fullValue = new Object[1];
     Iterator<Map.Entry<String, Integer>> iter = cqs.entrySet().iterator();
     String cqName = null;
@@ -1112,7 +924,7 @@ public final class CqServiceImpl implements CqService {
       try {
         Map.Entry<String, Integer> entry = iter.next();
         cqName = entry.getKey();
-        cQuery = (ClientCQImpl) this.getCq(cqName);
+        ClientCQImpl cQuery = (ClientCQImpl) this.getCq(cqName);
 
         if (cQuery == null || (!cQuery.isRunning() && cQuery.getQueuedEvents() == null)) {
           if (isDebugEnabled) {
@@ -1122,7 +934,7 @@ public final class CqServiceImpl implements CqService {
           continue;
         }
 
-        Integer cqOp = (Integer) entry.getValue();
+        Integer cqOp = entry.getValue();
 
         // If Region destroy event, close the cq.
         if (cqOp.intValue() == MessageType.DESTROY_REGION) {
@@ -1136,8 +948,7 @@ public final class CqServiceImpl implements CqService {
         }
 
         // Construct CqEvent.
-        CqEventImpl cqEvent = null;
-        cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp.intValue()),
+        CqEventImpl cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp),
             key, value, delta, qManager, eventId);
 
         // Update statistics
@@ -1181,11 +992,11 @@ public final class CqServiceImpl implements CqService {
     } // iteration.
   }
 
-  public void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) {
+  void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) {
     invokeListeners(cqName, cQuery, cqEvent, null);
   }
 
-  public void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent,
+  private void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent,
       Object[] fullValue) {
     if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
       return;
@@ -1217,8 +1028,8 @@ public final class CqServiceImpl implements CqService {
             }
             Part result = (Part) GetEventValueOp
                 .executeOnPrimary(cqEvent.getQueueManager().getPool(), cqEvent.getEventID(), null);
-            Object newVal = null;
-            if (result == null || (newVal = result.getObject()) == null) {
+            Object newVal = result.getObject();
+            if (result == null || newVal == null) {
               if (!cache.getCancelCriterion().isCancelInProgress()) {
                 Exception ex =
                     new Exception("Failed to retrieve full value from server for eventID "
@@ -1231,7 +1042,7 @@ public final class CqServiceImpl implements CqService {
                 }
               }
             } else {
-              ((GemFireCacheImpl) this.cache).getCachePerfStats().incDeltaFullValuesRequested();
+              this.cache.getCachePerfStats().incDeltaFullValuesRequested();
               cqEvent = new CqEventImpl(cQuery, cqEvent.getBaseOperation(),
                   cqEvent.getQueryOperation(), cqEvent.getKey(), newVal, cqEvent.getDeltaValue(),
                   cqEvent.getQueueManager(), cqEvent.getEventID());
@@ -1278,7 +1089,7 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-  public void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) {
+  private void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) {
     if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
       return;
     }
@@ -1335,12 +1146,8 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-
   /**
    * Returns the Operation for the given EnumListenerEvent type.
-   * 
-   * @param eventType
-   * @return Operation
    */
   private Operation getOperation(int eventType) {
     Operation op = null;
@@ -1372,15 +1179,6 @@ public final class CqServiceImpl implements CqService {
     return op;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#processEvents(org.apache.geode.cache.
-   * CacheEvent, org.apache.geode.distributed.internal.DistributionAdvisor.Profile,
-   * org.apache.geode.distributed.internal.DistributionAdvisor.Profile[],
-   * org.apache.geode.internal.cache.FilterRoutingInfo)
-   */
   @Override
   public void processEvents(CacheEvent event, Profile localProfile, Profile[] profiles,
       FilterRoutingInfo frInfo) throws CqException {
@@ -1421,7 +1219,7 @@ public final class CqServiceImpl implements CqService {
         continue;
       }
       Map cqs = pf.getCqMap();
-      HashMap<Long, Integer> cqInfo = new HashMap<Long, Integer>();
+      HashMap<Long, Integer> cqInfo = new HashMap<>();
       Iterator cqIter = cqs.entrySet().iterator();
       while (cqIter.hasNext()) {
         Map.Entry cqEntry = (Map.Entry) cqIter.next();
@@ -1454,10 +1252,10 @@ public final class CqServiceImpl implements CqService {
   private void processEntryEvent(CacheEvent event, Profile localProfile, Profile[] profiles,
       FilterRoutingInfo frInfo) throws CqException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    HashSet<Object> cqUnfilteredEventsSet_newValue = new HashSet<Object>();
-    HashSet<Object> cqUnfilteredEventsSet_oldValue = new HashSet<Object>();
-    boolean b_cqResults_newValue = false;
-    boolean b_cqResults_oldValue = false;
+    HashSet<Object> cqUnfilteredEventsSet_newValue = new HashSet<>();
+    HashSet<Object> cqUnfilteredEventsSet_oldValue = new HashSet<>();
+    boolean b_cqResults_newValue;
+    boolean b_cqResults_oldValue;
     boolean queryOldValue;
     EntryEvent entryEvent = (EntryEvent) event;
     Object eventKey = entryEvent.getKey();
@@ -1472,8 +1270,8 @@ public final class CqServiceImpl implements CqService {
         || event.getOperation().isDestroy() || event.getOperation().isInvalidate()
         || (event.getOperation().isCreate() && isDupEvent));
 
-    HashMap<String, Integer> matchedCqs = new HashMap<String, Integer>();
-    long executionStartTime = 0;
+    HashMap<String, Integer> matchedCqs = new HashMap<>();
+    long executionStartTime;
     for (int i = -1; i < profiles.length; i++) {
       CacheProfile cf;
       if (i < 0) {
@@ -1498,7 +1296,6 @@ public final class CqServiceImpl implements CqService {
         continue;
       }
 
-
       // Get new value. If its not retrieved.
       if (cqUnfilteredEventsSet_newValue.isEmpty()
           && (event.getOperation().isCreate() || event.getOperation().isUpdate())) {
@@ -1509,7 +1306,7 @@ public final class CqServiceImpl implements CqService {
         }
       }
 
-      HashMap<Long, Integer> cqInfo = new HashMap<Long, Integer>();
+      HashMap<Long, Integer> cqInfo = new HashMap<>();
       Iterator cqIter = cqs.entrySet().iterator();
 
       while (cqIter.hasNext()) {
@@ -1546,7 +1343,6 @@ public final class CqServiceImpl implements CqService {
           }
         } else {
           boolean error = false;
-          // synchronized (cQuery)
           {
             try {
               synchronized (cQuery) {
@@ -1644,7 +1440,7 @@ public final class CqServiceImpl implements CqService {
                 cQuery.markAsDestroyedInCqResultKeys(eventKey);
               }
             }
-          } // end synchronized(cQuery)
+          }
 
           // Get the matching CQs if any.
           // synchronized (this.matchingCqMap){
@@ -1663,7 +1459,6 @@ public final class CqServiceImpl implements CqService {
               }
             }
           }
-          // }
         }
 
         if (cqEvent != null && cQuery.isRunning()) {
@@ -1694,153 +1489,35 @@ public final class CqServiceImpl implements CqService {
     } // iteration over Profiles.
   }
 
-
-  /*
-   * public void processEvents (EnumListenerEvent operation, CacheEvent event, ClientUpdateMessage
-   * clientMessage, CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds) throws CqException
-   * {
-   * 
-   * //Is this a region event or an entry event if (event instanceof RegionEvent){
-   * processRegionEvent(operation, event, clientMessage, clientIds); } else { processEntryEvent
-   * (operation, event, clientMessage, clientIds); }
-   * 
-   * }
-   * 
-   * private void processRegionEvent(EnumListenerEvent operation, CacheEvent event,
-   * ClientUpdateMessage clientMessage, CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds)
-   * throws CqException {
-   * 
-   * if (logger.isDebugEnabled()) { logger.debug("Processing region event for region " +
-   * ((LocalRegion)(event.getRegion())).getName()); } HashMap filteredCqs = new HashMap(); Integer
-   * cqRegionEvent = generateCqRegionEvent(operation); Iterator it =
-   * clientIds.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next();
-   * ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey(); CM cqsToBooleans =
-   * (CM)me.getValue(); if (cqsToBooleans == null) { continue; } Set<CqQuery> cqs =
-   * cqsToBooleans.keySet(); if (cqs.isEmpty()) { continue; } filteredCqs.clear(); Iterator cqIt =
-   * cqs.iterator(); while (cqIt.hasNext()) { CqQueryImpl cQuery = (CqQueryImpl)cqIt.next(); if
-   * (operation == EnumListenerEvent.AFTER_REGION_DESTROY) { try { if (logger.isDebugEnabled()){
-   * logger.debug("Closing CQ on region destroy event. CqName :" + cQuery.getName()); }
-   * cQuery.close(false); } catch (Exception ex) {
-   * logger.debug("Failed to Close CQ on region destroy. CqName :" + cQuery.getName(), ex); }
-   * 
-   * } filteredCqs.put(cQuery.cqName, cqRegionEvent);
-   * cQuery.getVsdStats().updateStats(cqRegionEvent);
-   * 
-   * } if (!filteredCqs.isEmpty()){ ((ClientUpdateMessageImpl)clientMessage).addClientCqs( clientId,
-   * filteredCqs); }
-   * 
-   * }
-   * 
-   * }
-   * 
-   * private void processEntryEvent(EnumListenerEvent operation, CacheEvent event,
-   * ClientUpdateMessage clientMessage, CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds)
-   * throws CqException { HashSet cqUnfilteredEventsSet_newValue = new HashSet(); HashSet
-   * cqUnfilteredEventsSet_oldValue = new HashSet(); boolean b_cqResults_newValue = false; boolean
-   * b_cqResults_oldValue = false; EntryEvent entryEvent = (EntryEvent)event; Object eventKey =
-   * entryEvent.getKey(); if (operation == EnumListenerEvent.AFTER_CREATE || operation ==
-   * EnumListenerEvent.AFTER_UPDATE) { if (entryEvent.getNewValue() != null) { //We have a new value
-   * to run the query on cqUnfilteredEventsSet_newValue.clear();
-   * cqUnfilteredEventsSet_newValue.add(entryEvent.getNewValue()); } }
-   * 
-   * HashMap matchedCqs = new HashMap(); long executionStartTime = 0; Iterator it =
-   * clientIds.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next();
-   * ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey(); if
-   * (logger.isDebugEnabled()) { logger.debug("Processing event for CQ filter, ClientId : " +
-   * clientId); } CM cqsToBooleans = (CM)me.getValue(); if (cqsToBooleans == null) { continue; }
-   * Set<CqQuery> cqs = cqsToBooleans.keySet(); if (cqs.isEmpty()) { continue; } HashMap filteredCqs
-   * = new HashMap(); Iterator cqIt = cqs.iterator(); while (cqIt.hasNext()) { CqQueryImpl cQuery =
-   * (CqQueryImpl)cqIt.next(); b_cqResults_newValue = false; b_cqResults_oldValue = false; if
-   * (cQuery == null || !(cQuery.isRunning())){ continue; } String cqName =
-   * cQuery.getServerCqName(); Integer cqEvent = null; if (matchedCqs.containsKey(cqName)) { if
-   * (logger.isDebugEnabled()){ logger.
-   * debug("Similar cq/query is already processed, getting the cq event-type from the matched cq.");
-   * } cqEvent = (Integer)matchedCqs.get(cqName); } else { boolean error = false; boolean
-   * hasSeenEvent = false; HashSet cqEventKeys = null; synchronized (cQuery) { try { // Apply query
-   * on new value. if (!cqUnfilteredEventsSet_newValue.isEmpty()) { executionStartTime =
-   * this.stats.startCqQueryExecution(); b_cqResults_newValue = evaluateQuery(cQuery, new Object[]
-   * {cqUnfilteredEventsSet_newValue}); this.stats.endCqQueryExecution(executionStartTime); } //
-   * Check if old value is cached, if not apply query on old value. if (cqToCqEventKeysMap != null)
-   * { synchronized (cqToCqEventKeysMap) { if ((cqEventKeys =
-   * (HashSet)cqToCqEventKeysMap.get(cqName)) != null) { hasSeenEvent =
-   * cqEventKeys.contains(eventKey); } } } if (!hasSeenEvent) { // get the oldValue. // In case of
-   * Update, destroy and invalidate. if (operation == EnumListenerEvent.AFTER_UPDATE || operation ==
-   * EnumListenerEvent.AFTER_DESTROY || operation == EnumListenerEvent.AFTER_INVALIDATE) { if
-   * (entryEvent.getOldValue() != null) { cqUnfilteredEventsSet_oldValue.clear();
-   * cqUnfilteredEventsSet_oldValue.add(entryEvent.getOldValue()); // Apply query on old value.
-   * executionStartTime = this.stats.startCqQueryExecution(); b_cqResults_oldValue =
-   * evaluateQuery(cQuery, new Object[] {cqUnfilteredEventsSet_oldValue});
-   * this.stats.endCqQueryExecution(executionStartTime); } } } } catch (Exception ex) { //Any
-   * exception in running the query // should be caught here and buried //because this code is
-   * running inline with the //message processing code and we don't want to //kill that thread error
-   * = true; logger.info( LocalizedStrings.
-   * CqService_ERROR_WHILE_PROCESSING_CQ_ON_THE_EVENT_KEY_0_CQNAME_1_CLIENTID_2_ERROR_3, new
-   * Object[] { ((EntryEvent)event).getKey(), cQuery.getName(), clientId,
-   * ex.getLocalizedMessage()}); }
-   * 
-   * if (error) { cqEvent = Integer.valueOf(MessageType.EXCEPTION); } else { if
-   * (b_cqResults_newValue) { if (hasSeenEvent || b_cqResults_oldValue) { cqEvent =
-   * Integer.valueOf(MessageType.LOCAL_UPDATE); } else { cqEvent =
-   * Integer.valueOf(MessageType.LOCAL_CREATE); } // If its create and caching is enabled, cache the
-   * key for this CQ. if (!hasSeenEvent && cqEventKeys != null) { cqEventKeys.add(eventKey); } }
-   * else if (hasSeenEvent || (b_cqResults_oldValue)) { // Base invalidate operation is treated as
-   * destroy. // When the invalidate comes through, the entry will no longer satisfy // the query
-   * and will need to be deleted. cqEvent = Integer.valueOf(MessageType.LOCAL_DESTROY); // If
-   * caching is enabled, remove this event's key from the cache. if (hasSeenEvent && cqEventKeys !=
-   * null) { cqEventKeys.remove(eventKey); } } }
-   * 
-   * } //end synchronized(cQuery)
-   * 
-   * // Get the matching CQs if any. synchronized (this.matchingCqMap){ String query =
-   * cQuery.getQueryString(); ArrayList matchingCqs = (ArrayList)matchingCqMap.get(query); if
-   * (matchingCqs != null) { Iterator iter = matchingCqs.iterator(); while (iter.hasNext()) { String
-   * matchingCqName = (String)iter.next(); if (!matchingCqName.equals(cqName)){
-   * matchedCqs.put(matchingCqName, cqEvent); } } } }
-   * 
-   * }
-   * 
-   * if (cqEvent != null){ if (logger.isDebugEnabled()) {
-   * logger.debug("Event is added for the CQ, CqName (clientside): " + cQuery.cqName +
-   * " With CQ Op : " + cqEvent + " for Client : " + clientId); } filteredCqs.put(cQuery.cqName,
-   * cqEvent); cQuery.getVsdStats().updateStats(cqEvent); }
-   * 
-   * } // iteration over cqsToBooleans.keySet() if (!filteredCqs.isEmpty()){
-   * logger.debug("Adding event map for client : "+clientId +
-   * " with event map size : "+filteredCqs.size());
-   * ((ClientUpdateMessageImpl)clientMessage).addClientCqs(clientId, filteredCqs); } } // iteration
-   * over clientIds.entrySet() }
-   */
-
   private Integer generateCqRegionEvent(CacheEvent event) {
     Integer cqEvent = null;
     if (event.getOperation().isRegionDestroy()) {
-      cqEvent = Integer.valueOf(MessageType.DESTROY_REGION);
+      cqEvent = MessageType.DESTROY_REGION;
     } else if (event.getOperation().isRegionInvalidate()) {
-      cqEvent = Integer.valueOf(MessageType.INVALIDATE_REGION);
+      cqEvent = MessageType.INVALIDATE_REGION;
     } else if (event.getOperation().isClear()) {
-      cqEvent = Integer.valueOf(MessageType.CLEAR_REGION);
+      cqEvent = MessageType.CLEAR_REGION;
     }
     return cqEvent;
   }
 
-
   /**
    * Manages the CQs created for the base region. This is managed here, instead of on the base
    * region; since the cq could be created on the base region, before base region is created (using
    * newCq()).
    */
-  public void addToBaseRegionToCqNameMap(String regionName, String cqName) {
+  private void addToBaseRegionToCqNameMap(String regionName, String cqName) {
     synchronized (this.baseRegionToCqNameMap) {
       ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
       if (cqs == null) {
-        cqs = new ArrayList<String>();
+        cqs = new ArrayList<>();
       }
       cqs.add(cqName);
       this.baseRegionToCqNameMap.put(regionName, cqs);
     }
   }
 
-  public void removeFromBaseRegionToCqNameMap(String regionName, String cqName) {
+  void removeFromBaseRegionToCqNameMap(String regionName, String cqName) {
     synchronized (this.baseRegionToCqNameMap) {
       ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
       if (cqs != null) {
@@ -1864,37 +1541,12 @@ public final class CqServiceImpl implements CqService {
   }
 
   /**
-   * Removes this CQ from CQ event Cache map. This disables the caching events for this CQ.
-   * 
-   * @param cqName
-   */
-  /*
-   * synchronized public void removeCQFromCaching(String cqName){ if (cqToCqEventKeysMap != null) {
-   * // Take a lock on CqQuery object. In processEvents the maps are // handled under CqQuery
-   * object. if (cqToCqEventKeysMap != null){ synchronized (cqToCqEventKeysMap) {
-   * cqToCqEventKeysMap.remove(cqName); } } } }
-   */
-
-  /**
-   * Returns the CQ event cache map.
-   * 
-   * @return HashMap cqToCqEventKeysMap
-   * 
-   *         Caller must synchronize on the returned value in order to inspect.
-   */
-  /*
-   * public HashMap getCqToCqEventKeysMap(){ return cqToCqEventKeysMap; }
-   */
-
-  /**
    * Adds the query from the given CQ to the matched CQ map.
-   * 
-   * @param cq
    */
-  public void addToMatchingCqMap(CqQueryImpl cq) {
+  void addToMatchingCqMap(CqQueryImpl cq) {
     synchronized (this.matchingCqMap) {
       String cqQuery = cq.getQueryString();
-      Set<String> matchingCQs = null;
+      Set<String> matchingCQs;
       if (!matchingCqMap.containsKey(cqQuery)) {
         matchingCQs = Collections.newSetFromMap(new ConcurrentHashMap());
         matchingCqMap.put(cqQuery, matchingCQs);
@@ -1912,10 +1564,8 @@ public final class CqServiceImpl implements CqService {
 
   /**
    * Removes the query from the given CQ from the matched CQ map.
-   * 
-   * @param cq
    */
-  public void removeFromMatchingCqMap(CqQueryImpl cq) {
+  private void removeFromMatchingCqMap(CqQueryImpl cq) {
     synchronized (this.matchingCqMap) {
       String cqQuery = cq.getQueryString();
       if (matchingCqMap.containsKey(cqQuery)) {
@@ -1947,10 +1597,6 @@ public final class CqServiceImpl implements CqService {
    * Applies the query on the event. This method takes care of the performance related changed done
    * to improve the CQ-query performance. When CQ-query is executed first time, it saves the query
    * related information in the execution context and uses that info in later executions.
-   * 
-   * @param cQuery
-   * @param event
-   * @return boolean
    */
   private boolean evaluateQuery(CqQueryImpl cQuery, Object[] event) throws Exception {
     ExecutionContext execContext = cQuery.getQueryExecutionContext();
@@ -1983,19 +1629,6 @@ public final class CqServiceImpl implements CqService {
     return this.cqNameToUserAttributesMap.get(cqName);
   }
 
-  // public static void memberLeft(String poolName) {
-  // if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) {
-  // cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName);
-  // }
-  // }
-  //
-  // public static void memberCrashed(String poolName) {
-  // if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) {
-  // cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName);
-  // }
-  // }
-  //
-
   @Override
   public void cqsDisconnected(Pool pool) {
     invokeCqsConnected(pool, false);
@@ -2014,7 +1647,7 @@ public final class CqServiceImpl implements CqService {
     // Check to see if we are already connected/disconnected.
     // If state has not changed, do not invoke another connected/disconnected
     synchronized (cqPoolsConnected) {
-      // don't repeatily send same connect/disconnect message to cq's on repeated fails of
+      // don't repeatedly send same connect/disconnect message to cq's on repeated fails of
       // RedundancySatisfier
       if (cqPoolsConnected.containsKey(poolName) && connected == cqPoolsConnected.get(poolName)) {
         return;
@@ -2059,13 +1692,6 @@ public final class CqServiceImpl implements CqService {
           SystemFailure.checkFailure();
           logger.warn(LocalizedMessage
               .create(LocalizedStrings.CqService_ERROR_SENDING_CQ_CONNECTION_STATUS, cqName), t);
-
-          if (t instanceof VirtualMachineError) {
-            logger.warn(LocalizedMessage.create(
-                LocalizedStrings.CqService_VIRTUALMACHINEERROR_PROCESSING_CQLISTENER_FOR_CQ_0,
-                cqName), t);
-            return;
-          }
         }
       }
     }
@@ -2075,7 +1701,4 @@ public final class CqServiceImpl implements CqService {
   public List<String> getAllDurableCqsFromServer(InternalPool pool) {
     return new ServerCQProxyImpl(pool).getAllDurableCqsFromServer();
   }
-
-
 }
-

http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
index ba71143..a675162 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
@@ -14,11 +14,9 @@
  */
 package org.apache.geode.cache.query.internal.cq;
 
-import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.query.CqServiceStatistics;
 import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.internal.DefaultQueryService;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 
 /**
  * Provides statistical information about CqService.
@@ -26,24 +24,22 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
  * @since GemFire 5.5
  */
 public class CqServiceStatisticsImpl implements CqServiceStatistics {
+
   private CqServiceImpl cqService;
-  // private long activeCqs;
-  // private long stoppedCqs;
-  // private long closedCqs;
-  // private long createdCqs;
 
   /**
    * Constructor for CqStatisticsImpl
    * 
    * @param cqs - CqService
    */
-  public CqServiceStatisticsImpl(CqServiceImpl cqs) {
+  CqServiceStatisticsImpl(CqServiceImpl cqs) {
     cqService = cqs;
   }
 
   /**
    * Returns the number of CQs currently executing
    */
+  @Override
   public long numCqsActive() {
     return this.cqService.getCqServiceVsdStats().getNumCqsActive();
   }
@@ -53,6 +49,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
    * 
    * @return long number of cqs created.
    */
+  @Override
   public long numCqsCreated() {
     return this.cqService.getCqServiceVsdStats().getNumCqsCreated();
   }
@@ -60,6 +57,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
   /**
    * Returns number of Cqs that are closed.
    */
+  @Override
   public long numCqsClosed() {
     return this.cqService.getCqServiceVsdStats().getNumCqsClosed();
   }
@@ -67,6 +65,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
   /**
    * Returns number of Cqs that are stopped.
    */
+  @Override
   public long numCqsStopped() {
     return this.cqService.getCqServiceVsdStats().getNumCqsStopped();
   }
@@ -74,20 +73,18 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
   /**
    * Returns number of CQs created from the client.
    */
+  @Override
   public long numCqsOnClient() {
     return this.cqService.getCqServiceVsdStats().getNumCqsOnClient();
   }
 
   /**
    * Returns the number of CQs (active + suspended) on the given region.
-   * 
-   * @param regionName
    */
+  @Override
   public long numCqsOnRegion(String regionName) {
-
     DefaultQueryService queryService =
-        (DefaultQueryService) ((GemFireCacheImpl) CacheFactory.getAnyInstance())
-            .getLocalQueryService();
+        (DefaultQueryService) cqService.getInternalCache().getLocalQueryService();
     try {
       CqQuery[] cqs = queryService.getCqs(regionName);