You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2017/04/06 08:54:29 UTC

aurora git commit: Reliably subscribe to Mesos in the HTTP Driver.

Repository: aurora
Updated Branches:
  refs/heads/master 7678d194f -> 656cf9ac5


Reliably subscribe to Mesos in the HTTP Driver.

As noted in AURORA-1911 the `V1Mesos` driver doesn't retry `SUBSCRIBE` calls if
they fail. This means that after a leader subscribes and disconnects, it is
possible for it to never resubscribe again if the Mesos Master is unhealthy.

To fix this, I have moved the subscription into the dedicated
`SchedulerExecutor` and it continues to attempt to subscribe using truncated
binary backoff. It only stops if we are disconnected or if we successfully
connect.

Bugs closed: AURORA-1911

Reviewed at https://reviews.apache.org/r/58053/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/656cf9ac
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/656cf9ac
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/656cf9ac

Branch: refs/heads/master
Commit: 656cf9ac59754cb25084821b74ca22ef0d0ff2d5
Parents: 7678d19
Author: Zameer Manji <zm...@apache.org>
Authored: Thu Apr 6 10:40:41 2017 +0200
Committer: Zameer Manji <zm...@apache.org>
Committed: Thu Apr 6 10:40:41 2017 +0200

----------------------------------------------------------------------
 .../aurora/benchmark/StatusUpdateBenchmark.java |   3 +-
 .../scheduler/mesos/MesosCallbackHandler.java   |  17 +--
 .../scheduler/mesos/SchedulerDriverModule.java  |  19 ++-
 .../mesos/VersionedMesosSchedulerImpl.java      |  57 +++++--
 .../mesos/VersionedMesosSchedulerImplTest.java  | 148 ++++++++++++++++++-
 5 files changed, 211 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
index 206b114..c81387f 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
@@ -66,6 +66,7 @@ import org.apache.aurora.scheduler.mesos.MesosCallbackHandler;
 import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl;
 import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl;
 import org.apache.aurora.scheduler.mesos.ProtosConversion;
+import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
 import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.preemptor.ClusterStateImpl;
@@ -189,7 +190,7 @@ public class StatusUpdateBenchmark {
             bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class);
             bind(MesosCallbackHandlerImpl.class).in(Singleton.class);
             bind(Executor.class)
-                .annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class)
+                .annotatedWith(SchedulerDriverModule.SchedulerExecutor.class)
                 .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor(
                     "SchedulerImpl-%d",
                     LoggerFactory.getLogger(StatusUpdateBenchmark.class)));

