Posted to commits@geode.apache.org by ud...@apache.org on 2018/03/02 18:09:56 UTC

[geode] branch feature/GEODE-4685 updated: GEODE-4685: cleaned up the serialization of the StreamingOperation to handle the case where the cache does not exist

This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-4685
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-4685 by this push:
     new 8906ee2  GEODE-4685: cleaned up the serialization of the StreamingOperation to handle the case where the cache does not exist
8906ee2 is described below

commit 8906ee2c97dad8b3fcf973e18af71fcf35ca3645
Author: Udo <uk...@pivotal.io>
AuthorDate: Fri Mar 2 10:09:43 2018 -0800

    GEODE-4685: cleaned up the serialization of the StreamingOperation to handle the case where the cache does not exist
---
 .../internal/streaming/StreamingOperation.java     | 91 +++++++++++++---------
 .../PRQueryRemoteNodeExceptionDUnitTest.java       |  2 +-
 2 files changed, 56 insertions(+), 37 deletions(-)
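
For readers following along, the heart of this change is a defensive lookup of the cache during deserialization: CacheFactory.getAnyInstance() throws CacheClosedException when no cache exists in the JVM, and the patched code now catches that instead of letting it escape. A minimal, self-contained sketch of that pattern (the class and helper names here are illustrative, not part of the commit):

    import org.apache.geode.cache.Cache;
    import org.apache.geode.cache.CacheClosedException;
    import org.apache.geode.cache.CacheFactory;

    public class CacheLookupSketch {
      // Hypothetical helper: return the cache if one exists in this JVM,
      // otherwise null, rather than propagating CacheClosedException.
      static Cache findCacheOrNull() {
        try {
          return CacheFactory.getAnyInstance();
        } catch (CacheClosedException e) {
          return null; // no cache (never created, or already closed)
        }
      }

      public static void main(String[] args) {
        Cache cache = findCacheOrNull();
        System.out.println(cache == null ? "no cache available" : "cache found");
      }
    }

Callers are then expected to null-check the result, which is what the guarded blocks in the diff below do.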

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
index 1ad5ec0..cb2cd45 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -66,7 +67,6 @@ import org.apache.geode.pdx.internal.TypeRegistry;
  * StreamingOperation is an abstraction for sending messages to multiple (or single) recipient
  * requesting a potentially large amount of data and receiving the reply with data chunked into
  * several messages.
- *
  */
 public abstract class StreamingOperation {
   private static final Logger logger = LogService.getLogger();
@@ -79,22 +79,25 @@ public abstract class StreamingOperation {
 
   public final InternalDistributedSystem sys;
 
-  /** Creates a new instance of StreamingOperation */
+  /**
+   * Creates a new instance of StreamingOperation
+   */
   public StreamingOperation(InternalDistributedSystem sys) {
     this.sys = sys;
   }
 
   /**
    * Returns normally if succeeded to get data, otherwise throws an exception
-   *
    * @throws InterruptedException TODO-javadocs
    */
   public void getDataFromAll(Set recipients)
       throws org.apache.geode.cache.TimeoutException, InterruptedException {
-    if (Thread.interrupted())
+    if (Thread.interrupted()) {
       throw new InterruptedException();
-    if (recipients.isEmpty())
+    }
+    if (recipients.isEmpty()) {
       return;
+    }
 
     StreamingProcessor processor = new StreamingProcessor(this.sys, recipients);
     DistributionMessage m = createRequestMessage(recipients, processor);
@@ -115,30 +118,30 @@ public abstract class StreamingOperation {
     }
   }
 
-  /** Override in subclass to instantiate request message */
+  /**
+   * Override in subclass to instantiate request message
+   */
   protected abstract DistributionMessage createRequestMessage(Set recipients,
-      ReplyProcessor21 processor);
+                                                              ReplyProcessor21 processor);
 
   /**
    * Called from separate thread when reply is processed.
-   *
    * @return false if should abort (region was destroyed or cache was closed)
    */
   public boolean processChunk(List objects, InternalDistributedMember sender, int sequenceNum,
-      boolean lastInSequence) {
+                              boolean lastInSequence) {
     return processData(objects, sender, sequenceNum, lastInSequence);
   }
 
   /**
    * Override in subclass to do something useful with the data.
-   *
    * @param sequenceNum the sequence of this data (0-based), in case ordering matters
    * @param lastInSequence true if this is the last chunk in the sequence
    * @return false to abort
    */
 
   protected abstract boolean processData(List objects, InternalDistributedMember sender,
-      int sequenceNum, boolean lastInSequence);
+                                         int sequenceNum, boolean lastInSequence);
 
   public class StreamingProcessor extends ReplyProcessor21 {
     protected volatile boolean abort = false;
@@ -150,7 +153,9 @@ public abstract class StreamingOperation {
       int msgsProcessed = 0;
       int numMsgs = 0;
 
-      /** Return true if this is the very last reply msg to process for this member */
+      /**
+       * Return true if this is the very last reply msg to process for this member
+       */
       protected synchronized boolean trackMessage(StreamingReplyMessage m) {
         this.msgsProcessed++;
 
@@ -174,7 +179,7 @@ public abstract class StreamingOperation {
 
 
     public StreamingProcessor(final InternalDistributedSystem system,
-        InternalDistributedMember member) {
+                              InternalDistributedMember member) {
       super(system, member);
     }
 
@@ -213,12 +218,12 @@ public abstract class StreamingOperation {
         }
         if (isLast) {
           super.process(msg, false); // removes from members and cause us to
-                                     // ignore future messages received from that member
+          // ignore future messages received from that member
         }
       } finally {
         this.msgsBeingProcessed.decrementAndGet();
         checkIfDone(); // check to see if decrementing msgsBeingProcessed requires signalling to
-                       // proceed
+        // proceed
       }
     }
 
@@ -316,7 +321,7 @@ public abstract class StreamingOperation {
             // for the next objects, disallow stream from allocating more storage
             do {
               outStream.disallowExpansion(CHUNK_FULL); // sets the mark where rollback occurs on
-                                                       // CHUNK_FULL
+              // CHUNK_FULL
 
               nextObject = getNextReplyObject();
 
@@ -345,8 +350,8 @@ public abstract class StreamingOperation {
             break; // receiver no longer cares
           }
           outStream.reset(); // ready for reuse, assumes replyWithData
-                             // does not queue the message but outStream has
-                             // already been used
+          // does not queue the message but outStream has
+          // already been used
         } while (nextObject != Token.END_OF_STREAM);
         // } catch (CancelException e) {
         // // if cache is closed, we cannot send a reply (correct?)
@@ -385,7 +390,7 @@ public abstract class StreamingOperation {
     // }
 
     protected void replyWithData(ClusterDistributionManager dm, HeapDataOutputStream outStream,
-        int numObjects, int msgNum, boolean lastMsg) {
+                                 int numObjects, int msgNum, boolean lastMsg) {
       StreamingReplyMessage.send(getSender(), this.processorId, null, dm, outStream, numObjects,
           msgNum, lastMsg);
     }
@@ -421,39 +426,45 @@ public abstract class StreamingOperation {
 
   public static class StreamingReplyMessage extends ReplyMessage {
 
-    /** the number of this message */
+    /**
+     * the number of this message
+     */
     protected int msgNum;
 
-    /** whether this message is the last one in this series */
+    /**
+     * whether this message is the last one in this series
+     */
     protected boolean lastMsg;
 
     private transient HeapDataOutputStream chunkStream; // used only on sending side, null means
-                                                        // abort
+    // abort
     private transient int numObjects; // used only on sending side
     private transient List objectList = null; // used only on receiving side
 
     private boolean pdxReadSerialized = false; // used to read PDX types in serialized form.
     private transient boolean isCanceled = false; // used only on receiving side and if
-                                                  // messageProcessor is of type
-                                                  // PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse
+    // messageProcessor is of type
+    // PartitionedRegionQueryEvaluator.StreamingQueryPartitionResponse
 
     /**
      * @param chunkStream the data to send back, if null then all the following parameters are
-     *        ignored and any future replies from this member will be ignored, and the streaming of
-     *        chunks is considered aborted by the receiver.
-     *
+     * ignored and any future replies from this member will be ignored, and the streaming of
+     * chunks is considered aborted by the receiver.
      * @param msgNum message number in this series (0-based)
      * @param lastMsg if this is the last message in this series
      */
     public static void send(InternalDistributedMember recipient, int processorId,
-        ReplyException exception, DistributionManager dm, HeapDataOutputStream chunkStream,
-        int numObjects, int msgNum, boolean lastMsg) {
+                            ReplyException exception, DistributionManager dm,
+                            HeapDataOutputStream chunkStream,
+                            int numObjects, int msgNum, boolean lastMsg) {
       send(recipient, processorId, exception, dm, chunkStream, numObjects, msgNum, lastMsg, false);
     }
 
     public static void send(InternalDistributedMember recipient, int processorId,
-        ReplyException exception, DistributionManager dm, HeapDataOutputStream chunkStream,
-        int numObjects, int msgNum, boolean lastMsg, boolean pdxReadSerialized) {
+                            ReplyException exception, DistributionManager dm,
+                            HeapDataOutputStream chunkStream,
+                            int numObjects, int msgNum, boolean lastMsg,
+                            boolean pdxReadSerialized) {
       StreamingReplyMessage replyMessage = new StreamingReplyMessage();
       replyMessage.processorId = processorId;
 
@@ -483,7 +494,9 @@ public abstract class StreamingOperation {
       return isCanceled;
     }
 
-    /** Return the objects in this chunk as a List, used only on receiving side */
+    /**
+     * Return the objects in this chunk as a List, used only on receiving side
+     */
     public List getObjects() {
       return this.objectList;
     }
@@ -504,8 +517,14 @@ public abstract class StreamingOperation {
       this.pdxReadSerialized = in.readBoolean();
       Version senderVersion = InternalDataSerializer.getVersionForDataStream(in);
       boolean isSenderAbove_8_1 = senderVersion.compareTo(Version.GFE_81) > 0;
-      InternalCache cache = (InternalCache) CacheFactory.getAnyInstance();
-      Boolean initialPdxReadSerialized = cache.getPdxRegistry().getPdxReadSerializedOverride();
+      InternalCache cache = null;
+      Boolean initialPdxReadSerialized = false;
+      try {
+        cache = (InternalCache) CacheFactory.getAnyInstance();
+        initialPdxReadSerialized = cache.getPdxRegistry().getPdxReadSerializedOverride();
+      } catch (CacheClosedException e) {
+
+      }
       if (n == -1) {
         this.objectList = null;
       } else {
@@ -513,7 +532,7 @@ public abstract class StreamingOperation {
         this.objectList = new ArrayList(n);
         // Check if the PDX types needs to be kept in serialized form.
         // This will make readObject() to return PdxInstance form.
-        if (this.pdxReadSerialized) {
+        if (this.pdxReadSerialized && cache != null) {
           cache.getPdxRegistry().setPdxReadSerializedOverride(true);
         }
         try {
@@ -559,7 +578,7 @@ public abstract class StreamingOperation {
             }
           }
         } finally {
-          if (this.pdxReadSerialized) {
+          if (this.pdxReadSerialized && cache != null) {
             cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
           }
         }
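
Taken together, the hunks above change StreamingReplyMessage.fromData so that the PDX-registry override is only saved, set, and restored when a cache is actually present; if CacheFactory.getAnyInstance() throws CacheClosedException, the empty catch block deliberately leaves cache null. Paraphrased as one sketch (only the identifiers visible in the diff are real; the elided read loop is summarized in a comment):

    InternalCache cache = null;
    Boolean initialPdxReadSerialized = false;
    try {
      cache = (InternalCache) CacheFactory.getAnyInstance();
      initialPdxReadSerialized = cache.getPdxRegistry().getPdxReadSerializedOverride();
    } catch (CacheClosedException e) {
      // no cache in this JVM: leave cache null and skip the PDX handling below
    }
    if (this.pdxReadSerialized && cache != null) {
      cache.getPdxRegistry().setPdxReadSerializedOverride(true);
    }
    try {
      // ... read the chunked objects from the DataInput into this.objectList ...
    } finally {
      if (this.pdxReadSerialized && cache != null) {
        cache.getPdxRegistry().setPdxReadSerializedOverride(initialPdxReadSerialized);
      }
    }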
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java
index bdfbddc..48cfcd3 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryRemoteNodeExceptionDUnitTest.java
@@ -78,7 +78,7 @@ public class PRQueryRemoteNodeExceptionDUnitTest extends CacheTestCase {
   public void tearDown() throws Exception {
     disconnectAllFromDS();
     invokeInEveryVM(() -> PRQueryDUnitHelper.setCache(null));
-    invokeInEveryVM(() -> QueryObserverHolder.reset());
+    invokeInEveryVM(QueryObserverHolder::reset);
   }
 
   @Override
