You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:44:08 UTC
[080/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch
'feature/GEODE-917' into develop
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
index 0000000,dd13f19..bef4bf1
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
@@@ -1,0 -1,1625 +1,1611 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ /**
+ *
+ */
+ package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+ import java.io.EOFException;
+ import java.io.IOException;
+ import java.io.InterruptedIOException;
+ import java.io.PrintWriter;
+ import java.io.StringWriter;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.Semaphore;
+ import java.util.regex.Pattern;
+
+ import org.apache.logging.log4j.Logger;
+
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.CopyException;
+ import com.gemstone.gemfire.InternalGemFireError;
+ import com.gemstone.gemfire.SerializationException;
+ import com.gemstone.gemfire.SystemFailure;
+ import com.gemstone.gemfire.cache.CacheLoaderException;
+ import com.gemstone.gemfire.cache.CacheWriterException;
+ import com.gemstone.gemfire.cache.InterestResultPolicy;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.TransactionException;
+ import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
+ import com.gemstone.gemfire.cache.query.types.CollectionType;
+ import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+ import com.gemstone.gemfire.distributed.internal.DistributionStats;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+ import com.gemstone.gemfire.internal.cache.DistributedRegion;
+ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+ import com.gemstone.gemfire.internal.cache.EntrySnapshot;
+ import com.gemstone.gemfire.internal.cache.EventID;
+ import com.gemstone.gemfire.internal.cache.FindVersionTagOperation;
+ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+ import com.gemstone.gemfire.internal.cache.LocalRegion;
+ import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
+ import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+ import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
+ import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+ import com.gemstone.gemfire.internal.cache.TXStateProxy;
+ import com.gemstone.gemfire.internal.cache.Token;
+ import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+ import com.gemstone.gemfire.internal.cache.tier.Command;
+ import com.gemstone.gemfire.internal.cache.tier.InterestType;
+ import com.gemstone.gemfire.internal.cache.tier.MessageType;
+ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+ import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
+ import com.gemstone.gemfire.security.GemFireSecurityException;
+
+ /**
+ * @author ashahid
+ *
+ */
+ public abstract class BaseCommand implements Command {
+ protected static final Logger logger = LogService.getLogger();
+
+ /**
+ * Whether zipped values are being passed to/from the client. Can be modified
+ * using the system property Message.ZIP_VALUES ? This does not appear to
+ * happen anywhere
+ */
+ protected static final boolean zipValues = false;
+
+ protected static final boolean APPLY_RETRIES = Boolean
+ .getBoolean("gemfire.gateway.ApplyRetries");
+
+ public static final byte[] OK_BYTES = new byte[]{0};
+
+ public static final int maximumChunkSize = Integer.getInteger(
+ "BridgeServer.MAXIMUM_CHUNK_SIZE", 100).intValue();
+
+ /** Maximum number of entries in each chunked response chunk */
+
+ /** Whether to suppress logging of IOExceptions */
+ private static boolean suppressIOExceptionLogging = Boolean
+ .getBoolean("gemfire.bridge.suppressIOExceptionLogging");
+
+ /**
+ * Maximum number of concurrent incoming client message bytes that a bridge
+ * server will allow. Once a server is working on this number additional
+ * incoming client messages will wait until one of them completes or fails.
+ * The bytes are computed based in the size sent in the incoming msg header.
+ */
+ private static final int MAX_INCOMING_DATA = Integer.getInteger(
+ "BridgeServer.MAX_INCOMING_DATA", -1).intValue();
+
+ /**
+ * Maximum number of concurrent incoming client messages that a bridge server
+ * will allow. Once a server is working on this number additional incoming
+ * client messages will wait until one of them completes or fails.
+ */
+ private static final int MAX_INCOMING_MSGS = Integer.getInteger(
+ "BridgeServer.MAX_INCOMING_MSGS", -1).intValue();
+
+ private static final Semaphore incomingDataLimiter;
+
+ private static final Semaphore incomingMsgLimiter;
+ static {
+ Semaphore tmp;
+ if (MAX_INCOMING_DATA > 0) {
+ // backport requires that this is fair since we inc by values > 1
+ tmp = new Semaphore(MAX_INCOMING_DATA, true);
+ }
+ else {
+ tmp = null;
+ }
+ incomingDataLimiter = tmp;
+ if (MAX_INCOMING_MSGS > 0) {
+ tmp = new Semaphore(MAX_INCOMING_MSGS, false); // unfair for best
+ // performance
+ }
+ else {
+ tmp = null;
+ }
+ incomingMsgLimiter = tmp;
+
+ }
+
+ final public void execute(Message msg, ServerConnection servConn) {
+ // Read the request and update the statistics
+ long start = DistributionStats.getStatTime();
+ //servConn.resetTransientData();
+ if(EntryLogger.isEnabled() && servConn != null) {
+ EntryLogger.setSource(servConn.getMembershipID(), "c2s");
+ }
+ boolean shouldMasquerade = shouldMasqueradeForTx(msg, servConn);
+ try {
+ if (shouldMasquerade) {
+ GemFireCacheImpl cache = (GemFireCacheImpl)servConn.getCache();
+ InternalDistributedMember member = (InternalDistributedMember)servConn.getProxyID().getDistributedMember();
+ TXManagerImpl txMgr = cache.getTxManager();
+ TXStateProxy tx = null;
+ try {
+ tx = txMgr.masqueradeAs(msg, member, false);
+ cmdExecute(msg, servConn, start);
+ } finally {
+ txMgr.unmasquerade(tx);
+ }
+ } else {
+ cmdExecute(msg, servConn, start);
+ }
+
+ }
++ catch (TransactionException
++ | CopyException
++ | SerializationException
++ | CacheWriterException
++ | CacheLoaderException
++ | GemFireSecurityException
++ | PartitionOfflineException
++ | MessageTooLargeException e) {
++ handleExceptionNoDisconnect(msg, servConn, e);
++ }
+ catch (EOFException eof) {
+ BaseCommand.handleEOFException(msg, servConn, eof);
+ // TODO:Asif: Check if there is any need for explicitly returning
+ return;
+ }
+ catch (InterruptedIOException e) { // Solaris only
+ BaseCommand.handleInterruptedIOException(msg, servConn, e);
- return;
+ }
+ catch (IOException e) {
+ BaseCommand.handleIOException(msg, servConn, e);
- return;
+ }
+ catch (DistributedSystemDisconnectedException e) {
+ BaseCommand.handleShutdownException(msg, servConn, e);
- return;
- }
- catch (PartitionOfflineException e) { // fix for bug #42225
- handleExceptionNoDisconnect(msg, servConn, e);
- }
- catch (GemFireSecurityException e) {
- handleExceptionNoDisconnect(msg, servConn, e);
+ }
- catch (CacheLoaderException e) {
- handleExceptionNoDisconnect(msg, servConn, e);
- }
- catch (CacheWriterException e) {
- handleExceptionNoDisconnect(msg, servConn, e);
- } catch (SerializationException e) {
- handleExceptionNoDisconnect(msg, servConn, e);
- } catch (CopyException e) {
- handleExceptionNoDisconnect(msg, servConn, e);
- } catch (TransactionException e) {
- handleExceptionNoDisconnect(msg, servConn, e);
- }
-
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable e) {
+ BaseCommand.handleThrowable(msg, servConn, e);
+ } finally {
+ EntryLogger.clearSource();
+ }
- /*
- * finally { // Keep track of the fact that a message is no longer being //
- * processed. servConn.setNotProcessingMessage();
- * servConn.clearRequestMsg(); }
- */
+ }
+
+ /**
+ * checks to see if this thread needs to masquerade as a transactional thread.
+ * clients after GFE_66 should be able to start a transaction.
+ * @param msg
+ * @param servConn
+ * @return true if thread should masquerade as a transactional thread.
+ */
+ protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) {
+ if (servConn.getClientVersion().compareTo(Version.GFE_66) >= 0
+ && msg.getTransactionId() > TXManagerImpl.NOTX) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * If an operation is retried then some server may have seen it already.
+ * We cannot apply this operation to the cache without knowing whether a
+ * version tag has already been created for it. Otherwise caches that have
+ * seen the event already will reject it but others will not, but will have
+ * no version tag with which to perform concurrency checks.
+ * <p>The client event should have the event identifier from the client and
+ * the region affected by the operation.
+ * @param clientEvent
+ */
+ public boolean recoverVersionTagForRetriedOperation(EntryEventImpl clientEvent) {
+ LocalRegion r = clientEvent.getRegion();
+ VersionTag tag = null;
+ if ((clientEvent.getVersionTag() != null) && (clientEvent.getVersionTag().isGatewayTag())) {
+ tag = r.findVersionTagForGatewayEvent(clientEvent.getEventId());
+ }
+ else {
+ tag = r.findVersionTagForClientEvent(clientEvent.getEventId());
+ }
+ if (tag == null) {
+ if (r instanceof DistributedRegion || r instanceof PartitionedRegion) {
+ // TODO this could be optimized for partitioned regions by sending the key
+ // so that the PR could look at an individual bucket for the event
+ tag = FindVersionTagOperation.findVersionTag(r, clientEvent.getEventId(), false);
+ }
+ }
+ if (tag != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("recovered version tag {} for replayed operation {}", tag, clientEvent.getEventId());
+ }
+ clientEvent.setVersionTag(tag);
+ }
+ return (tag != null);
+ }
+
+ /**
+ * If an operation is retried then some server may have seen it already.
+ * We cannot apply this operation to the cache without knowing whether a
+ * version tag has already been created for it. Otherwise caches that have
+ * seen the event already will reject it but others will not, but will have
+ * no version tag with which to perform concurrency checks.
+ * <p>The client event should have the event identifier from the client and
+ * the region affected by the operation.
+ */
+ protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion r, EventID eventID) {
+ VersionTag tag = r.findVersionTagForClientBulkOp(eventID);
+ if(tag != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID);
+ }
+ return tag;
+ }
+ if (r instanceof DistributedRegion || r instanceof PartitionedRegion) {
+ // TODO this could be optimized for partitioned regions by sending the key
+ // so that the PR could look at an individual bucket for the event
+ tag = FindVersionTagOperation.findVersionTag(r, eventID, true);
+ }
+ if (tag != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID);
+ }
+ }
+ return tag;
+ }
+
+ abstract public void cmdExecute(Message msg, ServerConnection servConn,
+ long start) throws IOException, ClassNotFoundException, InterruptedException;
+
+ protected void writeReply(Message origMsg, ServerConnection servConn)
+ throws IOException {
+ Message replyMsg = servConn.getReplyMessage();
+ servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+ replyMsg.setMessageType(MessageType.REPLY);
+ replyMsg.setNumberOfParts(1);
+ replyMsg.setTransactionId(origMsg.getTransactionId());
+ replyMsg.addBytesPart(OK_BYTES);
+ replyMsg.send(servConn);
+ if (logger.isTraceEnabled()) {
+ logger.trace("{}: rpl tx: {}", servConn.getName(), origMsg.getTransactionId());
+ }
+ }
+ protected void writeReplyWithRefreshMetadata(Message origMsg,
+ ServerConnection servConn, PartitionedRegion pr, byte nwHop) throws IOException {
+ Message replyMsg = servConn.getReplyMessage();
+ servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+ replyMsg.setMessageType(MessageType.REPLY);
+ replyMsg.setNumberOfParts(1);
+ replyMsg.setTransactionId(origMsg.getTransactionId());
+ replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop});
+ replyMsg.send(servConn);
+ pr.getPrStats().incPRMetaDataSentCount();
+ if (logger.isTraceEnabled()) {
+ logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(), origMsg.getTransactionId());
+ }
+ }
+
+ private static void handleEOFException(Message msg,
+ ServerConnection servConn, Exception eof) {
+ CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+ CacheServerStats stats = servConn.getCacheServerStats();
+ boolean potentialModification = servConn.getPotentialModification();
+ if (!crHelper.isShutdown()) {
+ if (potentialModification) {
+ stats.incAbandonedWriteRequests();
+ }
+ else {
+ stats.incAbandonedReadRequests();
+ }
+ if (!suppressIOExceptionLogging) {
+ if (potentialModification) {
+ int transId = (msg != null) ? msg.getTransactionId()
+ : Integer.MIN_VALUE;
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_EOFEXCEPTION_DURING_A_WRITE_OPERATION_ON_REGION__1_KEY_2_MESSAGEID_3,
+ new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}));
+ }
+ else {
+ logger.debug("EOF exception", eof);
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_CONNECTION_DISCONNECT_DETECTED_BY_EOF,
+ servConn.getName()));
+ }
+ }
+ }
+ servConn.setFlagProcessMessagesAsFalse();
+ }
+
+ private static void handleInterruptedIOException(Message msg,
+ ServerConnection servConn, Exception e) {
+ CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+ if (!crHelper.isShutdown() && servConn.isOpen()) {
+ if (!suppressIOExceptionLogging) {
+ if (logger.isDebugEnabled())
+ logger.debug("Aborted message due to interrupt: {}", e.getMessage(), e);
+ }
+ }
+ servConn.setFlagProcessMessagesAsFalse();
+ }
+
+ private static void handleIOException(Message msg, ServerConnection servConn,
+ Exception e) {
+ CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+ boolean potentialModification = servConn.getPotentialModification();
+
+ if (!crHelper.isShutdown() && servConn.isOpen()) {
+ if (!suppressIOExceptionLogging) {
+ if (potentialModification) {
+ int transId = (msg != null) ? msg.getTransactionId()
+ : Integer.MIN_VALUE;
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION_DURING_OPERATION_FOR_REGION_1_KEY_2_MESSID_3,
+ new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), e);
+ }
+ else {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION,
+ servConn.getName()), e);
+ }
+ }
+ }
+ servConn.setFlagProcessMessagesAsFalse();
+ }
+
+ private static void handleShutdownException(Message msg,
+ ServerConnection servConn, Exception e) {
+ CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+ boolean potentialModification = servConn.getPotentialModification();
+
+ if (!crHelper.isShutdown()) {
+ if (potentialModification) {
+ int transId = (msg != null) ? msg.getTransactionId()
+ : Integer.MIN_VALUE;
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3,
+ new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), e);
+ }
+ else {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION,
+ servConn.getName()),e);
+ }
+ }
+ servConn.setFlagProcessMessagesAsFalse();
+ }
+
+ // Handle GemfireSecurityExceptions separately since the connection should not
+ // be terminated (by setting processMessages to false) unlike in
+ // handleThrowable. Fixes bugs #38384 and #39392.
+ // private static void handleGemfireSecurityException(Message msg,
+ // ServerConnection servConn, GemFireSecurityException e) {
+ //
+ // boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE);
+ // boolean responded = servConn.getTransientFlag(RESPONDED);
+ // boolean requiresChunkedResponse = servConn
+ // .getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
+ // boolean potentialModification = servConn.getPotentialModification();
+ //
+ // try {
+ // try {
+ // if (requiresResponse && !responded) {
+ // if (requiresChunkedResponse) {
+ // writeChunkedException(msg, e, false, servConn);
+ // }
+ // else {
+ // writeException(msg, e, false, servConn);
+ // }
+ // servConn.setAsTrue(RESPONDED);
+ // }
+ // }
+ // finally { // inner try-finally to ensure proper ordering of logging
+ // if (potentialModification) {
+ // int transId = (msg != null) ? msg.getTransactionId()
+ // : Integer.MIN_VALUE;
+ // }
+ // }
+ // }
+ // catch (IOException ioe) {
+ // if (logger.isDebugEnabled()) {
+ // logger.fine(servConn.getName()
+ // + ": Unexpected IOException writing security exception: ", ioe);
+ // }
+ // }
+ // }
+
+ private static void handleExceptionNoDisconnect(Message msg,
+ ServerConnection servConn, Exception e) {
+ boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE);
+ boolean responded = servConn.getTransientFlag(RESPONDED);
+ boolean requiresChunkedResponse = servConn
+ .getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
+ boolean potentialModification = servConn.getPotentialModification();
+ boolean wroteExceptionResponse = false;
+
+ try {
+ try {
+ if (requiresResponse && !responded) {
+ if (requiresChunkedResponse) {
+ writeChunkedException(msg, e, false, servConn);
+ }
+ else {
+ writeException(msg, e, false, servConn);
+ }
+ wroteExceptionResponse = true;
+ servConn.setAsTrue(RESPONDED);
+ }
+ }
+ finally { // inner try-finally to ensure proper ordering of logging
+ if (potentialModification) {
+ int transId = (msg != null) ? msg.getTransactionId()
+ : Integer.MIN_VALUE;
+ if (!wroteExceptionResponse) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3,
+ new Object[] {servConn.getName(),servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), e);
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Exception during operation on region: {} key: {} messageId: {}", servConn.getName(),
+ servConn.getModRegion(), servConn.getModKey(), transId, e);
+ }
+ }
+ }
+ else {
+ if (!wroteExceptionResponse) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION,
+ servConn.getName()), e);
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Exception: {}", servConn.getName(), e.getMessage(), e);
+ }
+ }
+ }
+ }
+ }
+ catch (IOException ioe) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(), ioe.getMessage(), ioe);
+ }
+ }
+ }
+
+ private static void handleThrowable(Message msg, ServerConnection servConn,
+ Throwable th) {
+ boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE);
+ boolean responded = servConn.getTransientFlag(RESPONDED);
+ boolean requiresChunkedResponse = servConn
+ .getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
+ boolean potentialModification = servConn.getPotentialModification();
+
+ try {
+ try {
+ if (th instanceof Error) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_ERROR_ON_SERVER,
+ servConn.getName()), th);
+ }
+ if (requiresResponse && !responded) {
+ if (requiresChunkedResponse) {
+ writeChunkedException(msg, th, false, servConn);
+ }
+ else {
+ writeException(msg, th, false, servConn);
+ }
+ servConn.setAsTrue(RESPONDED);
+ }
+ }
+ finally { // inner try-finally to ensure proper ordering of logging
+ if (th instanceof Error) {
+ // log nothing
+ } else if (th instanceof CancelException) {
+ // log nothing
+ } else {
+ if (potentialModification) {
+ int transId = (msg != null) ? msg.getTransactionId()
+ : Integer.MIN_VALUE;
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3,
+ new Object[] {servConn.getName(),servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), th);
+ }
+ else {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION,
+ servConn.getName()), th);
+ }
+ }
+ }
+ } catch (IOException ioe) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(), ioe.getMessage(), ioe);
+ }
+ } finally {
+ servConn.setFlagProcessMessagesAsFalse();
+ }
+ }
+
+ protected static void writeChunkedException(Message origMsg, Throwable e,
+ boolean isSevere, ServerConnection servConn) throws IOException {
+ writeChunkedException(origMsg, e, isSevere, servConn, servConn.getChunkedResponseMessage());
+ }
+
+ protected static void writeChunkedException(Message origMsg, Throwable e,
+ boolean isSevere, ServerConnection servConn, ChunkedMessage originalReponse) throws IOException {
+ writeChunkedException(origMsg, e, isSevere, servConn, originalReponse, 2);
+ }
+
+ protected static void writeChunkedException(Message origMsg, Throwable e,
+ boolean isSevere, ServerConnection servConn, ChunkedMessage originalReponse, int numOfParts) throws IOException {
+ ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ chunkedResponseMsg.setServerConnection(servConn);
+ if (originalReponse.headerHasBeenSent()) {
+ //chunkedResponseMsg = originalReponse;
+ // fix for bug 35442
+ chunkedResponseMsg.setNumberOfParts(numOfParts);
+ chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts);
+ chunkedResponseMsg.addObjPart(e);
+ if (numOfParts == 2) {
+ chunkedResponseMsg.addStringPart(getExceptionTrace(e));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), e.getMessage(), e);
+ }
+ }
+ else {
+ chunkedResponseMsg.setMessageType(MessageType.EXCEPTION);
+ chunkedResponseMsg.setNumberOfParts(numOfParts);
+ chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts);
+ chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+ chunkedResponseMsg.sendHeader();
+ chunkedResponseMsg.addObjPart(e);
+ if (numOfParts == 2) {
+ chunkedResponseMsg.addStringPart(getExceptionTrace(e));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e);
+ }
+ }
+ chunkedResponseMsg.sendChunk(servConn);
+ }
+
+ // Get the exception stacktrace for native clients
+ public static String getExceptionTrace(Throwable ex) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.close();
+ return sw.toString();
+ }
+
+ protected static void writeException(Message origMsg, Throwable e,
+ boolean isSevere, ServerConnection servConn) throws IOException {
+ writeException(origMsg, MessageType.EXCEPTION, e, isSevere, servConn);
+ }
+
+ protected static void writeException(Message origMsg, int msgType, Throwable e,
+ boolean isSevere, ServerConnection servConn) throws IOException {
+ Message errorMsg = servConn.getErrorResponseMessage();
+ errorMsg.setMessageType(msgType);
+ errorMsg.setNumberOfParts(2);
+ errorMsg.setTransactionId(origMsg.getTransactionId());
+ if (isSevere) {
+ String msg = e.getMessage();
+ if (msg == null) {
+ msg = e.toString();
+ }
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.BaseCommand_SEVERE_CACHE_EXCEPTION_0, msg));
+ }
+ errorMsg.addObjPart(e);
+ errorMsg.addStringPart(getExceptionTrace(e));
+ errorMsg.send(servConn);
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Wrote exception: {}", servConn.getName(), e.getMessage(), e);
+ }
++ if (e instanceof MessageTooLargeException) {
++ throw (IOException)e;
++ }
+ }
+
+ protected static void writeErrorResponse(Message origMsg, int messageType,
+ ServerConnection servConn) throws IOException {
+ Message errorMsg = servConn.getErrorResponseMessage();
+ errorMsg.setMessageType(messageType);
+ errorMsg.setNumberOfParts(1);
+ errorMsg.setTransactionId(origMsg.getTransactionId());
+ errorMsg
+ .addStringPart(LocalizedStrings.BaseCommand_INVALID_DATA_RECEIVED_PLEASE_SEE_THE_CACHE_SERVER_LOG_FILE_FOR_ADDITIONAL_DETAILS.toLocalizedString());
+ errorMsg.send(servConn);
+ }
+
+ protected static void writeErrorResponse(Message origMsg, int messageType,
+ String msg, ServerConnection servConn) throws IOException {
+ Message errorMsg = servConn.getErrorResponseMessage();
+ errorMsg.setMessageType(messageType);
+ errorMsg.setNumberOfParts(1);
+ errorMsg.setTransactionId(origMsg.getTransactionId());
+ errorMsg.addStringPart(msg);
+ errorMsg.send(servConn);
+ }
+
+ protected static void writeRegionDestroyedEx(Message msg, String regionName,
+ String title, ServerConnection servConn) throws IOException {
+ String reason = servConn.getName() + ": Region named " + regionName + title;
+ RegionDestroyedException ex = new RegionDestroyedException(reason,
+ regionName);
+ if (servConn.getTransientFlag(REQUIRES_CHUNKED_RESPONSE)) {
+ writeChunkedException(msg, ex, false, servConn);
+ }
+ else {
+ writeException(msg, ex, false, servConn);
+ }
+ }
+
+ protected static void writeResponse(Object data, Object callbackArg,
+ Message origMsg, boolean isObject, ServerConnection servConn)
+ throws IOException {
+ Message responseMsg = servConn.getResponseMessage();
+ responseMsg.setMessageType(MessageType.RESPONSE);
+ responseMsg.setTransactionId(origMsg.getTransactionId());
+
+
+ if (callbackArg == null) {
+ responseMsg.setNumberOfParts(1);
+ }
+ else {
+ responseMsg.setNumberOfParts(2);
+ }
+ if (data instanceof byte[]) {
+ responseMsg.addRawPart((byte[])data, isObject);
+ }
+ else {
+ Assert.assertTrue(isObject,
+ "isObject should be true when value is not a byte[]");
+ responseMsg.addObjPart(data, zipValues);
+ }
+ if (callbackArg != null) {
+ responseMsg.addObjPart(callbackArg);
+ }
+ servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+ responseMsg.send(servConn);
+ origMsg.clearParts();
+ }
+
+ protected static void writeResponseWithRefreshMetadata(Object data,
+ Object callbackArg, Message origMsg, boolean isObject,
+ ServerConnection servConn, PartitionedRegion pr, byte nwHop) throws IOException {
+ Message responseMsg = servConn.getResponseMessage();
+ responseMsg.setMessageType(MessageType.RESPONSE);
+ responseMsg.setTransactionId(origMsg.getTransactionId());
+
+ if (callbackArg == null) {
+ responseMsg.setNumberOfParts(2);
+ }
+ else {
+ responseMsg.setNumberOfParts(3);
+ }
+
+ if (data instanceof byte[]) {
+ responseMsg.addRawPart((byte[])data, isObject);
+ }
+ else {
+ Assert.assertTrue(isObject,
+ "isObject should be true when value is not a byte[]");
+ responseMsg.addObjPart(data, zipValues);
+ }
+ if (callbackArg != null) {
+ responseMsg.addObjPart(callbackArg);
+ }
+ responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(),nwHop});
+ servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+ responseMsg.send(servConn);
+ origMsg.clearParts();
+ }
+
+ protected static void writeResponseWithFunctionAttribute(byte[] data,
+ Message origMsg, ServerConnection servConn) throws IOException {
+ Message responseMsg = servConn.getResponseMessage();
+ responseMsg.setMessageType(MessageType.RESPONSE);
+ responseMsg.setTransactionId(origMsg.getTransactionId());
+ responseMsg.setNumberOfParts(1);
+ responseMsg.addBytesPart(data);
+ servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+ responseMsg.send(servConn);
+ origMsg.clearParts();
+ }
+
+ static protected void checkForInterrupt(ServerConnection servConn, Exception e)
+ throws InterruptedException, InterruptedIOException {
+ servConn.getCachedRegionHelper().checkCancelInProgress(e);
+ if (e instanceof InterruptedException) {
+ throw (InterruptedException)e;
+ }
+ if (e instanceof InterruptedIOException) {
+ throw (InterruptedIOException)e;
+ }
+ }
+
+ protected static void writeQueryResponseChunk(Object queryResponseChunk,
+ CollectionType collectionType, boolean lastChunk,
+ ServerConnection servConn) throws IOException {
+ ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage();
+ queryResponseMsg.setNumberOfParts(2);
+ queryResponseMsg.setLastChunk(lastChunk);
+ queryResponseMsg.addObjPart(collectionType, zipValues);
+ queryResponseMsg.addObjPart(queryResponseChunk, zipValues);
+ queryResponseMsg.sendChunk(servConn);
+ }
+
+ protected static void writeQueryResponseException(Message origMsg,
+ Throwable e, boolean isSevere, ServerConnection servConn)
+ throws IOException {
+ ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage();
+ ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ if (queryResponseMsg.headerHasBeenSent()) {
+ // fix for bug 35442
+ // This client is expecting 2 parts in this message so send 2 parts
+ queryResponseMsg.setServerConnection(servConn);
+ queryResponseMsg.setNumberOfParts(2);
+ queryResponseMsg.setLastChunkAndNumParts(true, 2);
+ queryResponseMsg.addObjPart(e);
+ queryResponseMsg.addStringPart(getExceptionTrace(e));
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), e.getMessage(), e);
+ }
+ queryResponseMsg.sendChunk(servConn);
+ }
+ else {
+ chunkedResponseMsg.setServerConnection(servConn);
+ chunkedResponseMsg.setMessageType(MessageType.EXCEPTION);
+ chunkedResponseMsg.setNumberOfParts(2);
+ chunkedResponseMsg.setLastChunkAndNumParts(true, 2);
+ chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+ chunkedResponseMsg.sendHeader();
+ chunkedResponseMsg.addObjPart(e);
+ chunkedResponseMsg.addStringPart(getExceptionTrace(e));
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e);
+ }
+ chunkedResponseMsg.sendChunk(servConn);
+ }
+ }
+
+ protected static void writeChunkedErrorResponse(Message origMsg,
+ int messageType, String message, ServerConnection servConn)
+ throws IOException {
+ // Send chunked response header identifying error message
+ ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ if (logger.isDebugEnabled()) {
+ logger.debug(servConn.getName() + ": Sending error message header type: "
+ + messageType + " transaction: " + origMsg.getTransactionId());
+ }
+ chunkedResponseMsg.setMessageType(messageType);
+ chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+ chunkedResponseMsg.sendHeader();
+
+ // Send actual error
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message);
+ }
+ chunkedResponseMsg.setNumberOfParts(1);
+ chunkedResponseMsg.setLastChunk(true);
+ chunkedResponseMsg.addStringPart(message);
+ chunkedResponseMsg.sendChunk(servConn);
+ }
+
+ protected static void writeFunctionResponseException(Message origMsg,
+ int messageType, String message, ServerConnection servConn, Throwable e)
+ throws IOException {
+ ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage();
+ ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ if (functionResponseMsg.headerHasBeenSent()) {
+ functionResponseMsg.setServerConnection(servConn);
+ functionResponseMsg.setNumberOfParts(2);
+ functionResponseMsg.setLastChunkAndNumParts(true,2);
+ functionResponseMsg.addObjPart(e);
+ functionResponseMsg.addStringPart(getExceptionTrace(e));
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), e.getMessage(), e);
+ }
+ functionResponseMsg.sendChunk(servConn);
+ }
+ else {
+ chunkedResponseMsg.setServerConnection(servConn);
+ chunkedResponseMsg.setMessageType(messageType);
+ chunkedResponseMsg.setNumberOfParts(2);
+ chunkedResponseMsg.setLastChunkAndNumParts(true,2);
+ chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+ chunkedResponseMsg.sendHeader();
+ chunkedResponseMsg.addObjPart(e);
+ chunkedResponseMsg.addStringPart(getExceptionTrace(e));
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e);
+ }
+ chunkedResponseMsg.sendChunk(servConn);
+ }
+ }
+
+ protected static void writeFunctionResponseError(Message origMsg,
+ int messageType, String message, ServerConnection servConn)
+ throws IOException {
+ ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage();
+ ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+ if (functionResponseMsg.headerHasBeenSent()) {
+ functionResponseMsg.setNumberOfParts(1);
+ functionResponseMsg.setLastChunk(true);
+ functionResponseMsg.addStringPart(message);
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending Error chunk while reply in progress: {}", servConn.getName(), message);
+ }
+ functionResponseMsg.sendChunk(servConn);
+ }
+ else {
+ chunkedResponseMsg.setMessageType(messageType);
+ chunkedResponseMsg.setNumberOfParts(1);
+ chunkedResponseMsg.setLastChunk(true);
+ chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+ chunkedResponseMsg.sendHeader();
+ chunkedResponseMsg.addStringPart(message);
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending Error chunk: {}", servConn.getName(), message);
+ }
+ chunkedResponseMsg.sendChunk(servConn);
+ }
+ }
+
+ protected static void writeKeySetErrorResponse(Message origMsg,
+ int messageType, String message, ServerConnection servConn)
+ throws IOException {
+ // Send chunked response header identifying error message
+ ChunkedMessage chunkedResponseMsg = servConn.getKeySetResponseMessage();
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending error message header type: {} transaction: {}",
+ servConn.getName(), messageType, origMsg.getTransactionId());
+ }
+ chunkedResponseMsg.setMessageType(messageType);
+ chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+ chunkedResponseMsg.sendHeader();
+ // Send actual error
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message);
+ }
+ chunkedResponseMsg.setNumberOfParts(1);
+ chunkedResponseMsg.setLastChunk(true);
+ chunkedResponseMsg.addStringPart(message);
+ chunkedResponseMsg.sendChunk(servConn);
+ }
+
+ static Message readRequest(ServerConnection servConn) {
+ Message requestMsg = null;
+ try {
+ requestMsg = servConn.getRequestMessage();
+ requestMsg.recv(servConn, MAX_INCOMING_DATA, incomingDataLimiter,
+ incomingMsgLimiter);
+ return requestMsg;
+ }
+ catch (EOFException eof) {
+ handleEOFException(null, servConn, eof);
+ // TODO:Asif: Check if there is any need for explicitly returning
+
+ }
+ catch (InterruptedIOException e) { // Solaris only
+ handleInterruptedIOException(null, servConn, e);
+
+ }
+ catch (IOException e) {
+ handleIOException(null, servConn, e);
+
+ }
+ catch (DistributedSystemDisconnectedException e) {
+ handleShutdownException(null, servConn, e);
+
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable e) {
+ SystemFailure.checkFailure();
+ handleThrowable(null, servConn, e);
+ }
+ return requestMsg;
+ }
+
+ protected static void fillAndSendRegisterInterestResponseChunks(
+ LocalRegion region, Object riKey, int interestType,
+ InterestResultPolicy policy, ServerConnection servConn)
+ throws IOException {
+ fillAndSendRegisterInterestResponseChunks(region, riKey, interestType,
+ false, policy, servConn);
+ }
+
+ /*
+ * serializeValues is unused for clients < GFE_80
+ */
+ protected static void fillAndSendRegisterInterestResponseChunks(
+ LocalRegion region, Object riKey, int interestType, boolean serializeValues,
+ InterestResultPolicy policy, ServerConnection servConn)
+ throws IOException {
+ // Client is not interested.
+ if (policy.isNone()) {
+ sendRegisterInterestResponseChunk(region, riKey, new ArrayList(), true,
+ servConn);
+ return;
+ }
+ if (policy.isKeysValues()
+ && servConn.getClientVersion().compareTo(Version.GFE_80) >= 0) {
+ handleKeysValuesPolicy(region, riKey, interestType, serializeValues, servConn);
+ return;
+ }
+ if (riKey instanceof List) {
+ handleList(region, (List)riKey, policy, servConn);
+ return;
+ }
+ if (!(riKey instanceof String)) {
+ handleSingleton(region, riKey, policy, servConn);
+ return;
+ }
+
+ switch (interestType) {
+ case InterestType.OQL_QUERY:
+ // Not supported yet
+ throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+ case InterestType.FILTER_CLASS:
+ throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+ // handleFilter(region, (String)riKey, policy);
+ // break;
+ case InterestType.REGULAR_EXPRESSION: {
+ String regEx = (String)riKey;
+ if (regEx.equals(".*")) {
+ handleAllKeys(region, policy, servConn);
+ }
+ else {
+ handleRegEx(region, regEx, policy, servConn);
+ }
+ }
+ break;
+ case InterestType.KEY:
+ if (riKey.equals("ALL_KEYS")) {
+ handleAllKeys(region, policy, servConn);
+ }
+ else {
+ handleSingleton(region, riKey, policy, servConn);
+ }
+ break;
+ default:
+ throw new InternalGemFireError(LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString());
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static void handleKeysValuesPolicy(LocalRegion region, Object riKey,
+ int interestType, boolean serializeValues, ServerConnection servConn)
+ throws IOException {
+ if (riKey instanceof List) {
+ handleKVList(region, (List)riKey, serializeValues, servConn);
+ return;
+ }
+ if (!(riKey instanceof String)) {
+ handleKVSingleton(region, riKey, serializeValues, servConn);
+ return;
+ }
+
+ switch (interestType) {
+ case InterestType.OQL_QUERY:
+ throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+ case InterestType.FILTER_CLASS:
+ throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+ case InterestType.REGULAR_EXPRESSION:
+ String regEx = (String)riKey;
+ if (regEx.equals(".*")) {
+ handleKVAllKeys(region, null, serializeValues, servConn);
+ } else {
+ handleKVAllKeys(region, regEx, serializeValues, servConn);
+ }
+ break;
+ case InterestType.KEY:
+ if (riKey.equals("ALL_KEYS")) {
+ handleKVAllKeys(region, null, serializeValues, servConn);
+ } else {
+ handleKVSingleton(region, riKey, serializeValues, servConn);
+ }
+ break;
+ default:
+ throw new InternalGemFireError(LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString());
+ }
+ }
+
+ /**
+ * @param list
+ * is a List of entry keys
+ */
+ protected static void sendRegisterInterestResponseChunk(Region region,
+ Object riKey, ArrayList list, boolean lastChunk, ServerConnection servConn)
+ throws IOException {
+ ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage();
+ chunkedResponseMsg.setNumberOfParts(1);
+ chunkedResponseMsg.setLastChunk(lastChunk);
+ chunkedResponseMsg.addObjPart(list, zipValues);
+ String regionName = (region == null) ? " null " : region.getFullPath();
+ if (logger.isDebugEnabled()) {
+ String str = servConn.getName() + ": Sending"
+ + (lastChunk ? " last " : " ")
+ + "register interest response chunk for region: " + regionName
+ + " for keys: " + riKey + " chunk=<" + chunkedResponseMsg + ">";
+ logger.debug(str);
+ }
+
+ chunkedResponseMsg.sendChunk(servConn);
+ }
+
+ /**
+ * Determines whether keys for destroyed entries (tombstones) should be sent
+ * to clients in register-interest results.
+ *
+ * @param servConn
+ * @param policy
+ * @return true if tombstones should be sent to the client
+ */
+ private static boolean sendTombstonesInRIResults(ServerConnection servConn, InterestResultPolicy policy) {
+ return (policy == InterestResultPolicy.KEYS_VALUES)
+ && (servConn.getClientVersion().compareTo(Version.GFE_80) >= 0);
+ }
+
+ /**
+ * Process an interest request involving a list of keys
+ *
+ * @param region
+ * the region
+ * @param keyList
+ * the list of keys
+ * @param policy
+ * the policy
+ * @throws IOException
+ */
+ private static void handleList(LocalRegion region, List keyList,
+ InterestResultPolicy policy, ServerConnection servConn)
+ throws IOException {
+ if (region instanceof PartitionedRegion) {
+ // too bad java doesn't provide another way to do this...
+ handleListPR((PartitionedRegion)region, keyList, policy, servConn);
+ return;
+ }
+ ArrayList newKeyList = new ArrayList(maximumChunkSize);
+ // Handle list of keys
+ if (region != null) {
+ for (Iterator it = keyList.iterator(); it.hasNext();) {
+ Object entryKey = it.next();
+ if (region.containsKey(entryKey)
+ || (sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey))) {
+
+ appendInterestResponseKey(region, keyList, entryKey, newKeyList,
+ "list", servConn);
+ }
+ }
+ }
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendRegisterInterestResponseChunk(region, keyList, newKeyList, true,
+ servConn);
+ }
+
+ /**
+ * Handles both RR and PR cases
+ */
+ @SuppressWarnings("rawtypes")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_PARAM_DEREF", justification="Null value handled in sendNewRegisterInterestResponseChunk()")
+ private static void handleKVSingleton(LocalRegion region, Object entryKey,
+ boolean serializeValues, ServerConnection servConn)
+ throws IOException {
+ VersionedObjectList values = new VersionedObjectList(maximumChunkSize,
+ true, region == null ? true : region.getAttributes()
+ .getConcurrencyChecksEnabled(), serializeValues);
+
+ if (region != null) {
+ if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) {
+ EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder();
+ ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
+ // From Get70.getValueAndIsObject()
+ Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true, false);
+ VersionTag vt = versionHolder.getVersionTag();
+
+ updateValues(values, entryKey, data, vt);
+ }
+ }
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendNewRegisterInterestResponseChunk(region, entryKey, values, true, servConn);
+ }
+
+ /**
+ * Process an interest request consisting of a single key
+ *
+ * @param region
+ * the region
+ * @param entryKey
+ * the key
+ * @param policy
+ * the policy
+ * @throws IOException
+ */
+ private static void handleSingleton(LocalRegion region, Object entryKey,
+ InterestResultPolicy policy, ServerConnection servConn)
+ throws IOException {
+ ArrayList keyList = new ArrayList(1);
+ if (region != null) {
+ if (region.containsKey(entryKey) ||
+ (sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey))) {
+ appendInterestResponseKey(region, entryKey, entryKey, keyList,
+ "individual", servConn);
+ }
+ }
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendRegisterInterestResponseChunk(region, entryKey, keyList, true, servConn);
+ }
+
+ /**
+ * Process an interest request of type ALL_KEYS
+ *
+ * @param region
+ * the region
+ * @param policy
+ * the policy
+ * @throws IOException
+ */
+ private static void handleAllKeys(LocalRegion region,
+ InterestResultPolicy policy, ServerConnection servConn)
+ throws IOException {
+ ArrayList keyList = new ArrayList(maximumChunkSize);
+ if (region != null) {
+ for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it.hasNext();) {
+ appendInterestResponseKey(region, "ALL_KEYS", it.next(), keyList,
+ "ALL_KEYS", servConn);
+ }
+ }
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendRegisterInterestResponseChunk(region, "ALL_KEYS", keyList, true,
+ servConn);
+ }
+
+ /**
+ * @param region
+ * @param regex
+ * @param serializeValues
+ * @param servConn
+ * @throws IOException
+ */
+ private static void handleKVAllKeys(LocalRegion region, String regex,
+ boolean serializeValues, ServerConnection servConn) throws IOException {
+
+ if (region != null && region instanceof PartitionedRegion) {
+ handleKVKeysPR((PartitionedRegion) region, regex, serializeValues, servConn);
+ return;
+ }
+
+ VersionedObjectList values = new VersionedObjectList(maximumChunkSize,
+ true, region == null ? true : region.getAttributes()
+ .getConcurrencyChecksEnabled(), serializeValues);
+
+ if (region != null) {
+
+ VersionTag versionTag = null;
+ Object data = null;
+
+ Pattern keyPattern = null;
+ if (regex != null) {
+ keyPattern = Pattern.compile(regex);
+ }
+
+ for (Object key : region.keySet(true)) {
+ EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder();
+ if (keyPattern != null) {
+ if (!(key instanceof String)) {
+ // key is not a String, cannot apply regex to this entry
+ continue;
+ }
+ if (!keyPattern.matcher((String) key).matches()) {
+ // key does not match the regex, this entry should not be
+ // returned.
+ continue;
+ }
+ }
+
+ ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
+ data = region.get(key, null, true, true, true, id, versionHolder, true, false);
+ versionTag = versionHolder.getVersionTag();
+ updateValues(values, key, data, versionTag);
+
+ if (values.size() == maximumChunkSize) {
+ sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, false, servConn);
+ values.clear();
+ }
+ } // for
+ } // if
+
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, true, servConn);
+ }
+
+ private static void handleKVKeysPR(PartitionedRegion region, Object keyInfo,
+ boolean serializeValues, ServerConnection servConn) throws IOException {
+ int id = 0;
+ HashMap<Integer, HashSet> bucketKeys = null;
+
+ VersionedObjectList values = new VersionedObjectList(maximumChunkSize,
+ true, region.getConcurrencyChecksEnabled(), serializeValues);
+
+ if (keyInfo != null && keyInfo instanceof List) {
+ bucketKeys = new HashMap<Integer, HashSet>();
+ for (Object key : (List) keyInfo) {
+ id = PartitionedRegionHelper.getHashKey(region, null, key, null, null);
+ if (bucketKeys.containsKey(id)) {
+ bucketKeys.get(id).add(key);
+ } else {
+ HashSet<Object> keys = new HashSet<Object>();
+ keys.add(key);
+ bucketKeys.put(id, keys);
+ }
+ }
+ region.fetchEntries(bucketKeys, values, servConn);
+ } else { // keyInfo is a String
+ region.fetchEntries((String)keyInfo, values, servConn);
+ }
+
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendNewRegisterInterestResponseChunk(region, keyInfo != null ? keyInfo : "ALL_KEYS", values, true, servConn);
+ }
+
+ /**
+ * Copied from Get70.getValueAndIsObject(), except a minor change. (Make the
+ * method static instead of copying it here?)
+ *
+ * @param value
+ */
+ private static void updateValues(VersionedObjectList values, Object key, Object value, VersionTag versionTag) {
+ boolean isObject = true;
+
+ // If the value in the VM is a CachedDeserializable,
+ // get its value. If it is Token.REMOVED, Token.DESTROYED,
+ // Token.INVALID, or Token.LOCAL_INVALID
+ // set it to null. If it is NOT_AVAILABLE, get the value from
+ // disk. If it is already a byte[], set isObject to false.
+ boolean wasInvalid = false;
+ if (value instanceof CachedDeserializable) {
+ value = ((CachedDeserializable)value).getValue();
+ }
+ else if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2 || value == Token.DESTROYED || value == Token.TOMBSTONE) {
+ value = null;
+ }
+ else if (value == Token.INVALID || value == Token.LOCAL_INVALID) {
+ value = null; // fix for bug 35884
+ wasInvalid = true;
+ }
+ else if (value instanceof byte[]) {
+ isObject = false;
+ }
+ boolean keyNotPresent = !wasInvalid && (value == null || value == Token.TOMBSTONE);
+
+ if (keyNotPresent) {
+ values.addObjectPartForAbsentKey(key, value, versionTag);
+ } else {
+ values.addObjectPart(key, value, isObject, versionTag);
+ }
+ }
+
+ public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region,
+ VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn)
+ throws IOException {
+ Object key = null;
+ EntryEventImpl versionHolder = null;
+ ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID();
+ for (Iterator it = keySet.iterator(); it.hasNext();) {
+ key = it.next();
+ versionHolder = EntryEventImpl.createVersionTagHolder();
+
+ Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true, false);
+
+ updateValues(values, key, value, versionHolder.getVersionTag());
+
+ if (values.size() == maximumChunkSize) {
+ // Send the chunk and clear the list
+ // values.setKeys(null); // Now we need to send keys too.
+ sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, false, servConn);
+ values.clear();
+ }
+ } // for
+ }
+
+ /**
+ *
+ * @param region
+ * @param values {@link VersionedObjectList}
+ * @param riKeys
+ * @param set set of entries
+ * @param servConn
+ * @throws IOException
+ */
+ public static void appendNewRegisterInterestResponseChunk(LocalRegion region,
+ VersionedObjectList values, Object riKeys, Set set, ServerConnection servConn)
+ throws IOException {
+ for (Iterator<Map.Entry> it = set.iterator(); it.hasNext();) {
+ Map.Entry entry = it.next(); // Region.Entry or Map.Entry
+ if (entry instanceof Region.Entry) { // local entries
+ VersionTag vt = null;
+ Object key = null;
+ Object value = null;
+ if (entry instanceof EntrySnapshot) {
+ vt = ((EntrySnapshot) entry).getVersionTag();
+ key = ((EntrySnapshot) entry).getRegionEntry().getKey();
+ value = ((EntrySnapshot) entry).getRegionEntry().getValue(null);
+ updateValues(values, key, value, vt);
+ } else {
+ VersionStamp vs = ((NonTXEntry)entry).getRegionEntry().getVersionStamp();
+ vt = vs == null ? null : vs.asVersionTag();
+ key = entry.getKey();
+ value = ((NonTXEntry)entry).getRegionEntry()._getValueRetain(region, true);
+ try {
+ updateValues(values, key, value, vt);
+ } finally {
+ // TODO OFFHEAP: in the future we might want to delay this release
+ // until the "values" VersionedObjectList is released.
+ // But for now "updateValues" copies the off-heap value to the heap.
+ OffHeapHelper.release(value);
+ }
+ }
+ } else { // Map.Entry (remote entries)
+ ArrayList list = (ArrayList)entry.getValue();
+ Object value = list.get(0);
+ VersionTag tag = (VersionTag)list.get(1);
+ updateValues(values, entry.getKey(), value, tag);
+ }
+ if (values.size() == maximumChunkSize) {
+ // Send the chunk and clear the list
+ // values.setKeys(null); // Now we need to send keys too.
+ sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, false, servConn);
+ values.clear();
+ }
+ } // for
+ }
+
+ public static void sendNewRegisterInterestResponseChunk(LocalRegion region,
+ Object riKey, VersionedObjectList list, boolean lastChunk, ServerConnection servConn)
+ throws IOException {
+ ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage();
+ chunkedResponseMsg.setNumberOfParts(1);
+ chunkedResponseMsg.setLastChunk(lastChunk);
+ chunkedResponseMsg.addObjPart(list, zipValues);
+ String regionName = (region == null) ? " null " : region.getFullPath();
+ if (logger.isDebugEnabled()) {
+ String str = servConn.getName() + ": Sending"
+ + (lastChunk ? " last " : " ")
+ + "register interest response chunk for region: " + regionName
+ + " for keys: " + riKey + " chunk=<" + chunkedResponseMsg + ">";
+ logger.debug(str);
+ }
+
+ chunkedResponseMsg.sendChunk(servConn);
+ }
+
+ /**
+ * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION}
+ *
+ * @param region
+ * the region
+ * @param regex
+ * the regex
+ * @param policy
+ * the policy
+ * @throws IOException
+ */
+ private static void handleRegEx(LocalRegion region, String regex,
+ InterestResultPolicy policy, ServerConnection servConn)
+ throws IOException {
+ if (region instanceof PartitionedRegion) {
+ // too bad java doesn't provide another way to do this...
+ handleRegExPR((PartitionedRegion)region, regex, policy, servConn);
+ return;
+ }
+ ArrayList keyList = new ArrayList(maximumChunkSize);
+ // Handle the regex pattern
+ Pattern keyPattern = Pattern.compile(regex);
+ if (region != null) {
+ for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it.hasNext();) {
+ Object entryKey = it.next();
+ if (!(entryKey instanceof String)) {
+ // key is not a String, cannot apply regex to this entry
+ continue;
+ }
+ if (!keyPattern.matcher((String)entryKey).matches()) {
+ // key does not match the regex, this entry should not be returned.
+ continue;
+ }
+
+ appendInterestResponseKey(region, regex, entryKey, keyList, "regex",
+ servConn);
+ }
+ }
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn);
+ }
+
+ /**
+ * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION}
+ *
+ * @param region
+ * the region
+ * @param regex
+ * the regex
+ * @param policy
+ * the policy
+ * @throws IOException
+ */
+ private static void handleRegExPR(final PartitionedRegion region,
+ final String regex, final InterestResultPolicy policy,
+ final ServerConnection servConn) throws IOException {
+ final ArrayList keyList = new ArrayList(maximumChunkSize);
+ region.getKeysWithRegEx(regex, sendTombstonesInRIResults(servConn, policy), new PartitionedRegion.SetCollector() {
+ public void receiveSet(Set theSet) throws IOException {
+ appendInterestResponseKeys(region, regex, theSet, keyList, "regex",
+ servConn);
+ }
+ });
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn);
+ }
+
+ /**
+ * Process an interest request involving a list of keys
+ *
+ * @param region
+ * the region
+ * @param keyList
+ * the list of keys
+ * @param policy
+ * the policy
+ * @throws IOException
+ */
+ private static void handleListPR(final PartitionedRegion region,
+ final List keyList, final InterestResultPolicy policy,
+ final ServerConnection servConn) throws IOException {
+ final ArrayList newKeyList = new ArrayList(maximumChunkSize);
+ region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy), new PartitionedRegion.SetCollector() {
+ public void receiveSet(Set theSet) throws IOException {
+ appendInterestResponseKeys(region, keyList, theSet, newKeyList, "list",
+ servConn);
+ }
+ });
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendRegisterInterestResponseChunk(region, keyList, newKeyList, true,
+ servConn);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static void handleKVList(final LocalRegion region,
+ final List keyList, boolean serializeValues,
+ final ServerConnection servConn) throws IOException {
+
+ if (region != null && region instanceof PartitionedRegion) {
+ handleKVKeysPR((PartitionedRegion)region, keyList, serializeValues, servConn);
+ return;
+ }
+ VersionedObjectList values = new VersionedObjectList(maximumChunkSize,
+ true, region == null ? true : region.getAttributes()
+ .getConcurrencyChecksEnabled(), serializeValues);
+
+ // Handle list of keys
+ if (region != null) {
+ VersionTag versionTag = null;
+ Object data = null;
+
+ for (Iterator it = keyList.iterator(); it.hasNext();) {
+ Object key = it.next();
+ if (region.containsKey(key) || region.containsTombstone(key)) {
+ EntryEventImpl versionHolder = EntryEventImpl
+ .createVersionTagHolder();
+
+ ClientProxyMembershipID id = servConn == null ? null : servConn
+ .getProxyID();
+ data = region.get(key, null, true, true, true, id, versionHolder,
+ true, false);
+ versionTag = versionHolder.getVersionTag();
+ updateValues(values, key, data, versionTag);
+
+ if (values.size() == maximumChunkSize) {
+ // Send the chunk and clear the list
+ // values.setKeys(null); // Now we need to send keys too.
+ sendNewRegisterInterestResponseChunk(region, keyList, values, false, servConn);
+ values.clear();
+ }
+ }
+ }
+ }
+ // Send the last chunk (the only chunk for individual and list keys)
+ // always send it back, even if the list is of zero size.
+ sendNewRegisterInterestResponseChunk(region, keyList, values, true, servConn);
+ }
+
+ /**
+ * Append an interest response
+ *
+ * @param region
+ * the region (for debugging)
+ * @param riKey
+ * the registerInterest "key" (what the client is interested
+ * in)
+ * @param entryKey
+ * key we're responding to
+ * @param list
+ * list to append to
+ * @param kind
+ * for debugging
+ */
+ private static void appendInterestResponseKey(LocalRegion region,
+ Object riKey, Object entryKey, ArrayList list, String kind,
+ ServerConnection servConn) throws IOException {
+ list.add(entryKey);
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: appendInterestResponseKey <{}>; list size was {}; region: {}",
+ servConn.getName(), entryKey, list.size(), region.getFullPath());
+ }
+ if (list.size() == maximumChunkSize) {
+ // Send the chunk and clear the list
+ sendRegisterInterestResponseChunk(region, riKey, list, false, servConn);
+ list.clear();
+ }
+ }
+
+ protected static void appendInterestResponseKeys(LocalRegion region,
+ Object riKey, Collection entryKeys, ArrayList collector, String riDescr,
+ ServerConnection servConn) throws IOException {
+ for (Iterator it = entryKeys.iterator(); it.hasNext();) {
+ appendInterestResponseKey(region, riKey, it.next(), collector, riDescr,
+ servConn);
+ }
+ }
+ }