You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:48 UTC

[60/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/main/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/main/java/org/apache/geode)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java
deleted file mode 100644
index c5c2a2f..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.ResourceEvent;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.GatewayReceiverCreation;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-/**
- * 
- * @since GemFire 7.0
- */
-public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
-
-  private int startPort = GatewayReceiver.DEFAULT_START_PORT;
-  
-  private int endPort = GatewayReceiver.DEFAULT_END_PORT;
-  
-  private int timeBetPings = GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS;
-
-  private int socketBuffSize = GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE;
-
-  private String bindAdd= GatewayReceiver.DEFAULT_BIND_ADDRESS; 
-  
-  private String hostnameForSenders = GatewayReceiver.DEFAULT_HOSTNAME_FOR_SENDERS;  
-  
-  private boolean manualStart = GatewayReceiver.DEFAULT_MANUAL_START;
-
-  private List<GatewayTransportFilter> filters = new ArrayList<GatewayTransportFilter>();
-  
-  private Cache cache;
-
-  public GatewayReceiverFactoryImpl() {
-    
-  }
-  public GatewayReceiverFactoryImpl(Cache cache) {
-   this.cache = cache;
-  }
-  
-  public GatewayReceiverFactory addGatewayTransportFilter(
-      GatewayTransportFilter filter) {
-    this.filters.add(filter);
-    return this;
-  }
-
-  public GatewayReceiverFactory removeGatewayTransportFilter(
-      GatewayTransportFilter filter) {
-    this.filters.remove(filter);
-    return this;
-  }
-
-  public GatewayReceiverFactory setMaximumTimeBetweenPings(int time) {
-    this.timeBetPings = time;
-    return this;
-  }
-
-  public GatewayReceiverFactory setStartPort(int port) {
-    this.startPort = port;
-    return this;
-  }
-  
-  public GatewayReceiverFactory setEndPort(int port) {
-    this.endPort = port;
-    return this;
-  }
-  
-  public GatewayReceiverFactory setSocketBufferSize(int size) {
-    this.socketBuffSize = size;
-    return this;
-  }
-
-  public GatewayReceiverFactory setBindAddress(String address) {
-    this.bindAdd = address;
-    return this;
-  }
-  
-  public GatewayReceiverFactory setHostnameForSenders(String address) {
-    this.hostnameForSenders = address;
-    return this;
-  } 
-
-  public GatewayReceiverFactory setManualStart(boolean start) {
-    this.manualStart = start;
-    return this;
-  }
-  
-  public GatewayReceiver create() {
-    if (this.startPort > this.endPort) {
-      throw new IllegalStateException(
-          "Please specify either start port a value which is less than end port.");
-    }
-    GatewayReceiver recv = null;
-    if (this.cache instanceof GemFireCacheImpl) {
-      recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort,
-          this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
-          this.hostnameForSenders, this.manualStart);
-      ((GemFireCacheImpl)cache).addGatewayReceiver(recv);
-      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
-      .getDistributedSystem();
-      system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_CREATE, recv);
-      if (!this.manualStart) {
-        try {
-          recv.start();
-        }
-        catch (IOException ioe) {
-          throw new GatewayReceiverException(
-              LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_STARTING_GATEWAY_RECEIVER
-                  .toLocalizedString(), ioe);
-        }
-      }
-    } else if (this.cache instanceof CacheCreation) {
-      recv = new GatewayReceiverCreation(this.cache, this.startPort, this.endPort,
-          this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
-          this.hostnameForSenders, this.manualStart);
-      ((CacheCreation)cache).addGatewayReceiver(recv);
-    }
-    return recv;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java
deleted file mode 100644
index b8768d4..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan;
-
-import java.io.IOException;
-import java.net.BindException;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.ResourceEvent;
-import com.gemstone.gemfire.internal.AvailablePort;
-import com.gemstone.gemfire.internal.net.SocketCreator;
-import com.gemstone.gemfire.internal.cache.CacheServerImpl;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * @since GemFire 7.0
- */
-@SuppressWarnings("deprecation")
-public class GatewayReceiverImpl implements GatewayReceiver {
-
-  private static final Logger logger = LogService.getLogger();
-  
-  private String host;
-
-  private int startPort;
-  
-  private int endPort;
-  
-  private int port;
-
-  private int timeBetPings;
-
-  private int socketBufferSize;
-  
-  private boolean manualStart;
-
-  private final List<GatewayTransportFilter> filters;
-
-  private String bindAdd;
-  
-  private CacheServer receiver;
-
-  private final GemFireCacheImpl cache;
-  
-  public GatewayReceiverImpl(Cache cache, int startPort,
-      int endPort, int timeBetPings, int buffSize, String bindAdd,
-      List<GatewayTransportFilter> filters, String hostnameForSenders, boolean manualStart) {
-    this.cache = (GemFireCacheImpl)cache;
-    
-    /*
-     * If user has set hostNameForSenders then it should take precedence over
-     * bindAddress. If user hasn't set either hostNameForSenders or bindAddress
-     * then getLocalHost().getHostName() should be used.
-     */
-    if (hostnameForSenders == null || hostnameForSenders.isEmpty()) {
-      if (bindAdd == null || bindAdd.isEmpty()) {
-        try {
-          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiverImpl_USING_LOCAL_HOST));
-          this.host = SocketCreator.getLocalHost().getHostName();
-        } catch (UnknownHostException e) {
-          throw new IllegalStateException(
-              LocalizedStrings.GatewayReceiverImpl_COULD_NOT_GET_HOST_NAME
-                  .toLocalizedString(),
-              e);
-        }
-      } else {
-        this.host = bindAdd;
-      }
-    } else {
-      this.host = hostnameForSenders;
-    }
-
-    this.startPort = startPort;
-    this.endPort = endPort;
-    this.timeBetPings = timeBetPings;
-    this.socketBufferSize = buffSize;
-    this.bindAdd = bindAdd;
-    this.filters = filters;
-    this.manualStart = manualStart;
-  }
-
-  public List<GatewayTransportFilter> getGatewayTransportFilters() {
-    return this.filters;
-  }
-
-  public int getMaximumTimeBetweenPings() {
-    return this.timeBetPings;
-  }
-
-  public int getPort() {
-    return this.port;
-  }
-
-  public int getStartPort() {
-    return this.startPort;
-  }
-  
-  public int getEndPort() {
-    return this.endPort;
-  }
-  
-  public int getSocketBufferSize() {
-    return this.socketBufferSize;
-  }
-
-  public boolean isManualStart() {
-    return this.manualStart;
-  }
-  
-  public CacheServer getServer() {
-    return receiver;
-  }
-  
-  public void start() throws IOException {
-    if (receiver == null) {
-      receiver = this.cache.addCacheServer(true);
-    }
-    if (receiver.isRunning()) {
-      logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING));
-      return;
-    }
-    boolean started = false;
-    this.port = getPortToStart();
-    while (!started && this.port != -1) {
-      receiver.setPort(this.port);
-      receiver.setSocketBufferSize(socketBufferSize);
-      receiver.setMaximumTimeBetweenPings(timeBetPings);
-      receiver.setHostnameForClients(host);
-      receiver.setBindAddress(bindAdd);
-      receiver.setGroups(new String[] { GatewayReceiverImpl.RECEIVER_GROUP });
-      ((CacheServerImpl)receiver).setGatewayTransportFilter(this.filters);
-      try {
-        ((CacheServerImpl)receiver).start();
-        started = true;
-      } catch (BindException be) {
-        if (be.getCause() != null
-            && be.getCause().getMessage()
-                .contains("assign requested address")) {
-          throw new GatewayReceiverException(
-              LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1
-                  .toLocalizedString(new Object[] { bindAdd,
-                      Integer.valueOf(this.port) }));
-        }
-        // ignore as this port might have been used by other threads.
-        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port));
-        this.port = getPortToStart();
-      } catch (SocketException se) {
-        if (se.getMessage().contains("Address already in use")) {
-          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port));
-          this.port = getPortToStart();
-
-        } else {
-          throw se;
-        }
-      }
-      
-    }
-    if (!started) {
-      throw new IllegalStateException(
-          "No available free port found in the given range.");
-    }
-    logger.info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, this.port));
-
-    InternalDistributedSystem system = this.cache.getDistributedSystem();
-    system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this);
-
-  }
-  
-  private int getPortToStart(){
-    // choose a random port from the given port range
-    int rPort;
-    if (this.startPort == this.endPort) {
-      rPort = this.startPort;
-    } else {
-      rPort = AvailablePort.getRandomAvailablePortInRange(this.startPort,
-          this.endPort, AvailablePort.SOCKET);
-    }
-    return rPort;
-  }
-  
-  public void stop() {
-    if(!isRunning()){
-      throw new GatewayReceiverException(LocalizedStrings.GatewayReceiver_IS_NOT_RUNNING.toLocalizedString());
-    }
-    receiver.stop();
-
-//    InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
-//        .getDistributedSystem();
-//    system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_STOP, this);
-
-  }
-
-  public String getHost() {
-    return this.host;
-  }
-
-  public String getBindAddress() {
-    return this.bindAdd;
-  }
-
-  public boolean isRunning() {
-    if (this.receiver != null) {
-      return this.receiver.isRunning();
-    }
-    return false;
-  }
-  
-  public String toString() {
-    return new StringBuffer()
-      .append("Gateway Receiver")
-      .append("@").append(Integer.toHexString(hashCode()))
-      .append(" [")
-      .append("host='").append(getHost())
-      .append("'; port=").append(getPort())
-      .append("; bindAddress=").append(getBindAddress())
-      .append("; maximumTimeBetweenPings=").append(getMaximumTimeBetweenPings())
-      .append("; socketBufferSize=").append(getSocketBufferSize())
-      .append("; isManualStart=").append(isManualStart())
-      .append("; group=").append(Arrays.toString(new String[]{GatewayReceiverImpl.RECEIVER_GROUP}))
-      .append("]")
-      .toString();
-  }
-   
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
deleted file mode 100644
index d2302c4..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan;
-
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.gemstone.gemfire.GemFireIOException;
-import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.CancelException;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.client.ServerConnectivityException;
-import com.gemstone.gemfire.cache.client.ServerOperationException;
-import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.distributed.internal.ServerLocation;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import com.gemstone.gemfire.pdx.PdxRegistryMismatchException;
-import com.gemstone.gemfire.security.GemFireSecurityException;
-import com.gemstone.gemfire.cache.client.internal.SenderProxy;
-
-/**
- * @since GemFire 7.0
- *
- */
-public class GatewaySenderEventRemoteDispatcher implements
-    GatewaySenderEventDispatcher {
-
-  private static final Logger logger = LogService.getLogger();
-  
-  private final AbstractGatewaySenderEventProcessor processor;
-
-  private volatile Connection connection;
-
-  private final Set<String> notFoundRegions = new HashSet<String>();
-  
-  private final Object notFoundRegionsSync = new Object();
-  
-  private final AbstractGatewaySender sender;
-  
-  private AckReaderThread ackReaderThread;
-  
-  private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock();
-  
-  /**
-   * This count is reset to 0 each time a successful connection is made.
-   */
-  private int failedConnectCount = 0;
-  
-  public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
-    this.processor = eventProcessor;
-    this.sender = eventProcessor.getSender();
-//    this.ackReaderThread = new AckReaderThread(sender);
-    try {
-      initializeConnection();
-    }
-    catch (GatewaySenderException e) {
-      if (e.getCause() instanceof GemFireSecurityException) {
-        throw e;
-      }
-    }
-  }
-  
-  protected GatewayAck readAcknowledgement() {
-    SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
-    GatewayAck ack = null;
-    Exception ex;
-    try {
-      connection = getConnection(false);
-      if (logger.isDebugEnabled()) {
-        logger.debug(" Receiving ack on the thread {}", connection);
-      }
-      this.connectionLifeCycleLock.readLock().lock();
-      try {
-        if (connection != null) {
-          ack = (GatewayAck)sp.receiveAckFromReceiver(connection);
-        }
-      } finally {
-        this.connectionLifeCycleLock.readLock().unlock();
-      }
-
-    } catch (Exception e) {
-      Throwable t = e.getCause();
-      if (t instanceof BatchException70) {
-        // A BatchException has occurred.
-        // Do not process the connection as dead since it is not dead.
-        ex = (BatchException70)t;
-      } else if (e instanceof GatewaySenderException) { //This Exception is thrown from getConnection
-        ex = (Exception) e.getCause();
-      }else {
-        ex = e;
-        // keep using the connection if we had a batch exception. Else, destroy
-        // it
-        destroyConnection();
-      }
-      if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
-        // if our pool is shutdown then just be silent
-      } else if (ex instanceof IOException
-          || (ex instanceof ServerConnectivityException && !(ex.getCause() instanceof PdxRegistryMismatchException))
-          || ex instanceof ConnectionDestroyedException) {
-        // If the cause is an IOException or a ServerException, sleep and retry.
-        // Sleep for a bit and recheck.
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-      } else {
-        if (!(ex instanceof CancelException)) {
-          logger.fatal(LocalizedMessage.create(
-                  LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH),
-                  ex);
-        }
-        this.processor.setIsStopped(true);
-      }
-    }
-    return ack;
-  }
-  
-  @Override
-  public boolean dispatchBatch(List events, boolean isRetry) {
-    GatewaySenderStats statistics = this.sender.getStatistics();
-    boolean success = false;
-    try {
-      long start = statistics.startTime();
-      success =_dispatchBatch(events, isRetry);
-      if (success) {
-        statistics.endBatch(start, events.size());
-      }
-    } catch (GatewaySenderException ge) {
-
-      Throwable t = ge.getCause();
-      if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
-        // if our pool is shutdown then just be silent
-      } else if (t instanceof IOException
-          || t instanceof ServerConnectivityException
-          || t instanceof ConnectionDestroyedException
-          || t instanceof MessageTooLargeException
-          || t instanceof IllegalStateException) {
-        this.processor.handleException();
-        // If the cause is an IOException or a ServerException, sleep and retry.
-        // Sleep for a bit and recheck.
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-        if (logger.isDebugEnabled()) {
-          logger.debug("Because of IOException, failed to dispatch a batch with id : {}", this.processor.getBatchId());
-        }
-      }
-      else {
-        logger.fatal(LocalizedMessage.create(
-            LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), ge);
-        this.processor.setIsStopped(true);
-      }
-    }
-    catch (CancelException e) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Stopping the processor because cancellation occurred while processing a batch");
-      }
-      this.processor.setIsStopped(true);
-      throw e;
-    } catch (Exception e) {
-      this.processor.setIsStopped(true);
-      logger.fatal(LocalizedMessage.create(
-              LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH),
-              e);
-    }
-    return success;
-  }
-
-  private boolean _dispatchBatch(List events, boolean isRetry) {
-    Exception ex = null;
-    int currentBatchId = this.processor.getBatchId();
-    connection = getConnection(true);
-    int batchIdForThisConnection = this.processor.getBatchId();
-    GatewaySenderStats statistics = this.sender.getStatistics();
-    // This means we are writing to a new connection than the previous batch.
-    // i.e The connection has been reset. It also resets the batchId.
-    if (currentBatchId != batchIdForThisConnection
-        || this.processor.isConnectionReset()) {
-      return false;
-    }
-    try {
-      if (this.processor.isConnectionReset()) {
-        isRetry = true;
-      }
-      SenderProxy sp = new SenderProxy(this.sender.getProxy());
-      this.connectionLifeCycleLock.readLock().lock();
-      try {
-        if (connection != null) {
-          sp.dispatchBatch_NewWAN(connection, events, currentBatchId, isRetry);
-          if (logger.isDebugEnabled()) {
-            logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}",
-                this.processor.getSender(), currentBatchId,  events.size(), this.processor.getQueue().size(), connection);
-          }
-        } else {
-          throw new ConnectionDestroyedException();
-        }
-      }
-      finally{
-        this.connectionLifeCycleLock.readLock().unlock();
-      }
-      return true;
-    }
-    catch (ServerOperationException e) {
-      Throwable t = e.getCause();
-      if (t instanceof BatchException70) {
-        // A BatchException has occurred.
-        // Do not process the connection as dead since it is not dead.
-        ex = (BatchException70)t;
-      }
-      else {
-        ex = e;
-        // keep using the connection if we had a batch exception. Else, destroy it
-        destroyConnection();
-      }
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
-              new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
-    }
-    catch (GemFireIOException e) {
-      Throwable t = e.getCause();
-      if (t instanceof MessageTooLargeException) {
-        // A MessageTooLargeException has occurred.
-        // Do not process the connection as dead since it is not dead.
-        ex = (MessageTooLargeException)t;
-        // Reduce the batch size by half of the configured batch size or number of events in the current batch (whichever is less)
-        int newBatchSize = Math.min(events.size(), this.processor.getBatchSize())/2;
-        logger.warn(LocalizedMessage.create(
-            LocalizedStrings.GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION, new Object[] { events.size(), newBatchSize }), e);
-        this.processor.setBatchSize(newBatchSize);
-        statistics.incBatchesResized();
-      }
-      else {
-        ex = e;
-        // keep using the connection if we had a MessageTooLargeException. Else, destroy it
-        destroyConnection();
-      }
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
-              new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
-    }
-    catch (IllegalStateException e) {
-      this.processor.setException(new GatewaySenderException(e));
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
-              new Object[] {this, Integer.valueOf(currentBatchId), connection}), e);
-    }
-    catch (Exception e) {
-      // An Exception has occurred. Get its cause.
-      Throwable t = e.getCause();
-      if (t instanceof IOException) {
-        // An IOException has occurred.
-        ex = (IOException)t;
-      } else {
-        ex = e;
-      }
-      //the cause is not going to be BatchException70. So, destroy the connection
-      destroyConnection();
-      
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
-              new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
-    }
-  }
-  
-  /**
-   * Acquires or adds a new <code>Connection</code> to the corresponding
-   * <code>Gateway</code>
-   *
-   * @return the <code>Connection</code>
-   *
-   * @throws GatewaySenderException
-   */
-  public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{
-    if (this.processor.isStopped()) {
-      return null;
-    }
-    // IF the connection is null 
-    // OR the connection's ServerLocation doesn't match with the one stored in sender
-    // THEN initialize the connection
-    if(!this.sender.isParallel()) {
-      if (this.connection == null || this.connection.isDestroyed()
-          || !this.connection.getServer().equals(this.sender.getServerLocation())) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Initializing new connection as serverLocation of old connection is : {} and the serverLocation to connect is {}",
-              ((this.connection == null) ? "null" : this.connection.getServer()),
-              this.sender.getServerLocation());
-        }
-        // Initialize the connection
-        initializeConnection();
-      }
-    } else {
-      if (this.connection == null || this.connection.isDestroyed()) {
-        initializeConnection();
-      }
-    }
-    
-    // Here we might wait on a connection to another server if I was secondary
-    // so don't start waiting until I am primary
-    Cache cache = this.sender.getCache();
-    if (cache != null && !cache.isClosed()) {
-      if (this.sender.isPrimary() && (this.connection != null)) {
-        if (this.ackReaderThread == null || !this.ackReaderThread.isRunning()) {
-          this.ackReaderThread = new AckReaderThread(this.sender, this.processor);
-          this.ackReaderThread.start();
-          this.ackReaderThread.waitForRunningAckReaderThreadRunningState();
-        }
-      }
-    }
-    return this.connection;
-  }
-  
-  public void destroyConnection() {
-    this.connectionLifeCycleLock.writeLock().lock();
-    try {
-      Connection con = this.connection;
-      if (con != null) {
-        if (!con.isDestroyed()) {
-          con.destroy();
-          this.sender.getProxy().returnConnection(con);
-        }
-        
-        // Reset the connection so the next time through a new one will be
-        // obtained
-        this.connection = null;
-        this.sender.setServerLocation(null);
-      }
-    }
-    finally {
-      this.connectionLifeCycleLock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Initializes the <code>Connection</code>.
-   *
-   * @throws GatewaySenderException
-   */
-  private void initializeConnection() throws GatewaySenderException,
-      GemFireSecurityException {
-    this.connectionLifeCycleLock.writeLock().lock(); 
-    try {
-      // Attempt to acquire a connection
-      if (this.sender.getProxy() == null
-          || this.sender.getProxy().isDestroyed()) {
-        this.sender.initProxy();
-      } else {
-        this.processor.resetBatchId();
-      }
-      Connection con;
-      try {
-        if (this.sender.isParallel()) {
-          /*
-           * TODO - The use of acquireConnection should be removed
-           * from the gateway code. This method is fine for tests,
-           * but these connections should really be managed inside
-           * the pool code. If the gateway needs to persistent connection
-           * to a single server, which should create have the OpExecutor
-           * that holds a reference to the connection (similar to the way
-           * we do with thread local connections).
-           * Use {@link ExecutablePool#setupServerAffinity(boolean)} for
-           * gateway code
-           */
-          con = this.sender.getProxy().acquireConnection();
-          // For parallel sender, setting server location will not matter.
-          // everytime it will ask for acquire connection whenever it needs it. I
-          // am saving this server location for command purpose
-          sender.setServerLocation(con.getServer());  
-        } else {
-          synchronized (this.sender
-              .getLockForConcurrentDispatcher()) {
-            ServerLocation server = this.sender.getServerLocation();
-            if (server != null) {
-              if (logger.isDebugEnabled()) {
-                logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", server);
-              }
-              con = this.sender.getProxy().acquireConnection(server);
-            } else {
-              if (logger.isDebugEnabled()) {
-                logger.debug("ServerLocation is null. Creating new connection. ");
-              }
-              con = this.sender.getProxy().acquireConnection();
-              // Acquired connection from pool!! Update the server location
-              // information in the sender and
-              // distribute the information to other senders ONLY IF THIS SENDER
-              // IS
-              // PRIMARY
-              if (this.sender.isPrimary()) {
-                if (sender.getServerLocation() == null) {
-                  sender.setServerLocation(con.getServer());
-                }
-                new UpdateAttributesProcessor(this.sender).distribute(false);
-              }
-            }
-          }
-        }
-      } catch (ServerConnectivityException e) {
-        this.failedConnectCount++;
-        Throwable ex = null;
-
-        if (e.getCause() instanceof GemFireSecurityException) {
-          ex = e.getCause();
-          if (logConnectionFailure()) {
-            // only log this message once; another msg is logged once we connect
-            logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1,
-                new Object[] { this.processor.getSender().getId(), ex.getMessage() }));
-          }
-          throw new GatewaySenderException(ex);
-        }
-        List<ServerLocation> servers = this.sender.getProxy()
-            .getCurrentServers();
-        String ioMsg = null;
-        if (servers.size() == 0) {
-          ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS
-              .toLocalizedString();
-        } else {
-          final StringBuilder buffer = new StringBuilder();
-          for (ServerLocation server : servers) {
-            String endpointName = String.valueOf(server);
-            if (buffer.length() > 0) {
-              buffer.append(", ");
-            }
-            buffer.append(endpointName);
-          }
-          ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0
-              .toLocalizedString(buffer.toString());
-        }
-        ex = new IOException(ioMsg);
-        // Set the serverLocation to null so that a new connection can be
-        // obtained in next attempt
-        this.sender.setServerLocation(null);
-        if (this.failedConnectCount == 1) {
-          // only log this message once; another msg is logged once we connect
-          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT,
-                  this.processor.getSender().getId()));
-
-        }
-        // Wrap the IOException in a GatewayException so it can be processed the
-        // same as the other exceptions that might occur in sendBatch.
-        throw new GatewaySenderException(
-            LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT
-                .toLocalizedString(this.processor.getSender().getId()), ex);
-      }
-      if (this.failedConnectCount > 0) {
-        Object[] logArgs = new Object[] { this.processor.getSender().getId(),
-            con, Integer.valueOf(this.failedConnectCount) };
-        logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS,
-                logArgs));
-        this.failedConnectCount = 0;
-      } else {
-        Object[] logArgs = new Object[] { this.processor.getSender().getId(),
-            con };
-        logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1, logArgs));
-      }
-      this.connection = con;
-      this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize());
-    }
-    catch (ConnectionDestroyedException e) {
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT.toLocalizedString(this.processor
-              .getSender().getId()), e);
-    }
-    finally {
-      this.connectionLifeCycleLock.writeLock().unlock();
-    }
-  }
-
-  protected boolean logConnectionFailure() {
-    // always log the first failure
-	if (logger.isDebugEnabled() || this.failedConnectCount == 0) {
-	  return true;
-	}
-	else {
-	  // subsequent failures will be logged on 30th, 300th, 3000th try
-	  // each try is at 100millis from higher layer so this accounts for logging
-	  // after 3s, 30s and then every 5mins
-	  if (this.failedConnectCount >= 3000) {
-	    return (this.failedConnectCount % 3000) == 0;
-	  }
-	  else {
-	    return (this.failedConnectCount == 30 || this.failedConnectCount == 300);
-	  }
-    }
-  }
-  
-  public static class GatewayAck {
-    private int batchId;
-
-    private int numEvents;
-
-    private BatchException70 be;
-
-    public GatewayAck(BatchException70 be, int bId) {
-      this.be = be;
-      this.batchId = bId;
-    }
-
-    public GatewayAck(int batchId, int numEvents) {
-      this.batchId = batchId;
-      this.numEvents = numEvents;
-    }
-
-    /**
-     * @return the numEvents
-     */
-    public int getNumEvents() {
-      return numEvents;
-    }
-
-    /**
-     * @return the batchId
-     */
-    public int getBatchId() {
-      return batchId;
-    }
-
-    public BatchException70 getBatchException() {
-      return this.be;
-    }
-  }
-    
-  class AckReaderThread extends Thread {
-
-    private Object runningStateLock = new Object();
-
-    /**
-     * boolean to make a shutdown request
-     */
-    private volatile boolean shutdown = false;
-
-    private final GemFireCacheImpl cache;
-
-    private volatile boolean ackReaderThreadRunning = false;
-
-    public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
-      super("AckReaderThread for : " + processor.getName());
-      this.setDaemon(true);
-      this.cache = (GemFireCacheImpl)((AbstractGatewaySender)sender).getCache();
-    }
-
-    public void waitForRunningAckReaderThreadRunningState() {
-      synchronized (runningStateLock) {
-        while (!this.ackReaderThreadRunning) {
-          try {
-            this.runningStateLock.wait();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            break;
-          }
-        }
-      }
-    }
-
-    private boolean checkCancelled() {
-      if (shutdown) {
-        return true;
-      }
-
-      if (cache.getCancelCriterion().isCancelInProgress()) {
-        return true;
-      }
-      return false;
-    }
-
-    @Override
-    public void run() {
-      if (logger.isDebugEnabled()) {
-        logger.debug("AckReaderThread started.. ");
-      }
-
-      synchronized (runningStateLock) {
-        ackReaderThreadRunning = true;
-        this.runningStateLock.notifyAll();
-      }
-
-      try {
-        for (;;) {
-          if (checkCancelled()) {
-            break;
-          }
-          GatewayAck ack = readAcknowledgement();
-          if (ack != null) {
-            boolean gotBatchException = ack.getBatchException() != null;
-            int batchId = ack.getBatchId();
-            int numEvents = ack.getNumEvents();
-
-            // If the batch is successfully processed, remove it from the
-            // queue.
-            if (gotBatchException) {
-              logger.info(LocalizedMessage.create(
-                  LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION,
-                      new Object[] { processor.getSender(), ack.getBatchId() }, ack.getBatchException()));
-              // If we get PDX related exception in the batch exception then try
-              // to resend all the pdx events as well in the next batch.
-              final GatewaySenderStats statistics = sender.getStatistics();
-              statistics.incBatchesRedistributed();
-              // log batch exceptions and remove all the events if remove from
-              // exception is true
-              // do not remove if it is false
-              logBatchExceptions(ack.getBatchException());
-              processor.handleSuccessBatchAck(batchId);
-
-            } // unsuccessful batch
-            else { // The batch was successful.
-              if (logger.isDebugEnabled()) {
-                logger.debug("Gateway Sender {} : Received ack for batch id {} of {} events",
-                    processor.getSender(), ack.getBatchId(), ack.getNumEvents());
-              }
-              processor.handleSuccessBatchAck(batchId);
-            }
-          } else {
-            // If we have received IOException.
-            if (logger.isDebugEnabled()) {
-              logger.debug("{}: Received null ack from remote site.", processor.getSender());
-            }
-            processor.handleException();
-            try { // This wait is before trying to getting new connection to
-                  // receive ack. Without this there will be continuous call to
-                  // getConnection
-              Thread.sleep(GatewaySender.CONNECTION_RETRY_INTERVAL);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-            }
-          }
-        }
-      } catch (Exception e) {
-        if (!checkCancelled()) {
-          logger.fatal(LocalizedMessage.create(
-              LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH) ,e);
-        }
-        sender.getLifeCycleLock().writeLock().lock();
-        try {
-          processor.stopProcessing();
-          sender.clearTempEventsAfterSenderStopped();
-        } finally {
-          sender.getLifeCycleLock().writeLock().unlock();
-        }
-        // destroyConnection();
-      } finally {
-        if (logger.isDebugEnabled()) {
-          logger.debug("AckReaderThread exiting. ");
-        }
-        ackReaderThreadRunning = false;
-      }
-
-    }
-
-    /**
-     * @param exception 
-     * 
-     */
-    private void logBatchExceptions(BatchException70 exception) {
-      for (BatchException70 be : exception.getExceptions()) {
-        boolean logWarning = true;
-        if (be.getCause() instanceof RegionDestroyedException) {
-          RegionDestroyedException rde = (RegionDestroyedException)be
-              .getCause();
-          synchronized (notFoundRegionsSync) {
-            if (notFoundRegions.contains(rde.getRegionFullPath())) {
-              logWarning = false;
-            } else {
-              notFoundRegions.add(rde.getRegionFullPath());
-            }
-          }
-        } else if (be.getCause() instanceof IllegalStateException
-            && be.getCause().getMessage().contains("Unknown pdx type")) {
-          List<GatewaySenderEventImpl> pdxEvents = processor
-              .getBatchIdToPDXEventsMap().get(be.getBatchId());
-          if (logWarning) {
-            logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0,
-                    be.getIndex()), be);
-          }
-          if (pdxEvents != null) {
-            for (GatewaySenderEventImpl senderEvent : pdxEvents) {
-              senderEvent.isAcked = false;
-            }
-            GatewaySenderEventImpl gsEvent = pdxEvents.get(be.getIndex());
-            if (logWarning) {
-              logger.warn(LocalizedMessage.create(
-                  LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent));
-            }
-          }
-          continue;
-        }
-        if (logWarning) {
-          logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0,
-              be.getIndex()), be);
-        }
-        List<GatewaySenderEventImpl>[] eventsArr = processor.getBatchIdToEventsMap().get(be.getBatchId()); 
-        if (eventsArr != null) {
-          List<GatewaySenderEventImpl> filteredEvents = eventsArr[1];
-          GatewaySenderEventImpl gsEvent = (GatewaySenderEventImpl)filteredEvents
-              .get(be.getIndex());
-          if (logWarning) {
-            logger.warn(LocalizedMessage.create(
-                LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent));
-          }
-        }
-      }
-    }
-
-    boolean isRunning() {
-      return this.ackReaderThreadRunning;
-    }
-
-    public void shutdown() {
-      // we need to destroy connection irrespective of we are listening on it or
-      // not. No need to take lock as the reader thread may be blocked and we might not
-      // get chance to destroy unless that returns.
-      if (connection != null) {
-        Connection conn = connection;
-        shutDownAckReaderConnection();
-        if (!conn.isDestroyed()) {
-          conn.destroy();
-          sender.getProxy().returnConnection(conn);
-        }
-      }
-      this.shutdown = true;
-      boolean interrupted = Thread.interrupted();
-      try {
-        this.join(15 * 1000);
-      } catch (InterruptedException e) {
-        interrupted = true;
-      } finally {
-        if (interrupted) {
-          Thread.currentThread().interrupt();
-        }
-      }
-      if (this.isAlive()) {
-        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
-      }
-    }
-
-    private void shutDownAckReaderConnection() {
-      Connection conn = connection;
-      //attempt to unblock the ackReader thread by shutting down the inputStream, if it was stuck on a read
-      try {
-        if (conn != null && conn.getInputStream() != null) {
-          conn.getInputStream().close();
-        }
-      } catch (IOException e) {
-        logger.warn("Unable to shutdown AckReaderThread Connection");
-      } catch (ConnectionDestroyedException e) {
-        logger.info("AckReader shutting down and connection already destroyed");
-      }
-
-    }
-  }
-    
-  public void stopAckReaderThread() {
-    if (this.ackReaderThread != null) {
-      this.ackReaderThread.shutdown();
-    }
-  }
-  
-  @Override
-  public boolean isRemoteDispatcher() {
-    return true;
-  }
-
-  @Override
-  public boolean isConnectedToRemote() {
-      return connection != null;
-  }
-  
-  public void stop() {
-    stopAckReaderThread();
-    if(this.processor.isStopped()) {
-      destroyConnection();
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java
deleted file mode 100644
index 4974c6f..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallback;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
-import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderImpl;
-import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderImpl;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * 
- * @since GemFire 7.0
- * 
- */
-public class GatewaySenderFactoryImpl implements
-    InternalGatewaySenderFactory {
-
-  private static final Logger logger = LogService.getLogger();
-  
-  /**
-   * Used internally to pass the attributes from this factory to the real
-   * GatewaySender it is creating.
-   */
-  private GatewaySenderAttributes attrs = new GatewaySenderAttributes();
-
-  private Cache cache;
-
-  private static final AtomicBoolean GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED = new AtomicBoolean(false);
-
-  public GatewaySenderFactoryImpl(Cache cache) {
-    this.cache = cache;
-  }
-
-  public GatewaySenderFactory setParallel(boolean isParallel){
-    this.attrs.isParallel = isParallel;
-    return this;
-  }
-  
-  public GatewaySenderFactory setForInternalUse(boolean isForInternalUse) {
-    this.attrs.isForInternalUse = isForInternalUse;
-    return this;
-  }
-  
-  public GatewaySenderFactory addGatewayEventFilter(
-      GatewayEventFilter filter) {
-    this.attrs.addGatewayEventFilter(filter);
-    return this;
-  }
-
-  public GatewaySenderFactory addGatewayTransportFilter(
-      GatewayTransportFilter filter) {
-    this.attrs.addGatewayTransportFilter(filter);
-    return this;
-  }
-
-  public GatewaySenderFactory addAsyncEventListener(
-      AsyncEventListener listener) {
-    this.attrs.addAsyncEventListener(listener);
-    return this;
-  }
-  
-  public GatewaySenderFactory setSocketBufferSize(int socketBufferSize) {
-    this.attrs.socketBufferSize = socketBufferSize;
-    return this;
-  }
-
-  public GatewaySenderFactory setSocketReadTimeout(int socketReadTimeout) {
-    this.attrs.socketReadTimeout = socketReadTimeout;
-    return this;
-  }
-
-  public GatewaySenderFactory setDiskStoreName(String diskStoreName) {
-    this.attrs.diskStoreName = diskStoreName;
-    return this;
-  }
-
-  public GatewaySenderFactory setMaximumQueueMemory(int maximumQueueMemory) {
-    this.attrs.maximumQueueMemory = maximumQueueMemory;
-    return this;
-  }
-
-  public GatewaySenderFactory setBatchSize(int batchSize) {
-    this.attrs.batchSize = batchSize;
-    return this;
-  }
-
-  public GatewaySenderFactory setBatchTimeInterval(int batchTimeInterval) {
-    this.attrs.batchTimeInterval = batchTimeInterval;
-    return this;
-  }
-
-  public GatewaySenderFactory setBatchConflationEnabled(
-      boolean enableBatchConflation) {
-    this.attrs.isBatchConflationEnabled = enableBatchConflation;
-    return this;
-  }
-
-  public GatewaySenderFactory setPersistenceEnabled(
-      boolean enablePersistence) {
-    this.attrs.isPersistenceEnabled = enablePersistence;
-    return this;
-  }
-
-  public GatewaySenderFactory setAlertThreshold(int threshold) {
-    this.attrs.alertThreshold = threshold;
-    return this;
-  }
-
-  public GatewaySenderFactory setManualStart(boolean start) {
-    this.attrs.manualStart = start;
-    return this;
-  }
-
-  public GatewaySenderFactory setLocatorDiscoveryCallback(
-      LocatorDiscoveryCallback locCallback) {
-    this.attrs.locatorDiscoveryCallback = locCallback;
-    return this;
-  }
-
-  @Override
-  public GatewaySenderFactory setDiskSynchronous(boolean isSynchronous) {
-    this.attrs.isDiskSynchronous = isSynchronous;
-    return this;
-  }
-  
-  @Override
-  public GatewaySenderFactory setDispatcherThreads(int numThreads) {
-    if ((numThreads > 1) && this.attrs.policy == null) {
-      this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
-    }
-    this.attrs.dispatcherThreads = numThreads;
-    return this;
-  }
-
-  public GatewaySenderFactory setParallelFactorForReplicatedRegion(int parallel) {
-    this.attrs.parallelism = parallel;
-    this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
-    return this;
-  }    
-    
-  @Override
-  public GatewaySenderFactory setOrderPolicy(OrderPolicy policy) {
-    this.attrs.policy = policy;
-    return this;
-  }
-  
-  public GatewaySenderFactory setBucketSorted(boolean isBucketSorted){
-    this.attrs.isBucketSorted = isBucketSorted;
-    return this;
-  }
-  public GatewaySender create(String id, int remoteDSId) {
-    int myDSId = InternalDistributedSystem.getAnyInstance()
-        .getDistributionManager().getDistributedSystemId();
-    if (remoteDSId == myDSId) {
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_EQUAL_TO_THIS_SITE_ID
-              .toLocalizedString(id));
-    }
-    if (remoteDSId < 0) {
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_LESS_THAN_ZERO
-              .toLocalizedString(id));
-    }
-    this.attrs.id = id;
-    this.attrs.remoteDs = remoteDSId;
-    GatewaySender sender = null;
-
-    if(this.attrs.getDispatcherThreads() <= 0){
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewaySenderImpl_GATEWAY_SENDER_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
-              .toLocalizedString(id));
-    }
-
-    // Verify socket read timeout if a proper logger is available
-    if (this.cache instanceof GemFireCacheImpl) {
-      // If socket read timeout is less than the minimum, log a warning.
-      // Ideally, this should throw a GatewaySenderException, but wan dunit tests
-      // were failing, and we were running out of time to change them.
-      if (this.attrs.getSocketReadTimeout() != 0
-          && this.attrs.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) {
-        logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_CONFIGURED_SOCKET_READ_TIMEOUT_TOO_LOW,
-            new Object[] { "GatewaySender " + id, this.attrs.getSocketReadTimeout(), GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT }));
-        this.attrs.socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT;
-      }
-
-      // Log a warning if the old system property is set.
-      if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)) {
-        if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) {
-          logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_OBSOLETE_SYSTEM_POPERTY,
-              new Object[] { GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY, "GatewaySender socket read timeout" }));
-        }
-      }
-    }
-
-    if (this.attrs.isParallel()) {
-//      if(this.attrs.getDispatcherThreads() != 1){
-//        throw new GatewaySenderException(
-//            LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_DISPATCHER_THREADS_OTHER_THAN_1
-//                .toLocalizedString(id));
-//      }
-      if ((this.attrs.getOrderPolicy() != null)
-          && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
-        throw new GatewaySenderException(
-            LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
-                .toLocalizedString(id, this.attrs.getOrderPolicy()));
-      }
-      if (this.cache instanceof GemFireCacheImpl) {
-        sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
-        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
-        
-        if (!this.attrs.isManualStart()) {
-          sender.start();
-        }
-      }
-      else if (this.cache instanceof CacheCreation) {
-        sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
-        ((CacheCreation)this.cache).addGatewaySender(sender);
-      }
-    }
-    else {
-      if (this.attrs.getAsyncEventListeners().size() > 0) {
-        throw new GatewaySenderException(
-            LocalizedStrings.SerialGatewaySenderImpl_GATEWAY_0_CANNOT_DEFINE_A_REMOTE_SITE_BECAUSE_AT_LEAST_ONE_LISTENER_IS_ALREADY_ADDED
-                .toLocalizedString(id));
-      }
-//      if (this.attrs.getOrderPolicy() != null) {
-//        if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) {
-//          throw new GatewaySenderException(
-//              LocalizedStrings.SerialGatewaySender_INVALID_GATEWAY_SENDER_ORDER_POLICY_CONCURRENCY_0
-//                  .toLocalizedString(id));
-//        }
-//      }
-      if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
-        this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
-      }
-      if (this.cache instanceof GemFireCacheImpl) {
-        sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
-        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
-
-        if (!this.attrs.isManualStart()) {
-          sender.start();
-        }
-      }
-      else if (this.cache instanceof CacheCreation) {
-        sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
-        ((CacheCreation)this.cache).addGatewaySender(sender);
-      }
-    }
-    return sender;
-  }
-
-  public GatewaySender create(String id) {
-    this.attrs.id = id;
-    GatewaySender sender = null;
-
-    if(this.attrs.getDispatcherThreads() <= 0) {
-      throw new AsyncEventQueueConfigurationException(
-          LocalizedStrings.AsyncEventQueue_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
-              .toLocalizedString(id));
-    }
-    
-    if (this.attrs.isParallel()) {
-      if ((this.attrs.getOrderPolicy() != null)
-          && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
-        throw new AsyncEventQueueConfigurationException(
-            LocalizedStrings.AsyncEventQueue_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
-                .toLocalizedString(id, this.attrs.getOrderPolicy()));
-      }
-      
-      if (this.cache instanceof GemFireCacheImpl) {
-        sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
-        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
-        if (!this.attrs.isManualStart()) {
-          sender.start();
-        }
-      }
-      else if (this.cache instanceof CacheCreation) {
-        sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
-        ((CacheCreation)this.cache).addGatewaySender(sender);
-      }
-    }
-    else {
-//      if (this.attrs.getOrderPolicy() != null) {
-//        if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) {
-//          throw new AsyncEventQueueConfigurationException(
-//              LocalizedStrings.AsyncEventQueue_INVALID_ORDER_POLICY_CONCURRENCY_0
-//                  .toLocalizedString(id));
-//        }
-//      }
-      if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
-         this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
-      }
-      if (this.cache instanceof GemFireCacheImpl) {
-        sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
-        ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
-        if (!this.attrs.isManualStart()) {
-          sender.start();
-        }
-      }
-      else if (this.cache instanceof CacheCreation) {
-        sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
-        ((CacheCreation)this.cache).addGatewaySender(sender);
-      }
-    }
-    return sender;
-  }
-  
-  public GatewaySenderFactory removeGatewayEventFilter(
-      GatewayEventFilter filter) {
-    this.attrs.eventFilters.remove(filter);
-    return this;
-  }
-
-  public GatewaySenderFactory removeGatewayTransportFilter(
-      GatewayTransportFilter filter) {
-    this.attrs.transFilters.remove(filter);
-    return this;
-  } 
-  
-  public GatewaySenderFactory setGatewayEventSubstitutionFilter(
-      GatewayEventSubstitutionFilter filter) {
-    this.attrs.eventSubstitutionFilter = filter;
-    return this;
-  }
-
-  public void configureGatewaySender(GatewaySender senderCreation) {
-    this.attrs.isParallel = senderCreation.isParallel();
-    this.attrs.manualStart = senderCreation.isManualStart();
-    this.attrs.socketBufferSize = senderCreation.getSocketBufferSize();
-    this.attrs.socketReadTimeout = senderCreation.getSocketReadTimeout();
-    this.attrs.isBatchConflationEnabled = senderCreation.isBatchConflationEnabled();
-    this.attrs.batchSize = senderCreation.getBatchSize();
-    this.attrs.batchTimeInterval = senderCreation.getBatchTimeInterval();
-    this.attrs.isPersistenceEnabled = senderCreation.isPersistenceEnabled();
-    this.attrs.diskStoreName = senderCreation.getDiskStoreName();
-    this.attrs.isDiskSynchronous = senderCreation.isDiskSynchronous();
-    this.attrs.maximumQueueMemory = senderCreation.getMaximumQueueMemory();
-    this.attrs.alertThreshold = senderCreation.getAlertThreshold();
-    this.attrs.dispatcherThreads = senderCreation.getDispatcherThreads();
-    this.attrs.policy = senderCreation.getOrderPolicy();
-    for(GatewayEventFilter filter : senderCreation.getGatewayEventFilters()){
-      this.attrs.eventFilters.add(filter);
-    }
-    for(GatewayTransportFilter filter : senderCreation.getGatewayTransportFilters()){
-      this.attrs.transFilters.add(filter);
-    }
-    this.attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
deleted file mode 100644
index 322b1ba..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.EntryOperation;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
-import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.ResourceEvent;
-import com.gemstone.gemfire.internal.cache.DistributedRegion;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.EventID;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
-import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
-import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
-import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * @since GemFire 7.0
- *
- */
-public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
-  
-  private static final Logger logger = LogService.getLogger();
-  
-  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
-      "Remote Site Discovery Logger Group", logger);
-  
-  public ParallelGatewaySenderImpl(){
-    super();
-    this.isParallel = true;
-  }
-  
-  public ParallelGatewaySenderImpl(Cache cache, GatewaySenderAttributes attrs) {
-    super(cache, attrs);
-  }
-  
-  @Override
-  public void start() {
-    this.getLifeCycleLock().writeLock().lock(); 
-    try {
-      if (isRunning()) {
-        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
-        return;
-      }
-
-      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
-        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
-            .getConfig().getLocators();
-        if (locators.length() == 0) {
-          throw new IllegalStateException(
-              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
-                  .toLocalizedString());
-        }
-      }
-      /*
-       * Now onwards all processing will happen through "ConcurrentParallelGatewaySenderEventProcessor"
-       * we have made "ParallelGatewaySenderEventProcessor" and "ParallelGatewaySenderQueue" as a
-       * utility classes of Concurrent version of processor and queue.
-       */
-      eventProcessor = new RemoteConcurrentParallelGatewaySenderEventProcessor(this);
-      /*if (getDispatcherThreads() > 1) {
-        eventProcessor = new ConcurrentParallelGatewaySenderEventProcessor(this);
-      } else {
-        eventProcessor = new ParallelGatewaySenderEventProcessor(this);
-      }*/
-      
-      eventProcessor.start();
-      waitForRunningStatus();
-      //Only notify the type registry if this is a WAN gateway queue
-      if(!isAsyncEventQueue()) {
-        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
-      }
-      new UpdateAttributesProcessor(this).distribute(false);
-     
-      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
-          .getDistributedSystem();
-      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
-      
-      logger.info(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderImpl_STARTED__0, this));
-      
-      enqueueTempEvents();
-    }
-    finally {
-      this.getLifeCycleLock().writeLock().unlock();
-    }
-  }
-  
-//  /**
-//   * The sender is not started but only the message queue i.e. shadowPR is created on the node.
-//   * @param targetPr
-//   */
-//  private void createMessageQueueOnAccessorNode(PartitionedRegion targetPr) {
-//    eventProcessor = new ParallelGatewaySenderEventProcessor(this, targetPr);
-//  }
-  
-
-  @Override
-  public void stop() {
-    this.getLifeCycleLock().writeLock().lock(); 
-    try {
-      if (!this.isRunning()) {
-        return;
-      }
-      // Stop the dispatcher
-      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
-      //try {
-      if (ev != null && !ev.isStopped()) {
-        ev.stopProcessing();
-      }
-
-      // Stop the proxy (after the dispatcher, so the socket is still
-      // alive until after the dispatcher has stopped)
-      stompProxyDead();
-
-      // Close the listeners
-      for (AsyncEventListener listener : this.listeners) {
-        listener.close();
-      }
-      //stop the running threads, open sockets if any
-      ((ConcurrentParallelGatewaySenderQueue)this.eventProcessor.getQueue()).cleanUp();
-
-      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
-      
-      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
-      .getDistributedSystem();
-      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
-      
-      clearTempEventsAfterSenderStopped();
-      // Keep the eventProcessor around so we can ask it for the regionQueues later.
-      // Tests expect to be able to do this. 
-//      } finally {
-//        this.eventProcessor = null;
-//      }
-    }
-    finally {
-      this.getLifeCycleLock().writeLock().unlock();
-    }
-  }
-  
-  @Override
-  public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("ParallelGatewaySender{");
-    sb.append("id=" + getId());
-    sb.append(",remoteDsId="+ getRemoteDSId());
-    sb.append(",isRunning ="+ isRunning());
-    sb.append("}");
-    return sb.toString();
-  }
-
-  public void fillInProfile(Profile profile) {
-    assert profile instanceof GatewaySenderProfile;
-    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
-    pf.Id = getId();
-    pf.remoteDSId = getRemoteDSId();
-    pf.isRunning = isRunning();
-    pf.isPrimary = isPrimary();
-    pf.isParallel = true;
-    pf.isBatchConflationEnabled = isBatchConflationEnabled();
-    pf.isPersistenceEnabled = isPersistenceEnabled();
-    pf.alertThreshold = getAlertThreshold();
-    pf.manualStart = isManualStart();
-    pf.dispatcherThreads = getDispatcherThreads();
-    pf.orderPolicy = getOrderPolicy();
-    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) {
-      pf.eventFiltersClassNames.add(filter.getClass().getName());
-    }
-    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
-      pf.transFiltersClassNames.add(filter.getClass().getName());
-    }
-    for (AsyncEventListener listener : getAsyncEventListeners()) {
-      pf.senderEventListenerClassNames.add(listener.getClass().getName());
-    }
-    pf.isDiskSynchronous = isDiskSynchronous();
-  }
-
-  /* (non-Javadoc)
-   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
-   */
-  @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
-    int bucketId = -1;
-    //merged from 42004
-    if (clonedEvent.getRegion() instanceof DistributedRegion) {
-//      if (getOrderPolicy() == OrderPolicy.THREAD) {
-//        bucketId = PartitionedRegionHelper.getHashKey(
-//            ((EntryEventImpl)clonedEvent).getEventId().getThreadID(),
-//            getMaxParallelismForReplicatedRegion());
-//      }
-//      else
-        bucketId = PartitionedRegionHelper.getHashKey(clonedEvent.getKey(),
-            getMaxParallelismForReplicatedRegion());
-    }
-    else {
-      bucketId = PartitionedRegionHelper
-          .getHashKey((EntryOperation)clonedEvent);
-    }
-    EventID originalEventId = clonedEvent.getEventId();
-    long originatingThreadId = ThreadIdentifier.getRealThreadID(originalEventId.getThreadID());
-
-    long newThreadId = ThreadIdentifier
-    .createFakeThreadIDForParallelGSPrimaryBucket(bucketId,
-        originatingThreadId, getEventIdIndex());
-    
-    // In case of parallel as all events go through primary buckets
-    // we don't need to generate different threadId for secondary buckets
-    // as they will be rejected if seen at PR level itself
-    
-//    boolean isPrimary = ((PartitionedRegion)getQueue().getRegion())
-//    .getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
-//    if (isPrimary) {
-//      newThreadId = ThreadIdentifier
-//          .createFakeThreadIDForParallelGSPrimaryBucket(bucketId,
-//              originatingThreadId);
-//    } else {
-//      newThreadId = ThreadIdentifier
-//          .createFakeThreadIDForParallelGSSecondaryBucket(bucketId,
-//              originatingThreadId);
-//    }
-
-    EventID newEventId = new EventID(originalEventId.getMembershipID(),
-        newThreadId, originalEventId.getSequenceID(), bucketId);
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}: Generated event id for event with key={}, bucketId={}, original event id={}, threadId={}, new event id={}, newThreadId={}",
-          this, clonedEvent.getKey(), bucketId, originalEventId, originatingThreadId, newEventId, newThreadId);
-    }
-    clonedEvent.setEventId(newEventId);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
deleted file mode 100644
index 35cdece..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-
-
-import java.util.Set;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
-/**
- * Remote version of GatewaySenderEvent Processor
- *
- */
-public class RemoteConcurrentParallelGatewaySenderEventProcessor extends ConcurrentParallelGatewaySenderEventProcessor{
-  
-  public RemoteConcurrentParallelGatewaySenderEventProcessor(
-      AbstractGatewaySender sender) {
-    super(sender);
-  }
-
-  @Override
-  protected void createProcessors(int dispatcherThreads, Set<Region> targetRs) {
-    processors = new RemoteParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()];
-    if (logger.isDebugEnabled()) {
-      logger.debug("Creating GatewaySenderEventProcessor");
-    }
-    for (int i = 0; i < sender.getDispatcherThreads(); i++) {
-      processors[i] = new RemoteParallelGatewaySenderEventProcessor(sender,
-          targetRs, i, sender.getDispatcherThreads());
-    }
-  }
-  
-  @Override
-  protected void rebalance() {
-    GatewaySenderStats statistics = this.sender.getStatistics();
-    long startTime = statistics.startLoadBalance();
-    try {
-      for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
-        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher)parallelProcessor.getDispatcher();
-        if (remoteDispatcher.isConnectedToRemote()) {
-          remoteDispatcher.stopAckReaderThread();
-          remoteDispatcher.destroyConnection();
-        }
-      }
-    } finally {
-      statistics.endLoadBalance(startTime);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
deleted file mode 100644
index 815932e..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySenderEventProcessor {
-  private static final Logger logger = LogService.getLogger();
-  
-  protected RemoteParallelGatewaySenderEventProcessor(
-      AbstractGatewaySender sender) {
-    super(sender);
-  }
-  
-  /**
-   * use in concurrent scenario where queue is to be shared among all the processors.
-   */
-  protected RemoteParallelGatewaySenderEventProcessor(AbstractGatewaySender sender,  Set<Region> userRegions, int id, int nDispatcher) {
-    super(sender,  userRegions, id, nDispatcher);
-  }
-  
-  @Override
-  protected void rebalance() {
-    GatewaySenderStats statistics = this.sender.getStatistics();
-    long startTime = statistics.startLoadBalance();
-    try {
-      if (this.dispatcher.isRemoteDispatcher()) {
-        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher) this.dispatcher;
-        if (remoteDispatcher.isConnectedToRemote()) {
-          remoteDispatcher.stopAckReaderThread();
-          remoteDispatcher.destroyConnection();
-        }
-      }
-    } finally {
-      statistics.endLoadBalance(startTime);
-    }
-  }
-  
-  public void initializeEventDispatcher() {
-    if (logger.isDebugEnabled()) {
-      logger.debug(" Creating the GatewayEventRemoteDispatcher");
-    }
-    if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
-      this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
-    }
-  }
-  
-  /**
-   * Returns if corresponding receiver WAN site of this GatewaySender has
-   * GemfireVersion > 7.0.1
-   * 
-   * @param disp
-   * @return true if remote site Gemfire Version is >= 7.0.1
-   */
-  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
-      throws GatewaySenderException {
-      try {
-        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher) disp;
-        // This will create a new connection if no batch has been sent till
-        // now.
-        Connection conn = remoteDispatcher.getConnection(false);
-        if (conn != null) {
-          short remoteSiteVersion = conn.getWanSiteVersion();
-          if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
-            return true;
-          }
-        }
-      } catch (GatewaySenderException e) {
-        Throwable cause = e.getCause();
-        if (cause instanceof IOException
-            || e instanceof GatewaySenderConfigurationException
-            || cause instanceof ConnectionDestroyedException) {
-          try {
-            int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
-            if (logger.isDebugEnabled()) {
-              logger.debug("Sleeping for {} milliseconds", sleepInterval);
-            }
-            Thread.sleep(sleepInterval);
-          } catch (InterruptedException ie) {
-            // log the exception
-            if (logger.isDebugEnabled()){
-              logger.debug(ie.getMessage(), ie);
-            }
-          }
-        }
-        throw e;
-      }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
deleted file mode 100644
index 8a25ab6..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public class RemoteConcurrentSerialGatewaySenderEventProcessor extends
-    ConcurrentSerialGatewaySenderEventProcessor {
-
-  private static final Logger logger = LogService.getLogger();
-  
-  public RemoteConcurrentSerialGatewaySenderEventProcessor(
-      AbstractGatewaySender sender) {
-    super(sender);
-  }
-
-  @Override
-  protected void initializeMessageQueue(String id) {
-    for (int i = 0; i < sender.getDispatcherThreads(); i++) {
-      processors.add(new RemoteSerialGatewaySenderEventProcessor(this.sender, id
-          + "." + i));
-      if (logger.isDebugEnabled()) {
-        logger.debug("Created the RemoteSerialGatewayEventProcessor_{}->{}", i, processors.get(i));
-      }
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
deleted file mode 100644
index 82fa585..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public class RemoteSerialGatewaySenderEventProcessor extends
-    SerialGatewaySenderEventProcessor {
-
-  private static final Logger logger = LogService.getLogger();
-  public RemoteSerialGatewaySenderEventProcessor(AbstractGatewaySender sender,
-      String id) {
-    super(sender, id);
-  }
-
-  public void initializeEventDispatcher() {
-    if (logger.isDebugEnabled()) {
-      logger.debug(" Creating the GatewayEventRemoteDispatcher");
-    }
-    // In case of serial there is a way to create gatewaySender and attach
-    // asyncEventListener. Not sure of the use-case but there are dunit tests
-    // To make them pass uncommenting the below condition
-    if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
-      this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
-    }else{
-      this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
-    }
-  }
-  
-}