You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:10 UTC
[05/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
new file mode 100644
index 0000000..f3c9f03
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
@@ -0,0 +1,171 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.Instantiator;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder;
+import com.gemstone.gemfire.internal.cache.BridgeObserver;
+import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+
+/**
+ * Register a bunch of instantiators on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class RegisterInstantiatorsOp {
+ /**
+ * Register a bunch of instantiators on a server
+ * using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param instantiators the instantiators to register
+ * @param eventId the id of this event
+ */
+ public static void execute(ExecutablePool pool,
+ Instantiator[] instantiators,
+ EventID eventId)
+ {
+ AbstractOp op = new RegisterInstantiatorsOpImpl(instantiators, eventId);
+ pool.execute(op, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Register a bunch of instantiators on a server using connections from the
+ * given pool to communicate with the server.
+ *
+ * @param pool
+ * the pool to use to communicate with the server.
+ * @param holders
+ * the {@link InstantiatorAttributesHolder}s containing info about
+ * the instantiators to register
+ * @param eventId
+ * the id of this event
+ */
+ public static void execute(ExecutablePool pool,
+ Object[] holders, EventID eventId) {
+ AbstractOp op = new RegisterInstantiatorsOpImpl(holders,
+ eventId);
+ pool.execute(op, Integer.MAX_VALUE);
+ }
+
+ private RegisterInstantiatorsOp() {
+ // no instances allowed
+ }
+
+ private static class RegisterInstantiatorsOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public RegisterInstantiatorsOpImpl(Instantiator[] instantiators,
+ EventID eventId) {
+ super(MessageType.REGISTER_INSTANTIATORS, instantiators.length * 3 + 1);
+ for(int i = 0; i < instantiators.length; i++) {
+ Instantiator instantiator = instantiators[i];
+ // strip '.class' off these class names
+ String className = instantiator.getClass().toString().substring(6);
+ String instantiatedClassName = instantiator.getInstantiatedClass().toString().substring(6);
+ try {
+ getMessage().addBytesPart(BlobHelper.serializeToBlob(className));
+ getMessage().addBytesPart(BlobHelper.serializeToBlob(instantiatedClassName));
+ } catch (IOException ex) {
+ throw new SerializationException("failed serializing object", ex);
+ }
+ getMessage().addIntPart(instantiator.getId());
+ }
+ getMessage().addBytesPart(eventId.calcBytes());
+// // // CALLBACK FOR TESTING PURPOSE ONLY ////
+ if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+ BridgeObserver bo = BridgeObserverHolder.getInstance();
+ bo.beforeSendingToServer(eventId);
+ }
+ }
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException
+ * if serialization fails
+ */
+ public RegisterInstantiatorsOpImpl(Object[] holders,
+ EventID eventId) {
+ super(MessageType.REGISTER_INSTANTIATORS, holders.length * 3 + 1);
+ for (Object obj : holders) {
+ String instantiatorClassName = null;
+ String instantiatedClassName = null;
+ int id = 0;
+ if (obj instanceof Instantiator) {
+ instantiatorClassName = ((Instantiator)obj).getClass().getName();
+ instantiatedClassName = ((Instantiator)obj).getInstantiatedClass()
+ .getName();
+ id = ((Instantiator)obj).getId();
+ } else {
+ instantiatorClassName = ((InstantiatorAttributesHolder)obj)
+ .getInstantiatorClassName();
+ instantiatedClassName = ((InstantiatorAttributesHolder)obj)
+ .getInstantiatedClassName();
+ id = ((InstantiatorAttributesHolder)obj).getId();
+ }
+ try {
+ getMessage().addBytesPart(
+ BlobHelper.serializeToBlob(instantiatorClassName));
+ getMessage().addBytesPart(
+ BlobHelper.serializeToBlob(instantiatedClassName));
+ } catch (IOException ex) {
+ throw new SerializationException("failed serializing object", ex);
+ }
+ getMessage().addIntPart(id);
+ }
+ getMessage().addBytesPart(eventId.calcBytes());
+ // // // CALLBACK FOR TESTING PURPOSE ONLY ////
+ if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+ BridgeObserver bo = BridgeObserverHolder.getInstance();
+ bo.beforeSendingToServer(eventId);
+ }
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "registerInstantiators");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startRegisterInstantiators();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endRegisterInstantiatorsSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endRegisterInstantiators(start, hasTimedOut(), hasFailed());
+ }
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
+ getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestListOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestListOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestListOp.java
new file mode 100644
index 0000000..873f3d6
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestListOp.java
@@ -0,0 +1,138 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.client.internal.RegisterInterestOp.RegisterInterestOpImpl;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+
+/**
+ * Does a region registerInterestList on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class RegisterInterestListOp {
+ /**
+ * Does a region registerInterestList on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the registerInterestList on
+ * @param keys list of keys we are interested in
+ * @param policy the interest result policy for this registration
+ * @param isDurable true if this registration is durable
+ * @param regionDataPolicy the data policy ordinal of the region
+ * @return list of keys
+ */
+ public static List execute(ExecutablePool pool,
+ String region,
+ List keys,
+ InterestResultPolicy policy,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates,
+ byte regionDataPolicy)
+ {
+ AbstractOp op = new RegisterInterestListOpImpl(region, keys, policy,
+ isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+ return (List) pool.executeOnQueuesAndReturnPrimaryResult(op);
+ }
+
+ private RegisterInterestListOp() {
+ // no instances allowed
+ }
+ /**
+ * Does a region registerInterestList on a server using connections from the given pool
+ * to communicate with the given server location.
+ * @param sl the server to do the register interest on.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the registerInterest on
+ * @param keys describes what we are interested in
+ * @param policy the interest result policy for this registration
+ * @param isDurable true if this registration is durable
+ * @param regionDataPolicy the data policy ordinal of the region
+ * @return list of keys
+ */
+ public static List executeOn(ServerLocation sl,
+ ExecutablePool pool,
+ String region,
+ List keys,
+ InterestResultPolicy policy,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates,
+ byte regionDataPolicy)
+ {
+ AbstractOp op = new RegisterInterestListOpImpl(region, keys, policy,
+ isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+ return (List) pool.executeOn(sl, op);
+ }
+
+
+ /**
+ * Does a region registerInterestList on a server using connections from the given pool
+ * to communicate with the given server location.
+ * @param conn the connection to do the register interest on.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the registerInterest on
+ * @param keys describes what we are interested in
+ * @param policy the interest result policy for this registration
+ * @param isDurable true if this registration is durable
+ * @param regionDataPolicy the data policy ordinal of the region
+ * @return list of keys
+ */
+ public static List executeOn(Connection conn,
+ ExecutablePool pool,
+ String region,
+ List keys,
+ InterestResultPolicy policy,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates,
+ byte regionDataPolicy)
+ {
+ AbstractOp op = new RegisterInterestListOpImpl(region, keys, policy,
+ isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+ return (List) pool.executeOn(conn, op);
+ }
+
+ private static class RegisterInterestListOpImpl extends RegisterInterestOpImpl {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public RegisterInterestListOpImpl(String region,
+ List keys,
+ InterestResultPolicy policy,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates,
+ byte regionDataPolicy) {
+ super(region, MessageType.REGISTER_INTEREST_LIST, 6);
+ getMessage().addStringPart(region);
+ getMessage().addObjPart(policy);
+ {
+ byte durableByte = (byte)(isDurable ? 0x01 : 0x00);
+ getMessage().addBytesPart(new byte[] {durableByte});
+ }
+ //Set chunk size of HDOS for keys
+ getMessage().setChunkSize(keys.size()*16);
+ getMessage().addObjPart(keys);
+
+ byte notifyByte = (byte)(receiveUpdatesAsInvalidates ? 0x01 : 0x00);
+ getMessage().addBytesPart(new byte[] {notifyByte});
+
+ // The second byte '1' below tells server to serialize values in VersionObjectList.
+ // Java clients always expect serializeValues to be true in VersionObjectList unlike Native clients.
+ // This was being sent as part of GetAllOp prior to fixing #43684.
+ getMessage().addBytesPart(new byte[] {regionDataPolicy, (byte)0x01});
+ }
+ @Override
+ protected String getOpName() {
+ return "registerInterestList";
+ }
+ // note we reuse the same stats used by RegisterInterestOpImpl
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestOp.java
new file mode 100644
index 0000000..db0a47f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestOp.java
@@ -0,0 +1,287 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * Does a region registerInterest on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class RegisterInterestOp {
+ /**
+ * Does a region registerInterest on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the registerInterest on
+ * @param key describes what we are interested in
+ * @param interestType the {@link InterestType} for this registration
+ * @param policy the interest result policy for this registration
+ * @param isDurable true if this registration is durable
+ * @param regionDataPolicy the data policy ordinal of the region
+ * @return list of keys
+ */
+ public static List execute(ExecutablePool pool,
+ String region,
+ Object key,
+ int interestType,
+ InterestResultPolicy policy,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates,
+ byte regionDataPolicy)
+ {
+ AbstractOp op = new RegisterInterestOpImpl(region, key,
+ interestType, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+ return (List) pool.executeOnQueuesAndReturnPrimaryResult(op);
+ }
+
+ /**
+ * Does a region registerInterest on a server using connections from the given pool
+ * to communicate with the given server location.
+ * @param sl the server to do the register interest on.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the registerInterest on
+ * @param key describes what we are interested in
+ * @param interestType the {@link InterestType} for this registration
+ * @param policy the interest result policy for this registration
+ * @param isDurable true if this registration is durable
+ * @param regionDataPolicy the data policy ordinal of the region
+ * @return list of keys
+ */
+ public static List executeOn(ServerLocation sl,
+ ExecutablePool pool,
+ String region,
+ Object key,
+ int interestType,
+ InterestResultPolicy policy,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates,
+ byte regionDataPolicy)
+ {
+ AbstractOp op = new RegisterInterestOpImpl(region, key,
+ interestType, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+ return (List) pool.executeOn(sl, op);
+ }
+
+
+ /**
+ * Does a region registerInterest on a server using connections from the given pool
+ * to communicate with the given server location.
+ * @param conn the connection to do the register interest on.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the registerInterest on
+ * @param key describes what we are interested in
+ * @param interestType the {@link InterestType} for this registration
+ * @param policy the interest result policy for this registration
+ * @param isDurable true if this registration is durable
+ * @param regionDataPolicy the data policy ordinal of the region
+ * @return list of keys
+ */
+ public static List executeOn(Connection conn,
+ ExecutablePool pool,
+ String region,
+ Object key,
+ int interestType,
+ InterestResultPolicy policy,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates,
+ byte regionDataPolicy)
+ {
+ AbstractOp op = new RegisterInterestOpImpl(region, key,
+ interestType, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+ return (List) pool.executeOn(conn, op);
+ }
+
+
+ private RegisterInterestOp() {
+ // no instances allowed
+ }
+
+ protected static class RegisterInterestOpImpl extends AbstractOp {
+ protected String region;
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public RegisterInterestOpImpl(String region,
+ Object key,
+ int interestType,
+ InterestResultPolicy policy,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates,
+ byte regionDataPolicy) {
+ super(MessageType.REGISTER_INTEREST, 7);
+ this.region = region;
+ getMessage().addStringPart(region);
+ getMessage().addIntPart(interestType);
+ getMessage().addObjPart(policy);
+ {
+ byte durableByte = (byte)(isDurable ? 0x01 : 0x00);
+ getMessage().addBytesPart(new byte[] {durableByte});
+ }
+ getMessage().addStringOrObjPart(key);
+ byte notifyByte = (byte)(receiveUpdatesAsInvalidates ? 0x01 : 0x00);
+ getMessage().addBytesPart(new byte[] {notifyByte});
+
+ // The second byte '1' below tells server to serialize values in VersionObjectList.
+ // Java clients always expect serializeValues to be true in VersionObjectList unlike Native clients.
+ // This was being sent as part of GetAllOp prior to fixing #43684.
+ getMessage().addBytesPart(new byte[] {regionDataPolicy, (byte)0x01});
+ }
+ /**
+ * This constructor is used by our subclass CreateCQWithIROpImpl
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ protected RegisterInterestOpImpl(String region, int msgType, int numParts) {
+ super(msgType, numParts);
+ this.region = region;
+ }
+
+ @Override
+ protected Message createResponseMessage() {
+ return new ChunkedMessage(1, Version.CURRENT);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected Object processResponse(Message m, Connection con) throws Exception {
+ ChunkedMessage msg = (ChunkedMessage)m;
+ msg.readHeader();
+ switch (msg.getMessageType()) {
+ case MessageType.RESPONSE_FROM_PRIMARY: {
+ ArrayList serverKeys = new ArrayList();
+ VersionedObjectList serverEntries = null;
+ LocalRegion r = null;
+
+ try {
+ r = (LocalRegion)GemFireCacheImpl.getInstance().getRegion(this.region);
+ }catch(Exception ex) {
+ //ignore but read message
+ // GemFireCacheImpl.getInstance().getLogger().config("hitesh error " + ex.getClass());
+ }
+
+ ArrayList list = new ArrayList();
+ ArrayList listOfList = new ArrayList();
+ listOfList.add(list);
+
+ // Process the chunks
+ do {
+ // Read the chunk
+ msg.receiveChunk();
+
+ // Deserialize the result
+ Part part = msg.getPart(0);
+
+ Object partObj = part.getObject();
+ if (partObj instanceof Throwable) {
+ String s = "While performing a remote " + getOpName();
+ throw new ServerOperationException(s, (Throwable)partObj);
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ //Part exceptionToStringPart = msg.getPart(1);
+ }
+ else {
+ if (partObj instanceof VersionedObjectList) {
+ if (serverEntries == null) {
+ serverEntries = new VersionedObjectList(true);
+ }
+ ((VersionedObjectList)partObj).replaceNullIDs(con.getEndpoint().getMemberId());
+ // serverEntries.addAll((VersionedObjectList)partObj);
+ list.clear();
+ list.add(partObj);
+
+ if(r != null) {
+ try {
+ r.refreshEntriesFromServerKeys(con, listOfList, InterestResultPolicy.KEYS_VALUES);
+ }catch(Exception ex) {
+ // GemFireCacheImpl.getInstance().getLogger().config("hitesh error2 " + ex.getClass());
+ }
+ }
+ } else {
+ // Add the result to the list of results
+ serverKeys.add((List)partObj);
+ }
+ }
+
+ } while (!msg.isLastChunk());
+ if (serverEntries != null) {
+ list.clear();
+ list.add(serverEntries); // serverEntries will always be empty.
+ return listOfList;
+ }
+ return serverKeys;
+ }
+ case MessageType.RESPONSE_FROM_SECONDARY:
+ // Read the chunk
+ msg.receiveChunk();
+ return null;
+ case MessageType.EXCEPTION:
+ // Read the chunk
+ msg.receiveChunk();
+ // Deserialize the result
+ Part part = msg.getPart(0);
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ //Part exceptionToStringPart = msg.getPart(1);
+ Object obj = part.getObject();
+ {
+ String s = this + ": While performing a remote " + getOpName();
+ throw new ServerOperationException(s, (Throwable) obj);
+ }
+ case MessageType.REGISTER_INTEREST_DATA_ERROR:
+ // Read the chunk
+ msg.receiveChunk();
+
+ // Deserialize the result
+ String errorMessage = msg.getPart(0).getString();
+ String s = this + ": While performing a remote " + getOpName() + ": ";
+ throw new ServerOperationException(s + errorMessage);
+ default:
+ throw new InternalGemFireError("Unknown message type "
+ + msg.getMessageType());
+ }
+ }
+ protected String getOpName() {
+ return "registerInterest";
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.REGISTER_INTEREST_DATA_ERROR;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startRegisterInterest();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endRegisterInterestSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endRegisterInterest(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestTracker.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestTracker.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestTracker.java
new file mode 100644
index 0000000..9d67251
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestTracker.java
@@ -0,0 +1,410 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.UnregisterAllInterest;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Used to keep track of what interest a client has registered.
+ * This code was extracted from the old ConnectionProxyImpl.
+ * @author darrel
+ * @since 5.7
+ */
+public class RegisterInterestTracker {
+ private static final Logger logger = LogService.getLogger();
+
+ public final static int interestListIndex = 0;
+ public final static int durableInterestListIndex = 1;
+ public final static int interestListIndexForUpdatesAsInvalidates = 2;
+ public final static int durableInterestListIndexForUpdatesAsInvalidates = 3;
+
+ private final FailoverInterestList[] fils = new FailoverInterestList[4];
+
+ /** Manages CQs */
+ private final ConcurrentMap cqs /* <CqQuery,Boolean> */= new ConcurrentHashMap();
+
+ public RegisterInterestTracker() {
+ this.fils[interestListIndex] = new FailoverInterestList();
+ this.fils[interestListIndexForUpdatesAsInvalidates] = new FailoverInterestList();
+ this.fils[durableInterestListIndex] = new FailoverInterestList();
+ this.fils[durableInterestListIndexForUpdatesAsInvalidates] = new FailoverInterestList();
+ }
+
+ public static int getInterestLookupIndex(boolean isDurable, boolean receiveUpdatesAsInvalidates) {
+ if (isDurable) {
+ if (receiveUpdatesAsInvalidates) {
+ return durableInterestListIndexForUpdatesAsInvalidates;
+ } else {
+ return durableInterestListIndex;
+ }
+ } else {
+ if (receiveUpdatesAsInvalidates) {
+ return interestListIndexForUpdatesAsInvalidates;
+ } else {
+ return interestListIndex;
+ }
+ }
+ }
+
+ public List getInterestList(String regionName, int interestType)
+ {
+ RegionInterestEntry rie1 = readRegionInterests(regionName, interestType, false, false);
+ RegionInterestEntry rie2 = readRegionInterests(regionName, interestType, false, true);
+ RegionInterestEntry rie3 = readRegionInterests(regionName, interestType, true, false);
+ RegionInterestEntry rie4 = readRegionInterests(regionName, interestType, true, true);
+
+ ArrayList result = new ArrayList();
+
+ if (rie1 != null) {
+ result.addAll(rie1.getInterests().keySet());
+ }
+
+ if (rie2 != null) {
+ result.addAll(rie2.getInterests().keySet());
+ }
+
+ if (rie3 != null) {
+ result.addAll(rie3.getInterests().keySet());
+ }
+
+ if (rie4 != null) {
+ result.addAll(rie4.getInterests().keySet());
+ }
+
+ return result;
+ }
+
+ public void addSingleInterest(LocalRegion r, Object key, int interestType,
+ InterestResultPolicy pol, boolean isDurable, boolean receiveUpdatesAsInvalidates)
+ {
+ RegionInterestEntry rie = getRegionInterests(r, interestType, false,
+ isDurable, receiveUpdatesAsInvalidates);
+ rie.getInterests().put(key, pol);
+ }
+
+ public boolean removeSingleInterest(LocalRegion r, Object key,
+ int interestType, boolean isDurable, boolean receiveUpdatesAsInvalidates)
+ {
+ RegionInterestEntry rie = getRegionInterests(r, interestType, true,
+ isDurable, receiveUpdatesAsInvalidates);
+ if (rie == null) {
+ return false;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("removeSingleInterest region={} key={}", r.getFullPath(), key);
+ }
+ Object interest = rie.getInterests().remove(key);
+ if (interest == null) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.RegisterInterestTracker_REMOVESINGLEINTEREST_KEY_0_NOT_REGISTERED_IN_THE_CLIENT, key));
+ return false;
+ }
+ else {
+ return true;
+ }
+ //return rie.getInterests().remove(key) != null;
+ }
+
+ public void addInterestList(LocalRegion r, List keys,
+ InterestResultPolicy pol, boolean isDurable, boolean receiveUpdatesAsInvalidates)
+ {
+ RegionInterestEntry rie = getRegionInterests(r, InterestType.KEY, false,
+ isDurable, receiveUpdatesAsInvalidates);
+ for (int i = 0; i < keys.size(); i++) {
+ rie.getInterests().put(keys.get(i), pol);
+ }
+ }
+
+ public void addCq(InternalCqQuery cqi, boolean isDurable)
+ {
+ this.cqs.put(cqi, Boolean.valueOf(isDurable));
+ /*
+ RegionInterestEntry rie = getRegionInterests(r, InterestType.CQ, false, isDurable);
+ rie.getInterests().put(cqi.getName(), cqi);
+ */
+ }
+
+ public void removeCq(InternalCqQuery cqi, boolean isDurable)
+ {
+ this.cqs.remove(cqi);
+ /*
+ RegionInterestEntry rie = getRegionInterests(r, InterestType.CQ, false, isDurable);
+ rie.getInterests().remove(cqi.getName());
+ */
+ }
+
+ public Map getCqsMap(){
+ return this.cqs;
+ }
+
+ /**
+ * Unregisters everything registered on the given region name
+ */
+ public void unregisterRegion(ServerRegionProxy srp,
+ boolean keepalive) {
+ removeAllInterests(srp, InterestType.KEY, false, keepalive, false);
+ removeAllInterests(srp, InterestType.FILTER_CLASS, false, keepalive, false);
+ removeAllInterests(srp, InterestType.OQL_QUERY, false, keepalive, false);
+ removeAllInterests(srp, InterestType.REGULAR_EXPRESSION, false, keepalive, false);
+ removeAllInterests(srp, InterestType.KEY, false, keepalive, true);
+ removeAllInterests(srp, InterestType.FILTER_CLASS, false, keepalive, true);
+ removeAllInterests(srp, InterestType.OQL_QUERY, false, keepalive, true);
+ removeAllInterests(srp, InterestType.REGULAR_EXPRESSION, false, keepalive, true);
+ //durable
+ if (srp.getPool().isDurableClient()) {
+ removeAllInterests(srp, InterestType.KEY, true, keepalive, true);
+ removeAllInterests(srp, InterestType.FILTER_CLASS, true, keepalive, true);
+ removeAllInterests(srp, InterestType.OQL_QUERY, true, keepalive, true);
+ removeAllInterests(srp, InterestType.REGULAR_EXPRESSION, true, keepalive, true);
+ removeAllInterests(srp, InterestType.KEY, true, keepalive, false);
+ removeAllInterests(srp, InterestType.FILTER_CLASS, true, keepalive, false);
+ removeAllInterests(srp, InterestType.OQL_QUERY, true, keepalive, false);
+ removeAllInterests(srp, InterestType.REGULAR_EXPRESSION, true, keepalive, false);
+ }
+ }
+
+ /**
+ * Remove all interests of a given type on the given proxy's region.
+ * @param interestType
+ * the interest type
+ * @param durable
+ * a boolean stating whether to remove durable or non-durable registrations
+ */
+ private void removeAllInterests(ServerRegionProxy srp, int interestType,
+ boolean durable, boolean keepAlive, boolean receiveUpdatesAsInvalidates)
+ {
+ String regName = srp.getRegionName();
+ ConcurrentMap allInterests = getRegionToInterestsMap(interestType, durable, receiveUpdatesAsInvalidates);
+ if (allInterests.remove(regName) != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("removeAllInterests region={} type={}", regName, InterestType.getString(interestType));
+ }
+ try {
+ // fix bug 35680 by using a UnregisterAllInterest token
+ Object key = UnregisterAllInterest.singleton();
+ // we have already cleaned up the tracker so send the op directly
+ UnregisterInterestOp.execute(srp.getPool(), regName, key, interestType,
+ true/*isClosing*/, keepAlive);
+ }
+ catch (Exception e) {
+ if(srp.getPool().getCancelCriterion().cancelInProgress() == null) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.RegisterInterestTracker_PROBLEM_REMOVING_ALL_INTEREST_ON_REGION_0_INTERESTTYPE_1_2,
+ new Object[] {regName, InterestType.getString(interestType), e.getLocalizedMessage()}));
+ }
+ }
+ }
+ }
+
+ public boolean removeInterestList(LocalRegion r, List keys, boolean isDurable,
+ boolean receiveUpdatesAsInvalidates)
+ {
+ RegionInterestEntry rie = getRegionInterests(r, InterestType.KEY, true,
+ isDurable, receiveUpdatesAsInvalidates);
+ if (rie == null) {
+ return false;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("removeInterestList region={} keys={}", r.getFullPath(), keys);
+ }
+ int removeCount = 0;
+ for (int i = 0; i < keys.size(); i++) {
+ Object key = keys.get(i);
+ Object interest = rie.getInterests().remove(key);
+ if (interest != null) {
+ removeCount++;
+ }
+ else {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.RegisterInterestTracker_REMOVEINTERESTLIST_KEY_0_NOT_REGISTERED_IN_THE_CLIENT,
+ key));
+ }
+ }
+ return removeCount != 0;
+ }
+
+ /**
+ * Return keys of interest for a given region. The keys in this Map are the
+ * full names of the regions. The values are instances of RegionInterestEntry.
+ *
+ * @param interestType
+ * the type to return
+ * @return the map
+ */
+ public ConcurrentMap getRegionToInterestsMap(int interestType, boolean isDurable,
+ boolean receiveUpdatesAsInvalidates)
+ {
+ FailoverInterestList fil =
+ this.fils[getInterestLookupIndex(isDurable, receiveUpdatesAsInvalidates)];
+
+ ConcurrentMap mapOfInterest = null;
+
+ switch (interestType) {
+ case InterestType.KEY:
+ mapOfInterest = fil.keysOfInterest;
+ break;
+ case InterestType.REGULAR_EXPRESSION:
+ mapOfInterest = fil.regexOfInterest;
+ break;
+ case InterestType.FILTER_CLASS:
+ mapOfInterest = fil.filtersOfInterest;
+ break;
+ case InterestType.CQ:
+ mapOfInterest = fil.cqsOfInterest;
+ break;
+ case InterestType.OQL_QUERY:
+ mapOfInterest = fil.queriesOfInterest;
+ break;
+ default:
+ throw new InternalGemFireError("Unknown interestType");
+ }
+ return mapOfInterest;
+ }
+
+ /**
+ * Return the RegionInterestEntry for the given region. Create one if none
+ * exists and forRemoval is false.
+ *
+ * @param r
+ * specified region
+ * @param interestType
+ * desired interest type
+ * @param forRemoval
+ * true if calls wants one for removal
+ * @return the entry or null if none exists and forRemoval is true.
+ */
+ private RegionInterestEntry getRegionInterests(LocalRegion r,
+ int interestType,
+ boolean forRemoval,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates)
+ {
+ final String regionName = r.getFullPath();
+ ConcurrentMap mapOfInterest = getRegionToInterestsMap(interestType, isDurable, receiveUpdatesAsInvalidates);
+ RegionInterestEntry result = (RegionInterestEntry)
+ mapOfInterest.get(regionName);
+ if (result == null && !forRemoval) {
+ RegionInterestEntry rie = new RegionInterestEntry(r);
+ result = (RegionInterestEntry)mapOfInterest.putIfAbsent(regionName, rie);
+ if (result == null) {
+ result = rie;
+ }
+ }
+ return result;
+ }
+ private RegionInterestEntry readRegionInterests(String regionName,
+ int interestType,
+ boolean isDurable,
+ boolean receiveUpdatesAsInvalidates)
+ {
+ ConcurrentMap mapOfInterest = getRegionToInterestsMap(interestType, isDurable, receiveUpdatesAsInvalidates);
+ return (RegionInterestEntry)mapOfInterest.get(regionName);
+ }
+
+ /**
+ * A Holder object for client's register interest, this is required when
+ * a client fails over to another server and does register interest based on
+ * this Data structure
+ *
+ * @author Yogesh Mahajan
+ * @since 5.5
+ *
+ */
+ static protected class FailoverInterestList {
+ /**
+ * Record of enumerated keys of interest.
+ *
+ * This list is maintained here in case an endpoint (server) bounces. In that
+ * case, a message will be sent to the endpoint as soon as it has restarted.
+ *
+ * The keys in this Map are the full names of the regions. The values are
+ * instances of {@link RegionInterestEntry}.
+ */
+ final ConcurrentMap keysOfInterest = new ConcurrentHashMap();
+
+ /**
+ * Record of regular expression keys of interest.
+ *
+ * This list is maintained here in case an endpoint (server) bounces. In that
+ * case, a message will be sent to the endpoint as soon as it has restarted.
+ *
+ * The keys in this Map are the full names of the regions. The values are
+ * instances of {@link RegionInterestEntry}.
+ */
+ final ConcurrentMap regexOfInterest = new ConcurrentHashMap();
+
+ /**
+ * Record of filtered keys of interest.
+ *
+ * This list is maintained here in case an endpoint (server) bounces. In that
+ * case, a message will be sent to the endpoint as soon as it has restarted.
+ *
+ * The keys in this Map are the full names of the regions. The values are
+ * instances of {@link RegionInterestEntry}.
+ */
+ final ConcurrentMap filtersOfInterest = new ConcurrentHashMap();
+
+ /**
+ * Record of QOL keys of interest.
+ *
+ * This list is maintained here in case an endpoint (server) bounces. In that
+ * case, a message will be sent to the endpoint as soon as it has restarted.
+ *
+ * The keys in this Map are the full names of the regions. The values are
+ * instances of {@link RegionInterestEntry}.
+ */
+ final ConcurrentMap queriesOfInterest = new ConcurrentHashMap();
+
+ /**
+ * Record of registered CQs
+ *
+ */
+ final ConcurrentMap cqsOfInterest = new ConcurrentHashMap();
+ }
+
+ /**
+ * Description of the interests of a particular region.
+ *
+ * @author jpenney
+ *
+ */
+ static public class RegionInterestEntry
+ {
+ private final LocalRegion region;
+
+ private final ConcurrentMap interests;
+
+ RegionInterestEntry(LocalRegion r) {
+ this.region = r;
+ this.interests = new ConcurrentHashMap();
+ }
+
+ public LocalRegion getRegion() {
+ return this.region;
+ }
+
+ public ConcurrentMap getInterests() {
+ return this.interests;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
new file mode 100644
index 0000000..b4bf64d
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
@@ -0,0 +1,382 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Does a region removeAll on a server
+ * @author darrel
+ * @since 8.1
+ */
+public class RemoveAllOp {
+
+ public static final Logger logger = LogService.getLogger();
+
+ public static final int FLAG_EMPTY = 0x01;
+ public static final int FLAG_CONCURRENCY_CHECKS = 0x02;
+
+
+ /**
+ * Does a region removeAll on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the removeAll on
+ * @param keys Collection of keys to remove
+ * @param eventId the event id for this op
+ */
+ public static VersionedObjectList execute(ExecutablePool pool,
+ Region region,
+ Collection<Object> keys,
+ EventID eventId,
+ boolean isRetry, Object callbackArg)
+ {
+ RemoveAllOpImpl op = new RemoveAllOpImpl(region, keys,
+ eventId, ((PoolImpl)pool).getPRSingleHopEnabled(), callbackArg);
+ op.initMessagePart();
+ if(isRetry) {
+ op.getMessage().setIsRetry();
+ }
+ return (VersionedObjectList)pool.execute(op);
+ }
+
+ /**
+ * Does a region put on a server using connections from the given pool
+ * to communicate with the server.
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the removeAll on
+ * @param keys the Collection of keys to remove
+ * @param eventId the event id for this removeAll
+ */
+ public static VersionedObjectList execute(ExecutablePool pool,
+ Region region,
+ Collection<Object> keys,
+ EventID eventId,
+ int retryAttempts, Object callbackArg)
+ {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ ClientMetadataService cms = ((LocalRegion)region).getCache()
+ .getClientMetadataService();
+
+ Map<ServerLocation, HashSet> serverToFilterMap = cms.getServerToFilterMap(
+ keys, region, true);
+
+ if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {
+ AbstractOp op = new RemoveAllOpImpl(region, keys,
+ eventId, ((PoolImpl)pool).getPRSingleHopEnabled(), callbackArg);
+ op.initMessagePart();
+ return (VersionedObjectList)pool.execute(op);
+ }
+
+ List callableTasks = constructAndGetRemoveAllTasks(region,
+ eventId, serverToFilterMap, (PoolImpl)pool, callbackArg);
+
+ if (isDebugEnabled) {
+ logger.debug("RemoveAllOp#execute : Number of removeAll tasks is :{}", callableTasks.size());
+ }
+ HashMap<ServerLocation, RuntimeException> failedServers = new HashMap<ServerLocation,RuntimeException>();
+ PutAllPartialResult result = new PutAllPartialResult(keys.size());
+ try {
+ Map<ServerLocation, Object> results = SingleHopClientExecutor
+ .submitBulkOp(callableTasks, cms, (LocalRegion)region, failedServers);
+ for (Map.Entry<ServerLocation, Object> entry: results.entrySet()) {
+ Object value = entry.getValue();
+ if (value instanceof PutAllPartialResultException) {
+ PutAllPartialResultException pap = (PutAllPartialResultException)value;
+ if (isDebugEnabled) {
+ logger.debug("RemoveAll SingleHop encountered BulkOpPartialResultException exception: {}, failedServers are {}", pap, failedServers.keySet());
+ }
+ result.consolidate(pap.getResult());
+ } else {
+ if (value != null) {
+ VersionedObjectList list = (VersionedObjectList)value;
+ result.addKeysAndVersions(list);
+ }
+ }
+ }
+ } catch (RuntimeException ex) {
+ logger.debug("single-hop removeAll encountered unexpected exception: {}",ex);
+ throw ex;
+ }
+
+ if (!failedServers.isEmpty()) {
+ if (retryAttempts == 0) {
+ throw failedServers.values().iterator().next();
+ }
+
+ // if the partial result set doesn't already have keys (for tracking version tags)
+ // then we need to gather up the keys that we know have succeeded so far and
+ // add them to the partial result set
+ if (result.getSucceededKeysAndVersions().size() == 0) {
+ // if there're failed servers, we need to save the succeed keys in submitRemoveAll
+ // if retry succeeded, everything is ok, otherwise, the saved "succeeded
+ // keys" should be consolidated into PutAllPartialResultException
+ // succeedKeySet is used to send back to client in PartialResult case
+ // so it's not a must to use LinkedHashSet
+ Set succeedKeySet = new LinkedHashSet();
+ Set<ServerLocation> serverSet = serverToFilterMap.keySet();
+ for (ServerLocation server : serverSet) {
+ if (!failedServers.containsKey(server)) {
+ succeedKeySet.addAll(serverToFilterMap.get(server));
+ }
+ }
+
+ // save succeedKeys, but if retries all succeeded, discard the PutAllPartialResult
+ result.addKeys(succeedKeySet);
+ }
+
+ // send maps for the failed servers one by one instead of merging
+ // them into one big map. The reason is, we have to keep the same event
+ // ids for each sub map. There is a unit test in PutAllCSDUnitTest for
+ // the otherwise case.
+ boolean oneSubMapRetryFailed = false;
+ Set<ServerLocation> failedServerSet = failedServers.keySet();
+ for (ServerLocation failedServer : failedServerSet) {
+ // Throwable failedServers.values().iterator().next();
+ RuntimeException savedRTE = failedServers.get(failedServer);
+ if (savedRTE instanceof PutAllPartialResultException) {
+ // will not retry for BulkOpPartialResultException
+ // but it means at least one sub map ever failed
+ oneSubMapRetryFailed = true;
+ continue;
+ }
+ Collection<Object> newKeys = serverToFilterMap.get(failedServer);
+ try {
+ VersionedObjectList v = RemoveAllOp.execute(pool, region, newKeys, eventId, true, callbackArg);
+ if (v == null) {
+ result.addKeys(newKeys);
+ } else {
+ result.addKeysAndVersions(v);
+ }
+ } catch (PutAllPartialResultException pre) {
+ oneSubMapRetryFailed = true;
+ logger.debug("Retry failed with BulkOpPartialResultException: {} Before retry: {}", pre, result.getKeyListString());
+ result.consolidate(pre.getResult());
+ } catch (Exception rte) {
+ oneSubMapRetryFailed = true;
+ Object firstKey = newKeys.iterator().next();
+ result.saveFailedKey(firstKey, rte);
+ }
+ } // for failedServer
+
+ // If all retries succeeded, the PRE in first tries can be ignored
+ if (oneSubMapRetryFailed && result.hasFailure()) {
+ PutAllPartialResultException pre = new PutAllPartialResultException(result);
+ throw pre;
+ }
+ } // failedServers!=null
+
+ return result.getSucceededKeysAndVersions();
+ }
+
+ private RemoveAllOp() {
+ // no instances allowed
+ }
+
+
+ static List constructAndGetRemoveAllTasks(Region region,
+ final EventID eventId,
+ final Map<ServerLocation, HashSet> serverToFilterMap,
+ final PoolImpl pool, Object callbackArg) {
+ final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
+ ArrayList<ServerLocation> servers = new ArrayList<ServerLocation>(
+ serverToFilterMap.keySet());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Constructing tasks for the servers{}", servers);
+ }
+ for (ServerLocation server : servers) {
+ AbstractOp RemoveAllOp = new RemoveAllOpImpl(region,
+ serverToFilterMap.get(server), eventId, true, callbackArg);
+
+ SingleHopOperationCallable task = new SingleHopOperationCallable(
+ new ServerLocation(server.getHostName(), server.getPort()), pool,
+ RemoveAllOp,UserAttributes.userAttributes.get());
+ tasks.add(task);
+ }
+ return tasks;
+ }
+
+ private static class RemoveAllOpImpl extends AbstractOp {
+
+ private boolean prSingleHopEnabled = false;
+
+ private LocalRegion region = null;
+
+ private Collection<Object> keys = null;
+ private final Object callbackArg;
+
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public RemoveAllOpImpl(Region region, Collection<Object> keys,
+ EventID eventId, boolean prSingleHopEnabled, Object callbackArg) {
+ super(MessageType.REMOVE_ALL, 5 + keys.size());
+ this.prSingleHopEnabled = prSingleHopEnabled;
+ this.region = (LocalRegion)region;
+ getMessage().addStringPart(region.getFullPath());
+ getMessage().addBytesPart(eventId.calcBytes());
+ this.keys = keys;
+ this.callbackArg = callbackArg;
+ }
+
+ @Override
+ protected void initMessagePart() {
+ int size = keys.size();
+ int flags = 0;
+ if (region.getDataPolicy() == DataPolicy.EMPTY) {
+ flags |= FLAG_EMPTY;
+ }
+ if (region.getConcurrencyChecksEnabled()) {
+ flags |= FLAG_CONCURRENCY_CHECKS;
+ }
+ getMessage().addIntPart(flags);
+ getMessage().addObjPart(this.callbackArg);
+ getMessage().addIntPart(size);
+
+ for (Object key: this.keys) {
+ getMessage().addStringOrObjPart(key);
+ }
+ }
+ @Override
+ protected Message createResponseMessage() {
+ return new ChunkedMessage(2, Version.CURRENT);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected Object processResponse(final Message msg, final Connection con) throws Exception {
+ final VersionedObjectList result = new VersionedObjectList();
+ final Exception[] exceptionRef = new Exception[1];
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ try {
+ processChunkedResponse((ChunkedMessage)msg,
+ "removeAll",
+ new ChunkHandler() {
+ public void handle(ChunkedMessage cm) throws Exception {
+ int numParts = msg.getNumberOfParts();
+ if (isDebugEnabled) {
+ logger.debug("RemoveAllOp.processChunkedResponse processing message with {} parts", numParts);
+ }
+ for (int partNo=0; partNo < numParts; partNo++) {
+ Part part = cm.getPart(partNo);
+ try {
+ Object o = part.getObject();
+ if (isDebugEnabled) {
+ logger.debug("part({}) contained {}", partNo, o);
+ }
+ if (o == null) {
+ // no response is an okay response
+ } else if (o instanceof byte[]) {
+ if (prSingleHopEnabled) {
+ byte[] bytesReceived = part.getSerializedForm();
+ if (/*bytesReceived.length==1 &&*/ bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { // nw hop
+ if (region != null) {
+ ClientMetadataService cms;
+ try {
+ cms = region.getCache().getClientMetadataService();
+ cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+ }
+ catch (CacheClosedException e) {
+ }
+ }
+ }
+ }
+ } else if (o instanceof Throwable) {
+ String s = "While performing a remote removeAll";
+ exceptionRef[0] = new ServerOperationException(s, (Throwable)o);
+ } else {
+ VersionedObjectList chunk = (VersionedObjectList)o;
+ chunk.replaceNullIDs(con.getEndpoint().getMemberId());
+ result.addAll(chunk);
+ }
+ } catch(Exception e) {
+ exceptionRef[0] = new ServerOperationException("Unable to deserialize value" , e);
+ }
+ }
+ }
+ });
+ } catch (ServerOperationException e) {
+ if (e.getCause() instanceof PutAllPartialResultException) {
+ PutAllPartialResultException cause = (PutAllPartialResultException)e.getCause();
+ cause.getSucceededKeysAndVersions().replaceNullIDs(con.getEndpoint().getMemberId());
+ throw cause;
+ } else {
+ throw e;
+ }
+ }
+ if (exceptionRef[0] != null) {
+ throw exceptionRef[0];
+ } else {
+ // v7.0.1: fill in the keys
+ if (result.hasVersions() && result.getKeys().isEmpty()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("setting keys of response to {}", this.keys);
+ }
+ ArrayList<Object> tmpKeys;
+ if (this.keys instanceof ArrayList) {
+ tmpKeys = (ArrayList<Object>) this.keys;
+ } else {
+ tmpKeys = new ArrayList<Object>(this.keys);
+ }
+ result.setKeys(tmpKeys);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.PUT_DATA_ERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startRemoveAll();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endRemoveAllSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endRemoveAll(start, hasTimedOut(), hasFailed());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
new file mode 100644
index 0000000..35c4494
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
@@ -0,0 +1,90 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Does a Rollback on the server
+ * @since 6.6
+ * @author sbawaska
+ */
+public class RollbackOp {
+
+ /**
+ * Does a rollback on the server for given transaction
+ * @param pool the pool to use to communicate with the server.
+ * @param txId the id of the transaction to rollback
+ */
+ public static void execute(ExecutablePool pool, int txId) {
+ RollbackOpImpl op = new RollbackOpImpl(txId);
+ pool.execute(op);
+ }
+
+ private RollbackOp() {
+ // no instance allowed
+ }
+
+ private static class RollbackOpImpl extends AbstractOp {
+ private int txId;
+
+ protected RollbackOpImpl(int txId) {
+ super(MessageType.ROLLBACK, 1);
+ getMessage().setTransactionId(txId);
+ this.txId = txId;
+ }
+
+ @Override
+ public String toString() {
+ return "Rollback(txId="+this.txId+")";
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "rollback");
+ return null;
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.EXCEPTION;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startRollback();
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endRollbackSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endRollback(start, hasTimedOut(), hasFailed());
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
+ getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+ getMessage().send(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerBlackList.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerBlackList.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerBlackList.java
new file mode 100644
index 0000000..aec202d
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerBlackList.java
@@ -0,0 +1,179 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * This class is designed to prevent the client from spinning
+ * and reconnected to the same failed server over and over.
+ * We've removed the old dead server monitor code because
+ * the locator is supposed to keep track of what servers are
+ * alive or dead. However, there is still the possibility that the locator
+ * may tell us a server is alive but we are unable to reach it.
+ *
+ * This class keeps track of the number of consecutive failures
+ * that happen to on each server. If the number of failures exceeds the limit,
+ * the server is added to a blacklist for a certain period of time. After
+ * the time is expired, the server comes off the blacklist, but the next
+ * failure will put the server back on the list for a longer period of time.
+ *
+ * @author dsmith
+ *
+ */
+public class ServerBlackList {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private final Map/*<ServerLocation, AI>*/ failureTrackerMap = new HashMap();
+ protected final Set blacklist = new CopyOnWriteArraySet();
+ private final Set unmodifiableBlacklist = Collections.unmodifiableSet(blacklist);
+ protected ScheduledExecutorService background;
+ protected final ListenerBroadcaster broadcaster = new ListenerBroadcaster();
+
+ //not final for tests.
+ static int THRESHOLD = Integer.getInteger("gemfire.ServerBlackList.THRESHOLD", 3).intValue();
+ protected final long pingInterval;
+
+ public ServerBlackList(long pingInterval) {
+ this.pingInterval = pingInterval;
+ }
+
+ public void start(ScheduledExecutorService background) {
+ this.background = background;
+ }
+
+ FailureTracker getFailureTracker(ServerLocation location) {
+ FailureTracker failureTracker;
+ synchronized(failureTrackerMap) {
+ failureTracker = (FailureTracker) failureTrackerMap.get(location);
+ if(failureTracker == null) {
+ failureTracker = new FailureTracker(location);
+ failureTrackerMap.put(location, failureTracker);
+ }
+ }
+
+ return failureTracker;
+ }
+
+ public Set getBadServers() {
+ return unmodifiableBlacklist;
+ }
+
+ public class FailureTracker {
+ private final AtomicInteger consecutiveFailures = new AtomicInteger();
+ private final ServerLocation location;
+
+ public FailureTracker(ServerLocation location) {
+ this.location = location;
+ }
+
+ public void reset() {
+ consecutiveFailures.set(0);
+ }
+
+ public void addFailure() {
+ if(blacklist.contains(location)) {
+ //A second failure must have happened before we added
+ //this server to the blacklist. Don't count that failure.
+ return;
+ }
+ long failures = consecutiveFailures.incrementAndGet();
+ if(failures >= THRESHOLD) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Blacklisting server {} for {}ms because it had {} consecutive failures", location, pingInterval, failures);
+ }
+ blacklist.add(location);
+ broadcaster.serverAdded(location);
+ try {
+ background.schedule(new ExpireBlackListTask(location), pingInterval, TimeUnit.MILLISECONDS);
+ } catch(RejectedExecutionException e) {
+ //ignore, the timer has been cancelled, which means we're shutting down.
+ }
+
+ }
+ }
+ }
+
+ public void addListener(BlackListListener blackListListener) {
+ broadcaster.listeners.add(blackListListener);
+ }
+
+ public void removeListener(BlackListListener blackListListener) {
+ broadcaster.listeners.remove(blackListListener);
+ }
+
+
+
+ private class ExpireBlackListTask extends PoolTask {
+ private ServerLocation location;
+
+ public ExpireBlackListTask(ServerLocation location) {
+ this.location = location;
+ }
+
+ @Override
+ public void run2() {
+ if(logger.isDebugEnabled()) {
+ logger.debug("{} is no longer blacklisted", location);
+ }
+ blacklist.remove(location);
+ broadcaster.serverRemoved(location);
+ }
+ }
+
+ public static interface BlackListListener {
+ public void serverAdded(ServerLocation location);
+
+ public void serverRemoved(ServerLocation location);
+ }
+
+ public static class BlackListListenerAdapter implements BlackListListener {
+ public void serverAdded(ServerLocation location) {
+ //do nothing
+ }
+
+ public void serverRemoved(ServerLocation location) {
+ //do nothing
+ }
+ }
+
+ protected static class ListenerBroadcaster implements BlackListListener {
+
+ protected Set listeners = new CopyOnWriteArraySet();
+
+ public void serverAdded(ServerLocation location) {
+ for(Iterator itr = listeners.iterator(); itr.hasNext(); ) {
+ BlackListListener listener = (BlackListListener) itr.next();
+ listener.serverAdded(location);
+ }
+ }
+
+ public void serverRemoved(ServerLocation location) {
+ for(Iterator itr = listeners.iterator(); itr.hasNext(); ) {
+ BlackListListener listener = (BlackListListener) itr.next();
+ listener.serverRemoved(location);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerProxy.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerProxy.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerProxy.java
new file mode 100644
index 0000000..a428e01
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerProxy.java
@@ -0,0 +1,60 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * Used to send operations from a client to a server.
+ * @author darrel
+ * @since 5.7
+ */
+public class ServerProxy {
+ protected final InternalPool pool;
+ /**
+ * Creates a server proxy for the given pool.
+ * @param pool the pool that this proxy will use to communicate with servers
+ */
+ public ServerProxy(InternalPool pool) {
+ this.pool = pool;
+ }
+ /**
+ * Returns the pool the proxy is using.
+ */
+ public InternalPool getPool() {
+ return this.pool;
+ }
+ /**
+ * Release use of this pool
+ */
+ public void detach() {
+ this.pool.detach();
+ }
+
+ /**
+ * Ping the specified server to see if it is still alive
+ * @param server the server to do the execution on
+ */
+ public void ping(ServerLocation server) {
+ PingOp.execute(this.pool, server);
+ }
+ /**
+ * Does a query on a server
+ * @param queryPredicate A query language boolean query predicate
+ * @return A <code>SelectResults</code> containing the values
+ * that match the <code>queryPredicate</code>.
+ */
+ public SelectResults query(String queryPredicate, Object[] queryParams)
+ {
+ return QueryOp.execute(this.pool, queryPredicate, queryParams);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionDataAccess.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionDataAccess.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionDataAccess.java
new file mode 100644
index 0000000..0787a03
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionDataAccess.java
@@ -0,0 +1,134 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EntrySnapshot;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.tx.TransactionalOperation;
+
+public interface ServerRegionDataAccess {
+
+ /**
+ * Does a get on the server
+ * @param key the entry key to do the get on
+ * @param callbackArg an optional callback arg to pass to any cache callbacks
+ * @param clientEvent the client event, if any, for version propagation
+ * @return the entry value found by the get if any
+ */
+ public abstract Object get(Object key, Object callbackArg, EntryEventImpl clientEvent);
+
+ /**
+ * Does a region put on the server
+ * @param key the entry key to do the put on
+ * @param value the entry value to put
+ * @param clientEvent the client event, if any, for eventID and version tag propagation
+ * @param op the operation type of this event
+ * @param requireOldValue
+ * @param expectedOldValue
+ * @param callbackArg an optional callback arg to pass to any cache callbacks
+ * @param isCreateFwd
+ */
+ public abstract Object put(Object key, Object value, byte[] deltaBytes, EntryEventImpl clientEvent,
+ Operation op, boolean requireOldValue, Object expectedOldValue,
+ Object callbackArg, boolean isCreateFwd);
+
+
+ /**
+ * Does a region entry destroy on the server
+ * @param key the entry key to do the destroy on
+ * @param expectedOldValue the value that must be associated with the entry, or null
+ * @param operation the operation being performed (Operation.DESTROY, Operation.REMOVE)
+ * @param clientEvent the client event, if any, for version propagation
+ * @param callbackArg an optional callback arg to pass to any cache callbacks
+ */
+ public abstract Object destroy(Object key, Object expectedOldValue,
+ Operation operation, EntryEventImpl clientEvent, Object callbackArg);
+
+
+ /**
+ * Does a region entry invalidate on the server
+ * @param event the entryEventImpl that represents the invalidate
+ */
+ public abstract void invalidate(EntryEventImpl event);
+
+
+ /**
+ * Does a region clear on the server
+ * @param eventId the event id for this clear
+ * @param callbackArg an optional callback arg to pass to any cache callbacks
+ */
+ public abstract void clear(EventID eventId, Object callbackArg);
+
+
+ /**
+ * Does a region containsKey on a server
+ * @param key the entry key to do the containsKey on
+ */
+ public abstract boolean containsKey(Object key);
+
+ /**
+ * Does a region containsKey on a server
+ * @param key the entry key to do the containsKey on
+ */
+ public abstract boolean containsValueForKey(Object key);
+
+
+ /**
+ * Does a region containsValue on a server
+ * @param value the entry value to search for
+ */
+ public boolean containsValue(Object value);
+
+
+ /**
+ * Does a region keySet on a server
+ */
+ public abstract Set keySet();
+
+ public abstract VersionedObjectList putAll(Map map, EventID eventId, boolean skipCallbacks, Object callbackArg);
+
+ public abstract VersionedObjectList removeAll(Collection<Object> keys, EventID eventId, Object callbackArg);
+
+ public abstract VersionedObjectList getAll(List keys, Object callback);
+
+ public int size();
+
+ /**
+ * gets an entry from the server, does not invoke loaders
+ * @param key
+ * @return an {@link EntrySnapshot} for the given key
+ */
+ public Entry getEntry(Object key);
+// public boolean containsValue(Object value);
+// public Set entries(boolean recursive) {
+// public void invalidate(Object key) throws TimeoutException,
+// public int size()
+
+ /**
+ * returns the name of the region to which this interface provides access
+ */
+ public String getRegionName();
+
+ /**
+ * returns the region to which this interface provides access. This may be
+ * null in an admin system
+ */
+ public Region getRegion();
+
+
+}
\ No newline at end of file