You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:17 UTC
[12/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ContainsKeyOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ContainsKeyOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ContainsKeyOp.java
new file mode 100644
index 0000000..b3fdbc0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ContainsKeyOp.java
@@ -0,0 +1,93 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Does a region containsKey on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class ContainsKeyOp {
+ /**
+ * Does a region entry containsKey on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the entry containsKey on
+ * @param key the entry key to do the containsKey on
+ * @return the result of invoking containsKey on the server
+ */
+ public static boolean execute(ExecutablePool pool,
+ String region,
+ Object key,
+ MODE mode)
+ {
+ AbstractOp op = new ContainsKeyOpImpl(region, key, mode);
+ Boolean result = (Boolean)pool.execute(op);
+ return result.booleanValue();
+ }
+
+ private ContainsKeyOp() {
+ // no instances allowed
+ }
+
+ private static class ContainsKeyOpImpl extends AbstractOp {
+
+ private String region;
+ private Object key;
+ private final MODE mode;
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public ContainsKeyOpImpl(String region,
+ Object key,
+ MODE mode) {
+ super(MessageType.CONTAINS_KEY, 3);
+ getMessage().addStringPart(region);
+ getMessage().addStringOrObjPart(key);
+ getMessage().addIntPart(mode.ordinal());
+ this.region = region;
+ this.key = key;
+ this.mode = mode;
+ }
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ return processObjResponse(msg, "containsKey");
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.CONTAINS_KEY_DATA_ERROR;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startContainsKey();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endContainsKeySend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endContainsKey(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ public String toString() {
+ return "ContainsKeyOp(region=" + region + ";key=" + key+";mode="+mode;
+ }
+ }
+
+ public enum MODE {
+ KEY,
+ VALUE_FOR_KEY,
+ VALUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
new file mode 100644
index 0000000..66cf256
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
@@ -0,0 +1,144 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributesHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+public class DataSerializerRecoveryListener extends EndpointManager.EndpointListenerAdapter {
+ private static final Logger logger = LogService.getLogger();
+
+ private final AtomicInteger endpointCount = new AtomicInteger();
+ protected final InternalPool pool;
+ protected final ScheduledExecutorService background;
+ protected final long pingInterval;
+ protected final Object recoveryScheduledLock = new Object();
+ protected boolean recoveryScheduled;
+
+ public DataSerializerRecoveryListener(ScheduledExecutorService background, InternalPool pool) {
+ this.pool = pool;
+ this.pingInterval = pool.getPingInterval();
+ this.background = background;
+ }
+
+ @Override
+ public void endpointCrashed(Endpoint endpoint) {
+ int count = endpointCount.decrementAndGet();
+ if(logger.isDebugEnabled()) {
+ logger.debug("DataSerializerRecoveryTask - EndpointCrashed. Now have {} endpoints", count);
+ }
+ }
+
+ @Override
+ public void endpointNoLongerInUse(Endpoint endpoint) {
+ int count = endpointCount.decrementAndGet();
+ if(logger.isDebugEnabled()) {
+ logger.debug("DataSerializerRecoveryTask - EndpointNoLongerInUse. Now have {} endpoints", count);
+ }
+ }
+
+ @Override
+ public void endpointNowInUse(Endpoint endpoint) {
+ int count = endpointCount.incrementAndGet();
+ if(logger.isDebugEnabled()) {
+ logger.debug("DataSerializerRecoveryTask - EndpointNowInUse. Now have {} endpoints", count);
+ }
+ if(count == 1) {
+ synchronized(recoveryScheduledLock) {
+ if(!recoveryScheduled) {
+ try {
+ recoveryScheduled = true;
+ background.execute(new RecoveryTask());
+ logger.debug("DataSerializerRecoveryTask - Scheduled Recovery Task");
+ } catch(RejectedExecutionException e) {
+ //ignore, the timer has been cancelled, which means we're shutting down.
+ }
+ }
+ }
+ }
+ }
+
+ protected class RecoveryTask extends PoolTask {
+
+ @Override
+ public void run2() {
+ if(pool.getCancelCriterion().cancelInProgress() != null) {
+ return;
+ }
+ synchronized(recoveryScheduledLock) {
+ recoveryScheduled = false;
+ }
+ logger.debug("DataSerializerRecoveryTask - Attempting to recover dataSerializers");
+ SerializerAttributesHolder[] holders= InternalDataSerializer.getSerializersForDistribution();
+ if(holders.length == 0) {
+ return;
+ }
+ EventID eventId = InternalDataSerializer.generateEventId();
+ //Fix for bug:40930
+ if (eventId == null) {
+ try {
+ background.schedule(new RecoveryTask(), pingInterval,
+ TimeUnit.MILLISECONDS);
+ recoveryScheduled = true;
+ } catch (RejectedExecutionException e) {
+ pool.getCancelCriterion().checkCancelInProgress(e);
+ throw e;
+ }
+ }
+ else {
+ try {
+ RegisterDataSerializersOp.execute(pool, holders, eventId);
+ }
+ catch (CancelException e) {
+ throw e;
+ }
+ catch (RejectedExecutionException e) {
+ // This is probably because we've started to shut down.
+ pool.getCancelCriterion().checkCancelInProgress(e);
+ throw e; // weird
+ }
+ catch(Exception e) {
+ pool.getCancelCriterion().checkCancelInProgress(e);
+
+ // If ClassNotFoundException occurred on server, don't retry
+ Throwable cause = e.getCause();
+ boolean cnfException = false;
+ if (cause instanceof ClassNotFoundException) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.DataSerializerRecoveryListener_ERROR_CLASSNOTFOUNDEXCEPTION,
+ cause.getMessage()));
+ cnfException = true;
+ }
+
+ if(!recoveryScheduled && !cnfException) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.DataSerializerRecoveryListener_ERROR_RECOVERING_DATASERIALIZERS),
+ e);
+ background.schedule(new RecoveryTask(), pingInterval, TimeUnit.MILLISECONDS);
+ recoveryScheduled = true;
+ }
+ } finally {
+ pool.releaseThreadLocalConnection();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
new file mode 100644
index 0000000..7dc0eaa
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
@@ -0,0 +1,282 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.client.AllConnectionsInUseException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.util.BridgeWriterException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Does a region destroy on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class DestroyOp {
+
+ private static final Logger logger = LogService.getLogger();
+
+ public static final int HAS_VERSION_TAG = 0x01;
+
+ public static final int HAS_ENTRY_NOT_FOUND_PART = 0x02;
+
+ /**
+ * Does a region entry destroy on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the region to do the entry destroy on
+ * @param key the entry key to do the destroy on
+ * @param event the event for this destroy operation
+ * @param callbackArg an optional callback arg to pass to any cache callbacks
+ */
+ public static Object execute(ExecutablePool pool, LocalRegion region,
+ Object key,
+ Object expectedOldValue, Operation operation,
+ EntryEventImpl event, Object callbackArg,
+ boolean prSingleHopEnabled) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Preparing DestroyOp for {} operation={}", key, operation);
+ }
+ AbstractOp op = new DestroyOpImpl(region, key, expectedOldValue,
+ operation, event, callbackArg, prSingleHopEnabled);
+ if (prSingleHopEnabled) {
+ ClientMetadataService cms = region.getCache()
+ .getClientMetadataService();
+ ServerLocation server = cms.getBucketServerLocation(region,
+ Operation.DESTROY, key, null, callbackArg);
+ if (server != null) {
+ try {
+ PoolImpl poolImpl = (PoolImpl)pool;
+ boolean onlyUseExistingCnx = ((poolImpl.getMaxConnections() != -1 && poolImpl
+ .getConnectionCount() >= poolImpl.getMaxConnections()) ? true
+ : false);
+ return pool.executeOn(server, op, true, onlyUseExistingCnx);
+ }
+ catch (AllConnectionsInUseException e) {
+ }
+ catch (ServerConnectivityException e) {
+ if (e instanceof ServerOperationException) {
+ throw e; // fixed 44656
+ }
+ cms.removeBucketServerLocation(server);
+ }
+ catch (BridgeWriterException e) {
+ if (e.getCause() instanceof ServerConnectivityException)
+ cms.removeBucketServerLocation(server);
+ }
+ }
+ }
+ return pool.execute(op);
+ }
+
+ /**
+ * Does a region entry destroy on a server using the given connection to
+ * communicate with the server.
+ *
+ * @param con
+ * the connection to use to send to the server
+ * @param pool
+ * the pool to use to communicate with the server.
+ * @param region
+ * the region to do the entry destroy on
+ * @param key
+ * the entry key to do the destroy on
+ * @param event
+ * the event for this destroy operation
+ * @param callbackArg
+ * an optional callback arg to pass to any cache callbacks
+ */
+ public static void execute(Connection con,
+ ExecutablePool pool,
+ String region,
+ Object key,
+ Object expectedOldValue,
+ Operation operation,
+ EntryEventImpl event,
+ Object callbackArg)
+ {
+ AbstractOp op = new DestroyOpImpl(region, key, expectedOldValue,
+ operation, event, callbackArg);
+ pool.executeOn(con, op);
+ }
+
+ /** this is set if a response is received indicating that the entry was not found on the server */
+ public static boolean TEST_HOOK_ENTRY_NOT_FOUND;
+
+ private DestroyOp() {
+ // no instances allowed
+ }
+
+ private static class DestroyOpImpl extends AbstractOp {
+
+ Object key = null;
+
+ private LocalRegion region;
+
+ private Operation operation;
+
+ private boolean prSingleHopEnabled = false;
+
+ private Object callbackArg;
+
+ private EntryEventImpl event;
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public DestroyOpImpl(LocalRegion region,
+ Object key,
+ Object expectedOldValue,
+ Operation operation,
+ EntryEventImpl event,
+ Object callbackArg,
+ boolean prSingleHopEnabled) {
+ super(MessageType.DESTROY, callbackArg != null ? 6 : 5);
+ this.key = key;
+ this.region = region ;
+ this.operation = operation;
+ this.prSingleHopEnabled = prSingleHopEnabled;
+ this.callbackArg = callbackArg ;
+ this.event = event;
+ getMessage().addStringPart(region.getFullPath());
+ getMessage().addStringOrObjPart(key);
+ getMessage().addObjPart(expectedOldValue);
+ getMessage().addObjPart(operation==Operation.DESTROY? null : operation); // server interprets null as DESTROY
+ getMessage().addBytesPart(event.getEventId().calcBytes());
+ if (callbackArg != null) {
+ getMessage().addObjPart(callbackArg);
+ }
+ }
+
+
+ public DestroyOpImpl(String region, Object key, Object expectedOldValue,
+ Operation operation, EntryEventImpl event,
+ Object callbackArg) {
+ super(MessageType.DESTROY, callbackArg != null ? 6 : 5);
+ this.key = key;
+ this.event = event;
+ getMessage().addStringPart(region);
+ getMessage().addStringOrObjPart(key);
+ getMessage().addObjPart(expectedOldValue);
+ getMessage().addObjPart(operation==Operation.DESTROY? null : operation); // server interprets null as DESTROY
+ getMessage().addBytesPart(event.getEventId().calcBytes());
+ if (callbackArg != null) {
+ getMessage().addObjPart(callbackArg);
+ }
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ protected Object processResponse(Message msg, Connection con) throws Exception {
+ processAck(msg, "destroy");
+ boolean isReply = (msg.getMessageType() == MessageType.REPLY);
+ int partIdx = 0;
+ int flags = 0;
+ if (isReply) {
+ flags = msg.getPart(partIdx++).getInt();
+ if ((flags & HAS_VERSION_TAG) != 0) {
+ VersionTag tag = (VersionTag)msg.getPart(partIdx++).getObject();
+ // we use the client's ID since we apparently don't track the server's ID in connections
+ tag.replaceNullIDs((InternalDistributedMember) con.getEndpoint().getMemberId());
+ this.event.setVersionTag(tag);
+ if (logger.isDebugEnabled()) {
+ logger.debug("received Destroy response with {}", tag);
+ }
+ } else if (logger.isDebugEnabled()) {
+ logger.debug("received Destroy response with no version tag");
+ }
+ }
+ if (prSingleHopEnabled) {
+ byte version = 0 ;
+// if (log.fineEnabled()) {
+// log.fine("reading prSingleHop part #" + (partIdx+1));
+// }
+ Part part = msg.getPart(partIdx++);
+ byte[] bytesReceived = part.getSerializedForm();
+ if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION
+ && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) {
+ if (this.region != null) {
+ ClientMetadataService cms = null;
+ try {
+ cms = region.getCache().getClientMetadataService();
+ version = cms.getMetaDataVersion(region, Operation.UPDATE,
+ key, null, callbackArg);
+ }
+ catch (CacheClosedException e) {
+ return null;
+ }
+ if (bytesReceived[0] != version) {
+ cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+ }
+ }
+ }
+ } else {
+ partIdx++; // skip OK byte
+ }
+ boolean entryNotFound = false;
+ if (msg.getMessageType() == MessageType.REPLY && (flags & HAS_ENTRY_NOT_FOUND_PART) != 0) {
+ entryNotFound = (msg.getPart(partIdx++).getInt() == 1);
+ if (logger.isDebugEnabled() && (flags & HAS_ENTRY_NOT_FOUND_PART) != 0) {
+ logger.debug("destroy response has entryNotFound={}", entryNotFound);
+ }
+ if (entryNotFound) {
+ TEST_HOOK_ENTRY_NOT_FOUND = true;
+ }
+ }
+ if (this.operation == Operation.REMOVE && entryNotFound) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("received REMOVE response from server with entryNotFound={}", entryNotFound);
+ }
+ return new EntryNotFoundException(LocalizedStrings.AbstractRegionMap_ENTRY_NOT_FOUND_WITH_EXPECTED_VALUE.toLocalizedString());
+ }
+ return null;
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.DESTROY_DATA_ERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startDestroy();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endDestroySend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endDestroy(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ public String toString() {
+ return "DestroyOp:"+key;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyRegionOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyRegionOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyRegionOp.java
new file mode 100644
index 0000000..f1e3cd9
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyRegionOp.java
@@ -0,0 +1,95 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.EventID;
+
+/**
+ * Does a region destroyRegion (or create) on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class DestroyRegionOp {
+ /**
+ * Does a region destroyRegion on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the destroyRegion on
+ * @param eventId the event id for this destroyRegion
+ * @param callbackArg an optional callback arg to pass to any cache callbacks
+ */
+ public static void execute(ExecutablePool pool,
+ String region,
+ EventID eventId,
+ Object callbackArg)
+ {
+ AbstractOp op = new DestroyRegionOpImpl(region, eventId, callbackArg);
+ pool.execute(op);
+ }
+ /**
+ * Does a region destroyRegion on a server using the given connection
+ * to communicate with the server.
+ * @param con the connection to use to send to the server
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the destroyRegion on
+ * @param eventId the event id for this destroyRegion
+ * @param callbackArg an optional callback arg to pass to any cache callbacks
+ */
+ public static void execute(Connection con,
+ ExecutablePool pool,
+ String region,
+ EventID eventId,
+ Object callbackArg)
+ {
+ AbstractOp op = new DestroyRegionOpImpl(region, eventId, callbackArg);
+ pool.executeOn(con, op);
+ }
+
+ private DestroyRegionOp() {
+ // no instances allowed
+ }
+
+ private static class DestroyRegionOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public DestroyRegionOpImpl(String region,
+ EventID eventId,
+ Object callbackArg) {
+ super(MessageType.DESTROY_REGION, callbackArg != null ? 3 : 2);
+ getMessage().addStringPart(region);
+ getMessage().addBytesPart(eventId.calcBytes());
+ if (callbackArg != null) {
+ getMessage().addObjPart(callbackArg);
+ }
+ }
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "destroyRegion");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.DESTROY_REGION_DATA_ERROR;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startDestroyRegion();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endDestroyRegionSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endDestroyRegion(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Endpoint.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Endpoint.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Endpoint.java
new file mode 100644
index 0000000..25518a3
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Endpoint.java
@@ -0,0 +1,102 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * Represents a server. Keeps track of information about the specific server
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public class Endpoint {
+
+ private AtomicLong lastExecute = new AtomicLong();
+ private AtomicInteger references = new AtomicInteger();
+ private final ServerLocation location;
+ private final ConnectionStats stats;
+ private final EndpointManagerImpl manager;
+ private final DistributedMember memberId;
+ private volatile boolean closed;
+
+ Endpoint(EndpointManagerImpl endpointManager, DistributedSystem ds,
+ ServerLocation location, ConnectionStats stats,
+ DistributedMember memberId) {
+ this.manager =endpointManager;
+ this.location = location;
+ this.stats = stats;
+ this.memberId = memberId;
+ updateLastExecute();
+ }
+
+ public void updateLastExecute() {
+ this.lastExecute.set(System.nanoTime());
+ }
+
+ private long getLastExecute() {
+ return lastExecute.get();
+ }
+
+ public boolean timeToPing(long pingIntervalNanos) {
+ long now = System.nanoTime();
+ return getLastExecute() <= (now - pingIntervalNanos);
+ }
+
+ public void close() {
+ if(!closed) {
+ closed = true;
+ }
+ }
+
+ public boolean isClosed() {
+ return closed == true;
+ }
+
+ public ConnectionStats getStats() {
+ return stats;
+ }
+
+ void addReference() {
+ references.incrementAndGet();
+ }
+
+ /**
+ * @return true if this was the last reference to the endpoint
+ */
+ public boolean removeReference() {
+ boolean lastReference = (references.decrementAndGet() <= 0);
+ if(lastReference) {
+ manager.endpointNotInUse(this);
+ }
+ return lastReference;
+ }
+
+ /**
+ * @return the location of this server
+ */
+ public ServerLocation getLocation() {
+ return location;
+ }
+
+ @Override
+ public String toString() {
+ return location.toString();
+ }
+
+ public DistributedMember getMemberId() {
+ return memberId;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManager.java
new file mode 100644
index 0000000..5d4a1f2
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManager.java
@@ -0,0 +1,91 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * The endpoint manager keeps track of which servers we
+ * are connected to. Other parts of the client code can register
+ * listeners that will be notified about when endpoints are created
+ * or died. For example the connection manager registers a listener
+ * to be notified if a server dies and closes all of it's connections.
+ * @author dsmith
+ *
+ */
+public interface EndpointManager {
+ /**
+ * Get the endpoint for this server and create it if it does not already exist.
+ * This increments the reference count for the endpoint,
+ * so you should call {@link Endpoint#removeReference()} after
+ * the endpoint is no longer in use.
+ */
+ Endpoint referenceEndpoint(ServerLocation server, DistributedMember memberId);
+
+ /**
+ * Indicate that a particular server has crashed. All of the listeners will be notified
+ * that the server has crashed.
+ * @param endpoint
+ */
+ void serverCrashed(Endpoint endpoint);
+
+ /**
+ * Get the map of all endpoints currently in use.
+ * @return a map for ServerLocation->Endpoint
+ */
+ Map<ServerLocation, Endpoint> getEndpointMap();
+
+ void close();
+
+ /** Add a listener which will be notified when the state of
+ * an endpoint changes.
+ */
+ void addListener(EndpointManager.EndpointListener listener);
+
+ /**
+ * Remove a listener.
+ */
+ void removeListener(EndpointManager.EndpointListener listener);
+
+ /**
+ * Get the stats for all of the servers we ever connected too.
+ * @return a map of ServerLocation-> ConnectionStats
+ */
+ public Map getAllStats();
+
+ /**
+ * Test hook that returns the number of servers we currently have connections to.
+ */
+ public int getConnectedServerCount();
+
+ public static interface EndpointListener {
+
+ void endpointNoLongerInUse(Endpoint endpoint);
+
+ void endpointCrashed(Endpoint endpoint);
+
+ void endpointNowInUse(Endpoint endpoint);
+ }
+
+ public static class EndpointListenerAdapter implements EndpointListener {
+
+ public void endpointCrashed(Endpoint endpoint) {
+ }
+
+ public void endpointNoLongerInUse(Endpoint endpoint) {
+ }
+
+ public void endpointNowInUse(Endpoint endpoint) {
+ }
+ }
+
+ public String getPoolName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManagerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManagerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManagerImpl.java
new file mode 100644
index 0000000..5bf3a48
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManagerImpl.java
@@ -0,0 +1,301 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.DummyStatisticsFactory;
+import com.gemstone.gemfire.internal.cache.PoolStats;
+import com.gemstone.gemfire.internal.cache.execute.TransactionFunctionService;
+import com.gemstone.gemfire.internal.cache.tier.InternalBridgeMembership;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * @author dsmith
+ *
+ */
+public class EndpointManagerImpl implements EndpointManager {
+ private static final Logger logger = LogService.getLogger();
+
+ private volatile Map<ServerLocation, Endpoint> endpointMap = Collections.emptyMap();
+ private final Map/*<ServerLocation, ConnectionStats>*/<ServerLocation, ConnectionStats> statMap = new HashMap<ServerLocation, ConnectionStats>();
+ private final DistributedSystem ds;
+ private final String poolName;
+ private final EndpointListenerBroadcaster listener = new EndpointListenerBroadcaster();
+ protected final CancelCriterion cancelCriterion;
+ private final PoolStats poolStats;
+
+ public EndpointManagerImpl(String poolName, DistributedSystem ds,CancelCriterion cancelCriterion, PoolStats poolStats) {
+ this.ds = ds;
+ this.poolName = poolName;
+ this.cancelCriterion = cancelCriterion;
+ this.poolStats = poolStats;
+ listener.addListener(new EndpointListenerForBridgeMembership());
+ listener.addListener(new TransactionFunctionService.ListenerForTransactionFunctionService());
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#referenceEndpoint(com.gemstone.gemfire.distributed.internal.ServerLocation)
+ */
+ public Endpoint referenceEndpoint(ServerLocation server, DistributedMember memberId) {
+ //logger.warn("REFENDPOINT server:"+server+" memberId:"+memberId);
+ Endpoint endpoint = endpointMap.get(server);
+ boolean addedEndpoint = false;
+ if(endpoint == null || endpoint.isClosed()) {
+ synchronized(this) {
+ endpoint = endpointMap.get(server);
+ if(endpoint == null || endpoint.isClosed()) {
+ ConnectionStats stats = getStats(server);
+ Map<ServerLocation, Endpoint> endpointMapTemp = new HashMap<ServerLocation, Endpoint>(endpointMap);
+ endpoint = new Endpoint(this, ds, server, stats, memberId);
+ endpointMapTemp.put(server, endpoint);
+ endpointMap = Collections.unmodifiableMap(endpointMapTemp);
+ addedEndpoint = true;
+ poolStats.setServerCount(endpointMap.size());
+ }
+ }
+ }
+
+ endpoint.addReference();
+
+ if(addedEndpoint) {
+ //logger.warn("EMANFIRE2:JOIN:"+endpoint.getLocation()+" mid:"+endpoint.getMemberId());
+ listener.endpointNowInUse(endpoint);
+ } else {
+ //logger.warn("EMANFIRE33:NOJOIN:"+endpoint.getLocation()+" mid:"+endpoint.getMemberId());
+ }
+
+ return endpoint;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#serverCrashed(com.gemstone.gemfire.cache.client.internal.Endpoint)
+ */
+ public void serverCrashed(Endpoint endpoint) {
+ removeEndpoint(endpoint, true);
+ }
+
+ void endpointNotInUse(Endpoint endpoint) {
+ removeEndpoint(endpoint, false);
+ }
+
+ /** Used by Endpoint only, when the reference count for this endpoint reaches 0 */
+ private void removeEndpoint(Endpoint endpoint, boolean crashed) {
+ endpoint.close();
+ boolean removedEndpoint = false;
+ synchronized(this) {
+ Map<ServerLocation, Endpoint> endpointMapTemp = new HashMap<ServerLocation, Endpoint>(endpointMap);
+ endpoint = endpointMapTemp.remove(endpoint.getLocation());
+ if(endpoint != null) {
+ endpointMap = Collections.unmodifiableMap(endpointMapTemp);
+ removedEndpoint = true;
+ }
+ poolStats.setServerCount(endpointMap.size());
+ }
+ if(removedEndpoint) {
+ PoolImpl pool = (PoolImpl)PoolManager.find(this.poolName);
+ if (pool != null && pool.getMultiuserAuthentication()) {
+ int size = 0;
+ ArrayList<ProxyCache> proxyCaches = pool.getProxyCacheList();
+ synchronized (proxyCaches) {
+ for (ProxyCache proxyCache : proxyCaches) {
+ try {
+ Long userId = proxyCache.getUserAttributes().getServerToId().remove(
+ endpoint.getLocation());
+ if (userId != null) {
+ ++size;
+ }
+ } catch (CacheClosedException cce) {
+ // If this call is triggered by a Cache.close(), then this can be
+ // expected.
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("EndpointManagerImpl.removeEndpoint() Removed server {} from {} user's ProxyCache", endpoint.getLocation(), size);
+ }
+ }
+ UserAttributes ua = UserAttributes.userAttributes.get();
+ if (ua != null) {
+ Long userId = ua.getServerToId().remove(endpoint.getLocation());
+ if (userId != null && logger.isDebugEnabled()) {
+ logger.debug("EndpointManagerImpl.removeEndpoint() Removed server {} from thread local variable", endpoint.getLocation());
+ }
+ }
+ } else if (pool != null && !pool.getMultiuserAuthentication()) {
+ endpoint.getLocation().setUserId(-1);
+ }
+ if(crashed) {
+ listener.endpointCrashed(endpoint);
+ }
+ else {
+ listener.endpointNoLongerInUse(endpoint);
+ }
+ }
+ }
+
+
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#getEndpointMap()
+ */
+ public Map<ServerLocation, Endpoint> getEndpointMap() {
+ return endpointMap;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#close()
+ */
+ public synchronized void close() {
+ for(Iterator<ConnectionStats> itr = statMap.values().iterator(); itr.hasNext(); ) {
+ ConnectionStats stats = itr.next();
+ stats.close();
+ }
+
+ statMap.clear();
+ endpointMap = Collections.emptyMap();
+ listener.clear();
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#addListener(com.gemstone.gemfire.cache.client.internal.EndpointManagerImpl.EndpointListener)
+ */
+ public void addListener(EndpointManager.EndpointListener listener) {
+ this.listener.addListener(listener);
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#removeListener(com.gemstone.gemfire.cache.client.internal.EndpointManagerImpl.EndpointListener)
+ */
+ public void removeListener(EndpointManager.EndpointListener listener) {
+ this.listener.removeListener(listener);
+ }
+
+ private synchronized ConnectionStats getStats(ServerLocation location) {
+ ConnectionStats stats = statMap.get(location);
+ if(stats == null) {
+ String statName = poolName + "-" + location.toString();
+ PoolImpl pool = (PoolImpl)PoolManager.find(this.poolName);
+ if (pool != null) {
+ if (pool.getGatewaySender() != null) {
+ stats = new ConnectionStats(new DummyStatisticsFactory(), statName,
+ this.poolStats/*, this.gatewayStats*/);
+ }
+ }
+ if (stats == null) {
+ stats = new ConnectionStats(ds, statName, this.poolStats/*,
+ this.gatewayStats*/);
+ }
+ statMap.put(location, stats);
+ }
+
+ return stats;
+ }
+
+ public synchronized Map<ServerLocation, ConnectionStats> getAllStats() {
+ return new HashMap<ServerLocation, ConnectionStats>(statMap);
+ }
+
+ public int getConnectedServerCount() {
+ return getEndpointMap().size();
+ }
+
+ public static void loadEmergencyClasses() {
+ //do nothing
+ }
+
+ protected static class EndpointListenerBroadcaster implements EndpointManager.EndpointListener {
+
+ private volatile Set/*<EndpointListener>*/<EndpointListener> endpointListeners = Collections.emptySet();
+
+ public synchronized void addListener(EndpointManager.EndpointListener listener) {
+ HashSet<EndpointListener> tmpListeners = new HashSet<EndpointListener>(endpointListeners);
+ tmpListeners.add(listener);
+ endpointListeners = Collections.unmodifiableSet(tmpListeners);
+ }
+
+ public synchronized void clear() {
+ endpointListeners = Collections.emptySet();
+ }
+
+ public void removeListener(EndpointManager.EndpointListener listener) {
+ HashSet<EndpointListener> tmpListeners = new HashSet<EndpointListener>(endpointListeners);
+ tmpListeners.remove(listener);
+ endpointListeners = Collections.unmodifiableSet(tmpListeners);
+ }
+
+ public void endpointCrashed(Endpoint endpoint) {
+ for(Iterator<EndpointListener> itr = endpointListeners.iterator(); itr.hasNext(); ) {
+ EndpointManager.EndpointListener listener = itr.next();
+ listener.endpointCrashed(endpoint);
+ }
+ }
+
+ public void endpointNoLongerInUse(Endpoint endpoint) {
+ for(Iterator<EndpointListener> itr = endpointListeners.iterator(); itr.hasNext(); ) {
+ EndpointManager.EndpointListener listener = itr.next();
+ listener.endpointNoLongerInUse(endpoint);
+ }
+ }
+
+ public void endpointNowInUse(Endpoint endpoint) {
+ //logger.warn("HIGHUP:JOIN:"+endpoint.getLocation());
+ for(Iterator<EndpointListener> itr = endpointListeners.iterator(); itr.hasNext(); ) {
+ EndpointManager.EndpointListener listener = itr.next();
+ listener.endpointNowInUse(endpoint);
+ }
+ }
+ }
+
+
+
+ public class EndpointListenerForBridgeMembership implements EndpointManager.EndpointListener {
+
+ public void endpointCrashed(Endpoint endpoint) {
+ if(endpoint.getMemberId()==null || cancelCriterion.cancelInProgress()!=null) {
+ return;
+ }
+ //logger.warn("EMANFIRE:CRASH:"+endpoint.getLocation());
+ InternalBridgeMembership.notifyCrashed(endpoint.getMemberId(), false);
+ }
+
+ public void endpointNoLongerInUse(Endpoint endpoint) {
+ if(endpoint.getMemberId()==null || cancelCriterion.cancelInProgress()!=null) {
+ return;
+ }
+ //logger.warn("EMANFIRE:LEFT:"+endpoint.getLocation());
+ InternalBridgeMembership.notifyLeft(endpoint.getMemberId(), false);
+ }
+
+ public void endpointNowInUse(Endpoint endpoint) {
+ if(cancelCriterion.cancelInProgress()!=null) {
+ return;
+ }
+ //logger.warn("EMANFIRE:JOIN:"+endpoint.getLocation()+" mid:"+endpoint.getMemberId(),new Exception());
+ InternalBridgeMembership.notifyJoined(endpoint.getMemberId(), false);
+ }
+ }
+
+ public String getPoolName() {
+ return poolName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecutablePool.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecutablePool.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecutablePool.java
new file mode 100644
index 0000000..1477a7e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecutablePool.java
@@ -0,0 +1,141 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+
+import com.gemstone.gemfire.cache.NoSubscriptionServersAvailableException;
+import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * Provides methods to execute AbstractOp instances on a client pool.
+ * @author darrel
+ * @since 5.7
+ */
+public interface ExecutablePool {
+ /**
+ * Execute the given op on the servers that this pool connects to.
+ * This method is responsible for retrying the op if an attempt fails.
+ * It will only execute it once and on one server.
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ * @since 5.7
+ */
+ public Object execute(Op op);
+
+ /**
+ * Execute the given op on the servers that this pool connects to.
+ * This method is responsible for retrying the op if an attempt fails.
+ * It will only execute it once and on one server.
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ * @since 5.7
+ */
+ public Object execute(Op op, int retryAttempts);
+
+ /**
+ * Execute the given op on all the servers that have server-to-client queues
+ * for this pool The last exception from any server will be thrown if the op fails.
+ * The op is executed with the primary first, followed by the backups.
+ *
+ * @param op
+ * the operation to execute.
+ * @throws NoSubscriptionServersAvailableException if we have no queue server
+ * @throws SubscriptionNotEnabledException If the pool does not have queues enabled
+ */
+ public void executeOnAllQueueServers(Op op) throws NoSubscriptionServersAvailableException, SubscriptionNotEnabledException;
+ /**
+ * Execute the given op on all the servers that have server-to-client queues
+ * for this pool. The op will be executed on all backups, and then the primary.
+ * This method will block until a primary is available.
+ *
+ * @param op
+ * the operation to execute
+ * @return The result from the primary server.
+ * @throws NoSubscriptionServersAvailableException if we have no queue server
+ * @throws SubscriptionNotEnabledException If the pool does not have queues enabled
+ * @since 5.7
+ */
+ public Object executeOnQueuesAndReturnPrimaryResult(Op op) throws NoSubscriptionServersAvailableException, SubscriptionNotEnabledException;
+ /**
+ * Execute the given op on the given server.
+ * @param server the server to do the execution on
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ */
+ public Object executeOn(ServerLocation server, Op op);
+ /**
+ * Execute the given op on the given server.
+ * @param server the server to do the execution on
+ * @param op the operation to execute
+ * @param accessed true if the connection is accessed by this execute
+ * @return the result of execution if any; null if not
+ */
+ public Object executeOn(ServerLocation server, Op op, boolean accessed,boolean onlyUseExistingCnx);
+ /**
+ * Execute the given op on the given connection.
+ * @param con the connection to do the execution on
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ */
+ public Object executeOn(Connection con, Op op);
+ /**
+ * Execute the given op on the given connection.
+ * @param con the connection to do the execution on
+ * @param op the operation to execute
+ * @param timeoutFatal true if a timeout exception should be treated as a fatal one
+ * @return the result of execution if any; null if not
+ */
+ public Object executeOn(Connection con, Op op, boolean timeoutFatal);
+ /**
+ * Execute the given op on the current primary server.
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ */
+ public Object executeOnPrimary(Op op);
+ public RegisterInterestTracker getRITracker();
+
+ /**
+ * Release the connection held by the calling
+ * thread if we're using thread local connections
+ */
+ void releaseThreadLocalConnection();
+
+ /**
+ * The calling thread will connect to only one server for
+ * executing all ops until it calls {@link #releaseServerAffinity()}
+ * @param allowFailover true if we want to failover to another
+ * server when the first server is unreachable. Affinity to the
+ * new server will be maintained
+ * @since 6.6
+ */
+ public void setupServerAffinity(boolean allowFailover);
+
+ /**
+ * Release the server affinity established
+ * by {@link #setupServerAffinity(boolean)}
+ * @since 6.6
+ */
+ public void releaseServerAffinity();
+
+ /**
+ * When server affinity is enabled by this thread, returns the server against which all ops in this thread are performed
+ * @return location of the affinity server
+ * @since 6.6
+ * @see ExecutablePool#setupServerAffinity(boolean)
+ */
+ public ServerLocation getServerAffinityLocation();
+
+ /**
+ * All subsequent operations by this thread will be performed on
+ * the given ServerLocation. Used for resuming suspended transactions.
+ * @param serverLocation
+ * @since 6.6
+ */
+ public void setServerAffinityLocation(ServerLocation serverLocation);
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionHelper.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionHelper.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionHelper.java
new file mode 100644
index 0000000..bced6de
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionHelper.java
@@ -0,0 +1,21 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+public class ExecuteFunctionHelper {
+
+ public final static byte BUCKETS_AS_FILTER_MASK = 0x02;
+ public final static byte IS_REXECUTE_MASK = 0x01;
+
+ static byte createFlags(boolean executeOnBucketSet, byte isReExecute) {
+ byte flags = executeOnBucketSet ?
+ (byte)(0x00 | BUCKETS_AS_FILTER_MASK) : 0x00;
+ flags = isReExecute == 1? (byte)(flags | IS_REXECUTE_MASK) : flags;
+ return flags;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionNoAckOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionNoAckOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionNoAckOp.java
new file mode 100755
index 0000000..706e8c4
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionNoAckOp.java
@@ -0,0 +1,234 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Does a Execution of function on server (possibly without region/cache)
+ * It does not get the resulf from the server (follows Fire&Forget approch)
+ * @author Suranjan Kumar
+ * @since 5.8Beta
+ */
+public class ExecuteFunctionNoAckOp {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private ExecuteFunctionNoAckOp() {
+ // no instances allowed
+ }
+
+ /**
+ * Does a execute Function on a server using connections from the given pool
+ * to communicate with the server.
+ *
+ * @param pool
+ * the pool to use to communicate with the server.
+ * @param function
+ * of the function to be executed
+ * @param args
+ * specified arguments to the application function
+ */
+ public static void execute(PoolImpl pool, Function function,
+ Object args, MemberMappedArgument memberMappedArg,
+ boolean allServers, byte hasResult, boolean isFnSerializationReqd, String[] groups) {
+ List servers = null;
+ AbstractOp op = new ExecuteFunctionNoAckOpImpl(function, args, memberMappedArg,
+ hasResult, isFnSerializationReqd, groups, allServers);
+ try {
+ // In case of allServers getCurrentServers and call
+ // executeOn(ServerLocation server, Op op)
+ if (allServers && groups.length == 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to all servers using pool: " +pool);
+ }
+ servers = pool.getCurrentServers();
+ Iterator i = servers.iterator();
+ while (i.hasNext()) {
+ pool.executeOn((ServerLocation)i.next(), op);
+ }
+ }
+ else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to server using pool: " + pool + " with groups:" + Arrays.toString(groups) + " all members:" + allServers);
+ }
+ pool.execute(op,0);
+ }
+ }
+ catch (Exception ex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionNoAckOp#execute : Exception occured while Sending Function Execution Message:" + op.getMessage() + " to server using pool: " +pool, ex);
+ }
+ if (ex.getMessage() != null)
+ throw new FunctionException(ex.getMessage(), ex);
+ else
+ throw new FunctionException(
+ "Unexpected exception during function execution:", ex);
+ }
+ }
+
+ public static void execute(PoolImpl pool, String functionId,
+ Object args, MemberMappedArgument memberMappedArg,
+ boolean allServers, byte hasResult, boolean isFnSerializationReqd, boolean isHA, boolean optimizeForWrite, String[] groups) {
+ List servers = null;
+ AbstractOp op = new ExecuteFunctionNoAckOpImpl(functionId, args, memberMappedArg,
+ hasResult, isFnSerializationReqd, isHA, optimizeForWrite, groups, allServers);
+ try {
+ // In case of allServers getCurrentServers and call
+ // executeOn(ServerLocation server, Op op)
+ if (allServers && groups.length == 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to all servers using pool: " +pool);
+ }
+ servers = pool.getCurrentServers();
+ Iterator i = servers.iterator();
+ while (i.hasNext()) {
+ pool.executeOn((ServerLocation)i.next(), op);
+ }
+ }
+ else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to server using pool: " + pool + " with groups:" + Arrays.toString(groups) + " all members:" + allServers);
+ }
+ pool.execute(op,0);
+ }
+ }
+ catch (Exception ex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ExecuteFunctionNoAckOp#execute : Exception occured while Sending Function Execution Message:" + op.getMessage() + " to server using pool: " +pool, ex);
+ }
+ if (ex.getMessage() != null)
+ throw new FunctionException(ex.getMessage(), ex);
+ else
+ throw new FunctionException(
+ "Unexpected exception during function execution:", ex);
+ }
+ }
+
+ private static class ExecuteFunctionNoAckOpImpl extends AbstractOp {
+
+ /**
+ * number of parts in the request message
+ */
+ private static final int MSG_PARTS = 6;
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException
+ * if serialization fails
+ */
+ public ExecuteFunctionNoAckOpImpl(Function function, Object args,
+ MemberMappedArgument memberMappedArg, byte hasResult,
+ boolean isFnSerializationReqd, String[] groups, boolean allMembers) {
+ super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+ byte functionState = AbstractExecution.getFunctionState(function.isHA(),
+ function.hasResult(), function.optimizeForWrite());
+ getMessage().addBytesPart(new byte[]{functionState});
+ if(isFnSerializationReqd){
+ getMessage().addStringOrObjPart(function);
+ }
+ else{
+ getMessage().addStringOrObjPart(function.getId());
+ }
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+ getMessage().addObjPart(groups);
+ getMessage().addBytesPart(ExecuteFunctionOp.getByteArrayForFlags(allMembers));
+ }
+
+ /**
+ * @param functionId
+ * @param args
+ * @param memberMappedArg
+ * @param hasResult
+ * @param isFnSerializationReqd
+ * @param isHA
+ * @param optimizeForWrite
+ */
+ public ExecuteFunctionNoAckOpImpl(String functionId, Object args,
+ MemberMappedArgument memberMappedArg, byte hasResult,
+ boolean isFnSerializationReqd, boolean isHA, boolean optimizeForWrite,
+ String[] groups, boolean allMembers) {
+ super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+ getMessage().addBytesPart(
+ new byte[] { AbstractExecution.getFunctionState(isHA,
+ hasResult == (byte)1 ? true : false, optimizeForWrite) });
+ getMessage().addStringOrObjPart(functionId);
+ getMessage().addObjPart(args);
+ getMessage().addObjPart(memberMappedArg);
+ getMessage().addObjPart(groups);
+ getMessage().addBytesPart(ExecuteFunctionOp.getByteArrayForFlags(allMembers));
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.REPLY) {
+ return null;
+ }
+ else {
+ Part part = msg.getPart(0);
+ if (msgType == MessageType.EXCEPTION) {
+ Throwable t = (Throwable)part.getObject();
+ logger.warn(LocalizedMessage.create(LocalizedStrings.EXECUTE_FUNCTION_NO_HAS_RESULT_RECEIVED_EXCEPTION), t);
+ }
+ else if (isErrorResponse(msgType)) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.EXECUTE_FUNCTION_NO_HAS_RESULT_RECEIVED_EXCEPTION)); // TODO:LOG:FIXED: was ", part.getString());" which makes no sense
+ }
+ else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ return null;
+ }
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.EXECUTE_FUNCTION_ERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startExecuteFunction();
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunctionSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected Message createResponseMessage() {
+ return new Message(1, Version.CURRENT);
+ }
+ }
+
+}