Posted to dev@sling.apache.org by GitBox <gi...@apache.org> on 2021/09/14 14:00:15 UTC

[GitHub] [sling-org-apache-sling-discovery-oak] mreutegg commented on a change in pull request #4: SLING-10489 : introducing partial startup detector

mreutegg commented on a change in pull request #4:
URL: https://github.com/apache/sling-org-apache-sling-discovery-oak/pull/4#discussion_r708217617



##########
File path: src/main/java/org/apache/sling/discovery/oak/cluster/PartialStartupDetector.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.cluster;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.commons.providers.util.LogSilencer;
+import org.apache.sling.discovery.oak.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Discovery.oak requires that both Oak and Sling are operating normally in
+ * order to declare victory and announce a new topology.
+ * <p/>
+ * The startup phase is especially tricky in this regard, since there are
+ * multiple elements that need to get updated (some are in the Oak layer, some
+ * in Sling):
+ * <ul>
+ * <li>lease & clusterNodeId : this is maintained by Oak</li>
+ * <li>idMap : this is maintained by IdMapService</li>
+ * <li>leaderElectionId : this is maintained by OakViewChecker</li>
+ * <li>syncToken : this is maintained by SyncTokenService</li>
+ * </ul>
+ * A successful join of a cluster instance to the topology requires all 4
+ * elements to be set (and maintained, in case of lease and syncToken)
+ * correctly.
+ * <p/>
+ * This PartialStartupDetector is in charge of ensuring that a newly joined
+ * instance has all these elements set. Otherwise it is considered a "partially
+ * started instance" (PSI) and suppressed.
+ * <p/>
+ * The suppression ensures that existing instances aren't blocked by a rogue,
+ * partially starting instance. However, there's also a timeout after which the
+ * suppression is no longer applied - at which point such a rogue instance will
+ * block existing instances. Infrastructure must ensure that a rogue instance is
+ * detected and restarted/fixed in a reasonable amount of time.
+ */
+public class PartialStartupDetector {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private final ResourceResolver resourceResolver;
+    private final Config config;
+    private final int me;
+    private final long currentSeqNum;
+
+    private final boolean syncTokenEnabled;
+
+    private final boolean suppressingApplicable;
+    private final Set<Integer> partiallyStartedClusterNodeIds = new HashSet<>();
+
+    private final LogSilencer logSilencer;
+
+    /**
+     * @param lowestLocalSeqNum the lowest sequence number which
+     * the local OakClusterViewService has handled as part of asClusterView
+     * @param me the clusterNodeId (provided by oak) of the local instance (==me)
+     * @param localSlingIds slingId previously seen by this cluster instance (those will not be suppressed)

Review comment:
       Should this be `mySlingId` instead?

##########
File path: src/main/java/org/apache/sling/discovery/oak/cluster/PartialStartupDetector.java
##########
@@ -0,0 +1,189 @@
+    /**
+     * @param lowestLocalSeqNum the lowest sequence number which
+     * the local OakClusterViewService has handled as part of asClusterView
+     * @param me the clusterNodeId (provided by oak) of the local instance (==me)
+     * @param localSlingIds slingId previously seen by this cluster instance (those will not be suppressed)
+     * @param timeoutMillis -1 or 0 disables the timeout, otherwise the suppression
+     * is only done for the provided maximum number of milliseconds.
+     * @param logSilencer
+     */
+    PartialStartupDetector(ResourceResolver resourceResolver, Config config,
+            long lowestLocalSeqNum, int me, String mySlingId, long currentSeqNum, long timeoutMillis,
+            LogSilencer logSilencer) {
+        this.resourceResolver = resourceResolver;
+        this.config = config;
+        this.me = me;
+        this.currentSeqNum = currentSeqNum;
+
+        this.syncTokenEnabled = config != null && config.getSyncTokenEnabled();
+
+        // suppressing is enabled
+        // * when so configured
+        // * we haven't hit the timeout yet
+        // * when the local instance ever showed to peers that it has fully started.
+        // and one way to verify for that is to demand that it ever wrote a synctoken.
+        // and to check that we keep note of the first ever successful seq num returned
+        // here
+        // and require the current syncToken to be at least that.
+        final long now = System.currentTimeMillis();
+        final long mySyncToken = readSyncToken(resourceResolver, mySlingId);
+        final boolean suppressionConfigured = config != null && config.getSuppressPartiallyStartedInstances();
+        suppressingApplicable = suppressionConfigured
+                && ((timeoutMillis <= 0) || (now < timeoutMillis))
+                && (mySyncToken != -1) && (lowestLocalSeqNum != -1)
+                && (mySyncToken >= lowestLocalSeqNum);
+        if (logger.isDebugEnabled()) {
+            logger.debug("<init> suppressionConfigured = " + suppressionConfigured + ", me = " + me + ", mySlingId = " + mySlingId +
+                    ", timeoutMillis = " + timeoutMillis + ", mySyncToken = " + mySyncToken +
+                    ", lowestLocalSeqNum = " + lowestLocalSeqNum + ", suppressingApplicable = " + suppressingApplicable);
+        }
+        this.logSilencer = logSilencer;
+    }
+
+    private boolean isSuppressing(int id) {

Review comment:
       This method name confused me: I would expect it to check whether `partiallyStartedClusterNodeIds` contains the `id`.
   
   What do you think about `isSuppressible()`?
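
The naming distinction raised here can be illustrated with a minimal sketch. The body of `isSuppressing` is not visible in this hunk, so the semantics below are assumptions: `isSuppressible` is taken to mean "this id is eligible for suppression", and the hypothetical `isSuppressed` and `suppress` methods cover the "is it in the set" question the reviewer expected.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; not the actual PartialStartupDetector code.
class SuppressionNamingSketch {

    private final boolean suppressingApplicable;
    private final int me;
    private final Set<Integer> partiallyStartedClusterNodeIds = new HashSet<>();

    SuppressionNamingSketch(boolean suppressingApplicable, int me) {
        this.suppressingApplicable = suppressingApplicable;
        this.me = me;
    }

    /** Eligibility (assumed semantics): may this id be suppressed at all? */
    boolean isSuppressible(int id) {
        return suppressingApplicable && id != me;
    }

    /** State: has this id actually been recorded as partially started? */
    boolean isSuppressed(int id) {
        return partiallyStartedClusterNodeIds.contains(id);
    }

    /** Records the id as partially started, but only if it is eligible. */
    void suppress(int id) {
        if (isSuppressible(id)) {
            partiallyStartedClusterNodeIds.add(id);
        }
    }
}
```

With two separate names, a call site reads as either an eligibility question or a state question, which is the ambiguity the comment points at.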

##########
File path: src/test/java/org/apache/sling/discovery/oak/JoinerDelayTest.java
##########
@@ -0,0 +1,299 @@
+package org.apache.sling.discovery.oak;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.scheduler.impl.QuartzScheduler;
+import org.apache.sling.commons.scheduler.impl.SchedulerServiceFactory;
+import org.apache.sling.commons.threads.impl.DefaultThreadPoolManager;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DummyTopologyView;
+import org.apache.sling.discovery.commons.providers.base.DummyScheduler;
+import org.apache.sling.discovery.commons.providers.spi.LocalClusterView;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+import org.osgi.framework.BundleContext;
+
+import com.google.common.io.Closer;
+
+import junitx.util.PrivateAccessor;
+
+@RunWith(Parameterized.class)
+public class JoinerDelayTest {
+
+    private static enum SchedulerType {
+            REAL, DUMMY
+    }
+
+    private final SchedulerType schedulerType;
+    private Closer closer;
+
+    private Scheduler scheduler;
+    private BaseTopologyView view;
+    private Runnable callback;
+    private Semaphore callbackSemaphore;
+
+    private Scheduler createRealScheduler() throws Throwable {
+        final BundleContext ctx = Mockito.mock(BundleContext.class);
+        final Map<String, Object> props = new HashMap<>();
+        final DefaultThreadPoolManager threadPoolManager = new DefaultThreadPoolManager(ctx, new Hashtable<String, Object>());
+        final QuartzScheduler qScheduler = new QuartzScheduler();
+        Scheduler scheduler = new SchedulerServiceFactory();
+        PrivateAccessor.setField(qScheduler, "threadPoolManager", threadPoolManager);
+        PrivateAccessor.invoke(qScheduler, "activate", new Class[] {BundleContext.class,  Map.class}, new Object[] {ctx, props});
+        PrivateAccessor.setField(scheduler, "scheduler", qScheduler);
+
+        closer.register(new Closeable() {
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    PrivateAccessor.invoke(qScheduler, "deactivate", new Class[] {BundleContext.class}, new Object[] {ctx});
+                } catch (Throwable e) {
+                    throw new IOException(e);
+                }
+            }
+
+        });
+
+        return scheduler;
+    }
+
+    private Scheduler createDummyScheduler() {
+        return new DummyScheduler(true);
+    }
+
+    @Parameterized.Parameters(name="{0}")
+    public static Collection<SchedulerType> schedulerTypes() {
+        Collection<SchedulerType> result = new ArrayList<>();
+        result.add(SchedulerType.REAL);
+        result.add(SchedulerType.DUMMY);
+        return result;
+    }
+
+    public JoinerDelayTest(SchedulerType schedulerType) {
+        this.schedulerType = schedulerType;
+    }
+
+    @Before
+    public void setup() throws Throwable {
+        closer = Closer.create();
+        switch(schedulerType) {
+        case REAL : {
+            scheduler = createRealScheduler();
+            break;
+        }
+        case DUMMY : {
+            scheduler = createDummyScheduler();
+            break;
+        }
+        default: {
+            fail("unknown schedulerType : " + schedulerType);
+        }
+        }
+        DefaultClusterView cluster = new DefaultClusterView(UUID.randomUUID().toString());
+        view = new DummyTopologyView()
+                .addInstance(UUID.randomUUID().toString(), cluster, true, true)
+                .addInstance(UUID.randomUUID().toString(), cluster, false, false);
+        callbackSemaphore = new Semaphore(0);
+        callback = new Runnable() {
+
+            @Override
+            public void run() {
+                callbackSemaphore.release();
+            }
+
+        };
+    }
+
+    @After
+    public void teardown() throws Throwable {
+        closer.close();
+    }
+
+    @Test
+    public void testDummyCancelSyncCalls() throws Exception {
+        JoinerDelay joinerDelay = new JoinerDelay(-1, scheduler);
+        joinerDelay.cancelSync();
+        joinerDelay.cancelSync();
+    }
+
+    @Test
+    public void testSync_nulValues() throws Exception {
+        JoinerDelay joinerDelay = new JoinerDelay(1, scheduler);
+        try {
+            joinerDelay.sync(null, null);
+            fail("should fail");
+        } catch(Exception e) {

Review comment:
       I would catch and check for the more specific `IllegalArgumentException`.
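
To show why the narrower catch matters, here is a small self-contained sketch. `syncStrict` and `syncSloppy` are hypothetical stand-ins, not the real `JoinerDelay.sync`; the point is only that a broad `catch(Exception e)` also accepts an accidental `NullPointerException`, while catching `IllegalArgumentException` does not.

```java
import java.util.function.BiConsumer;

// Hypothetical stand-ins; the real JoinerDelay is not reproduced here.
class SpecificCatchSketch {

    /** Rejects a null callback explicitly. */
    static void syncStrict(Object view, Runnable callback) {
        if (callback == null) {
            throw new IllegalArgumentException("callback must not be null");
        }
    }

    /** Buggy variant: fails only by accident, with a NullPointerException. */
    static void syncSloppy(Object view, Runnable callback) {
        callback.run(); // throws NullPointerException when callback is null
    }

    /** Broad check, in the style of the test under review: any Exception passes. */
    static boolean passesBroadCheck(BiConsumer<Object, Runnable> sync) {
        try {
            sync.accept(null, null);
            return false; // nothing was thrown
        } catch (Exception e) {
            return true;
        }
    }

    /** Specific check: only IllegalArgumentException passes. */
    static boolean passesSpecificCheck(BiConsumer<Object, Runnable> sync) {
        try {
            sync.accept(null, null);
            return false; // nothing was thrown
        } catch (IllegalArgumentException e) {
            return true;
        } catch (RuntimeException e) {
            return false; // wrong exception type, e.g. NullPointerException
        }
    }

    public static void main(String[] args) {
        System.out.println(passesBroadCheck(SpecificCatchSketch::syncSloppy));
        System.out.println(passesSpecificCheck(SpecificCatchSketch::syncSloppy));
        System.out.println(passesSpecificCheck(SpecificCatchSketch::syncStrict));
    }
}
```

The sloppy variant passes the broad check but fails the specific one, so only the specific check would catch a regression from deliberate validation to an accidental failure.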

##########
File path: src/main/java/org/apache/sling/discovery/oak/cluster/PartialStartupDetector.java
##########
@@ -0,0 +1,189 @@
+    /**
+     * @param lowestLocalSeqNum the lowest sequence number which
+     * the local OakClusterViewService has handled as part of asClusterView
+     * @param me the clusterNodeId (provided by oak) of the local instance (==me)
+     * @param localSlingIds slingId previously seen by this cluster instance (those will not be suppressed)
+     * @param timeoutMillis -1 or 0 disables the timeout, otherwise the suppression
+     * is only done for the provided maximum number of milliseconds.
+     * @param logSilencer
+     */
+    PartialStartupDetector(ResourceResolver resourceResolver, Config config,
+            long lowestLocalSeqNum, int me, String mySlingId, long currentSeqNum, long timeoutMillis,
+            LogSilencer logSilencer) {
+        this.resourceResolver = resourceResolver;
+        this.config = config;
+        this.me = me;
+        this.currentSeqNum = currentSeqNum;
+
+        this.syncTokenEnabled = config != null && config.getSyncTokenEnabled();
+
+        // suppressing is enabled
+        // * when so configured
+        // * we haven't hit the timeout yet
+        // * when the local instance ever showed to peers that it has fully started.
+        // and one way to verify for that is to demand that it ever wrote a synctoken.
+        // and to check that we keep note of the first ever successful seq num returned
+        // here
+        // and require the current syncToken to be at least that.
+        final long now = System.currentTimeMillis();
+        final long mySyncToken = readSyncToken(resourceResolver, mySlingId);
+        final boolean suppressionConfigured = config != null && config.getSuppressPartiallyStartedInstances();
+        suppressingApplicable = suppressionConfigured
+                && ((timeoutMillis <= 0) || (now < timeoutMillis))

Review comment:
       I don't get this. Why is suppressing applicable when `timeoutMillis` is zero?
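
For reference, the time-related part of the quoted condition can be isolated as below. This is a sketch of how the expression reads, not a statement of the author's intent: per the javadoc in the hunk, a `timeoutMillis` of -1 or 0 disables the timeout, so suppression never expires on time grounds. The class and method names are hypothetical.

```java
// Hypothetical extraction of the time check from the quoted constructor.
class SuppressionTimeoutSketch {

    /**
     * Mirrors "(timeoutMillis <= 0) || (now < timeoutMillis)" from the hunk:
     * a non-positive timeout means "no timeout", so the check always passes;
     * otherwise it passes only while now is still below timeoutMillis.
     */
    static boolean withinTimeout(long timeoutMillis, long now) {
        return timeoutMillis <= 0 || now < timeoutMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(withinTimeout(0, now));        // timeout disabled
        System.out.println(withinTimeout(-1, now));       // timeout disabled
        System.out.println(withinTimeout(now + 5000, now)); // deadline in the future
        System.out.println(withinTimeout(now - 1, now));    // deadline already passed
    }
}
```

As quoted, the value is compared directly against `System.currentTimeMillis()`, so it behaves like an absolute deadline, while the javadoc describes a "maximum number of milliseconds". Whether the caller passes a deadline or a duration is not visible in this hunk.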

##########
File path: src/main/java/org/apache/sling/discovery/oak/cluster/PartialStartupDetector.java
##########
@@ -0,0 +1,189 @@
+    /**
+     * @param lowestLocalSeqNum the lowest sequence number which
+     * the local OakClusterViewService has handled as part of asClusterView
+     * @param me the clusterNodeId (provided by oak) of the local instance (==me)
+     * @param localSlingIds slingId previously seen by this cluster instance (those will not be suppressed)
+     * @param timeoutMillis -1 or 0 disables the timeout, otherwise the suppression
+     * is only done for the provided maximum number of milliseconds.
+     * @param logSilencer

Review comment:
       Either remove this `@param logSilencer` tag or add a description for it.

##########
File path: src/test/java/org/apache/sling/discovery/oak/OakDiscoveryServiceTest.java
##########
@@ -414,4 +428,352 @@ public void testPropertyProviderRegistrations() throws Exception{
         discoveryService.unbindPropertyProvider(p, m);
         discoveryService.updatedPropertyProvider(p, m);
     }
-}
+
+    @Test
+    public void simpleNewJoinerTest() throws Exception {
+        doSimpleNewJoinerTest(0);
+    }
+
+    @Test
+    public void simpleNewJoinerTestWithDelay() throws Exception {
+        doSimpleNewJoinerTest(2);
+    }
+
+    private void doSimpleNewJoinerTest(int joinerDelaySeconds) throws Exception {
+        OakVirtualInstanceBuilder builder1 =
+                (OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
+                .setDebugName("instance1")
+                .newRepository("/foo/barrio/foo/", true)
+                .setConnectorPingInterval(999)
+                .setConnectorPingTimeout(999);
+        builder1.getConfig().setSyncTokenEnabled(true);
+        builder1.getConfig().setJoinerDelaySeconds(joinerDelaySeconds);
+        VirtualInstance instance1 = builder1.build();
+        DummyListener listener1 = new DummyListener();
+        OakDiscoveryService discoveryService1 = (OakDiscoveryService) instance1.getDiscoveryService();
+        discoveryService1.bindTopologyEventListener(listener1);
+
+        instance1.heartbeatsAndCheckView();
+        instance1.heartbeatsAndCheckView();
+
+        assertEquals(0, discoveryService1.getViewStateManager().waitForAsyncEvents(2000));
+        Thread.sleep(1000);

Review comment:
       Why is this needed?
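
If the sleep is there to wait for an asynchronous effect (an assumption, since the test does not say), one common alternative is to poll for the expected condition with an upper bound instead of sleeping for a fixed time. The helper below is a hypothetical sketch and is not part of the Sling test utilities.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper; names and signature are illustrative only.
class WaitUntilSketch {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was observed, false on timeout or interrupt.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
        return true;
    }
}
```

A test would then assert on the return value with the condition it actually depends on, which both documents the reason for waiting and returns as soon as the condition holds.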

##########
File path: src/test/java/org/apache/sling/discovery/oak/JoinerDelayTest.java
##########
@@ -0,0 +1,299 @@
+    @Test
+    public void testSync_nulValues() throws Exception {
+        JoinerDelay joinerDelay = new JoinerDelay(1, scheduler);
+        try {
+            joinerDelay.sync(null, null);
+            fail("should fail");
+        } catch(Exception e) {
+            // ok
+        }
+        joinerDelay = new JoinerDelay(1, scheduler);
+        try {
+            joinerDelay.sync(view, null);
+            fail("should fail");
+        } catch(Exception e) {
+            // ok
+        }
+        joinerDelay = new JoinerDelay(1, scheduler);
+        // this one is fine as it is caught
+        joinerDelay.sync(null, callback);

Review comment:
       I think the implementation should also throw an `IllegalArgumentException` when `view` is `null`. The contract does not say that `view` may be `null`, nor does it specify what should happen in that case.
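
       To illustrate the suggestion, here is a minimal sketch of eager argument validation. It is not the actual `JoinerDelay` code: the class `SyncNullCheckSketch`, the `View` interface, and the helper `throwsIllegalArgument` are hypothetical stand-ins, and the sketch runs the callback immediately, whereas the real implementation presumably goes through the scheduler.

```java
public class SyncNullCheckSketch {

    /** Hypothetical stand-in for the topology view type (not the real BaseTopologyView). */
    interface View {}

    /**
     * Hypothetical stand-in for a sync(view, callback) method.
     * Both arguments are validated up front, so a null view fails in the
     * same way as a null callback instead of being silently tolerated.
     */
    static void sync(View view, Runnable callback) {
        if (view == null) {
            throw new IllegalArgumentException("view must not be null");
        }
        if (callback == null) {
            throw new IllegalArgumentException("callback must not be null");
        }
        // Assumption: a real implementation would schedule the callback;
        // this sketch simply invokes it right away.
        callback.run();
    }

    /** Returns true if sync rejects the given arguments with an IllegalArgumentException. */
    static boolean throwsIllegalArgument(View view, Runnable callback) {
        try {
            sync(view, callback);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        View view = new View() {};
        Runnable callback = () -> {};
        System.out.println(throwsIllegalArgument(null, null));     // true
        System.out.println(throwsIllegalArgument(view, null));     // true
        System.out.println(throwsIllegalArgument(null, callback)); // true
        System.out.println(throwsIllegalArgument(view, callback)); // false
    }
}
```

       With a check like this in place, the test could expect `IllegalArgumentException` for all three `null` combinations rather than treating `sync(null, callback)` as a special case.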

##########
File path: src/test/java/org/apache/sling/discovery/oak/JoinerDelayTest.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.scheduler.impl.QuartzScheduler;
+import org.apache.sling.commons.scheduler.impl.SchedulerServiceFactory;
+import org.apache.sling.commons.threads.impl.DefaultThreadPoolManager;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DummyTopologyView;
+import org.apache.sling.discovery.commons.providers.base.DummyScheduler;
+import org.apache.sling.discovery.commons.providers.spi.LocalClusterView;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+import org.osgi.framework.BundleContext;
+
+import com.google.common.io.Closer;
+
+import junitx.util.PrivateAccessor;
+
+@RunWith(Parameterized.class)
+public class JoinerDelayTest {
+
+    private static enum SchedulerType {
+            REAL, DUMMY
+    }
+
+    private final SchedulerType schedulerType;
+    private Closer closer;
+
+    private Scheduler scheduler;
+    private BaseTopologyView view;
+    private Runnable callback;
+    private Semaphore callbackSemaphore;
+
+    private Scheduler createRealScheduler() throws Throwable {
+        final BundleContext ctx = Mockito.mock(BundleContext.class);
+        final Map<String, Object> props = new HashMap<>();
+        final DefaultThreadPoolManager threadPoolManager = new DefaultThreadPoolManager(ctx, new Hashtable<String, Object>());
+        final QuartzScheduler qScheduler = new QuartzScheduler();
+        Scheduler scheduler = new SchedulerServiceFactory();
+        PrivateAccessor.setField(qScheduler, "threadPoolManager", threadPoolManager);
+        PrivateAccessor.invoke(qScheduler, "activate", new Class[] {BundleContext.class,  Map.class}, new Object[] {ctx, props});
+        PrivateAccessor.setField(scheduler, "scheduler", qScheduler);
+
+        closer.register(new Closeable() {
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    PrivateAccessor.invoke(qScheduler, "deactivate", new Class[] {BundleContext.class}, new Object[] {ctx});
+                } catch (Throwable e) {
+                    throw new IOException(e);
+                }
+            }
+
+        });
+
+        return scheduler;
+    }
+
+    private Scheduler createDummyScheduler() {
+        return new DummyScheduler(true);
+    }
+
+    @Parameterized.Parameters(name="{0}")
+    public static Collection<SchedulerType> schedulerTypes() {
+        Collection<SchedulerType> result = new ArrayList<>();
+        result.add(SchedulerType.REAL);
+        result.add(SchedulerType.DUMMY);
+        return result;
+    }
+
+    public JoinerDelayTest(SchedulerType schedulerType) {
+        this.schedulerType = schedulerType;
+    }
+
+    @Before
+    public void setup() throws Throwable {
+        closer = Closer.create();
+        switch(schedulerType) {
+        case REAL : {
+            scheduler = createRealScheduler();
+            break;
+        }
+        case DUMMY : {
+            scheduler = createDummyScheduler();
+            break;
+        }
+        default: {
+            fail("unknown schedulerType : " + schedulerType);
+        }
+        }
+        DefaultClusterView cluster = new DefaultClusterView(UUID.randomUUID().toString());
+        view = new DummyTopologyView()
+                .addInstance(UUID.randomUUID().toString(), cluster, true, true)
+                .addInstance(UUID.randomUUID().toString(), cluster, false, false);
+        callbackSemaphore = new Semaphore(0);
+        callback = new Runnable() {
+
+            @Override
+            public void run() {
+                callbackSemaphore.release();
+            }
+
+        };
+    }
+
+    @After
+    public void teardown() throws Throwable {
+        closer.close();
+    }
+
+    @Test
+    public void testDummyCancelSyncCalls() throws Exception {
+        JoinerDelay joinerDelay = new JoinerDelay(-1, scheduler);
+        joinerDelay.cancelSync();
+        joinerDelay.cancelSync();
+    }
+
+    @Test
+    public void testSync_nulValues() throws Exception {
+        JoinerDelay joinerDelay = new JoinerDelay(1, scheduler);
+        try {
+            joinerDelay.sync(null, null);
+            fail("should fail");
+        } catch(Exception e) {
+            // ok
+        }
+        joinerDelay = new JoinerDelay(1, scheduler);
+        try {
+            joinerDelay.sync(view, null);
+            fail("should fail");
+        } catch(Exception e) {

Review comment:
       Same as above; there are a few more occurrences further down.



