You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:14 UTC
[09/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InvalidateOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InvalidateOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InvalidateOp.java
new file mode 100644
index 0000000..f15260f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InvalidateOp.java
@@ -0,0 +1,112 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Does a region invalidate on a server
+ * @author gregp
+ * @since 6.6
+ */
+public class InvalidateOp {
+
+ private static final Logger logger = LogService.getLogger();
+
+ public static final int HAS_VERSION_TAG = 0x01;
+
+ /**
+ * Does a region invalidate on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the entry keySet on
+ */
+ public static void execute(ExecutablePool pool,
+ String region, EntryEventImpl event)
+ {
+ AbstractOp op = new InvalidateOpImpl(region, event);
+ pool.execute(op);
+ }
+
+ private InvalidateOp() {
+ // no instances allowed
+ }
+
+ private static class InvalidateOpImpl extends AbstractOp {
+ private EntryEventImpl event;
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public InvalidateOpImpl(String region,
+ EntryEventImpl event) {
+ super(MessageType.INVALIDATE, event.getCallbackArgument() != null ? 4 : 3);
+ Object callbackArg = event.getCallbackArgument();
+ this.event = event;
+ getMessage().addStringPart(region);
+ getMessage().addStringOrObjPart(event.getKeyInfo().getKey());
+ getMessage().addBytesPart(event.getEventId().calcBytes());
+ if (callbackArg != null) {
+ getMessage().addObjPart(callbackArg);
+ }
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ protected Object processResponse(Message msg, Connection con) throws Exception {
+ processAck(msg, "invalidate");
+ boolean isReply = (msg.getMessageType() == MessageType.REPLY);
+ int partIdx = 0;
+ int flags = 0;
+ if (isReply) {
+ flags = msg.getPart(partIdx++).getInt();
+ if ((flags & HAS_VERSION_TAG) != 0) {
+ VersionTag tag = (VersionTag)msg.getPart(partIdx++).getObject();
+ // we use the client's ID since we apparently don't track the server's ID in connections
+ tag.replaceNullIDs((InternalDistributedMember) con.getEndpoint().getMemberId());
+ this.event.setVersionTag(tag);
+ if (logger.isDebugEnabled()) {
+ logger.debug("received Invalidate response with {}", tag);
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("received Invalidate response");
+ }
+ }
+ }
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.INVALIDATE_ERROR;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startInvalidate();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endInvalidateSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endInvalidate(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/KeySetOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/KeySetOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/KeySetOp.java
new file mode 100644
index 0000000..26cc4a2
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/KeySetOp.java
@@ -0,0 +1,121 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * Does a region keySet on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class KeySetOp {
+ /**
+ * Does a region entry keySet on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the entry keySet on
+ */
+ public static Set execute(ExecutablePool pool,
+ String region)
+ {
+ AbstractOp op = new KeySetOpImpl(region);
+ return (Set)pool.execute(op);
+ }
+
+ private KeySetOp() {
+ // no instances allowed
+ }
+
+ private static class KeySetOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public KeySetOpImpl(String region) {
+ super(MessageType.KEY_SET, 1);
+ getMessage().addStringPart(region);
+ }
+ @Override
+ protected Message createResponseMessage() {
+ return new ChunkedMessage(1, Version.CURRENT);
+ }
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+
+ ChunkedMessage keySetResponseMessage = (ChunkedMessage)msg;
+ final HashSet result = new HashSet();
+ final Exception[] exceptionRef = new Exception[1];
+
+ keySetResponseMessage.readHeader();
+ final int msgType = keySetResponseMessage.getMessageType();
+ if (msgType == MessageType.RESPONSE) {
+ do {
+ keySetResponseMessage.receiveChunk();
+ //callback.handle(msg);
+ Part part = keySetResponseMessage.getPart(0);
+ Object o = part.getObject();
+ if (o instanceof Throwable) {
+ String s = "While performing a remote keySet";
+ exceptionRef[0] = new ServerOperationException(s, (Throwable)o);
+ } else {
+ result.addAll((List)o);
+ }
+ } while (!keySetResponseMessage.isLastChunk());
+ } else {
+ if (msgType == MessageType.EXCEPTION) {
+ keySetResponseMessage.receiveChunk();
+ Part part = msg.getPart(0);
+ String s = "While performing a remote " + "keySet";
+ throw new ServerOperationException(s, (Throwable) part.getObject());
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ } else if (isErrorResponse(msgType)) {
+ keySetResponseMessage.receiveChunk();
+ Part part = msg.getPart(0);
+ throw new ServerOperationException(part.getString());
+ } else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+
+ if (exceptionRef[0] != null) {
+ throw exceptionRef[0];
+ } else {
+ return result;
+ }
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.KEY_SET_DATA_ERROR;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startKeySet();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endKeySetSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endKeySet(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LiveServerPinger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LiveServerPinger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LiveServerPinger.java
new file mode 100644
index 0000000..1f01cf5
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LiveServerPinger.java
@@ -0,0 +1,108 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.client.internal.EndpointManager.EndpointListenerAdapter;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Responsible for pinging live
+ * servers to make sure they
+ * are still alive.
+ * @author dsmith
+ *
+ */
+public class LiveServerPinger extends EndpointListenerAdapter {
+ private static final Logger logger = LogService.getLogger();
+
+ private static final long NANOS_PER_MS = 1000000L;
+
+ private final ConcurrentMap/*<Endpoint,Future>*/ taskFutures = new ConcurrentHashMap();
+ protected final InternalPool pool;
+ protected final long pingIntervalNanos;
+
+ public LiveServerPinger(InternalPool pool) {
+ this.pool = pool;
+ this.pingIntervalNanos = pool.getPingInterval() * NANOS_PER_MS;
+ }
+
+ @Override
+ public void endpointCrashed(Endpoint endpoint) {
+ cancelFuture(endpoint);
+ }
+
+ @Override
+ public void endpointNoLongerInUse(Endpoint endpoint) {
+ cancelFuture(endpoint);
+ }
+
+ @Override
+ public void endpointNowInUse(Endpoint endpoint) {
+ try {
+ Future future = pool.getBackgroundProcessor().scheduleWithFixedDelay(
+ new PingTask(endpoint), pingIntervalNanos, pingIntervalNanos,
+ TimeUnit.NANOSECONDS);
+ taskFutures.put(endpoint, future);
+ } catch (RejectedExecutionException e) {
+ if (pool.getCancelCriterion().cancelInProgress() == null) {
+ throw e;
+ }
+ }
+ }
+
+ private void cancelFuture(Endpoint endpoint) {
+ Future future = (Future) taskFutures.remove(endpoint);
+ if(future != null) {
+ future.cancel(false);
+ }
+ }
+
+ private class PingTask extends PoolTask {
+ private final Endpoint endpoint;
+
+ public PingTask(Endpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public void run2() {
+ if(endpoint.timeToPing(pingIntervalNanos)) {
+// logger.fine("DEBUG pinging " + server);
+ try {
+ PingOp.execute(pool, endpoint.getLocation());
+ } catch(Exception e) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Error occured while pinging server: {} - {}", endpoint.getLocation(), e.getMessage());
+ }
+ GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ if (cache != null) {
+ ClientMetadataService cms = cache.getClientMetadataService();
+ cms.removeBucketServerLocation(endpoint.getLocation());
+ }
+ //any failure to ping the server should be considered a crash (eg.
+ //socket timeout exception, security exception, failure to connect).
+ pool.getEndpointManager().serverCrashed(endpoint);
+ }
+ } else {
+// logger.fine("DEBUG skipping ping of " + server
+// + " because lastAccessed=" + endpoint.getLastAccessed());
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallback.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallback.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallback.java
new file mode 100644
index 0000000..02ccbd9
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallback.java
@@ -0,0 +1,38 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+/**
+ * A callback to receive notifications about locator discovery. Currently
+ * only used internally.
+ * @author dsmith
+ * @since 5.7
+ */
+public interface LocatorDiscoveryCallback {
+
+ /**
+ * Called to indicate that new locators
+ * have been discovered
+ * @param locators a list of InetSocketAddresses of new
+ * locators that have been discovered.
+ */
+ void locatorsDiscovered(List locators);
+
+ /**
+ * Called to indicated that locators
+ * have been removed from the list
+ * of available locators.
+ * @param locators a list of InetSocketAddresses
+ * of locators that have been removed
+ */
+ void locatorsRemoved(List locators);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallbackAdapter.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallbackAdapter.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallbackAdapter.java
new file mode 100644
index 0000000..b8ba3aa
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallbackAdapter.java
@@ -0,0 +1,27 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+/**
+ * A locator discovery callback that does nothing.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public class LocatorDiscoveryCallbackAdapter implements
+ LocatorDiscoveryCallback {
+
+ public void locatorsDiscovered(List locators) {
+ }
+
+ public void locatorsRemoved(List locators) {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
new file mode 100644
index 0000000..79d2e1f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
@@ -0,0 +1,82 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Tell a server to become the primary host of a server-to-client queue
+ * @author darrel
+ * @since 5.7
+ */
+public class MakePrimaryOp {
+ /**
+ * Tell the given server to become the primary host of a server-to-client queue
+ * @param pool the pool to use to communicate with the server.
+ * @param conn the connection to do the execution on
+ * @param sentClientReady true if the client ready message has already been sent
+ */
+ public static void execute(ExecutablePool pool, Connection conn, boolean sentClientReady)
+ {
+ AbstractOp op = new MakePrimaryOpImpl(sentClientReady);
+ pool.executeOn(conn, op);
+ }
+
+ private MakePrimaryOp() {
+ // no instances allowed
+ }
+
+ private static class MakePrimaryOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public MakePrimaryOpImpl(boolean sentClientReady) {
+ super(MessageType.MAKE_PRIMARY, 1);
+ getMessage().addBytesPart(new byte[] {(byte)(sentClientReady?0x01:0x00)});
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
+ getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+ getMessage().send(false);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "makePrimary");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startMakePrimary();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endMakePrimarySend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endMakePrimary(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Op.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Op.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Op.java
new file mode 100644
index 0000000..403a112
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Op.java
@@ -0,0 +1,34 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+/**
+ * An operation to perform on a server. Used by
+ * {@link ExecutablePool} to attempt the operation on
+ * multiple servers until the retryAttempts is exceeded.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public interface Op {
+
+ /**
+ * Attempts to execute this operation by sending its message out on the
+ * given connection, waiting for a response, and returning it.
+ * @param cnx the connection to use when attempting execution of the operation.
+ * @return the result of the operation
+ * or <code>null</code if the operation has no result.
+ * @throws Exception if the execute failed
+ */
+ Object attempt(Connection cnx) throws Exception;
+
+ /**
+ * @return true if this Op should use a threadLocalConnection, false otherwise
+ */
+ boolean useThreadLocalConnection();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
new file mode 100644
index 0000000..967a8c3
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
@@ -0,0 +1,964 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.BufferUnderflowException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.CopyException;
+import com.gemstone.gemfire.GemFireException;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.cache.CacheRuntimeException;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
+import com.gemstone.gemfire.cache.TransactionException;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.client.ServerRefusedConnectionException;
+import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
+import com.gemstone.gemfire.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionOpImpl;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
+import com.gemstone.gemfire.cache.client.internal.QueueManager.QueueConnections;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManager;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.PoolManagerImpl;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.tier.BatchException;
+import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.security.AuthenticationRequiredException;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+
+/**
+ * Called from the client and execute client to server
+ * requests against servers. Handles retrying to different servers,
+ * and marking servers dead if we get exception from them.
+ * @author dsmith
+ * @since 5.7
+ */
+public class OpExecutorImpl implements ExecutablePool {
+ private static final Logger logger = LogService.getLogger();
+
+ private static final boolean TRY_SERVERS_ONCE = Boolean.getBoolean("gemfire.PoolImpl.TRY_SERVERS_ONCE");
+ private static final int TX_RETRY_ATTEMPT = Integer.getInteger("gemfire.txRetryAttempt", 500);
+
+ private final ConnectionManager connectionManager;
+ private final int retryAttempts;
+ private final long serverTimeout;
+ private final boolean threadLocalConnections;
+ private final ThreadLocal<Connection> localConnection = new ThreadLocal<Connection>();
+ /**
+ * maps serverLocations to Connections when threadLocalConnections is enabled with single-hop.
+ */
+ private final ThreadLocal<Map<ServerLocation, Connection>> localConnectionMap = new ThreadLocal<Map<ServerLocation,Connection>>();
+ private final EndpointManager endpointManager;
+ private final RegisterInterestTracker riTracker;
+ private final QueueManager queueManager;
+ private final CancelCriterion cancelCriterion;
+ private /*final*/ PoolImpl pool;
+ private final ThreadLocal<Boolean> serverAffinity = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return Boolean.FALSE;};
+ };
+ private boolean serverAffinityFailover = false;
+ private final ThreadLocal<ServerLocation> affinityServerLocation = new ThreadLocal<ServerLocation>();
+ private final ThreadLocal<Integer> affinityRetryCount = new ThreadLocal<Integer>() {
+ protected Integer initialValue() {
+ return 0;
+ };
+ };
+
+ public OpExecutorImpl(ConnectionManager manager, QueueManager queueManager, EndpointManager endpointManager, RegisterInterestTracker riTracker, int retryAttempts,
+ long serverTimeout, boolean threadLocalConnections, CancelCriterion cancelCriterion, PoolImpl pool) {
+ this.connectionManager = manager;
+ this.queueManager = queueManager;
+ this.endpointManager = endpointManager;
+ this.riTracker = riTracker;
+ this.retryAttempts = retryAttempts;
+ this.serverTimeout = serverTimeout;
+ this.threadLocalConnections = threadLocalConnections;
+ this.cancelCriterion = cancelCriterion;
+ this.pool = pool;
+ }
+
+ public Object execute(Op op) {
+ return execute(op, retryAttempts);
+ }
+
+ public Object execute(Op op, int retries) {
+ if (this.serverAffinity.get()) {
+ ServerLocation loc = this.affinityServerLocation.get();
+ if (loc == null) {
+ loc = getNextOpServerLocation();
+ this.affinityServerLocation.set(loc);
+ if (logger.isDebugEnabled()) {
+ logger.debug("setting server affinity to {}", this.affinityServerLocation.get());
+ }
+ }
+ return executeWithServerAffinity(loc, op);
+ }
+ boolean success = false;
+
+ Set attemptedServers = new HashSet();
+
+ Connection conn = (Connection) (threadLocalConnections ? localConnection.get() : null);
+ if (conn == null || conn.isDestroyed()) {
+ conn = connectionManager.borrowConnection(serverTimeout);
+ }
+ else if (threadLocalConnections) {
+ //Fix for 43718. Clear the thread local connection
+ //while we're performing the op. It will be reset
+ //if the op succeeds.
+ localConnection.set(null);
+ try {
+ this.connectionManager.activate(conn);
+ }
+ catch (ConnectionDestroyedException ex) {
+ conn = connectionManager.borrowConnection(serverTimeout);
+ }
+ }
+ try {
+ for(int attempt = 0; true; attempt++) {
+ // when an op is retried we may need to try to recover the previous
+ // attempt's version stamp
+ if (attempt == 1 && (op instanceof AbstractOp)) {
+ AbstractOp absOp = (AbstractOp)op;
+ absOp.getMessage().setIsRetry();
+ }
+ try {
+ authenticateIfRequired(conn, op);
+ Object result = executeWithPossibleReAuthentication(conn, op);
+ success = true;
+ return result;
+ }
+ catch (Exception e) {
+ //This method will throw an exception if we need to stop
+ //It also unsets the threadlocal connection and notifies
+ //the connection manager if there are failures.
+ handleException(e, conn, attempt, attempt >= retries && retries != -1);
+ attemptedServers.add(conn.getServer());
+ try {
+ conn = connectionManager.exchangeConnection(conn, attemptedServers, serverTimeout);
+ }
+ catch(NoAvailableServersException nse) {
+ //if retries is -1, don't try again after the last server has failed
+ if(retries == -1 || TRY_SERVERS_ONCE) {
+ handleException(e, conn, attempt, true);
+ }
+ else {
+ //try one of the failed servers again, until we exceed the retry attempts.
+ attemptedServers.clear();
+ try {
+ conn = connectionManager.exchangeConnection(conn, attemptedServers, serverTimeout);
+ }
+ catch(NoAvailableServersException nse2) {
+ handleException(e, conn, attempt, true);
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ if(threadLocalConnections) {
+ this.connectionManager.passivate(conn, success);
+ //Fix for 43718. If the thread local was set to a different
+ //connection deeper in the call stack, return that connection
+ //and set our connection on the thread local.
+ Connection existingConnection = localConnection.get();
+ if(existingConnection != null && existingConnection != conn) {
+ connectionManager.returnConnection(existingConnection);
+ }
+
+ if(!conn.isDestroyed()) {
+ localConnection.set(conn);
+ } else {
+ localConnection.set(null);
+ }
+ } else {
+ connectionManager.returnConnection(conn);
+ }
+ }
+ }
+
+ /**
+ * execute the given op on the given server. If the server cannot
+ * be reached, sends a TXFailoverOp, then retries the given op
+ * @param loc the server to execute the op on
+ * @param op the op to execute
+ * @return the result of execution
+ */
+ private Object executeWithServerAffinity(ServerLocation loc, Op op) {
+ try {
+ Object retVal = executeOnServer(loc, op, true, false);
+ affinityRetryCount.set(0);
+ return retVal;
+ } catch (ServerConnectivityException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("caught exception while executing with affinity:{}", e.getMessage(), e);
+ }
+ if (!this.serverAffinityFailover || e instanceof ServerOperationException) {
+ affinityRetryCount.set(0);
+ throw e;
+ }
+ int retryCount = affinityRetryCount.get();
+ if ((retryAttempts != -1 && retryCount >= retryAttempts) ||
+ retryCount > TX_RETRY_ATTEMPT) { // prevent stack overflow fixes bug 46535
+ affinityRetryCount.set(0);
+ throw e;
+ }
+ affinityRetryCount.set(retryCount + 1);
+ }
+ this.affinityServerLocation.set(null);
+ if (logger.isDebugEnabled()) {
+ logger.debug("reset server affinity: attempting txFailover");
+ }
+ // send TXFailoverOp, so that new server can
+ // do bootstrapping, then re-execute original op
+ AbstractOp absOp = (AbstractOp) op;
+ absOp.getMessage().setIsRetry();
+ int transactionId = absOp.getMessage().getTransactionId();
+ // for CommitOp we do not have transactionId in AbstractOp
+ // so set it explicitly for TXFailoverOp
+ try {
+ TXFailoverOp.execute(this.pool, transactionId);
+ } catch (TransactionException e) {
+ // If this is the first operation in the transaction then
+ // do not throw TransactionDataNodeHasDeparted back to the
+ // user, re-try the op instead. fixes bug 44375. NOTE: TXFailoverOp
+ // is sent even after first op, as it is not known if the first
+ // operation has established a TXState already
+ TXStateProxy txState = TXManagerImpl.getCurrentTXState();
+ if (txState == null) {
+ throw e;
+ } else if (txState.operationCount() > 1) {
+ throw e;
+ }
+ }
+ if(op instanceof ExecuteRegionFunctionOpImpl){
+ op = new ExecuteRegionFunctionOpImpl(
+ (ExecuteRegionFunctionOpImpl)op, (byte)1/*isReExecute*/, new HashSet<String>());
+ ((ExecuteRegionFunctionOpImpl)op).getMessage().setTransactionId(transactionId);
+ }else if (op instanceof ExecuteFunctionOpImpl){
+ op = new ExecuteFunctionOpImpl(
+ (ExecuteFunctionOpImpl)op, (byte)1/*isReExecute*/);
+ ((ExecuteFunctionOpImpl)op).getMessage().setTransactionId(transactionId);
+ }
+ return this.pool.execute(op);
+ }
+
+ public void setupServerAffinity(boolean allowFailover) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("setting up server affinity");
+ }
+ this.serverAffinityFailover = allowFailover;
+ this.serverAffinity.set(Boolean.TRUE);
+ }
+
+ public void releaseServerAffinity() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("reset server affinity");
+ }
+ this.serverAffinity.set(Boolean.FALSE);
+ this.affinityServerLocation.set(null);
+ }
+
+ public ServerLocation getServerAffinityLocation() {
+ return this.affinityServerLocation.get();
+ }
+
+ public void setServerAffinityLocation(ServerLocation serverLocation) {
+ assert this.affinityServerLocation.get() == null;
+ this.affinityServerLocation.set(serverLocation);
+ }
+
+ public ServerLocation getNextOpServerLocation() {
+ ServerLocation retVal = null;
+ Connection conn = (Connection) (threadLocalConnections ? localConnection.get() : null);
+ if (conn == null || conn.isDestroyed()) {
+ conn = connectionManager.borrowConnection(serverTimeout);
+ retVal = conn.getServer();
+ this.connectionManager.returnConnection(conn);
+ } else {
+ retVal = conn.getServer();
+ }
+ return retVal;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.OpExecutor#executeOn(com.gemstone.gemfire.distributed.internal.ServerLocation, com.gemstone.gemfire.cache.client.internal.Op)
+ */
+ public Object executeOn(ServerLocation server, Op op) {
+ return executeOn(server, op, true,false);
+ }
+ public Object executeOn(ServerLocation p_server, Op op, boolean accessed,boolean onlyUseExistingCnx) {
+ ServerLocation server = p_server;
+ if (this.serverAffinity.get()) {
+ ServerLocation affinityserver = this.affinityServerLocation.get();
+ if (affinityserver != null) {
+ server = affinityserver;
+ } else {
+ this.affinityServerLocation.set(server);
+ }
+ // redirect to executeWithServerAffinity so that we
+ // can send a TXFailoverOp.
+ return executeWithServerAffinity(server, op);
+ }
+ return executeOnServer(server, op, accessed, onlyUseExistingCnx);
+ }
+ private Object executeOnServer(ServerLocation p_server, Op op, boolean accessed,boolean onlyUseExistingCnx) {
+ ServerLocation server = p_server;
+ boolean returnCnx = true;
+ boolean pingOp = (op instanceof PingOp.PingOpImpl);
+ Connection conn = null;
+ if (pingOp) {
+ // currently for pings we prefer to queue clientToServer cnx so that we will
+ // not create a pooled cnx when all we have is queue cnxs.
+ if (this.queueManager != null) {
+ // see if our QueueManager has a connection to this guy that we can send
+ // the ping on.
+ Endpoint ep = (Endpoint)this.endpointManager.getEndpointMap().get(server);
+ if (ep != null) {
+ QueueConnections qcs = this.queueManager.getAllConnectionsNoWait();
+ conn = qcs.getConnection(ep);
+ if (conn != null) {
+ // we found one to do the ping on
+ returnCnx = false;
+ }
+ }
+ }
+ }
+ if (conn == null) {
+ if (useThreadLocalConnection(op, pingOp)) {
+ // no need to set threadLocal to null while the op is in progress since
+ // 43718 does not impact single-hop
+ conn = getActivatedThreadLocalConnectionForSingleHop(server, onlyUseExistingCnx);
+ returnCnx = false;
+ } else {
+ conn = connectionManager.borrowConnection(server, serverTimeout,onlyUseExistingCnx);
+ }
+ }
+ boolean success = true;
+ try {
+ return executeWithPossibleReAuthentication(conn, op);
+ } catch (Exception e) {
+ success = false;
+ //This method will throw an exception if we need to stop
+ //It also unsets the threadlocal connection and notifies
+ //the connection manager if there are failures.
+ handleException(e, conn, 0, true);
+ //this shouldn't actually be reached, handle exception will throw something
+ throw new ServerConnectivityException("Received error connecting to server", e);
+ } finally {
+ if (this.serverAffinity.get() && this.affinityServerLocation.get() == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("setting server affinity to {} server:{}", conn.getEndpoint().getMemberId(), conn.getServer());
+ }
+ this.affinityServerLocation.set(conn.getServer());
+ }
+ if (useThreadLocalConnection(op, pingOp)) {
+ this.connectionManager.passivate(conn, success);
+ setThreadLocalConnectionForSingleHop(server, conn);
+ }
+ if (returnCnx) {
+ connectionManager.returnConnection(conn, accessed);
+ }
+ }
+ }
+
+ private boolean useThreadLocalConnection(Op op, boolean pingOp) {
+ return threadLocalConnections && !pingOp && op.useThreadLocalConnection();
+ }
+
+ /**
+ * gets a connection to the given serverLocation either by looking up the threadLocal {@link #localConnectionMap}.
+ * If a connection does not exist (or has been destroyed) we borrow one from connectionManager.
+ * @return the activated connection
+ */
+ private Connection getActivatedThreadLocalConnectionForSingleHop(ServerLocation server, boolean onlyUseExistingCnx) {
+ assert threadLocalConnections;
+ Connection conn = null;
+ Map<ServerLocation, Connection> connMap = this.localConnectionMap.get();
+ if (connMap != null && !connMap.isEmpty()) {
+ conn = connMap.get(server);
+ }
+ boolean borrow = true;
+ if (conn != null) {
+ try {
+ this.connectionManager.activate(conn);
+ borrow = false;
+ if (!conn.getServer().equals(server)) {
+ // poolLoadConditioningMonitor can replace the connection's
+ // endpoint from underneath us. fixes bug 45151
+ borrow = true;
+ }
+ } catch (ConnectionDestroyedException e) {
+ }
+ }
+ if (conn == null || borrow) {
+ conn = connectionManager.borrowConnection(server, serverTimeout, onlyUseExistingCnx);
+ }
+ if (borrow && connMap != null) {
+ connMap.remove(server);
+ }
+ return conn;
+ }
+
+ /**
+ * initializes the threadLocal {@link #localConnectionMap} and adds mapping
+ * of serverLocation to Connection.
+ */
+ private void setThreadLocalConnectionForSingleHop(ServerLocation server,
+ Connection conn) {
+ assert threadLocalConnections;
+ Map<ServerLocation, Connection> connMap = this.localConnectionMap.get();
+ if (connMap == null) {
+ connMap = new HashMap<ServerLocation, Connection>();
+ this.localConnectionMap.set(connMap);
+ }
+ connMap.put(server, conn);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.ExecutablePool#executeOnPrimary(com.gemstone.gemfire.cache.client.internal.Op)
+ */
+ public Object executeOnPrimary(Op op) {
+ if(queueManager == null) {
+ throw new SubscriptionNotEnabledException();
+ }
+
+ HashSet attemptedPrimaries = new HashSet();
+ while(true) {
+ Connection primary = queueManager.getAllConnections().getPrimary();
+ try {
+ return executeWithPossibleReAuthentication(primary, op);
+ } catch (Exception e) {
+ boolean finalAttempt = ! attemptedPrimaries.add(primary.getServer());
+ handleException(e, primary, 0, finalAttempt);
+ //we shouldn't reach this code, but just in case
+ if(finalAttempt) {
+ throw new ServerConnectivityException("Tried the same primary server twice.", e);
+ }
+ }
+ }
+ }
+
+ public void executeOnAllQueueServers(Op op) {
+ if(queueManager == null) {
+ throw new SubscriptionNotEnabledException();
+ }
+
+ RuntimeException lastException = null;
+
+ QueueConnections connections = queueManager.getAllConnectionsNoWait();
+ Connection primary = connections.getPrimary();
+ if(primary != null) {
+ try {
+ executeWithPossibleReAuthentication(primary, op);
+ } catch (Exception e) {
+ try {
+ handleException(e, primary, 0, false);
+ } catch(RuntimeException e2) {
+ lastException = e2;
+ }
+ }
+ }
+
+ List backups = connections.getBackups();
+ for(int i = 0; i < backups.size(); i++) {
+ Connection conn = (Connection) backups.get(i);
+ try {
+ executeWithPossibleReAuthentication(conn, op);
+ } catch (Exception e) {
+ try {
+ handleException(e, conn, 0, false);
+ } catch(RuntimeException e2) {
+ lastException = e2;
+ }
+ }
+ }
+
+ if (lastException != null) {
+ throw lastException;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.ExecutablePool#executeOnAllQueueServers(com.gemstone.gemfire.cache.client.internal.Op)
+ */
+ public Object executeOnQueuesAndReturnPrimaryResult(Op op) {
+ if(queueManager == null) {
+ throw new SubscriptionNotEnabledException();
+ }
+ QueueConnections connections = queueManager.getAllConnections();
+
+ List backups = connections.getBackups();
+ if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+ logger.trace(LogMarker.BRIDGE_SERVER, "sending {} to backups: {}", op, backups);
+ }
+ for(int i = backups.size() - 1; i >= 0; i--) {
+ Connection conn = (Connection) backups.get(i);
+ try {
+ executeWithPossibleReAuthentication(conn, op);
+ } catch (Exception e) {
+ handleException(e, conn, 0, false);
+ }
+ }
+
+ Connection primary = connections.getPrimary();
+ HashSet attemptedPrimaries = new HashSet();
+ while(true) {
+ try {
+ if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+ logger.trace(LogMarker.BRIDGE_SERVER, "sending {} to primary: {}", op, primary);
+ }
+ return executeWithPossibleReAuthentication(primary, op);
+ } catch (Exception e) {
+ if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+ logger.trace(LogMarker.BRIDGE_SERVER, "caught exception sending to primary {}", e.getMessage(), e);
+ }
+ boolean finalAttempt = !attemptedPrimaries.add(primary.getServer());
+ handleException(e, primary, 0, finalAttempt);
+ primary = queueManager.getAllConnections().getPrimary();
+ //we shouldn't reach this code, but just in case
+ if(finalAttempt) {
+ throw new ServerConnectivityException("Tried the same primary server twice.", e);
+ }
+ }
+ }
+ }
+
+ public void releaseThreadLocalConnection() {
+ Connection conn = localConnection.get();
+ localConnection.set(null);
+ if(conn != null) {
+ connectionManager.returnConnection(conn);
+ }
+ Map<ServerLocation, Connection> connMap = localConnectionMap.get();
+ localConnectionMap.set(null);
+ if (connMap != null) {
+ for (Connection c : connMap.values()) {
+ connectionManager.returnConnection(c);
+ }
+ }
+ }
+
+ /**
+ * Used by GatewayBatchOp
+ */
+ public Object executeOn(Connection conn, Op op, boolean timeoutFatal) {
+ try {
+ return executeWithPossibleReAuthentication(conn, op);
+ } catch (Exception e) {
+ //This method will throw an exception if we need to stop
+ //It also unsets the threadlocal connection and notifies
+ //the connection manager if there are failures.
+ handleException(op, e, conn, 0, true, timeoutFatal);
+ //this shouldn't actually be reached, handle exception will throw something
+ throw new ServerConnectivityException("Received error connecting to server", e);
+ }
+ }
+ /**
+ * This is used by unit tests
+ */
+ public Object executeOn(Connection conn, Op op) {
+ return executeOn(conn, op, false);
+ }
+
+ public RegisterInterestTracker getRITracker() {
+ return riTracker;
+ }
+
+ protected void handleException(Throwable e,
+ Connection conn,
+ int retryCount, boolean finalAttempt) {
+ handleException(e, conn, retryCount, finalAttempt, false/*timeoutFatal*/);
+ }
+
+ protected void handleException(Op op,
+ Throwable e,
+ Connection conn,
+ int retryCount,
+ boolean finalAttempt,
+ boolean timeoutFatal)
+ throws CacheRuntimeException {
+ if (op instanceof AuthenticateUserOp.AuthenticateUserOpImpl) {
+ if (e instanceof GemFireSecurityException) {
+ throw (GemFireSecurityException)e;
+ } else if (e instanceof ServerRefusedConnectionException) {
+ throw (ServerRefusedConnectionException)e;
+ }
+ }
+ handleException(e, conn, retryCount, finalAttempt, timeoutFatal);
+ }
+
+ protected void handleException(Throwable e,
+ Connection conn,
+ int retryCount,
+ boolean finalAttempt,
+ boolean timeoutFatal)
+ throws CacheRuntimeException
+ {
+ GemFireException exToThrow = null;
+ String title;
+ boolean invalidateServer = true;
+ boolean warn = true;
+ boolean forceThrow = false;
+ Throwable cause = e;
+
+ cancelCriterion.checkCancelInProgress(e);
+
+ if(logger.isDebugEnabled() && !(e instanceof java.io.EOFException)) {
+ if (e instanceof java.io.EOFException){
+ logger.debug("OpExecutor.handleException on Connection to {} found EOF", conn.getServer());
+ } else if (e instanceof java.net.SocketTimeoutException) {
+ logger.debug("OpExecutor.handleException on Connection to {} read timed out", conn.getServer());
+ } else {
+ logger.debug("OpExecutor.handleException on Connection to {}", conn.getServer(),e);
+ }
+ }
+ if (e instanceof NotSerializableException) {
+ title = null; //no message
+ exToThrow = new SerializationException("Pool message failure", e);
+ }
+ else if (e instanceof BatchException || e instanceof BatchException70) {
+ title = null; //no message
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof RegionDestroyedException) {
+ invalidateServer = false;
+ title = null;
+ exToThrow =(RegionDestroyedException) e;
+ }
+ else if (e instanceof GemFireSecurityException) {
+ title = null;
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof SerializationException) {
+ title = null; // no message
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof CopyException) {
+ title = null; // no message
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof ClassNotFoundException) {
+ title = null; // no message
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof TransactionException) {
+ title = null; // no message
+ exToThrow = (TransactionException)e;
+ invalidateServer = false;
+ }
+ else if (e instanceof SynchronizationCommitConflictException) {
+ title = null;
+ exToThrow = (SynchronizationCommitConflictException)e;
+ invalidateServer = false;
+ }
+ else if (e instanceof SocketException) {
+ if ("Socket closed".equals(e.getMessage())
+ || "Connection reset".equals(e.getMessage())
+ || "Connection refused: connect".equals(e.getMessage())
+ || "Connection refused".equals(e.getMessage())) {
+ title = e.getMessage();
+ } else {
+ title = "SocketException";
+ }
+ }
+ else if (e instanceof SocketTimeoutException) {
+ invalidateServer = timeoutFatal;
+ title = "socket timed out on client";
+ cause = null;
+ }
+ else if (e instanceof ConnectionDestroyedException) {
+ invalidateServer = false;
+ title = "connection was asynchronously destroyed";
+ cause = null;
+ }
+ else if (e instanceof java.io.EOFException) {
+ /*
+// it is still listening so make this into a timeout exception
+ invalidateServer = false;
+ title = "socket closed on server";
+ SocketTimeoutException ste = new SocketTimeoutException(title);
+ ste.setStackTrace(e.getStackTrace());
+ e = ste;
+ cause = null;
+ */
+
+ /*
+ * note: the old code in ConnectionProxyImpl used to create a new socket here to the server to determine if it really crashed.
+ * We may have to add this back in for some reason, but hopefully not.
+ *
+ * note 05/21/08: an attempt to address this was made by increasing the time waited on server before closing timeoutd clients
+ * see ServerConnection.hasBeenTimedOutOnClient
+ */
+ title = "closed socket on server";
+ }
+ else if (e instanceof IOException) {
+ title = "IOException";
+ }
+ else if (e instanceof BufferUnderflowException) {
+ title = "buffer underflow reading from server";
+ }
+ else if (e instanceof CancelException) {
+ title = "Cancelled";
+ warn = false;
+ }
+ else if (e instanceof InternalFunctionInvocationTargetException) {
+ //In this case, function will be re executed
+ title = null;
+ exToThrow = (InternalFunctionInvocationTargetException)e;
+ }
+ else if (e instanceof FunctionInvocationTargetException) {
+ //in this case function will not be re executed
+ title = null;
+ exToThrow = (GemFireException)e;
+ }
+ else if (e instanceof PutAllPartialResultException) {
+ title = null;
+ exToThrow =(PutAllPartialResultException) e;
+ invalidateServer = false;
+ }
+ else {
+ Throwable t = e.getCause();
+ if ((t instanceof ConnectException)
+ || (t instanceof SocketException)
+ || (t instanceof SocketTimeoutException)
+ || (t instanceof IOException)
+ || (t instanceof SerializationException)
+ || (t instanceof CopyException)
+ || (t instanceof GemFireSecurityException)
+ || (t instanceof ServerOperationException)
+ || (t instanceof TransactionException)
+ || (t instanceof CancelException)) {
+ handleException(t, conn, retryCount, finalAttempt, timeoutFatal);
+ return;
+ } else if (e instanceof ServerOperationException) {
+ title = null; // no message
+ exToThrow = (ServerOperationException)e;
+ invalidateServer = false; // fix for bug #42225
+ }
+ else if (e instanceof FunctionException) {
+ if (t instanceof InternalFunctionInvocationTargetException) {
+ // Client server to re-execute for node failure
+ handleException(t, conn, retryCount, finalAttempt, timeoutFatal);
+ return;
+ }
+ else {
+ title = null; // no message
+ exToThrow = (FunctionException)e;
+ }
+ } else if (e instanceof ServerConnectivityException
+ && e.getMessage()
+ .equals("Connection error while authenticating user")) {
+ title = null;
+ if (logger.isDebugEnabled()) {
+ logger.debug(e.getMessage(), e);
+ }
+ } else {
+ title = e.toString();
+ forceThrow = true;
+ }
+ }
+ if (title != null) {
+ conn.destroy();
+ if(invalidateServer) {
+ endpointManager.serverCrashed(conn.getEndpoint());
+ }
+ boolean logEnabled = warn ? logger.isWarnEnabled() : logger.isDebugEnabled();
+ boolean msgNeeded = logEnabled || finalAttempt;
+ if (msgNeeded) {
+ final StringBuffer sb = getExceptionMessage(title, retryCount, finalAttempt, conn, e);
+ final String msg = sb.toString();
+ if (logEnabled) {
+ if (warn) {
+ logger.warn(msg /*, e*/);
+ } else {
+ logger.debug(msg /*, e*/);
+ }
+ }
+ if (forceThrow || finalAttempt) {
+ exToThrow = new ServerConnectivityException(msg, cause);
+ }
+ }
+ }
+ if (exToThrow != null) {
+ throw exToThrow;
+ }
+ }
+
+ private StringBuffer getExceptionMessage(String exceptionName,
+ int retryCount,
+ boolean finalAttempt,
+ Connection connection,
+ Throwable ex) {
+ StringBuffer message = new StringBuffer(200);
+ message
+ .append("Pool unexpected ")
+ .append(exceptionName);
+ if (connection != null) {
+ message
+ .append(" connection=")
+ .append(connection);
+ }
+ if (retryCount > 0) {
+ message
+ .append(" attempt=")
+ .append(retryCount+1);
+ }
+ message.append(')');
+ if (finalAttempt) {
+ message
+ .append(". Server unreachable: could not connect after ")
+ .append(retryCount+1)
+ .append(" attempts");
+ }
+ return message;
+ }
+
+ public Connection getThreadLocalConnection() {
+ return localConnection.get();
+ }
+
+ public void setThreadLocalConnection(Connection conn) {
+ localConnection.set(conn);
+ }
+
+ private void authenticateIfRequired(Connection conn, Op op) {
+ if (!conn.getServer().getRequiresCredentials()) {
+ return;
+ }
+
+ if (this.pool == null) {
+ PoolImpl poolImpl = (PoolImpl)PoolManagerImpl.getPMI().find(
+ this.endpointManager.getPoolName());
+ if (poolImpl == null) {
+ return;
+ }
+ this.pool = poolImpl;
+ }
+ if (this.pool.getMultiuserAuthentication()) {
+ if (((AbstractOp)op).needsUserId()) {
+ UserAttributes ua = UserAttributes.userAttributes.get();
+ if (ua != null) {
+ if (!ua.getServerToId().containsKey(conn.getServer())) {
+ authenticateMultiuser(this.pool, conn, ua);
+ }
+ } else {
+ // This should never be reached.
+ }
+ }
+ } else if (((AbstractOp)op).needsUserId()) {
+ // This should not be reached, but keeping this code here in case it is
+ // reached.
+ if (conn.getServer().getUserId() == -1) {
+ Connection connImpl = this.connectionManager.getConnection(conn);
+ conn.getServer().setUserId(
+ (Long)AuthenticateUserOp.executeOn(connImpl, this.pool));
+ if (logger.isDebugEnabled()) {
+ logger.debug("OpExecutorImpl.execute() - single user mode - authenticated this user on {}", conn);
+ }
+ }
+ }
+ }
+
+ private void authenticateMultiuser(PoolImpl pool, Connection conn,
+ UserAttributes ua) {
+ try {
+ Long userId = (Long)AuthenticateUserOp.executeOn(conn.getServer(),
+ pool, ua.getCredentials());
+ if (userId != null) {
+ ua.setServerToId(conn.getServer(), userId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("OpExecutorImpl.execute() - multiuser mode - authenticated this user on {}", conn);
+ }
+ }
+ } catch (ServerConnectivityException sce) {
+ Throwable cause = sce.getCause();
+ if (cause instanceof SocketException
+ || cause instanceof EOFException
+ || cause instanceof IOException
+ || cause instanceof BufferUnderflowException
+ || cause instanceof CancelException
+ || (sce.getMessage() != null && (sce.getMessage().indexOf(
+ "Could not create a new connection to server") != -1
+ || sce.getMessage().indexOf("socket timed out on client") != -1 || sce
+ .getMessage().indexOf(
+ "connection was asynchronously destroyed") != -1))) {
+ throw new ServerConnectivityException(
+ "Connection error while authenticating user");
+ } else {
+ throw sce;
+ }
+ }
+ }
+
+ private Object executeWithPossibleReAuthentication(Connection conn, Op op)
+ throws Exception {
+ try {
+ return conn.execute(op);
+
+ } catch (ServerConnectivityException sce) {
+ Throwable cause = sce.getCause();
+ if ((cause instanceof AuthenticationRequiredException
+ && "User authorization attributes not found.".equals(cause
+ .getMessage()))
+ || sce.getMessage().contains(
+ "Connection error while authenticating user")) {
+ // (ashetkar) Need a cleaner way of doing above check.
+ // 2nd exception-message above is from AbstractOp.sendMessage()
+
+ PoolImpl pool = (PoolImpl)PoolManagerImpl.getPMI().find(
+ this.endpointManager.getPoolName());
+ if (!pool.getMultiuserAuthentication()) {
+ Connection connImpl = this.connectionManager.getConnection(conn);
+ conn.getServer().setUserId(
+ (Long)AuthenticateUserOp.executeOn(connImpl, this));
+ return conn.execute(op);
+ } else {
+ UserAttributes ua = UserAttributes.userAttributes.get();
+ if (ua != null) {
+ authenticateMultiuser(pool, conn, ua);
+ }
+ return conn.execute(op);
+ }
+
+ } else {
+ throw sce;
+ }
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PdxRegistryRecoveryListener.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PdxRegistryRecoveryListener.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PdxRegistryRecoveryListener.java
new file mode 100644
index 0000000..c70e312
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PdxRegistryRecoveryListener.java
@@ -0,0 +1,81 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.pdx.internal.TypeRegistry;
+
+/**
+ * A listener which will wipe out the PDX registry on the client side if the
+ * entire server distributed system was lost and came back on line. <br>
+ * <br>
+ * TODO - There is a window in which all of the servers could crash and come
+ * back up and we would connect to a new server before realizing that all the
+ * servers crashed. To fix this, we would need to get some kind of birthdate of
+ * the server ds we connect and use that to decide if we need to recover
+ * the PDX registry.
+ *
+ * We can also lose connectivity with the servers, even if the servers are still
+ * running. Maybe for the PDX registry we need some way of telling if the PDX
+ * registry was lost at the server side in the interval.
+ *
+ *
+ * @author dsmith
+ *
+ */
+public class PdxRegistryRecoveryListener extends EndpointManager.EndpointListenerAdapter {
+ private static final Logger logger = LogService.getLogger();
+
+ private final AtomicInteger endpointCount = new AtomicInteger();
+ private final InternalPool pool;
+
+ public PdxRegistryRecoveryListener(InternalPool pool) {
+ this.pool = pool;
+ }
+
+ @Override
+ public void endpointCrashed(Endpoint endpoint) {
+ int count = endpointCount.decrementAndGet();
+ if (logger.isDebugEnabled()) {
+ logger.debug("PdxRegistryRecoveryListener - EndpointCrashed. Now have {} endpoints", count);
+ }
+ }
+
+ @Override
+ public void endpointNoLongerInUse(Endpoint endpoint) {
+ int count = endpointCount.decrementAndGet();
+ if (logger.isDebugEnabled()) {
+ logger.debug("PdxRegistryRecoveryListener - EndpointNoLongerInUse. Now have {} endpoints", count);
+ }
+ }
+
+ @Override
+ public void endpointNowInUse(Endpoint endpoint) {
+ int count = endpointCount.incrementAndGet();
+ if (logger.isDebugEnabled()) {
+ logger.debug("PdxRegistryRecoveryListener - EndpointNowInUse. Now have {} endpoints", count);
+ }
+ if(count == 1) {
+ GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ if(cache == null) {
+ return;
+ }
+ TypeRegistry registry = cache.getPdxRegistry();
+
+ if(registry == null) {
+ return;
+ }
+ registry.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
new file mode 100644
index 0000000..ac32e39
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
@@ -0,0 +1,104 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Ping a server to see if it is still alive.
+ * @author darrel
+ * @since 5.7
+ */
+public class PingOp {
+ /**
+ * Ping the specified server to see if it is still alive
+ * @param pool the pool to use to communicate with the server.
+ * @param server the server to do the execution on
+ */
+ public static void execute(ExecutablePool pool, ServerLocation server)
+ {
+ AbstractOp op = new PingOpImpl();
+ pool.executeOn(server, op, false,false);
+ }
+
+ private PingOp() {
+ // no instances allowed
+ }
+
+ static class PingOpImpl extends AbstractOp {
+
+ private long startTime;
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public PingOpImpl() {
+ super(MessageType.PING, 0);
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ Message.messageType.set(null);
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
+ getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+ startTime = System.currentTimeMillis();
+ getMessage().send(false);
+ Message.messageType.set(MessageType.PING);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "ping");
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.REPLY && msg.getNumberOfParts() > 1) {
+ long endTime = System.currentTimeMillis();
+ long serverTime = msg.getPart(1).getLong();
+ // the new clock offset is computed assuming that the server's timestamp was
+ // taken mid-way between when the ping was sent and the reply was
+ // received:
+ // timestampElapsedTime = (endTime - startTime)/2
+ // localTime = startTime + timestampElapsedTime
+ // offsetFromServer = serverTime - localTime
+ long newCacheTimeOffset = serverTime - startTime/2 - endTime/2;
+ InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance();
+ if (ds != null && ds.isLoner()) { // check for loner so we don't jump time offsets across WAN connections
+ ds.getClock().setCacheTimeOffset(null, newCacheTimeOffset, false);
+ }
+ }
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startPing();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endPingSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endPing(start, hasTimedOut(), hasFailed());
+ }
+ }
+}