http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index 5bf1e4e..5a5281a 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
@@ -22,7 +20,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
-import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -57,10 +54,6 @@ import org.apache.mesos.v1.Protos.TaskStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 import static org.apache.mesos.v1.Protos.TaskStatus.Reason.REASON_RECONCILIATION;
@@ -106,14 +99,6 @@ public interface MesosCallbackHandler {
     private final AtomicBoolean frameworkRegistered;
 
     /**
-     * Binding annotation for the executor the incoming Mesos message handler uses.
-     */
-    @VisibleForTesting
-    @Qualifier
-    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-    public @interface SchedulerExecutor { }
-
-    /**
      * Creates a new handler for callbacks.
      *
      * @param storage Store to save host attributes into.
@@ -130,7 +115,7 @@ public interface MesosCallbackHandler {
         TaskStatusHandler taskStatusHandler,
         OfferManager offerManager,
         EventSink eventSink,
-        @SchedulerExecutor Executor executor,
+        @SchedulerDriverModule.SchedulerExecutor Executor executor,
         StatsProvider statsProvider,
         Driver driver,
         Clock clock,

http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
index 10d4f1b..18dc3e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
@@ -13,10 +13,14 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
 import java.util.concurrent.Executor;
 
+import javax.inject.Qualifier;
 import javax.inject.Singleton;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.AbstractModule;
 
 import org.apache.aurora.scheduler.app.SchedulerMain;
@@ -27,6 +31,11 @@ import org.apache.mesos.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
 import static com.google.common.base.Preconditions.checkState;
 
 /**
@@ -40,6 +49,14 @@ public class SchedulerDriverModule extends AbstractModule {
     this.kind = kind;
   }
 
+  /**
+   * Binding annotation for the executor the incoming Mesos message handler uses.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface SchedulerExecutor { }
+
   @Override
   protected void configure() {
     bind(Scheduler.class).to(MesosSchedulerImpl.class);
@@ -48,7 +65,7 @@ public class SchedulerDriverModule extends AbstractModule {
     bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class);
     bind(MesosCallbackHandlerImpl.class).in(Singleton.class);
     // TODO(zmanji): Create singleThreadedExecutor (non-scheduled) variant.
-    bind(Executor.class).annotatedWith(MesosCallbackHandlerImpl.SchedulerExecutor.class)
+    bind(Executor.class).annotatedWith(SchedulerExecutor.class)
         .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("SchedulerImpl-%d", LOG));
 
     switch (kind) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
index 67d356a..5329de5 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImpl.java
@@ -13,6 +13,9 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 
 import com.google.common.base.Optional;
@@ -21,6 +24,8 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
 import org.apache.aurora.common.inject.TimedInterceptor;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.util.BackoffHelper;
 import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.mesos.v1.Protos;
@@ -45,8 +50,13 @@ public class VersionedMesosSchedulerImpl implements Scheduler {
   private final MesosCallbackHandler handler;
   private final Storage storage;
   private final FrameworkInfoFactory infoFactory;
+  private final Executor executor;
+  private final BackoffHelper backoffHelper;
 
-  private volatile boolean isRegistered = false;
+  private final AtomicBoolean isSubscribed = new AtomicBoolean(false);
+  private final AtomicBoolean isConnected = new AtomicBoolean(false);
+  private final AtomicBoolean isRegistered = new AtomicBoolean(false);
+  private final AtomicLong subcriptionCalls;
 
   private static final String EVENT_COUNTER_STAT_PREFIX = "mesos_scheduler_event_";
   // A cache to hold the metric names to prevent us from creating strings for every event
@@ -64,18 +74,26 @@ public class VersionedMesosSchedulerImpl implements Scheduler {
   VersionedMesosSchedulerImpl(
       MesosCallbackHandler handler,
       CachedCounters counters,
+      StatsProvider statsProvider,
       Storage storage,
+      @SchedulerDriverModule.SchedulerExecutor Executor executor,
+      BackoffHelper backoffHelper,
       FrameworkInfoFactory factory) {
     this.handler = requireNonNull(handler);
     this.counters = requireNonNull(counters);
     this.storage = requireNonNull(storage);
     this.infoFactory = requireNonNull(factory);
+    this.executor = requireNonNull(executor);
+    this.backoffHelper = requireNonNull(backoffHelper);
     initializeEventMetrics();
+
+    this.subcriptionCalls = statsProvider.makeCounter("mesos_scheduler_subscription_attempts");
   }
 
   @Override
   public void connected(Mesos mesos) {
     LOG.info("Connected to Mesos master.");
+    isConnected.set(true);
 
     Optional<String> frameworkId = storage.read(
         storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId());
@@ -95,15 +113,35 @@ public class VersionedMesosSchedulerImpl implements Scheduler {
       LOG.warn("Did not find a persisted framework ID, connecting as a new framework.");
     }
 
-    LOG.info("Sending subscribe call");
-    mesos.send(call.setSubscribe(Call.Subscribe.newBuilder()
-        .setFrameworkInfo(frameworkBuilder.build())
-        .build())
-        .build());
+    call.setSubscribe(Call.Subscribe.newBuilder().setFrameworkInfo(frameworkBuilder));
+
+    executor.execute(() -> {
+      LOG.info("Starting to subscribe to Mesos with backoff.");
+      try {
+        backoffHelper.doUntilSuccess(() -> {
+          if (!isConnected.get())  {
+            LOG.info("Disconnected while attempting to subscribe. Stopping attempt.");
+            return true;
+          }
+          if (!isSubscribed.get()) {
+            LOG.info("Sending subscribe call.");
+            mesos.send(call.build());
+            subcriptionCalls.incrementAndGet();
+            return false;
+          }
+          LOG.info("Subscribed to Mesos");
+          return true;
+        });
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    });
   }
 
   @Override
   public void disconnected(Mesos mesos) {
+    isSubscribed.set(false);
+    isConnected.set(false);
     handler.handleDisconnection();
   }
 
@@ -126,16 +164,17 @@ public class VersionedMesosSchedulerImpl implements Scheduler {
     switch(event.getType()) {
       case SUBSCRIBED:
         Event.Subscribed subscribed = event.getSubscribed();
-        if (isRegistered) {
+        if (isRegistered.get()) {
           handler.handleReregistration(subscribed.getMasterInfo());
         } else {
+          isRegistered.set(true);
           handler.handleRegistration(subscribed.getFrameworkId(), subscribed.getMasterInfo());
-          isRegistered = true;
         }
+        isSubscribed.set(true);
         break;
 
       case OFFERS:
-        checkState(isRegistered, "Must be registered before receiving offers.");
+        checkState(isSubscribed.get(), "Must be registered before receiving offers.");
         handler.handleOffers(event.getOffers().getOffersList());
         break;
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/656cf9ac/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
index 756d0d9..8c259bf 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
@@ -13,12 +13,16 @@
  */
 package org.apache.aurora.scheduler.mesos;
 
+import java.util.concurrent.Executors;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.net.InetAddresses;
 import com.google.protobuf.ByteString;
 
+import org.apache.aurora.common.base.ExceptionalSupplier;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.BackoffHelper;
 import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
@@ -40,10 +44,12 @@ import org.easymock.Capture;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class VersionedMesosSchedulerImplTest extends EasyMockTest {
@@ -53,6 +59,7 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest {
   private Mesos driver;
   private FakeStatsProvider statsProvider;
   private FrameworkInfoFactory infoFactory;
+  private BackoffHelper backoffHelper;
 
   private VersionedMesosSchedulerImpl scheduler;
 
@@ -145,31 +152,44 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest {
     driver = createMock(Mesos.class);
     statsProvider = new FakeStatsProvider();
     infoFactory = createMock(FrameworkInfoFactory.class);
+    backoffHelper = createMock(BackoffHelper.class);
 
     scheduler = new VersionedMesosSchedulerImpl(
         handler,
         new CachedCounters(statsProvider),
+        statsProvider,
         storageUtil.storage,
+        Executors.newSingleThreadExecutor(),
+        backoffHelper,
         infoFactory);
   }
 
-  @Test
-  public void testConnected() {
+  @Test(timeout = 300000)
+  public void testConnected() throws Exception {
     // Once the V1 driver has connected, we need to establish a subscription to get events
-
-    storageUtil.expectOperations();
-    expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
-    expect(infoFactory.getFrameworkInfo()).andReturn(FRAMEWORK_INFO);
+    expectFrameworkInfoRead();
 
     Capture<Call> subscribeCapture = createCapture();
 
     driver.send(capture(subscribeCapture));
     expectLastCall().once();
 
+    Capture<ExceptionalSupplier<Boolean, RuntimeException>> supplierCapture = createCapture();
+    backoffHelper.doUntilSuccess(capture(supplierCapture));
+    expectLastCall().once();
+
     control.replay();
 
     scheduler.connected(driver);
 
+    waitUntilCaptured(supplierCapture);
+
+    assertTrue(supplierCapture.hasCaptured());
+    ExceptionalSupplier<Boolean, RuntimeException> supplier = supplierCapture.getValue();
+
+    // Make one connection attempt
+    supplier.get();
+
     assertTrue(subscribeCapture.hasCaptured());
 
     Call subscribe = subscribeCapture.getValue();
@@ -181,6 +201,78 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest {
         FRAMEWORK_INFO.toBuilder().setId(FRAMEWORK).build());
   }
 
+  @Test(timeout = 300000)
+  public void testAttemptSubscriptionSuccessful() throws Exception {
+    expectFrameworkInfoRead();
+
+    // Other tests already check if what we send is correct.
+    driver.send(anyObject());
+    expectLastCall().once();
+    driver.send(anyObject());
+    expectLastCall().once();
+
+    Capture<ExceptionalSupplier<Boolean, RuntimeException>> supplierCapture = createCapture();
+    backoffHelper.doUntilSuccess(capture(supplierCapture));
+    expectLastCall().once();
+
+    handler.handleRegistration(FRAMEWORK, MASTER);
+
+    control.replay();
+
+    scheduler.connected(driver);
+
+    waitUntilCaptured(supplierCapture);
+    assertTrue(supplierCapture.hasCaptured());
+    ExceptionalSupplier<Boolean, RuntimeException> supplier = supplierCapture.getValue();
+
+    // Each attempt should return false.
+    assertFalse(supplier.get());
+    assertFalse(supplier.get());
+
+    // After the callback we should return true because it was successful.
+    scheduler.received(driver, SUBSCRIBED_EVENT);
+
+    assertTrue(supplier.get());
+  }
+
+  @Test(timeout = 300000)
+  public void testAttemptSubscriptionHaltsAfterDisconnection() throws Exception {
+    storageUtil.expectOperations();
+    expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+    expect(infoFactory.getFrameworkInfo()).andReturn(FRAMEWORK_INFO);
+
+    // Other tests already check if what we send is correct.
+    driver.send(anyObject());
+    expectLastCall().once();
+
+    Capture<ExceptionalSupplier<Boolean, RuntimeException>> supplierCapture = createCapture();
+    backoffHelper.doUntilSuccess(capture(supplierCapture));
+    expectLastCall().once();
+
+    handler.handleDisconnection();
+
+    control.replay();
+
+    scheduler.connected(driver);
+
+    waitUntilCaptured(supplierCapture);
+    assertTrue(supplierCapture.hasCaptured());
+    ExceptionalSupplier<Boolean, RuntimeException> supplier = supplierCapture.getValue();
+
+    assertFalse(supplier.get());
+
+    // After disconnection we should stop.
+    scheduler.disconnected(driver);
+
+    assertTrue(supplier.get());
+  }
+
+  private static void waitUntilCaptured(Capture<?> capture) throws Exception {
+    while (!capture.hasCaptured()) {
+      Thread.sleep(1000);
+    }
+  }
+
   @Test
   public void testDisconnected() {
     handler.handleDisconnection();
@@ -272,4 +364,48 @@ public class VersionedMesosSchedulerImplTest extends EasyMockTest {
     scheduler.received(driver, FAILED_AGENT_EVENT);
     assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_FAILURE"));
   }
+
+  @Test(timeout = 300000)
+  public void testSubscribeDisconnectSubscribeCycle() throws Exception {
+    expectFrameworkInfoRead();
+
+    Capture<ExceptionalSupplier<Boolean, RuntimeException>> firstSubscribeAttempt = createCapture();
+    backoffHelper.doUntilSuccess(capture(firstSubscribeAttempt));
+    expectLastCall().once();
+
+    handler.handleRegistration(FRAMEWORK, MASTER);
+    handler.handleDisconnection();
+
+    Capture<ExceptionalSupplier<Boolean, RuntimeException>> secondSubscribeAttempt =
+        createCapture();
+    backoffHelper.doUntilSuccess(capture(secondSubscribeAttempt));
+    expectLastCall().once();
+
+    // Second subscribe should call the reregistration handler.
+    handler.handleReregistration(MASTER);
+
+    control.replay();
+
+    scheduler.connected(driver);
+
+    waitUntilCaptured(firstSubscribeAttempt);
+    assertTrue(firstSubscribeAttempt.hasCaptured());
+
+    scheduler.received(driver, SUBSCRIBED_EVENT);
+    scheduler.disconnected(driver);
+    scheduler.connected(driver);
+
+    waitUntilCaptured(secondSubscribeAttempt);
+    assertTrue(secondSubscribeAttempt.hasCaptured());
+
+    scheduler.received(driver, SUBSCRIBED_EVENT);
+  }
+
+  private void expectFrameworkInfoRead() {
+    storageUtil.expectOperations();
+    expect(storageUtil.schedulerStore.fetchFrameworkId())
+        .andReturn(Optional.of(FRAMEWORK_ID))
+        .anyTimes();
+    expect(infoFactory.getFrameworkInfo()).andReturn(FRAMEWORK_INFO).anyTimes();
+  }
 }