You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:48 UTC
[60/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/main/java/com/gemstone/gemfire) to org.apache.geode (for
./geode-wan/src/main/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java
deleted file mode 100644
index c5c2a2f..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.ResourceEvent;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.GatewayReceiverCreation;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-/**
- *
- * @since GemFire 7.0
- */
-public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
-
- private int startPort = GatewayReceiver.DEFAULT_START_PORT;
-
- private int endPort = GatewayReceiver.DEFAULT_END_PORT;
-
- private int timeBetPings = GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS;
-
- private int socketBuffSize = GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE;
-
- private String bindAdd= GatewayReceiver.DEFAULT_BIND_ADDRESS;
-
- private String hostnameForSenders = GatewayReceiver.DEFAULT_HOSTNAME_FOR_SENDERS;
-
- private boolean manualStart = GatewayReceiver.DEFAULT_MANUAL_START;
-
- private List<GatewayTransportFilter> filters = new ArrayList<GatewayTransportFilter>();
-
- private Cache cache;
-
- public GatewayReceiverFactoryImpl() {
-
- }
- public GatewayReceiverFactoryImpl(Cache cache) {
- this.cache = cache;
- }
-
- public GatewayReceiverFactory addGatewayTransportFilter(
- GatewayTransportFilter filter) {
- this.filters.add(filter);
- return this;
- }
-
- public GatewayReceiverFactory removeGatewayTransportFilter(
- GatewayTransportFilter filter) {
- this.filters.remove(filter);
- return this;
- }
-
- public GatewayReceiverFactory setMaximumTimeBetweenPings(int time) {
- this.timeBetPings = time;
- return this;
- }
-
- public GatewayReceiverFactory setStartPort(int port) {
- this.startPort = port;
- return this;
- }
-
- public GatewayReceiverFactory setEndPort(int port) {
- this.endPort = port;
- return this;
- }
-
- public GatewayReceiverFactory setSocketBufferSize(int size) {
- this.socketBuffSize = size;
- return this;
- }
-
- public GatewayReceiverFactory setBindAddress(String address) {
- this.bindAdd = address;
- return this;
- }
-
- public GatewayReceiverFactory setHostnameForSenders(String address) {
- this.hostnameForSenders = address;
- return this;
- }
-
- public GatewayReceiverFactory setManualStart(boolean start) {
- this.manualStart = start;
- return this;
- }
-
- public GatewayReceiver create() {
- if (this.startPort > this.endPort) {
- throw new IllegalStateException(
- "Please specify either start port a value which is less than end port.");
- }
- GatewayReceiver recv = null;
- if (this.cache instanceof GemFireCacheImpl) {
- recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort,
- this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
- this.hostnameForSenders, this.manualStart);
- ((GemFireCacheImpl)cache).addGatewayReceiver(recv);
- InternalDistributedSystem system = (InternalDistributedSystem) this.cache
- .getDistributedSystem();
- system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_CREATE, recv);
- if (!this.manualStart) {
- try {
- recv.start();
- }
- catch (IOException ioe) {
- throw new GatewayReceiverException(
- LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_STARTING_GATEWAY_RECEIVER
- .toLocalizedString(), ioe);
- }
- }
- } else if (this.cache instanceof CacheCreation) {
- recv = new GatewayReceiverCreation(this.cache, this.startPort, this.endPort,
- this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
- this.hostnameForSenders, this.manualStart);
- ((CacheCreation)cache).addGatewayReceiver(recv);
- }
- return recv;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java
deleted file mode 100644
index b8768d4..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan;
-
-import java.io.IOException;
-import java.net.BindException;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.ResourceEvent;
-import com.gemstone.gemfire.internal.AvailablePort;
-import com.gemstone.gemfire.internal.net.SocketCreator;
-import com.gemstone.gemfire.internal.cache.CacheServerImpl;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * @since GemFire 7.0
- */
-@SuppressWarnings("deprecation")
-public class GatewayReceiverImpl implements GatewayReceiver {
-
- private static final Logger logger = LogService.getLogger();
-
- private String host;
-
- private int startPort;
-
- private int endPort;
-
- private int port;
-
- private int timeBetPings;
-
- private int socketBufferSize;
-
- private boolean manualStart;
-
- private final List<GatewayTransportFilter> filters;
-
- private String bindAdd;
-
- private CacheServer receiver;
-
- private final GemFireCacheImpl cache;
-
- public GatewayReceiverImpl(Cache cache, int startPort,
- int endPort, int timeBetPings, int buffSize, String bindAdd,
- List<GatewayTransportFilter> filters, String hostnameForSenders, boolean manualStart) {
- this.cache = (GemFireCacheImpl)cache;
-
- /*
- * If user has set hostNameForSenders then it should take precedence over
- * bindAddress. If user hasn't set either hostNameForSenders or bindAddress
- * then getLocalHost().getHostName() should be used.
- */
- if (hostnameForSenders == null || hostnameForSenders.isEmpty()) {
- if (bindAdd == null || bindAdd.isEmpty()) {
- try {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiverImpl_USING_LOCAL_HOST));
- this.host = SocketCreator.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- throw new IllegalStateException(
- LocalizedStrings.GatewayReceiverImpl_COULD_NOT_GET_HOST_NAME
- .toLocalizedString(),
- e);
- }
- } else {
- this.host = bindAdd;
- }
- } else {
- this.host = hostnameForSenders;
- }
-
- this.startPort = startPort;
- this.endPort = endPort;
- this.timeBetPings = timeBetPings;
- this.socketBufferSize = buffSize;
- this.bindAdd = bindAdd;
- this.filters = filters;
- this.manualStart = manualStart;
- }
-
- public List<GatewayTransportFilter> getGatewayTransportFilters() {
- return this.filters;
- }
-
- public int getMaximumTimeBetweenPings() {
- return this.timeBetPings;
- }
-
- public int getPort() {
- return this.port;
- }
-
- public int getStartPort() {
- return this.startPort;
- }
-
- public int getEndPort() {
- return this.endPort;
- }
-
- public int getSocketBufferSize() {
- return this.socketBufferSize;
- }
-
- public boolean isManualStart() {
- return this.manualStart;
- }
-
- public CacheServer getServer() {
- return receiver;
- }
-
- public void start() throws IOException {
- if (receiver == null) {
- receiver = this.cache.addCacheServer(true);
- }
- if (receiver.isRunning()) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING));
- return;
- }
- boolean started = false;
- this.port = getPortToStart();
- while (!started && this.port != -1) {
- receiver.setPort(this.port);
- receiver.setSocketBufferSize(socketBufferSize);
- receiver.setMaximumTimeBetweenPings(timeBetPings);
- receiver.setHostnameForClients(host);
- receiver.setBindAddress(bindAdd);
- receiver.setGroups(new String[] { GatewayReceiverImpl.RECEIVER_GROUP });
- ((CacheServerImpl)receiver).setGatewayTransportFilter(this.filters);
- try {
- ((CacheServerImpl)receiver).start();
- started = true;
- } catch (BindException be) {
- if (be.getCause() != null
- && be.getCause().getMessage()
- .contains("assign requested address")) {
- throw new GatewayReceiverException(
- LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1
- .toLocalizedString(new Object[] { bindAdd,
- Integer.valueOf(this.port) }));
- }
- // ignore as this port might have been used by other threads.
- logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port));
- this.port = getPortToStart();
- } catch (SocketException se) {
- if (se.getMessage().contains("Address already in use")) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port));
- this.port = getPortToStart();
-
- } else {
- throw se;
- }
- }
-
- }
- if (!started) {
- throw new IllegalStateException(
- "No available free port found in the given range.");
- }
- logger.info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, this.port));
-
- InternalDistributedSystem system = this.cache.getDistributedSystem();
- system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this);
-
- }
-
- private int getPortToStart(){
- // choose a random port from the given port range
- int rPort;
- if (this.startPort == this.endPort) {
- rPort = this.startPort;
- } else {
- rPort = AvailablePort.getRandomAvailablePortInRange(this.startPort,
- this.endPort, AvailablePort.SOCKET);
- }
- return rPort;
- }
-
- public void stop() {
- if(!isRunning()){
- throw new GatewayReceiverException(LocalizedStrings.GatewayReceiver_IS_NOT_RUNNING.toLocalizedString());
- }
- receiver.stop();
-
-// InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
-// .getDistributedSystem();
-// system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_STOP, this);
-
- }
-
- public String getHost() {
- return this.host;
- }
-
- public String getBindAddress() {
- return this.bindAdd;
- }
-
- public boolean isRunning() {
- if (this.receiver != null) {
- return this.receiver.isRunning();
- }
- return false;
- }
-
- public String toString() {
- return new StringBuffer()
- .append("Gateway Receiver")
- .append("@").append(Integer.toHexString(hashCode()))
- .append(" [")
- .append("host='").append(getHost())
- .append("'; port=").append(getPort())
- .append("; bindAddress=").append(getBindAddress())
- .append("; maximumTimeBetweenPings=").append(getMaximumTimeBetweenPings())
- .append("; socketBufferSize=").append(getSocketBufferSize())
- .append("; isManualStart=").append(isManualStart())
- .append("; group=").append(Arrays.toString(new String[]{GatewayReceiverImpl.RECEIVER_GROUP}))
- .append("]")
- .toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
deleted file mode 100644
index d2302c4..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan;
-
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.gemstone.gemfire.GemFireIOException;
-import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.CancelException;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.client.ServerConnectivityException;
-import com.gemstone.gemfire.cache.client.ServerOperationException;
-import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.distributed.internal.ServerLocation;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import com.gemstone.gemfire.pdx.PdxRegistryMismatchException;
-import com.gemstone.gemfire.security.GemFireSecurityException;
-import com.gemstone.gemfire.cache.client.internal.SenderProxy;
-
-/**
- * @since GemFire 7.0
- *
- */
-public class GatewaySenderEventRemoteDispatcher implements
- GatewaySenderEventDispatcher {
-
- private static final Logger logger = LogService.getLogger();
-
- private final AbstractGatewaySenderEventProcessor processor;
-
- private volatile Connection connection;
-
- private final Set<String> notFoundRegions = new HashSet<String>();
-
- private final Object notFoundRegionsSync = new Object();
-
- private final AbstractGatewaySender sender;
-
- private AckReaderThread ackReaderThread;
-
- private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock();
-
- /**
- * This count is reset to 0 each time a successful connection is made.
- */
- private int failedConnectCount = 0;
-
- public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
- this.processor = eventProcessor;
- this.sender = eventProcessor.getSender();
-// this.ackReaderThread = new AckReaderThread(sender);
- try {
- initializeConnection();
- }
- catch (GatewaySenderException e) {
- if (e.getCause() instanceof GemFireSecurityException) {
- throw e;
- }
- }
- }
-
- protected GatewayAck readAcknowledgement() {
- SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
- GatewayAck ack = null;
- Exception ex;
- try {
- connection = getConnection(false);
- if (logger.isDebugEnabled()) {
- logger.debug(" Receiving ack on the thread {}", connection);
- }
- this.connectionLifeCycleLock.readLock().lock();
- try {
- if (connection != null) {
- ack = (GatewayAck)sp.receiveAckFromReceiver(connection);
- }
- } finally {
- this.connectionLifeCycleLock.readLock().unlock();
- }
-
- } catch (Exception e) {
- Throwable t = e.getCause();
- if (t instanceof BatchException70) {
- // A BatchException has occurred.
- // Do not process the connection as dead since it is not dead.
- ex = (BatchException70)t;
- } else if (e instanceof GatewaySenderException) { //This Exception is thrown from getConnection
- ex = (Exception) e.getCause();
- }else {
- ex = e;
- // keep using the connection if we had a batch exception. Else, destroy
- // it
- destroyConnection();
- }
- if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
- // if our pool is shutdown then just be silent
- } else if (ex instanceof IOException
- || (ex instanceof ServerConnectivityException && !(ex.getCause() instanceof PdxRegistryMismatchException))
- || ex instanceof ConnectionDestroyedException) {
- // If the cause is an IOException or a ServerException, sleep and retry.
- // Sleep for a bit and recheck.
- try {
- Thread.sleep(100);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- } else {
- if (!(ex instanceof CancelException)) {
- logger.fatal(LocalizedMessage.create(
- LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH),
- ex);
- }
- this.processor.setIsStopped(true);
- }
- }
- return ack;
- }
-
- @Override
- public boolean dispatchBatch(List events, boolean isRetry) {
- GatewaySenderStats statistics = this.sender.getStatistics();
- boolean success = false;
- try {
- long start = statistics.startTime();
- success =_dispatchBatch(events, isRetry);
- if (success) {
- statistics.endBatch(start, events.size());
- }
- } catch (GatewaySenderException ge) {
-
- Throwable t = ge.getCause();
- if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
- // if our pool is shutdown then just be silent
- } else if (t instanceof IOException
- || t instanceof ServerConnectivityException
- || t instanceof ConnectionDestroyedException
- || t instanceof MessageTooLargeException
- || t instanceof IllegalStateException) {
- this.processor.handleException();
- // If the cause is an IOException or a ServerException, sleep and retry.
- // Sleep for a bit and recheck.
- try {
- Thread.sleep(100);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Because of IOException, failed to dispatch a batch with id : {}", this.processor.getBatchId());
- }
- }
- else {
- logger.fatal(LocalizedMessage.create(
- LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), ge);
- this.processor.setIsStopped(true);
- }
- }
- catch (CancelException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Stopping the processor because cancellation occurred while processing a batch");
- }
- this.processor.setIsStopped(true);
- throw e;
- } catch (Exception e) {
- this.processor.setIsStopped(true);
- logger.fatal(LocalizedMessage.create(
- LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH),
- e);
- }
- return success;
- }
-
- private boolean _dispatchBatch(List events, boolean isRetry) {
- Exception ex = null;
- int currentBatchId = this.processor.getBatchId();
- connection = getConnection(true);
- int batchIdForThisConnection = this.processor.getBatchId();
- GatewaySenderStats statistics = this.sender.getStatistics();
- // This means we are writing to a new connection than the previous batch.
- // i.e The connection has been reset. It also resets the batchId.
- if (currentBatchId != batchIdForThisConnection
- || this.processor.isConnectionReset()) {
- return false;
- }
- try {
- if (this.processor.isConnectionReset()) {
- isRetry = true;
- }
- SenderProxy sp = new SenderProxy(this.sender.getProxy());
- this.connectionLifeCycleLock.readLock().lock();
- try {
- if (connection != null) {
- sp.dispatchBatch_NewWAN(connection, events, currentBatchId, isRetry);
- if (logger.isDebugEnabled()) {
- logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}",
- this.processor.getSender(), currentBatchId, events.size(), this.processor.getQueue().size(), connection);
- }
- } else {
- throw new ConnectionDestroyedException();
- }
- }
- finally{
- this.connectionLifeCycleLock.readLock().unlock();
- }
- return true;
- }
- catch (ServerOperationException e) {
- Throwable t = e.getCause();
- if (t instanceof BatchException70) {
- // A BatchException has occurred.
- // Do not process the connection as dead since it is not dead.
- ex = (BatchException70)t;
- }
- else {
- ex = e;
- // keep using the connection if we had a batch exception. Else, destroy it
- destroyConnection();
- }
- throw new GatewaySenderException(
- LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
- new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
- }
- catch (GemFireIOException e) {
- Throwable t = e.getCause();
- if (t instanceof MessageTooLargeException) {
- // A MessageTooLargeException has occurred.
- // Do not process the connection as dead since it is not dead.
- ex = (MessageTooLargeException)t;
- // Reduce the batch size by half of the configured batch size or number of events in the current batch (whichever is less)
- int newBatchSize = Math.min(events.size(), this.processor.getBatchSize())/2;
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION, new Object[] { events.size(), newBatchSize }), e);
- this.processor.setBatchSize(newBatchSize);
- statistics.incBatchesResized();
- }
- else {
- ex = e;
- // keep using the connection if we had a MessageTooLargeException. Else, destroy it
- destroyConnection();
- }
- throw new GatewaySenderException(
- LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
- new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
- }
- catch (IllegalStateException e) {
- this.processor.setException(new GatewaySenderException(e));
- throw new GatewaySenderException(
- LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
- new Object[] {this, Integer.valueOf(currentBatchId), connection}), e);
- }
- catch (Exception e) {
- // An Exception has occurred. Get its cause.
- Throwable t = e.getCause();
- if (t instanceof IOException) {
- // An IOException has occurred.
- ex = (IOException)t;
- } else {
- ex = e;
- }
- //the cause is not going to be BatchException70. So, destroy the connection
- destroyConnection();
-
- throw new GatewaySenderException(
- LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString(
- new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex);
- }
- }
-
- /**
- * Acquires or adds a new <code>Connection</code> to the corresponding
- * <code>Gateway</code>
- *
- * @return the <code>Connection</code>
- *
- * @throws GatewaySenderException
- */
- public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{
- if (this.processor.isStopped()) {
- return null;
- }
- // IF the connection is null
- // OR the connection's ServerLocation doesn't match with the one stored in sender
- // THEN initialize the connection
- if(!this.sender.isParallel()) {
- if (this.connection == null || this.connection.isDestroyed()
- || !this.connection.getServer().equals(this.sender.getServerLocation())) {
- if (logger.isDebugEnabled()) {
- logger.debug("Initializing new connection as serverLocation of old connection is : {} and the serverLocation to connect is {}",
- ((this.connection == null) ? "null" : this.connection.getServer()),
- this.sender.getServerLocation());
- }
- // Initialize the connection
- initializeConnection();
- }
- } else {
- if (this.connection == null || this.connection.isDestroyed()) {
- initializeConnection();
- }
- }
-
- // Here we might wait on a connection to another server if I was secondary
- // so don't start waiting until I am primary
- Cache cache = this.sender.getCache();
- if (cache != null && !cache.isClosed()) {
- if (this.sender.isPrimary() && (this.connection != null)) {
- if (this.ackReaderThread == null || !this.ackReaderThread.isRunning()) {
- this.ackReaderThread = new AckReaderThread(this.sender, this.processor);
- this.ackReaderThread.start();
- this.ackReaderThread.waitForRunningAckReaderThreadRunningState();
- }
- }
- }
- return this.connection;
- }
-
- public void destroyConnection() {
- this.connectionLifeCycleLock.writeLock().lock();
- try {
- Connection con = this.connection;
- if (con != null) {
- if (!con.isDestroyed()) {
- con.destroy();
- this.sender.getProxy().returnConnection(con);
- }
-
- // Reset the connection so the next time through a new one will be
- // obtained
- this.connection = null;
- this.sender.setServerLocation(null);
- }
- }
- finally {
- this.connectionLifeCycleLock.writeLock().unlock();
- }
- }
-
- /**
- * Initializes the <code>Connection</code>.
- *
- * @throws GatewaySenderException
- */
- private void initializeConnection() throws GatewaySenderException,
- GemFireSecurityException {
- this.connectionLifeCycleLock.writeLock().lock();
- try {
- // Attempt to acquire a connection
- if (this.sender.getProxy() == null
- || this.sender.getProxy().isDestroyed()) {
- this.sender.initProxy();
- } else {
- this.processor.resetBatchId();
- }
- Connection con;
- try {
- if (this.sender.isParallel()) {
- /*
- * TODO - The use of acquireConnection should be removed
- * from the gateway code. This method is fine for tests,
- * but these connections should really be managed inside
- * the pool code. If the gateway needs to persistent connection
- * to a single server, which should create have the OpExecutor
- * that holds a reference to the connection (similar to the way
- * we do with thread local connections).
- * Use {@link ExecutablePool#setupServerAffinity(boolean)} for
- * gateway code
- */
- con = this.sender.getProxy().acquireConnection();
- // For parallel sender, setting server location will not matter.
- // everytime it will ask for acquire connection whenever it needs it. I
- // am saving this server location for command purpose
- sender.setServerLocation(con.getServer());
- } else {
- synchronized (this.sender
- .getLockForConcurrentDispatcher()) {
- ServerLocation server = this.sender.getServerLocation();
- if (server != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", server);
- }
- con = this.sender.getProxy().acquireConnection(server);
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("ServerLocation is null. Creating new connection. ");
- }
- con = this.sender.getProxy().acquireConnection();
- // Acquired connection from pool!! Update the server location
- // information in the sender and
- // distribute the information to other senders ONLY IF THIS SENDER
- // IS
- // PRIMARY
- if (this.sender.isPrimary()) {
- if (sender.getServerLocation() == null) {
- sender.setServerLocation(con.getServer());
- }
- new UpdateAttributesProcessor(this.sender).distribute(false);
- }
- }
- }
- }
- } catch (ServerConnectivityException e) {
- this.failedConnectCount++;
- Throwable ex = null;
-
- if (e.getCause() instanceof GemFireSecurityException) {
- ex = e.getCause();
- if (logConnectionFailure()) {
- // only log this message once; another msg is logged once we connect
- logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1,
- new Object[] { this.processor.getSender().getId(), ex.getMessage() }));
- }
- throw new GatewaySenderException(ex);
- }
- List<ServerLocation> servers = this.sender.getProxy()
- .getCurrentServers();
- String ioMsg = null;
- if (servers.size() == 0) {
- ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS
- .toLocalizedString();
- } else {
- final StringBuilder buffer = new StringBuilder();
- for (ServerLocation server : servers) {
- String endpointName = String.valueOf(server);
- if (buffer.length() > 0) {
- buffer.append(", ");
- }
- buffer.append(endpointName);
- }
- ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0
- .toLocalizedString(buffer.toString());
- }
- ex = new IOException(ioMsg);
- // Set the serverLocation to null so that a new connection can be
- // obtained in next attempt
- this.sender.setServerLocation(null);
- if (this.failedConnectCount == 1) {
- // only log this message once; another msg is logged once we connect
- logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT,
- this.processor.getSender().getId()));
-
- }
- // Wrap the IOException in a GatewayException so it can be processed the
- // same as the other exceptions that might occur in sendBatch.
- throw new GatewaySenderException(
- LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT
- .toLocalizedString(this.processor.getSender().getId()), ex);
- }
- if (this.failedConnectCount > 0) {
- Object[] logArgs = new Object[] { this.processor.getSender().getId(),
- con, Integer.valueOf(this.failedConnectCount) };
- logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS,
- logArgs));
- this.failedConnectCount = 0;
- } else {
- Object[] logArgs = new Object[] { this.processor.getSender().getId(),
- con };
- logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1, logArgs));
- }
- this.connection = con;
- this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize());
- }
- catch (ConnectionDestroyedException e) {
- throw new GatewaySenderException(
- LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT.toLocalizedString(this.processor
- .getSender().getId()), e);
- }
- finally {
- this.connectionLifeCycleLock.writeLock().unlock();
- }
- }
-
- /**
-  * Decides whether the current connection failure should be written to the
-  * log, throttling the output so that a long outage does not flood it.
-  *
-  * @return {@code true} when debug logging is enabled, on the first failure,
-  *         on the 30th and 300th failure, and on every 3000th failure after that
-  */
- protected boolean logConnectionFailure() {
-   // The connect path increments failedConnectCount before asking, so the
-   // first failure arrives here as 1. Zero is still accepted for any caller
-   // that asks before incrementing.
-   if (logger.isDebugEnabled() || this.failedConnectCount <= 1) {
-     return true;
-   }
-   // Original author's note: each try is about 100 ms apart in the higher
-   // layer, so the thresholds below correspond to roughly 3 s, 30 s and then
-   // every 5 minutes.
-   if (this.failedConnectCount >= 3000) {
-     return (this.failedConnectCount % 3000) == 0;
-   }
-   return this.failedConnectCount == 30 || this.failedConnectCount == 300;
- }
-
- /**
-  * Result of reading one acknowledgement: the batch id together with either
-  * the number of events in the batch or the exception reported for it.
-  */
- public static class GatewayAck {
-   private BatchException70 be;
-
-   private int batchId;
-
-   private int numEvents;
-
-   /**
-    * Creates an acknowledgement that carries a batch exception.
-    *
-    * @param be the exception reported for the batch
-    * @param bId id of the batch the exception belongs to
-    */
-   public GatewayAck(BatchException70 be, int bId) {
-     this.batchId = bId;
-     this.be = be;
-   }
-
-   /**
-    * Creates an acknowledgement without a batch exception.
-    *
-    * @param batchId id of the acknowledged batch
-    * @param numEvents number of events in that batch
-    */
-   public GatewayAck(int batchId, int numEvents) {
-     this.numEvents = numEvents;
-     this.batchId = batchId;
-   }
-
-   /**
-    * @return the number of events; stays 0 when the exception constructor was used
-    */
-   public int getNumEvents() {
-     return this.numEvents;
-   }
-
-   /**
-    * @return the batch id
-    */
-   public int getBatchId() {
-     return this.batchId;
-   }
-
-   /**
-    * @return the batch exception, or {@code null} when none was supplied
-    */
-   public BatchException70 getBatchException() {
-     return be;
-   }
- }
-
- /**
-  * Daemon thread that repeatedly reads batch acknowledgements and reports each
-  * result to the owning event processor until it is shut down or the cache is
-  * being cancelled.
-  */
- class AckReaderThread extends Thread {
-
-   /** Monitor used to announce that {@link #run()} has started. */
-   private final Object runningStateLock = new Object();
-
-   /**
-    * boolean to make a shutdown request
-    */
-   private volatile boolean shutdown = false;
-
-   private final GemFireCacheImpl cache;
-
-   /** True from the start of {@link #run()} until it returns. */
-   private volatile boolean ackReaderThreadRunning = false;
-
-   /**
-    * @param sender sender whose cache supplies the cancel criterion
-    * @param processor processor whose name is used in the thread name
-    */
-   public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
-     super("AckReaderThread for : " + processor.getName());
-     this.setDaemon(true);
-     this.cache = (GemFireCacheImpl)((AbstractGatewaySender)sender).getCache();
-   }
-
-   /**
-    * Blocks until {@link #run()} has flagged itself as running. Returns early,
-    * with the interrupt flag restored, if the waiting thread is interrupted.
-    */
-   public void waitForRunningAckReaderThreadRunningState() {
-     synchronized (runningStateLock) {
-       while (!this.ackReaderThreadRunning) {
-         try {
-           this.runningStateLock.wait();
-         } catch (InterruptedException e) {
-           Thread.currentThread().interrupt();
-           break;
-         }
-       }
-     }
-   }
-
-   /**
-    * @return {@code true} once shutdown was requested or the cache reports a
-    *         cancel in progress
-    */
-   private boolean checkCancelled() {
-     return shutdown || cache.getCancelCriterion().isCancelInProgress();
-   }
-
-   /**
-    * Read loop: fetches one acknowledgement per iteration and forwards it to
-    * the processor. Any exception escaping the loop stops the processor.
-    */
-   @Override
-   public void run() {
-     if (logger.isDebugEnabled()) {
-       logger.debug("AckReaderThread started.. ");
-     }
-
-     // Wake up anyone blocked in waitForRunningAckReaderThreadRunningState().
-     synchronized (runningStateLock) {
-       ackReaderThreadRunning = true;
-       this.runningStateLock.notifyAll();
-     }
-
-     try {
-       for (;;) {
-         if (checkCancelled()) {
-           break;
-         }
-         GatewayAck ack = readAcknowledgement();
-         if (ack != null) {
-           boolean gotBatchException = ack.getBatchException() != null;
-           int batchId = ack.getBatchId();
-           int numEvents = ack.getNumEvents();
-
-           if (gotBatchException) {
-             // The batch came back with an exception attached.
-             logger.info(LocalizedMessage.create(
-                 LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION,
-                 new Object[] { processor.getSender(), ack.getBatchId() }, ack.getBatchException()));
-             // If we get PDX related exception in the batch exception then try
-             // to resend all the pdx events as well in the next batch.
-             final GatewaySenderStats statistics = sender.getStatistics();
-             statistics.incBatchesRedistributed();
-             // log batch exceptions and remove all the events if remove from
-             // exception is true
-             // do not remove if it is false
-             logBatchExceptions(ack.getBatchException());
-             // The batch is still handed to the success handler here, exactly
-             // as in the exception-free branch below.
-             processor.handleSuccessBatchAck(batchId);
-
-           } // unsuccessful batch
-           else { // The batch was successful.
-             if (logger.isDebugEnabled()) {
-               logger.debug("Gateway Sender {} : Received ack for batch id {} of {} events",
-                   processor.getSender(), ack.getBatchId(), ack.getNumEvents());
-             }
-             processor.handleSuccessBatchAck(batchId);
-           }
-         } else {
-           // If we have received IOException.
-           if (logger.isDebugEnabled()) {
-             logger.debug("{}: Received null ack from remote site.", processor.getSender());
-           }
-           processor.handleException();
-           try { // This wait happens before trying to get a new connection to
-                 // receive the ack. Without it there would be continuous calls
-                 // to getConnection.
-             Thread.sleep(GatewaySender.CONNECTION_RETRY_INTERVAL);
-           } catch (InterruptedException e) {
-             Thread.currentThread().interrupt();
-           }
-         }
-       }
-     } catch (Exception e) {
-       if (!checkCancelled()) {
-         logger.fatal(LocalizedMessage.create(
-             LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH) ,e);
-       }
-       sender.getLifeCycleLock().writeLock().lock();
-       try {
-         processor.stopProcessing();
-         sender.clearTempEventsAfterSenderStopped();
-       } finally {
-         sender.getLifeCycleLock().writeLock().unlock();
-       }
-     } finally {
-       if (logger.isDebugEnabled()) {
-         logger.debug("AckReaderThread exiting. ");
-       }
-       ackReaderThreadRunning = false;
-     }
-
-   }
-
-   /**
-    * Logs every exception nested in the given batch exception. A destroyed
-    * region is only warned about the first time its path is seen. For an
-    * "Unknown pdx type" failure, all PDX events recorded for the batch are
-    * marked as not acked.
-    *
-    * @param exception the batch exception whose nested exceptions are logged
-    */
-   private void logBatchExceptions(BatchException70 exception) {
-     for (BatchException70 be : exception.getExceptions()) {
-       boolean logWarning = true;
-       final Throwable cause = be.getCause();
-       if (cause instanceof RegionDestroyedException) {
-         RegionDestroyedException rde = (RegionDestroyedException)cause;
-         synchronized (notFoundRegionsSync) {
-           if (notFoundRegions.contains(rde.getRegionFullPath())) {
-             logWarning = false;
-           } else {
-             notFoundRegions.add(rde.getRegionFullPath());
-           }
-         }
-       } else if (cause instanceof IllegalStateException
-           // getMessage() may be null; guard it so logging cannot throw
-           && cause.getMessage() != null
-           && cause.getMessage().contains("Unknown pdx type")) {
-         List<GatewaySenderEventImpl> pdxEvents = processor
-             .getBatchIdToPDXEventsMap().get(be.getBatchId());
-         logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0,
-             be.getIndex()), be);
-         if (pdxEvents != null) {
-           for (GatewaySenderEventImpl senderEvent : pdxEvents) {
-             senderEvent.isAcked = false;
-           }
-           GatewaySenderEventImpl gsEvent = eventAt(pdxEvents, be.getIndex());
-           if (gsEvent != null) {
-             logger.warn(LocalizedMessage.create(
-                 LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent));
-           }
-         }
-         continue;
-       }
-       if (!logWarning) {
-         // Nothing is logged for this entry, so the event lookup is skipped.
-         continue;
-       }
-       logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0,
-           be.getIndex()), be);
-       List<GatewaySenderEventImpl>[] eventsArr = processor.getBatchIdToEventsMap().get(be.getBatchId());
-       if (eventsArr != null) {
-         GatewaySenderEventImpl gsEvent = eventAt(eventsArr[1], be.getIndex());
-         if (gsEvent != null) {
-           logger.warn(LocalizedMessage.create(
-               LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent));
-         }
-       }
-     }
-   }
-
-   /**
-    * Bounds-checked list lookup, so that a stale or mismatched index reported
-    * in a batch exception cannot make the logging code itself throw.
-    *
-    * @return the event at {@code index}, or {@code null} when the list is
-    *         {@code null} or the index is out of range
-    */
-   private GatewaySenderEventImpl eventAt(List<GatewaySenderEventImpl> events, int index) {
-     if (events == null || index < 0 || index >= events.size()) {
-       return null;
-     }
-     return events.get(index);
-   }
-
-   boolean isRunning() {
-     return this.ackReaderThreadRunning;
-   }
-
-   /**
-    * Destroys the current connection (if any), requests the read loop to stop
-    * and waits up to 15 seconds for this thread to end. A warning is logged
-    * when the thread is still alive afterwards.
-    */
-   public void shutdown() {
-     // we need to destroy connection irrespective of we are listening on it or
-     // not. No need to take lock as the reader thread may be blocked and we might not
-     // get chance to destroy unless that returns.
-     // Read the field exactly once: no lock is held, so a second read could
-     // observe null after the null check has already passed.
-     Connection conn = connection;
-     if (conn != null) {
-       shutDownAckReaderConnection();
-       if (!conn.isDestroyed()) {
-         conn.destroy();
-         sender.getProxy().returnConnection(conn);
-       }
-     }
-     this.shutdown = true;
-     // Clear a pending interrupt so join() can wait; it is restored below.
-     boolean interrupted = Thread.interrupted();
-     try {
-       this.join(15 * 1000);
-     } catch (InterruptedException e) {
-       interrupted = true;
-     } finally {
-       if (interrupted) {
-         Thread.currentThread().interrupt();
-       }
-     }
-     if (this.isAlive()) {
-       logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
-     }
-   }
-
-   /**
-    * Closes the input stream of the current connection, in an attempt to
-    * unblock the reader thread if it is stuck on a read.
-    */
-   private void shutDownAckReaderConnection() {
-     Connection conn = connection;
-     //attempt to unblock the ackReader thread by shutting down the inputStream, if it was stuck on a read
-     try {
-       if (conn != null && conn.getInputStream() != null) {
-         conn.getInputStream().close();
-       }
-     } catch (IOException e) {
-       // Keep the cause so a failed close can be diagnosed from the log.
-       logger.warn("Unable to shutdown AckReaderThread Connection", e);
-     } catch (ConnectionDestroyedException e) {
-       logger.info("AckReader shutting down and connection already destroyed");
-     }
-
-   }
- }
-
- /**
-  * Shuts down the ack reader thread; does nothing when none has been created.
-  */
- public void stopAckReaderThread() {
-   if (this.ackReaderThread == null) {
-     return;
-   }
-   this.ackReaderThread.shutdown();
- }
-
- /**
-  * @return always {@code true} for this dispatcher
-  */
- @Override
- public boolean isRemoteDispatcher() {
- return true;
- }
-
- /**
-  * @return {@code true} while a connection object is currently held
-  */
- @Override
- public boolean isConnectedToRemote() {
-   final boolean hasConnection = (connection != null);
-   return hasConnection;
- }
-
- /**
-  * Stops the ack reader thread and, when the processor reports itself as
-  * stopped, also destroys the connection.
-  */
- public void stop() {
-   stopAckReaderThread();
-   if (!this.processor.isStopped()) {
-     return;
-   }
-   destroyConnection();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java
deleted file mode 100644
index 4974c6f..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallback;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
-import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderImpl;
-import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderImpl;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation;
-import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * Factory that collects gateway sender settings in a
- * {@link GatewaySenderAttributes} instance and creates serial or parallel
- * senders from them. Every setter returns this factory so calls can be chained.
- *
- * @since GemFire 7.0
- */
-public class GatewaySenderFactoryImpl implements
-    InternalGatewaySenderFactory {
-
-  private static final Logger logger = LogService.getLogger();
-
-  /**
-   * Used internally to pass the attributes from this factory to the real
-   * GatewaySender it is creating.
-   */
-  private final GatewaySenderAttributes attrs = new GatewaySenderAttributes();
-
-  private final Cache cache;
-
-  /** Ensures the obsolete system property warning is logged at most once per JVM. */
-  private static final AtomicBoolean GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED = new AtomicBoolean(false);
-
-  /**
-   * @param cache the cache (or cache creation) that created senders are added to
-   */
-  public GatewaySenderFactoryImpl(Cache cache) {
-    this.cache = cache;
-  }
-
-  public GatewaySenderFactory setParallel(boolean isParallel) {
-    this.attrs.isParallel = isParallel;
-    return this;
-  }
-
-  public GatewaySenderFactory setForInternalUse(boolean isForInternalUse) {
-    this.attrs.isForInternalUse = isForInternalUse;
-    return this;
-  }
-
-  public GatewaySenderFactory addGatewayEventFilter(
-      GatewayEventFilter filter) {
-    this.attrs.addGatewayEventFilter(filter);
-    return this;
-  }
-
-  public GatewaySenderFactory addGatewayTransportFilter(
-      GatewayTransportFilter filter) {
-    this.attrs.addGatewayTransportFilter(filter);
-    return this;
-  }
-
-  public GatewaySenderFactory addAsyncEventListener(
-      AsyncEventListener listener) {
-    this.attrs.addAsyncEventListener(listener);
-    return this;
-  }
-
-  public GatewaySenderFactory setSocketBufferSize(int socketBufferSize) {
-    this.attrs.socketBufferSize = socketBufferSize;
-    return this;
-  }
-
-  public GatewaySenderFactory setSocketReadTimeout(int socketReadTimeout) {
-    this.attrs.socketReadTimeout = socketReadTimeout;
-    return this;
-  }
-
-  public GatewaySenderFactory setDiskStoreName(String diskStoreName) {
-    this.attrs.diskStoreName = diskStoreName;
-    return this;
-  }
-
-  public GatewaySenderFactory setMaximumQueueMemory(int maximumQueueMemory) {
-    this.attrs.maximumQueueMemory = maximumQueueMemory;
-    return this;
-  }
-
-  public GatewaySenderFactory setBatchSize(int batchSize) {
-    this.attrs.batchSize = batchSize;
-    return this;
-  }
-
-  public GatewaySenderFactory setBatchTimeInterval(int batchTimeInterval) {
-    this.attrs.batchTimeInterval = batchTimeInterval;
-    return this;
-  }
-
-  public GatewaySenderFactory setBatchConflationEnabled(
-      boolean enableBatchConflation) {
-    this.attrs.isBatchConflationEnabled = enableBatchConflation;
-    return this;
-  }
-
-  public GatewaySenderFactory setPersistenceEnabled(
-      boolean enablePersistence) {
-    this.attrs.isPersistenceEnabled = enablePersistence;
-    return this;
-  }
-
-  public GatewaySenderFactory setAlertThreshold(int threshold) {
-    this.attrs.alertThreshold = threshold;
-    return this;
-  }
-
-  public GatewaySenderFactory setManualStart(boolean start) {
-    this.attrs.manualStart = start;
-    return this;
-  }
-
-  public GatewaySenderFactory setLocatorDiscoveryCallback(
-      LocatorDiscoveryCallback locCallback) {
-    this.attrs.locatorDiscoveryCallback = locCallback;
-    return this;
-  }
-
-  @Override
-  public GatewaySenderFactory setDiskSynchronous(boolean isSynchronous) {
-    this.attrs.isDiskSynchronous = isSynchronous;
-    return this;
-  }
-
-  /**
-   * Sets the number of dispatcher threads. When more than one thread is
-   * requested and no order policy has been chosen yet, the default order
-   * policy is applied.
-   */
-  @Override
-  public GatewaySenderFactory setDispatcherThreads(int numThreads) {
-    if ((numThreads > 1) && this.attrs.policy == null) {
-      this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
-    }
-    this.attrs.dispatcherThreads = numThreads;
-    return this;
-  }
-
-  /**
-   * Sets the parallelism used for replicated regions and resets the order
-   * policy to the default.
-   */
-  public GatewaySenderFactory setParallelFactorForReplicatedRegion(int parallel) {
-    this.attrs.parallelism = parallel;
-    this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
-    return this;
-  }
-
-  @Override
-  public GatewaySenderFactory setOrderPolicy(OrderPolicy policy) {
-    this.attrs.policy = policy;
-    return this;
-  }
-
-  public GatewaySenderFactory setBucketSorted(boolean isBucketSorted) {
-    this.attrs.isBucketSorted = isBucketSorted;
-    return this;
-  }
-
-  /**
-   * Creates a gateway sender that targets the given remote distributed system.
-   *
-   * @param id id of the new sender
-   * @param remoteDSId id of the remote distributed system; must be
-   *        non-negative and different from this member's own id
-   * @return the created sender, or {@code null} when the cache is neither a
-   *         {@code GemFireCacheImpl} nor a {@code CacheCreation}
-   * @throws GatewaySenderException when the remote id is invalid, fewer than
-   *         one dispatcher thread is configured, a parallel sender uses the
-   *         THREAD order policy, or a serial sender has async event listeners
-   */
-  public GatewaySender create(String id, int remoteDSId) {
-    int myDSId = InternalDistributedSystem.getAnyInstance()
-        .getDistributionManager().getDistributedSystemId();
-    if (remoteDSId == myDSId) {
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_EQUAL_TO_THIS_SITE_ID
-              .toLocalizedString(id));
-    }
-    if (remoteDSId < 0) {
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_LESS_THAN_ZERO
-              .toLocalizedString(id));
-    }
-    this.attrs.id = id;
-    this.attrs.remoteDs = remoteDSId;
-
-    if (this.attrs.getDispatcherThreads() <= 0) {
-      throw new GatewaySenderException(
-          LocalizedStrings.GatewaySenderImpl_GATEWAY_SENDER_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
-              .toLocalizedString(id));
-    }
-
-    verifySocketReadTimeout(id);
-
-    if (this.attrs.isParallel()) {
-      if ((this.attrs.getOrderPolicy() != null)
-          && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
-        throw new GatewaySenderException(
-            LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
-                .toLocalizedString(id, this.attrs.getOrderPolicy()));
-      }
-      return instantiateParallelSender();
-    }
-
-    if (this.attrs.getAsyncEventListeners().size() > 0) {
-      throw new GatewaySenderException(
-          LocalizedStrings.SerialGatewaySenderImpl_GATEWAY_0_CANNOT_DEFINE_A_REMOTE_SITE_BECAUSE_AT_LEAST_ONE_LISTENER_IS_ALREADY_ADDED
-              .toLocalizedString(id));
-    }
-    return instantiateSerialSender();
-  }
-
-  /**
-   * Creates a sender without a remote distributed system id; configuration
-   * problems are reported as {@link AsyncEventQueueConfigurationException}.
-   *
-   * @param id id of the new sender
-   * @return the created sender, or {@code null} when the cache is neither a
-   *         {@code GemFireCacheImpl} nor a {@code CacheCreation}
-   * @throws AsyncEventQueueConfigurationException when fewer than one
-   *         dispatcher thread is configured or a parallel sender uses the
-   *         THREAD order policy
-   */
-  public GatewaySender create(String id) {
-    this.attrs.id = id;
-
-    if (this.attrs.getDispatcherThreads() <= 0) {
-      throw new AsyncEventQueueConfigurationException(
-          LocalizedStrings.AsyncEventQueue_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1
-              .toLocalizedString(id));
-    }
-
-    if (this.attrs.isParallel()) {
-      if ((this.attrs.getOrderPolicy() != null)
-          && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
-        throw new AsyncEventQueueConfigurationException(
-            LocalizedStrings.AsyncEventQueue_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
-                .toLocalizedString(id, this.attrs.getOrderPolicy()));
-      }
-      return instantiateParallelSender();
-    }
-    return instantiateSerialSender();
-  }
-
-  /**
-   * Raises a too-low socket read timeout to the minimum (with a warning) and
-   * warns once if the obsolete read-timeout system property is set. Only runs
-   * when the cache is a {@code GemFireCacheImpl}.
-   */
-  private void verifySocketReadTimeout(String id) {
-    // Verify socket read timeout if a proper logger is available
-    if (!(this.cache instanceof GemFireCacheImpl)) {
-      return;
-    }
-    // If socket read timeout is less than the minimum, log a warning.
-    // Ideally, this should throw a GatewaySenderException, but wan dunit tests
-    // were failing, and we were running out of time to change them.
-    if (this.attrs.getSocketReadTimeout() != 0
-        && this.attrs.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) {
-      logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_CONFIGURED_SOCKET_READ_TIMEOUT_TOO_LOW,
-          new Object[] { "GatewaySender " + id, this.attrs.getSocketReadTimeout(), GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT }));
-      this.attrs.socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT;
-    }
-
-    // Log a warning if the old system property is set.
-    if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)
-        && System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) {
-      logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_OBSOLETE_SYSTEM_POPERTY,
-          new Object[] { GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY, "GatewaySender socket read timeout" }));
-    }
-  }
-
-  /**
-   * Builds the parallel sender for the cache type at hand, registers it with
-   * the cache and, for a real cache, starts it unless manual start is set.
-   */
-  private GatewaySender instantiateParallelSender() {
-    GatewaySender sender = null;
-    if (this.cache instanceof GemFireCacheImpl) {
-      sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
-      ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
-      if (!this.attrs.isManualStart()) {
-        sender.start();
-      }
-    } else if (this.cache instanceof CacheCreation) {
-      sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
-      ((CacheCreation)this.cache).addGatewaySender(sender);
-    }
-    return sender;
-  }
-
-  /**
-   * Applies the default order policy when several dispatcher threads are
-   * configured without one, then builds, registers and (for a real cache,
-   * unless manual start is set) starts the serial sender.
-   */
-  private GatewaySender instantiateSerialSender() {
-    if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
-      this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
-    }
-    GatewaySender sender = null;
-    if (this.cache instanceof GemFireCacheImpl) {
-      sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
-      ((GemFireCacheImpl)this.cache).addGatewaySender(sender);
-      if (!this.attrs.isManualStart()) {
-        sender.start();
-      }
-    } else if (this.cache instanceof CacheCreation) {
-      sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
-      ((CacheCreation)this.cache).addGatewaySender(sender);
-    }
-    return sender;
-  }
-
-  public GatewaySenderFactory removeGatewayEventFilter(
-      GatewayEventFilter filter) {
-    this.attrs.eventFilters.remove(filter);
-    return this;
-  }
-
-  public GatewaySenderFactory removeGatewayTransportFilter(
-      GatewayTransportFilter filter) {
-    this.attrs.transFilters.remove(filter);
-    return this;
-  }
-
-  public GatewaySenderFactory setGatewayEventSubstitutionFilter(
-      GatewayEventSubstitutionFilter filter) {
-    this.attrs.eventSubstitutionFilter = filter;
-    return this;
-  }
-
-  /**
-   * Copies the settings of an existing sender definition into this factory's
-   * attributes. Event and transport filters are appended to the filters
-   * already present.
-   *
-   * @param senderCreation the sender whose configuration is copied
-   */
-  public void configureGatewaySender(GatewaySender senderCreation) {
-    this.attrs.isParallel = senderCreation.isParallel();
-    this.attrs.manualStart = senderCreation.isManualStart();
-    this.attrs.socketBufferSize = senderCreation.getSocketBufferSize();
-    this.attrs.socketReadTimeout = senderCreation.getSocketReadTimeout();
-    this.attrs.isBatchConflationEnabled = senderCreation.isBatchConflationEnabled();
-    this.attrs.batchSize = senderCreation.getBatchSize();
-    this.attrs.batchTimeInterval = senderCreation.getBatchTimeInterval();
-    this.attrs.isPersistenceEnabled = senderCreation.isPersistenceEnabled();
-    this.attrs.diskStoreName = senderCreation.getDiskStoreName();
-    this.attrs.isDiskSynchronous = senderCreation.isDiskSynchronous();
-    this.attrs.maximumQueueMemory = senderCreation.getMaximumQueueMemory();
-    this.attrs.alertThreshold = senderCreation.getAlertThreshold();
-    this.attrs.dispatcherThreads = senderCreation.getDispatcherThreads();
-    this.attrs.policy = senderCreation.getOrderPolicy();
-    for (GatewayEventFilter filter : senderCreation.getGatewayEventFilters()) {
-      this.attrs.eventFilters.add(filter);
-    }
-    for (GatewayTransportFilter filter : senderCreation.getGatewayTransportFilters()) {
-      this.attrs.transFilters.add(filter);
-    }
-    this.attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
deleted file mode 100644
index 322b1ba..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.EntryOperation;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
-import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.ResourceEvent;
-import com.gemstone.gemfire.internal.cache.DistributedRegion;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.EventID;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
-import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
-import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
-import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * Parallel gateway sender whose events are processed by a
- * {@link RemoteConcurrentParallelGatewaySenderEventProcessor}.
- *
- * @since GemFire 7.0
- */
-public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
-
-  private static final Logger logger = LogService.getLogger();
-
-  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
-      "Remote Site Discovery Logger Group", logger);
-
-  public ParallelGatewaySenderImpl() {
-    super();
-    this.isParallel = true;
-  }
-
-  public ParallelGatewaySenderImpl(Cache cache, GatewaySenderAttributes attrs) {
-    super(cache, attrs);
-  }
-
-  /**
-   * Starts this sender under the life-cycle write lock. Logs a warning and
-   * returns when the sender is already running.
-   *
-   * @throws IllegalStateException when a remote distributed system id is
-   *         configured but no locators are
-   */
-  @Override
-  public void start() {
-    this.getLifeCycleLock().writeLock().lock();
-    try {
-      if (isRunning()) {
-        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
-        return;
-      }
-
-      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
-        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
-            .getConfig().getLocators();
-        if (locators.length() == 0) {
-          throw new IllegalStateException(
-              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
-                  .toLocalizedString());
-        }
-      }
-      /*
-       * Now onwards all processing will happen through "ConcurrentParallelGatewaySenderEventProcessor"
-       * we have made "ParallelGatewaySenderEventProcessor" and "ParallelGatewaySenderQueue" as a
-       * utility classes of Concurrent version of processor and queue.
-       */
-      eventProcessor = new RemoteConcurrentParallelGatewaySenderEventProcessor(this);
-
-      eventProcessor.start();
-      waitForRunningStatus();
-      //Only notify the type registry if this is a WAN gateway queue
-      if (!isAsyncEventQueue()) {
-        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
-      }
-      new UpdateAttributesProcessor(this).distribute(false);
-
-      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
-          .getDistributedSystem();
-      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
-
-      logger.info(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderImpl_STARTED__0, this));
-
-      enqueueTempEvents();
-    }
-    finally {
-      this.getLifeCycleLock().writeLock().unlock();
-    }
-  }
-
-  /**
-   * Stops this sender under the life-cycle write lock: stops the event
-   * processor, the proxy and the listeners, then cleans up the queue. Does
-   * nothing when the sender is not running.
-   */
-  @Override
-  public void stop() {
-    this.getLifeCycleLock().writeLock().lock();
-    try {
-      if (!this.isRunning()) {
-        return;
-      }
-      // Stop the dispatcher
-      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
-      if (ev != null && !ev.isStopped()) {
-        ev.stopProcessing();
-      }
-
-      // Stop the proxy (after the dispatcher, so the socket is still
-      // alive until after the dispatcher has stopped)
-      stompProxyDead();
-
-      // Close the listeners
-      for (AsyncEventListener listener : this.listeners) {
-        listener.close();
-      }
-      //stop the running threads, open sockets if any
-      // Use the local copy that was null-checked above instead of reading
-      // the field again.
-      if (ev != null) {
-        ((ConcurrentParallelGatewaySenderQueue)ev.getQueue()).cleanUp();
-      }
-
-      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));
-
-      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
-          .getDistributedSystem();
-      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
-
-      clearTempEventsAfterSenderStopped();
-      // Keep the eventProcessor around so we can ask it for the regionQueues later.
-      // Tests expect to be able to do this.
-    }
-    finally {
-      this.getLifeCycleLock().writeLock().unlock();
-    }
-  }
-
-  @Override
-  public String toString() {
-    // StringBuilder: the buffer never leaves this method, so no
-    // synchronisation is needed.
-    StringBuilder sb = new StringBuilder();
-    sb.append("ParallelGatewaySender{");
-    sb.append("id=").append(getId());
-    sb.append(",remoteDsId=").append(getRemoteDSId());
-    sb.append(",isRunning =").append(isRunning());
-    sb.append("}");
-    return sb.toString();
-  }
-
-  /**
-   * Copies this sender's current configuration and state into the given
-   * profile.
-   *
-   * @param profile must be a {@link GatewaySenderProfile}
-   */
-  public void fillInProfile(Profile profile) {
-    assert profile instanceof GatewaySenderProfile;
-    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
-    pf.Id = getId();
-    pf.remoteDSId = getRemoteDSId();
-    pf.isRunning = isRunning();
-    pf.isPrimary = isPrimary();
-    pf.isParallel = true;
-    pf.isBatchConflationEnabled = isBatchConflationEnabled();
-    pf.isPersistenceEnabled = isPersistenceEnabled();
-    pf.alertThreshold = getAlertThreshold();
-    pf.manualStart = isManualStart();
-    pf.dispatcherThreads = getDispatcherThreads();
-    pf.orderPolicy = getOrderPolicy();
-    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) {
-      pf.eventFiltersClassNames.add(filter.getClass().getName());
-    }
-    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
-      pf.transFiltersClassNames.add(filter.getClass().getName());
-    }
-    for (AsyncEventListener listener : getAsyncEventListeners()) {
-      pf.senderEventListenerClassNames.add(listener.getClass().getName());
-    }
-    pf.isDiskSynchronous = isDiskSynchronous();
-  }
-
-  /**
-   * Replaces the event id of the cloned event with one whose thread id is
-   * derived from the event's bucket id, the originating thread id and this
-   * sender's event id index.
-   *
-   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
-   */
-  @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
-    final int bucketId;
-    //merged from 42004
-    if (clonedEvent.getRegion() instanceof DistributedRegion) {
-      bucketId = PartitionedRegionHelper.getHashKey(clonedEvent.getKey(),
-          getMaxParallelismForReplicatedRegion());
-    }
-    else {
-      bucketId = PartitionedRegionHelper
-          .getHashKey((EntryOperation)clonedEvent);
-    }
-    EventID originalEventId = clonedEvent.getEventId();
-    long originatingThreadId = ThreadIdentifier.getRealThreadID(originalEventId.getThreadID());
-
-    // In case of parallel as all events go through primary buckets
-    // we don't need to generate different threadId for secondary buckets
-    // as they will be rejected if seen at PR level itself
-    long newThreadId = ThreadIdentifier
-        .createFakeThreadIDForParallelGSPrimaryBucket(bucketId,
-            originatingThreadId, getEventIdIndex());
-
-    EventID newEventId = new EventID(originalEventId.getMembershipID(),
-        newThreadId, originalEventId.getSequenceID(), bucketId);
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}: Generated event id for event with key={}, bucketId={}, original event id={}, threadId={}, new event id={}, newThreadId={}",
-          this, clonedEvent.getKey(), bucketId, originalEventId, originatingThreadId, newEventId, newThreadId);
-    }
-    clonedEvent.setEventId(newEventId);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
deleted file mode 100644
index 35cdece..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-
-
-import java.util.Set;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
-/**
- * Remote version of ConcurrentParallelGatewaySenderEventProcessor
- *
- */
-public class RemoteConcurrentParallelGatewaySenderEventProcessor extends ConcurrentParallelGatewaySenderEventProcessor{
-
- public RemoteConcurrentParallelGatewaySenderEventProcessor(
- AbstractGatewaySender sender) {
- super(sender);
- }
-
- @Override
- protected void createProcessors(int dispatcherThreads, Set<Region> targetRs) {
- processors = new RemoteParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()];
- if (logger.isDebugEnabled()) {
- logger.debug("Creating GatewaySenderEventProcessor");
- }
- for (int i = 0; i < sender.getDispatcherThreads(); i++) {
- processors[i] = new RemoteParallelGatewaySenderEventProcessor(sender,
- targetRs, i, sender.getDispatcherThreads());
- }
- }
-
- @Override
- protected void rebalance() {
- GatewaySenderStats statistics = this.sender.getStatistics();
- long startTime = statistics.startLoadBalance();
- try {
- for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
- GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher)parallelProcessor.getDispatcher();
- if (remoteDispatcher.isConnectedToRemote()) {
- remoteDispatcher.stopAckReaderThread();
- remoteDispatcher.destroyConnection();
- }
- }
- } finally {
- statistics.endLoadBalance(startTime);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
deleted file mode 100644
index 815932e..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySenderEventProcessor {
- private static final Logger logger = LogService.getLogger();
-
- protected RemoteParallelGatewaySenderEventProcessor(
- AbstractGatewaySender sender) {
- super(sender);
- }
-
- /**
- * Used in the concurrent scenario, where the queue is shared among all the processors.
- */
- protected RemoteParallelGatewaySenderEventProcessor(AbstractGatewaySender sender, Set<Region> userRegions, int id, int nDispatcher) {
- super(sender, userRegions, id, nDispatcher);
- }
-
- @Override
- protected void rebalance() {
- GatewaySenderStats statistics = this.sender.getStatistics();
- long startTime = statistics.startLoadBalance();
- try {
- if (this.dispatcher.isRemoteDispatcher()) {
- GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher) this.dispatcher;
- if (remoteDispatcher.isConnectedToRemote()) {
- remoteDispatcher.stopAckReaderThread();
- remoteDispatcher.destroyConnection();
- }
- }
- } finally {
- statistics.endLoadBalance(startTime);
- }
- }
-
- public void initializeEventDispatcher() {
- if (logger.isDebugEnabled()) {
- logger.debug(" Creating the GatewayEventRemoteDispatcher");
- }
- if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
- this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
- }
- }
-
- /**
- * Returns whether the corresponding receiver WAN site of this GatewaySender
- * has GemFire version >= 7.0.1
- *
- * @param disp
- * @return true if remote site Gemfire Version is >= 7.0.1
- */
- private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
- throws GatewaySenderException {
- try {
- GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher) disp;
- // This will create a new connection if no batch has been sent till
- // now.
- Connection conn = remoteDispatcher.getConnection(false);
- if (conn != null) {
- short remoteSiteVersion = conn.getWanSiteVersion();
- if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
- return true;
- }
- }
- } catch (GatewaySenderException e) {
- Throwable cause = e.getCause();
- if (cause instanceof IOException
- || e instanceof GatewaySenderConfigurationException
- || cause instanceof ConnectionDestroyedException) {
- try {
- int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
- if (logger.isDebugEnabled()) {
- logger.debug("Sleeping for {} milliseconds", sleepInterval);
- }
- Thread.sleep(sleepInterval);
- } catch (InterruptedException ie) {
- // log the exception
- if (logger.isDebugEnabled()){
- logger.debug(ie.getMessage(), ie);
- }
- }
- }
- throw e;
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
deleted file mode 100644
index 8a25ab6..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public class RemoteConcurrentSerialGatewaySenderEventProcessor extends
- ConcurrentSerialGatewaySenderEventProcessor {
-
- private static final Logger logger = LogService.getLogger();
-
- public RemoteConcurrentSerialGatewaySenderEventProcessor(
- AbstractGatewaySender sender) {
- super(sender);
- }
-
- @Override
- protected void initializeMessageQueue(String id) {
- for (int i = 0; i < sender.getDispatcherThreads(); i++) {
- processors.add(new RemoteSerialGatewaySenderEventProcessor(this.sender, id
- + "." + i));
- if (logger.isDebugEnabled()) {
- logger.debug("Created the RemoteSerialGatewayEventProcessor_{}->{}", i, processors.get(i));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
deleted file mode 100644
index 82fa585..0000000
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public class RemoteSerialGatewaySenderEventProcessor extends
- SerialGatewaySenderEventProcessor {
-
- private static final Logger logger = LogService.getLogger();
- public RemoteSerialGatewaySenderEventProcessor(AbstractGatewaySender sender,
- String id) {
- super(sender, id);
- }
-
- public void initializeEventDispatcher() {
- if (logger.isDebugEnabled()) {
- logger.debug(" Creating the GatewayEventRemoteDispatcher");
- }
- // In the serial case a gatewaySender can also be created with an
- // asyncEventListener attached. The use-case is unclear, but there are dunit
- // tests for it; the condition below was uncommented to make them pass.
- if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
- this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
- }else{
- this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
- }
- }
-
-}