You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:23 UTC
[18/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
new file mode 100644
index 0000000..3316ac7
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
@@ -0,0 +1,182 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.asyncqueue;
+
+import java.util.List;
+
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+
+/**
+ * Factory to create the <code>AsyncEventQueue</code>.
+ * The example below illustrates how to obtain the factory instance and create the
+ * <code>AsyncEventQueue</code>.
+<PRE>
+ Cache cache = new CacheFactory().create();
+ // get AsyncEventQueueFactory from cache
+ AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+
+ // set the attributes on factory
+ factory.setBatchSize(batchSize);
+ factory.setBatchConflationEnabled(isConflation);
+ factory.setMaximumQueueMemory(maxMemory);
+ factory.setParallel(isParallel);
+ .
+ .
+ // create instance of AsyncEventListener
+ AsyncEventListener asyncEventListener = new <AsyncEventListener class>;
+ // create AsyncEventQueue by providing the id and instance of AsyncEventListener
+ AsyncEventQueue asyncQueue = factory.create(asyncQueueId, asyncEventListener);
+</PRE>
+ *
+ * @author pdeole
+ * @since 7.0
+ */
+public interface AsyncEventQueueFactory {
+
+ /**
+ * Sets the disk store name for overflow or persistence.
+ *
+ * @param name
+ */
+ public AsyncEventQueueFactory setDiskStoreName(String name);
+
+ /**
+ * Sets the maximum amount of memory (in MB) for an
+ * <code>AsyncEventQueue</code>'s queue.
+ * Default is 100 MB.
+ *
+ * @param memory
+ * The maximum amount of memory (in MB) for an
+ * <code>AsyncEventQueue</code>'s queue
+ */
+ public AsyncEventQueueFactory setMaximumQueueMemory(int memory);
+
+ /**
+ * Sets whether or not the writing to the disk is synchronous.
+ * Default is true.
+ *
+ * @param isSynchronous
+ * boolean if true indicates synchronous writes
+ *
+ */
+ public AsyncEventQueueFactory setDiskSynchronous(boolean isSynchronous);
+
+ /**
+ * Sets the batch size for an <code>AsyncEventQueue</code>'s queue.
+ * Default is 100.
+ *
+ * @param size
+ * The size of batches sent to its <code>AsyncEventListener</code>
+ */
+ public AsyncEventQueueFactory setBatchSize(int size);
+
+ /**
+ * Sets the batch time interval (in milliseconds) for a <code>AsyncEventQueue</code>.
+ * Default is 5 ms.
+ *
+ * @param interval
+ * The maximum time interval that can elapse before a partial batch
+ * is sent from a <code>AsyncEventQueue</code>.
+ */
+ public AsyncEventQueueFactory setBatchTimeInterval(int interval);
+
+ /**
+ * Sets whether the <code>AsyncEventQueue</code> is persistent or not.
+ * Default is false.
+ *
+ * @param isPersistent
+ * Whether to enable persistence for an <code>AsyncEventQueue</code>.
+ */
+ public AsyncEventQueueFactory setPersistent(boolean isPersistent);
+
+ /**
+ * Indicates whether all VMs need to distribute events to remote site. In this
+ * case only the events originating in a particular VM will be dispatched
+ * in order.
+ * Default is false.
+ *
+ * @param isParallel
+ * boolean to indicate whether distribution policy is parallel
+ */
+
+ public AsyncEventQueueFactory setParallel(boolean isParallel);
+
+ /**
+ * Sets whether to enable batch conflation for <code>AsyncEventQueue</code>.
+ * Default is false.
+ *
+ * @param isConflation
+ * Whether or not to enable batch conflation for batches sent from a
+ * <code>AsyncEventQueue</code>
+ */
+ public AsyncEventQueueFactory setBatchConflationEnabled(boolean isConflation);
+
+ /**
+ * Sets the number of dispatcher threads.
+ * Default is 5.
+ *
+ * @param numThreads
+ */
+ public AsyncEventQueueFactory setDispatcherThreads(int numThreads);
+
+ /**
+ * Adds a <code>GatewayEventFilter</code> to the attributes of
+ * AsyncEventQueue being created by factory.
+ *
+ * @param filter
+ * GatewayEventFilter
+ */
+ public AsyncEventQueueFactory addGatewayEventFilter(GatewayEventFilter filter);
+
+ /**
+ * Removes the provided <code>GatewayEventFilter</code> from the attributes of
+ * AsyncEventQueue being created by factory.
+ *
+ * @param filter
+ */
+ public AsyncEventQueueFactory removeGatewayEventFilter(
+ GatewayEventFilter filter);
+
+ /**
+ * Sets the order policy for multiple dispatchers.
+ * Default is KEY.
+ *
+ * @param policy
+ */
+ public AsyncEventQueueFactory setOrderPolicy(OrderPolicy policy);
+
+ /**
+ * Sets the <code>GatewayEventSubstitutionFilter</code>.
+ *
+ * @param filter
+ * The <code>GatewayEventSubstitutionFilter</code>
+ */
+ public AsyncEventQueueFactory setGatewayEventSubstitutionListener(
+ GatewayEventSubstitutionFilter filter);
+
+
+ /**
+ * Creates the <code>AsyncEventQueue</code>. It accepts Id of AsyncEventQueue
+ * and instance of AsyncEventListener. Multiple queues can be created using
+ * the same listener instance, so the instance of <code>AsyncEventListener</code>
+ * should be thread safe in that case. The <code>AsyncEventListener</code>
+ * will start receiving events when the <code>AsyncEventQueue</code> is
+ * created.
+ *
+ *
+ * @param id
+ * Id of AsyncEventQueue
+ * @param listener
+ * <code>AsyncEventListener</code> to be added to the regions that
+ * are configured to use this queue.
+ */
+ public AsyncEventQueue create(String id, AsyncEventListener listener);
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
new file mode 100644
index 0000000..203c444
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -0,0 +1,279 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.xmlcache.AsyncEventQueueCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.ParallelAsyncEventQueueCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.SerialAsyncEventQueueCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
+
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+ * Used internally to pass the attributes from this factory to the real
+ * GatewaySender it is creating.
+ */
+ private GatewaySenderAttributes attrs = new GatewaySenderAttributes();
+
+ private Cache cache;
+
+ /**
+ * The default batchTimeInterval for AsyncEventQueue in milliseconds.
+ */
+ public static final int DEFAULT_BATCH_TIME_INTERVAL = 5;
+
+
+ public AsyncEventQueueFactoryImpl(Cache cache) {
+ this.cache = cache;
+ this.attrs = new GatewaySenderAttributes();
+ // set a different default for batchTimeInterval for AsyncEventQueue
+ this.attrs.batchTimeInterval = DEFAULT_BATCH_TIME_INTERVAL;
+ }
+
+ @Override
+ public AsyncEventQueueFactory setBatchSize(int size) {
+ this.attrs.batchSize = size;
+ return this;
+ }
+ /**
+  * Records whether the queue being configured should be persistent.
+  *
+  * @param isPersistent value stored in the persistence flag of the pending attributes
+  * @return this factory, so calls can be chained
+  */
+ @Override
+ public AsyncEventQueueFactory setPersistent(boolean isPersistent) {
+   this.attrs.isPersistenceEnabled = isPersistent;
+   return this;
+ }
+
+ @Override
+ public AsyncEventQueueFactory setDiskStoreName(String name) {
+ this.attrs.diskStoreName = name;
+ return this;
+ }
+
+ @Override
+ public AsyncEventQueueFactory setMaximumQueueMemory(int memory) {
+ this.attrs.maximumQueueMemory = memory;
+ return this;
+ }
+
+ @Override
+ public AsyncEventQueueFactory setDiskSynchronous(boolean isSynchronous) {
+ this.attrs.isDiskSynchronous = isSynchronous;
+ return this;
+ }
+
+ @Override
+ public AsyncEventQueueFactory setBatchTimeInterval(int batchTimeInterval) {
+ this.attrs.batchTimeInterval = batchTimeInterval;
+ return this;
+ }
+
+ @Override
+ public AsyncEventQueueFactory setBatchConflationEnabled(boolean isConflation) {
+ this.attrs.isBatchConflationEnabled = isConflation;
+ return this;
+ }
+
+ @Override
+ public AsyncEventQueueFactory setDispatcherThreads(int numThreads) {
+ this.attrs.dispatcherThreads = numThreads;
+ return this;
+ }
+
+ @Override
+ public AsyncEventQueueFactory setOrderPolicy(OrderPolicy policy) {
+ this.attrs.policy = policy;
+ return this;
+ }
+
+ @Override
+ public AsyncEventQueueFactory addGatewayEventFilter(GatewayEventFilter filter) {
+ this.attrs.addGatewayEventFilter(filter);
+ return this;
+ }
+
+ @Override
+ public AsyncEventQueueFactory removeGatewayEventFilter(
+ GatewayEventFilter filter) {
+ this.attrs.eventFilters.remove(filter);
+ return this;
+ }
+ @Override
+ public AsyncEventQueueFactory setGatewayEventSubstitutionListener(
+ GatewayEventSubstitutionFilter filter) {
+ this.attrs.eventSubstitutionFilter = filter;
+ return this;
+ }
+
+ public AsyncEventQueueFactory removeGatewayEventAlternateValueProvider(
+ GatewayEventSubstitutionFilter provider) {
+ return this;
+ }
+
+ public AsyncEventQueueFactory addAsyncEventListener(
+ AsyncEventListener listener) {
+ this.attrs.addAsyncEventListener(listener);
+ return this;
+ }
+
+ /**
+  * Creates an <code>AsyncEventQueue</code> from the attributes collected by this factory
+  * and registers it with the cache.
+  * <p>
+  * For a <code>GemFireCacheImpl</code> the listener is added to the attributes, the
+  * underlying gateway sender is created under the prefixed sender id, and the resulting
+  * queue is added to the cache. For a <code>CacheCreation</code> only a declarative
+  * <code>AsyncEventQueueCreation</code> is recorded.
+  *
+  * @param asyncQueueId id of the queue to create
+  * @param listener listener that will receive the queued events; must not be null
+  * @return the created queue, or <code>null</code> when the cache is neither a
+  *         <code>GemFireCacheImpl</code> nor a <code>CacheCreation</code>
+  * @throws IllegalArgumentException if <code>listener</code> is null
+  * @throws AsyncEventQueueConfigurationException if the configured dispatcher-thread
+  *         count or order policy is rejected while building the underlying sender
+  */
+ @Override
+ public AsyncEventQueue create(String asyncQueueId, AsyncEventListener listener) {
+   if (listener == null) {
+     throw new IllegalArgumentException(
+         LocalizedStrings.AsyncEventQueue_ASYNC_EVENT_LISTENER_CANNOT_BE_NULL.toLocalizedString());
+   }
+
+   AsyncEventQueue asyncEventQueue = null;
+   if (this.cache instanceof GemFireCacheImpl) {
+     if (logger.isDebugEnabled()) {
+       logger.debug("Creating GatewaySender that underlies the AsyncEventQueue");
+     }
+
+     // TODO: Suranjan. Separate AsyncEventQueue from GatewaySender. Until then the
+     // sender is built directly from this factory's attributes instead of going
+     // through a GatewaySenderFactory.
+     addAsyncEventListener(listener);
+     GatewaySender sender = create(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId));
+     asyncEventQueue = new AsyncEventQueueImpl(sender, listener);
+     ((GemFireCacheImpl) cache).addAsyncEventQueue(asyncEventQueue);
+   } else if (this.cache instanceof CacheCreation) {
+     asyncEventQueue = new AsyncEventQueueCreation(asyncQueueId, attrs, listener);
+     ((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue);
+   }
+   if (logger.isDebugEnabled()) {
+     // Separator added so the queue's string form no longer runs into the message text.
+     logger.debug("Returning AsyncEventQueue " + asyncEventQueue);
+   }
+   return asyncEventQueue;
+ }
+
+ private GatewaySender create(String id) {
+ this.attrs.id = id;
+ GatewaySender sender = null;
+
+ if(this.attrs.getDispatcherThreads() <= 0) {
+ throw new AsyncEventQueueConfigurationException(
+ LocalizedStrings.AsyncEventQueue_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
+ .toLocalizedString(id));
+ }
+
+ if (this.attrs.isParallel()) {
+ if ((this.attrs.getOrderPolicy() != null)
+ && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
+ throw new AsyncEventQueueConfigurationException(
+ LocalizedStrings.AsyncEventQueue_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
+ .toLocalizedString(id, this.attrs.getOrderPolicy()));
+ }
+
+ if (this.cache instanceof GemFireCacheImpl) {
+ sender = new ParallelAsyncEventQueueImpl(this.cache, this.attrs);
+ ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+ if (!this.attrs.isManualStart()) {
+ sender.start();
+ }
+ }
+ else if (this.cache instanceof CacheCreation) {
+ sender = new ParallelAsyncEventQueueCreation(this.cache, this.attrs);
+ ((CacheCreation)this.cache).addGatewaySender(sender);
+ }
+ }
+ else {
+// if (this.attrs.getOrderPolicy() != null) {
+// if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) {
+// throw new AsyncEventQueueConfigurationException(
+// LocalizedStrings.AsyncEventQueue_INVALID_ORDER_POLICY_CONCURRENCY_0
+// .toLocalizedString(id));
+// }
+// }
+ if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
+ this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+ }
+ if (this.cache instanceof GemFireCacheImpl) {
+ sender = new SerialAsyncEventQueueImpl(this.cache, this.attrs);
+ ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
+ if (!this.attrs.isManualStart()) {
+ sender.start();
+ }
+ }
+ else if (this.cache instanceof CacheCreation) {
+ sender = new SerialAsyncEventQueueCreation(this.cache, this.attrs);
+ ((CacheCreation)this.cache).addGatewaySender(sender);
+ }
+ }
+ return sender;
+ }
+
+ /**
+  * Copies the configuration of an existing queue description into this factory's
+  * attributes, overwriting whatever was set before.
+  * <p>
+  * NOTE(review): the argument is downcast to <code>AsyncEventQueueCreation</code> to read
+  * the bucket-sorted flag, so passing any other <code>AsyncEventQueue</code>
+  * implementation fails with a <code>ClassCastException</code> after the preceding
+  * fields have already been copied.
+  *
+  * @param asyncQueueCreation queue description to copy from; in practice it must be an
+  *        <code>AsyncEventQueueCreation</code>
+  */
+ public void configureAsyncEventQueue(AsyncEventQueue asyncQueueCreation) {
+ this.attrs.batchSize = asyncQueueCreation.getBatchSize();
+ this.attrs.batchTimeInterval = asyncQueueCreation.getBatchTimeInterval();
+ this.attrs.isBatchConflationEnabled = asyncQueueCreation.isBatchConflationEnabled();
+ this.attrs.isPersistenceEnabled = asyncQueueCreation.isPersistent();
+ this.attrs.diskStoreName = asyncQueueCreation.getDiskStoreName();
+ this.attrs.isDiskSynchronous = asyncQueueCreation.isDiskSynchronous();
+ this.attrs.maximumQueueMemory = asyncQueueCreation.getMaximumQueueMemory();
+ this.attrs.isParallel = asyncQueueCreation.isParallel();
+ // Unchecked downcast: only AsyncEventQueueCreation is supported here.
+ this.attrs.isBucketSorted = ((AsyncEventQueueCreation)asyncQueueCreation).isBucketSorted();
+ this.attrs.dispatcherThreads = asyncQueueCreation.getDispatcherThreads();
+ this.attrs.policy = asyncQueueCreation.getOrderPolicy();
+ // The filter list is assigned by reference, not copied.
+ // NOTE(review): later add/remove calls on this factory presumably affect the source list too - confirm.
+ this.attrs.eventFilters = asyncQueueCreation.getGatewayEventFilters();
+ this.attrs.eventSubstitutionFilter = asyncQueueCreation.getGatewayEventSubstitutionFilter();
+ }
+
+ /**
+  * Records whether the queue being configured should be parallel; the flag later
+  * selects between the parallel and serial sender implementations at creation time.
+  *
+  * @param isParallel value stored in the parallel flag of the pending attributes
+  * @return this factory, so calls can be chained
+  */
+ @Override
+ public AsyncEventQueueFactory setParallel(boolean isParallel) {
+   this.attrs.isParallel = isParallel;
+   return this;
+ }
+ public AsyncEventQueueFactory setBucketSorted(boolean isbucketSorted) {
+ this.attrs.isBucketSorted = isbucketSorted;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
new file mode 100644
index 0000000..5e621f1
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -0,0 +1,188 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+
+public class AsyncEventQueueImpl implements AsyncEventQueue {
+
+ private GatewaySender sender = null;
+
+ private AsyncEventListener asyncEventListener = null;
+
+ public static final String ASYNC_EVENT_QUEUE_PREFIX = "AsyncEventQueue_";
+
+ public AsyncEventQueueImpl(GatewaySender sender, AsyncEventListener eventListener) {
+ this.sender = sender;
+ this.asyncEventListener = eventListener;
+ }
+
+ @Override
+ public String getId() {
+ return getAsyncEventQueueIdFromSenderId(this.sender.getId());
+ }
+
+ @Override
+ public AsyncEventListener getAsyncEventListener() {
+ return asyncEventListener;
+ }
+
+ @Override
+ public List<GatewayEventFilter> getGatewayEventFilters() {
+ return sender.getGatewayEventFilters();
+ }
+
+ @Override
+ public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
+ return sender.getGatewayEventSubstitutionFilter();
+ }
+
+ @Override
+ public int getBatchSize() {
+ return sender.getBatchSize();
+ }
+
+ @Override
+ public String getDiskStoreName() {
+ return sender.getDiskStoreName();
+ }
+
+ @Override
+ public int getBatchTimeInterval() {
+ return sender.getBatchTimeInterval();
+ }
+
+ @Override
+ public boolean isBatchConflationEnabled() {
+ return sender.isBatchConflationEnabled();
+ }
+
+ @Override
+ public int getMaximumQueueMemory() {
+ return sender.getMaximumQueueMemory();
+ }
+
+ @Override
+ public boolean isPersistent() {
+ return sender.isPersistenceEnabled();
+ }
+
+ @Override
+ public boolean isDiskSynchronous() {
+ return sender.isDiskSynchronous();
+ }
+
+ @Override
+ public int getDispatcherThreads() {
+ return sender.getDispatcherThreads();
+ }
+
+ @Override
+ public OrderPolicy getOrderPolicy() {
+ return sender.getOrderPolicy();
+ }
+
+ @Override
+ public boolean isPrimary() {
+ return ((AbstractGatewaySender) sender).isPrimary();
+ }
+
+ /**
+  * Returns the number of entries currently held by the underlying sender's queue(s).
+  * When the sender uses a <code>ConcurrentSerialGatewaySenderEventProcessor</code> the
+  * sizes of all of its queues are summed; otherwise the size of the processor's single
+  * queue is returned.
+  */
+ @Override
+ public int size() {
+   AbstractGatewaySenderEventProcessor processor =
+       ((AbstractGatewaySender) sender).getEventProcessor();
+
+   if (!(processor instanceof ConcurrentSerialGatewaySenderEventProcessor)) {
+     return processor.getQueue().size();
+   }
+
+   int total = 0;
+   Set<RegionQueue> regionQueues =
+       ((ConcurrentSerialGatewaySenderEventProcessor) processor).getQueues();
+   for (RegionQueue regionQueue : regionQueues) {
+     total += regionQueue.size();
+   }
+   return total;
+ }
+
+ public GatewaySender getSender() {
+ return this.sender;
+ }
+
+ public AsyncEventQueueStats getStatistics() {
+ AbstractGatewaySender abstractSender = (AbstractGatewaySender) this.sender;
+ return ((AsyncEventQueueStats) abstractSender.getStatistics());
+ }
+
+ /**
+  * Compares this queue with another <code>AsyncEventQueue</code> by id.
+  * <p>
+  * Any <code>AsyncEventQueue</code> implementation is accepted. The argument is cast to
+  * the interface that the <code>instanceof</code> check verified, so comparing against a
+  * different implementation yields a result instead of a <code>ClassCastException</code>.
+  *
+  * @param obj object to compare with; may be null
+  * @return <code>true</code> if <code>obj</code> is this instance or an
+  *         <code>AsyncEventQueue</code> whose id equals this queue's id
+  */
+ @Override
+ public boolean equals(Object obj) {
+   if (this == obj) {
+     return true;
+   }
+   // instanceof is false for null, so no separate null check is needed.
+   if (!(obj instanceof AsyncEventQueue)) {
+     return false;
+   }
+   AsyncEventQueue other = (AsyncEventQueue) obj;
+   return getId().equals(other.getId());
+ }
+
+ @Override
+ public int hashCode() {
+ return getId().hashCode();
+ }
+
+ /**
+  * Derives the id of the underlying gateway sender from an async event queue id by
+  * prepending {@link #ASYNC_EVENT_QUEUE_PREFIX}.
+  *
+  * @param asyncQueueId id of the async event queue
+  * @return the prefixed sender id
+  */
+ public static String getSenderIdFromAsyncEventQueueId(String asyncQueueId) {
+   return ASYNC_EVENT_QUEUE_PREFIX + asyncQueueId;
+ }
+
+ public static String getAsyncEventQueueIdFromSenderId(String senderId) {
+ if (!senderId.startsWith(ASYNC_EVENT_QUEUE_PREFIX)) {
+ return senderId;
+ }
+ else {
+ return senderId.substring(ASYNC_EVENT_QUEUE_PREFIX.length());
+ }
+ }
+
+ public static boolean isAsyncEventQueue(String senderId) {
+ return senderId.startsWith(ASYNC_EVENT_QUEUE_PREFIX);
+ }
+
+ public boolean isParallel() {
+ return sender.isParallel();
+ }
+
+ /**
+  * Reports whether this queue is bucket sorted.
+  *
+  * @return always <code>false</code>; this implementation is a stub and does not
+  *         consult the underlying sender
+  */
+ public boolean isBucketSorted() {
+ // TODO: stub - not yet derived from the sender's configuration
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueStats.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueStats.java
new file mode 100644
index 0000000..3f0c7a0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -0,0 +1,177 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+
+public class AsyncEventQueueStats extends GatewaySenderStats {
+
+ public static final String typeName = "AsyncEventQueueStatistics";
+
+ /** The <code>StatisticsType</code> of the statistics */
+ private static final StatisticsType type;
+
+
+ static {
+
+ StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
+
+ type = f.createType(typeName, "Stats for activity in the AsyncEventQueue",
+ new StatisticDescriptor[] {
+ f.createIntCounter
+ (EVENTS_RECEIVED,
+ "Number of events received by this queue.",
+ "operations"),
+ f.createIntCounter
+ (EVENTS_QUEUED,
+ "Number of events added to the event queue.",
+ "operations"),
+ f.createLongCounter
+ (EVENT_QUEUE_TIME,
+ "Total time spent queueing events.",
+ "nanoseconds"),
+ f.createIntGauge
+ (EVENT_QUEUE_SIZE,
+ "Size of the event queue.",
+ "operations", false),
+ f.createIntGauge
+ (TMP_EVENT_QUEUE_SIZE,
+ "Size of the temporary events queue.",
+ "operations", false),
+ f.createIntCounter
+ (EVENTS_NOT_QUEUED_CONFLATED,
+ "Number of events received but not added to the event queue because the queue already contains an event with the event's key.",
+ "operations"),
+ f.createIntCounter
+ (EVENTS_CONFLATED_FROM_BATCHES,
+ "Number of events conflated from batches.",
+ "operations"),
+ f.createIntCounter
+ (EVENTS_DISTRIBUTED,
+ "Number of events removed from the event queue and sent.",
+ "operations"),
+ f.createIntCounter
+ (EVENTS_EXCEEDING_ALERT_THRESHOLD,
+ "Number of events exceeding the alert threshold.",
+ "operations", false),
+ f.createLongCounter
+ (BATCH_DISTRIBUTION_TIME,
+ "Total time spent distributing batches of events to receivers.",
+ "nanoseconds"),
+ f.createIntCounter
+ (BATCHES_DISTRIBUTED,
+ "Number of batches of events removed from the event queue and sent.",
+ "operations"),
+ f.createIntCounter
+ (BATCHES_REDISTRIBUTED,
+ "Number of batches of events removed from the event queue and resent.",
+ "operations", false),
+ f.createIntCounter
+ (UNPROCESSED_TOKENS_ADDED_BY_PRIMARY,
+ "Number of tokens added to the secondary's unprocessed token map by the primary (though a listener).",
+ "tokens"),
+ f.createIntCounter
+ (UNPROCESSED_EVENTS_ADDED_BY_SECONDARY,
+ "Number of events added to the secondary's unprocessed event map by the secondary.",
+ "events"),
+ f.createIntCounter
+ (UNPROCESSED_EVENTS_REMOVED_BY_PRIMARY,
+ "Number of events removed from the secondary's unprocessed event map by the primary (though a listener).",
+ "events"),
+ f.createIntCounter
+ (UNPROCESSED_TOKENS_REMOVED_BY_SECONDARY,
+ "Number of tokens removed from the secondary's unprocessed token map by the secondary.",
+ "tokens"),
+ f.createIntCounter
+ (UNPROCESSED_EVENTS_REMOVED_BY_TIMEOUT,
+ "Number of events removed from the secondary's unprocessed event map by a timeout.",
+ "events"),
+ f.createIntCounter
+ (UNPROCESSED_TOKENS_REMOVED_BY_TIMEOUT,
+ "Number of tokens removed from the secondary's unprocessed token map by a timeout.",
+ "tokens"),
+ f.createIntGauge
+ (UNPROCESSED_EVENT_MAP_SIZE,
+ "Current number of entries in the secondary's unprocessed event map.",
+ "events", false),
+ f.createIntGauge
+ (UNPROCESSED_TOKEN_MAP_SIZE,
+ "Current number of entries in the secondary's unprocessed token map.",
+ "tokens", false),
+ f.createIntGauge
+ (CONFLATION_INDEXES_MAP_SIZE,
+ "Current number of entries in the conflation indexes map.",
+ "events"),
+ f.createIntCounter
+ (NOT_QUEUED_EVENTS,
+ "Number of events not added to queue.",
+ "events"),
+ f.createIntCounter
+ (EVENTS_FILTERED,
+ "Number of events filtered through GatewayEventFilter.",
+ "events"),
+ f.createIntCounter
+ (LOAD_BALANCES_COMPLETED,
+ "Number of load balances completed",
+ "operations"),
+ f.createIntGauge
+ (LOAD_BALANCES_IN_PROGRESS,
+ "Number of load balances in progress",
+ "operations"),
+ f.createLongCounter
+ (LOAD_BALANCE_TIME,
+ "Total time spent load balancing this sender",
+ "nanoseconds"),
+ });
+
+ // Initialize id fields
+ eventsReceivedId = type.nameToId(EVENTS_RECEIVED);
+ eventsQueuedId = type.nameToId(EVENTS_QUEUED);
+ eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
+ eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
+ eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+ eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
+ eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
+ eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
+ batchDistributionTimeId = type.nameToId(BATCH_DISTRIBUTION_TIME);
+ batchesDistributedId = type.nameToId(BATCHES_DISTRIBUTED);
+ batchesRedistributedId = type.nameToId(BATCHES_REDISTRIBUTED);
+ unprocessedTokensAddedByPrimaryId = type.nameToId(UNPROCESSED_TOKENS_ADDED_BY_PRIMARY);
+ unprocessedEventsAddedBySecondaryId = type.nameToId(UNPROCESSED_EVENTS_ADDED_BY_SECONDARY);
+ unprocessedEventsRemovedByPrimaryId = type.nameToId(UNPROCESSED_EVENTS_REMOVED_BY_PRIMARY);
+ unprocessedTokensRemovedBySecondaryId = type.nameToId(UNPROCESSED_TOKENS_REMOVED_BY_SECONDARY);
+ unprocessedEventsRemovedByTimeoutId = type.nameToId(UNPROCESSED_EVENTS_REMOVED_BY_TIMEOUT);
+ unprocessedTokensRemovedByTimeoutId = type.nameToId(UNPROCESSED_TOKENS_REMOVED_BY_TIMEOUT);
+ unprocessedEventMapSizeId = type.nameToId(UNPROCESSED_EVENT_MAP_SIZE);
+ unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
+ conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
+ notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+ eventsFilteredId = type.nameToId(EVENTS_FILTERED);
+ eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
+ loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
+ loadBalancesInProgressId = type.nameToId(LOAD_BALANCES_IN_PROGRESS);
+ loadBalanceTimeId = type.nameToId(LOAD_BALANCE_TIME);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param f The <code>StatisticsFactory</code> which creates the
+ * <code>Statistics</code> instance
+ * @param asyncQueueId The id of the <code>AsyncEventQueue</code> used to
+ * generate the name of the <code>Statistics</code>
+ */
+ public AsyncEventQueueStats(StatisticsFactory f, String asyncQueueId) {
+ super(f, asyncQueueId, type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
new file mode 100644
index 0000000..74a8ef5
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -0,0 +1,244 @@
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.internal.cache.DistributedRegion;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Gateway sender that lies underneath a parallel <code>AsyncEventQueue</code>
+ * and keeps its statistics in {@link AsyncEventQueueStats}.
+ */
+public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
+
+  private static final Logger logger = LogService.getLogger();
+
+  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
+      "Remote Site Discovery Logger Group", logger);
+
+  /** Creates an unconfigured sender that is only flagged as parallel. */
+  public ParallelAsyncEventQueueImpl() {
+    super();
+    this.isParallel = true;
+  }
+
+  /**
+   * Creates a sender for the given cache and marks it as being for internal use.
+   *
+   * @param cache the cache this sender is created in
+   * @param attrs the attributes handed to the superclass constructor
+   */
+  public ParallelAsyncEventQueueImpl(Cache cache, GatewaySenderAttributes attrs) {
+    super(cache, attrs);
+    if (!(this.cache instanceof CacheCreation)) {
+      // this sender lies underneath the AsyncEventQueue. Need to have
+      // AsyncEventQueueStats
+      this.statistics = new AsyncEventQueueStats(
+          cache.getDistributedSystem(), AsyncEventQueueImpl
+              .getAsyncEventQueueIdFromSenderId(id));
+    }
+    this.isForInternalUse = true;
+  }
+
+  /**
+   * Starts this sender under the life-cycle write lock. Logs a warning and
+   * returns if it is already running; otherwise starts the event processor,
+   * distributes the updated profile, raises a <code>GATEWAYSENDER_START</code>
+   * resource event and enqueues any temporarily queued events.
+   *
+   * @throws IllegalStateException if a remote distributed system id is set
+   *         but no locators are configured
+   */
+  @Override
+  public void start() {
+    this.lifeCycleLock.writeLock().lock();
+    try {
+      if (isRunning()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
+        return;
+      }
+
+      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
+            .getConfig().getLocators();
+        if (locators.length() == 0) {
+          throw new IllegalStateException(
+              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                  .toLocalizedString());
+        }
+      }
+      // All processing happens through the concurrent event processor;
+      // ParallelGatewaySenderEventProcessor and ParallelGatewaySenderQueue are
+      // utility classes of the concurrent versions of processor and queue.
+      eventProcessor = new ConcurrentParallelGatewaySenderEventProcessor(this);
+
+      eventProcessor.start();
+      waitForRunningStatus();
+      //Only notify the type registry if this is a WAN gateway queue
+      if (!isAsyncEventQueue()) {
+        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+      }
+      new UpdateAttributesProcessor(this).distribute(false);
+
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
+
+      logger.info(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderImpl_STARTED__0, this));
+
+      if (!tmpQueuedEvents.isEmpty()) {
+        enqueTempEvents();
+      }
+    } finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Stops this sender under the life-cycle write lock; does nothing when it is
+   * not running. Stops the event processor and the proxy, closes the
+   * listeners, cleans up the queue, raises a <code>GATEWAYSENDER_STOP</code>
+   * resource event and clears the temporarily queued events.
+   */
+  @Override
+  public void stop() {
+    this.lifeCycleLock.writeLock().lock();
+    try {
+      if (!this.isRunning()) {
+        return;
+      }
+      // Stop the dispatcher; this snapshot is reused for the queue clean-up
+      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
+      if (ev != null && !ev.isStopped()) {
+        ev.stopProcessing();
+      }
+
+      // Stop the proxy (after the dispatcher, so the socket is still
+      // alive until after the dispatcher has stopped)
+      stompProxyDead();
+
+      // Close the listeners
+      for (AsyncEventListener listener : this.listeners) {
+        listener.close();
+      }
+      //stop the running threads, open sockets if any
+      if (ev != null) {
+        ((ConcurrentParallelGatewaySenderQueue)ev.getQueue()).cleanUp();
+      }
+
+      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
+
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+
+      clearTempEventsAfterSenderStopped();
+    } finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+  }
+
+  /** Returns the id, remote distributed system id and running state. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("ParallelGatewaySender{");
+    sb.append("id=").append(getId());
+    sb.append(",remoteDsId=").append(getRemoteDSId());
+    sb.append(",isRunning =").append(isRunning());
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Copies the current configuration and state of this sender into the profile.
+   *
+   * @param profile the profile to fill in; must be a <code>GatewaySenderProfile</code>
+   */
+  @Override
+  public void fillInProfile(Profile profile) {
+    assert profile instanceof GatewaySenderProfile;
+    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
+    pf.Id = getId();
+    pf.remoteDSId = getRemoteDSId();
+    pf.isRunning = isRunning();
+    pf.isPrimary = isPrimary();
+    pf.isParallel = true;
+    pf.isBatchConflationEnabled = isBatchConflationEnabled();
+    pf.isPersistenceEnabled = isPersistenceEnabled();
+    pf.alertThreshold = getAlertThreshold();
+    pf.manualStart = isManualStart();
+    pf.dispatcherThreads = getDispatcherThreads();
+    pf.orderPolicy = getOrderPolicy();
+    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) {
+      pf.eventFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
+      pf.transFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (AsyncEventListener listener : getAsyncEventListeners()) {
+      pf.senderEventListenerClassNames.add(listener.getClass().getName());
+    }
+    pf.isDiskSynchronous = isDiskSynchronous();
+  }
+
+  /**
+   * Replaces the id of the cloned event with one whose thread id is derived
+   * from the bucket id, the originating thread id and the event id index.
+   *
+   * @param clonedEvent the event whose id is rewritten in place
+   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
+   */
+  @Override
+  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+    final int bucketId;
+    //merged from 42004
+    if (clonedEvent.getRegion() instanceof DistributedRegion) {
+      bucketId = PartitionedRegionHelper.getHashKey(clonedEvent.getKey(),
+          getMaxParallelismForReplicatedRegion());
+    } else {
+      bucketId = PartitionedRegionHelper
+          .getHashKey((EntryOperation)clonedEvent);
+    }
+    EventID originalEventId = clonedEvent.getEventId();
+    long originatingThreadId = ThreadIdentifier.getRealThreadID(originalEventId.getThreadID());
+
+    // In case of parallel as all events go through primary buckets
+    // we don't need to generate different threadId for secondary buckets
+    // as they will be rejected if seen at PR level itself
+    long newThreadId = ThreadIdentifier
+        .createFakeThreadIDForParallelGSPrimaryBucket(bucketId,
+            originatingThreadId, getEventIdIndex());
+
+    EventID newEventId = new EventID(originalEventId.getMembershipID(),
+        newThreadId, originalEventId.getSequenceID(), bucketId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Generated event id for event with key={}, bucketId={}, original event id={}, threadId={}, new event id={}, newThreadId={}",
+          this, clonedEvent.getKey(), bucketId, originalEventId, originatingThreadId, newEventId, newThreadId);
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
new file mode 100644
index 0000000..90b41ac
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -0,0 +1,241 @@
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.DistributedLockService;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Gateway sender that lies underneath a serial <code>AsyncEventQueue</code>
+ * and keeps its statistics in {@link AsyncEventQueueStats}.
+ */
+public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
+
+  private static final Logger logger = LogService.getLogger();
+
+  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
+      "Remote Site Discovery Logger Group", logger);
+
+  /** Builds an unconfigured sender that is flagged as not parallel. */
+  public SerialAsyncEventQueueImpl() {
+    super();
+    this.isParallel = false;
+  }
+
+  /**
+   * Builds a sender for the given cache.
+   *
+   * @param cache the cache this sender is created in
+   * @param attrs the attributes handed to the superclass constructor
+   */
+  public SerialAsyncEventQueueImpl(Cache cache, GatewaySenderAttributes attrs) {
+    super(cache, attrs);
+    if (!(this.cache instanceof CacheCreation)) {
+      // The AsyncEventQueue sits on top of this sender, so the statistics
+      // have to be AsyncEventQueueStats
+      this.statistics = new AsyncEventQueueStats(
+          cache.getDistributedSystem(),
+          AsyncEventQueueImpl.getAsyncEventQueueIdFromSenderId(id));
+    }
+  }
+
+  /**
+   * Starts this sender under the life-cycle write lock. Logs a warning and
+   * returns if it is already running; otherwise settles whether this sender
+   * is primary or secondary, starts the event processor, distributes the
+   * updated profile, raises a <code>GATEWAYSENDER_START</code> resource event
+   * and enqueues the temporarily queued events.
+   *
+   * @throws GatewaySenderConfigurationException if a remote distributed
+   *         system id is set but no locators are configured
+   */
+  @Override
+  public void start() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Starting gatewaySender : {}", this);
+    }
+
+    this.lifeCycleLock.writeLock().lock();
+    try {
+      if (isRunning()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
+        return;
+      }
+      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+        String configuredLocators = ((GemFireCacheImpl)this.cache)
+            .getDistributedSystem().getConfig().getLocators();
+        if (configuredLocators.length() == 0) {
+          throw new GatewaySenderConfigurationException(
+              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                  .toLocalizedString());
+        }
+      }
+      getSenderAdvisor().initDLockService();
+      if (!isPrimary()) {
+        if (getSenderAdvisor().volunteerForPrimary()) {
+          getSenderAdvisor().makePrimary();
+        } else {
+          getSenderAdvisor().makeSecondary();
+        }
+      }
+      // More than one dispatcher thread needs the concurrent processor
+      if (getDispatcherThreads() > 1) {
+        eventProcessor = new ConcurrentSerialGatewaySenderEventProcessor(this);
+      } else {
+        eventProcessor = new SerialGatewaySenderEventProcessor(this, getId());
+      }
+      eventProcessor.start();
+      waitForRunningStatus();
+      this.startTime = System.currentTimeMillis();
+
+      //Only notify the type registry if this is a WAN gateway queue
+      if (!isAsyncEventQueue()) {
+        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+      }
+      new UpdateAttributesProcessor(this).distribute(false);
+
+      ((InternalDistributedSystem) this.cache.getDistributedSystem())
+          .handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
+
+      logger.info(LocalizedMessage.create(LocalizedStrings.SerialGatewaySenderImpl_STARTED__0, this));
+
+      enqueTempEvents();
+    } finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Stops this sender. Under the life-cycle write lock the event processor and
+   * the proxy are stopped, the listeners closed and the temporary events
+   * cleared; afterwards the lock service is destroyed if this sender is
+   * primary, the queues are cleaned up, the primary flag is reset, the updated
+   * profile is distributed, the lock obtaining thread is given up to three
+   * seconds to finish and a <code>GATEWAYSENDER_STOP</code> resource event is
+   * raised.
+   */
+  @Override
+  public void stop() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Stopping Gateway Sender : {}", this);
+    }
+    this.lifeCycleLock.writeLock().lock();
+    try {
+      // Stop the dispatcher
+      AbstractGatewaySenderEventProcessor processor = this.eventProcessor;
+      if (processor != null && !processor.isStopped()) {
+        processor.stopProcessing();
+      }
+
+      // Stop the proxy (after the dispatcher, so the socket is still
+      // alive until after the dispatcher has stopped)
+      stompProxyDead();
+
+      // Close the listeners
+      for (AsyncEventListener eventListener : this.listeners) {
+        eventListener.close();
+      }
+      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
+
+      clearTempEventsAfterSenderStopped();
+    } finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+    if (this.isPrimary()) {
+      try {
+        DistributedLockService.destroy(getSenderAdvisor().getDLockServiceName());
+      } catch (IllegalArgumentException e) {
+        // service not found... ignore
+      }
+    }
+    if (getQueues() != null && !getQueues().isEmpty()) {
+      for (RegionQueue queue : getQueues()) {
+        ((SerialGatewaySenderQueue)queue).cleanUp();
+      }
+    }
+    this.setIsPrimary(false);
+    new UpdateAttributesProcessor(this).distribute(false);
+    Thread lockThread = getSenderAdvisor().getLockObtainingThread();
+    if (lockThread != null && lockThread.isAlive()) {
+      // wait a while for thread to terminate
+      try {
+        lockThread.join(3000);
+      } catch (InterruptedException ex) {
+        // the join was cancelled; set the interrupt bit again so this
+        // thread knows it has been interrupted
+        Thread.currentThread().interrupt();
+      }
+      if (lockThread.isAlive()) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.GatewaySender_COULD_NOT_STOP_LOCK_OBTAINING_THREAD_DURING_GATEWAY_SENDER_STOP));
+      }
+    }
+
+    ((InternalDistributedSystem) this.cache.getDistributedSystem())
+        .handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+  }
+
+  /** Returns the id, remote distributed system id, running and primary state. */
+  @Override
+  public String toString() {
+    return "SerialGatewaySender{id=" + getId()
+        + ",remoteDsId=" + getRemoteDSId()
+        + ",isRunning =" + isRunning()
+        + ",isPrimary =" + isPrimary()
+        + "}";
+  }
+
+  /**
+   * Copies the current configuration and state of this sender into the profile.
+   *
+   * @param profile the profile to fill in; must be a <code>GatewaySenderProfile</code>
+   */
+  @Override
+  public void fillInProfile(Profile profile) {
+    assert profile instanceof GatewaySenderProfile;
+    GatewaySenderProfile senderProfile = (GatewaySenderProfile)profile;
+    senderProfile.Id = getId();
+    senderProfile.startTime = getStartTime();
+    senderProfile.remoteDSId = getRemoteDSId();
+    senderProfile.isRunning = isRunning();
+    senderProfile.isPrimary = isPrimary();
+    senderProfile.isParallel = false;
+    senderProfile.isBatchConflationEnabled = isBatchConflationEnabled();
+    senderProfile.isPersistenceEnabled = isPersistenceEnabled();
+    senderProfile.alertThreshold = getAlertThreshold();
+    senderProfile.manualStart = isManualStart();
+    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter eventFilter : getGatewayEventFilters()) {
+      senderProfile.eventFiltersClassNames.add(eventFilter.getClass().getName());
+    }
+    for (GatewayTransportFilter transportFilter : getGatewayTransportFilters()) {
+      senderProfile.transFiltersClassNames.add(transportFilter.getClass().getName());
+    }
+    for (AsyncEventListener eventListener : getAsyncEventListeners()) {
+      senderProfile.senderEventListenerClassNames.add(eventListener.getClass().getName());
+    }
+    senderProfile.isDiskSynchronous = isDiskSynchronous();
+    senderProfile.dispatcherThreads = getDispatcherThreads();
+    senderProfile.orderPolicy = getOrderPolicy();
+    senderProfile.serverLocation = this.getServerLocation();
+  }
+
+  /**
+   * Replaces the id of the cloned event with a copy whose thread id has been
+   * converted to a WAN-type thread id, unless it already is one.
+   *
+   * @param clonedEvent the event whose id is rewritten in place
+   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
+   */
+  @Override
+  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+    EventID originalEventId = clonedEvent.getEventId();
+    long originalThreadId = originalEventId.getThreadID();
+    long newThreadId = originalThreadId;
+    if (!ThreadIdentifier.isWanTypeThreadID(originalThreadId)) {
+      // Only convert thread ids that have not been converted already
+      newThreadId = ThreadIdentifier
+          .createFakeThreadIDForParallelGSPrimaryBucket(0, originalThreadId,
+              getEventIdIndex());
+    }
+    EventID newEventId = new EventID(originalEventId.getMembershipID(),
+        newThreadId, originalEventId.getSequenceID());
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Generated event id for event with key={}, original event id={}, originalThreadId={}, new event id={}, newThreadId={}",
+          this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId);
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/AllConnectionsInUseException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/AllConnectionsInUseException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/AllConnectionsInUseException.java
new file mode 100644
index 0000000..95c86f0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/AllConnectionsInUseException.java
@@ -0,0 +1,52 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client;
+
+
+/**
+ * Signals that a connection pool has reached its maximum size and that
+ * every one of its connections is in use.
+ * @author dsmith
+ * @since 5.7
+ */
+public class AllConnectionsInUseException extends ServerConnectivityException {
+
+  private static final long serialVersionUID = 7304243507881787071L;
+
+  /**
+   * Constructs an exception that carries neither a detail message nor a cause.
+   */
+  public AllConnectionsInUseException() {
+    super();
+  }
+
+  /**
+   * Constructs an exception that carries the given detail message.
+   * @param message the detail message
+   */
+  public AllConnectionsInUseException(String message) {
+    super(message);
+  }
+
+  /**
+   * Constructs an exception that wraps the given cause.
+   * @param cause the cause
+   */
+  public AllConnectionsInUseException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Constructs an exception that carries both a detail message and a cause.
+   * @param message the detail message
+   * @param cause the cause
+   */
+  public AllConnectionsInUseException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/ClientCache.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/ClientCache.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/ClientCache.java
new file mode 100644
index 0000000..7582da9
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/ClientCache.java
@@ -0,0 +1,159 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.client;
+
+import java.net.InetSocketAddress;
+import java.util.Properties;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.query.QueryService;
+
+/**
+ * A ClientCache instance controls the life cycle of the local singleton cache in a client.
+ * <p>A ClientCache is created using {@link ClientCacheFactory#create}.
+ * See {@link ClientCacheFactory} for common usage patterns for creating the client cache instance.
+ * <p>ClientCache provides
+ * access to functionality when a member connects as a client to GemFire servers.
+ * It provides the following services:
+ * <ul>
+ * <li> Access to existing regions (see {@link #getRegion} and {@link #rootRegions}).</li>
+ * <li> Creation of regions (see {@link #createClientRegionFactory(ClientRegionShortcut)} and {@link #createClientRegionFactory(String)}).</li>
+ * <li> Access to the query service (see {@link #getQueryService} and {@link #getLocalQueryService}).</li>
+ * <li> Access to the GemFire logger (see {@link #getLogger}).</li>
+ * <li> Access to the GemFire distributed system (see {@link #getDistributedSystem}).</li>
+ * <li> Access to the GemFire resource manager (see {@link #getResourceManager}).</li>
+ * <li> Management of local disk stores for this cache instance (see {@link #createDiskStoreFactory}).</li>
+ * <li> Creation of authenticated cache views that support multiple users (see {@link #createAuthenticatedView}).</li>
+ * </ul>
+ * <p>A ClientCache connects to a server using a {@link Pool}. This pool can be
+ * configured in the ClientCacheFactory (by default GemFire tries to create a pool
+ * which tries to connect to a server on the localhost on port 40404). This default pool
+ * is used by {@link Region}s (created using {@link ClientRegionFactory}) to talk to
+ * regions on the server.
+ * <p>More pools can be created using {@link PoolManager} or by declaring them in cache.xml.
+ * @since 6.5
+ * @author darrel
+ */
+public interface ClientCache extends GemFireCache {
+ /**
+ * Return the QueryService for the named pool.
+ * The query operations performed
+ * using this QueryService will be executed on the servers that are associated
+ * with this pool.
+ * @param poolName the name of the pool whose servers execute the queries
+ * @return the QueryService for the named pool
+ */
+ public QueryService getQueryService(String poolName);
+
+ /**
+ * Return a QueryService that queries the local state in the client cache.
+ * These queries will not be sent to a server.
+ * @return a QueryService that only queries the local state of this client cache
+ */
+ public QueryService getLocalQueryService();
+
+ /**
+ * Terminates this object cache and releases all the resources.
+ * Calls {@link Region#close} on each region in the cache.
+ * After this cache is closed, any further
+ * method call on this cache or any region object will throw
+ * {@link CacheClosedException}, unless otherwise noted.
+ * @param keepalive whether the server should keep the durable client's queues alive for the timeout period
+ * @throws CacheClosedException if the cache is already closed.
+ */
+ public void close(boolean keepalive);
+
+ /**
+ * Create and return a client region factory that is initialized to create
+ * a region using the given predefined region attributes.
+ * @param shortcut the predefined region attributes to initialize the factory with.
+ * @return a factory that will produce a client region.
+ */
+ public <K,V> ClientRegionFactory<K,V> createClientRegionFactory(ClientRegionShortcut shortcut);
+
+ /**
+ * Create and return a client region factory that is initialized to create
+ * a region using the given named region attributes.
+ * <p>Named region attributes are defined in cache.xml by setting the name as
+ * the value of the <code>id</code> attribute on a <code>region-attributes</code> element.
+ * @param regionAttributesId the named region attributes to initialize the factory with.
+ * @throws IllegalStateException if the named region attributes have not been defined.
+ * @return a factory that will produce a client region.
+ */
+ public <K,V> ClientRegionFactory<K,V> createClientRegionFactory(String regionAttributesId);
+
+ /**
+ * Notifies the server that this durable client is ready to receive updates.
+ * This method is used by durable clients to notify servers that they
+ * are ready to receive updates. As soon as the server receives this message,
+ * it will forward updates to this client (if necessary).
+ * <p>
+ * Durable clients must call this method after they are done creating regions
+ * and issuing interest registration requests. If it is called before then, events
+ * will be lost. Any time a new {@link Pool} is created and regions have been
+ * added to it, this method needs to be called again.
+ *
+ * @throws IllegalStateException if called by a non-durable client
+ */
+ public void readyForEvents();
+
+ /**
+ * Creates an authenticated cache view using the given user security properties
+ * on the client cache's default pool.
+ * Multiple views with different user properties can be created on a
+ * single client cache.
+ *
+ * Requires {@link ClientCacheFactory#setPoolMultiuserAuthentication(boolean) multiuser-authentication}
+ * to be set to true on the default pool.
+ *
+ * Applications must use this instance to do operations, when
+ * multiuser-authentication is set to true.
+ *
+ * <p>
+ * Authenticated cache views are only allowed to access {@link ClientRegionShortcut#PROXY proxy} regions.
+ * The {@link RegionService#getRegion} method will throw IllegalStateException
+ * if an attempt is made to get a region that has local storage.
+ *
+ * @throws UnsupportedOperationException
+ * when invoked with multiuser-authentication as false.
+ * @param userSecurityProperties
+ * the security properties of a user.
+ * @return the {@link RegionService} instance associated with a user and the given
+ * properties.
+ */
+ public RegionService createAuthenticatedView(Properties userSecurityProperties);
+
+ /**
+ * Creates an authenticated cache view using the given user security properties
+ * using the given pool to connect to servers.
+ * Requires {@link PoolFactory#setMultiuserAuthentication(boolean) multiuser-authentication} to be set to true
+ * on the given pool.
+ * <p>See {@link #createAuthenticatedView(Properties)} for more information
+ * on the returned cache view.
+ * @param userSecurityProperties the security properties of a user.
+ * @param poolName the pool that the users should be authenticated against.
+ * @return the {@link RegionService} instance associated with a user and the given
+ * properties.
+ */
+ public RegionService createAuthenticatedView(Properties userSecurityProperties, String poolName);
+
+ /**
+ * Returns a set of the servers to which this client is currently connected.
+ * @return the addresses of the servers this client is currently connected to
+ * @since 6.6
+ */
+ public Set<InetSocketAddress> getCurrentServers();
+
+ /**
+ * Returns the default server pool. If one or more non-default pools were
+ * configured, this may return null.
+ * @return the default server pool, possibly null when non-default pools were configured
+ * @since 7.0
+ * @see com.gemstone.gemfire.cache.client.Pool
+ */
+ public Pool getDefaultPool();
+
+}