You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/19 21:46:39 UTC
[1/2] geode git commit: 2632: refactor code to use InternalCache
instead of GemFireCacheImpl
Repository: geode
Updated Branches:
refs/heads/feature/GEODE-2632-5 [created] bc39e9739
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java
index 8435c4c..5dc7bb0 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceVsdStats.java
@@ -25,9 +25,9 @@ import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
import org.apache.geode.internal.cache.FilterProfile;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.logging.LogService;
/**
@@ -44,35 +44,34 @@ public class CqServiceVsdStats {
private static final StatisticsType _type;
/** Name of the created CQs statistic */
- protected static final String CQS_CREATED = "numCqsCreated";
+ private static final String CQS_CREATED = "numCqsCreated";
/** Name of the active CQs statistic */
- protected static final String CQS_ACTIVE = "numCqsActive";
+ private static final String CQS_ACTIVE = "numCqsActive";
/** Name of the stopped CQs statistic */
- protected static final String CQS_STOPPED = "numCqsStopped";
+ private static final String CQS_STOPPED = "numCqsStopped";
/** Name of the closed CQs statistic */
- protected static final String CQS_CLOSED = "numCqsClosed";
+ private static final String CQS_CLOSED = "numCqsClosed";
/** Name of the client's CQs statistic */
- protected static final String CQS_ON_CLIENT = "numCqsOnClient";
+ private static final String CQS_ON_CLIENT = "numCqsOnClient";
/** Number of clients with CQs statistic */
- protected static final String CLIENTS_WITH_CQS = "numClientsWithCqs";
-
+ private static final String CLIENTS_WITH_CQS = "numClientsWithCqs";
/** CQ query execution time. */
- protected static final String CQ_QUERY_EXECUTION_TIME = "cqQueryExecutionTime";
+ private static final String CQ_QUERY_EXECUTION_TIME = "cqQueryExecutionTime";
/** CQ query execution in progress */
- protected static final String CQ_QUERY_EXECUTION_IN_PROGRESS = "cqQueryExecutionInProgress";
+ private static final String CQ_QUERY_EXECUTION_IN_PROGRESS = "cqQueryExecutionInProgress";
/** Completed CQ query executions */
- protected static final String CQ_QUERY_EXECUTIONS_COMPLETED = "cqQueryExecutionsCompleted";
+ private static final String CQ_QUERY_EXECUTIONS_COMPLETED = "cqQueryExecutionsCompleted";
/** Unique CQs, number of different CQ queries */
- protected static final String UNIQUE_CQ_QUERY = "numUniqueCqQuery";
+ private static final String UNIQUE_CQ_QUERY = "numUniqueCqQuery";
/** Id of the CQs created statistic */
private static final int _numCqsCreatedId;
@@ -104,7 +103,7 @@ public class CqServiceVsdStats {
/** Id for unique CQs, difference in CQ queries */
private static final int _numUniqueCqQuery;
- /**
+ /*
* Static initializer to create and initialize the <code>StatisticsType</code>
*/
static {
@@ -140,7 +139,6 @@ public class CqServiceVsdStats {
_cqQueryExecutionsCompletedId = _type.nameToId(CQ_QUERY_EXECUTIONS_COMPLETED);
_cqQueryExecutionInProgressId = _type.nameToId(CQ_QUERY_EXECUTION_IN_PROGRESS);
_numUniqueCqQuery = _type.nameToId(UNIQUE_CQ_QUERY);
-
}
/** The <code>Statistics</code> instance to which most behavior is delegated */
@@ -152,12 +150,10 @@ public class CqServiceVsdStats {
* @param factory The <code>StatisticsFactory</code> which creates the <code>Statistics</code>
* instance
*/
- public CqServiceVsdStats(StatisticsFactory factory) {
+ CqServiceVsdStats(StatisticsFactory factory) {
this._stats = factory.createAtomicStatistics(_type, "CqServiceStats");
}
- // /////////////////// Instance Methods /////////////////////
-
/**
* Closes the <code>HARegionQueueStats</code>.
*/
@@ -170,14 +166,14 @@ public class CqServiceVsdStats {
*
* @return the current value of the "numCqsCreated" stat
*/
- public long getNumCqsCreated() {
+ long getNumCqsCreated() {
return this._stats.getLong(_numCqsCreatedId);
}
/**
* Increments the "numCqsCreated" stat by 1.
*/
- public void incCqsCreated() {
+ void incCqsCreated() {
this._stats.incLong(_numCqsCreatedId, 1);
}
@@ -186,21 +182,21 @@ public class CqServiceVsdStats {
*
* @return the current value of the "numCqsActive" stat
*/
- public long getNumCqsActive() {
+ long getNumCqsActive() {
return this._stats.getLong(_numCqsActiveId);
}
/**
* Increments the "numCqsActive" stat by 1.
*/
- public void incCqsActive() {
+ void incCqsActive() {
this._stats.incLong(_numCqsActiveId, 1);
}
/**
* Decrements the "numCqsActive" stat by 1.
*/
- public void decCqsActive() {
+ void decCqsActive() {
this._stats.incLong(_numCqsActiveId, -1);
}
@@ -209,21 +205,21 @@ public class CqServiceVsdStats {
*
* @return the current value of the "numCqsStopped" stat
*/
- public long getNumCqsStopped() {
+ long getNumCqsStopped() {
return this._stats.getLong(_numCqsStoppedId);
}
/**
* Increments the "numCqsStopped" stat by 1.
*/
- public void incCqsStopped() {
+ void incCqsStopped() {
this._stats.incLong(_numCqsStoppedId, 1);
}
/**
* Decrements the "numCqsStopped" stat by 1.
*/
- public void decCqsStopped() {
+ void decCqsStopped() {
this._stats.incLong(_numCqsStoppedId, -1);
}
@@ -232,14 +228,14 @@ public class CqServiceVsdStats {
*
* @return the current value of the "numCqsClosed" stat
*/
- public long getNumCqsClosed() {
+ long getNumCqsClosed() {
return this._stats.getLong(_numCqsClosedId);
}
/**
* Increments the "numCqsClosed" stat by 1.
*/
- public void incCqsClosed() {
+ void incCqsClosed() {
this._stats.incLong(_numCqsClosedId, 1);
}
@@ -248,21 +244,21 @@ public class CqServiceVsdStats {
*
* @return the current value of the "numCqsOnClient" stat
*/
- public long getNumCqsOnClient() {
+ long getNumCqsOnClient() {
return this._stats.getLong(_numCqsOnClientId);
}
/**
* Increments the "numCqsOnClient" stat by 1.
*/
- public void incCqsOnClient() {
+ void incCqsOnClient() {
this._stats.incLong(_numCqsOnClientId, 1);
}
/**
* Decrements the "numCqsOnClient" stat by 1.
*/
- public void decCqsOnClient() {
+ void decCqsOnClient() {
this._stats.incLong(_numCqsOnClientId, -1);
}
@@ -278,21 +274,21 @@ public class CqServiceVsdStats {
/**
* Increments the "numClientsWithCqs" stat by 1.
*/
- public void incClientsWithCqs() {
+ void incClientsWithCqs() {
this._stats.incLong(_numClientsWithCqsId, 1);
}
/**
* Decrements the "numCqsOnClient" stat by 1.
*/
- public void decClientsWithCqs() {
+ void decClientsWithCqs() {
this._stats.incLong(_numClientsWithCqsId, -1);
}
/**
* Start the CQ Query Execution time.
*/
- public long startCqQueryExecution() {
+ long startCqQueryExecution() {
this._stats.incInt(_cqQueryExecutionInProgressId, 1);
return NanoTimer.getTime();
}
@@ -302,7 +298,7 @@ public class CqServiceVsdStats {
*
* @param start long time value.
*/
- public void endCqQueryExecution(long start) {
+ void endCqQueryExecution(long start) {
long ts = NanoTimer.getTime();
this._stats.incLong(_cqQueryExecutionTimeId, ts - start);
this._stats.incInt(_cqQueryExecutionInProgressId, -1);
@@ -321,14 +317,14 @@ public class CqServiceVsdStats {
/**
* Increments number of Unique queries.
*/
- public void incUniqueCqQuery() {
+ void incUniqueCqQuery() {
this._stats.incInt(_numUniqueCqQuery, 1);
}
/**
* Decrements number of unique Queries.
*/
- public void decUniqueCqQuery() {
+ void decUniqueCqQuery() {
this._stats.incInt(_numUniqueCqQuery, -1);
}
@@ -338,11 +334,8 @@ public class CqServiceVsdStats {
* tests.
* <p>
* Returns the number of CQs (active + suspended) on the given region.
- *
- * @param regionName
*/
- public long numCqsOnRegion(String regionName) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ public long numCqsOnRegion(final InternalCache cache, String regionName) {
if (cache == null) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
index bcf9806..9bddbc7 100644
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java
@@ -27,7 +27,6 @@ import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.distributed.internal.DistributionStats;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
@@ -85,8 +84,7 @@ public class ExecuteCQ extends BaseCQCommand {
ServerCQ cqQuery = null;
try {
- qService =
- (DefaultQueryService) ((GemFireCacheImpl) crHelper.getCache()).getLocalQueryService();
+ qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService();
// Authorization check
AuthorizeRequest authzRequest = servConn.getAuthzRequest();
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
index f333b4b..de61445 100755
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java
@@ -28,7 +28,6 @@ import org.apache.geode.cache.query.internal.cq.CqServiceImpl;
import org.apache.geode.cache.query.internal.cq.CqServiceProvider;
import org.apache.geode.cache.query.internal.cq.ServerCQImpl;
import org.apache.geode.distributed.internal.DistributionStats;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
@@ -111,8 +110,7 @@ public class ExecuteCQ61 extends BaseCQCommand {
ServerCQImpl cqQuery = null;
try {
- qService =
- (DefaultQueryService) ((GemFireCacheImpl) crHelper.getCache()).getLocalQueryService();
+ qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService();
// Authorization check
AuthorizeRequest authzRequest = servConn.getAuthzRequest();
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
index eac9ed3..a2d201d 100755
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.cache.query.internal.cq.CqService;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
@@ -64,8 +63,7 @@ public class GetDurableCQs extends BaseCQCommand {
CqService cqServiceForExec = null;
try {
- qService =
- (DefaultQueryService) ((GemFireCacheImpl) crHelper.getCache()).getLocalQueryService();
+ qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService();
this.securityService.authorizeClusterRead();
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java
index 7ace0e8..9840f78 100644
--- a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsDUnitTest.java
@@ -14,20 +14,15 @@
*/
package org.apache.geode.cache.query.cq.dunit;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
import static org.junit.Assert.*;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
import java.util.Collection;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.CqServiceStatistics;
import org.apache.geode.cache.query.CqStatistics;
import org.apache.geode.cache.query.QueryService;
@@ -40,7 +35,7 @@ import org.apache.geode.cache.query.internal.cq.CqServiceImpl;
import org.apache.geode.cache.query.internal.cq.CqServiceVsdStats;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.cache30.CacheTestCase;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
@@ -48,27 +43,26 @@ import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
/**
- * This class tests the ContiunousQuery mechanism in GemFire. This includes the test with different
+ * This class tests the ContinuousQuery mechanism in GemFire. This includes the test with different
* data activities.
- *
*/
@Category(DistributedTest.class)
public class CqStatsDUnitTest extends JUnit4CacheTestCase {
+ // TODO: delete this use of CqQueryDUnitTest
private CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest();
- public CqStatsDUnitTest() {
- super();
- }
-
@Override
public final void postSetUp() throws Exception {
// avoid IllegalStateException from HandShake by connecting all vms to
// system before creating pool
getSystem();
Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
+ @Override
public void run() {
getSystem();
}
@@ -81,6 +75,7 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
public void validateCQStats(VM vm, final String cqName, final int creates, final int updates,
final int deletes, final int totalEvents, final int cqListenerInvocations) {
vm.invoke(new CacheSerializableRunnable("Validate CQs") {
+ @Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Validating CQ Stats. ### " + cqName);
// Get CQ Service.
@@ -161,6 +156,7 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
final int stopped, final int closed, final int cqsOnClient, final int cqsOnRegion,
final int clientsWithCqs) {
vm.invoke(new CacheSerializableRunnable("Validate CQ Service Stats") {
+ @Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Validating CQ Service Stats. ### ");
// Get CQ Service.
@@ -176,7 +172,7 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
CqServiceVsdStats cqServiceVsdStats = null;
try {
cqServiceVsdStats =
- ((CqServiceImpl) ((DefaultQueryService) qService).getCqService()).stats;
+ ((CqServiceImpl) ((DefaultQueryService) qService).getCqService()).stats();
} catch (CqException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -189,7 +185,7 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
+ cqServiceStats.numCqsCreated() + " CQs active: " + cqServiceStats.numCqsActive()
+ " CQs stopped: " + cqServiceStats.numCqsStopped() + " CQs closed: "
+ cqServiceStats.numCqsClosed() + " CQs on Client: " + cqServiceStats.numCqsOnClient()
- + " CQs on region /root/regionA : " + cqServiceVsdStats.numCqsOnRegion("/root/regionA")
+ + " CQs on region /root/regionA : " + cqServiceVsdStats.numCqsOnRegion(GemFireCacheImpl.getInstance(), "/root/regionA")
+ " Clients with CQs: " + cqServiceVsdStats.getNumClientsWithCqs());
@@ -223,7 +219,7 @@ public class CqStatsDUnitTest extends JUnit4CacheTestCase {
// Check for CQs on region.
if (cqsOnRegion != CqQueryDUnitTest.noTest) {
assertEquals("Number of CQs on region /root/regionA mismatch", cqsOnRegion,
- cqServiceVsdStats.numCqsOnRegion("/root/regionA"));
+ cqServiceVsdStats.numCqsOnRegion(GemFireCacheImpl.getInstance(), "/root/regionA"));
}
// Check for clients with CQs count.
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
index d6068f1..4313595 100644
--- a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
@@ -14,21 +14,16 @@
*/
package org.apache.geode.cache.query.cq.dunit;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
import static org.junit.Assert.*;
-import org.apache.geode.distributed.*;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
+import java.util.Collection;
+import java.util.Properties;
-import java.util.*;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.CqServiceStatistics;
import org.apache.geode.cache.query.CqStatistics;
import org.apache.geode.cache.query.QueryService;
@@ -41,7 +36,8 @@ import org.apache.geode.cache.query.internal.cq.CqServiceImpl;
import org.apache.geode.cache.query.internal.cq.CqServiceVsdStats;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.cache30.CacheTestCase;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
@@ -49,21 +45,19 @@ import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
/**
- * This class tests the ContiunousQuery mechanism in GemFire. This includes the test with different
+ * This class tests the ContinuousQuery mechanism in GemFire. This includes the test with different
* data activities.
- *
*/
@Category(DistributedTest.class)
public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
+ // TODO: delete this use of CqQueryUsingPoolDUnitTest
private CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest();
- public CqStatsUsingPoolDUnitTest() {
- super();
- }
-
@Override
public Properties getDistributedSystemProperties() {
Properties result = super.getDistributedSystemProperties();
@@ -77,6 +71,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
// system before creating pool
getSystem();
Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
+ @Override
public void run() {
getSystem();
}
@@ -89,6 +84,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
private void validateCQStats(VM vm, final String cqName, final int creates, final int updates,
final int deletes, final int totalEvents, final int cqListenerInvocations) {
vm.invoke(new CacheSerializableRunnable("Validate CQs") {
+ @Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Validating CQ Stats. ### " + cqName);
// Get CQ Service.
@@ -169,6 +165,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
final int stopped, final int closed, final int cqsOnClient, final int cqsOnRegion,
final int clientsWithCqs) {
vm.invoke(new CacheSerializableRunnable("Validate CQ Service Stats") {
+ @Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Validating CQ Service Stats. ### ");
// Get CQ Service.
@@ -184,7 +181,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
CqServiceVsdStats cqServiceVsdStats = null;
try {
cqServiceVsdStats =
- ((CqServiceImpl) ((DefaultQueryService) qService).getCqService()).stats;
+ ((CqServiceImpl) ((DefaultQueryService) qService).getCqService()).stats();
} catch (CqException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -197,7 +194,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
+ cqServiceStats.numCqsCreated() + " CQs active: " + cqServiceStats.numCqsActive()
+ " CQs stopped: " + cqServiceStats.numCqsStopped() + " CQs closed: "
+ cqServiceStats.numCqsClosed() + " CQs on Client: " + cqServiceStats.numCqsOnClient()
- + " CQs on region /root/regionA : " + cqServiceVsdStats.numCqsOnRegion("/root/regionA")
+ + " CQs on region /root/regionA : " + cqServiceVsdStats.numCqsOnRegion(GemFireCacheImpl.getInstance(), "/root/regionA")
+ " Clients with CQs: " + cqServiceVsdStats.getNumClientsWithCqs());
@@ -231,7 +228,7 @@ public class CqStatsUsingPoolDUnitTest extends JUnit4CacheTestCase {
// Check for CQs on region.
if (cqsOnRegion != CqQueryUsingPoolDUnitTest.noTest) {
assertEquals("Number of CQs on region /root/regionA mismatch", cqsOnRegion,
- cqServiceVsdStats.numCqsOnRegion("/root/regionA"));
+ cqServiceVsdStats.numCqsOnRegion(GemFireCacheImpl.getInstance(), "/root/regionA"));
}
// Check for clients with CQs count.
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
index 66c4c0a..a587287 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
@@ -15,10 +15,8 @@
package org.apache.geode.cache.lucene.internal.distributed;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
@@ -27,7 +25,7 @@ import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.lucene.LuceneQuery;
import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
/**
@@ -44,16 +42,15 @@ import org.apache.geode.internal.logging.LogService;
*/
public class TopEntriesFunctionCollector
implements ResultCollector<TopEntriesCollector, TopEntries> {
+ private static final Logger logger = LogService.getLogger();
+
// Use this instance to perform reduce operation
- final CollectorManager<TopEntriesCollector> manager;
+ private final CollectorManager<TopEntriesCollector> manager;
- final String id;
-
- // Instance of gemfire cache to check status and other utility methods
- final private GemFireCacheImpl cache;
- private static final Logger logger = LogService.getLogger();
+ private final String id;
private final Collection<TopEntriesCollector> subResults = new ArrayList<>();
+
private TopEntriesCollector mergedResults;
public TopEntriesFunctionCollector() {
@@ -65,8 +62,7 @@ public class TopEntriesFunctionCollector
}
public TopEntriesFunctionCollector(LuceneFunctionContext<TopEntriesCollector> context,
- GemFireCacheImpl cache) {
- this.cache = cache;
+ InternalCache cache) {
id = cache == null ? String.valueOf(this.hashCode()) : cache.getName();
int limit = context == null ? 0 : context.getLimit();
[2/2] geode git commit: 2632: refactor code to use InternalCache
instead of GemFireCacheImpl
Posted by kl...@apache.org.
2632: refactor code to use InternalCache instead of GemFireCacheImpl
* minor cleanup also
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/bc39e973
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/bc39e973
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/bc39e973
Branch: refs/heads/feature/GEODE-2632-5
Commit: bc39e97390d23537428ea3e7cd6c2e553f54bc73
Parents: 76c4983
Author: Kirk Lund <kl...@apache.org>
Authored: Wed Apr 19 14:41:42 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed Apr 19 14:41:42 2017 -0700
----------------------------------------------------------------------
.../query/internal/cq/CqServiceProvider.java | 19 +-
.../query/internal/cq/spi/CqServiceFactory.java | 8 +-
.../cache/query/internal/cq/ClientCQImpl.java | 94 +--
.../cache/query/internal/cq/CqQueryImpl.java | 90 ++-
.../query/internal/cq/CqServiceFactoryImpl.java | 17 +-
.../cache/query/internal/cq/CqServiceImpl.java | 668 ++++---------------
.../internal/cq/CqServiceStatisticsImpl.java | 21 +-
.../query/internal/cq/CqServiceVsdStats.java | 73 +-
.../cache/tier/sockets/command/ExecuteCQ.java | 4 +-
.../cache/tier/sockets/command/ExecuteCQ61.java | 4 +-
.../tier/sockets/command/GetDurableCQs.java | 4 +-
.../cache/query/cq/dunit/CqStatsDUnitTest.java | 32 +-
.../cq/dunit/CqStatsUsingPoolDUnitTest.java | 35 +-
.../TopEntriesFunctionCollector.java | 18 +-
14 files changed, 316 insertions(+), 771 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
index cded9c3..d629352 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
@@ -16,7 +16,7 @@ package org.apache.geode.cache.query.internal.cq;
import org.apache.geode.cache.query.internal.cq.spi.CqServiceFactory;
import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import java.io.DataInput;
import java.io.IOException;
@@ -26,17 +26,19 @@ import java.util.ServiceLoader;
public class CqServiceProvider {
private static final CqServiceFactory factory;
- // System property to maintain the CQ event references for optimizing the updates.
- // This will allows to run the CQ query only once during update events.
+
+ /**
+ * System property to maintain the CQ event references for optimizing the updates.
+ * This will allow running the CQ query only once during update events.
+ */
public static boolean MAINTAIN_KEYS = Boolean
- .valueOf(System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS", "true"))
- .booleanValue();
+ .valueOf(System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS", "true"));
+
/**
* A debug flag used for testing vMotion during CQ registration
*/
public static boolean VMOTION_DURING_CQ_REGISTRATION_FLAG = false;
-
static {
ServiceLoader<CqServiceFactory> loader = ServiceLoader.load(CqServiceFactory.class);
Iterator<CqServiceFactory> itr = loader.iterator();
@@ -48,8 +50,7 @@ public class CqServiceProvider {
}
}
- public static CqService create(GemFireCacheImpl cache) {
-
+ public static CqService create(InternalCache cache) {
if (factory == null) {
return new MissingCqService();
}
@@ -63,10 +64,8 @@ public class CqServiceProvider {
} else {
return factory.readCqQuery(in);
}
-
}
private CqServiceProvider() {
-
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
index 68ebbd5..2b8a47e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
@@ -19,16 +19,16 @@ import java.io.IOException;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
public interface CqServiceFactory {
- public void initialize();
+ void initialize();
/**
* Create a new CqService for the given cache
*/
- public CqService create(GemFireCacheImpl cache);
+ CqService create(InternalCache cache);
- public ServerCQ readCqQuery(DataInput in) throws ClassNotFoundException, IOException;
+ ServerCQ readCqQuery(DataInput in) throws ClassNotFoundException, IOException;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
index 00a0aa5..1a331da 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
@@ -35,7 +35,7 @@ import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.CqStatusListener;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.internal.CqStateImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -57,7 +57,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
*/
private volatile ConcurrentLinkedQueue<CqEventImpl> queuedEvents = null;
- public final Object queuedEventsSynchObject = new Object();
+ final Object queuedEventsSynchObject = new Object();
private boolean connected = false;
@@ -73,22 +73,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
return this.cqName;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqQuery2#getCQProxy()
- */
- public ServerCQProxyImpl getCQProxy() {
+ ServerCQProxyImpl getCQProxy() {
return this.cqProxy;
}
/**
* Initializes the connection using the pool from the client region. Also sets the cqBaseRegion
* value of this CQ.
- *
- * @throws CqException
*/
- public void initConnectionProxy() throws CqException, RegionNotFoundException {
+ private void initConnectionProxy() throws CqException, RegionNotFoundException {
cqBaseRegion = (LocalRegion) cqService.getCache().getRegion(regionName);
// Check if the region exists on the local cache.
// In the current implementation of 5.1 the Server Connection is (ConnectionProxyImpl)
@@ -113,17 +106,9 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
throw new CqException(
"Unable to get the connection pool. The Region does not have a pool configured.");
}
-
- // if (proxy == null) {
- // throw new
- // CqException(LocalizedStrings.CqQueryImpl_UNABLE_TO_GET_THE_CONNECTIONPROXY_THE_REGION_MAY_NOT_HAVE_A_BRIDGEWRITER_OR_BRIDGECLIENT_INSTALLED_ON_IT.toLocalizedString());
- // } else if(!proxy.getEstablishCallbackConnection()){
- // throw new
- // CqException(LocalizedStrings.CqQueryImpl_THE_ESTABLISHCALLBACKCONNECTION_ON_BRIDGEWRITER_CLIENT_INSTALLED_ON_REGION_0_IS_SET_TO_FALSE
- // .toLocalizedString(regionName));
- // }
}
+ @Override
public void close() throws CqClosedException, CqException {
this.close(true);
}
@@ -182,15 +167,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
if (cqProxy == null || !sendRequestToServer || isClosed) {
// Stat update.
if (stateBeforeClosing == CqStateImpl.RUNNING) {
- cqService.stats.decCqsActive();
+ cqService.stats().decCqsActive();
} else if (stateBeforeClosing == CqStateImpl.STOPPED) {
- cqService.stats.decCqsStopped();
+ cqService.stats().decCqsStopped();
}
// Set the state to close, and update stats
this.cqState.setState(CqStateImpl.CLOSED);
- cqService.stats.incCqsClosed();
- cqService.stats.decCqsOnClient();
+ cqService.stats().incCqsClosed();
+ cqService.stats().decCqsOnClient();
if (this.stats != null)
this.stats.close();
} else {
@@ -201,7 +186,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
if (exception != null) {
throw new CqException(
LocalizedStrings.CqQueryImpl_FAILED_TO_CLOSE_THE_CQ_CQNAME_0_ERROR_FROM_LAST_ENDPOINT_1
- .toLocalizedString(new Object[] {this.cqName, exception.getLocalizedMessage()}),
+ .toLocalizedString(this.cqName, exception.getLocalizedMessage()),
exception.getCause());
} else {
throw new CqException(
@@ -261,31 +246,28 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
/**
* Clears the resource used by CQ.
- *
- * @throws CqException
*/
+ @Override
protected void cleanup() throws CqException {
this.cqService.removeFromBaseRegionToCqNameMap(this.regionName, this.getServerCqName());
}
+ @Override
public CqAttributes getCqAttributes() {
return cqAttributes;
}
-
-
/**
* @return Returns the cqListeners.
*/
public CqListener[] getCqListeners() {
-
return cqAttributes.getCqListeners();
}
-
/**
* Start or resume executing the query.
*/
+ @Override
public void execute() throws CqClosedException, RegionNotFoundException, CqException {
executeCqOnRedundantsAndPrimary(false);
}
@@ -293,7 +275,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
/**
* Start or resume executing the query. Gets or updates the CQ results and returns them.
*/
- public CqResults executeWithInitialResults()
+ @Override
+ public <E> CqResults<E> executeWithInitialResults()
throws CqClosedException, RegionNotFoundException, CqException {
synchronized (queuedEventsSynchObject) {
@@ -320,16 +303,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
CqResults initialResults;
try {
initialResults = (CqResults) executeCqOnRedundantsAndPrimary(true);
- } catch (CqClosedException e) {
- queuedEvents = null;
- throw e;
- } catch (RegionNotFoundException e) {
- queuedEvents = null;
- throw e;
- } catch (CqException e) {
- queuedEvents = null;
- throw e;
- } catch (RuntimeException e) {
+ } catch (RegionNotFoundException|CqException|RuntimeException e) {
queuedEvents = null;
throw e;
}
@@ -343,6 +317,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
if (!this.queuedEvents.isEmpty()) {
try {
Runnable r = new Runnable() {
+ @Override
public void run() {
Object[] eventArray = null;
if (CqQueryImpl.testHook != null) {
@@ -395,7 +370,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
* @param executeWithInitialResults boolean
* @return Object SelectResults in case of executeWithInitialResults
*/
- public Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults)
+ private Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults)
throws CqClosedException, RegionNotFoundException, CqException {
Object initialResults = null;
@@ -462,7 +437,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
String errMsg =
LocalizedStrings.CqQueryImpl_FAILED_TO_EXECUTE_THE_CQ_CQNAME_0_QUERY_STRING_IS_1_ERROR_FROM_LAST_SERVER_2
.toLocalizedString(
- new Object[] {this.cqName, this.queryString, ex.getLocalizedMessage()});
+ this.cqName, this.queryString, ex.getLocalizedMessage());
if (logger.isDebugEnabled()) {
logger.debug(errMsg, ex);
}
@@ -498,8 +473,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
}
}
// Update CQ-base region for book keeping.
- this.cqService.stats.incCqsActive();
- this.cqService.stats.decCqsStopped();
+ this.cqService.stats().incCqsActive();
+ this.cqService.stats().decCqsStopped();
return initialResults;
}
@@ -509,23 +484,22 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
* @return true if shutdown in progress else false.
*/
private boolean shutdownInProgress() {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = cqService.getInternalCache();
if (cache == null || cache.isClosed()) {
return true; // bail, things are shutting down
}
-
String reason = cqProxy.getPool().getCancelCriterion().cancelInProgress();
if (reason != null) {
return true;
}
return false;
-
}
/**
* Stop or pause executing the query.
*/
+ @Override
public void stop() throws CqClosedException, CqException {
boolean isStopped = false;
synchronized (this.cqState) {
@@ -558,8 +532,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
if (cqProxy == null || isStopped) {
// Change state and stats on the client side
this.cqState.setState(CqStateImpl.STOPPED);
- this.cqService.stats.incCqsStopped();
- this.cqService.stats.decCqsActive();
+ this.cqService.stats().incCqsStopped();
+ this.cqService.stats().decCqsActive();
if (logger.isDebugEnabled()) {
logger.debug("Successfully stopped the CQ. {}", cqName);
}
@@ -568,7 +542,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
if (exception != null) {
throw new CqException(
LocalizedStrings.CqQueryImpl_FAILED_TO_STOP_THE_CQ_CQNAME_0_ERROR_FROM_LAST_SERVER_1
- .toLocalizedString(new Object[] {this.cqName, exception.getLocalizedMessage()}),
+ .toLocalizedString(this.cqName, exception.getLocalizedMessage()),
exception.getCause());
} else {
throw new CqException(
@@ -579,24 +553,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
}
}
+ @Override
public CqAttributesMutator getCqAttributesMutator() {
return (CqAttributesMutator) this.cqAttributes;
}
-
- public ConcurrentLinkedQueue<CqEventImpl> getQueuedEvents() {
+ ConcurrentLinkedQueue<CqEventImpl> getQueuedEvents() {
return this.queuedEvents;
}
-
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.cache.query.internal.InternalCqQuery2#setProxyCache(org.apache.geode.cache.
- * client.internal.ProxyCache)
- */
@Override
public void setProxyCache(ProxyCache proxyCache) {
this.proxyCache = proxyCache;
@@ -612,7 +577,6 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
@Override
public void createOn(Connection conn, boolean isDurable) {
-
byte regionDataPolicyOrdinal = getCqBaseRegion() == null ? (byte) 0
: getCqBaseRegion().getAttributes().getDataPolicy().ordinal;
@@ -620,6 +584,4 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
this.cqProxy.createOn(getName(), conn, getQueryString(), state, isDurable,
regionDataPolicyOrdinal);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
index 22b4137..a76cb62 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
@@ -21,11 +21,9 @@ import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.StatisticsFactory;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.query.CqClosedException;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqState;
import org.apache.geode.cache.query.CqStatistics;
import org.apache.geode.cache.query.Query;
@@ -38,7 +36,7 @@ import org.apache.geode.cache.query.internal.CqStateImpl;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.ExecutionContext;
import org.apache.geode.cache.query.internal.QueryExecutionContext;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.InternalLogWriter;
@@ -58,13 +56,13 @@ public abstract class CqQueryImpl implements InternalCqQuery {
protected String queryString;
- protected static final Object TOKEN = new Object();
+ static final Object TOKEN = new Object();
- protected LocalRegion cqBaseRegion;
+ LocalRegion cqBaseRegion;
protected Query query = null;
- protected InternalLogWriter securityLogWriter;
+ InternalLogWriter securityLogWriter;
protected CqServiceImpl cqService;
@@ -72,14 +70,14 @@ public abstract class CqQueryImpl implements InternalCqQuery {
protected boolean isDurable = false;
- // Stats counters
- protected CqStatisticsImpl cqStats;
+ /** Stats counters */
+ private CqStatisticsImpl cqStats;
protected CqQueryVsdStats stats;
protected final CqStateImpl cqState = new CqStateImpl();
- protected ExecutionContext queryExecutionContext = null;
+ private ExecutionContext queryExecutionContext = null;
public static TestHook testHook = null;
@@ -100,6 +98,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
/**
* returns CQ name
*/
+ @Override
public String getName() {
return this.cqName;
}
@@ -109,6 +108,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
this.cqName = cqName;
}
+ @Override
public void setCqService(CqService cqService) {
this.cqService = (CqServiceImpl) cqService;
}
@@ -121,25 +121,25 @@ public abstract class CqQueryImpl implements InternalCqQuery {
return this.regionName;
}
- public void updateCqCreateStats() {
+ void updateCqCreateStats() {
// Initialize the VSD statistics
StatisticsFactory factory = cqService.getCache().getDistributedSystem();
this.stats = new CqQueryVsdStats(factory, getServerCqName());
this.cqStats = new CqStatisticsImpl(this);
// Update statistics with CQ creation.
- this.cqService.stats.incCqsStopped();
- this.cqService.stats.incCqsCreated();
- this.cqService.stats.incCqsOnClient();
+ this.cqService.stats().incCqsStopped();
+ this.cqService.stats().incCqsCreated();
+ this.cqService.stats().incCqsOnClient();
}
/**
* Validates the CQ. Checks for cq constraints. Also sets the base region name.
*/
- public void validateCq() {
- Cache cache = cqService.getCache();
+ void validateCq() {
+ InternalCache cache = cqService.getInternalCache();
DefaultQuery locQuery =
- (DefaultQuery) ((GemFireCacheImpl) cache).getLocalQueryService().newQuery(this.queryString);
+ (DefaultQuery) cache.getLocalQueryService().newQuery(this.queryString);
this.query = locQuery;
// assert locQuery != null;
@@ -221,10 +221,8 @@ public abstract class CqQueryImpl implements InternalCqQuery {
/**
* Removes the CQ from CQ repository.
- *
- * @throws CqException
*/
- protected void removeFromCqMap() throws CqException {
+ void removeFromCqMap() throws CqException {
try {
cqService.removeCq(this.getServerCqName());
} catch (Exception ex) {
@@ -243,6 +241,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
/**
* Returns the QueryString of this CQ.
*/
+ @Override
public String getQueryString() {
return queryString;
}
@@ -252,23 +251,16 @@ public abstract class CqQueryImpl implements InternalCqQuery {
*
* @return the Query for the query string
*/
+ @Override
public Query getQuery() {
return query;
}
-
- /**
- * @see org.apache.geode.cache.query.CqQuery#getStatistics()
- */
+ @Override
public CqStatistics getStatistics() {
return cqStats;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqQuery2#getCqBaseRegion()
- */
@Override
public LocalRegion getCqBaseRegion() {
return this.cqBaseRegion;
@@ -279,11 +271,12 @@ public abstract class CqQueryImpl implements InternalCqQuery {
/**
* @return Returns the Region name on which this cq is created.
*/
- public String getBaseRegionName() {
+ String getBaseRegionName() {
return this.regionName;
}
+ @Override
public abstract String getServerCqName();
/**
@@ -291,15 +284,11 @@ public abstract class CqQueryImpl implements InternalCqQuery {
*
* @return STOPPED RUNNING or CLOSED
*/
+ @Override
public CqState getState() {
return this.cqState;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqQuery2#setCqState(int)
- */
@Override
public void setCqState(int state) {
if (this.isClosed()) {
@@ -309,18 +298,13 @@ public abstract class CqQueryImpl implements InternalCqQuery {
synchronized (cqState) {
if (state == CqStateImpl.RUNNING) {
- if (this.isRunning()) {
- // throw new
- // IllegalStateException(LocalizedStrings.CqQueryImpl_CQ_IS_NOT_IN_RUNNING_STATE_STOP_CQ_DOES_NOT_APPLY_CQNAME_0
- // .toLocalizedString(this.cqName));
- }
this.cqState.setState(CqStateImpl.RUNNING);
- this.cqService.stats.incCqsActive();
- this.cqService.stats.decCqsStopped();
+ this.cqService.stats().incCqsActive();
+ this.cqService.stats().decCqsStopped();
} else if (state == CqStateImpl.STOPPED) {
this.cqState.setState(CqStateImpl.STOPPED);
- this.cqService.stats.incCqsStopped();
- this.cqService.stats.decCqsActive();
+ this.cqService.stats().incCqsStopped();
+ this.cqService.stats().decCqsActive();
} else if (state == CqStateImpl.CLOSING) {
this.cqState.setState(state);
}
@@ -332,7 +316,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
*
* @param cqEvent object
*/
- public void updateStats(CqEvent cqEvent) {
+ void updateStats(CqEvent cqEvent) {
this.stats.updateStats(cqEvent); // Stats for VSD
}
@@ -341,15 +325,17 @@ public abstract class CqQueryImpl implements InternalCqQuery {
*
* @return true if running, false otherwise
*/
+ @Override
public boolean isRunning() {
return this.cqState.isRunning();
}
/**
- * Return true if the CQ is in Sstopped state
+ * Return true if the CQ is in stopped state
*
* @return true if stopped, false otherwise
*/
+ @Override
public boolean isStopped() {
return this.cqState.isStopped();
}
@@ -359,6 +345,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
*
* @return true if closed, false otherwise
*/
+ @Override
public boolean isClosed() {
return this.cqState.isClosed();
}
@@ -377,6 +364,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
*
* @return true if durable, false otherwise
*/
+ @Override
public boolean isDurable() {
return this.isDurable;
}
@@ -391,22 +379,22 @@ public abstract class CqQueryImpl implements InternalCqQuery {
return stats;
}
- public ExecutionContext getQueryExecutionContext() {
+ ExecutionContext getQueryExecutionContext() {
return queryExecutionContext;
}
- public void setQueryExecutionContext(ExecutionContext queryExecutionContext) {
+ private void setQueryExecutionContext(ExecutionContext queryExecutionContext) {
this.queryExecutionContext = queryExecutionContext;
}
/** Test Hook */
public interface TestHook {
- public void pauseUntilReady();
+ void pauseUntilReady();
- public void ready();
+ void ready();
- public int numQueuedEvents();
+ int numQueuedEvents();
- public void setEventCount(int count);
+ void setEventCount(int count);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
index db90632..9cc2eea 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
@@ -22,7 +22,7 @@ import java.util.Map;
import org.apache.geode.cache.query.internal.cq.spi.CqServiceFactory;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.CommandInitializer;
@@ -36,14 +36,13 @@ import org.apache.geode.internal.cache.tier.sockets.command.StopCQ;
public class CqServiceFactoryImpl implements CqServiceFactory {
+ @Override
public void initialize() {
- {
- Map<Version, Command> versions = new HashMap<Version, Command>();
- versions.put(Version.GFE_57, ExecuteCQ.getCommand());
- versions.put(Version.GFE_61, ExecuteCQ61.getCommand());
- CommandInitializer.registerCommand(MessageType.EXECUTECQ_MSG_TYPE, versions);
- CommandInitializer.registerCommand(MessageType.EXECUTECQ_WITH_IR_MSG_TYPE, versions);
- }
+ Map<Version, Command> versions = new HashMap<>();
+ versions.put(Version.GFE_57, ExecuteCQ.getCommand());
+ versions.put(Version.GFE_61, ExecuteCQ61.getCommand());
+ CommandInitializer.registerCommand(MessageType.EXECUTECQ_MSG_TYPE, versions);
+ CommandInitializer.registerCommand(MessageType.EXECUTECQ_WITH_IR_MSG_TYPE, versions);
CommandInitializer.registerCommand(MessageType.GETCQSTATS_MSG_TYPE,
Collections.singletonMap(Version.GFE_57, GetCQStats.getCommand()));
@@ -58,7 +57,7 @@ public class CqServiceFactoryImpl implements CqServiceFactory {
}
@Override
- public CqService create(GemFireCacheImpl cache) {
+ public CqService create(InternalCache cache) {
return new CqServiceImpl(cache);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
index f1ca832..d0a8176 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
@@ -14,19 +14,63 @@
*/
package org.apache.geode.cache.query.internal.cq;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.client.Pool;
-import org.apache.geode.cache.client.internal.*;
-import org.apache.geode.cache.query.*;
-import org.apache.geode.cache.query.internal.*;
+import org.apache.geode.cache.client.internal.GetEventValueOp;
+import org.apache.geode.cache.client.internal.InternalPool;
+import org.apache.geode.cache.client.internal.QueueManager;
+import org.apache.geode.cache.client.internal.ServerCQProxyImpl;
+import org.apache.geode.cache.client.internal.UserAttributes;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqClosedException;
+import org.apache.geode.cache.query.CqException;
+import org.apache.geode.cache.query.CqExistsException;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.CqServiceStatistics;
+import org.apache.geode.cache.query.CqStatusListener;
+import org.apache.geode.cache.query.QueryException;
+import org.apache.geode.cache.query.QueryInvalidException;
+import org.apache.geode.cache.query.RegionNotFoundException;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.internal.CompiledSelect;
+import org.apache.geode.cache.query.internal.CqQueryVsdStats;
+import org.apache.geode.cache.query.internal.CqStateImpl;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.i18n.StringId;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.FilterProfile;
+import org.apache.geode.internal.cache.FilterRoutingInfo;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
@@ -35,31 +79,19 @@ import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.logging.log4j.Logger;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
/**
- * @since GemFire 5.5
- *
- * Implements the CqService functionality.
- *
- */
-/**
+ * Implements the CqService functionality.
*
+ * @since GemFire 5.5
*/
public final class CqServiceImpl implements CqService {
private static final Logger logger = LogService.getLogger();
- private static final Integer MESSAGE_TYPE_LOCAL_CREATE =
- Integer.valueOf(MessageType.LOCAL_CREATE);
- private static final Integer MESSAGE_TYPE_LOCAL_UPDATE =
- Integer.valueOf(MessageType.LOCAL_UPDATE);
- private static final Integer MESSAGE_TYPE_LOCAL_DESTROY =
- Integer.valueOf(MessageType.LOCAL_DESTROY);
- private static final Integer MESSAGE_TYPE_EXCEPTION = Integer.valueOf(MessageType.EXCEPTION);
+ private static final Integer MESSAGE_TYPE_LOCAL_CREATE = MessageType.LOCAL_CREATE;
+ private static final Integer MESSAGE_TYPE_LOCAL_UPDATE = MessageType.LOCAL_UPDATE;
+ private static final Integer MESSAGE_TYPE_LOCAL_DESTROY = MessageType.LOCAL_DESTROY;
+ private static final Integer MESSAGE_TYPE_EXCEPTION = MessageType.EXCEPTION;
/**
* System property to evaluate the query even though the initial results are not required when cq
@@ -67,25 +99,24 @@ public final class CqServiceImpl implements CqService {
*/
public static boolean EXECUTE_QUERY_DURING_INIT = Boolean
.valueOf(System
- .getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true"))
- .booleanValue();
+ .getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true"));
private static final String CQ_NAME_PREFIX = "GfCq";
- private final Cache cache;
+ private final InternalCache cache;
/**
* Manages cq pools to determine if a status of connect or disconnect needs to be sent out
*/
- private final HashMap<String, Boolean> cqPoolsConnected = new HashMap<String, Boolean>();
-
+ private final HashMap<String, Boolean> cqPoolsConnected = new HashMap<>();
/**
* Manages CQ objects. uses serverCqName as key and CqQueryImpl as value
*
- * @guarded.By cqQueryMapLock
+ * GuardedBy cqQueryMapLock
*/
- private volatile HashMap<String, CqQueryImpl> cqQueryMap = new HashMap<String, CqQueryImpl>();
+ private volatile HashMap<String, CqQueryImpl> cqQueryMap = new HashMap<>();
+
private final Object cqQueryMapLock = new Object();
private volatile boolean isRunning = false;
@@ -93,36 +124,21 @@ public final class CqServiceImpl implements CqService {
/**
* Used by client when multiuser-authentication is true.
*/
- private final HashMap<String, UserAttributes> cqNameToUserAttributesMap =
- new HashMap<String, UserAttributes>();
-
- // private boolean isServer = true;
-
- /*
- * // Map to manage CQ to satisfied CQ events (keys) for optimizing updates. private final HashMap
- * cqToCqEventKeysMap = CqService.MAINTAIN_KEYS ? new HashMap() : null;
- */
+ private final HashMap<String, UserAttributes> cqNameToUserAttributesMap = new HashMap<>();
// Map to manage the similar CQs (having same query - performance optimization).
// With query as key and Set of CQs as values.
private final ConcurrentHashMap matchingCqMap;
// CQ Service statistics
- public final CqServiceStatisticsImpl cqServiceStats;
- public final CqServiceVsdStats stats;
+ private final CqServiceStatisticsImpl cqServiceStats;
+ private final CqServiceVsdStats stats;
// CQ identifier, also used in auto generated CQ names
private volatile long cqId = 1;
- /**
- * Used to synchronize access to CQs in the repository
- */
- final Object cqSync = new Object();
-
/* This is to manage region to CQs map, client side book keeping. */
- private HashMap<String, ArrayList<String>> baseRegionToCqNameMap =
- new HashMap<String, ArrayList<String>>();
-
+ private HashMap<String, ArrayList<String>> baseRegionToCqNameMap = new HashMap<>();
/**
* Access and modification to the contents of this map do not necessarily need to be lock
@@ -135,33 +151,24 @@ public final class CqServiceImpl implements CqService {
/**
* Constructor.
- *
- * @param c The cache used for the service
+ *
+ * @param cache The cache used for the service
*/
- public CqServiceImpl(final Cache c) {
- if (c == null) {
+ public CqServiceImpl(final InternalCache cache) {
+ if (cache == null) {
throw new IllegalStateException(LocalizedStrings.CqService_CACHE_IS_NULL.toLocalizedString());
}
- GemFireCacheImpl gfc = (GemFireCacheImpl) c;
- gfc.getCancelCriterion().checkCancelInProgress(null);
-
- this.cache = gfc;
+ cache.getCancelCriterion().checkCancelInProgress(null);
+ this.cache = cache;
// Initialize the Map which maintains the matching cqs.
this.matchingCqMap = new ConcurrentHashMap<String, HashSet<String>>();
// Initialize the VSD statistics
- StatisticsFactory factory = cache.getDistributedSystem();
+ StatisticsFactory factory = this.cache.getDistributedSystem();
this.stats = new CqServiceVsdStats(factory);
this.cqServiceStats = new CqServiceStatisticsImpl(this);
-
- // final LoggingThreadGroup group =
- // LoggingThreadGroup.createThreadGroup("CqExecutor Threads", logger);
-
- // if (this.cache.getCacheServers().isEmpty()) {
- // isServer = false;
- // }
}
/**
@@ -171,13 +178,14 @@ public final class CqServiceImpl implements CqService {
return this.cache;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#newCq(java.lang.String,
- * java.lang.String, org.apache.geode.cache.query.CqAttributes,
- * org.apache.geode.cache.client.internal.ServerCQProxy, boolean)
- */
+ public InternalCache getInternalCache() {
+ return this.cache;
+ }
+
+ public CqServiceVsdStats stats() {
+ return this.stats;
+ }
+
@Override
public synchronized ClientCQ newCq(String cqName, String queryString, CqAttributes cqAttributes,
InternalPool pool, boolean isDurable)
@@ -242,22 +250,15 @@ public final class CqServiceImpl implements CqService {
return cQuery;
}
-
/**
* Executes the given CqQuery, if the CqQuery for that name is not there it registers the one and
* executes. This is called on the Server.
*
- * @param cqName
- * @param queryString
- * @param cqState
- * @param clientProxyId
- * @param ccn
* @param manageEmptyRegions whether to update the 6.1 emptyRegions map held in the CCN
* @param regionDataPolicy the data policy of the region associated with the query. This is only
* needed if manageEmptyRegions is true.
* @param emptyRegionsMap map of empty regions.
* @throws IllegalStateException if this is called at client side.
- * @throws CqException
*/
@Override
public synchronized ServerCQ executeCq(String cqName, String queryString, int cqState,
@@ -271,7 +272,7 @@ public final class CqServiceImpl implements CqService {
}
String serverCqName = constructServerCqName(cqName, clientProxyId);
- ServerCQImpl cQuery = null;
+ ServerCQImpl cQuery;
// If this CQ is not yet registered in Server, register CQ.
if (!isCqExists(serverCqName)) {
@@ -292,7 +293,6 @@ public final class CqServiceImpl implements CqService {
logger.info(LocalizedMessage.create(
LocalizedStrings.CqService_EXCEPTION_WHILE_REGISTERING_CQ_ON_SERVER_CQNAME___0,
cQuery.getName()));
- cQuery = null;
throw cqe;
}
@@ -308,6 +308,7 @@ public final class CqServiceImpl implements CqService {
return cQuery;
}
+ @Override
public void resumeCQ(int cqState, ServerCQ cQuery) {
// Initialize the state of CQ.
if (((CqStateImpl) cQuery.getState()).getState() != cqState) {
@@ -324,25 +325,10 @@ public final class CqServiceImpl implements CqService {
}
}
- /*
- * public void addToCqEventKeysMap(CqQuery cq){ if (cqToCqEventKeysMap != null) { synchronized
- * (cqToCqEventKeysMap){ String serverCqName = ((CqQueryImpl)cq).getServerCqName(); if
- * (!cqToCqEventKeysMap.containsKey(serverCqName)){ cqToCqEventKeysMap.put(serverCqName, new
- * HashSet()); if (_logger.isDebugEnabled()) {
- * _logger.debug("CQ Event key maintenance for CQ, CqName: " + serverCqName + " is Enabled." +
- * " key maintenance map size is: " + cqToCqEventKeysMap.size()); } } } // synchronized } }
- */
-
- public boolean hasCq() {
- HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
- return (cqMap.size() > 0);
- }
-
-
/**
* Adds the given CQ and cqQuery object into the CQ map.
*/
- public void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException {
+ void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException {
// On server side cqName will be server side cqName.
String sCqName = cq.getServerCqName();
if (logger.isDebugEnabled()) {
@@ -355,7 +341,7 @@ public final class CqServiceImpl implements CqService {
.toLocalizedString(sCqName));
}
synchronized (cqQueryMapLock) {
- HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<String, CqQueryImpl>(cqQueryMap);
+ HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap);
try {
tmpCqQueryMap.put(sCqName, cq);
} catch (Exception ex) {
@@ -377,66 +363,34 @@ public final class CqServiceImpl implements CqService {
/**
* Removes given CQ from the cqMap..
*/
- public void removeCq(String cqName) {
+ void removeCq(String cqName) {
// On server side cqName will be server side cqName.
synchronized (cqQueryMapLock) {
- HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<String, CqQueryImpl>(cqQueryMap);
+ HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap);
tmpCqQueryMap.remove(cqName);
this.cqNameToUserAttributesMap.remove(cqName);
cqQueryMap = tmpCqQueryMap;
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.cache.query.internal.InternalCqService#getClientCqFromServer(org.apache.geode.
- * internal.cache.tier.sockets.ClientProxyMembershipID, java.lang.String)
- */
@Override
public CqQuery getClientCqFromServer(ClientProxyMembershipID clientProxyId, String clientCqName) {
// On server side cqName will be server side cqName.
HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
- return (CqQuery) cqMap.get(this.constructServerCqName(clientCqName, clientProxyId));
+ return cqMap.get(this.constructServerCqName(clientCqName, clientProxyId));
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#getCq(java.lang.String)
- */
@Override
public InternalCqQuery getCq(String cqName) {
// On server side cqName will be server side cqName.
- return (InternalCqQuery) cqQueryMap.get(cqName);
+ return cqQueryMap.get(cqName);
}
- /**
- * Clears the CQ Query Map.
- */
- public void clearCqQueryMap() {
- // On server side cqName will be server side cqName.
- synchronized (cqQueryMapLock) {
- cqQueryMap = new HashMap<String, CqQueryImpl>();
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#getAllCqs()
- */
@Override
public Collection<? extends InternalCqQuery> getAllCqs() {
return cqQueryMap.values();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#getAllCqs(java.lang.String)
- */
@Override
public Collection<? extends InternalCqQuery> getAllCqs(final String regionName)
throws CqException {
@@ -445,7 +399,7 @@ public final class CqServiceImpl implements CqService {
LocalizedStrings.CqService_NULL_ARGUMENT_0.toLocalizedString("regionName"));
}
- String[] cqNames = null;
+ String[] cqNames;
synchronized (this.baseRegionToCqNameMap) {
ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
@@ -456,7 +410,7 @@ public final class CqServiceImpl implements CqService {
cqs.toArray(cqNames);
}
- ArrayList<InternalCqQuery> cQueryList = new ArrayList<InternalCqQuery>();
+ ArrayList<InternalCqQuery> cQueryList = new ArrayList<>();
for (int cqCnt = 0; cqCnt < cqNames.length; cqCnt++) {
InternalCqQuery cq = getCq(cqNames[cqCnt]);
if (cq != null) {
@@ -467,34 +421,16 @@ public final class CqServiceImpl implements CqService {
return cQueryList;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#executeAllClientCqs()
- */
@Override
public synchronized void executeAllClientCqs() throws CqException {
executeCqs(this.getAllCqs());
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.cache.query.internal.InternalCqService#executeAllRegionCqs(java.lang.String)
- */
@Override
public synchronized void executeAllRegionCqs(final String regionName) throws CqException {
executeCqs(getAllCqs(regionName));
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.cache.query.internal.InternalCqService#executeCqs(org.apache.geode.cache.query
- * .CqQuery[])
- */
@Override
public synchronized void executeCqs(Collection<? extends InternalCqQuery> cqs)
throws CqException {
@@ -503,53 +439,31 @@ public final class CqServiceImpl implements CqService {
}
String cqName = null;
for (InternalCqQuery internalCq : cqs) {
- CqQuery cq = (CqQuery) internalCq;
+ CqQuery cq = internalCq;
if (!cq.isClosed() && cq.isStopped()) {
try {
cqName = cq.getName();
cq.execute();
- } catch (QueryException qe) {
- if (logger.isDebugEnabled()) {
- logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName,
- qe.getMessage());
- }
- } catch (CqClosedException cce) {
+ } catch (QueryException|CqClosedException e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName,
- cce.getMessage());
+ e.getMessage());
}
}
}
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#stopAllClientCqs()
- */
@Override
public synchronized void stopAllClientCqs() throws CqException {
stopCqs(this.getAllCqs());
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#stopAllRegionCqs(java.lang.String)
- */
@Override
public synchronized void stopAllRegionCqs(final String regionName) throws CqException {
stopCqs(this.getAllCqs(regionName));
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.cache.query.internal.InternalCqService#stopCqs(org.apache.geode.cache.query.
- * CqQuery[])
- */
@Override
public synchronized void stopCqs(Collection<? extends InternalCqQuery> cqs) throws CqException {
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -567,29 +481,20 @@ public final class CqServiceImpl implements CqService {
String cqName = null;
for (InternalCqQuery internalCqQuery : cqs) {
- CqQuery cq = (CqQuery) internalCqQuery;
+ CqQuery cq = internalCqQuery;
if (!cq.isClosed() && cq.isRunning()) {
try {
cqName = cq.getName();
cq.stop();
- } catch (QueryException qe) {
- if (isDebugEnabled) {
- logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, qe.getMessage());
- }
- } catch (CqClosedException cce) {
+ } catch (QueryException|CqClosedException e) {
if (isDebugEnabled) {
- logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, cce.getMessage());
+ logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, e.getMessage());
}
}
}
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#closeCqs(java.lang.String)
- */
@Override
public void closeCqs(final String regionName) throws CqException {
Collection<? extends InternalCqQuery> cqs = this.getAllCqs(regionName);
@@ -603,8 +508,8 @@ public final class CqServiceImpl implements CqService {
// invoked on the server
cq.close(false);
} else {
- // @todo grid: if regionName has a pool check its keepAlive
- boolean keepAlive = ((GemFireCacheImpl) this.cache).keepDurableSubscriptionsAlive();
+ // TODO: grid: if regionName has a pool check its keepAlive
+ boolean keepAlive = this.cache.keepDurableSubscriptionsAlive();
if (cq.isDurable() && keepAlive) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.CqService_NOT_SENDING_CQ_CLOSE_TO_THE_SERVER_AS_IT_IS_A_DURABLE_CQ));
@@ -614,14 +519,9 @@ public final class CqServiceImpl implements CqService {
}
}
- } catch (QueryException qe) {
+ } catch (QueryException|CqClosedException e) {
if (logger.isDebugEnabled()) {
- logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, qe.getMessage());
- }
- } catch (CqClosedException cce) {
- if (logger.isDebugEnabled()) {
- logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName,
- cce.getMessage());
+ logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, e.getMessage());
}
}
}
@@ -630,10 +530,6 @@ public final class CqServiceImpl implements CqService {
/**
* Called directly on server side.
- *
- * @param cqName
- * @param clientId
- * @throws CqException
*/
@Override
public void stopCq(String cqName, ClientProxyMembershipID clientId) throws CqException {
@@ -650,8 +546,6 @@ public final class CqServiceImpl implements CqService {
try {
HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
if (!cqMap.containsKey(serverCqName)) {
- // throw new
- // CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_STOP_THE_SPECIFIED_CQ_0.toLocalizedString(serverCqName));
/*
* gregp 052808: We should silently fail here instead of throwing error. This is to deal
* with races in recovery
@@ -689,15 +583,8 @@ public final class CqServiceImpl implements CqService {
}
// Send stop message to peers.
cQuery.getCqBaseRegion().getFilterProfile().stopCq(cQuery);
-
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#closeCq(java.lang.String,
- * org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID)
- */
@Override
public void closeCq(String cqName, ClientProxyMembershipID clientProxyId) throws CqException {
String serverCqName = cqName;
@@ -713,9 +600,6 @@ public final class CqServiceImpl implements CqService {
try {
HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
if (!cqMap.containsKey(serverCqName)) {
- // throw new
- // CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_CLOSE_THE_SPECIFIED_CQ_0
- // .toLocalizedString(serverCqName));
/*
* gregp 052808: We should silently fail here instead of throwing error. This is to deal
* with races in recovery
@@ -791,12 +675,6 @@ public final class CqServiceImpl implements CqService {
}
}
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#closeAllCqs(boolean)
- */
@Override
public void closeAllCqs(boolean clientInitiated) {
closeAllCqs(clientInitiated, getAllCqs());
@@ -807,21 +685,13 @@ public final class CqServiceImpl implements CqService {
* CqQuerys created by other VMs are unaffected.
*/
private void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs) {
- closeAllCqs(clientInitiated, cqs,
- ((GemFireCacheImpl) this.cache).keepDurableSubscriptionsAlive());
+ closeAllCqs(clientInitiated, cqs, this.cache.keepDurableSubscriptionsAlive());
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#closeAllCqs(boolean,
- * org.apache.geode.cache.query.CqQuery[], boolean)
- */
@Override
public void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs,
boolean keepAlive) {
- // CqQuery[] cqs = getAllCqs();
if (cqs != null) {
String cqName = null;
if (logger.isDebugEnabled()) {
@@ -830,7 +700,6 @@ public final class CqServiceImpl implements CqService {
for (InternalCqQuery cQuery : cqs) {
try {
cqName = cQuery.getName();
- // boolean keepAlive = ((GemFireCache)this.cache).keepDurableSubscriptionsAlive();
if (isServer()) {
cQuery.close(false);
@@ -847,47 +716,26 @@ public final class CqServiceImpl implements CqService {
}
}
}
- } catch (QueryException cqe) {
+ } catch (QueryException|CqClosedException e) {
if (!isRunning()) {
// Not cache shutdown
logger
.warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1,
- new Object[] {cqName, cqe.getMessage()}));
+ new Object[] {cqName, e.getMessage()}));
}
if (logger.isDebugEnabled()) {
- logger.debug(cqe.getMessage(), cqe);
- }
- } catch (CqClosedException cqe) {
- if (!isRunning()) {
- // Not cache shutdown
- logger
- .warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1,
- new Object[] {cqName, cqe.getMessage()}));
- }
- if (logger.isDebugEnabled()) {
- logger.debug(cqe.getMessage(), cqe);
+ logger.debug(e.getMessage(), e);
}
}
}
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#getCqStatistics()
- */
@Override
public CqServiceStatistics getCqStatistics() {
return cqServiceStats;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#closeClientCqs(org.apache.geode.
- * internal.cache.tier.sockets.ClientProxyMembershipID)
- */
@Override
public void closeClientCqs(ClientProxyMembershipID clientProxyId) throws CqException {
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -899,30 +747,19 @@ public final class CqServiceImpl implements CqService {
CqQueryImpl cQuery = (CqQueryImpl) cq;
try {
cQuery.close(false);
- } catch (QueryException qe) {
+ } catch (QueryException|CqClosedException e) {
if (isDebugEnabled) {
logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
- qe.getMessage());
- }
- } catch (CqClosedException cce) {
- if (isDebugEnabled) {
- logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
- cce.getMessage());
+ e.getMessage());
}
}
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.cache.query.internal.InternalCqService#getAllClientCqs(org.apache.geode.
- * internal.cache.tier.sockets.ClientProxyMembershipID)
- */
@Override
public List<ServerCQ> getAllClientCqs(ClientProxyMembershipID clientProxyId) {
Collection<? extends InternalCqQuery> cqs = getAllCqs();
- ArrayList<ServerCQ> clientCqs = new ArrayList<ServerCQ>();
+ ArrayList<ServerCQ> clientCqs = new ArrayList<>();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cQuery = (ServerCQImpl) cq;
@@ -934,23 +771,16 @@ public final class CqServiceImpl implements CqService {
return clientCqs;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.cache.query.internal.InternalCqService#getAllDurableClientCqs(org.apache.geode
- * .internal.cache.tier.sockets.ClientProxyMembershipID)
- */
@Override
public List<String> getAllDurableClientCqs(ClientProxyMembershipID clientProxyId)
throws CqException {
if (clientProxyId == null) {
throw new CqException(
LocalizedStrings.CqService_UNABLE_TO_RETRIEVE_DURABLE_CQS_FOR_CLIENT_PROXY_ID
- .toLocalizedString(clientProxyId));
+ .toLocalizedString(null));
}
List<ServerCQ> cqs = getAllClientCqs(clientProxyId);
- ArrayList<String> durableClientCqs = new ArrayList<String>();
+ ArrayList<String> durableClientCqs = new ArrayList<>();
for (ServerCQ cq : cqs) {
ServerCQImpl cQuery = (ServerCQImpl) cq;
@@ -966,9 +796,6 @@ public final class CqServiceImpl implements CqService {
/**
* Server side method. Closes non-durable CQs for the given client proxy id.
- *
- * @param clientProxyId
- * @throws CqException
*/
@Override
public void closeNonDurableClientCqs(ClientProxyMembershipID clientProxyId) throws CqException {
@@ -983,15 +810,10 @@ public final class CqServiceImpl implements CqService {
if (!cQuery.isDurable()) {
cQuery.close(false);
}
- } catch (QueryException qe) {
- if (isDebugEnabled) {
- logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
- qe.getMessage());
- }
- } catch (CqClosedException cce) {
+ } catch (QueryException|CqClosedException e) {
if (isDebugEnabled) {
logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
- cce.getMessage());
+ e.getMessage());
}
}
}
@@ -1028,6 +850,7 @@ public final class CqServiceImpl implements CqService {
return this.isRunning;
}
+ @Override
public void start() {
this.isRunning = true;
}
@@ -1035,9 +858,10 @@ public final class CqServiceImpl implements CqService {
/**
* @return Returns the serverCqName.
*/
+ @Override
public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) {
ConcurrentHashMap<ClientProxyMembershipID, String> cache = serverCqNameCache
- .computeIfAbsent(cqName, key -> new ConcurrentHashMap<ClientProxyMembershipID, String>());
+ .computeIfAbsent(cqName, key -> new ConcurrentHashMap<>());
String cName = cache.get(clientProxyId);
if (null == cName) {
@@ -1065,7 +889,7 @@ public final class CqServiceImpl implements CqService {
}
}
- /*
+ /**
* Checks if CQ with the given name already exists.
*
* @param cqName name of the CQ.
@@ -1073,17 +897,15 @@ public final class CqServiceImpl implements CqService {
* @return true if exists else false.
*/
private synchronized boolean isCqExists(String cqName) {
- boolean status = false;
HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
- status = cqMap.containsKey(cqName);
- return status;
+ return cqMap.containsKey(cqName);
}
- /*
+ /**
* Generates a name for CQ. Checks if CQ with that name already exists if so generates a new
* cqName.
*/
- public synchronized String generateCqName() {
+ private synchronized String generateCqName() {
while (true) {
String cqName = CQ_NAME_PREFIX + (cqId++);
if (!isCqExists(cqName)) {
@@ -1092,18 +914,9 @@ public final class CqServiceImpl implements CqService {
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.cache.query.internal.InternalCqService#dispatchCqListeners(java.util.HashMap,
- * int, java.lang.Object, java.lang.Object, byte[],
- * org.apache.geode.cache.client.internal.QueueManager, org.apache.geode.internal.cache.EventID)
- */
@Override
public void dispatchCqListeners(HashMap<String, Integer> cqs, int messageType, Object key,
Object value, byte[] delta, QueueManager qManager, EventID eventId) {
- ClientCQImpl cQuery = null;
Object[] fullValue = new Object[1];
Iterator<Map.Entry<String, Integer>> iter = cqs.entrySet().iterator();
String cqName = null;
@@ -1112,7 +925,7 @@ public final class CqServiceImpl implements CqService {
try {
Map.Entry<String, Integer> entry = iter.next();
cqName = entry.getKey();
- cQuery = (ClientCQImpl) this.getCq(cqName);
+ ClientCQImpl cQuery = (ClientCQImpl) this.getCq(cqName);
if (cQuery == null || (!cQuery.isRunning() && cQuery.getQueuedEvents() == null)) {
if (isDebugEnabled) {
@@ -1122,7 +935,7 @@ public final class CqServiceImpl implements CqService {
continue;
}
- Integer cqOp = (Integer) entry.getValue();
+ Integer cqOp = entry.getValue();
// If Region destroy event, close the cq.
if (cqOp.intValue() == MessageType.DESTROY_REGION) {
@@ -1136,8 +949,7 @@ public final class CqServiceImpl implements CqService {
}
// Construct CqEvent.
- CqEventImpl cqEvent = null;
- cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp.intValue()),
+ CqEventImpl cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp),
key, value, delta, qManager, eventId);
// Update statistics
@@ -1181,11 +993,11 @@ public final class CqServiceImpl implements CqService {
} // iteration.
}
- public void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) {
+ void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) {
invokeListeners(cqName, cQuery, cqEvent, null);
}
- public void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent,
+ private void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent,
Object[] fullValue) {
if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
return;
@@ -1217,8 +1029,8 @@ public final class CqServiceImpl implements CqService {
}
Part result = (Part) GetEventValueOp
.executeOnPrimary(cqEvent.getQueueManager().getPool(), cqEvent.getEventID(), null);
- Object newVal = null;
- if (result == null || (newVal = result.getObject()) == null) {
+ Object newVal = result.getObject();
+ if (result == null || newVal == null) {
if (!cache.getCancelCriterion().isCancelInProgress()) {
Exception ex =
new Exception("Failed to retrieve full value from server for eventID "
@@ -1231,7 +1043,7 @@ public final class CqServiceImpl implements CqService {
}
}
} else {
- ((GemFireCacheImpl) this.cache).getCachePerfStats().incDeltaFullValuesRequested();
+ this.cache.getCachePerfStats().incDeltaFullValuesRequested();
cqEvent = new CqEventImpl(cQuery, cqEvent.getBaseOperation(),
cqEvent.getQueryOperation(), cqEvent.getKey(), newVal, cqEvent.getDeltaValue(),
cqEvent.getQueueManager(), cqEvent.getEventID());
@@ -1278,7 +1090,7 @@ public final class CqServiceImpl implements CqService {
}
}
- public void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) {
+ private void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) {
if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
return;
}
@@ -1335,12 +1147,8 @@ public final class CqServiceImpl implements CqService {
}
}
-
/**
* Returns the Operation for the given EnumListenerEvent type.
- *
- * @param eventType
- * @return Operation
*/
private Operation getOperation(int eventType) {
Operation op = null;
@@ -1372,15 +1180,6 @@ public final class CqServiceImpl implements CqService {
return op;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.cache.query.internal.InternalCqService#processEvents(org.apache.geode.cache.
- * CacheEvent, org.apache.geode.distributed.internal.DistributionAdvisor.Profile,
- * org.apache.geode.distributed.internal.DistributionAdvisor.Profile[],
- * org.apache.geode.internal.cache.FilterRoutingInfo)
- */
@Override
public void processEvents(CacheEvent event, Profile localProfile, Profile[] profiles,
FilterRoutingInfo frInfo) throws CqException {
@@ -1421,7 +1220,7 @@ public final class CqServiceImpl implements CqService {
continue;
}
Map cqs = pf.getCqMap();
- HashMap<Long, Integer> cqInfo = new HashMap<Long, Integer>();
+ HashMap<Long, Integer> cqInfo = new HashMap<>();
Iterator cqIter = cqs.entrySet().iterator();
while (cqIter.hasNext()) {
Map.Entry cqEntry = (Map.Entry) cqIter.next();
@@ -1454,10 +1253,10 @@ public final class CqServiceImpl implements CqService {
private void processEntryEvent(CacheEvent event, Profile localProfile, Profile[] profiles,
FilterRoutingInfo frInfo) throws CqException {
final boolean isDebugEnabled = logger.isDebugEnabled();
- HashSet<Object> cqUnfilteredEventsSet_newValue = new HashSet<Object>();
- HashSet<Object> cqUnfilteredEventsSet_oldValue = new HashSet<Object>();
- boolean b_cqResults_newValue = false;
- boolean b_cqResults_oldValue = false;
+ HashSet<Object> cqUnfilteredEventsSet_newValue = new HashSet<>();
+ HashSet<Object> cqUnfilteredEventsSet_oldValue = new HashSet<>();
+ boolean b_cqResults_newValue;
+ boolean b_cqResults_oldValue;
boolean queryOldValue;
EntryEvent entryEvent = (EntryEvent) event;
Object eventKey = entryEvent.getKey();
@@ -1472,8 +1271,8 @@ public final class CqServiceImpl implements CqService {
|| event.getOperation().isDestroy() || event.getOperation().isInvalidate()
|| (event.getOperation().isCreate() && isDupEvent));
- HashMap<String, Integer> matchedCqs = new HashMap<String, Integer>();
- long executionStartTime = 0;
+ HashMap<String, Integer> matchedCqs = new HashMap<>();
+ long executionStartTime;
for (int i = -1; i < profiles.length; i++) {
CacheProfile cf;
if (i < 0) {
@@ -1498,7 +1297,6 @@ public final class CqServiceImpl implements CqService {
continue;
}
-
// Get new value. If its not retrieved.
if (cqUnfilteredEventsSet_newValue.isEmpty()
&& (event.getOperation().isCreate() || event.getOperation().isUpdate())) {
@@ -1509,7 +1307,7 @@ public final class CqServiceImpl implements CqService {
}
}
- HashMap<Long, Integer> cqInfo = new HashMap<Long, Integer>();
+ HashMap<Long, Integer> cqInfo = new HashMap<>();
Iterator cqIter = cqs.entrySet().iterator();
while (cqIter.hasNext()) {
@@ -1546,7 +1344,6 @@ public final class CqServiceImpl implements CqService {
}
} else {
boolean error = false;
- // synchronized (cQuery)
{
try {
synchronized (cQuery) {
@@ -1644,7 +1441,7 @@ public final class CqServiceImpl implements CqService {
cQuery.markAsDestroyedInCqResultKeys(eventKey);
}
}
- } // end synchronized(cQuery)
+ }
// Get the matching CQs if any.
// synchronized (this.matchingCqMap){
@@ -1663,7 +1460,6 @@ public final class CqServiceImpl implements CqService {
}
}
}
- // }
}
if (cqEvent != null && cQuery.isRunning()) {
@@ -1694,153 +1490,35 @@ public final class CqServiceImpl implements CqService {
} // iteration over Profiles.
}
-
- /*
- * public void processEvents (EnumListenerEvent operation, CacheEvent event, ClientUpdateMessage
- * clientMessage, CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds) throws CqException
- * {
- *
- * //Is this a region event or an entry event if (event instanceof RegionEvent){
- * processRegionEvent(operation, event, clientMessage, clientIds); } else { processEntryEvent
- * (operation, event, clientMessage, clientIds); }
- *
- * }
- *
- * private void processRegionEvent(EnumListenerEvent operation, CacheEvent event,
- * ClientUpdateMessage clientMessage, CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds)
- * throws CqException {
- *
- * if (logger.isDebugEnabled()) { logger.debug("Processing region event for region " +
- * ((LocalRegion)(event.getRegion())).getName()); } HashMap filteredCqs = new HashMap(); Integer
- * cqRegionEvent = generateCqRegionEvent(operation); Iterator it =
- * clientIds.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next();
- * ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey(); CM cqsToBooleans =
- * (CM)me.getValue(); if (cqsToBooleans == null) { continue; } Set<CqQuery> cqs =
- * cqsToBooleans.keySet(); if (cqs.isEmpty()) { continue; } filteredCqs.clear(); Iterator cqIt =
- * cqs.iterator(); while (cqIt.hasNext()) { CqQueryImpl cQuery = (CqQueryImpl)cqIt.next(); if
- * (operation == EnumListenerEvent.AFTER_REGION_DESTROY) { try { if (logger.isDebugEnabled()){
- * logger.debug("Closing CQ on region destroy event. CqName :" + cQuery.getName()); }
- * cQuery.close(false); } catch (Exception ex) {
- * logger.debug("Failed to Close CQ on region destroy. CqName :" + cQuery.getName(), ex); }
- *
- * } filteredCqs.put(cQuery.cqName, cqRegionEvent);
- * cQuery.getVsdStats().updateStats(cqRegionEvent);
- *
- * } if (!filteredCqs.isEmpty()){ ((ClientUpdateMessageImpl)clientMessage).addClientCqs( clientId,
- * filteredCqs); }
- *
- * }
- *
- * }
- *
- * private void processEntryEvent(EnumListenerEvent operation, CacheEvent event,
- * ClientUpdateMessage clientMessage, CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds)
- * throws CqException { HashSet cqUnfilteredEventsSet_newValue = new HashSet(); HashSet
- * cqUnfilteredEventsSet_oldValue = new HashSet(); boolean b_cqResults_newValue = false; boolean
- * b_cqResults_oldValue = false; EntryEvent entryEvent = (EntryEvent)event; Object eventKey =
- * entryEvent.getKey(); if (operation == EnumListenerEvent.AFTER_CREATE || operation ==
- * EnumListenerEvent.AFTER_UPDATE) { if (entryEvent.getNewValue() != null) { //We have a new value
- * to run the query on cqUnfilteredEventsSet_newValue.clear();
- * cqUnfilteredEventsSet_newValue.add(entryEvent.getNewValue()); } }
- *
- * HashMap matchedCqs = new HashMap(); long executionStartTime = 0; Iterator it =
- * clientIds.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next();
- * ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey(); if
- * (logger.isDebugEnabled()) { logger.debug("Processing event for CQ filter, ClientId : " +
- * clientId); } CM cqsToBooleans = (CM)me.getValue(); if (cqsToBooleans == null) { continue; }
- * Set<CqQuery> cqs = cqsToBooleans.keySet(); if (cqs.isEmpty()) { continue; } HashMap filteredCqs
- * = new HashMap(); Iterator cqIt = cqs.iterator(); while (cqIt.hasNext()) { CqQueryImpl cQuery =
- * (CqQueryImpl)cqIt.next(); b_cqResults_newValue = false; b_cqResults_oldValue = false; if
- * (cQuery == null || !(cQuery.isRunning())){ continue; } String cqName =
- * cQuery.getServerCqName(); Integer cqEvent = null; if (matchedCqs.containsKey(cqName)) { if
- * (logger.isDebugEnabled()){ logger.
- * debug("Similar cq/query is already processed, getting the cq event-type from the matched cq.");
- * } cqEvent = (Integer)matchedCqs.get(cqName); } else { boolean error = false; boolean
- * hasSeenEvent = false; HashSet cqEventKeys = null; synchronized (cQuery) { try { // Apply query
- * on new value. if (!cqUnfilteredEventsSet_newValue.isEmpty()) { executionStartTime =
- * this.stats.startCqQueryExecution(); b_cqResults_newValue = evaluateQuery(cQuery, new Object[]
- * {cqUnfilteredEventsSet_newValue}); this.stats.endCqQueryExecution(executionStartTime); } //
- * Check if old value is cached, if not apply query on old value. if (cqToCqEventKeysMap != null)
- * { synchronized (cqToCqEventKeysMap) { if ((cqEventKeys =
- * (HashSet)cqToCqEventKeysMap.get(cqName)) != null) { hasSeenEvent =
- * cqEventKeys.contains(eventKey); } } } if (!hasSeenEvent) { // get the oldValue. // In case of
- * Update, destroy and invalidate. if (operation == EnumListenerEvent.AFTER_UPDATE || operation ==
- * EnumListenerEvent.AFTER_DESTROY || operation == EnumListenerEvent.AFTER_INVALIDATE) { if
- * (entryEvent.getOldValue() != null) { cqUnfilteredEventsSet_oldValue.clear();
- * cqUnfilteredEventsSet_oldValue.add(entryEvent.getOldValue()); // Apply query on old value.
- * executionStartTime = this.stats.startCqQueryExecution(); b_cqResults_oldValue =
- * evaluateQuery(cQuery, new Object[] {cqUnfilteredEventsSet_oldValue});
- * this.stats.endCqQueryExecution(executionStartTime); } } } } catch (Exception ex) { //Any
- * exception in running the query // should be caught here and buried //because this code is
- * running inline with the //message processing code and we don't want to //kill that thread error
- * = true; logger.info( LocalizedStrings.
- * CqService_ERROR_WHILE_PROCESSING_CQ_ON_THE_EVENT_KEY_0_CQNAME_1_CLIENTID_2_ERROR_3, new
- * Object[] { ((EntryEvent)event).getKey(), cQuery.getName(), clientId,
- * ex.getLocalizedMessage()}); }
- *
- * if (error) { cqEvent = Integer.valueOf(MessageType.EXCEPTION); } else { if
- * (b_cqResults_newValue) { if (hasSeenEvent || b_cqResults_oldValue) { cqEvent =
- * Integer.valueOf(MessageType.LOCAL_UPDATE); } else { cqEvent =
- * Integer.valueOf(MessageType.LOCAL_CREATE); } // If its create and caching is enabled, cache the
- * key for this CQ. if (!hasSeenEvent && cqEventKeys != null) { cqEventKeys.add(eventKey); } }
- * else if (hasSeenEvent || (b_cqResults_oldValue)) { // Base invalidate operation is treated as
- * destroy. // When the invalidate comes through, the entry will no longer satisfy // the query
- * and will need to be deleted. cqEvent = Integer.valueOf(MessageType.LOCAL_DESTROY); // If
- * caching is enabled, remove this event's key from the cache. if (hasSeenEvent && cqEventKeys !=
- * null) { cqEventKeys.remove(eventKey); } } }
- *
- * } //end synchronized(cQuery)
- *
- * // Get the matching CQs if any. synchronized (this.matchingCqMap){ String query =
- * cQuery.getQueryString(); ArrayList matchingCqs = (ArrayList)matchingCqMap.get(query); if
- * (matchingCqs != null) { Iterator iter = matchingCqs.iterator(); while (iter.hasNext()) { String
- * matchingCqName = (String)iter.next(); if (!matchingCqName.equals(cqName)){
- * matchedCqs.put(matchingCqName, cqEvent); } } } }
- *
- * }
- *
- * if (cqEvent != null){ if (logger.isDebugEnabled()) {
- * logger.debug("Event is added for the CQ, CqName (clientside): " + cQuery.cqName +
- * " With CQ Op : " + cqEvent + " for Client : " + clientId); } filteredCqs.put(cQuery.cqName,
- * cqEvent); cQuery.getVsdStats().updateStats(cqEvent); }
- *
- * } // iteration over cqsToBooleans.keySet() if (!filteredCqs.isEmpty()){
- * logger.debug("Adding event map for client : "+clientId +
- * " with event map size : "+filteredCqs.size());
- * ((ClientUpdateMessageImpl)clientMessage).addClientCqs(clientId, filteredCqs); } } // iteration
- * over clientIds.entrySet() }
- */
-
private Integer generateCqRegionEvent(CacheEvent event) {
Integer cqEvent = null;
if (event.getOperation().isRegionDestroy()) {
- cqEvent = Integer.valueOf(MessageType.DESTROY_REGION);
+ cqEvent = MessageType.DESTROY_REGION;
} else if (event.getOperation().isRegionInvalidate()) {
- cqEvent = Integer.valueOf(MessageType.INVALIDATE_REGION);
+ cqEvent = MessageType.INVALIDATE_REGION;
} else if (event.getOperation().isClear()) {
- cqEvent = Integer.valueOf(MessageType.CLEAR_REGION);
+ cqEvent = MessageType.CLEAR_REGION;
}
return cqEvent;
}
-
/**
* Manages the CQs created for the base region. This is managed here, instead of on the base
* region; since the cq could be created on the base region, before base region is created (using
* newCq()).
*/
- public void addToBaseRegionToCqNameMap(String regionName, String cqName) {
+ private void addToBaseRegionToCqNameMap(String regionName, String cqName) {
synchronized (this.baseRegionToCqNameMap) {
ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
if (cqs == null) {
- cqs = new ArrayList<String>();
+ cqs = new ArrayList<>();
}
cqs.add(cqName);
this.baseRegionToCqNameMap.put(regionName, cqs);
}
}
- public void removeFromBaseRegionToCqNameMap(String regionName, String cqName) {
+ void removeFromBaseRegionToCqNameMap(String regionName, String cqName) {
synchronized (this.baseRegionToCqNameMap) {
ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
if (cqs != null) {
@@ -1864,37 +1542,12 @@ public final class CqServiceImpl implements CqService {
}
/**
- * Removes this CQ from CQ event Cache map. This disables the caching events for this CQ.
- *
- * @param cqName
- */
- /*
- * synchronized public void removeCQFromCaching(String cqName){ if (cqToCqEventKeysMap != null) {
- * // Take a lock on CqQuery object. In processEvents the maps are // handled under CqQuery
- * object. if (cqToCqEventKeysMap != null){ synchronized (cqToCqEventKeysMap) {
- * cqToCqEventKeysMap.remove(cqName); } } } }
- */
-
- /**
- * Returns the CQ event cache map.
- *
- * @return HashMap cqToCqEventKeysMap
- *
- * Caller must synchronize on the returned value in order to inspect.
- */
- /*
- * public HashMap getCqToCqEventKeysMap(){ return cqToCqEventKeysMap; }
- */
-
- /**
* Adds the query from the given CQ to the matched CQ map.
- *
- * @param cq
*/
- public void addToMatchingCqMap(CqQueryImpl cq) {
+ void addToMatchingCqMap(CqQueryImpl cq) {
synchronized (this.matchingCqMap) {
String cqQuery = cq.getQueryString();
- Set<String> matchingCQs = null;
+ Set<String> matchingCQs;
if (!matchingCqMap.containsKey(cqQuery)) {
matchingCQs = Collections.newSetFromMap(new ConcurrentHashMap());
matchingCqMap.put(cqQuery, matchingCQs);
@@ -1912,10 +1565,8 @@ public final class CqServiceImpl implements CqService {
/**
* Removes the query from the given CQ from the matched CQ map.
- *
- * @param cq
*/
- public void removeFromMatchingCqMap(CqQueryImpl cq) {
+ private void removeFromMatchingCqMap(CqQueryImpl cq) {
synchronized (this.matchingCqMap) {
String cqQuery = cq.getQueryString();
if (matchingCqMap.containsKey(cqQuery)) {
@@ -1947,10 +1598,6 @@ public final class CqServiceImpl implements CqService {
* Applies the query on the event. This method takes care of the performance related changed done
* to improve the CQ-query performance. When CQ-query is executed first time, it saves the query
* related information in the execution context and uses that info in later executions.
- *
- * @param cQuery
- * @param event
- * @return boolean
*/
private boolean evaluateQuery(CqQueryImpl cQuery, Object[] event) throws Exception {
ExecutionContext execContext = cQuery.getQueryExecutionContext();
@@ -1983,19 +1630,6 @@ public final class CqServiceImpl implements CqService {
return this.cqNameToUserAttributesMap.get(cqName);
}
- // public static void memberLeft(String poolName) {
- // if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) {
- // cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName);
- // }
- // }
- //
- // public static void memberCrashed(String poolName) {
- // if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) {
- // cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName);
- // }
- // }
- //
-
@Override
public void cqsDisconnected(Pool pool) {
invokeCqsConnected(pool, false);
@@ -2014,7 +1648,7 @@ public final class CqServiceImpl implements CqService {
// Check to see if we are already connected/disconnected.
// If state has not changed, do not invoke another connected/disconnected
synchronized (cqPoolsConnected) {
- // don't repeatily send same connect/disconnect message to cq's on repeated fails of
+ // don't repeatedly send same connect/disconnect message to cq's on repeated fails of
// RedundancySatisfier
if (cqPoolsConnected.containsKey(poolName) && connected == cqPoolsConnected.get(poolName)) {
return;
@@ -2059,13 +1693,6 @@ public final class CqServiceImpl implements CqService {
SystemFailure.checkFailure();
logger.warn(LocalizedMessage
.create(LocalizedStrings.CqService_ERROR_SENDING_CQ_CONNECTION_STATUS, cqName), t);
-
- if (t instanceof VirtualMachineError) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.CqService_VIRTUALMACHINEERROR_PROCESSING_CQLISTENER_FOR_CQ_0,
- cqName), t);
- return;
- }
}
}
}
@@ -2075,7 +1702,4 @@ public final class CqServiceImpl implements CqService {
public List<String> getAllDurableCqsFromServer(InternalPool pool) {
return new ServerCQProxyImpl(pool).getAllDurableCqsFromServer();
}
-
-
}
-
http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
index ba71143..a675162 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
@@ -14,11 +14,9 @@
*/
package org.apache.geode.cache.query.internal.cq;
-import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.query.CqServiceStatistics;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.internal.DefaultQueryService;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
/**
* Provides statistical information about CqService.
@@ -26,24 +24,22 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
* @since GemFire 5.5
*/
public class CqServiceStatisticsImpl implements CqServiceStatistics {
+
private CqServiceImpl cqService;
- // private long activeCqs;
- // private long stoppedCqs;
- // private long closedCqs;
- // private long createdCqs;
/**
* Constructor for CqStatisticsImpl
*
* @param cqs - CqService
*/
- public CqServiceStatisticsImpl(CqServiceImpl cqs) {
+ CqServiceStatisticsImpl(CqServiceImpl cqs) {
cqService = cqs;
}
/**
* Returns the number of CQs currently executing
*/
+ @Override
public long numCqsActive() {
return this.cqService.getCqServiceVsdStats().getNumCqsActive();
}
@@ -53,6 +49,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
*
* @return long number of cqs created.
*/
+ @Override
public long numCqsCreated() {
return this.cqService.getCqServiceVsdStats().getNumCqsCreated();
}
@@ -60,6 +57,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
/**
* Returns number of Cqs that are closed.
*/
+ @Override
public long numCqsClosed() {
return this.cqService.getCqServiceVsdStats().getNumCqsClosed();
}
@@ -67,6 +65,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
/**
* Returns number of Cqs that are stopped.
*/
+ @Override
public long numCqsStopped() {
return this.cqService.getCqServiceVsdStats().getNumCqsStopped();
}
@@ -74,20 +73,18 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
/**
* Returns number of CQs created from the client.
*/
+ @Override
public long numCqsOnClient() {
return this.cqService.getCqServiceVsdStats().getNumCqsOnClient();
}
/**
* Returns the number of CQs (active + suspended) on the given region.
- *
- * @param regionName
*/
+ @Override
public long numCqsOnRegion(String regionName) {
-
DefaultQueryService queryService =
- (DefaultQueryService) ((GemFireCacheImpl) CacheFactory.getAnyInstance())
- .getLocalQueryService();
+ (DefaultQueryService) cqService.getInternalCache().getLocalQueryService();
try {
CqQuery[] cqs = queryService.getCqs(regionName);