You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/03/02 18:09:56 UTC
[geode] branch feature/GEODE-4685 updated: GEODE-4685: cleaned up the serialization of the StreamingOperation to handle the case where the cache does not exist
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-4685
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-4685 by this push:
new 8906ee2 GEODE-4685: cleaned up the serialization of the StreamingOperation to handle the case where the cache does not exist
8906ee2 is described below
commit 8906ee2c97dad8b3fcf973e18af71fcf35ca3645
Author: Udo <uk...@pivotal.io>
AuthorDate: Fri Mar 2 10:09:43 2018 -0800
GEODE-4685: cleaned up the serialization of the StreamingOperation to handle the case where the cache does not exist
---
.../internal/streaming/StreamingOperation.java | 91 +++++++++++++---------
.../PRQueryRemoteNodeExceptionDUnitTest.java | 2 +-
2 files changed, 56 insertions(+), 37 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
index 1ad5ec0..cb2cd45 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.geode.cache.CacheClosedException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
@@ -66,7 +67,6 @@ import org.apache.geode.pdx.internal.TypeRegistry;
* StreamingOperation is an abstraction for sending messages to multiple (or single) recipient
* requesting a potentially large amount of data and receiving the reply with data chunked into
* several messages.
- *
*/
public abstract class StreamingOperation {
private static final Logger logger = LogService.getLogger();
@@ -79,22 +79,25 @@ public abstract class StreamingOperation {
public final InternalDistributedSystem sys;
- /** Creates a new instance of StreamingOperation */
+ /**
+ * Creates a new instance of StreamingOperation
+ */
public StreamingOperation(InternalDistributedSystem sys) {
this.sys = sys;
}
/**
* Returns normally if succeeded to get data, otherwise throws an exception
- *
* @throws InterruptedException TODO-javadocs
*/
public void getDataFromAll(Set recipients)
throws org.apache.geode.cache.TimeoutException, InterruptedException {
- if (Thread.interrupted())
+ if (Thread.interrupted()) {
throw new InterruptedException();
- if (recipients.isEmpty())
+ }
+ if (recipients.isEmpty()) {
return;
+ }
StreamingProcessor processor = new StreamingProcessor(this.sys, recipients);
DistributionMessage m = createRequestMessage(recipients, processor);
@@ -115,30 +118,30 @@ public abstract class StreamingOperation {
}
}
- /** Override in subclass to instantiate request message */
+ /**
+ * Override in subclass to instantiate request message
+ */
protected abstract DistributionMessage createRequestMessage(Set recipients,
- ReplyProcessor21 processor);
+ ReplyProcessor21 processor);
/**
* Called from separate thread when reply is processed.
- *
* @return false if should abort (region was destroyed or cache was closed)
*/
public boolean processChunk(List objects, InternalDistributedMember sender, int sequenceNum,
- boolean lastInSequence) {
+ boolean lastInSequence) {
return processData(objects, sender, sequenceNum, lastInSequence);
}
/**
* Override in subclass to do something useful with the data.
- *
* @param sequenceNum the sequence of this data (0-based), in case ordering matters
* @param lastInSequence true if this is the last chunk in the sequence
* @return false to abort
*/
protected abstract boolean processData(List objects, InternalDistributedMember sender,
- int sequenceNum, boolean lastInSequence);
+ int sequenceNum, boolean lastInSequence);
public class StreamingProcessor extends ReplyProcessor21 {
protected volatile boolean abort = false;
@@ -150,7 +153,9 @@ public abstract class StreamingOperation {
int msgsProcessed = 0;
int numMsgs = 0;
- /** Return true if this is the very last reply msg to process for this member */
+ /**
+ * Return true if this is the very last reply msg to process for this member
+ */
protected synchronized boolean trackMessage(StreamingReplyMessage m) {
this.msgsProcessed++;
@@ -174,7 +179,7 @@ public abstract class StreamingOperation {
public StreamingProcessor(final InternalDistributedSystem system,
- InternalDistributedMember member) {
+ InternalDistributedMember member) {
super(system, member);
}
@@ -213,12 +218,12 @@ public abstract class StreamingOperation {
}
if (isLast) {
super.process(msg, false); // removes from members and cause us to
- // ignore future messages received from that member
+ // ignore future messages received from that member
}
} finally {
this.msgsBeingProcessed.decrementAndGet();
checkIfDone(); // check to see if decrementing msgsBeingProcessed requires signalling to
- // proceed
+ // proceed
}
}
@@ -316,7 +321,7 @@ public abstract class StreamingOperation {
// for the next objects, disallow stream from allocating more storage
do {
outStream.disallowExpansion(CHUNK_FULL); // sets the mark where rollback occurs on
- // CHUNK_FULL
+ // CHUNK_FULL
nextObject = getNextReplyObject();
@@ -345,8 +350,8 @@ public abstract class StreamingOperation {
break; // receiver no longer cares
}
outStream.reset(); // ready for reuse, assumes replyWithData
- // does not queue the message but outStream has
- // already been used
+ // does not queue the message but outStream has
+ // already been used
} while (nextObject != Token.END_OF_STREAM);
// } catch (CancelException e) {
// // if cache is closed, we cannot send a reply (correct?)
@@ -385,7 +390,7 @@ public abstract class StreamingOperation {
// }
protected void replyWithData(ClusterDistributionManager dm, HeapDataOutputStream outStream,
- int numObjects, int msgNum, boolean lastMsg) {
+ int numObjects, int msgNum, boolean lastMsg) {
StreamingReplyMessage.send(getSender(), this.processorId, null, dm, outStream, numObjects,
msgNum, lastMsg);
}
@@ -421,39 +426,45 @@ public abstract class StreamingOperation {
public static class StreamingReplyMessage extends ReplyMessage {
- /** the number of this message */
+ /**
+ * the number of this message
+ */
protected int msgNum;
- /** whether this message is the last one in this series */
+ /**
+ * whether this message is the last one in this series
+ */
protected boolean lastMsg;
private transient HeapDataOutputStream chunkStream; // used only on sending side, null means
- // abort
+ // abort
private transient int numObjects; // used only on sending side
private transient List objectList = null; // used only on receiving side
private boolean pdxReadSerialized = false; // used to read PDX types in serialized form.
private transient boolean isCanceled = false; // used only on receiving side and if
- // messageProcessor is of type
- // PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse
+ // messageProcessor is of type
+ // PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse
/**
* @param chunkStream the data to send back, if null then all the following parameters are
- * ignored and any future replies from this member will be ignored, and the streaming of
- * chunks is considered aborted by the receiver.
- *
+ * ignored and any future replies from this member will be ignored, and the streaming of
+ * chunks is considered aborted by the receiver.
* @param msgNum message number in this series (0-based)
* @param lastMsg if this is the last message in this series
*/
public static void send(InternalDistributedMember recipient, int processorId,
- ReplyException exception, DistributionManager dm, HeapDataOutputStream chunkStream,
- int numObjects, int msgNum, boolean lastMsg) {
+ ReplyException exception, DistributionManager dm,
+ HeapDataOutputStream chunkStream,
+ int numObjects, int msgNum, boolean lastMsg) {
send(recipient, processorId, exception, dm, chunkStream, numObjects, msgNum, lastMsg, false);
}
public static void send(InternalDistributedMember recipient, int processorId,
- ReplyException exception, DistributionManager dm, HeapDataOutputStream chunkStream,
- int numObjects, int msgNum, boolean lastMsg, boolean pdxReadSerialized) {
+ ReplyException exception, DistributionManager dm,
+ HeapDataOutputStream chunkStream,
+ int numObjects, int msgNum, boolean lastMsg,
+ boolean pdxReadSerialized) {
StreamingReplyMessage replyMessage = new StreamingReplyMessage();
replyMessage.processorId = processorId;
@@ -483,7 +494,9 @@ public abstract class StreamingOperation {
return isCanceled;
}
- /** Return the objects in this chunk as a List, used only on receiving side */
+ /**
+ * Return the objects in this chunk as a List, used only on receiving side
+ */
public List getObjects() {
return this.objectList;
}
@@ -504,8 +517,14 @@ public abstract class StreamingOperation {
this.pdxReadSerialized = in.readBoolean();
Version senderVersion = InternalDataSerializer.getVersionForDataStream(in);
boolean isSenderAbove_8_1 = senderVersion.compareTo(Version.GFE_81) > 0;
- InternalCache cache = (InternalCache) CacheFactory.getAnyInstance();
- Boolean initialPdxReadSerialized = cache.getPdxRegistry().getPdxReadSerializedOverride();
+ InternalCache cache = null;
+ Boolean initialPdxReadSerialized = false;
+ try {
+ cache = (InternalCache) CacheFactory.getAnyInstance();
+ initialPdxReadSerialized = cache.getPdxRegistry().getPdxReadSerializedOverride();
+ } catch (CacheClosedException e) {
+
+ }
if (n == -1) {
this.objectList = null;
} else {
@@ -513,7 +532,7 @@ public abstract class StreamingOperation {
this.objectList = new ArrayList(n);
// Check if the PDX types needs to be kept in serialized form.
// This will make readObject() to return PdxInstance form.
- if (this.pdxReadSerialized) {
+ if (this.pdxReadSerialized && cache != null) {
cache.getPdxRegistry().setPdxReadSerializedOverride(true);
}
try {
@@ -559,7 +578,7 @@ public abstract class StreamingOperation {
}
}
} finally {
- if (this.pdxReadSerialized) {
+ if (this.pdxReadSerialized && cache != null) {
cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java
index bdfbddc..48cfcd3 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java
@@ -78,7 +78,7 @@ public class PRQueryRemoteNodeExceptionDUnitTest extends CacheTestCase {
public void tearDown() throws Exception {
disconnectAllFromDS();
invokeInEveryVM(() -> PRQueryDUnitHelper.setCache(null));
- invokeInEveryVM(() -> QueryObserverHolder.reset());
+ invokeInEveryVM(QueryObserverHolder::reset);
}
@Override
--
To stop receiving notification emails like this one, please contact
udo@apache.org.