You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/12/27 19:15:49 UTC

[geode] 01/01: GEODE-7626: Break dependency on LocalViewMessage in membership

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-7626
in repository https://gitbox.apache.org/repos/asf/geode.git

commit de47eef421516291d6e8618fd797185e4efd4eda
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Dec 27 11:07:55 2019 -0800

    GEODE-7626: Break dependency on LocalViewMessage in membership
    
    LocalViewMessage was a DistributionMessage executed in an executor owned
    by ClusterDistributionmanager.  This arrangement was very convoluted
    because CDM only had upstream involvement in membership view
    installation.
    
    This PR moves view installation into GMSMembership using a
    single-threaded executor similar to what CDM used but without
    statistics.  Stats for the view installation thread have never been
    useful so I have not retained that functionality.
    
    There are already many tests for view installation, so while I've
    modified a couple I haven't added any new tests.
---
 .../internal/membership/MembershipJUnitTest.java   |  7 +-
 .../membership/gms/GMSMembershipJUnitTest.java     |  8 +--
 .../internal/ClusterOperationExecutors.java        | 42 +----------
 .../distributed/internal/DistributionStats.java    | 51 -------------
 .../internal/PooledExecutorWithDMStats.java        |  1 -
 .../membership/adapter/LocalViewMessage.java       | 84 ----------------------
 .../internal/membership/gms/GMSMembership.java     | 57 +++++++++++----
 .../MembershipDependenciesJUnitTest.java           |  6 +-
 8 files changed, 53 insertions(+), 203 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
