You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:44 UTC
[82/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch
'feature/GEODE-917' into develop
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
index 0000000,ca14b76..5d05fe5
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
@@@ -1,0 -1,973 +1,985 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package com.gemstone.gemfire.cache.client.internal;
+
+ import java.io.EOFException;
+ import java.io.IOException;
+ import java.io.NotSerializableException;
+ import java.net.ConnectException;
+ import java.net.SocketException;
+ import java.net.SocketTimeoutException;
+ import java.nio.BufferUnderflowException;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.logging.log4j.Logger;
+
+ import com.gemstone.gemfire.CancelCriterion;
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.CopyException;
+ import com.gemstone.gemfire.GemFireException;
++import com.gemstone.gemfire.GemFireIOException;
+ import com.gemstone.gemfire.SerializationException;
+ import com.gemstone.gemfire.cache.CacheRuntimeException;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
+ import com.gemstone.gemfire.cache.TransactionException;
+ import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+ import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+ import com.gemstone.gemfire.cache.client.ServerOperationException;
+ import com.gemstone.gemfire.cache.client.ServerRefusedConnectionException;
+ import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
+ import com.gemstone.gemfire.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionOpImpl;
+ import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
+ import com.gemstone.gemfire.cache.client.internal.QueueManager.QueueConnections;
+ import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
+ import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManager;
+ import com.gemstone.gemfire.cache.execute.FunctionException;
+ import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
+ import com.gemstone.gemfire.distributed.internal.ServerLocation;
+ import com.gemstone.gemfire.internal.cache.PoolManagerImpl;
+ import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
+ import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+ import com.gemstone.gemfire.internal.cache.TXStateProxy;
+ import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+ import com.gemstone.gemfire.internal.cache.tier.BatchException;
++import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
+ import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+ import com.gemstone.gemfire.security.AuthenticationRequiredException;
+ import com.gemstone.gemfire.security.GemFireSecurityException;
+
+ /**
+ * Called from the client and execute client to server
+ * requests against servers. Handles retrying to different servers,
+ * and marking servers dead if we get exception from them.
+ * @author dsmith
+ * @since 5.7
+ */
+ public class OpExecutorImpl implements ExecutablePool {
+ private static final Logger logger = LogService.getLogger();
+
+ private static final boolean TRY_SERVERS_ONCE = Boolean.getBoolean("gemfire.PoolImpl.TRY_SERVERS_ONCE");
+ private static final int TX_RETRY_ATTEMPT = Integer.getInteger("gemfire.txRetryAttempt", 500);
+
+ private final ConnectionManager connectionManager;
+ private final int retryAttempts;
+ private final long serverTimeout;
+ private final boolean threadLocalConnections;
+ private final ThreadLocal<Connection> localConnection = new ThreadLocal<Connection>();
+ /**
+ * maps serverLocations to Connections when threadLocalConnections is enabled with single-hop.
+ */
+ private final ThreadLocal<Map<ServerLocation, Connection>> localConnectionMap = new ThreadLocal<Map<ServerLocation,Connection>>();
+ private final EndpointManager endpointManager;
+ private final RegisterInterestTracker riTracker;
+ private final QueueManager queueManager;
+ private final CancelCriterion cancelCriterion;
+ private /*final*/ PoolImpl pool;
+ private final ThreadLocal<Boolean> serverAffinity = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return Boolean.FALSE;};
+ };
+ private boolean serverAffinityFailover = false;
+ private final ThreadLocal<ServerLocation> affinityServerLocation = new ThreadLocal<ServerLocation>();
+ private final ThreadLocal<Integer> affinityRetryCount = new ThreadLocal<Integer>() {
+ protected Integer initialValue() {
+ return 0;
+ };
+ };
+
+ /**
+  * Builds an executor by storing each collaborator and setting in the
+  * field of the matching name; no other work is done here.
+  */
+ public OpExecutorImpl(
+     ConnectionManager manager,
+     QueueManager queueManager,
+     EndpointManager endpointManager,
+     RegisterInterestTracker riTracker,
+     int retryAttempts,
+     long serverTimeout,
+     boolean threadLocalConnections,
+     CancelCriterion cancelCriterion,
+     PoolImpl pool) {
+   // collaborators
+   this.connectionManager = manager;
+   this.queueManager = queueManager;
+   this.endpointManager = endpointManager;
+   this.riTracker = riTracker;
+   this.cancelCriterion = cancelCriterion;
+   this.pool = pool;
+   // tuning values
+   this.retryAttempts = retryAttempts;
+   this.serverTimeout = serverTimeout;
+   this.threadLocalConnections = threadLocalConnections;
+ }
+
+ /**
+  * Executes {@code op} with the retry count this executor was constructed with.
+  */
+ public Object execute(Op op) {
+   final int configuredRetries = this.retryAttempts;
+   return execute(op, configuredRetries);
+ }
+
+ /**
+ * Executes the op on a connection obtained from the thread local (when
+ * threadLocalConnections is set) or borrowed from the connection manager.
+ * When an attempt fails, handleException decides whether to give up; otherwise
+ * the connection is exchanged for one to a server not yet attempted and the
+ * op is tried again. When server affinity is enabled for the calling thread
+ * the call is delegated to executeWithServerAffinity instead.
+ *
+ * @param op the operation to execute
+ * @param retries number of retries after which an attempt is flagged as final;
+ * -1 disables that check, in which case the loop stops once no further
+ * server is available
+ * @return the result of the op
+ */
+ public Object execute(Op op, int retries) {
+ if (this.serverAffinity.get()) {
+ ServerLocation loc = this.affinityServerLocation.get();
+ if (loc == null) {
+ loc = getNextOpServerLocation();
+ this.affinityServerLocation.set(loc);
+ if (logger.isDebugEnabled()) {
+ logger.debug("setting server affinity to {}", this.affinityServerLocation.get());
+ }
+ }
+ return executeWithServerAffinity(loc, op);
+ }
+ boolean success = false;
+
+ // servers that already failed for this op; passed to exchangeConnection
+ Set attemptedServers = new HashSet();
+
+ Connection conn = (Connection) (threadLocalConnections ? localConnection.get() : null);
+ if (conn == null || conn.isDestroyed()) {
+ conn = connectionManager.borrowConnection(serverTimeout);
+ }
+ else if (threadLocalConnections) {
+ //Fix for 43718. Clear the thread local connection
+ //while we're performing the op. It will be reset
+ //if the op succeeds.
+ localConnection.set(null);
+ try {
+ this.connectionManager.activate(conn);
+ }
+ catch (ConnectionDestroyedException ex) {
+ conn = connectionManager.borrowConnection(serverTimeout);
+ }
+ }
+ try {
+ for(int attempt = 0; true; attempt++) {
+ // when an op is retried we may need to try to recover the previous
+ // attempt's version stamp
+ if (attempt == 1 && (op instanceof AbstractOp)) {
+ AbstractOp absOp = (AbstractOp)op;
+ absOp.getMessage().setIsRetry();
+ }
+ try {
+ authenticateIfRequired(conn, op);
+ Object result = executeWithPossibleReAuthentication(conn, op);
+ success = true;
+ return result;
++ } catch (MessageTooLargeException e) {
+ // not retried: the exception is wrapped and thrown immediately
++ throw new GemFireIOException("unable to transmit message to server", e);
+ }
+ catch (Exception e) {
+ //This method will throw an exception if we need to stop
+ //It also unsets the threadlocal connection and notifies
+ //the connection manager if there are failures.
+ handleException(e, conn, attempt, attempt >= retries && retries != -1);
+ attemptedServers.add(conn.getServer());
+ try {
+ conn = connectionManager.exchangeConnection(conn, attemptedServers, serverTimeout);
+ }
+ catch(NoAvailableServersException nse) {
+ //if retries is -1, don't try again after the last server has failed
+ if(retries == -1 || TRY_SERVERS_ONCE) {
+ handleException(e, conn, attempt, true);
+ }
+ else {
+ //try one of the failed servers again, until we exceed the retry attempts.
+ attemptedServers.clear();
+ try {
+ conn = connectionManager.exchangeConnection(conn, attemptedServers, serverTimeout);
+ }
+ catch(NoAvailableServersException nse2) {
+ handleException(e, conn, attempt, true);
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ // runs on both the success return and any exception leaving the loop
+ if(threadLocalConnections) {
+ this.connectionManager.passivate(conn, success);
+ //Fix for 43718. If the thread local was set to a different
+ //connection deeper in the call stack, return that connection
+ //and set our connection on the thread local.
+ Connection existingConnection = localConnection.get();
+ if(existingConnection != null && existingConnection != conn) {
+ connectionManager.returnConnection(existingConnection);
+ }
+
+ if(!conn.isDestroyed()) {
+ localConnection.set(conn);
+ } else {
+ localConnection.set(null);
+ }
+ } else {
+ connectionManager.returnConnection(conn);
+ }
+ }
+ }
+
+ /**
+ * Executes the given op on the given server. If that fails with a
+ * ServerConnectivityException and failover is permitted, clears the affinity
+ * location, sends a TXFailoverOp and then re-executes the op through the pool.
+ * The failure is rethrown instead when failover is disabled, when it is a
+ * ServerOperationException, or when the per-thread retry count has reached
+ * retryAttempts (unless that is -1) or exceeded TX_RETRY_ATTEMPT.
+ * @param loc the server to execute the op on
+ * @param op the op to execute; cast to AbstractOp on the failover path
+ * @return the result of execution
+ */
+ private Object executeWithServerAffinity(ServerLocation loc, Op op) {
+ try {
+ Object retVal = executeOnServer(loc, op, true, false);
+ affinityRetryCount.set(0);
+ return retVal;
+ } catch (ServerConnectivityException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("caught exception while executing with affinity:{}", e.getMessage(), e);
+ }
+ if (!this.serverAffinityFailover || e instanceof ServerOperationException) {
+ affinityRetryCount.set(0);
+ throw e;
+ }
+ int retryCount = affinityRetryCount.get();
+ if ((retryAttempts != -1 && retryCount >= retryAttempts) ||
+ retryCount > TX_RETRY_ATTEMPT) { // prevent stack overflow fixes bug 46535
+ affinityRetryCount.set(0);
+ throw e;
+ }
+ affinityRetryCount.set(retryCount + 1);
+ }
+ // only reached when the attempt above failed and a failover is allowed
+ this.affinityServerLocation.set(null);
+ if (logger.isDebugEnabled()) {
+ logger.debug("reset server affinity: attempting txFailover");
+ }
+ // send TXFailoverOp, so that new server can
+ // do bootstrapping, then re-execute original op
+ AbstractOp absOp = (AbstractOp) op;
+ absOp.getMessage().setIsRetry();
+ int transactionId = absOp.getMessage().getTransactionId();
+ // for CommitOp we do not have transactionId in AbstractOp
+ // so set it explicitly for TXFailoverOp
+ try {
+ TXFailoverOp.execute(this.pool, transactionId);
+ } catch (TransactionException e) {
+ // If this is the first operation in the transaction then
+ // do not throw TransactionDataNodeHasDeparted back to the
+ // user, re-try the op instead. fixes bug 44375. NOTE: TXFailoverOp
+ // is sent even after first op, as it is not known if the first
+ // operation has established a TXState already
+ TXStateProxy txState = TXManagerImpl.getCurrentTXState();
+ if (txState == null) {
+ throw e;
+ } else if (txState.operationCount() > 1) {
+ throw e;
+ }
+ }
+ // function ops are rebuilt as re-execute copies carrying the same transaction id
+ if(op instanceof ExecuteRegionFunctionOpImpl){
+ op = new ExecuteRegionFunctionOpImpl(
+ (ExecuteRegionFunctionOpImpl)op, (byte)1/*isReExecute*/, new HashSet<String>());
+ ((ExecuteRegionFunctionOpImpl)op).getMessage().setTransactionId(transactionId);
+ }else if (op instanceof ExecuteFunctionOpImpl){
+ op = new ExecuteFunctionOpImpl(
+ (ExecuteFunctionOpImpl)op, (byte)1/*isReExecute*/);
+ ((ExecuteFunctionOpImpl)op).getMessage().setTransactionId(transactionId);
+ }
+ return this.pool.execute(op);
+ }
+
+ /**
+  * Enables server affinity for the calling thread and records whether a
+  * failover to another server is permitted.
+  *
+  * @param allowFailover stored in {@code serverAffinityFailover}
+  */
+ public void setupServerAffinity(boolean allowFailover) {
+   final boolean debugOn = logger.isDebugEnabled();
+   if (debugOn) {
+     logger.debug("setting up server affinity");
+   }
+   serverAffinityFailover = allowFailover;
+   serverAffinity.set(Boolean.TRUE);
+ }
+
+ /**
+  * Disables server affinity for the calling thread and forgets the
+  * thread's affinity server location.
+  */
+ public void releaseServerAffinity() {
+   final boolean debugOn = logger.isDebugEnabled();
+   if (debugOn) {
+     logger.debug("reset server affinity");
+   }
+   serverAffinity.set(Boolean.FALSE);
+   affinityServerLocation.set(null);
+ }
+
+ /**
+  * @return the affinity server location stored for the calling thread, or
+  *         {@code null} when none has been set
+  */
+ public ServerLocation getServerAffinityLocation() {
+   final ServerLocation current = affinityServerLocation.get();
+   return current;
+ }
+
+ /**
+  * Stores the affinity server location for the calling thread. With
+  * assertions enabled, fails if a location is already stored.
+  */
+ public void setServerAffinityLocation(ServerLocation serverLocation) {
+   assert this.affinityServerLocation.get() == null;
+   final ThreadLocal<ServerLocation> holder = this.affinityServerLocation;
+   holder.set(serverLocation);
+ }
+
+ /**
+  * Reports the server of the thread-local connection when one is in use and
+  * not destroyed; otherwise borrows a connection, reads its server and
+  * returns the connection to the manager straight away.
+  *
+  * @return the server location read from the chosen connection
+  */
+ public ServerLocation getNextOpServerLocation() {
+   final Connection cached = threadLocalConnections ? localConnection.get() : null;
+   if (cached != null && !cached.isDestroyed()) {
+     return cached.getServer();
+   }
+   final Connection borrowed = connectionManager.borrowConnection(serverTimeout);
+   final ServerLocation location = borrowed.getServer();
+   this.connectionManager.returnConnection(borrowed);
+   return location;
+ }
+
+ /* (non-Javadoc)
+  * Delegates to the four-argument overload with accessed=true and
+  * onlyUseExistingCnx=false.
+  * @see com.gemstone.gemfire.cache.client.internal.OpExecutor#executeOn(com.gemstone.gemfire.distributed.internal.ServerLocation, com.gemstone.gemfire.cache.client.internal.Op)
+  */
+ public Object executeOn(ServerLocation server, Op op) {
+   final boolean accessed = true;
+   final boolean onlyUseExistingCnx = false;
+   return executeOn(server, op, accessed, onlyUseExistingCnx);
+ }
+ /**
+  * Executes the op on the requested server. When server affinity is enabled
+  * for the calling thread, the stored affinity server (if any) replaces the
+  * requested one, or the requested one becomes the affinity server, and the
+  * call goes through executeWithServerAffinity so that a TXFailoverOp can be
+  * sent; {@code accessed} and {@code onlyUseExistingCnx} are not used on
+  * that path.
+  */
+ public Object executeOn(ServerLocation p_server, Op op, boolean accessed,boolean onlyUseExistingCnx) {
+   if (!this.serverAffinity.get()) {
+     return executeOnServer(p_server, op, accessed, onlyUseExistingCnx);
+   }
+   ServerLocation target = this.affinityServerLocation.get();
+   if (target == null) {
+     target = p_server;
+     this.affinityServerLocation.set(target);
+   }
+   // redirect to executeWithServerAffinity so that we
+   // can send a TXFailoverOp.
+   return executeWithServerAffinity(target, op);
+ }
+ /**
+ * Executes the op once on a connection to the given server; there is no retry
+ * here because handleException is called with finalAttempt=true.
+ * The connection is chosen in this order: for a ping, an existing queue
+ * connection to the server's endpoint; otherwise the thread-local single-hop
+ * connection when useThreadLocalConnection allows it; otherwise a connection
+ * borrowed from the connection manager.
+ * @param p_server the server to execute on
+ * @param op the op to execute
+ * @param accessed passed through to returnConnection
+ * @param onlyUseExistingCnx passed through to borrowConnection
+ * @return the result of the op
+ */
+ private Object executeOnServer(ServerLocation p_server, Op op, boolean accessed,boolean onlyUseExistingCnx) {
+ ServerLocation server = p_server;
+ // false when the connection is not owned by this call (queue or thread-local connection)
+ boolean returnCnx = true;
+ boolean pingOp = (op instanceof PingOp.PingOpImpl);
+ Connection conn = null;
+ if (pingOp) {
+ // currently for pings we prefer to queue clientToServer cnx so that we will
+ // not create a pooled cnx when all we have is queue cnxs.
+ if (this.queueManager != null) {
+ // see if our QueueManager has a connection to this guy that we can send
+ // the ping on.
+ Endpoint ep = (Endpoint)this.endpointManager.getEndpointMap().get(server);
+ if (ep != null) {
+ QueueConnections qcs = this.queueManager.getAllConnectionsNoWait();
+ conn = qcs.getConnection(ep);
+ if (conn != null) {
+ // we found one to do the ping on
+ returnCnx = false;
+ }
+ }
+ }
+ }
+ if (conn == null) {
+ if (useThreadLocalConnection(op, pingOp)) {
+ // no need to set threadLocal to null while the op is in progress since
+ // 43718 does not impact single-hop
+ conn = getActivatedThreadLocalConnectionForSingleHop(server, onlyUseExistingCnx);
+ returnCnx = false;
+ } else {
+ conn = connectionManager.borrowConnection(server, serverTimeout,onlyUseExistingCnx);
+ }
+ }
+ boolean success = true;
+ try {
+ return executeWithPossibleReAuthentication(conn, op);
+ } catch (Exception e) {
+ success = false;
+ //This method will throw an exception if we need to stop
+ //It also unsets the threadlocal connection and notifies
+ //the connection manager if there are failures.
+ handleException(e, conn, 0, true);
+ //this shouldn't actually be reached, handle exception will throw something
+ throw new ServerConnectivityException("Received error connecting to server", e);
+ } finally {
+ // record the server actually used as the affinity server if none is stored yet
+ if (this.serverAffinity.get() && this.affinityServerLocation.get() == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("setting server affinity to {} server:{}", conn.getEndpoint().getMemberId(), conn.getServer());
+ }
+ this.affinityServerLocation.set(conn.getServer());
+ }
+ if (useThreadLocalConnection(op, pingOp)) {
+ this.connectionManager.passivate(conn, success);
+ setThreadLocalConnectionForSingleHop(server, conn);
+ }
+ if (returnCnx) {
+ connectionManager.returnConnection(conn, accessed);
+ }
+ }
+ }
+
+ /**
+  * A thread-local connection is used only when the feature is enabled, the
+  * op is not a ping, and the op itself opts in.
+  */
+ private boolean useThreadLocalConnection(Op op, boolean pingOp) {
+   if (!threadLocalConnections || pingOp) {
+     return false;
+   }
+   return op.useThreadLocalConnection();
+ }
+
+ /**
+ * gets a connection to the given serverLocation either by looking up the threadLocal {@link #localConnectionMap}.
+ * If a connection does not exist (or has been destroyed) we borrow one from connectionManager.
+ * @return the activated connection
+ */
+ private Connection getActivatedThreadLocalConnectionForSingleHop(ServerLocation server, boolean onlyUseExistingCnx) {
+ assert threadLocalConnections;
+ Connection conn = null;
+ Map<ServerLocation, Connection> connMap = this.localConnectionMap.get();
+ if (connMap != null && !connMap.isEmpty()) {
+ conn = connMap.get(server);
+ }
+ boolean borrow = true;
+ if (conn != null) {
+ try {
+ this.connectionManager.activate(conn);
+ borrow = false;
+ if (!conn.getServer().equals(server)) {
+ // poolLoadConditioningMonitor can replace the connection's
+ // endpoint from underneath us. fixes bug 45151
+ borrow = true;
+ }
+ } catch (ConnectionDestroyedException e) {
+ }
+ }
+ if (conn == null || borrow) {
+ conn = connectionManager.borrowConnection(server, serverTimeout, onlyUseExistingCnx);
+ }
+ if (borrow && connMap != null) {
+ connMap.remove(server);
+ }
+ return conn;
+ }
+
+ /**
+  * Records {@code conn} as the calling thread's connection for
+  * {@code server}, creating the thread-local {@link #localConnectionMap}
+  * on first use.
+  */
+ private void setThreadLocalConnectionForSingleHop(ServerLocation server,
+     Connection conn) {
+   assert threadLocalConnections;
+   Map<ServerLocation, Connection> perServer = this.localConnectionMap.get();
+   if (perServer != null) {
+     perServer.put(server, conn);
+     return;
+   }
+   perServer = new HashMap<ServerLocation, Connection>();
+   this.localConnectionMap.set(perServer);
+   perServer.put(server, conn);
+ }
+
+ /*
+  * (non-Javadoc)
+  * Executes the op on the current primary queue connection, re-reading the
+  * primary after each failure; the attempt is final once a primary server
+  * is seen for the second time.
+  * @see com.gemstone.gemfire.cache.client.internal.ExecutablePool#executeOnPrimary(com.gemstone.gemfire.cache.client.internal.Op)
+  */
+ public Object executeOnPrimary(Op op) {
+   if (queueManager == null) {
+     throw new SubscriptionNotEnabledException();
+   }
+
+   final Set<ServerLocation> triedPrimaries = new HashSet<ServerLocation>();
+   for (;;) {
+     final Connection primary = queueManager.getAllConnections().getPrimary();
+     try {
+       return executeWithPossibleReAuthentication(primary, op);
+     } catch (Exception failure) {
+       final boolean alreadyTried = !triedPrimaries.add(primary.getServer());
+       handleException(failure, primary, 0, alreadyTried);
+       //we shouldn't reach this code, but just in case
+       if (alreadyTried) {
+         throw new ServerConnectivityException("Tried the same primary server twice.", failure);
+       }
+     }
+   }
+ }
+
+ /**
+  * Executes the op on the primary queue connection (if present) and then on
+  * every backup, in list order. A failure on one connection does not stop the
+  * others; the last RuntimeException raised by handleException is thrown
+  * once all connections have been tried.
+  */
+ public void executeOnAllQueueServers(Op op) {
+   if (queueManager == null) {
+     throw new SubscriptionNotEnabledException();
+   }
+
+   final QueueConnections queueConns = queueManager.getAllConnectionsNoWait();
+   RuntimeException failure = null;
+
+   final Connection primary = queueConns.getPrimary();
+   if (primary != null) {
+     failure = executeRecordingFailure(primary, op, failure);
+   }
+
+   final List backups = queueConns.getBackups();
+   for (int idx = 0; idx < backups.size(); idx++) {
+     failure = executeRecordingFailure((Connection) backups.get(idx), op, failure);
+   }
+
+   if (failure != null) {
+     throw failure;
+   }
+ }
+
+ /**
+  * Runs the op on one queue connection. Returns the RuntimeException thrown
+  * by handleException if there was one, otherwise {@code previous} unchanged.
+  */
+ private RuntimeException executeRecordingFailure(Connection conn, Op op, RuntimeException previous) {
+   try {
+     executeWithPossibleReAuthentication(conn, op);
+   } catch (Exception e) {
+     try {
+       handleException(e, conn, 0, false);
+     } catch (RuntimeException rethrown) {
+       return rethrown;
+     }
+   }
+   return previous;
+ }
+
+ /*
+ * (non-Javadoc)
+ * Sends the op to every backup queue connection (last to first) and then to
+ * the primary, returning only the primary's result. A failure on a backup
+ * goes through handleException with finalAttempt=false. A failure on the
+ * primary re-reads the primary and retries until the same primary server has
+ * been attempted twice.
+ * @see com.gemstone.gemfire.cache.client.internal.ExecutablePool#executeOnQueuesAndReturnPrimaryResult(com.gemstone.gemfire.cache.client.internal.Op)
+ */
+ public Object executeOnQueuesAndReturnPrimaryResult(Op op) {
+ if(queueManager == null) {
+ throw new SubscriptionNotEnabledException();
+ }
+ QueueConnections connections = queueManager.getAllConnections();
+
+ List backups = connections.getBackups();
+ if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+ logger.trace(LogMarker.BRIDGE_SERVER, "sending {} to backups: {}", op, backups);
+ }
+ for(int i = backups.size() - 1; i >= 0; i--) {
+ Connection conn = (Connection) backups.get(i);
+ try {
+ executeWithPossibleReAuthentication(conn, op);
+ } catch (Exception e) {
+ handleException(e, conn, 0, false);
+ }
+ }
+
+ Connection primary = connections.getPrimary();
+ // primary servers that already failed; a repeat marks the final attempt
+ HashSet attemptedPrimaries = new HashSet();
+ while(true) {
+ try {
+ if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+ logger.trace(LogMarker.BRIDGE_SERVER, "sending {} to primary: {}", op, primary);
+ }
+ return executeWithPossibleReAuthentication(primary, op);
+ } catch (Exception e) {
+ if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+ logger.trace(LogMarker.BRIDGE_SERVER, "caught exception sending to primary {}", e.getMessage(), e);
+ }
+ boolean finalAttempt = !attemptedPrimaries.add(primary.getServer());
+ handleException(e, primary, 0, finalAttempt);
+ primary = queueManager.getAllConnections().getPrimary();
+ //we shouldn't reach this code, but just in case
+ if(finalAttempt) {
+ throw new ServerConnectivityException("Tried the same primary server twice.", e);
+ }
+ }
+ }
+ }
+
+ /**
+  * Clears both thread-local connection holders for the calling thread and
+  * hands every connection they held back to the connection manager.
+  */
+ public void releaseThreadLocalConnection() {
+   final Connection single = localConnection.get();
+   localConnection.set(null);
+   if (single != null) {
+     connectionManager.returnConnection(single);
+   }
+
+   final Map<ServerLocation, Connection> perServer = localConnectionMap.get();
+   localConnectionMap.set(null);
+   if (perServer == null) {
+     return;
+   }
+   for (Connection pooled : perServer.values()) {
+     connectionManager.returnConnection(pooled);
+   }
+ }
+
+ /**
+ * Used by GatewayBatchOp
+ */
+ public Object executeOn(Connection conn, Op op, boolean timeoutFatal) {
+ try {
+ return executeWithPossibleReAuthentication(conn, op);
+ } catch (Exception e) {
+ //This method will throw an exception if we need to stop
+ //It also unsets the threadlocal connection and notifies
+ //the connection manager if there are failures.
+ handleException(op, e, conn, 0, true, timeoutFatal);
+ //this shouldn't actually be reached, handle exception will throw something
+ throw new ServerConnectivityException("Received error connecting to server", e);
+ }
+ }
+ /**
+  * This is used by unit tests. Same as the three-argument overload with
+  * timeoutFatal=false.
+  */
+ public Object executeOn(Connection conn, Op op) {
+   final boolean timeoutFatal = false;
+   return executeOn(conn, op, timeoutFatal);
+ }
+
+ /**
+  * @return the RegisterInterestTracker supplied to the constructor
+  */
+ public RegisterInterestTracker getRITracker() {
+   return this.riTracker;
+ }
+
+ protected void handleException(Throwable e,
+ Connection conn,
+ int retryCount, boolean finalAttempt) {
+ handleException(e, conn, retryCount, finalAttempt, false/*timeoutFatal*/);
+ }
+
+ protected void handleException(Op op,
+ Throwable e,
+ Connection conn,
+ int retryCount,
+ boolean finalAttempt,
+ boolean timeoutFatal)
+ throws CacheRuntimeException {
+ if (op instanceof AuthenticateUserOp.AuthenticateUserOpImpl) {
+ if (e instanceof GemFireSecurityException) {
+ throw (GemFireSecurityException)e;
+ } else if (e instanceof ServerRefusedConnectionException) {
+ throw (ServerRefusedConnectionException)e;
+ }
+ }
+ handleException(e, conn, retryCount, finalAttempt, timeoutFatal);
+ }
+
+ /**
+ * Classifies a failure that occurred while using the given connection and
+ * decides what to do about it.
+ * Exceptions that are mapped to an exception to throw (title left null) are
+ * thrown, wrapped or unchanged, without touching the connection. Exceptions
+ * that are given a title cause the connection to be destroyed, the endpoint to
+ * be reported as crashed when invalidateServer is still true, and a message to
+ * be logged; a ServerConnectivityException is then thrown when this is the
+ * final attempt or the exception was not recognised at all. Otherwise the
+ * method returns normally so that the caller can retry.
+ * @param e the failure
+ * @param conn the connection the failure occurred on
+ * @param retryCount zero-based attempt number, used in the message
+ * @param finalAttempt true if the caller will not retry
+ * @param timeoutFatal whether a SocketTimeoutException reports the endpoint as crashed
+ */
+ protected void handleException(Throwable e,
+ Connection conn,
+ int retryCount,
+ boolean finalAttempt,
+ boolean timeoutFatal)
+ throws CacheRuntimeException
+ {
+ GemFireException exToThrow = null;
+ String title;
+ boolean invalidateServer = true;
+ boolean warn = true;
+ boolean forceThrow = false;
+ Throwable cause = e;
+
+ cancelCriterion.checkCancelInProgress(e);
+
+ // EOFException is deliberately not logged at debug level here
+ if(logger.isDebugEnabled() && !(e instanceof java.io.EOFException)) {
+ if (e instanceof java.net.SocketTimeoutException) {
+ logger.debug("OpExecutor.handleException on Connection to {} read timed out", conn.getServer());
+ } else {
+ logger.debug("OpExecutor.handleException on Connection to {}", conn.getServer(),e);
+ }
+ }
- if (e instanceof NotSerializableException) {
++
++ // first take care of all exceptions that should not invalidate the
++ // connection and do not need to be logged
++
++ if (e instanceof MessageTooLargeException) {
++ title = null;
++ exToThrow = new GemFireIOException("message is too large to transmit", e);
++ }
++ else if (e instanceof NotSerializableException) {
+ title = null; //no message
+ exToThrow = new SerializationException("Pool message failure", e);
+ }
+ else if (e instanceof BatchException || e instanceof BatchException70) {
+ title = null; //no message
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof RegionDestroyedException) {
+ invalidateServer = false;
+ title = null;
+ exToThrow =(RegionDestroyedException) e;
+ }
+ else if (e instanceof GemFireSecurityException) {
+ title = null;
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof SerializationException) {
+ title = null; // no message
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof CopyException) {
+ title = null; // no message
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof ClassNotFoundException) {
+ title = null; // no message
+ exToThrow = new ServerOperationException(e);
+ }
+ else if (e instanceof TransactionException) {
+ title = null; // no message
+ exToThrow = (TransactionException)e;
+ invalidateServer = false;
+ }
+ else if (e instanceof SynchronizationCommitConflictException) {
+ title = null;
+ exToThrow = (SynchronizationCommitConflictException)e;
+ invalidateServer = false;
+ }
+ else if (e instanceof SocketException) {
+ if ("Socket closed".equals(e.getMessage())
+ || "Connection reset".equals(e.getMessage())
+ || "Connection refused: connect".equals(e.getMessage())
+ || "Connection refused".equals(e.getMessage())) {
+ title = e.getMessage();
+ } else {
+ title = "SocketException";
+ }
+ }
+ else if (e instanceof SocketTimeoutException) {
+ invalidateServer = timeoutFatal;
+ title = "socket timed out on client";
+ cause = null;
+ }
+ else if (e instanceof ConnectionDestroyedException) {
+ invalidateServer = false;
+ title = "connection was asynchronously destroyed";
+ cause = null;
+ }
+ else if (e instanceof java.io.EOFException) {
+ /*
+ // it is still listening so make this into a timeout exception
+ invalidateServer = false;
+ title = "socket closed on server";
+ SocketTimeoutException ste = new SocketTimeoutException(title);
+ ste.setStackTrace(e.getStackTrace());
+ e = ste;
+ cause = null;
+ */
+
+ /*
+ * note: the old code in ConnectionProxyImpl used to create a new socket here to the server to determine if it really crashed.
+ * We may have to add this back in for some reason, but hopefully not.
+ *
+ * note 05/21/08: an attempt to address this was made by increasing the time waited on server before closing timeoutd clients
+ * see ServerConnection.hasBeenTimedOutOnClient
+ */
+ title = "closed socket on server";
+ }
+ else if (e instanceof IOException) {
+ title = "IOException";
+ }
+ else if (e instanceof BufferUnderflowException) {
+ title = "buffer underflow reading from server";
+ }
+ else if (e instanceof CancelException) {
+ title = "Cancelled";
+ warn = false;
+ }
+ else if (e instanceof InternalFunctionInvocationTargetException) {
+ //In this case, function will be re executed
+ title = null;
+ exToThrow = (InternalFunctionInvocationTargetException)e;
+ }
+ else if (e instanceof FunctionInvocationTargetException) {
+ //in this case function will not be re executed
+ title = null;
+ exToThrow = (GemFireException)e;
+ }
+ else if (e instanceof PutAllPartialResultException) {
+ title = null;
+ exToThrow =(PutAllPartialResultException) e;
+ invalidateServer = false;
+ }
+ else {
+ // not recognised directly: if the cause is a recognised type, handle the cause instead
+ Throwable t = e.getCause();
+ if ((t instanceof ConnectException)
+ || (t instanceof SocketException)
+ || (t instanceof SocketTimeoutException)
+ || (t instanceof IOException)
+ || (t instanceof SerializationException)
+ || (t instanceof CopyException)
+ || (t instanceof GemFireSecurityException)
+ || (t instanceof ServerOperationException)
+ || (t instanceof TransactionException)
+ || (t instanceof CancelException)) {
+ handleException(t, conn, retryCount, finalAttempt, timeoutFatal);
+ return;
+ } else if (e instanceof ServerOperationException) {
+ title = null; // no message
+ exToThrow = (ServerOperationException)e;
+ invalidateServer = false; // fix for bug #42225
+ }
+ else if (e instanceof FunctionException) {
+ if (t instanceof InternalFunctionInvocationTargetException) {
+ // Client server to re-execute for node failure
+ handleException(t, conn, retryCount, finalAttempt, timeoutFatal);
+ return;
+ }
+ else {
+ title = null; // no message
+ exToThrow = (FunctionException)e;
+ }
+ } else if (e instanceof ServerConnectivityException
+ && "Connection error while authenticating user"
+ .equals(e.getMessage())) { // constant first: getMessage() may be null
+ title = null;
+ if (logger.isDebugEnabled()) {
+ logger.debug(e.getMessage(), e);
+ }
+ } else {
+ title = e.toString();
+ forceThrow = true;
+ }
+ }
+ if (title != null) {
+ conn.destroy();
+ if(invalidateServer) {
+ endpointManager.serverCrashed(conn.getEndpoint());
+ }
+ boolean logEnabled = warn ? logger.isWarnEnabled() : logger.isDebugEnabled();
+ // forceThrow is included so that throwing does not depend on the log level
+ boolean msgNeeded = logEnabled || finalAttempt || forceThrow;
+ if (msgNeeded) {
+ final StringBuffer sb = getExceptionMessage(title, retryCount, finalAttempt, conn, e);
+ final String msg = sb.toString();
+ if (logEnabled) {
+ if (warn) {
+ logger.warn(msg /*, e*/);
+ } else {
+ logger.debug(msg /*, e*/);
+ }
+ }
+ if (forceThrow || finalAttempt) {
+ exToThrow = new ServerConnectivityException(msg, cause);
+ }
+ }
+ }
+ if (exToThrow != null) {
+ throw exToThrow;
+ }
+ }
+
+ /**
+  * Builds the text used for logging and for the ServerConnectivityException
+  * raised by handleException: "Pool unexpected &lt;exceptionName&gt;", followed
+  * by the connection (when non-null), the one-based attempt number (when
+  * retryCount &gt; 0) and, on the final attempt, a "Server unreachable" suffix.
+  *
+  * @param exceptionName short description of the failure
+  * @param retryCount zero-based attempt number
+  * @param finalAttempt whether to append the "Server unreachable" suffix
+  * @param connection the failed connection, may be null
+  * @param ex the failure itself; currently not used in the message
+  * @return the assembled message
+  */
+ private StringBuffer getExceptionMessage(String exceptionName,
+     int retryCount,
+     boolean finalAttempt,
+     Connection connection,
+     Throwable ex) {
+   StringBuffer message = new StringBuffer(200);
+   message
+       .append("Pool unexpected ")
+       .append(exceptionName);
+   if (connection != null) {
+     message
+         .append(" connection=")
+         .append(connection);
+   }
+   if (retryCount > 0) {
+     message
+         .append(" attempt=")
+         .append(retryCount + 1);
+   }
+   // the stray ')' previously appended here had no matching '(' and is dropped
+   if (finalAttempt) {
+     message
+         .append(". Server unreachable: could not connect after ")
+         .append(retryCount + 1)
+         .append(" attempts");
+   }
+   return message;
+ }
+
+ /**
+  * @return the connection held in the calling thread's {@code localConnection}
+  *         slot, or {@code null} when the slot is empty
+  */
+ public Connection getThreadLocalConnection() {
+   final Connection current = this.localConnection.get();
+   return current;
+ }
+
+ /**
+  * Stores {@code conn} in the calling thread's {@code localConnection} slot,
+  * replacing whatever was there.
+  */
+ public void setThreadLocalConnection(Connection conn) {
+   this.localConnection.set(conn);
+ }
+
+ private void authenticateIfRequired(Connection conn, Op op) {
+ if (!conn.getServer().getRequiresCredentials()) {
+ return;
+ }
+
+ if (this.pool == null) {
+ PoolImpl poolImpl = (PoolImpl)PoolManagerImpl.getPMI().find(
+ this.endpointManager.getPoolName());
+ if (poolImpl == null) {
+ return;
+ }
+ this.pool = poolImpl;
+ }
+ if (this.pool.getMultiuserAuthentication()) {
+ if (((AbstractOp)op).needsUserId()) {
+ UserAttributes ua = UserAttributes.userAttributes.get();
+ if (ua != null) {
+ if (!ua.getServerToId().containsKey(conn.getServer())) {
+ authenticateMultiuser(this.pool, conn, ua);
+ }
+ } else {
+ // This should never be reached.
+ }
+ }
+ } else if (((AbstractOp)op).needsUserId()) {
+ // This should not be reached, but keeping this code here in case it is
+ // reached.
+ if (conn.getServer().getUserId() == -1) {
+ Connection connImpl = this.connectionManager.getConnection(conn);
+ conn.getServer().setUserId(
+ (Long)AuthenticateUserOp.executeOn(connImpl, this.pool));
+ if (logger.isDebugEnabled()) {
+ logger.debug("OpExecutorImpl.execute() - single user mode - authenticated this user on {}", conn);
+ }
+ }
+ }
+ }
+
+ private void authenticateMultiuser(PoolImpl pool, Connection conn,
+ UserAttributes ua) {
+ try {
+ Long userId = (Long)AuthenticateUserOp.executeOn(conn.getServer(),
+ pool, ua.getCredentials());
+ if (userId != null) {
+ ua.setServerToId(conn.getServer(), userId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("OpExecutorImpl.execute() - multiuser mode - authenticated this user on {}", conn);
+ }
+ }
+ } catch (ServerConnectivityException sce) {
+ Throwable cause = sce.getCause();
+ if (cause instanceof SocketException
+ || cause instanceof EOFException
+ || cause instanceof IOException
+ || cause instanceof BufferUnderflowException
+ || cause instanceof CancelException
+ || (sce.getMessage() != null && (sce.getMessage().indexOf(
+ "Could not create a new connection to server") != -1
+ || sce.getMessage().indexOf("socket timed out on client") != -1 || sce
+ .getMessage().indexOf(
+ "connection was asynchronously destroyed") != -1))) {
+ throw new ServerConnectivityException(
+ "Connection error while authenticating user");
+ } else {
+ throw sce;
+ }
+ }
+ }
+
+ /**
+  * Executes the op on the connection. If the server reports that the user is
+  * not authenticated, authenticates again (single-user or multiuser,
+  * depending on the pool) and executes the op a second time.
+  *
+  * @param conn the connection to execute on
+  * @param op the op to execute
+  * @return the result of the op
+  * @throws Exception whatever the op or the re-authentication throws; a
+  *         ServerConnectivityException that does not call for
+  *         re-authentication is rethrown unchanged
+  */
+ private Object executeWithPossibleReAuthentication(Connection conn, Op op)
+     throws Exception {
+   try {
+     return conn.execute(op);
+   } catch (ServerConnectivityException sce) {
+     if (!needsReAuthentication(sce)) {
+       throw sce;
+     }
+     // (ashetkar) Need a cleaner way of doing above check.
+     // 2nd exception-message checked is from AbstractOp.sendMessage()
+
+     PoolImpl authPool = (PoolImpl) PoolManagerImpl.getPMI().find(
+         this.endpointManager.getPoolName());
+     if (authPool == null) {
+       // no pool to authenticate against: report the original failure
+       // rather than failing with a NullPointerException
+       throw sce;
+     }
+     if (!authPool.getMultiuserAuthentication()) {
+       Connection connImpl = this.connectionManager.getConnection(conn);
+       conn.getServer().setUserId(
+           (Long) AuthenticateUserOp.executeOn(connImpl, this));
+     } else {
+       UserAttributes ua = UserAttributes.userAttributes.get();
+       if (ua != null) {
+         authenticateMultiuser(authPool, conn, ua);
+       }
+     }
+     return conn.execute(op);
+   }
+ }
+
+ /**
+  * Decides whether a failed op should be retried after authenticating again.
+  * Null-safe: an exception with neither cause nor message yields false.
+  */
+ private static boolean needsReAuthentication(ServerConnectivityException sce) {
+   Throwable cause = sce.getCause();
+   if (cause instanceof AuthenticationRequiredException
+       && "User authorization attributes not found.".equals(cause.getMessage())) {
+     return true;
+   }
+   String message = sce.getMessage();
+   return message != null
+       && message.contains("Connection error while authenticating user");
+ }
+
+ }
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/cache/operations/internal/GetOperationContextImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/cache/operations/internal/GetOperationContextImpl.java
index 0000000,d973cd7..11d9248
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/operations/internal/GetOperationContextImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/operations/internal/GetOperationContextImpl.java
@@@ -1,0 -1,124 +1,124 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package com.gemstone.gemfire.cache.operations.internal;
+
+ import com.gemstone.gemfire.SerializationException;
+ import com.gemstone.gemfire.cache.operations.GetOperationContext;
-import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.Releasable;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+
+ /**
+ * This subclass's job is to keep customers from getting a reference to a value
+ * that is off-heap. Any access to an off-heap value should appear to the customer
+ * as a serialized value.
+ *
+ * @author dschneider
+ *
+ */
+ public class GetOperationContextImpl extends GetOperationContext implements Releasable {
+ // Set to true by release() when the current value is an ObjectChunk; cleared again by setObject() and setValue().
+ private boolean released;
+
+ public GetOperationContextImpl(Object key, boolean postOperation) {
+ super(key, postOperation);
+ }
+
+ /**
+ * For internal use only; should not be on the public APIs. Returns super.getValue() as-is, with no released check or conversion.
+ */
+ public @Unretained Object getRawValue() {
+ return super.getValue();
+ }
+
+ @Override
+ public Object getObject() {
+ Object result = super.getObject();
+ if (result instanceof StoredObject) {
+ // Off-heap objects act as if they are serialized: return null, forcing callers to use getSerializedValue or getValue instead
+ result = null;
+ }
+ return result;
+ }
+
+ @Override
+ public void setObject(Object value, boolean isObject) {
+ this.released = false; // a newly set value has not been released yet
+ super.setObject(value, isObject);
+ }
+
+ @Override
+ public void setValue(Object value, boolean isObject) {
+ this.released = false; // a newly set value has not been released yet
+ super.setValue(value, isObject);
+ }
+
+ private void checkForReleasedOffHeapValue(Object v) {
+ // Note that we only care about ObjectChunk, formerly named Chunk (instead of all StoredObject) because it is the only one using a refcount
- if (this.released && v instanceof Chunk) {
++ if (this.released && v instanceof ObjectChunk) {
+ throw new IllegalStateException("Attempt to access off-heap value after the OperationContext callback returned.");
+ }
+ }
+
+ @Override
+ public byte[] getSerializedValue() {
+ byte[] result = super.getSerializedValue();
+ if (result == null) { // the superclass had no serialized bytes; fall back to the raw value
+ Object v = super.getValue();
+ if (v instanceof StoredObject) {
+ checkForReleasedOffHeapValue(v); // throws IllegalStateException if release() already marked an ObjectChunk value
+ result = ((StoredObject) v).getValueAsHeapByteArray();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Object getDeserializedValue() throws SerializationException {
+ Object result = super.getDeserializedValue();
+ if (result instanceof StoredObject) { // never hand the StoredObject itself to the caller
+ checkForReleasedOffHeapValue(result);
+ result = ((StoredObject) result).getValueAsDeserializedHeapObject();
+ }
+ return result;
+ }
+
+ @Override
+ public Object getValue() {
+ Object result = super.getValue();
+ if (result instanceof StoredObject) {
+ checkForReleasedOffHeapValue(result);
+ // since they called getValue they don't care if it is serialized or deserialized so return it as serialized
+ result = ((StoredObject) result).getValueAsHeapByteArray();
+ }
+ return result;
+ }
+
+ @Override
+ public void release() {
+ // Note that if the context's value is stored off-heap
+ // and release has been called then we do not release
+ // our value (since this context did not retain it)
+ // but we do make sure that any future attempt to access
+ // the off-heap value fails.
- if (super.getValue() instanceof Chunk) {
++ if (super.getValue() instanceof ObjectChunk) {
+ this.released = true; // from now on checkForReleasedOffHeapValue throws for this value
+ }
+ }
+
+ }