You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/01/28 19:13:51 UTC
[49/70] [partial] incubator-geode git commit: WAN and CQ code drop
under the Pivotal SGA
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java
new file mode 100644
index 0000000..7d4dbb0
--- /dev/null
+++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java
@@ -0,0 +1,2079 @@
+/*=========================================================================
+ * Copyright Copyright (c) 2000-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ * $Id: DefaultQueryService.java,v 1.2 2005/02/01 17:19:20 vaibhav Exp $
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.internal.cq;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InvalidDeltaException;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheEvent;
+import com.gemstone.gemfire.cache.CacheLoaderException;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.RegionEvent;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.internal.GetEventValueOp;
+import com.gemstone.gemfire.cache.client.internal.InternalPool;
+import com.gemstone.gemfire.cache.client.internal.QueueManager;
+import com.gemstone.gemfire.cache.client.internal.ServerCQProxyImpl;
+import com.gemstone.gemfire.cache.client.internal.UserAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqClosedException;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqExistsException;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.CqServiceStatistics;
+import com.gemstone.gemfire.cache.query.CqStatusListener;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.cache.query.QueryInvalidException;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
+import com.gemstone.gemfire.cache.query.internal.CqQueryVsdStats;
+import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
+import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.FilterProfile;
+import com.gemstone.gemfire.internal.cache.FilterRoutingInfo;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.i18n.StringId;
+
+/**
+ * @author Rao Madduri
+ * @since 5.5
+ *
+ * Implements the CqService functionality.
+ *
+ */
+/**
+ * @author agingade
+ *
+ */
+public final class CqServiceImpl implements CqService {
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+ * System property to evaluate the query even though the initial results are not required
+ * when cq is executed using the execute() method.
+ */
+ public static boolean EXECUTE_QUERY_DURING_INIT =
+ Boolean.valueOf(System.getProperty("gemfire.cq.EXECUTE_QUERY_DURING_INIT", "true")).booleanValue();
+
+ private static final String CQ_NAME_PREFIX = "GfCq";
+
+ private final Cache cache;
+
+ /**
+ * Manages cq pools to determine if a status of connect or disconnect needs to be sent out
+ */
+ private final HashMap<String, Boolean> cqPoolsConnected = new HashMap<String, Boolean>();
+
+
+ /** Manages CQ objects. uses serverCqName as key and CqQueryImpl as value
+ * @guarded.By cqQueryMapLock
+ */
+ private volatile HashMap<String, CqQueryImpl> cqQueryMap = new HashMap<String, CqQueryImpl>();
+ private final Object cqQueryMapLock = new Object();
+
+ private volatile boolean isRunning = false;
+
+ /**
+ * Used by client when multiuser-authentication is true.
+ */
+ private final HashMap<String, UserAttributes> cqNameToUserAttributesMap = new HashMap<String, UserAttributes>();
+
+ // private boolean isServer = true;
+
+ /*
+ // Map to manage CQ to satisfied CQ events (keys) for optimizing updates.
+ private final HashMap cqToCqEventKeysMap =
+ CqService.MAINTAIN_KEYS ? new HashMap() : null;
+ */
+
+ // Map to manage the similar CQs (having same query - performance optimization).
+ // With query as key and Set of CQs as values.
+ private final HashMap<String, HashSet<String>> matchingCqMap;
+
+ // CQ Service statistics
+ public final CqServiceStatisticsImpl cqServiceStats;
+ public final CqServiceVsdStats stats;
+
+ // CQ identifier, also used in auto generated CQ names
+ private volatile long cqId = 1;
+
+ /**
+ * Used to synchronize access to CQs in the repository
+ */
+ final Object cqSync = new Object();
+
+ /* This is to manage region to CQs map, client side book keeping. */
+ private HashMap<String, ArrayList<String>> baseRegionToCqNameMap = new HashMap<String, ArrayList<String>>();
+
+
+ /**
+ * Constructor.
+ * @param c The cache used for the service
+ */
+ public CqServiceImpl(final Cache c) {
+ if (c == null) {
+ throw new IllegalStateException(LocalizedStrings.CqService_CACHE_IS_NULL.toLocalizedString());
+ }
+ GemFireCacheImpl gfc = (GemFireCacheImpl) c;
+ gfc.getCancelCriterion().checkCancelInProgress(null);
+
+ this.cache = gfc;
+
+
+ // Initialize the Map which maintains the matching cqs.
+ this.matchingCqMap = new HashMap<String, HashSet<String>>();
+
+ // Initialize the VSD statistics
+ StatisticsFactory factory = cache.getDistributedSystem();
+ this.stats = new CqServiceVsdStats(factory);
+ this.cqServiceStats = new CqServiceStatisticsImpl(this);
+
+// final LoggingThreadGroup group =
+// LoggingThreadGroup.createThreadGroup("CqExecutor Threads", logger);
+
+ //if (this.cache.getCacheServers().isEmpty()) {
+ // isServer = false;
+ //}
+ }
+
+ /**
+ * Returns the cache associated with the cqService.
+ */
+ public Cache getCache() {
+ return this.cache;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#newCq(java.lang.String, java.lang.String, com.gemstone.gemfire.cache.query.CqAttributes, com.gemstone.gemfire.cache.client.internal.ServerCQProxy, boolean)
+ */
+ @Override
+ public synchronized ClientCQ newCq(String cqName, String queryString, CqAttributes cqAttributes, InternalPool pool, boolean isDurable)
+ throws QueryInvalidException, CqExistsException, CqException {
+ if (queryString == null) {
+ throw new IllegalArgumentException(LocalizedStrings.CqService_NULL_ARGUMENT_0.toLocalizedString("queryString"));
+
+ } else if (cqAttributes == null ) {
+ throw new IllegalArgumentException(LocalizedStrings.CqService_NULL_ARGUMENT_0.toLocalizedString("cqAttribute"));
+ }
+
+ if (isServer()) {
+ throw new IllegalStateException(
+ LocalizedStrings.CqService_CLIENT_SIDE_NEWCQ_METHOD_INVOCATION_ON_SERVER.toLocalizedString());
+ }
+
+ // Check if the given cq already exists.
+ if (cqName != null && isCqExists(cqName)) {
+ throw new CqExistsException(
+ LocalizedStrings.
+ CqService_CQ_WITH_THE_GIVEN_NAME_ALREADY_EXISTS_CQNAME_0
+ .toLocalizedString(cqName));
+ }
+
+
+ ServerCQProxyImpl serverProxy= pool == null ? null : new ServerCQProxyImpl(pool);
+ ClientCQImpl cQuery = new ClientCQImpl(this, cqName, queryString, cqAttributes, serverProxy, isDurable);
+ cQuery.updateCqCreateStats();
+
+ //cQuery.initCq();
+
+ // Check if query is valid.
+ cQuery.validateCq();
+
+ // Add cq into meta region.
+ // Check if Name needs to be generated.
+ if (cqName == null) {
+ // in the case of cqname internally generated, the CqExistsException needs
+ // to be taken care internally.
+ while(true) {
+ cQuery.setName(generateCqName());
+ try {
+ addToCqMap(cQuery);
+ } catch (CqExistsException ex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Got CqExistsException while intializing cq : {} Error : {}", cQuery.getName(), ex.getMessage());
+ }
+ continue;
+ }
+ break;
+ }
+ } else {
+ addToCqMap(cQuery);
+ }
+
+ this.addToBaseRegionToCqNameMap(cQuery.getBaseRegionName(), cQuery.getServerCqName());
+
+ return cQuery;
+ }
+
+
+ /**
+ * Executes the given CqQuery, if the CqQuery for that name is not there
+ * it registers the one and executes. This is called on the Server.
+ * @param cqName
+ * @param queryString
+ * @param cqState
+ * @param clientProxyId
+ * @param ccn
+ * @param manageEmptyRegions whether to update the 6.1 emptyRegions map held in the CCN
+ * @param regionDataPolicy the data policy of the region associated with the
+ * query. This is only needed if manageEmptyRegions is true.
+ * @param emptyRegionsMap map of empty regions.
+ * @throws IllegalStateException if this is called at client side.
+ * @throws CqException
+ */
+ @Override
+ public synchronized ServerCQ executeCq(String cqName, String queryString,
+ int cqState, ClientProxyMembershipID clientProxyId,
+ CacheClientNotifier ccn, boolean isDurable, boolean manageEmptyRegions,
+ int regionDataPolicy, Map emptyRegionsMap)
+ throws CqException, RegionNotFoundException, CqClosedException {
+ if (!isServer()) {
+ throw new IllegalStateException(
+ LocalizedStrings.CqService_SERVER_SIDE_EXECUTECQ_METHOD_IS_CALLED_ON_CLIENT_CQNAME_0
+ .toLocalizedString(cqName));
+ }
+
+ String serverCqName = constructServerCqName(cqName, clientProxyId);
+ ServerCQImpl cQuery = null;
+
+ // If this CQ is not yet registered in Server, register CQ.
+ if (!isCqExists(serverCqName)) {
+ cQuery = new ServerCQImpl(this, cqName, queryString, isDurable, constructServerCqName(cqName, clientProxyId));
+
+ try {
+ cQuery.registerCq(clientProxyId, ccn, cqState);
+ if (manageEmptyRegions) { // new in 6.1
+ if (emptyRegionsMap != null && emptyRegionsMap.containsKey(cQuery.getBaseRegionName())){
+ regionDataPolicy = 0;
+ }
+ ccn.updateMapOfEmptyRegions(ccn.getClientProxy(clientProxyId, true).getRegionsWithEmptyDataPolicy(),
+ cQuery.getBaseRegionName(), regionDataPolicy);
+ }
+ } catch (CqException cqe) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.CqService_EXCEPTION_WHILE_REGISTERING_CQ_ON_SERVER_CQNAME___0, cQuery.getName()));
+ cQuery = null;
+ throw cqe;
+ }
+
+ } else {
+ cQuery = (ServerCQImpl)getCq(serverCqName);
+ resumeCQ(cqState, cQuery);
+ }
+
+
+ if (logger.isDebugEnabled() ) {
+ logger.debug("Successfully created CQ on the server. CqName : {}", cQuery.getName());
+ }
+ return cQuery;
+ }
+
+ public void resumeCQ(int cqState, ServerCQ cQuery) {
+ // Initialize the state of CQ.
+ if(((CqStateImpl)cQuery.getState()).getState() != cqState) {
+ cQuery.setCqState(cqState);
+ // addToCqEventKeysMap(cQuery);
+ // Send state change info to peers.
+ cQuery.getCqBaseRegion().getFilterProfile().setCqState(cQuery);
+ }
+ //If we are going to set the state to running, we need to check to see if it matches any other cq
+ if (cqState == CqStateImpl.RUNNING) {
+ // Add to the matchedCqMap.
+ addToMatchingCqMap((CqQueryImpl) cQuery);
+ }
+ }
+
+ /*
+ public void addToCqEventKeysMap(CqQuery cq){
+ if (cqToCqEventKeysMap != null) {
+ synchronized (cqToCqEventKeysMap){
+ String serverCqName = ((CqQueryImpl)cq).getServerCqName();
+ if (!cqToCqEventKeysMap.containsKey(serverCqName)){
+ cqToCqEventKeysMap.put(serverCqName, new HashSet());
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("CQ Event key maintenance for CQ, CqName: " +
+ serverCqName + " is Enabled." + " key maintenance map size is: " +
+ cqToCqEventKeysMap.size());
+ }
+ }
+ } // synchronized
+ }
+ }
+ */
+
+ public boolean hasCq(){
+ HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
+ return (cqMap.size() > 0);
+ }
+
+
+ /**
+ * Adds the given CQ and cqQuery object into the CQ map.
+ */
+ public void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException {
+ // On server side cqName will be server side cqName.
+ String sCqName = cq.getServerCqName();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding to CQ Repository. CqName : {} ServerCqName : {}", cq.getName(), sCqName);
+ }
+ HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
+ if (cqMap.containsKey(sCqName)) {
+ throw new CqExistsException(
+ LocalizedStrings.CqService_A_CQ_WITH_THE_GIVEN_NAME_0_ALREADY_EXISTS.toLocalizedString(sCqName));
+ }
+ synchronized (cqQueryMapLock) {
+ HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<String, CqQueryImpl>(cqQueryMap);
+ try {
+ tmpCqQueryMap.put(sCqName, cq);
+ }catch (Exception ex){
+ StringId errMsg = LocalizedStrings.CqQueryImpl_FAILED_TO_STORE_CONTINUOUS_QUERY_IN_THE_REPOSITORY_CQNAME_0_1;
+ Object[] errMsgArgs = new Object[] {sCqName, ex.getLocalizedMessage()};
+ String s = errMsg.toLocalizedString(errMsgArgs);
+ logger.error(s);
+ throw new CqException(s, ex);
+ }
+ UserAttributes attributes = UserAttributes.userAttributes.get();
+ if (attributes != null) {
+ this.cqNameToUserAttributesMap.put(cq.getName(), attributes);
+ }
+ cqQueryMap = tmpCqQueryMap;
+ }
+ }
+
+ /**
+ * Removes given CQ from the cqMap..
+ */
+ public void removeCq(String cqName) {
+ // On server side cqName will be server side cqName.
+ synchronized (cqQueryMapLock) {
+ HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<String, CqQueryImpl>(cqQueryMap);
+ tmpCqQueryMap.remove(cqName);
+ this.cqNameToUserAttributesMap.remove(cqName);
+ cqQueryMap = tmpCqQueryMap;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#getClientCqFromServer(com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID, java.lang.String)
+ */
+ @Override
+ public CqQuery getClientCqFromServer(ClientProxyMembershipID clientProxyId, String clientCqName) {
+ // On server side cqName will be server side cqName.
+ HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
+ return (CqQuery)cqMap.get(this.constructServerCqName(clientCqName, clientProxyId));
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#getCq(java.lang.String)
+ */
+ @Override
+ public InternalCqQuery getCq(String cqName) {
+ // On server side cqName will be server side cqName.
+ return (InternalCqQuery)cqQueryMap.get(cqName);
+ }
+
+ /**
+ * Clears the CQ Query Map.
+ */
+ public void clearCqQueryMap() {
+ // On server side cqName will be server side cqName.
+ synchronized (cqQueryMapLock) {
+ cqQueryMap = new HashMap<String, CqQueryImpl>();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#getAllCqs()
+ */
+ @Override
+ public Collection<? extends InternalCqQuery> getAllCqs(){
+ return cqQueryMap.values();
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#getAllCqs(java.lang.String)
+ */
+ @Override
+ public Collection<? extends InternalCqQuery> getAllCqs(final String regionName)throws CqException{
+ if (regionName == null){
+ throw new IllegalArgumentException(LocalizedStrings.CqService_NULL_ARGUMENT_0.toLocalizedString("regionName"));
+ }
+
+ String[] cqNames = null;
+
+ synchronized(this.baseRegionToCqNameMap){
+ ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
+ if(cqs == null) {
+ return null;
+ }
+ cqNames = new String[cqs.size()];
+ cqs.toArray(cqNames);
+ }
+
+ ArrayList<InternalCqQuery> cQueryList = new ArrayList<InternalCqQuery>();
+ for(int cqCnt=0; cqCnt < cqNames.length; cqCnt++){
+ InternalCqQuery cq = getCq(cqNames[cqCnt]);
+ if (cq != null){
+ cQueryList.add(cq);
+ }
+ }
+
+ return cQueryList;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#executeAllClientCqs()
+ */
+ @Override
+ public synchronized void executeAllClientCqs()throws CqException{
+ executeCqs(this.getAllCqs());
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#executeAllRegionCqs(java.lang.String)
+ */
+ @Override
+ public synchronized void executeAllRegionCqs(final String regionName)throws CqException{
+ executeCqs(getAllCqs(regionName));
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#executeCqs(com.gemstone.gemfire.cache.query.CqQuery[])
+ */
+ @Override
+ public synchronized void executeCqs(Collection<? extends InternalCqQuery> cqs)throws CqException{
+ if(cqs == null) {
+ return;
+ }
+ String cqName = null;
+ for (InternalCqQuery internalCq : cqs) {
+ CqQuery cq = (CqQuery) internalCq;
+ if (!cq.isClosed() && cq.isStopped()) {
+ try {
+ cqName = cq.getName();
+ cq.execute();
+ } catch (QueryException qe) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName, qe.getMessage());
+ }
+ } catch (CqClosedException cce){
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName, cce.getMessage());
+ }
+ }
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#stopAllClientCqs()
+ */
+ @Override
+ public synchronized void stopAllClientCqs()throws CqException{
+ stopCqs(this.getAllCqs());
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#stopAllRegionCqs(java.lang.String)
+ */
+ @Override
+ public synchronized void stopAllRegionCqs(final String regionName)throws CqException{
+ stopCqs(this.getAllCqs(regionName));
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#stopCqs(com.gemstone.gemfire.cache.query.CqQuery[])
+ */
+ @Override
+ public synchronized void stopCqs(Collection<? extends InternalCqQuery> cqs)throws CqException{
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ if (cqs == null) {
+ logger.debug("CqService.stopCqs cqs : null");
+ } else {
+ logger.debug("CqService.stopCqs cqs : ({} queries)", cqs.size());
+ }
+ }
+
+ if(cqs == null) {
+ return;
+ }
+
+ String cqName = null;
+ for (InternalCqQuery internalCqQuery : cqs) {
+ CqQuery cq = (CqQuery) internalCqQuery;
+ if (!cq.isClosed() && cq.isRunning()) {
+ try {
+ cqName = cq.getName();
+ cq.stop();
+ } catch (QueryException qe) {
+ if (isDebugEnabled){
+ logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, qe.getMessage());
+ }
+ } catch (CqClosedException cce){
+ if (isDebugEnabled){
+ logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, cce.getMessage());
+ }
+ }
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#closeCqs(java.lang.String)
+ */
+ @Override
+ public void closeCqs(final String regionName)throws CqException{
+ Collection<? extends InternalCqQuery> cqs = this.getAllCqs(regionName);
+ if(cqs != null) {
+ String cqName = null;
+ for (InternalCqQuery cq: cqs) {
+ try {
+ cqName = cq.getName();
+
+ if(isServer()) {
+ // invoked on the server
+ cq.close(false);
+ } else {
+ // @todo grid: if regionName has a pool check its keepAlive
+ boolean keepAlive = ((GemFireCacheImpl)this.cache).keepDurableSubscriptionsAlive();
+ if(cq.isDurable() && keepAlive){
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CqService_NOT_SENDING_CQ_CLOSE_TO_THE_SERVER_AS_IT_IS_A_DURABLE_CQ));
+ cq.close(false);
+ }
+ else {
+ cq.close(true);
+ }
+ }
+
+ } catch (QueryException qe) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, qe.getMessage());
+ }
+ } catch (CqClosedException cce) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, cce.getMessage());
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Called directly on server side.
+ * @param cqName
+ * @param clientId
+ * @throws CqException
+ */
+ @Override
+ public void stopCq(String cqName, ClientProxyMembershipID clientId)
+ throws CqException {
+ String serverCqName = cqName;
+ if (clientId != null) {
+ serverCqName = this.constructServerCqName(cqName, clientId);
+ }
+
+ ServerCQImpl cQuery = null;
+ StringId errMsg = null;
+ Exception ex = null;
+
+ try {
+ HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
+ if (!cqMap.containsKey(serverCqName)) {
+// throw new CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_STOP_THE_SPECIFIED_CQ_0.toLocalizedString(serverCqName));
+ /* gregp 052808: We should silently fail here instead of throwing error. This is to deal with races in recovery */
+ return;
+ }
+ cQuery = (ServerCQImpl)getCq(serverCqName);
+
+ } catch (CacheLoaderException e1) {
+ errMsg = LocalizedStrings.CqService_CQ_NOT_FOUND_IN_THE_CQ_META_REGION_CQNAME_0;
+ ex = e1;
+ } catch (TimeoutException e2) {
+ errMsg = LocalizedStrings.CqService_TIMEOUT_WHILE_TRYING_TO_GET_CQ_FROM_META_REGION_CQNAME_0;
+ ex = e2;
+ } finally {
+ if (ex != null){
+ String s = errMsg.toLocalizedString(cqName);
+ if (logger.isDebugEnabled()) {
+ logger.debug(s);
+ }
+ throw new CqException(s, ex);
+ }
+ }
+
+ try {
+ if(!cQuery.isStopped()) {
+ cQuery.stop();
+ }
+ } catch (CqClosedException cce){
+ throw new CqException(cce.getMessage());
+ } finally {
+ // If this CQ is stopped, disable caching event keys for this CQ.
+ //this.removeCQFromCaching(cQuery.getServerCqName());
+ this.removeFromMatchingCqMap(cQuery);
+ }
+ // Send stop message to peers.
+ cQuery.getCqBaseRegion().getFilterProfile().stopCq(cQuery);
+
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#closeCq(java.lang.String, com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID)
+ */
+ @Override
+ public void closeCq(String cqName, ClientProxyMembershipID clientProxyId)
+ throws CqException {
+ String serverCqName = cqName;
+ if (clientProxyId != null) {
+ serverCqName = this.constructServerCqName(cqName, clientProxyId);
+ }
+
+ ServerCQImpl cQuery = null;
+ StringId errMsg = null;
+ Exception ex = null;
+
+ try {
+ HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
+ if (!cqMap.containsKey(serverCqName)) {
+// throw new CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_CLOSE_THE_SPECIFIED_CQ_0
+// .toLocalizedString(serverCqName));
+ /* gregp 052808: We should silently fail here instead of throwing error. This is to deal with races in recovery */
+ return;
+ }
+ cQuery = (ServerCQImpl)cqMap.get(serverCqName);
+
+ } catch (CacheLoaderException e1) {
+ errMsg = LocalizedStrings.CqService_CQ_NOT_FOUND_IN_THE_CQ_META_REGION_CQNAME_0;
+ ex = e1;
+ } catch (TimeoutException e2) {
+ errMsg = LocalizedStrings.CqService_TIMEOUT_WHILE_TRYING_TO_GET_CQ_FROM_META_REGION_CQNAME_0;
+ ex = e2;
+ } finally {
+ if (ex != null){
+ String s = errMsg.toLocalizedString(cqName);
+ if (logger.isDebugEnabled()) {
+ logger.debug(s);
+ }
+ throw new CqException(s, ex);
+ }
+ }
+
+ try {
+ cQuery.close(false);
+
+ // Repository Region.
+ // If CQ event caching is enabled, remove this CQs event cache reference.
+ // removeCQFromCaching(serverCqName);
+
+ // CqBaseRegion
+ try {
+ LocalRegion baseRegion = cQuery.getCqBaseRegion();
+ if (baseRegion != null && !baseRegion.isDestroyed()) {
+ // Server specific clean up.
+ if (isServer()){
+ FilterProfile fp = baseRegion.getFilterProfile();
+ if (fp != null) {
+ fp.closeCq(cQuery);
+ }
+ CacheClientProxy clientProxy = cQuery.getCacheClientNotifier().getClientProxy(clientProxyId);
+ clientProxy.decCqCount();
+ if (clientProxy.hasNoCq()) {
+ this.stats.decClientsWithCqs();
+ }
+ }
+ }
+ }catch (Exception e){
+ // May be cache is being shutdown
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to remove CQ from the base region. CqName : {}", cqName);
+ }
+ }
+
+ if (isServer()){
+ removeFromBaseRegionToCqNameMap(cQuery.getRegionName(), serverCqName);
+ }
+
+ LocalRegion baseRegion = cQuery.getCqBaseRegion();
+ if(baseRegion.getFilterProfile().getCqCount() <= 0){
+ if (logger.isDebugEnabled()) {
+ logger.debug("Should update the profile for this partitioned region {} for not requiring old value", baseRegion);
+ }
+ }
+ } catch (CqClosedException cce){
+ throw new CqException(cce.getMessage());
+ } finally {
+ this.removeFromMatchingCqMap(cQuery);
+ }
+ }
+
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#closeAllCqs(boolean)
+ */
+ @Override
+ public void closeAllCqs(boolean clientInitiated) {
+ closeAllCqs(clientInitiated, getAllCqs());
+ }
+
+ /**
+ * Close all CQs executing in this VM, and release resources
+ * associated with executing CQs.
+ * CqQuerys created by other VMs are unaffected.
+ */
+ private void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs) {
+ closeAllCqs(clientInitiated, cqs, ((GemFireCacheImpl)this.cache)
+ .keepDurableSubscriptionsAlive());
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#closeAllCqs(boolean, com.gemstone.gemfire.cache.query.CqQuery[], boolean)
+ */
+ @Override
+ public void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs,
+ boolean keepAlive) {
+
+ //CqQuery[] cqs = getAllCqs();
+ if (cqs != null) {
+ String cqName = null;
+ if (logger.isDebugEnabled()){
+ logger.debug("Closing all CQs, number of CQ to be closed : {}", cqs.size());
+ }
+ for (InternalCqQuery cQuery: cqs){
+ try {
+ cqName = cQuery.getName();
+// boolean keepAlive = ((GemFireCache)this.cache).keepDurableSubscriptionsAlive();
+
+ if(isServer()) {
+ cQuery.close(false);
+ } else {
+ if (clientInitiated) {
+ cQuery.close(true);
+ }
+ else {
+ if(!isServer() && cQuery.isDurable() && keepAlive){
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CqService_NOT_SENDING_CQ_CLOSE_TO_THE_SERVER_AS_IT_IS_A_DURABLE_CQ));
+ cQuery.close(false);
+ }
+ else {
+ cQuery.close(true);
+ }
+ }
+ }
+ } catch (QueryException cqe) {
+ if (!isRunning()) {
+ // Not cache shutdown
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1, new Object[] {cqName, cqe.getMessage()}));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug(cqe.getMessage(), cqe);
+ }
+ } catch (CqClosedException cqe){
+ if (!isRunning()) {
+ // Not cache shutdown
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1, new Object[] {cqName, cqe.getMessage()}));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug(cqe.getMessage(), cqe);
+ }
+ }
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#getCqStatistics()
+ */
+ @Override
+ public CqServiceStatistics getCqStatistics() {
+ return cqServiceStats;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#closeClientCqs(com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID)
+ */
+ @Override
+ public void closeClientCqs(ClientProxyMembershipID clientProxyId)
+ throws CqException {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("Closing Client CQs for the client: {}", clientProxyId);
+ }
+ List<ServerCQ> cqs = getAllClientCqs(clientProxyId);
+ for (ServerCQ cq: cqs) {
+ CqQueryImpl cQuery = (CqQueryImpl)cq;
+ try {
+ cQuery.close(false);
+ } catch (QueryException qe) {
+ if (isDebugEnabled) {
+ logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), qe.getMessage());
+ }
+ } catch (CqClosedException cce) {
+ if (isDebugEnabled) {
+ logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), cce.getMessage());
+ }
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#getAllClientCqs(com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID)
+ */
+ @Override
+ public List<ServerCQ> getAllClientCqs(ClientProxyMembershipID clientProxyId){
+ Collection<? extends InternalCqQuery> cqs = getAllCqs();
+ ArrayList<ServerCQ> clientCqs = new ArrayList<ServerCQ>();
+
+ for (InternalCqQuery cq: cqs) {
+ ServerCQImpl cQuery = (ServerCQImpl)cq;
+ ClientProxyMembershipID id = cQuery.getClientProxyId();
+ if (id != null && id.equals(clientProxyId)) {
+ clientCqs.add(cQuery);
+ }
+ }
+ return clientCqs;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#getAllDurableClientCqs(com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID)
+ */
+ @Override
+ public List<String> getAllDurableClientCqs(ClientProxyMembershipID clientProxyId) throws CqException {
+ if (clientProxyId == null) {
+ throw new CqException (LocalizedStrings.CqService_UNABLE_TO_RETRIEVE_DURABLE_CQS_FOR_CLIENT_PROXY_ID.toLocalizedString(clientProxyId));
+ }
+ List<ServerCQ> cqs = getAllClientCqs(clientProxyId);
+ ArrayList<String> durableClientCqs = new ArrayList<String>();
+
+ for (ServerCQ cq: cqs) {
+ ServerCQImpl cQuery = (ServerCQImpl)cq;
+ if (cQuery != null && cQuery.isDurable()) {
+ ClientProxyMembershipID id = cQuery.getClientProxyId();
+ if (id != null && id.equals(clientProxyId)) {
+ durableClientCqs.add(cQuery.getName());
+ }
+ }
+ }
+ return durableClientCqs;
+ }
+
+ /**
+ * Server side method.
+ * Closes non-durable CQs for the given client proxy id.
+ * @param clientProxyId
+ * @throws CqException
+ */
+ @Override
+ public void closeNonDurableClientCqs(ClientProxyMembershipID clientProxyId)
+ throws CqException {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("Closing Client CQs for the client: {}", clientProxyId);
+ }
+ List<ServerCQ> cqs = getAllClientCqs(clientProxyId);
+ for (ServerCQ cq: cqs) {
+ ServerCQImpl cQuery = (ServerCQImpl)cq;
+ try {
+ if (!cQuery.isDurable()) {
+ cQuery.close(false);
+ }
+ } catch (QueryException qe) {
+ if (isDebugEnabled) {
+ logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), qe.getMessage());
+ }
+ } catch (CqClosedException cce) {
+ if (isDebugEnabled) {
+ logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), cce.getMessage());
+ }
+ }
+ }
+ }
+
+ /**
+ * Is the CQ service in a cache server environment
+ * @return true if cache server, false otherwise
+ */
+ public boolean isServer() {
+ if (this.cache.getCacheServers().isEmpty()) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Cleans up the CqService.
+ */
+ @Override
+ public void close() {
+ if (logger.isDebugEnabled()){
+ logger.debug("Closing CqService. {}", this);
+ }
+ // Close All the CQs.
+ // Need to take care when Clients are still connected...
+ closeAllCqs(false);
+ isRunning = false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.isRunning;
+ }
+
+ public void start() {
+ this.isRunning = true;
+ }
+
+ /**
+ * @return Returns the serverCqName.
+ */
+ @Override
+ public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) {
+ String cName = null;
+ if (clientProxyId.isDurable()) {
+ cName = cqName + "__" + clientProxyId.getDurableId();
+ }
+ else {
+ cName = cqName + "__" + clientProxyId.getDSMembership();
+ }
+ return cName;
+ }
+
+ /*
+ * Checks if CQ with the given name already exists.
+ * @param cqName name of the CQ.
+ * @return true if exists else false.
+ */
+ private synchronized boolean isCqExists(String cqName) {
+ boolean status = false;
+ HashMap<String, CqQueryImpl> cqMap =cqQueryMap;
+ status = cqMap.containsKey(cqName);
+ return status;
+ }
+
+ /*
+ * Generates a name for CQ.
+ * Checks if CQ with that name already exists if so generates a new cqName.
+ */
+ public synchronized String generateCqName() {
+ while (true) {
+ String cqName = CQ_NAME_PREFIX + (cqId++);
+ if (!isCqExists(cqName)) {
+ return cqName;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#dispatchCqListeners(java.util.HashMap, int, java.lang.Object, java.lang.Object, byte[], com.gemstone.gemfire.cache.client.internal.QueueManager, com.gemstone.gemfire.internal.cache.EventID)
+ */
+ @Override
+ public void dispatchCqListeners(HashMap<String, Integer> cqs, int messageType, Object key,
+ Object value, byte[] delta, QueueManager qManager, EventID eventId) {
+ ClientCQImpl cQuery = null;
+ Object[] fullValue = new Object[1];
+ Iterator<Map.Entry<String, Integer>> iter = cqs.entrySet().iterator();
+ String cqName = null;
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ while (iter.hasNext()) {
+ try {
+ Map.Entry<String, Integer> entry = iter.next();
+ cqName = entry.getKey();
+ cQuery = (ClientCQImpl) this.getCq(cqName);
+
+ if (cQuery == null || (!cQuery.isRunning() && cQuery.getQueuedEvents() == null)) {
+ if (isDebugEnabled) {
+ logger.debug("Unable to invoke CqListener, {}, CqName : {}", ((cQuery == null)? "CQ not found":" CQ is Not running"), cqName);
+ }
+ continue;
+ }
+
+ Integer cqOp = (Integer)entry.getValue();
+
+ // If Region destroy event, close the cq.
+ if (cqOp.intValue() == MessageType.DESTROY_REGION) {
+ // The close will also invoke the listeners close().
+ try {
+ cQuery.close(false);
+ } catch (Exception ex) {
+ // handle?
+ }
+ continue;
+ }
+
+ // Construct CqEvent.
+ CqEventImpl cqEvent = null;
+ cqEvent = new CqEventImpl(cQuery, getOperation(messageType),
+ getOperation(cqOp.intValue()), key, value, delta, qManager, eventId);
+
+ // Update statistics
+ cQuery.updateStats(cqEvent);
+
+ // Check if CQ Event needs to be queued.
+ if (cQuery.getQueuedEvents() != null) {
+ synchronized(cQuery.queuedEventsSynchObject) {
+ // Get latest value.
+ ConcurrentLinkedQueue<CqEventImpl> queuedEvents = cQuery.getQueuedEvents();
+ // Check to see, if its not set to null while waiting to get
+ // Synchronization lock.
+ if (queuedEvents != null) {
+ if (isDebugEnabled) {
+ logger.debug("Queueing event for key: {}", key);
+ }
+ cQuery.getVsdStats().incQueuedCqListenerEvents();
+ queuedEvents.add(cqEvent);
+ continue;
+ }
+ }
+ }
+
+ this.invokeListeners(cqName, cQuery, cqEvent, fullValue);
+ if (value == null) {
+ value = fullValue[0];
+ }
+
+ } // outer try
+ catch(Throwable t) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CqService_ERROR_PROCESSING_CQLISTENER_FOR_CQ_0, cqName), t);
+
+ if (t instanceof VirtualMachineError) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CqService_VIRTUALMACHINEERROR_PROCESSING_CQLISTENER_FOR_CQ_0, cqName), t);
+ return;
+ }
+ }
+ } // iteration.
+ }
+
+ public void invokeListeners(String cqName, ClientCQImpl cQuery,
+ CqEventImpl cqEvent) {
+ invokeListeners(cqName, cQuery, cqEvent, null);
+ }
+
+ public void invokeListeners(String cqName, ClientCQImpl cQuery,
+ CqEventImpl cqEvent, Object[] fullValue) {
+ if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
+ return;
+ }
+
+ // invoke CQ Listeners.
+ CqListener[] cqListeners = cQuery.getCqAttributes().getCqListeners();
+
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("Invoking CQ listeners for {}, number of listeners : {} cqEvent : {}", cqName, cqListeners.length, cqEvent);
+ }
+
+ for (int lCnt=0; lCnt < cqListeners.length; lCnt++) {
+ try {
+ // Check if the listener is not null, it could have been changed/reset
+ // by the CqAttributeMutator.
+ if (cqListeners[lCnt] != null){
+ cQuery.getVsdStats().incNumCqListenerInvocations();
+ try {
+ if (cqEvent.getThrowable() != null) {
+ cqListeners[lCnt].onError(cqEvent);
+ } else {
+ cqListeners[lCnt].onEvent(cqEvent);
+ }
+ } catch (InvalidDeltaException ide) {
+ if (isDebugEnabled) {
+ logger.debug("CqService.dispatchCqListeners(): Requesting full value...");
+ }
+ Part result = (Part)GetEventValueOp.executeOnPrimary(cqEvent
+ .getQueueManager().getPool(), cqEvent.getEventID(), null);
+ Object newVal = null;
+ if (result == null || (newVal = result.getObject()) == null) {
+ if (this.cache.getCancelCriterion().cancelInProgress() == null) {
+ Exception ex = new Exception(
+ "Failed to retrieve full value from server for eventID "
+ + cqEvent.getEventID());
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.CqService_EXCEPTION_IN_THE_CQLISTENER_OF_THE_CQ_CQNAME_0_ERROR__1,
+ new Object[] { cqName, ex.getMessage() }));
+ if (isDebugEnabled) {
+ logger.debug(ex.getMessage(), ex);
+ }
+ }
+ }
+ else {
+ ((GemFireCacheImpl)this.cache).getCachePerfStats().incDeltaFullValuesRequested();
+ cqEvent = new CqEventImpl(cQuery, cqEvent.getBaseOperation(),
+ cqEvent.getQueryOperation(), cqEvent.getKey(), newVal,
+ cqEvent.getDeltaValue(), cqEvent.getQueueManager(), cqEvent.getEventID());
+ if (cqEvent.getThrowable() != null) {
+ cqListeners[lCnt].onError(cqEvent);
+ } else {
+ cqListeners[lCnt].onEvent(cqEvent);
+ }
+ if (fullValue != null) {
+ fullValue[0] = newVal;
+ }
+ }
+ }
+ }
+ // Handle client side exceptions.
+ } catch (Exception ex) {
+ if (this.cache.getCancelCriterion().cancelInProgress() == null) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.CqService_EXCEPTION_IN_THE_CQLISTENER_OF_THE_CQ_CQNAME_0_ERROR__1,
+ new Object[] { cqName, ex.getMessage()}));
+ if (isDebugEnabled) {
+ logger.debug(ex.getMessage(), ex);
+ }
+ }
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable t) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CqService_RUNTIME_EXCEPTION_IN_THE_CQLISTENER_OF_THE_CQ_CQNAME_0_ERROR__1, new Object[] {cqName, t.getLocalizedMessage()}));
+ if (isDebugEnabled) {
+ logger.debug(t.getMessage(), t);
+ }
+ }
+ }
+ }
+
+ public void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) {
+ if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
+ return;
+ }
+ cQuery.setConnected(connected);
+ // invoke CQ Listeners.
+ CqListener[] cqListeners = cQuery.getCqAttributes().getCqListeners();
+
+ if (logger.isDebugEnabled()){
+ logger.debug("Invoking CQ status listeners for {}, number of listeners : {}", cqName, cqListeners.length);
+ }
+
+ for (int lCnt=0; lCnt < cqListeners.length; lCnt++) {
+ try {
+ if (cqListeners[lCnt] != null) {
+ if (cqListeners[lCnt] instanceof CqStatusListener) {
+ CqStatusListener listener = (CqStatusListener) cqListeners[lCnt];
+ if (connected) {
+ listener.onCqConnected();
+ }
+ else {
+ listener.onCqDisconnected();
+ }
+ }
+ }
+ // Handle client side exceptions.
+ } catch (Exception ex) {
+ if (this.cache.getCancelCriterion().cancelInProgress() == null) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.CqService_EXCEPTION_IN_THE_CQLISTENER_OF_THE_CQ_CQNAME_0_ERROR__1,
+ new Object[] { cqName, ex.getMessage()}));
+ if (logger.isDebugEnabled()) {
+ logger.debug(ex.getMessage(), ex);
+ }
+ }
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable t) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CqService_RUNTIME_EXCEPTION_IN_THE_CQLISTENER_OF_THE_CQ_CQNAME_0_ERROR__1, new Object[] {cqName, t.getLocalizedMessage()}));
+ if (logger.isDebugEnabled()) {
+ logger.debug(t.getMessage(), t);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Returns the Operation for the given EnumListenerEvent type.
+ * @param eventType
+ * @return Operation
+ */
+ private Operation getOperation(int eventType) {
+ Operation op = null;
+ switch (eventType) {
+ case MessageType.LOCAL_CREATE :
+ op = Operation.CREATE;
+ break;
+
+ case MessageType.LOCAL_UPDATE :
+ op = Operation.UPDATE;
+ break;
+
+ case MessageType.LOCAL_DESTROY :
+ op = Operation.DESTROY;
+ break;
+
+ case MessageType.LOCAL_INVALIDATE :
+ op = Operation.INVALIDATE;
+ break;
+
+ case MessageType.CLEAR_REGION :
+ op = Operation.REGION_CLEAR;
+ break;
+
+ case MessageType.INVALIDATE_REGION :
+ op = Operation.REGION_INVALIDATE;
+ break;
+ }
+ return op;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.query.internal.InternalCqService#processEvents(com.gemstone.gemfire.cache.CacheEvent, com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile, com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile[], com.gemstone.gemfire.internal.cache.FilterRoutingInfo)
+ */
+ @Override
+ public void processEvents (CacheEvent event, Profile localProfile, Profile[] profiles, FilterRoutingInfo frInfo)
+ throws CqException {
+ //Is this a region event or an entry event
+ if (event instanceof RegionEvent){
+ processRegionEvent(event, localProfile, profiles, frInfo);
+ } else {
+ // Use the PDX types in serialized form.
+ DefaultQuery.setPdxReadSerialized(this.cache, true);
+ try {
+ processEntryEvent (event, localProfile, profiles, frInfo);
+ } finally {
+ DefaultQuery.setPdxReadSerialized(this.cache, false);
+ }
+ }
+ }
+
+ private void processRegionEvent(CacheEvent event, Profile localProfile, Profile[] profiles, FilterRoutingInfo frInfo)
+ throws CqException {
+
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("CQ service processing region event {}", event);
+ }
+ Integer cqRegionEvent = generateCqRegionEvent(event);
+
+ for (int i=-1; i < profiles.length; i++) {
+ CacheProfile cf;
+ if (i<0) {
+ cf = (CacheProfile)localProfile;
+ if (cf == null) continue;
+ } else {
+ cf = (CacheProfile)profiles[i];
+ }
+ FilterProfile pf = cf.filterProfile;
+ if (pf == null || pf.getCqMap().isEmpty()) {
+ continue;
+ }
+ Map cqs = pf.getCqMap();
+ HashMap<Long, Integer> cqInfo = new HashMap<Long, Integer>();
+ Iterator cqIter = cqs.entrySet().iterator();
+ while (cqIter.hasNext()){
+ Map.Entry cqEntry = (Map.Entry)cqIter.next();
+ ServerCQImpl cQuery = (ServerCQImpl)cqEntry.getValue();
+ if (!event.isOriginRemote() && event.getOperation().isRegionDestroy() &&
+ !((LocalRegion)event.getRegion()).isUsedForPartitionedRegionBucket()) {
+ try {
+ if (isDebugEnabled){
+ logger.debug("Closing CQ on region destroy event. CqName : {}", cQuery.getName());
+ }
+ cQuery.close(false);
+ }
+ catch (Exception ex) {
+ if (isDebugEnabled) {
+ logger.debug("Failed to Close CQ on region destroy. CqName : {}", cQuery.getName(), ex);
+ }
+ }
+ }
+ cqInfo.put(cQuery.getFilterID(), cqRegionEvent);
+ cQuery.getVsdStats().updateStats(cqRegionEvent);
+ }
+ if(pf.isLocalProfile()){
+ frInfo.setLocalCqInfo(cqInfo);
+ } else {
+ frInfo.setCqRoutingInfo(cf.getDistributedMember(), cqInfo);
+ }
+ }
+ }
+
+ private void processEntryEvent(CacheEvent event, Profile localProfile, Profile[] profiles, FilterRoutingInfo frInfo)
+ throws CqException {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ HashSet<Object> cqUnfilteredEventsSet_newValue = new HashSet<Object>();
+ HashSet<Object> cqUnfilteredEventsSet_oldValue = new HashSet<Object>();
+ boolean b_cqResults_newValue = false;
+ boolean b_cqResults_oldValue = false;
+ boolean queryOldValue;
+ EntryEvent entryEvent = (EntryEvent)event;
+ Object eventKey = entryEvent.getKey();
+
+ boolean isDupEvent = ((EntryEventImpl)event).isPossibleDuplicate();
+ // The CQ query needs to be applied when the op is update, destroy
+ // invalidate and in case when op is create and its an duplicate
+ // event, the reason for this is when peer sends a duplicate event
+ // it marks it as create and sends it, so that the receiving node
+ // applies it (see DR.virtualPut()).
+ boolean opRequiringQueryOnOldValue = (event.getOperation().isUpdate() ||
+ event.getOperation().isDestroy() ||
+ event.getOperation().isInvalidate() ||
+ (event.getOperation().isCreate() && isDupEvent));
+
+ HashMap<String, Integer> matchedCqs = new HashMap<String, Integer>();
+ long executionStartTime = 0;
+ for (int i=-1; i < profiles.length; i++) {
+ CacheProfile cf;
+ if (i<0) {
+ cf = (CacheProfile)localProfile;
+ if (cf == null) continue;
+ } else {
+ cf = (CacheProfile)profiles[i];
+ }
+ FilterProfile pf = cf.filterProfile;
+ if (pf == null || pf.getCqMap().isEmpty()) {
+ continue;
+ }
+
+ Map cqs = pf.getCqMap();
+
+ if (isDebugEnabled) {
+ logger.debug("Profile for {} processing {} CQs", cf.peerMemberId, cqs.size());
+ }
+
+ if (cqs.isEmpty()) {
+ continue;
+ }
+
+
+ // Get new value. If its not retrieved.
+ if (cqUnfilteredEventsSet_newValue.isEmpty() && (event.getOperation().isCreate() || event.getOperation().isUpdate())) {
+ Object newValue = entryEvent.getNewValue(); // TODO OFFHEAP: optimize by not copying the value on to the heap
+ if (newValue != null) {
+ //We have a new value to run the query on
+ cqUnfilteredEventsSet_newValue.add(newValue);
+ }
+ }
+
+ HashMap<Long, Integer> cqInfo = new HashMap<Long, Integer>();
+ Iterator cqIter = cqs.entrySet().iterator();
+
+ while (cqIter.hasNext()){
+ Map.Entry cqEntry = (Map.Entry)cqIter.next();
+ ServerCQImpl cQuery = (ServerCQImpl)cqEntry.getValue();
+ b_cqResults_newValue = false;
+ b_cqResults_oldValue = false;
+ queryOldValue = false;
+ if (cQuery == null){
+ continue;
+ }
+ String cqName = cQuery.getServerCqName();
+ Long filterID = cQuery.getFilterID();
+
+ if (isDebugEnabled) {
+ logger.debug("Processing CQ : {} Key: {}", cqName, eventKey);
+ }
+
+ Integer cqEvent = null;
+ if (matchedCqs.containsKey(cqName)) {
+ cqEvent = matchedCqs.get(cqName);
+ if (isDebugEnabled) {
+ logger.debug("query {} has already been processed and returned {}", cqName, cqEvent);
+ }
+ if (cqEvent == null) {
+ continue;
+ }
+ // Update the Cache Results for this CQ.
+ if (cqEvent.intValue() == MessageType.LOCAL_CREATE ||
+ cqEvent.intValue() == MessageType.LOCAL_UPDATE) {
+ cQuery.addToCqResultKeys(eventKey);
+ } else if (cqEvent.intValue() == MessageType.LOCAL_DESTROY) {
+ cQuery.markAsDestroyedInCqResultKeys(eventKey);
+ }
+ } else {
+ boolean error = false;
+ synchronized (cQuery) {
+ try {
+ // Apply query on new value.
+ if (!cqUnfilteredEventsSet_newValue.isEmpty()) {
+ executionStartTime = this.stats.startCqQueryExecution();
+
+ b_cqResults_newValue = evaluateQuery(cQuery,
+ new Object[] {cqUnfilteredEventsSet_newValue});
+ this.stats.endCqQueryExecution(executionStartTime);
+ }
+
+ // In case of Update, destroy and invalidate.
+ // Apply query on oldValue.
+ if (opRequiringQueryOnOldValue) {
+ // Check if CQ Result is cached, if not apply query on old
+ // value. Currently the CQ Results are not cached for the
+ // Partitioned Regions. Once this is added remove the check
+ // with PR region.
+ if (cQuery.cqResultKeysInitialized) {
+ b_cqResults_oldValue = cQuery.isPartOfCqResult(eventKey);
+ // For PR if not found in cache, apply the query on old value.
+ // Also apply if the query was not executed during cq execute
+ if ((cQuery.isPR || !CqServiceImpl.EXECUTE_QUERY_DURING_INIT) && b_cqResults_oldValue == false) {
+ queryOldValue = true;
+ }
+ if (isDebugEnabled && !cQuery.isPR && !b_cqResults_oldValue) {
+ logger.debug("Event Key not found in the CQ Result Queue. EventKey : {} CQ Name : {}", eventKey, cqName );
+ }
+ } else {
+ queryOldValue = true;
+ }
+
+ if (queryOldValue) {
+ if (cqUnfilteredEventsSet_oldValue.isEmpty()) {
+ Object oldValue = entryEvent.getOldValue();
+ if (oldValue != null) {
+ cqUnfilteredEventsSet_oldValue.add(oldValue);
+ }
+ }
+
+ // Apply query on old value.
+ if (!cqUnfilteredEventsSet_oldValue.isEmpty()) {
+ executionStartTime = this.stats.startCqQueryExecution();
+ b_cqResults_oldValue = evaluateQuery(cQuery,
+ new Object[] {cqUnfilteredEventsSet_oldValue});
+ this.stats.endCqQueryExecution(executionStartTime);
+ } else {
+ if (isDebugEnabled) {
+ logger.debug("old value for event with key {} is null - query execution not performed", eventKey);
+ }
+ }
+ } // Query oldValue
+
+ }
+ } catch (Exception ex) {
+ // Any exception in running the query should be caught here and
+ // buried because this code is running in-line with the message
+ // processing code and we don't want to kill that thread
+ error = true;
+ // CHANGE LOG MESSAGE:
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.CqService_ERROR_WHILE_PROCESSING_CQ_ON_THE_EVENT_KEY_0_CQNAME_1_ERROR_2,
+ new Object[] { ((EntryEvent)event).getKey(), cQuery.getName(), ex.getLocalizedMessage()}));
+ }
+
+ if (error) {
+ cqEvent = Integer.valueOf(MessageType.EXCEPTION);
+ }
+ else {
+ if (b_cqResults_newValue) {
+ if (b_cqResults_oldValue) {
+ cqEvent = Integer.valueOf(MessageType.LOCAL_UPDATE);
+ } else {
+ cqEvent = Integer.valueOf(MessageType.LOCAL_CREATE);
+ }
+ // If its create and caching is enabled, cache the key
+ // for this CQ.
+ cQuery.addToCqResultKeys(eventKey);
+ } else if (b_cqResults_oldValue) {
+ // Base invalidate operation is treated as destroy.
+ // When the invalidate comes through, the entry will no longer
+ // satisfy the query and will need to be deleted.
+ cqEvent = Integer.valueOf(MessageType.LOCAL_DESTROY);
+ // If caching is enabled, mark this event's key as removed
+ // from the CQ cache.
+ cQuery.markAsDestroyedInCqResultKeys(eventKey);
+ }
+ }
+ } //end synchronized(cQuery)
+
+ // Get the matching CQs if any.
+ synchronized (this.matchingCqMap){
+ String query = cQuery.getQueryString();
+ HashSet matchingCqs = matchingCqMap.get(query);
+ if (matchingCqs != null) {
+ Iterator iter = matchingCqs.iterator();
+ while (iter.hasNext()) {
+ String matchingCqName = (String)iter.next();
+ if (!matchingCqName.equals(cqName)){
+ matchedCqs.put(matchingCqName, cqEvent);
+ if (isDebugEnabled) {
+ logger.debug("Adding CQ into Matching CQ Map: {} Event is: {}", matchingCqName, cqEvent);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if (cqEvent != null && cQuery.isRunning()){
+ if (isDebugEnabled) {
+ logger.debug("Added event to CQ with client-side name: {} key: {} operation : {}", cQuery.cqName, eventKey, cqEvent);
+ }
+ cqInfo.put(filterID, cqEvent);
+ CqQueryVsdStats stats = cQuery.getVsdStats();
+ if (stats != null) {
+ stats.updateStats(cqEvent);
+ }
+ }
+ }
+ if (cqInfo.size() > 0) {
+ if(pf.isLocalProfile()){
+ if (isDebugEnabled) {
+ logger.debug("Setting local CQ matches to {}", cqInfo);
+ }
+ frInfo.setLocalCqInfo(cqInfo);
+ } else {
+ if (isDebugEnabled) {
+ logger.debug("Setting CQ matches for {} to {}", cf.getDistributedMember(), cqInfo);
+ }
+ frInfo.setCqRoutingInfo(cf.getDistributedMember(), cqInfo);
+ }
+ }
+ } // iteration over Profiles.
+ }
+
+
+/* public void processEvents (EnumListenerEvent operation, CacheEvent event,
+ ClientUpdateMessage clientMessage,
+ CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds)
+ throws CqException {
+
+ //Is this a region event or an entry event
+ if (event instanceof RegionEvent){
+ processRegionEvent(operation, event, clientMessage, clientIds);
+ } else {
+ processEntryEvent (operation, event, clientMessage, clientIds);
+ }
+
+ }
+
+ private void processRegionEvent(EnumListenerEvent operation, CacheEvent event,
+ ClientUpdateMessage clientMessage,
+ CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds)
+ throws CqException {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing region event for region " +
+ ((LocalRegion)(event.getRegion())).getName());
+ }
+ HashMap filteredCqs = new HashMap();
+ Integer cqRegionEvent = generateCqRegionEvent(operation);
+ Iterator it = clientIds.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry me = (Map.Entry)it.next();
+ ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey();
+ CM cqsToBooleans = (CM)me.getValue();
+ if (cqsToBooleans == null) {
+ continue;
+ }
+ Set<CqQuery> cqs = cqsToBooleans.keySet();
+ if (cqs.isEmpty()) {
+ continue;
+ }
+ filteredCqs.clear();
+ Iterator cqIt = cqs.iterator();
+ while (cqIt.hasNext()) {
+ CqQueryImpl cQuery = (CqQueryImpl)cqIt.next();
+ if (operation == EnumListenerEvent.AFTER_REGION_DESTROY) {
+ try {
+ if (logger.isDebugEnabled()){
+ logger.debug("Closing CQ on region destroy event. CqName :"
+ + cQuery.getName());
+ }
+ cQuery.close(false);
+ }
+ catch (Exception ex) {
+ logger.debug("Failed to Close CQ on region destroy. CqName :" +
+ cQuery.getName(), ex);
+ }
+
+ }
+ filteredCqs.put(cQuery.cqName, cqRegionEvent);
+ cQuery.getVsdStats().updateStats(cqRegionEvent);
+
+ }
+ if (!filteredCqs.isEmpty()){
+ ((ClientUpdateMessageImpl)clientMessage).addClientCqs(
+ clientId, filteredCqs);
+ }
+
+ }
+
+ }
+
+ private void processEntryEvent(EnumListenerEvent operation, CacheEvent event,
+ ClientUpdateMessage clientMessage,
+ CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds)
+ throws CqException {
+ HashSet cqUnfilteredEventsSet_newValue = new HashSet();
+ HashSet cqUnfilteredEventsSet_oldValue = new HashSet();
+ boolean b_cqResults_newValue = false;
+ boolean b_cqResults_oldValue = false;
+ EntryEvent entryEvent = (EntryEvent)event;
+ Object eventKey = entryEvent.getKey();
+ if (operation == EnumListenerEvent.AFTER_CREATE ||
+ operation == EnumListenerEvent.AFTER_UPDATE) {
+ if (entryEvent.getNewValue() != null) {
+ //We have a new value to run the query on
+ cqUnfilteredEventsSet_newValue.clear();
+ cqUnfilteredEventsSet_newValue.add(entryEvent.getNewValue());
+ }
+ }
+
+ HashMap matchedCqs = new HashMap();
+ long executionStartTime = 0;
+ Iterator it = clientIds.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry me = (Map.Entry)it.next();
+ ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing event for CQ filter, ClientId : " + clientId);
+ }
+ CM cqsToBooleans = (CM)me.getValue();
+ if (cqsToBooleans == null) {
+ continue;
+ }
+ Set<CqQuery> cqs = cqsToBooleans.keySet();
+ if (cqs.isEmpty()) {
+ continue;
+ }
+ HashMap filteredCqs = new HashMap();
+ Iterator cqIt = cqs.iterator();
+ while (cqIt.hasNext()) {
+ CqQueryImpl cQuery = (CqQueryImpl)cqIt.next();
+ b_cqResults_newValue = false;
+ b_cqResults_oldValue = false;
+ if (cQuery == null || !(cQuery.isRunning())){
+ continue;
+ }
+ String cqName = cQuery.getServerCqName();
+ Integer cqEvent = null;
+ if (matchedCqs.containsKey(cqName)) {
+ if (logger.isDebugEnabled()){
+ logger.debug("Similar cq/query is already processed, getting the cq event-type from the matched cq.");
+ }
+ cqEvent = (Integer)matchedCqs.get(cqName);
+ } else {
+ boolean error = false;
+ boolean hasSeenEvent = false;
+ HashSet cqEventKeys = null;
+ synchronized (cQuery) {
+ try {
+ // Apply query on new value.
+ if (!cqUnfilteredEventsSet_newValue.isEmpty()) {
+ executionStartTime = this.stats.startCqQueryExecution();
+ b_cqResults_newValue = evaluateQuery(cQuery,
+ new Object[] {cqUnfilteredEventsSet_newValue});
+ this.stats.endCqQueryExecution(executionStartTime);
+ }
+ // Check if old value is cached, if not apply query on old value.
+ if (cqToCqEventKeysMap != null) {
+ synchronized (cqToCqEventKeysMap) {
+ if ((cqEventKeys = (HashSet)cqToCqEventKeysMap.get(cqName)) != null) {
+ hasSeenEvent = cqEventKeys.contains(eventKey);
+ }
+ }
+ }
+ if (!hasSeenEvent) {
+ // get the oldValue.
+ // In case of Update, destroy and invalidate.
+ if (operation == EnumListenerEvent.AFTER_UPDATE ||
+ operation == EnumListenerEvent.AFTER_DESTROY ||
+ operation == EnumListenerEvent.AFTER_INVALIDATE) {
+ if (entryEvent.getOldValue() != null) {
+ cqUnfilteredEventsSet_oldValue.clear();
+ cqUnfilteredEventsSet_oldValue.add(entryEvent.getOldValue());
+ // Apply query on old value.
+ executionStartTime = this.stats.startCqQueryExecution();
+ b_cqResults_oldValue = evaluateQuery(cQuery,
+ new Object[] {cqUnfilteredEventsSet_oldValue});
+ this.stats.endCqQueryExecution(executionStartTime);
+ }
+ }
+ }
+ }
+ catch (Exception ex) {
+ //Any exception in running the query
+ // should be caught here and buried
+ //because this code is running inline with the
+ //message processing code and we don't want to
+ //kill that thread
+ error = true;
+ logger.info(
+ LocalizedStrings.CqService_ERROR_WHILE_PROCESSING_CQ_ON_THE_EVENT_KEY_0_CQNAME_1_CLIENTID_2_ERROR_3,
+ new Object[] { ((EntryEvent)event).getKey(), cQuery.getName(), clientId, ex.getLocalizedMessage()});
+ }
+
+ if (error) {
+ cqEvent = Integer.valueOf(MessageType.EXCEPTION);
+ }
+ else {
+ if (b_cqResults_newValue) {
+ if (hasSeenEvent || b_cqResults_oldValue) {
+ cqEvent = Integer.valueOf(MessageType.LOCAL_UPDATE);
+ } else {
+ cqEvent = Integer.valueOf(MessageType.LOCAL_CREATE);
+ }
+ // If its create and caching is enabled, cache the key for this CQ.
+ if (!hasSeenEvent && cqEventKeys != null) {
+ cqEventKeys.add(eventKey);
+ }
+ }
+ else if (hasSeenEvent || (b_cqResults_oldValue)) {
+ // Base invalidate operation is treated as destroy.
+ // When the invalidate comes through, the entry will no longer satisfy
+ // the query and will need to be deleted.
+ cqEvent = Integer.valueOf(MessageType.LOCAL_DESTROY);
+ // If caching is enabled, remove this event's key from the cache.
+ if (hasSeenEvent && cqEventKeys != null) {
+ cqEventKeys.remove(eventKey);
+ }
+ }
+ }
+
+ } //end synchronized(cQuery)
+
+ // Get the matching CQs if any.
+ synchronized (this.matchingCqMap){
+ String query = cQuery.getQueryString();
+ ArrayList matchingCqs = (ArrayList)matchingCqMap.get(query);
+ if (matchingCqs != null) {
+ Iterator iter = matchingCqs.iterator();
+ while (iter.hasNext()) {
+ String matchingCqName = (String)iter.next();
+ if (!matchingCqName.equals(cqName)){
+ matchedCqs.put(matchingCqName, cqEvent);
+ }
+ }
+ }
+ }
+
+ }
+
+ if (cqEvent != null){
+ if (logger.isDebugEnabled()) {
+ logger.debug("Event is added for the CQ, CqName (clientside): " + cQuery.cqName +
+ " With CQ Op : " + cqEvent + " for Client : " + clientId);
+ }
+ filteredCqs.put(cQuery.cqName, cqEvent);
+ cQuery.getVsdStats().updateStats(cqEvent);
+ }
+
+ } // iteration over cqsToBooleans.keySet()
+ if (!filteredCqs.isEmpty()){
+ logger.debug("Adding event map for client : "+clientId + " with event map size : "+filteredCqs.size());
+ ((ClientUpdateMessageImpl)clientMessage).addClientCqs(clientId, filteredCqs);
+ }
+ } // iteration over clientIds.entrySet()
+ }
+*/
+
+ private Integer generateCqRegionEvent(CacheEvent event) {
+ Integer cqEvent = null;
+ if (event.getOperation().isRegionDestroy()) {
+ cqEvent = Integer.valueOf(MessageType.DESTROY_REGION);
+ }
+ else if (event.getOperation().isRegionInvalidate()) {
+ cqEvent = Integer.valueOf(MessageType.INVALIDATE_REGION);
+ }
+ else if (event.getOperation().isClear()){
+ cqEvent = Integer.valueOf(MessageType.CLEAR_REGION);
+ }
+ return cqEvent;
+ }
+
+
+ /**
+ * Manages the CQs created for the base region.
+ * This is managed here, instead of on the base region; since the cq could be
+ * created on the base region, before base region is created (using newCq()).
+ */
+ public void addToBaseRegionToCqNameMap(String regionName, String cqName){
+ synchronized(this.baseRegionToCqNameMap){
+ ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
+ if (cqs == null){
+ cqs = new ArrayList<String>();
+ }
+ cqs.add(cqName);
+ this.baseRegionToCqNameMap.put(regionName, cqs);
+ }
+ }
+
+ public void removeFromBaseRegionToCqNameMap(String regionName, String cqName){
+ synchronized(this.baseRegionToCqNameMap){
+ ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
+ if (cqs != null){
+ cqs.remove(cqName);
+ if (cqs.isEmpty()){
+ this.baseRegionToCqNameMap.remove(regionName);
+ } else {
+ this.baseRegionToCqNameMap.put(regionName, cqs);
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the VSD ststs for CQ Service. There is one CQ Service per cache
+ * @return reference to VSD stats object for the CQ service
+ */
+ public CqServiceVsdStats getCqServiceVsdStats() {
+ return stats;
+ }
+
+ /**
+ * Removes this CQ from CQ event Cache map.
+ * This disables the caching events for this CQ.
+ * @param cqName
+ */
+ /*
+ synchronized public void removeCQFromCaching(String cqName){
+ if (cqToCqEventKeysMap != null) {
+ // Take a lock on CqQuery object. In processEvents the maps are
+ // handled under CqQuery object.
+ if (cqToCqEventKeysMap != null){
+ synchronized (cqToCqEventKeysMap) {
+ cqToCqEventKeysMap.remove(cqName);
+ }
+ }
+ }
+ }
+ */
+
+ /**
+ * Returns the CQ event cache map.
+ * @return HashMap cqToCqEventKeysMap
+ *
+ * Caller must synchronize on the returned value in order
+ * to inspect.
+ */
+ /*
+ public HashMap getCqToCqEventKeysMap(){
+ return cqToCqEventKeysMap;
+ }
+ */
+
+ /**
+ * Adds the query from the given CQ to the matched CQ map.
+ * @param cq
+ */
+ public void addToMatchingCqMap(CqQueryImpl cq) {
+ synchronized(this.matchingCqMap){
+ String cqQuery = cq.getQueryString();
+ HashSet<String> matchingCQs = null;
+ if (!matchingCqMap.containsKey(cqQuery)){
+ matchingCQs = new HashSet<String>();
+ matchingCqMap.put(cqQuery, matchingCQs);
+ this.stats.incUniqueCqQuery();
+ } else {
+ matchingCQs = matchingCqMap.get(cqQuery);
+ }
+ matchingCQs.add(cq.getServerCqName());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding CQ into MatchingCQ map, CQName: {} Number of matched querys are: {}" , cq.getServerCqName(), matchingCQs.size());
+ }
+ }
+ }
+
+ /**
+ * Removes the query from the given CQ from the matched CQ map.
+ * @param cq
+ */
+ public void removeFromMatchingCqMap(CqQueryImpl cq) {
+ synchronized(this.matchingCqMap){
+ String cqQuery = cq.getQueryString();
+ if (matchingCqMap.containsKey(cqQuery)){
+ HashSet matchingCQs = matchingCqMap.get(cqQuery);
+ matchingCQs.remove(cq.getServerCqName());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removing CQ from MatchingCQ map, CQName: {} Number of matched querys are: {}", cq.getServerCqName(), matchingCQs.size());
+ }
+ if (matchingCQs.isEmpty()){
+ matchingCqMap.remove(cqQuery);
+ this.stats.decUniqueCqQuery();
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the matching CQ map.
+ * @return HashMap matchingCqMap
+ */
+ public HashMap<String, HashSet<String>> getMatchingCqMap(){
+ return matchingCqMap;
+ }
+
+ /**
+ * Applies the query on the event.
+ * This method takes care of the performance related changed done to improve
+ * the CQ-query performance. When CQ-query is executed first time, it saves the
+ * query related information in the execution context and uses that info in later
+ * executions.
+ * @param cQuery
+ * @param event
+ * @return boolean
+ */
+ private boolean evaluateQuery(CqQueryImpl cQuery, Object[] event) throws Exception {
+ ExecutionContext execContext = cQuery.getQueryExecutionContext();
+ execContext.reset();
+ execContext.setBindArguments(event);
+ boolean status = false;
+
+ // Check if the CQ query is executed once.
+ // If not execute the query in normal way.
+ // During this phase the query execution related info are stored in the
+ // ExecutionContext.
+ if (execContext.getScopeNum() <= 0) {
+ SelectResults results = (SelectResults)((DefaultQuery)cQuery.getQuery()).executeUsingContext(execContext);
+ if (results != null && results.size() > 0) {
+ status = true;
+ }
+ } else {
+ // Execute using the saved query info (in ExecutionContext).
+ // This avoids building resultSet, index look-up, generating build-plans
+ // that are not required for; query execution on single object.
+ CompiledSelect cs = ((DefaultQuery)(cQuery.getQuery())).getSelect();
+ status = cs.evaluateCq(execContext);
+ }
+ return status;
+ }
+
+ @Override
+ public UserAttributes getUserAttributes(String cqName) {
+ return this.cqNameToUserAttributesMap.get(cqName);
+ }
+
+// public static void memberLeft(String poolName) {
+// if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) {
+// cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName);
+// }
+// }
+//
+// public static void memberCrashed(String poolName) {
+// if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) {
+// cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName);
+// }
+// }
+//
+
+ @Override
+ public void cqsDisconnected(Pool pool) {
+ invokeCqsConnected(pool, false);
+ }
+
+ @Override
+ public void cqsConnected(Pool pool) {
+ invokeCqsConnected(pool, true);
+ }
+
+ /**
+ * Let cq listeners know that they are connected or disconnected
+ */
+ private void invokeCqsConnected(Pool pool, boolean connected) {
+ String poolName = pool.getName();
+ //Check to see if we are already connected/disconnected.
+ //If state has not changed, do not invoke another connected/disconnected
+ synchronized(cqPoolsConnected) {
+ //don't repeatily send same connect/disconnect message to cq's on repeated fails of RedundancySatisfier
+ if (cqPoolsConnected.containsKey(poolName) && connected == cqPoolsConnected.get(poolName)){
+ return;
+ }
+ cqPoolsConnected.put(poolName, connected);
+
+ Collection<? extends InternalCqQuery> cqs = this.getAllCqs();
+ String cqName = null;
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ for (InternalCqQuery query: cqs) {
+ try {
+ if (query == null) {
+ continue;
+ }
+
+ cqName = query.getName();
+ ClientCQImpl cQuery = (ClientCQImpl) this.getCq(cqName);
+
+ //Check cq pool to determine if the pool matches, if not continue.
+ //Also if the connected state is already the same, we do not have to send status again.
+ if (cQuery == null) {
+ continue;
+ }
+ Pool cqPool = cQuery.getCQProxy().getPool();
+ if (cQuery.isConnected() == connected || !cqPool.getName().equals(poolName)) {
+ continue;
+ }
+
+ if ((!cQuery.isRunning() && cQuery.getQueuedEvents() == null)) {
+ if (isDebugEnabled) {
+ logger.debug("Unable to invoke CqListener, {}, CqName : {}", ((cQuery == null) ? "CQ not found" : " CQ is Not running"), cqName);
+ }
+ continue;
+ }
+
+ this.invokeCqConnectedListeners(cqName, cQuery, connected);
+ } catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ } catch (Throwable t) {
+ SystemFailure.checkFailure();
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CqService_ERROR_SENDING_CQ_CONNECTION_STATUS, cqName), t);
+
+ if (t instanceof VirtualMachineError) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.CqService_VIRTUALMACHINEERROR_PROCESSING_CQLISTENER_FOR_CQ_0,
+ cqName), t);
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<String> getAllDurableCqsFromServer(InternalPool pool) {
+ return new ServerCQProxyImpl(pool).getAllDurableCqsFromServer();
+ }
+
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceStatisticsImpl.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceStatisticsImpl.java
new file mode 100644
index 0000000..9f504fa
--- /dev/null
+++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceStatisticsImpl.java
@@ -0,0 +1,92 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.query.internal.cq;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.query.CqServiceStatistics;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+/**
+ * Provides statistical information about CqService.
+ *
+ * @since 5.5
+ * @author anil
+ */
+public class CqServiceStatisticsImpl implements CqServiceStatistics {
+ private CqServiceImpl cqService;
+// private long activeCqs;
+// private long stoppedCqs;
+// private long closedCqs;
+// private long createdCqs;
+
+ /**
+ * Constructor for CqStatisticsImpl
+ * @param cqs - CqService
+ */
+ public CqServiceStatisticsImpl(CqServiceImpl cqs) {
+ cqService = cqs;
+ }
+
+ /**
+ * Returns the number of CQs currently executing
+ */
+ public long numCqsActive(){
+ return this.cqService.getCqServiceVsdStats().getNumCqsActive();
+ }
+
+ /**
+ * Returns number of CQs created.
+ * @return long number of cqs created.
+ */
+ public long numCqsCreated(){
+ return this.cqService.getCqServiceVsdStats().getNumCqsCreated();
+ }
+
+ /**
+ * Returns number of Cqs that are closed.
+ */
+ public long numCqsClosed(){
+ return this.cqService.getCqServiceVsdStats().getNumCqsClosed();
+ }
+
+ /**
+ * Returns number of Cqs that are stopped.
+ */
+ public long numCqsStopped(){
+ return this.cqService.getCqServiceVsdStats().getNumCqsStopped();
+ }
+
+ /**
+ * Returns number of CQs created from the client.
+ */
+ public long numCqsOnClient(){
+ return this.cqService.getCqServiceVsdStats().getNumCqsOnClient();
+ }
+
+ /**
+ * Returns the number of CQs (active + suspended) on the given region.
+ * @param regionName
+ */
+ public long numCqsOnRegion(String regionName){
+
+ DefaultQueryService queryService = (DefaultQueryService)((GemFireCacheImpl)CacheFactory.getAnyInstance()).getLocalQueryService();
+ try {
+ CqQuery[] cqs = queryService.getCqs(regionName);
+
+ if (cqs != null) {
+ return cqs.length;
+ }
+ } catch(Exception ex) {
+ // Dont do anything.
+ }
+ return 0;
+ }
+}