index 7ccbdf0..513db62 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java
@@ -22,6 +22,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
 import static org.apache.geode.distributed.internal.membership.adapter.TcpSocketCreatorAdapter.asTcpSocketCreator;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -238,7 +239,8 @@ public class MembershipJUnitTest {
       m2.disconnect(false);
       assertTrue(!m2.isConnected());
 
-      assertTrue(m1.getView().size() == 1);
+      Membership waitingMember = m1;
+      await().untilAsserted(() -> assertTrue(waitingMember.getView().size() == 1));
 
       return result;
     } finally {
@@ -412,7 +414,8 @@ public class MembershipJUnitTest {
       m2.disconnect(false);
       assertTrue(!m2.isConnected());
 
-      assertTrue(m1.getView().size() == 1);
+      Membership waitingMember = m1;
+      await().untilAsserted(() -> assertTrue(waitingMember.getView().size() == 1));
 
     } finally {
 
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
index 44ff1f0..95ce326 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
@@ -23,6 +23,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_TTL;
 import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -56,7 +57,6 @@ import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.HighPriorityAckedMessage;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.adapter.LocalViewMessage;
 import org.apache.geode.distributed.internal.membership.adapter.ServiceConfig;
 import org.apache.geode.distributed.internal.membership.gms.GMSMembership.StartupEvent;
 import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
@@ -79,8 +79,6 @@ import org.apache.geode.test.junit.categories.MembershipTest;
 @Category({MembershipTest.class})
 public class GMSMembershipJUnitTest {
 
-
-
   private Services services;
   private MembershipConfig mockConfig;
   private DistributionConfig distConfig;
@@ -273,10 +271,8 @@ public class GMSMembershipJUnitTest {
     // for code coverage also install a view after we finish joining but before
     // event processing has started. This should notify the distribution manager
     // with a LocalViewMessage to process the view
-    reset(listener);
     manager.handleOrDeferViewEvent(new MembershipView(myMemberId, 5, viewmembers));
-    assertEquals(0, manager.getStartupEvents().size());
-    verify(messageListener).messageReceived(isA(LocalViewMessage.class));
+    await().untilAsserted(() -> assertEquals(manager.getView().getViewId(), 5));
 
     // process a suspect now - it will be passed to the listener
     reset(listener);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
index 587ebf2..ddd2aff 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
@@ -157,13 +157,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
   private ExecutorService serialThread;
 
   /**
-   * Message processing executor for view messages
-   *
-   * @see ViewAckMessage
-   */
-  private ExecutorService viewThread;
-
-  /**
    * If using a throttling queue for the serialThread, we cache the queue here so we can see if
    * delivery would block
    */
@@ -227,11 +220,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
 
     }
 
-    viewThread =
-        CoreLoggingExecutors.newSerialThreadPoolWithUnlimitedFeed("View Message Processor",
-            thread -> stats.incViewThreadStarts(), this::doViewThread,
-            stats.getViewProcessorHelper(), threadMonitor);
-
     threadPool =
         CoreLoggingExecutors.newThreadPoolWithFeedStatistics("Pooled Message Processor ",
             thread -> stats.incProcessingThreadStarts(), this::doProcessingThread,
@@ -306,8 +294,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
         return getThreadPool();
       case SERIAL_EXECUTOR:
         return getSerialExecutor(sender);
-      case VIEW_EXECUTOR:
-        return viewThread;
       case HIGH_PRIORITY_EXECUTOR:
         return getHighPriorityThreadPool();
       case WAITING_POOL_EXECUTOR:
@@ -446,18 +432,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
     }
   }
 
-  private void doViewThread(Runnable command) {
-    stats.incNumViewThreads(1);
-    try {
-      ConnectionTable.threadWantsSharedResources();
-      Connection.makeReaderThread();
-      runUntilShutdown(command);
-    } finally {
-      ConnectionTable.releaseThreadsSockets();
-      stats.incNumViewThreads(-1);
-    }
-  }
-
   private void doSerialThread(Runnable command) {
     stats.incNumSerialThreads(1);
     try {
@@ -500,13 +474,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
     if (es != null) {
       es.shutdown();
     }
-    es = viewThread;
-    if (es != null) {
-      // Hmmm...OK, I'll let any view events currently in the queue be
-      // processed. Not sure it's very important whether they get
-      // handled...
-      es.shutdown();
-    }
     if (serialQueuedExecutorPool != null) {
       serialQueuedExecutorPool.shutdown();
     }
@@ -548,7 +515,7 @@ public class ClusterOperationExecutors implements OperationExecutors {
     long start = System.currentTimeMillis();
     long remaining = timeInMillis;
 
-    ExecutorService[] allExecutors = new ExecutorService[] {serialThread, viewThread,
+    ExecutorService[] allExecutors = new ExecutorService[] {serialThread,
         functionExecutionThread, functionExecutionPool, partitionedRegionThread,
         partitionedRegionPool, highPriorityPool, waitingPool,
         prMetaDataCleanupThreadPool, threadPool};
@@ -597,10 +564,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
         stillAlive = true;
         culprits.append(" serial thread;");
       }
-      if (executorAlive(viewThread, "view thread")) {
-        stillAlive = true;
-        culprits.append(" view thread;");
-      }
       if (executorAlive(partitionedRegionThread, "partitioned region thread")) {
         stillAlive = true;
         culprits.append(" partitioned region thread;");
@@ -651,9 +614,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
     if (serialThread != null) {
       serialThread.shutdownNow();
     }
-    if (viewThread != null) {
-      viewThread.shutdownNow();
-    }
     if (functionExecutionThread != null) {
       functionExecutionThread.shutdownNow();
     }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index 5061032..b08a512 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -204,7 +204,6 @@ public class DistributionStats implements DMStats {
   private static final int messageBytesBeingReceivedId;
 
   private static final int serialThreadStartsId;
-  private static final int viewThreadStartsId;
   private static final int processingThreadStartsId;
   private static final int highPriorityThreadStartsId;
   private static final int waitingThreadStartsId;
@@ -215,9 +214,7 @@ public class DistributionStats implements DMStats {
 
   private static final int replyHandoffTimeId;
 
-  private static final int viewThreadsId;
   private static final int serialThreadJobsId;
-  private static final int viewProcessorThreadJobsId;
   private static final int serialPooledThreadJobsId;
   private static final int pooledMessageThreadJobsId;
   private static final int highPriorityThreadJobsId;
@@ -243,8 +240,6 @@ public class DistributionStats implements DMStats {
   private static final int tcpFinalCheckResponsesSentId;
   private static final int tcpFinalCheckResponsesReceivedId;
   private static final int udpFinalCheckRequestsSentId;
-  private static final int udpFinalCheckRequestsReceivedId;
-  private static final int udpFinalCheckResponsesSentId;
   private static final int udpFinalCheckResponsesReceivedId;
 
   static {
@@ -391,11 +386,8 @@ public class DistributionStats implements DMStats {
         "The number of messages currently being processed by partitioned region threads";
     final String functionExecutionThreadJobsDesc =
         "The number of messages currently being processed by function execution threads";
-    final String viewThreadsDesc = "The number of threads currently processing view messages.";
     final String serialThreadJobsDesc =
         "The number of messages currently being processed by serial threads.";
-    final String viewThreadJobsDesc =
-        "The number of messages currently being processed by view threads.";
     final String serialPooledThreadJobsDesc =
         "The number of messages currently being processed by pooled serial processor threads.";
     final String processingThreadJobsDesc =
@@ -681,9 +673,6 @@ public class DistributionStats implements DMStats {
         f.createLongCounter("serialThreadStarts",
             "Total number of times a thread has been created for the serial message executor.",
             "starts", false),
-        f.createLongCounter("viewThreadStarts",
-            "Total number of times a thread has been created for the view message executor.",
-            "starts", false),
         f.createLongCounter("processingThreadStarts",
             "Total number of times a thread has been created for the pool processing normal messages.",
             "starts", false),
@@ -710,9 +699,7 @@ public class DistributionStats implements DMStats {
             "messages"),
         f.createIntGauge("functionExecutionThreadJobs", functionExecutionThreadJobsDesc,
             "messages"),
-        f.createIntGauge("viewThreads", viewThreadsDesc, "threads"),
         f.createIntGauge("serialThreadJobs", serialThreadJobsDesc, "messages"),
-        f.createIntGauge("viewThreadJobs", viewThreadJobsDesc, "messages"),
         f.createIntGauge("serialPooledThreadJobs", serialPooledThreadJobsDesc, "messages"),
         f.createIntGauge("processingThreadJobs", processingThreadJobsDesc, "messages"),
         f.createIntGauge("highPriorityThreadJobs", highPriorityThreadJobsDesc, "messages"),
@@ -898,7 +885,6 @@ public class DistributionStats implements DMStats {
     messageBytesBeingReceivedId = type.nameToId("messageBytesBeingReceived");
 
     serialThreadStartsId = type.nameToId("serialThreadStarts");
-    viewThreadStartsId = type.nameToId("viewThreadStarts");
     processingThreadStartsId = type.nameToId("processingThreadStarts");
     highPriorityThreadStartsId = type.nameToId("highPriorityThreadStarts");
     waitingThreadStartsId = type.nameToId("waitingThreadStarts");
@@ -909,9 +895,7 @@ public class DistributionStats implements DMStats {
     replyHandoffTimeId = type.nameToId("replyHandoffTime");
     partitionedRegionThreadJobsId = type.nameToId("partitionedRegionThreadJobs");
     functionExecutionThreadJobsId = type.nameToId("functionExecutionThreadJobs");
-    viewThreadsId = type.nameToId("viewThreads");
     serialThreadJobsId = type.nameToId("serialThreadJobs");
-    viewProcessorThreadJobsId = type.nameToId("viewThreadJobs");
     serialPooledThreadJobsId = type.nameToId("serialPooledThreadJobs");
     pooledMessageThreadJobsId = type.nameToId("processingThreadJobs");
     highPriorityThreadJobsId = type.nameToId("highPriorityThreadJobs");
@@ -937,8 +921,6 @@ public class DistributionStats implements DMStats {
     tcpFinalCheckResponsesSentId = type.nameToId("tcpFinalCheckResponsesSent");
     tcpFinalCheckResponsesReceivedId = type.nameToId("tcpFinalCheckResponsesReceived");
     udpFinalCheckRequestsSentId = type.nameToId("udpFinalCheckRequestsSent");
-    udpFinalCheckRequestsReceivedId = type.nameToId("udpFinalCheckRequestsReceived");
-    udpFinalCheckResponsesSentId = type.nameToId("udpFinalCheckResponsesSent");
     udpFinalCheckResponsesReceivedId = type.nameToId("udpFinalCheckResponsesReceived");
   }
 
@@ -2322,10 +2304,6 @@ public class DistributionStats implements DMStats {
     stats.incLong(serialThreadStartsId, 1);
   }
 
-  public void incViewThreadStarts() {
-    stats.incLong(viewThreadStartsId, 1);
-  }
-
   public void incProcessingThreadStarts() {
     stats.incLong(processingThreadStartsId, 1);
   }
@@ -2367,10 +2345,6 @@ public class DistributionStats implements DMStats {
     this.stats.incInt(functionExecutionThreadJobsId, i);
   }
 
-  public void incNumViewThreads(int threads) {
-    this.stats.incInt(viewThreadsId, threads);
-  }
-
   public PoolStatHelper getSerialProcessorHelper() {
     return new PoolStatHelper() {
       @Override
@@ -2393,31 +2367,6 @@ public class DistributionStats implements DMStats {
     this.stats.incInt(serialThreadJobsId, jobs);
   }
 
-  public PoolStatHelper getViewProcessorHelper() {
-    return new PoolStatHelper() {
-      @Override
-      public void startJob() {
-        incViewProcessorThreadJobs(1);
-        if (logger.isTraceEnabled()) {
-          logger.trace("[DM.SerialQueuedExecutor.execute] numViewThreads={}", getNumViewThreads());
-        }
-      }
-
-      @Override
-      public void endJob() {
-        incViewProcessorThreadJobs(-1);
-      }
-    };
-  }
-
-  public int getNumViewThreads() {
-    return this.stats.getInt(viewThreadsId);
-  }
-
-  protected void incViewProcessorThreadJobs(int jobs) {
-    this.stats.incInt(viewProcessorThreadJobsId, jobs);
-  }
-
   public PoolStatHelper getSerialPooledProcessorHelper() {
     return new PoolStatHelper() {
       @Override
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/PooledExecutorWithDMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/PooledExecutorWithDMStats.java
index 9394ac7..e6472b4 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/PooledExecutorWithDMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/PooledExecutorWithDMStats.java
@@ -216,7 +216,6 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor {
           RejectedExecutionException e = new RejectedExecutionException(
               "interrupted");
           e.initCause(ie);
-          throw e;
         }
       }
     }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/LocalViewMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/LocalViewMessage.java
deleted file mode 100755
index 700c8a6..0000000
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/LocalViewMessage.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.distributed.internal.membership.adapter;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.ClusterOperationExecutors;
-import org.apache.geode.distributed.internal.SerialDistributionMessage;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
-import org.apache.geode.distributed.internal.membership.gms.api.MembershipView;
-import org.apache.geode.internal.serialization.DeserializationContext;
-import org.apache.geode.internal.serialization.SerializationContext;
-
-
-/**
- * LocalViewMessage is used to pass a new membership view to the GemFire cache in an orderly manner.
- * It is intended to be queued with serially executed messages so that the view takes effect at the
- * proper time.
- *
- */
-
-public class LocalViewMessage extends SerialDistributionMessage {
-
-  private GMSMembership<InternalDistributedMember> manager;
-  private long viewId;
-  private MembershipView<InternalDistributedMember> view;
-
-  public LocalViewMessage(InternalDistributedMember addr, long viewId,
-      MembershipView<InternalDistributedMember> view,
-      GMSMembership<InternalDistributedMember> manager) {
-    super();
-    this.sender = addr;
-    this.viewId = viewId;
-    this.view = view;
-    this.manager = manager;
-  }
-
-  @Override
-  public int getProcessorType() {
-    return ClusterOperationExecutors.VIEW_EXECUTOR;
-  }
-
-
-  @Override
-  protected void process(ClusterDistributionManager dm) {
-    // dm.getLogger().info("view message processed", new Exception());
-    manager.processView(viewId, view);
-  }
-
-  // These "messages" are never DataSerialized
-
-  @Override
-  public int getDSFID() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void toData(DataOutput out,
-      SerializationContext context) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void fromData(DataInput in,
-      DeserializationContext context) throws IOException, ClassNotFoundException {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
index ad8b5a6..21ac3e1 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java
@@ -27,10 +27,17 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
@@ -46,8 +53,6 @@ import org.apache.geode.SystemFailure;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.StartupMessage;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.adapter.LocalViewMessage;
 import org.apache.geode.distributed.internal.membership.gms.api.LifecycleListener;
 import org.apache.geode.distributed.internal.membership.gms.api.MemberDisconnectedException;
 import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier;
@@ -66,6 +71,7 @@ import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
 import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
 import org.apache.geode.logging.internal.executors.LoggingThread;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
 
 public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID> {
   private static final Logger logger = Services.getLogger();
@@ -98,6 +104,8 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID
 
   private volatile boolean isCloseInProgress;
 
+  private ExecutorService viewExecutor;
+
   /**
    * Trick class to make the startup synch more visible in stack traces
    *
@@ -652,6 +660,10 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID
     this.listener = listener;
     this.messageListener = messageListener;
     this.gmsManager = new ManagerImpl();
+    LinkedBlockingQueue<Runnable> feed = new LinkedBlockingQueue<>();
+    ThreadFactory threadFactory = new LoggingThreadFactory("Geode View Processor");
+    this.viewExecutor = new ThreadPoolExecutor(1, 1, 30,
+        TimeUnit.SECONDS, feed, threadFactory, new ViewExecutorBlockHandler(feed));
   }
 
   public Manager<ID> getGMSManager() {
@@ -977,18 +989,9 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID
           }
         }
       }
-      // view processing can take a while, so we use a separate thread
-      // to avoid blocking a reader thread
-      long newId = viewArg.getViewId();
-      LocalViewMessage v = new LocalViewMessage((InternalDistributedMember) address, newId,
-          (MembershipView<InternalDistributedMember>) viewArg,
-          (GMSMembership<InternalDistributedMember>) GMSMembership.this);
 
-      try {
-        messageListener.messageReceived((Message<ID>) v);
-      } catch (MemberShunnedException e) {
-        logger.error("View installation was blocked by a MemberShunnedException", e);
-      }
+      viewExecutor.submit(() -> processView(viewArg.getViewId(), viewArg));
+
     } finally {
       latestViewWriteLock.unlock();
     }
@@ -1268,6 +1271,7 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID
   public void shutdown() {
     setShutdown();
     services.stop();
+    viewExecutor.shutdownNow();
   }
 
   @Override
@@ -2107,4 +2111,31 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID
 
   }
 
+
+  static class ViewExecutorBlockHandler implements RejectedExecutionHandler {
+
+    private final Queue queue;
+
+    ViewExecutorBlockHandler(Queue feed) {
+      queue = feed;
+    }
+
+    @Override
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+      if (executor.isShutdown()) {
+        throw new RejectedExecutionException(
+            "executor has been shutdown");
+      } else {
+        try {
+          executor.getQueue().put(r);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          RejectedExecutionException e = new RejectedExecutionException(
+              "interrupted");
+          e.initCause(ie);
+        }
+      }
+    }
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/MembershipDependenciesJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/MembershipDependenciesJUnitTest.java
index 9e9b1d0..f506346 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/MembershipDependenciesJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/MembershipDependenciesJUnitTest.java
@@ -30,7 +30,6 @@ import org.junit.runner.RunWith;
 
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.LocatorStats;
-import org.apache.geode.distributed.internal.membership.adapter.LocalViewMessage;
 import org.apache.geode.internal.OSProcess;
 import org.apache.geode.internal.inet.LocalHostUtil;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
@@ -114,8 +113,5 @@ public class MembershipDependenciesJUnitTest {
               .or(type(JavaWorkarounds.class))
 
               // TODO:
-              .or(type(OSProcess.class))
-
-              // TODO:
-              .or(type(LocalViewMessage.class)));
+              .or(type(OSProcess.class)));
 }