You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:23 UTC

[18/51] [partial] incubator-geode git commit: Init

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
new file mode 100644
index 0000000..3316ac7
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java
@@ -0,0 +1,182 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.asyncqueue;
+
+import java.util.List;
+
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+
+/**
+ * Factory to create the <code>AsyncEventQueue</code>. 
+ * The example below illustrates how to get an instance of the factory and create the
+ * <code>AsyncEventQueue</code>.
+<PRE>
+  Cache cache = new CacheFactory().create();
+  // get AsyncEventQueueFactory from cache
+  AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+  
+  // set the attributes on factory
+  factory.setBatchSize(batchSize);
+  factory.setBatchConflationEnabled(isConflation);
+  factory.setMaximumQueueMemory(maxMemory);
+  factory.setParallel(isParallel);
+             .
+             .
+  // create instance of AsyncEventListener           
+  AsyncEventListener asyncEventListener = new <AsyncEventListener class>;
+  // create AsyncEventQueue by providing the id and instance of AsyncEventListener
+  AsyncEventQueue asyncQueue = factory.create(asyncQueueId, asyncEventListener);
+</PRE>
+ * 
+ * @author pdeole
+ * @since 7.0
+ */
+public interface AsyncEventQueueFactory {
+
+  /**
+   * Sets the name of the disk store used for overflow or persistence.
+   *
+   * @param name the name of the disk store
+   */
+  public AsyncEventQueueFactory setDiskStoreName(String name);
+
+  /**
+   * Sets the maximum amount of memory (in MB) for an
+   * <code>AsyncEventQueue</code>'s queue.
+   * Default is 100 MB.
+   *
+   * @param memory
+   *          The maximum amount of memory (in MB) for an
+   *          <code>AsyncEventQueue</code>'s queue
+   */
+  public AsyncEventQueueFactory setMaximumQueueMemory(int memory);
+  
+  /**
+   * Sets whether writes to the disk are synchronous.
+   * Default is true.
+   *
+   * @param isSynchronous
+   *          <code>true</code> for synchronous writes
+   *
+   */
+  public AsyncEventQueueFactory setDiskSynchronous(boolean isSynchronous);
+
+  /**
+   * Sets the batch size for an <code>AsyncEventQueue</code>'s queue.
+   * Default is 100.
+   *
+   * @param size
+   *          The size of batches sent to its <code>AsyncEventListener</code>
+   */
+  public AsyncEventQueueFactory setBatchSize(int size);
+  
+  /**
+   * Sets the batch time interval (in milliseconds) for an <code>AsyncEventQueue</code>.
+   * Default is 5 ms.
+   *
+   * @param interval
+   *          The maximum time interval that can elapse before a partial batch
+   *          is sent from an <code>AsyncEventQueue</code>.
+   */
+  public AsyncEventQueueFactory setBatchTimeInterval(int interval);
+
+  /**
+   * Sets whether the <code>AsyncEventQueue</code> is persistent or not.
+   * Default is false.
+   *
+   * @param isPersistent
+   *          Whether to enable persistence for an <code>AsyncEventQueue</code>.
+   */
+  public AsyncEventQueueFactory setPersistent(boolean isPersistent);
+
+  /**
+   * Indicates whether all VMs need to distribute events to the remote site. In
+   * this case only the events originating in a particular VM will be
+   * dispatched in order.
+   * Default is false.
+   *
+   * @param isParallel
+   *          boolean to indicate whether distribution policy is parallel
+   */
+
+  public AsyncEventQueueFactory setParallel(boolean isParallel);
+  
+  /**
+   * Sets whether to enable batch conflation for <code>AsyncEventQueue</code>.
+   * Default is false.
+   *
+   * @param isConflation
+   *          Whether or not to enable batch conflation for batches sent from an
+   *          <code>AsyncEventQueue</code>
+   */
+  public AsyncEventQueueFactory setBatchConflationEnabled(boolean isConflation);
+  
+  /**
+   * Sets the number of dispatcher threads.
+   * Default is 5.
+   *
+   * @param numThreads the number of dispatcher threads
+   */
+  public AsyncEventQueueFactory setDispatcherThreads(int numThreads);
+
+  /**
+   * Adds a <code>GatewayEventFilter</code> to the attributes of
+   * AsyncEventQueue being created by factory.
+   *
+   * @param filter
+   *          GatewayEventFilter
+   */
+  public AsyncEventQueueFactory addGatewayEventFilter(GatewayEventFilter filter);
+
+  /**
+   * Removes the provided <code>GatewayEventFilter</code> from the attributes of
+   * AsyncEventQueue being created by factory.
+   *
+   * @param filter the <code>GatewayEventFilter</code> to remove
+   */
+  public AsyncEventQueueFactory removeGatewayEventFilter(
+      GatewayEventFilter filter);
+  
+  /**
+   * Sets the order policy for multiple dispatchers.
+   * Default is KEY.
+   *
+   * @param policy the <code>OrderPolicy</code> to use
+   */
+  public AsyncEventQueueFactory setOrderPolicy(OrderPolicy policy);
+  
+  /**
+   * Sets the <code>GatewayEventSubstitutionFilter</code>.
+   *
+   * @param filter
+   *          The <code>GatewayEventSubstitutionFilter</code>
+   */
+  public AsyncEventQueueFactory setGatewayEventSubstitutionListener(
+      GatewayEventSubstitutionFilter filter);
+
+
+  /**
+   * Creates the <code>AsyncEventQueue</code>. It accepts Id of AsyncEventQueue
+   * and instance of AsyncEventListener. Multiple queues can be created using
+   * the same listener instance. So, the instance of <code>AsyncEventListener</code>
+   * should be thread safe in that case. The <code>AsyncEventListener</code>
+   * will start receiving events when the <code>AsyncEventQueue</code> is
+   * created.
+   *
+   * @param id
+   *          Id of AsyncEventQueue
+   * @param listener
+   *          <code>AsyncEventListener</code> to be added to the regions that
+   *          are configured to use this queue.
+   * @return the created <code>AsyncEventQueue</code>
+   */
+  public AsyncEventQueue create(String id, AsyncEventListener listener);
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
new file mode 100644
index 0000000..203c444
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -0,0 +1,279 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.xmlcache.AsyncEventQueueCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.ParallelAsyncEventQueueCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.SerialAsyncEventQueueCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Default {@link AsyncEventQueueFactory} implementation. Setters record their
+ * value in a {@link GatewaySenderAttributes} instance and return this factory.
+ */
+public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
+
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * The default batchTimeInterval for AsyncEventQueue in milliseconds.
+   */
+  public static final int DEFAULT_BATCH_TIME_INTERVAL = 5;
+
+  /**
+   * Used internally to pass the attributes from this factory to the real
+   * GatewaySender it is creating.
+   */
+  private final GatewaySenderAttributes attrs = new GatewaySenderAttributes();
+
+  private final Cache cache;
+
+  /**
+   * Creates a factory whose queues and senders are added to the given cache.
+   *
+   * @param cache the cache to create queues for
+   */
+  public AsyncEventQueueFactoryImpl(Cache cache) {
+    this.cache = cache;
+    // set a different default for batchTimeInterval for AsyncEventQueue
+    this.attrs.batchTimeInterval = DEFAULT_BATCH_TIME_INTERVAL;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setBatchSize(int size) {
+    this.attrs.batchSize = size;
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setPersistent(boolean isPersistent) {
+    this.attrs.isPersistenceEnabled = isPersistent;
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setDiskStoreName(String name) {
+    this.attrs.diskStoreName = name;
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setMaximumQueueMemory(int memory) {
+    this.attrs.maximumQueueMemory = memory;
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setDiskSynchronous(boolean isSynchronous) {
+    this.attrs.isDiskSynchronous = isSynchronous;
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setBatchTimeInterval(int batchTimeInterval) {
+    this.attrs.batchTimeInterval = batchTimeInterval;
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setBatchConflationEnabled(boolean isConflation) {
+    this.attrs.isBatchConflationEnabled = isConflation;
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setDispatcherThreads(int numThreads) {
+    this.attrs.dispatcherThreads = numThreads;
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setOrderPolicy(OrderPolicy policy) {
+    this.attrs.policy = policy;
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory addGatewayEventFilter(GatewayEventFilter filter) {
+    this.attrs.addGatewayEventFilter(filter);
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory removeGatewayEventFilter(
+      GatewayEventFilter filter) {
+    this.attrs.eventFilters.remove(filter);
+    return this;
+  }
+
+  @Override
+  public AsyncEventQueueFactory setGatewayEventSubstitutionListener(
+      GatewayEventSubstitutionFilter filter) {
+    this.attrs.eventSubstitutionFilter = filter;
+    return this;
+  }
+
+  /** Currently a no-op: the provider is ignored and nothing is changed. */
+  public AsyncEventQueueFactory removeGatewayEventAlternateValueProvider(
+      GatewayEventSubstitutionFilter provider) {
+    return this;
+  }
+
+  /**
+   * Adds an <code>AsyncEventListener</code> to the attributes of the
+   * AsyncEventQueue being created by this factory.
+   */
+  public AsyncEventQueueFactory addAsyncEventListener(
+      AsyncEventListener listener) {
+    this.attrs.addAsyncEventListener(listener);
+    return this;
+  }
+
+  /**
+   * Creates the <code>AsyncEventQueue</code> and adds it to the cache. For a
+   * <code>GemFireCacheImpl</code> the listener is first added to the factory
+   * attributes and the underlying sender is created.
+   *
+   * @return the queue, or null if the cache is neither a
+   *         <code>GemFireCacheImpl</code> nor a <code>CacheCreation</code>
+   * @throws IllegalArgumentException
+   *           if <code>listener</code> is null
+   */
+  @Override
+  public AsyncEventQueue create(String asyncQueueId, AsyncEventListener listener) {
+    if (listener == null) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.AsyncEventQueue_ASYNC_EVENT_LISTENER_CANNOT_BE_NULL.toLocalizedString());
+    }
+
+    AsyncEventQueue asyncEventQueue = null;
+    if (this.cache instanceof GemFireCacheImpl) {
+      logger.debug("Creating GatewaySender that underlies the AsyncEventQueue");
+      //TODO: Suranjan .separate asynceventqueue from gatewaysender
+      addAsyncEventListener(listener);
+      GatewaySender sender = create(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId));
+      asyncEventQueue = new AsyncEventQueueImpl(sender, listener);
+      ((GemFireCacheImpl) cache).addAsyncEventQueue(asyncEventQueue);
+    } else if (this.cache instanceof CacheCreation) {
+      asyncEventQueue = new AsyncEventQueueCreation(asyncQueueId, attrs, listener);
+      ((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue);
+    }
+    // parameterized so the label and the queue are separated by a space
+    logger.debug("Returning AsyncEventQueue {}", asyncEventQueue);
+    return asyncEventQueue;
+  }
+
+  /**
+   * Validates the collected attributes, creates the sender that underlies the
+   * queue and adds it to the cache. On a <code>GemFireCacheImpl</code> the
+   * sender is also started unless manual start is configured.
+   *
+   * @param id
+   *          the sender id; it is stored in the attributes
+   * @return the sender, or null for an unsupported cache type
+   * @throws AsyncEventQueueConfigurationException
+   *           if fewer than one dispatcher thread is configured, or if a
+   *           parallel queue uses <code>OrderPolicy.THREAD</code>
+   */
+  private GatewaySender create(String id) {
+    this.attrs.id = id;
+
+    if (this.attrs.getDispatcherThreads() <= 0) {
+      throw new AsyncEventQueueConfigurationException(
+          LocalizedStrings.AsyncEventQueue_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
+              .toLocalizedString(id));
+    }
+
+    final boolean parallel = this.attrs.isParallel();
+    if (parallel) {
+      if ((this.attrs.getOrderPolicy() != null)
+          && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
+        throw new AsyncEventQueueConfigurationException(
+            LocalizedStrings.AsyncEventQueue_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
+                .toLocalizedString(id, this.attrs.getOrderPolicy()));
+      }
+    } else if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
+      // serial queue with several dispatchers and no policy: use the default
+      this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
+    }
+
+    GatewaySender sender = null;
+    if (this.cache instanceof GemFireCacheImpl) {
+      if (parallel) {
+        sender = new ParallelAsyncEventQueueImpl(this.cache, this.attrs);
+      } else {
+        sender = new SerialAsyncEventQueueImpl(this.cache, this.attrs);
+      }
+      ((GemFireCacheImpl) this.cache).addGatewaySender(sender);
+      if (!this.attrs.isManualStart()) {
+        sender.start();
+      }
+    } else if (this.cache instanceof CacheCreation) {
+      if (parallel) {
+        sender = new ParallelAsyncEventQueueCreation(this.cache, this.attrs);
+      } else {
+        sender = new SerialAsyncEventQueueCreation(this.cache, this.attrs);
+      }
+      ((CacheCreation) this.cache).addGatewaySender(sender);
+    }
+    return sender;
+  }
+
+  /**
+   * Copies the attributes of the given queue into this factory.
+   *
+   * @param asyncQueueCreation
+   *          the queue to copy from; cast to <code>AsyncEventQueueCreation</code>
+   *          to read its bucket-sorted flag
+   */
+  public void configureAsyncEventQueue(AsyncEventQueue asyncQueueCreation) {
+    this.attrs.batchSize = asyncQueueCreation.getBatchSize();
+    this.attrs.batchTimeInterval = asyncQueueCreation.getBatchTimeInterval();
+    this.attrs.isBatchConflationEnabled = asyncQueueCreation.isBatchConflationEnabled();
+    this.attrs.isPersistenceEnabled = asyncQueueCreation.isPersistent();
+    this.attrs.diskStoreName = asyncQueueCreation.getDiskStoreName();
+    this.attrs.isDiskSynchronous = asyncQueueCreation.isDiskSynchronous();
+    this.attrs.maximumQueueMemory = asyncQueueCreation.getMaximumQueueMemory();
+    this.attrs.isParallel = asyncQueueCreation.isParallel();
+    this.attrs.isBucketSorted = ((AsyncEventQueueCreation)asyncQueueCreation).isBucketSorted();
+    this.attrs.dispatcherThreads = asyncQueueCreation.getDispatcherThreads();
+    this.attrs.policy = asyncQueueCreation.getOrderPolicy();
+    this.attrs.eventFilters = asyncQueueCreation.getGatewayEventFilters();
+    this.attrs.eventSubstitutionFilter = asyncQueueCreation.getGatewayEventSubstitutionFilter();
+  }
+
+  @Override
+  public AsyncEventQueueFactory setParallel(boolean isParallel) {
+    this.attrs.isParallel = isParallel;
+    return this;
+  }
+
+  public AsyncEventQueueFactory setBucketSorted(boolean isbucketSorted) {
+    this.attrs.isBucketSorted = isbucketSorted;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
new file mode 100644
index 0000000..5e621f1
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -0,0 +1,188 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+
+/**
+ * {@link AsyncEventQueue} implementation that reads its attributes from the
+ * {@link GatewaySender} backing the queue.
+ */
+public class AsyncEventQueueImpl implements AsyncEventQueue {
+
+  private final GatewaySender sender;
+  private final AsyncEventListener asyncEventListener;
+
+  public static final String ASYNC_EVENT_QUEUE_PREFIX = "AsyncEventQueue_";
+
+  public AsyncEventQueueImpl(GatewaySender sender, AsyncEventListener eventListener) {
+    this.sender = sender;
+    this.asyncEventListener = eventListener;
+  }
+
+  @Override
+  public String getId() {
+    return getAsyncEventQueueIdFromSenderId(this.sender.getId());
+  }
+
+  @Override
+  public AsyncEventListener getAsyncEventListener() {
+    return asyncEventListener;
+  }
+
+  @Override
+  public List<GatewayEventFilter> getGatewayEventFilters() {
+    return sender.getGatewayEventFilters();
+  }
+
+  @Override
+  public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
+    return sender.getGatewayEventSubstitutionFilter();
+  }
+
+  @Override
+  public int getBatchSize() {
+    return sender.getBatchSize();
+  }
+
+  @Override
+  public String getDiskStoreName() {
+    return sender.getDiskStoreName();
+  }
+
+  @Override
+  public int getBatchTimeInterval() {
+    return sender.getBatchTimeInterval();
+  }
+
+  @Override
+  public boolean isBatchConflationEnabled() {
+    return sender.isBatchConflationEnabled();
+  }
+
+  @Override
+  public int getMaximumQueueMemory() {
+    return sender.getMaximumQueueMemory();
+  }
+
+  @Override
+  public boolean isPersistent() {
+    return sender.isPersistenceEnabled();
+  }
+
+  @Override
+  public boolean isDiskSynchronous() {
+    return sender.isDiskSynchronous();
+  }
+
+  @Override
+  public int getDispatcherThreads() {
+    return sender.getDispatcherThreads();
+  }
+
+  @Override
+  public OrderPolicy getOrderPolicy() {
+    return sender.getOrderPolicy();
+  }
+
+  @Override
+  public boolean isPrimary() {
+    return ((AbstractGatewaySender) sender).isPrimary();
+  }
+
+  /**
+   * Returns the size reported by the queue of the sender's event processor. A
+   * concurrent serial processor has several queues, whose sizes are added up.
+   */
+  @Override
+  public int size() {
+    AbstractGatewaySenderEventProcessor eventProcessor =
+        ((AbstractGatewaySender) sender).getEventProcessor();
+
+    if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) {
+      Set<RegionQueue> queues =
+          ((ConcurrentSerialGatewaySenderEventProcessor) eventProcessor).getQueues();
+      int size = 0;
+      for (RegionQueue queue : queues) {
+        size += queue.size();
+      }
+      return size;
+    }
+    return eventProcessor.getQueue().size();
+  }
+
+  public GatewaySender getSender() {
+    return this.sender;
+  }
+
+  public AsyncEventQueueStats getStatistics() {
+     AbstractGatewaySender abstractSender =  (AbstractGatewaySender) this.sender;
+     return ((AsyncEventQueueStats) abstractSender.getStatistics());
+  }
+
+  /**
+   * Two queues are equal when their ids are equal. The other object is only
+   * accessed through the {@link AsyncEventQueue} interface, so comparing with
+   * a different implementation does not fail with a ClassCastException.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof AsyncEventQueue)) {
+      return false;
+    }
+    return getId().equals(((AsyncEventQueue) obj).getId());
+  }
+
+  @Override
+  public int hashCode() {
+    return getId().hashCode();
+  }
+
+  /** Builds the sender id by prepending {@link #ASYNC_EVENT_QUEUE_PREFIX}. */
+  public static String getSenderIdFromAsyncEventQueueId(String asyncQueueId) {
+    return ASYNC_EVENT_QUEUE_PREFIX + asyncQueueId;
+  }
+
+  /** Strips {@link #ASYNC_EVENT_QUEUE_PREFIX} from the sender id if present. */
+  public static String getAsyncEventQueueIdFromSenderId(String senderId) {
+    if (!isAsyncEventQueue(senderId)) {
+      return senderId;
+    }
+    return senderId.substring(ASYNC_EVENT_QUEUE_PREFIX.length());
+  }
+
+  public static boolean isAsyncEventQueue(String senderId) {
+    return senderId.startsWith(ASYNC_EVENT_QUEUE_PREFIX);
+  }
+
+  @Override
+  public boolean isParallel() {
+    return sender.isParallel();
+  }
+
+  public boolean isBucketSorted() {
+    // TODO not implemented yet: always reports false
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueStats.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueStats.java
new file mode 100644
index 0000000..3f0c7a0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -0,0 +1,177 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+
+public class AsyncEventQueueStats extends GatewaySenderStats {
+
+  public static final String typeName = "AsyncEventQueueStatistics";
+  
+  /** The <code>StatisticsType</code> of the statistics */
+  private static final StatisticsType type;
+  // Creates the statistics type with one descriptor per statistic name and then
+  // looks up, by name, the id of each statistic in that type.
+  static {
+
+  StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
+
+  type = f.createType(typeName, "Stats for activity in the AsyncEventQueue",
+     new StatisticDescriptor[] {
+      f.createIntCounter
+      (EVENTS_RECEIVED,
+       "Number of events received by this queue.",
+       "operations"),
+      f.createIntCounter
+      (EVENTS_QUEUED,
+       "Number of events added to the event queue.",
+       "operations"),
+      f.createLongCounter
+       (EVENT_QUEUE_TIME,
+        "Total time spent queueing events.",
+        "nanoseconds"),
+      f.createIntGauge
+       (EVENT_QUEUE_SIZE,
+        "Size of the event queue.",
+        "operations", false),
+      f.createIntGauge
+        (TMP_EVENT_QUEUE_SIZE,
+         "Size of the temporary events queue.",
+         "operations", false),
+      f.createIntCounter
+       (EVENTS_NOT_QUEUED_CONFLATED,
+        "Number of events received but not added to the event queue because the queue already contains an event with the event's key.",
+        "operations"),
+      f.createIntCounter
+        (EVENTS_CONFLATED_FROM_BATCHES,
+         "Number of events conflated from batches.",
+         "operations"),
+      f.createIntCounter
+       (EVENTS_DISTRIBUTED,
+        "Number of events removed from the event queue and sent.",
+        "operations"),
+      f.createIntCounter
+       (EVENTS_EXCEEDING_ALERT_THRESHOLD,
+        "Number of events exceeding the alert threshold.",
+        "operations", false),
+      f.createLongCounter
+       (BATCH_DISTRIBUTION_TIME,
+        "Total time spent distributing batches of events to receivers.",
+        "nanoseconds"),
+      f.createIntCounter
+       (BATCHES_DISTRIBUTED,
+        "Number of batches of events removed from the event queue and sent.",
+        "operations"),
+      f.createIntCounter
+       (BATCHES_REDISTRIBUTED,
+        "Number of batches of events removed from the event queue and resent.",
+        "operations", false),
+      f.createIntCounter
+       (UNPROCESSED_TOKENS_ADDED_BY_PRIMARY,
+        "Number of tokens added to the secondary's unprocessed token map by the primary (though a listener).",
+        "tokens"),
+      f.createIntCounter
+       (UNPROCESSED_EVENTS_ADDED_BY_SECONDARY,
+        "Number of events added to the secondary's unprocessed event map by the secondary.",
+        "events"),
+      f.createIntCounter
+       (UNPROCESSED_EVENTS_REMOVED_BY_PRIMARY,
+        "Number of events removed from the secondary's unprocessed event map by the primary (though a listener).",
+        "events"),
+      f.createIntCounter
+       (UNPROCESSED_TOKENS_REMOVED_BY_SECONDARY,
+        "Number of tokens removed from the secondary's unprocessed token map by the secondary.",
+        "tokens"),
+      f.createIntCounter
+       (UNPROCESSED_EVENTS_REMOVED_BY_TIMEOUT,
+        "Number of events removed from the secondary's unprocessed event map by a timeout.",
+        "events"),
+      f.createIntCounter
+       (UNPROCESSED_TOKENS_REMOVED_BY_TIMEOUT,
+        "Number of tokens removed from the secondary's unprocessed token map by a timeout.",
+        "tokens"),
+      f.createIntGauge
+       (UNPROCESSED_EVENT_MAP_SIZE,
+        "Current number of entries in the secondary's unprocessed event map.",
+        "events", false),
+      f.createIntGauge
+       (UNPROCESSED_TOKEN_MAP_SIZE,
+        "Current number of entries in the secondary's unprocessed token map.",
+        "tokens", false),
+      f.createIntGauge
+       (CONFLATION_INDEXES_MAP_SIZE,
+        "Current number of entries in the conflation indexes map.",
+        "events"),
+      f.createIntCounter
+        (NOT_QUEUED_EVENTS,
+         "Number of events not added to queue.",
+         "events"),
+      f.createIntCounter
+        (EVENTS_FILTERED,
+         "Number of events filtered through GatewayEventFilter.",
+         "events"),
+      f.createIntCounter
+        (LOAD_BALANCES_COMPLETED,
+         "Number of load balances completed",
+         "operations"),
+      f.createIntGauge
+        (LOAD_BALANCES_IN_PROGRESS,
+         "Number of load balances in progress",
+         "operations"),
+      f.createLongCounter
+        (LOAD_BALANCE_TIME,
+         "Total time spent load balancing this sender",
+         "nanoseconds"),
+  });
+  // Initialize id fields. NOTE(review): they are not declared in this class,
+  // so they are presumably statics inherited from GatewaySenderStats - confirm.
+  eventsReceivedId = type.nameToId(EVENTS_RECEIVED);
+  eventsQueuedId = type.nameToId(EVENTS_QUEUED);
+  eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
+  eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
+  eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+  eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
+  eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
+  eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
+  batchDistributionTimeId = type.nameToId(BATCH_DISTRIBUTION_TIME);
+  batchesDistributedId = type.nameToId(BATCHES_DISTRIBUTED);
+  batchesRedistributedId = type.nameToId(BATCHES_REDISTRIBUTED);
+  unprocessedTokensAddedByPrimaryId = type.nameToId(UNPROCESSED_TOKENS_ADDED_BY_PRIMARY);
+  unprocessedEventsAddedBySecondaryId = type.nameToId(UNPROCESSED_EVENTS_ADDED_BY_SECONDARY);
+  unprocessedEventsRemovedByPrimaryId = type.nameToId(UNPROCESSED_EVENTS_REMOVED_BY_PRIMARY);
+  unprocessedTokensRemovedBySecondaryId = type.nameToId(UNPROCESSED_TOKENS_REMOVED_BY_SECONDARY);
+  unprocessedEventsRemovedByTimeoutId = type.nameToId(UNPROCESSED_EVENTS_REMOVED_BY_TIMEOUT);
+  unprocessedTokensRemovedByTimeoutId = type.nameToId(UNPROCESSED_TOKENS_REMOVED_BY_TIMEOUT);
+  unprocessedEventMapSizeId = type.nameToId(UNPROCESSED_EVENT_MAP_SIZE);
+  unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE); 
+  conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE); 
+  notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+  eventsFilteredId = type.nameToId(EVENTS_FILTERED);
+  eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
+  loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
+  loadBalancesInProgressId = type.nameToId(LOAD_BALANCES_IN_PROGRESS);
+  loadBalanceTimeId = type.nameToId(LOAD_BALANCE_TIME);
+  }
+  
+  /**
+   * Constructor. Passes the statistics type created by this class to the
+   * superclass constructor.
+   *
+   * @param f The <code>StatisticsFactory</code> which creates the
+   * <code>Statistics</code> instance
+   * @param asyncQueueId The id of the <code>AsyncEventQueue</code> used to
+   * generate the name of the <code>Statistics</code>
+   */
+  public AsyncEventQueueStats(StatisticsFactory f, String asyncQueueId) {
+    super(f, asyncQueueId, type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
new file mode 100644
index 0000000..74a8ef5
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -0,0 +1,244 @@
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.internal.cache.DistributedRegion;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
+  
+  private static final Logger logger = LogService.getLogger();
+  
+  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
+      "Remote Site Discovery Logger Group", logger);
+  
+  public ParallelAsyncEventQueueImpl(){
+    super();
+    this.isParallel = true;
+  }
+  
+  public ParallelAsyncEventQueueImpl(Cache cache, GatewaySenderAttributes attrs) {
+    super(cache, attrs);
+    if (!(this.cache instanceof CacheCreation)) {
+      // this sender lies underneath the AsyncEventQueue. Need to have
+      // AsyncEventQueueStats
+        this.statistics = new AsyncEventQueueStats(
+            cache.getDistributedSystem(), AsyncEventQueueImpl
+                .getAsyncEventQueueIdFromSenderId(id));
+      }  
+    this.isForInternalUse = true;
+  }
+  
+  @Override
+  public void start() {
+    this.lifeCycleLock.writeLock().lock(); 
+    try {
+      if (isRunning()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
+        return;
+      }
+
+      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
+            .getConfig().getLocators();
+        if (locators.length() == 0) {
+          throw new IllegalStateException(
+              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                  .toLocalizedString());
+        }
+      }
+      /*
+       * Now onwards all processing will happen through "ConcurrentParallelGatewaySenderEventProcessor"
+       * we have made "ParallelGatewaySenderEventProcessor" and "ParallelGatewaySenderQueue" as a
+       * utility classes of Concurrent version of processor and queue.
+       */
+      eventProcessor = new ConcurrentParallelGatewaySenderEventProcessor(this);
+      /*if (getDispatcherThreads() > 1) {
+        eventProcessor = new ConcurrentParallelGatewaySenderEventProcessor(this);
+      } else {
+        eventProcessor = new ParallelGatewaySenderEventProcessor(this);
+      }*/
+      
+      eventProcessor.start();
+      waitForRunningStatus();
+      //Only notify the type registry if this is a WAN gateway queue
+      if(!isAsyncEventQueue()) {
+        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+      }
+      new UpdateAttributesProcessor(this).distribute(false);
+     
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
+      
+      logger.info(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderImpl_STARTED__0, this));
+      
+      if (!tmpQueuedEvents.isEmpty()) {
+        enqueTempEvents();
+      }
+    }
+    finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+  }
+  
+//  /**
+//   * The sender is not started but only the message queue i.e. shadowPR is created on the node.
+//   * @param targetPr
+//   */
+//  private void createMessageQueueOnAccessorNode(PartitionedRegion targetPr) {
+//    eventProcessor = new ParallelGatewaySenderEventProcessor(this, targetPr);
+//  }
+  
+
+  @Override
+  public void stop() {
+    this.lifeCycleLock.writeLock().lock(); 
+    try {
+      if (!this.isRunning()) {
+        return;
+      }
+      // Stop the dispatcher
+      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
+      if (ev != null && !ev.isStopped()) {
+        ev.stopProcessing();
+      }
+
+      // Stop the proxy (after the dispatcher, so the socket is still
+      // alive until after the dispatcher has stopped)
+      stompProxyDead();
+
+      // Close the listeners
+      for (AsyncEventListener listener : this.listeners) {
+        listener.close();
+      }
+      //stop the running threads, open sockets if any
+      ((ConcurrentParallelGatewaySenderQueue)this.eventProcessor.getQueue()).cleanUp();
+
+      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
+      
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
+      .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+      
+      clearTempEventsAfterSenderStopped();
+    }
+    finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+  }
+  
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("ParallelGatewaySender{");
+    sb.append("id=" + getId());
+    sb.append(",remoteDsId="+ getRemoteDSId());
+    sb.append(",isRunning ="+ isRunning());
+    sb.append("}");
+    return sb.toString();
+  }
+
+  public void fillInProfile(Profile profile) {
+    assert profile instanceof GatewaySenderProfile;
+    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
+    pf.Id = getId();
+    pf.remoteDSId = getRemoteDSId();
+    pf.isRunning = isRunning();
+    pf.isPrimary = isPrimary();
+    pf.isParallel = true;
+    pf.isBatchConflationEnabled = isBatchConflationEnabled();
+    pf.isPersistenceEnabled = isPersistenceEnabled();
+    pf.alertThreshold = getAlertThreshold();
+    pf.manualStart = isManualStart();
+    pf.dispatcherThreads = getDispatcherThreads();
+    pf.orderPolicy = getOrderPolicy();
+    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) {
+      pf.eventFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
+      pf.transFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (AsyncEventListener listener : getAsyncEventListeners()) {
+      pf.senderEventListenerClassNames.add(listener.getClass().getName());
+    }
+    pf.isDiskSynchronous = isDiskSynchronous();
+  }
+
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
+   */
+  @Override
+  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+    int bucketId = -1;
+    //merged from 42004
+    if (clonedEvent.getRegion() instanceof DistributedRegion) {
+//      if (getOrderPolicy() == OrderPolicy.THREAD) {
+//        bucketId = PartitionedRegionHelper.getHashKey(
+//            ((EntryEventImpl)clonedEvent).getEventId().getThreadID(),
+//            getMaxParallelismForReplicatedRegion());
+//      }
+//      else
+        bucketId = PartitionedRegionHelper.getHashKey(clonedEvent.getKey(),
+            getMaxParallelismForReplicatedRegion());
+    }
+    else {
+      bucketId = PartitionedRegionHelper
+          .getHashKey((EntryOperation)clonedEvent);
+    }
+    EventID originalEventId = clonedEvent.getEventId();
+    long originatingThreadId = ThreadIdentifier.getRealThreadID(originalEventId.getThreadID());
+
+    long newThreadId = ThreadIdentifier
+    .createFakeThreadIDForParallelGSPrimaryBucket(bucketId,
+        originatingThreadId, getEventIdIndex());
+    
+    // In case of parallel as all events go through primary buckets
+    // we don't neet to generate different threadId for secondary buckets
+    // as they will be rejected if seen at PR level itself
+    
+//    boolean isPrimary = ((PartitionedRegion)getQueue().getRegion())
+//    .getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+//    if (isPrimary) {
+//      newThreadId = ThreadIdentifier
+//          .createFakeThreadIDForParallelGSPrimaryBucket(bucketId,
+//              originatingThreadId);
+//    } else {
+//      newThreadId = ThreadIdentifier
+//          .createFakeThreadIDForParallelGSSecondaryBucket(bucketId,
+//              originatingThreadId);
+//    }
+
+    EventID newEventId = new EventID(originalEventId.getMembershipID(),
+        newThreadId, originalEventId.getSequenceID(), bucketId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Generated event id for event with key={}, bucketId={}, original event id={}, threadId={}, new event id={}, newThreadId={}",
+          this, clonedEvent.getKey(), bucketId, originalEventId, originatingThreadId, newEventId, newThreadId);
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
new file mode 100644
index 0000000..90b41ac
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -0,0 +1,241 @@
+package com.gemstone.gemfire.cache.asyncqueue.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.DistributedLockService;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Serial gateway sender that lies underneath an <code>AsyncEventQueue</code>.
+ * On start it volunteers for primary and becomes either primary or secondary.
+ */
+public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
+
+  private static final Logger logger = LogService.getLogger();
+
+  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
+      "Remote Site Discovery Logger Group", logger);
+
+  /** Creates a sender with no cache or attributes, marked as not parallel. */
+  public SerialAsyncEventQueueImpl() {
+    super();
+    this.isParallel = false;
+  }
+
+  /** Creates the sender; statistics are created unless the cache is a CacheCreation. */
+  public SerialAsyncEventQueueImpl(Cache cache, GatewaySenderAttributes attrs) {
+    super(cache, attrs);
+    if (!(this.cache instanceof CacheCreation)) {
+      // This sender lies underneath the AsyncEventQueue, so it needs
+      // AsyncEventQueueStats named after the queue id.
+      this.statistics = new AsyncEventQueueStats(cache.getDistributedSystem(),
+          AsyncEventQueueImpl.getAsyncEventQueueIdFromSenderId(id));
+    }
+  }
+
+  /**
+   * Starts this sender under the life-cycle write lock; warns and returns if already running.
+   * @throws GatewaySenderConfigurationException if remoteDSId is not the default and no locators are set
+   */
+  @Override
+  public void start() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Starting gatewaySender : {}", this);
+    }
+    this.lifeCycleLock.writeLock().lock();
+    try {
+      if (isRunning()) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
+        return;
+      }
+      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
+        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem().getConfig().getLocators();
+        if (locators.length() == 0) {
+          throw new GatewaySenderConfigurationException(
+              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                  .toLocalizedString());
+        }
+      }
+      getSenderAdvisor().initDLockService();
+      if (!isPrimary()) {
+        if (getSenderAdvisor().volunteerForPrimary()) {
+          getSenderAdvisor().makePrimary();
+        } else {
+          getSenderAdvisor().makeSecondary();
+        }
+      }
+      if (getDispatcherThreads() > 1) {
+        eventProcessor = new ConcurrentSerialGatewaySenderEventProcessor(this);
+      } else {
+        eventProcessor = new SerialGatewaySenderEventProcessor(this, getId());
+      }
+      eventProcessor.start();
+      waitForRunningStatus();
+      this.startTime = System.currentTimeMillis();
+
+      // Only notify the type registry if this is a WAN gateway queue
+      if (!isAsyncEventQueue()) {
+        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+      }
+      new UpdateAttributesProcessor(this).distribute(false);
+
+      InternalDistributedSystem system = (InternalDistributedSystem) this.cache.getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
+
+      logger.info(LocalizedMessage.create(LocalizedStrings.SerialGatewaySenderImpl_STARTED__0, this));
+
+      enqueTempEvents();
+    } finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Stops this sender. Dispatcher, proxy and listeners are stopped under the life-cycle
+   * write lock; the remaining cleanup runs after that lock has been released.
+   */
+  @Override
+  public void stop() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Stopping Gateway Sender : {}", this);
+    }
+    this.lifeCycleLock.writeLock().lock();
+    try {
+      // Stop the dispatcher
+      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
+      if (ev != null && !ev.isStopped()) {
+        ev.stopProcessing();
+      }
+
+      // Stop the proxy (after the dispatcher, so the socket is still
+      // alive until after the dispatcher has stopped)
+      stompProxyDead();
+
+      // Close the listeners
+      for (AsyncEventListener listener : this.listeners) {
+        listener.close();
+      }
+      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
+
+      clearTempEventsAfterSenderStopped();
+    } finally {
+      this.lifeCycleLock.writeLock().unlock();
+    }
+    if (this.isPrimary()) {
+      try {
+        DistributedLockService.destroy(getSenderAdvisor().getDLockServiceName());
+      } catch (IllegalArgumentException ignored) {
+        // service not found... ignore
+      }
+    }
+    if (getQueues() != null && !getQueues().isEmpty()) {
+      for (RegionQueue q : getQueues()) {
+        ((SerialGatewaySenderQueue)q).cleanUp();
+      }
+    }
+    this.setIsPrimary(false);
+    new UpdateAttributesProcessor(this).distribute(false);
+    Thread lockObtainingThread = getSenderAdvisor().getLockObtainingThread();
+    if (lockObtainingThread != null && lockObtainingThread.isAlive()) {
+      // wait a while for thread to terminate
+      try {
+        lockObtainingThread.join(3000);
+      } catch (InterruptedException ex) {
+        // the join was interrupted; restore the interrupt bit for this thread
+        Thread.currentThread().interrupt();
+      }
+      if (lockObtainingThread.isAlive()) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.GatewaySender_COULD_NOT_STOP_LOCK_OBTAINING_THREAD_DURING_GATEWAY_SENDER_STOP));
+      }
+    }
+
+    InternalDistributedSystem system = (InternalDistributedSystem) this.cache.getDistributedSystem();
+    system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("SerialGatewaySender{")
+        .append("id=").append(getId())
+        .append(",remoteDsId=").append(getRemoteDSId())
+        .append(",isRunning =").append(isRunning())
+        .append(",isPrimary =").append(isPrimary())
+        .append("}").toString();
+  }
+
+  /** Copies this sender's state and configuration into the given GatewaySenderProfile. */
+  @Override
+  public void fillInProfile(Profile profile) {
+    assert profile instanceof GatewaySenderProfile;
+    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
+    pf.Id = getId();
+    pf.startTime = getStartTime();
+    pf.remoteDSId = getRemoteDSId();
+    pf.isRunning = isRunning();
+    pf.isPrimary = isPrimary();
+    pf.isParallel = false;
+    pf.isBatchConflationEnabled = isBatchConflationEnabled();
+    pf.isPersistenceEnabled = isPersistenceEnabled();
+    pf.alertThreshold = getAlertThreshold();
+    pf.manualStart = isManualStart();
+    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) {
+      pf.eventFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
+      pf.transFiltersClassNames.add(filter.getClass().getName());
+    }
+    for (AsyncEventListener listener : getAsyncEventListeners()) {
+      pf.senderEventListenerClassNames.add(listener.getClass().getName());
+    }
+    pf.isDiskSynchronous = isDiskSynchronous();
+    pf.dispatcherThreads = getDispatcherThreads();
+    pf.orderPolicy = getOrderPolicy();
+    pf.serverLocation = this.getServerLocation();
+  }
+
+  /**
+   * Replaces the event id of the cloned event, keeping its membership id and sequence id.
+   * @param clonedEvent the event whose event id is replaced in place
+   */
+  @Override
+  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+    EventID originalEventId = clonedEvent.getEventId();
+    long originalThreadId = originalEventId.getThreadID();
+    // A WAN-type thread id has already been converted and is kept as is.
+    long newThreadId = ThreadIdentifier.isWanTypeThreadID(originalThreadId)
+        ? originalThreadId
+        : ThreadIdentifier.createFakeThreadIDForParallelGSPrimaryBucket(0, originalThreadId, getEventIdIndex());
+    EventID newEventId = new EventID(originalEventId.getMembershipID(),
+        newThreadId, originalEventId.getSequenceID());
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Generated event id for event with key={}, original event id={}, originalThreadId={}, new event id={}, newThreadId={}",
+          this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId);
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/AllConnectionsInUseException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/AllConnectionsInUseException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/AllConnectionsInUseException.java
new file mode 100644
index 0000000..95c86f0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/AllConnectionsInUseException.java
@@ -0,0 +1,52 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client;
+
+
+/**
+ * Signals that the connection pool is at its maximum size and that
+ * all of its connections are in use.
+ * @author dsmith
+ * @since 5.7
+ */
+public class AllConnectionsInUseException extends ServerConnectivityException {
+
+  private static final long serialVersionUID = 7304243507881787071L;
+
+  /**
+   * Constructs an AllConnectionsInUseException that has neither a detail message nor a cause.
+   */
+  public AllConnectionsInUseException() {
+    super();
+  }
+
+  /**
+   * Constructs an AllConnectionsInUseException carrying a detail message.
+   * @param message the detail message
+   */
+  public AllConnectionsInUseException(String message) {
+    super(message);
+  }
+
+  /**
+   * Constructs an AllConnectionsInUseException carrying a cause.
+   * @param cause the cause
+   */
+  public AllConnectionsInUseException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Constructs an AllConnectionsInUseException carrying a detail message and a cause.
+   * @param message the detail message
+   * @param cause the cause
+   */
+  public AllConnectionsInUseException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/ClientCache.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/ClientCache.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/ClientCache.java
new file mode 100644
index 0000000..7582da9
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/ClientCache.java
@@ -0,0 +1,159 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.client;
+
+import java.net.InetSocketAddress;
+import java.util.Properties;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.query.QueryService;
+
+/**
+ * A ClientCache instance controls the life cycle of the local singleton cache in a client.
+ * <p>A ClientCache is created using {@link ClientCacheFactory#create}.
+ * See {@link ClientCacheFactory} for common usage patterns for creating the client cache instance.
+ * <p>ClientCache provides
+ * access to functionality when a member connects as a client to GemFire servers.
+ * It provides the following services:
+ * <ul>
+ * <li> Access to existing regions (see {@link #getRegion} and {@link #rootRegions}).</li>
+ * <li> Creation of regions (see {@link #createClientRegionFactory(ClientRegionShortcut)} and {@link #createClientRegionFactory(String)}).</li>
+ * <li> Access the query service (see {@link #getQueryService} and {@link #getLocalQueryService}).</li>
+ * <li> Access the GemFire logger (see {@link #getLogger}).</li>
+ * <li> Access the GemFire distributed system (see {@link #getDistributedSystem}).</li>
+ * <li> Access the GemFire resource manager (see {@link #getResourceManager}).</li>
+ * <li> Manages local disk stores for this cache instance (see {@link #createDiskStoreFactory}).</li>
+ * <li> Creation of authenticated cache views that support multiple users (see {@link #createAuthenticatedView}).</li>
+ * </ul>
+ * <p>A ClientCache connects to a server using a {@link Pool}. This pool can be
+ * configured in the ClientCacheFactory (by default GemFire tries to create a pool
+ * which tries to connect to a server on the localhost on port 40404). This default pool
+ * is used by {@link Region}s (created using {@link ClientRegionFactory}) to talk to
+ * regions on the server.
+ * <p>More pools can be created using {@link PoolManager} or by declaring them in cache.xml.
+ * @since 6.5
+ * @author darrel
+ */
+public interface ClientCache extends GemFireCache {
+  /**
+   * Return the QueryService for the named pool. Query operations performed using
+   * this QueryService are executed on the servers associated with that pool.
+   * @param poolName the name of the pool whose QueryService is returned
+   * @return the QueryService for the named pool
+   */
+  QueryService getQueryService(String poolName);
+
+  /**
+   * Return a QueryService that runs queries against the local state of the client cache.
+   * Such queries are never sent to a server.
+   */
+  QueryService getLocalQueryService();
+
+  /**
+   * Terminates this object cache and releases all the resources.
+   * Calls {@link Region#close} on each region in the cache.
+   * After this cache is closed, any further
+   * method call on this cache or any region object will throw
+   * {@link CacheClosedException}, unless otherwise noted.
+   * @param keepalive whether the server should keep the durable client's queues alive for the timeout period
+   * @throws CacheClosedException if the cache is already closed.
+   */
+  void close(boolean keepalive);
+
+  /**
+   * Create and return a client region factory that is initialized to create
+   * a region using the given predefined region attributes.
+   * @param shortcut the predefined region attributes to initialize the factory with.
+   * @return a factory that will produce a client region.
+   */
+  <K,V> ClientRegionFactory<K,V> createClientRegionFactory(ClientRegionShortcut shortcut);
+
+  /**
+   * Create and return a client region factory that is initialized to create
+   * a region using the given named region attributes.
+   * <p>Named region attributes are defined in cache.xml by setting the name as
+   * the value of the <code>id</code> attribute on a <code>region-attributes</code> element.
+   * @param regionAttributesId the named region attributes to initialize the factory with.
+   * @return a factory that will produce a client region.
+   * @throws IllegalStateException if named region attributes has not been defined.
+   */
+  <K,V> ClientRegionFactory<K,V> createClientRegionFactory(String regionAttributesId);
+
+  /**
+   * Notifies the server that this durable client is ready to receive updates.
+   * This method is used by durable clients to notify servers that they
+   * are ready to receive updates. As soon as the server receives this message,
+   * it will forward updates to this client (if necessary).
+   * <p>
+   * Durable clients must call this method after they are done creating regions
+   * and issuing interest registration requests. If it is called before then,
+   * events will be lost.
+   * Any time a new {@link Pool} is created and regions have been added to it,
+   * this method needs to be called again.
+   *
+   * @throws IllegalStateException if called by a non-durable client
+   */
+  void readyForEvents();
+
+  /**
+   * Creates an authenticated cache view using the given user security properties
+   * on the client cache's default pool.
+   * Multiple views with different user properties can be created on a
+   * single client cache.
+   * <p>
+   * Requires {@link ClientCacheFactory#setPoolMultiuserAuthentication(boolean) multiuser-authentication}
+   * to be set to true on the default pool.
+   * <p>
+   * Applications must use this instance to do operations, when
+   * multiuser-authentication is set to true.
+   *
+   * <p>
+   * Authenticated cache views are only allowed to access {@link ClientRegionShortcut#PROXY proxy} regions.
+   * The {@link RegionService#getRegion} method will throw IllegalStateException
+   * if an attempt is made to get a region that has local storage.
+   *
+   * @param userSecurityProperties
+   *          the security properties of a user.
+   * @return the {@link RegionService} instance associated with a user and the given
+   *         properties.
+   * @throws UnsupportedOperationException
+   *           when invoked with multiuser-authentication as false.
+   */
+  RegionService createAuthenticatedView(Properties userSecurityProperties);
+
+  /**
+   * Creates an authenticated cache view using the given user security properties
+   * using the given pool to connect to servers.
+   * Requires {@link PoolFactory#setMultiuserAuthentication(boolean) multiuser-authentication} to be set to true
+   * on the given pool.
+   * <p>See {@link #createAuthenticatedView(Properties)} for more information
+   * on the returned cache view.
+   * @param userSecurityProperties the security properties of a user.
+   * @param poolName the pool that the users should be authenticated against.
+   * @return the {@link RegionService} instance associated with a user and the given
+   *         properties.
+   */
+  RegionService createAuthenticatedView(Properties userSecurityProperties, String poolName);
+
+  /**
+   * Returns a set of the servers to which this client is currently connected.
+   * @since 6.6
+   */
+  Set<InetSocketAddress> getCurrentServers();
+
+  /**
+   * Returns the default server pool. If one or more non-default pools were
+   * configured, this may return null.
+   * @since 7.0
+   * @see com.gemstone.gemfire.cache.client.Pool
+   */
+  Pool getDefaultPool();
+
+}