You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "dcapwell (via GitHub)" <gi...@apache.org> on 2023/06/07 18:46:03 UTC

[GitHub] [cassandra-accord] dcapwell commented on a diff in pull request #49: Bootstrap tcm integration

dcapwell commented on code in PR #49:
URL: https://github.com/apache/cassandra-accord/pull/49#discussion_r1221993283


##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);

Review Comment:
   ```suggestion
               Invariants.checkArgument(epoch > 0, "Epoch must be positive but given %d", epoch);
   ```



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);

Review Comment:
   you are calling `getOrCreate`, so this can't be null right?



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);

Review Comment:
   ```suggestion
               Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0, "Epoch %d != %d + 1", epoch, lastReceived);
   ```



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);

Review Comment:
   ```suggestion
               Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0, "Epoch %d != %d + 1", epoch, lastAcknowledged);
   ```



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            EpochState state = getOrCreate(epoch);
+            state.reads = ready.reads;
+            state.acknowledged.setSuccess(null);
+        }
+
+        AsyncResult<Void> acknowledgeFuture(long epoch)
+        {
+            return getOrCreate(epoch).acknowledged;
+        }
+
+        void truncateUntil(long epoch)
+        {
+            Invariants.checkArgument(epoch <= maxEpoch());
+            long minEpoch = minEpoch();
+            int toTrim = Ints.checkedCast(epoch - minEpoch);
+            if (toTrim <=0)
+                return;
+
+            epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
+        }
+    }
+
+    public AbstractConfigurationService(Node.Id localId)
+    {
+        this.localId = localId;
+    }
+
+    protected abstract EpochHistory createEpochHistory();
+
+    protected EpochState getOrCreateEpochState(long epoch)
+    {
+        return epochs.getOrCreate(epoch);
+    }
+
+    @Override
+    public synchronized void registerListener(Listener listener)
+    {
+        listeners.add(listener);
+    }
+
+    @Override
+    public synchronized Topology currentTopology()
+    {
+        return epochs.topologyFor(epochs.lastReceived);
+    }
+
+    @Override
+    public synchronized Topology getTopologyForEpoch(long epoch)

Review Comment:
   this is nullable here... so can be confusing and likely lead to NPEs; feel we should return `AsyncResult` instead



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            EpochState state = getOrCreate(epoch);
+            state.reads = ready.reads;
+            state.acknowledged.setSuccess(null);
+        }
+
+        AsyncResult<Void> acknowledgeFuture(long epoch)
+        {
+            return getOrCreate(epoch).acknowledged;
+        }
+
+        void truncateUntil(long epoch)
+        {
+            Invariants.checkArgument(epoch <= maxEpoch());
+            long minEpoch = minEpoch();
+            int toTrim = Ints.checkedCast(epoch - minEpoch);
+            if (toTrim <=0)
+                return;
+
+            epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
+        }
+    }
+
+    public AbstractConfigurationService(Node.Id localId)
+    {
+        this.localId = localId;
+    }
+
+    protected abstract EpochHistory createEpochHistory();
+
+    protected EpochState getOrCreateEpochState(long epoch)
+    {
+        return epochs.getOrCreate(epoch);
+    }
+
+    @Override
+    public synchronized void registerListener(Listener listener)
+    {
+        listeners.add(listener);
+    }
+
+    @Override
+    public synchronized Topology currentTopology()
+    {
+        return epochs.topologyFor(epochs.lastReceived);
+    }
+
+    @Override
+    public synchronized Topology getTopologyForEpoch(long epoch)
+    {
+        return epochs.topologyFor(epoch);
+    }
+
+    protected abstract void fetchTopologyInternal(long epoch);
+
+    @Override
+    public synchronized void fetchTopologyForEpoch(long epoch)
+    {
+        if (epoch <= epochs.lastReceived)
+            return;
+
+        fetchTopologyInternal(epoch);
+    }
+
+    protected abstract void localSyncComplete(Topology topology);
+
+    @Override
+    public synchronized void acknowledgeEpoch(EpochReady ready)
+    {
+        ready.metadata.addCallback(() -> epochs.acknowledge(ready));
+        ready.coordination.addCallback(() ->  localSyncComplete(epochs.getOrCreate(ready.epoch).topology));
+    }
+
+    protected void topologyUpdatePreListenerNotify(Topology topology) {}
+    protected void topologyUpdatePostListenerNotify(Topology topology) {}
+
+    public synchronized AsyncResult<Void> reportTopology(Topology topology, boolean startSync)

Review Comment:
   why have `boolean startSync`?



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            EpochState state = getOrCreate(epoch);
+            state.reads = ready.reads;
+            state.acknowledged.setSuccess(null);
+        }
+
+        AsyncResult<Void> acknowledgeFuture(long epoch)
+        {
+            return getOrCreate(epoch).acknowledged;
+        }
+
+        void truncateUntil(long epoch)
+        {
+            Invariants.checkArgument(epoch <= maxEpoch());
+            long minEpoch = minEpoch();
+            int toTrim = Ints.checkedCast(epoch - minEpoch);
+            if (toTrim <=0)
+                return;
+
+            epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
+        }
+    }
+
+    public AbstractConfigurationService(Node.Id localId)
+    {
+        this.localId = localId;
+    }
+
+    protected abstract EpochHistory createEpochHistory();
+
+    protected EpochState getOrCreateEpochState(long epoch)
+    {
+        return epochs.getOrCreate(epoch);
+    }
+
+    @Override
+    public synchronized void registerListener(Listener listener)
+    {
+        listeners.add(listener);
+    }
+
+    @Override
+    public synchronized Topology currentTopology()
+    {
+        return epochs.topologyFor(epochs.lastReceived);
+    }
+
+    @Override
+    public synchronized Topology getTopologyForEpoch(long epoch)
+    {
+        return epochs.topologyFor(epoch);
+    }
+
+    protected abstract void fetchTopologyInternal(long epoch);
+
+    @Override
+    public synchronized void fetchTopologyForEpoch(long epoch)
+    {
+        if (epoch <= epochs.lastReceived)
+            return;
+
+        fetchTopologyInternal(epoch);
+    }
+
+    protected abstract void localSyncComplete(Topology topology);
+
+    @Override
+    public synchronized void acknowledgeEpoch(EpochReady ready)
+    {
+        ready.metadata.addCallback(() -> epochs.acknowledge(ready));
+        ready.coordination.addCallback(() ->  localSyncComplete(epochs.getOrCreate(ready.epoch).topology));
+    }
+
+    protected void topologyUpdatePreListenerNotify(Topology topology) {}
+    protected void topologyUpdatePostListenerNotify(Topology topology) {}
+
+    public synchronized AsyncResult<Void> reportTopology(Topology topology, boolean startSync)
+    {
+        long lastReceived = epochs.lastReceived;
+        if (topology.epoch() <= lastReceived)
+            return AsyncResults.success(null);
+
+        if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
+        {
+            fetchTopologyForEpoch(lastReceived + 1);
+            epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology, startSync));
+            return AsyncResults.success(null);

Review Comment:
   this API is said void, but every instance is not async and always success... why is this async?  Shouldn't it be based off the epoch being acked?



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            EpochState state = getOrCreate(epoch);
+            state.reads = ready.reads;
+            state.acknowledged.setSuccess(null);
+        }
+
+        AsyncResult<Void> acknowledgeFuture(long epoch)
+        {
+            return getOrCreate(epoch).acknowledged;
+        }
+
+        void truncateUntil(long epoch)
+        {
+            Invariants.checkArgument(epoch <= maxEpoch());
+            long minEpoch = minEpoch();
+            int toTrim = Ints.checkedCast(epoch - minEpoch);
+            if (toTrim <=0)
+                return;
+
+            epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
+        }
+    }
+
+    public AbstractConfigurationService(Node.Id localId)
+    {
+        this.localId = localId;
+    }
+
+    protected abstract EpochHistory createEpochHistory();
+
+    protected EpochState getOrCreateEpochState(long epoch)
+    {
+        return epochs.getOrCreate(epoch);
+    }
+
+    @Override
+    public synchronized void registerListener(Listener listener)
+    {
+        listeners.add(listener);
+    }
+
+    @Override
+    public synchronized Topology currentTopology()
+    {
+        return epochs.topologyFor(epochs.lastReceived);
+    }
+
+    @Override
+    public synchronized Topology getTopologyForEpoch(long epoch)
+    {
+        return epochs.topologyFor(epoch);
+    }
+
+    protected abstract void fetchTopologyInternal(long epoch);
+
+    @Override
+    public synchronized void fetchTopologyForEpoch(long epoch)
+    {
+        if (epoch <= epochs.lastReceived)
+            return;
+
+        fetchTopologyInternal(epoch);
+    }
+
+    protected abstract void localSyncComplete(Topology topology);
+
+    @Override
+    public synchronized void acknowledgeEpoch(EpochReady ready)
+    {
+        ready.metadata.addCallback(() -> epochs.acknowledge(ready));
+        ready.coordination.addCallback(() ->  localSyncComplete(epochs.getOrCreate(ready.epoch).topology));
+    }
+
+    protected void topologyUpdatePreListenerNotify(Topology topology) {}
+    protected void topologyUpdatePostListenerNotify(Topology topology) {}
+
+    public synchronized AsyncResult<Void> reportTopology(Topology topology, boolean startSync)
+    {
+        long lastReceived = epochs.lastReceived;
+        if (topology.epoch() <= lastReceived)
+            return AsyncResults.success(null);
+
+        if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
+        {
+            fetchTopologyForEpoch(lastReceived + 1);
+            epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology, startSync));
+            return AsyncResults.success(null);
+        }
+
+        long lastAcked = epochs.lastAcknowledged;
+        if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
+        {
+            epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology, startSync));
+            return AsyncResults.success(null);
+        }
+        logger.trace("Epoch {} received by {}", topology.epoch(), localId);
+
+        epochs.receive(topology);
+        topologyUpdatePreListenerNotify(topology);
+        for (Listener listener : listeners)
+            listener.onTopologyUpdate(topology, startSync);
+        topologyUpdatePostListenerNotify(topology);
+        return AsyncResults.success(null);
+    }
+
+    public synchronized AsyncResult<Void> reportTopology(Topology topology)
+    {
+        return reportTopology(topology, true);
+    }
+
+    protected void remoteSyncCompletePreListenerNotify(Node.Id node, long epoch) {}
+
+    public synchronized void remoteSyncComplete(Node.Id node, long epoch)
+    {
+        remoteSyncCompletePreListenerNotify(node, epoch);
+        for (Listener listener : listeners)
+            listener.onRemoteSyncComplete(node, epoch);
+    }
+
+    protected void truncateTopologiesPreListenerNotify(long epoch) {}
+    protected void truncateTopologiesPostListenerNotify(long epoch) {}
+
+    public synchronized void truncateTopologiesUntil(long epoch)
+    {
+        truncateTopologiesPreListenerNotify(epoch);
+        for (Listener listener : listeners)
+            listener.truncateTopologyUntil(epoch);
+        truncateTopologiesPostListenerNotify(epoch);
+        epochs.truncateUntil(epoch);
+    }
+
+    public synchronized AsyncResult<Void> epochReady(long epoch)
+    {
+        EpochState state = epochs.getOrCreate(epoch);
+        if (state.reads != null)
+            return state.reads;
+
+        return state.acknowledged.flatMap(r -> state.reads).beginAsResult();

Review Comment:
   maybe return `AsyncChain` to avoid `beginAsResult`?



##########
accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.coordinate.FetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Status;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.ReadData;
+import accord.messages.WaitAndReadData;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.primitives.Routables.Slice.Minimal;
+
+public abstract class AbstractFetchCoordinator extends FetchCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractFetchCoordinator.class);
+
+    static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult
+    {
+        final AbstractFetchCoordinator coordinator;
+
+        FetchResult(AbstractFetchCoordinator coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        @Override
+        public void abort(Ranges abort)
+        {
+            coordinator.abort(abort);
+        }
+    }
+
+    static class Key
+    {
+        final Node.Id id;
+        final Ranges ranges;
+
+        Key(Node.Id id, Ranges ranges)
+        {
+            this.id = id;
+            this.ranges = ranges;
+        }
+
+        @Override
+        public int hashCode()
+                         {
+                            return id.hashCode() + ranges.hashCode();
+                                                                     }

Review Comment:
   formatting



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            EpochState state = getOrCreate(epoch);
+            state.reads = ready.reads;
+            state.acknowledged.setSuccess(null);

Review Comment:
   think this fails on double `set*` calls right?  For TCM do we have exactly once semantics?  Can it retry to make sure the notification is idempotent?



##########
accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.coordinate.FetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Status;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.ReadData;
+import accord.messages.WaitAndReadData;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.primitives.Routables.Slice.Minimal;
+
+public abstract class AbstractFetchCoordinator extends FetchCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractFetchCoordinator.class);
+
+    static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult
+    {
+        final AbstractFetchCoordinator coordinator;
+
+        FetchResult(AbstractFetchCoordinator coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        @Override
+        public void abort(Ranges abort)
+        {
+            coordinator.abort(abort);
+        }
+    }
+
+    static class Key
+    {
+        final Node.Id id;
+        final Ranges ranges;
+
+        Key(Node.Id id, Ranges ranges)
+        {
+            this.id = id;
+            this.ranges = ranges;
+        }
+
+        @Override
+        public int hashCode()
+                         {
+                            return id.hashCode() + ranges.hashCode();
+                                                                     }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj) return true;
+            if (!(obj instanceof Key)) return false;
+            Key that = (Key) obj;
+            return id.equals(that.id) && ranges.equals(that.ranges);
+        }
+    }
+
+    final DataStore.FetchRanges fetchRanges;
+    final CommandStore commandStore;
+    final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>();
+    final FetchResult result = new FetchResult(this);
+    final List<AsyncResult<Void>> persisting = new ArrayList<>();
+
+    protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore)
+    {
+        super(node, ranges, syncPoint, fetchRanges);
+        this.fetchRanges = fetchRanges;
+        this.commandStore = commandStore;
+    }
+
+    protected abstract PartialTxn rangeReadTxn(Ranges ranges);
+
+    protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges);
+
+    @Override
+    public void contact(Node.Id to, Ranges ranges)
+    {
+        Key key = new Key(to, ranges);
+        inflight.put(key, starting(to, ranges));
+        Ranges ownedRanges = ownedRangesForNode(to);
+        Invariants.checkArgument(ownedRanges.containsAll(ranges));
+        PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
+        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>()
+        {
+            @Override
+            public void onSuccess(Node.Id from, ReadData.ReadReply reply)
+            {
+                if (!reply.isOk())
+                {
+                    fail(to, new RuntimeException(reply.toString()));
+                    inflight.remove(key).cancel();
+                    switch ((ReadData.ReadNack) reply)
+                    {
+                        default: throw new AssertionError("Unhandled enum");
+                        case Invalid:
+                        case Redundant:
+                        case NotCommitted:
+                            throw new AssertionError();

Review Comment:
   ```suggestion
                               throw new AssertionError(String.format("Unexpected reply: %s", reply));
   ```



##########
accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.coordinate.FetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Status;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.ReadData;
+import accord.messages.WaitAndReadData;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.primitives.Routables.Slice.Minimal;
+
+public abstract class AbstractFetchCoordinator extends FetchCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractFetchCoordinator.class);
+
+    static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult
+    {
+        final AbstractFetchCoordinator coordinator;
+
+        FetchResult(AbstractFetchCoordinator coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        @Override
+        public void abort(Ranges abort)
+        {
+            coordinator.abort(abort);
+        }
+    }
+
+    static class Key
+    {
+        final Node.Id id;
+        final Ranges ranges;
+
+        Key(Node.Id id, Ranges ranges)
+        {
+            this.id = id;
+            this.ranges = ranges;
+        }
+
+        @Override
+        public int hashCode()
+                         {
+                            return id.hashCode() + ranges.hashCode();
+                                                                     }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj) return true;
+            if (!(obj instanceof Key)) return false;
+            Key that = (Key) obj;
+            return id.equals(that.id) && ranges.equals(that.ranges);
+        }
+    }
+
+    final DataStore.FetchRanges fetchRanges;
+    final CommandStore commandStore;
+    final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>();
+    final FetchResult result = new FetchResult(this);
+    final List<AsyncResult<Void>> persisting = new ArrayList<>();
+
+    protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore)
+    {
+        super(node, ranges, syncPoint, fetchRanges);
+        this.fetchRanges = fetchRanges;
+        this.commandStore = commandStore;
+    }
+
+    protected abstract PartialTxn rangeReadTxn(Ranges ranges);
+
+    protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges);
+
+    @Override
+    public void contact(Node.Id to, Ranges ranges)
+    {
+        Key key = new Key(to, ranges);
+        inflight.put(key, starting(to, ranges));
+        Ranges ownedRanges = ownedRangesForNode(to);
+        Invariants.checkArgument(ownedRanges.containsAll(ranges));
+        PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
+        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>()
+        {
+            @Override
+            public void onSuccess(Node.Id from, ReadData.ReadReply reply)
+            {
+                if (!reply.isOk())
+                {
+                    fail(to, new RuntimeException(reply.toString()));
+                    inflight.remove(key).cancel();
+                    switch ((ReadData.ReadNack) reply)
+                    {
+                        default: throw new AssertionError("Unhandled enum");
+                        case Invalid:
+                        case Redundant:
+                        case NotCommitted:
+                            throw new AssertionError();
+                        case Error:
+                            // TODO (required): ensure errors are propagated to coordinators and can be logged
+                    }
+                    return;
+                }
+
+                FetchResponse ok = (FetchResponse) reply;
+                Ranges received;
+                if (ok.unavailable != null)
+                {
+                    unavailable(to, ok.unavailable);
+                    if (ok.data == null)
+                    {
+                        inflight.remove(key).cancel();
+                        return;
+                    }
+                    received = ranges.difference(ok.unavailable);
+                }
+                else
+                {
+                    received = ranges;
+                }
+
+                // TODO (now): make sure it works if invoked in either order
+                inflight.remove(key).started(ok.maxApplied);
+                onReadOk(to, commandStore, ok.data, received);
+                // received must be invoked after submitting the persistence future, as it triggers onDone
+                // which creates a ReducingFuture over {@code persisting}
+            }
+
+            @Override
+            public void onFailure(Node.Id from, Throwable failure)
+            {
+                inflight.remove(key).cancel();
+                fail(from, failure);
+            }
+
+            @Override
+            public void onCallbackFailure(Node.Id from, Throwable failure)
+            {
+                // TODO (soon)
+                logger.error("Fetch coordination failure from " + from, failure);
+            }
+        });
+    }
+
+    @Override
+    protected synchronized void success(Node.Id to, Ranges ranges)
+    {
+        super.success(to, ranges);
+    }
+
+    @Override
+    protected synchronized void fail(Node.Id to, Ranges ranges, Throwable failure)
+    {
+        super.fail(to, ranges, failure);
+    }
+
+    public FetchResult result()
+    {
+        return result;
+    }
+
+    @Override
+    protected void onDone(Ranges success, Throwable failure)
+    {
+        if (success.isEmpty()) result.setFailure(failure);
+        else if (persisting.isEmpty()) result.setSuccess(Ranges.EMPTY);
+        else AsyncChains.reduce(persisting, (a, b)-> null)

Review Comment:
   ```suggestion
           else AsyncChains.reduce(persisting, (a, b) -> null)
   ```



##########
accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.coordinate.FetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Status;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.ReadData;
+import accord.messages.WaitAndReadData;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.primitives.Routables.Slice.Minimal;
+
+public abstract class AbstractFetchCoordinator extends FetchCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractFetchCoordinator.class);
+
+    static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult
+    {
+        final AbstractFetchCoordinator coordinator;
+
+        FetchResult(AbstractFetchCoordinator coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        @Override
+        public void abort(Ranges abort)
+        {
+            coordinator.abort(abort);
+        }
+    }
+
+    static class Key
+    {
+        final Node.Id id;
+        final Ranges ranges;
+
+        Key(Node.Id id, Ranges ranges)
+        {
+            this.id = id;
+            this.ranges = ranges;
+        }
+
+        @Override
+        public int hashCode()
+                         {
+                            return id.hashCode() + ranges.hashCode();
+                                                                     }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj) return true;
+            if (!(obj instanceof Key)) return false;
+            Key that = (Key) obj;
+            return id.equals(that.id) && ranges.equals(that.ranges);
+        }
+    }
+
+    final DataStore.FetchRanges fetchRanges;
+    final CommandStore commandStore;
+    final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>();
+    final FetchResult result = new FetchResult(this);
+    final List<AsyncResult<Void>> persisting = new ArrayList<>();
+
+    protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore)
+    {
+        super(node, ranges, syncPoint, fetchRanges);
+        this.fetchRanges = fetchRanges;
+        this.commandStore = commandStore;
+    }
+
+    protected abstract PartialTxn rangeReadTxn(Ranges ranges);
+
+    protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges);
+
+    @Override
+    public void contact(Node.Id to, Ranges ranges)
+    {
+        Key key = new Key(to, ranges);
+        inflight.put(key, starting(to, ranges));
+        Ranges ownedRanges = ownedRangesForNode(to);
+        Invariants.checkArgument(ownedRanges.containsAll(ranges));
+        PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
+        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>()
+        {
+            @Override
+            public void onSuccess(Node.Id from, ReadData.ReadReply reply)
+            {
+                if (!reply.isOk())
+                {
+                    fail(to, new RuntimeException(reply.toString()));
+                    inflight.remove(key).cancel();
+                    switch ((ReadData.ReadNack) reply)
+                    {
+                        default: throw new AssertionError("Unhandled enum");
+                        case Invalid:
+                        case Redundant:
+                        case NotCommitted:
+                            throw new AssertionError();
+                        case Error:
+                            // TODO (required): ensure errors are propagated to coordinators and can be logged
+                    }
+                    return;
+                }
+
+                FetchResponse ok = (FetchResponse) reply;
+                Ranges received;
+                if (ok.unavailable != null)
+                {
+                    unavailable(to, ok.unavailable);
+                    if (ok.data == null)
+                    {
+                        inflight.remove(key).cancel();
+                        return;
+                    }
+                    received = ranges.difference(ok.unavailable);
+                }
+                else
+                {
+                    received = ranges;
+                }
+
+                // TODO (now): make sure it works if invoked in either order
+                inflight.remove(key).started(ok.maxApplied);
+                onReadOk(to, commandStore, ok.data, received);
+                // received must be invoked after submitting the persistence future, as it triggers onDone
+                // which creates a ReducingFuture over {@code persisting}
+            }
+
+            @Override
+            public void onFailure(Node.Id from, Throwable failure)
+            {
+                inflight.remove(key).cancel();
+                fail(from, failure);
+            }
+
+            @Override
+            public void onCallbackFailure(Node.Id from, Throwable failure)
+            {
+                // TODO (soon)
+                logger.error("Fetch coordination failure from " + from, failure);
+            }
+        });
+    }
+
+    @Override
+    protected synchronized void success(Node.Id to, Ranges ranges)
+    {
+        super.success(to, ranges);
+    }
+
+    @Override
+    protected synchronized void fail(Node.Id to, Ranges ranges, Throwable failure)
+    {
+        super.fail(to, ranges, failure);
+    }
+
+    public FetchResult result()
+    {
+        return result;
+    }
+
+    @Override
+    protected void onDone(Ranges success, Throwable failure)
+    {
+        if (success.isEmpty()) result.setFailure(failure);
+        else if (persisting.isEmpty()) result.setSuccess(Ranges.EMPTY);
+        else AsyncChains.reduce(persisting, (a, b)-> null)
+                        .begin((s, f) -> {
+                            if (f == null) result.setSuccess(ranges);
+                            else result.setFailure(f);
+                        });
+    }
+
+    @Override
+    public void start()
+                      {
+                         super.start();
+                                       }

Review Comment:
   formatting



##########
accord-core/src/main/java/accord/topology/TopologyManager.java:
##########
@@ -174,6 +186,16 @@ public long nextEpoch()
             return current().epoch + 1;
         }
 
+        public long minEpoch()
+        {
+            return currentEpoch - epochs.length + 1;

Review Comment:
   if empty isn't this `0 - 0 + 1 = 1`, and should be `0`?



##########
accord-core/src/test/java/accord/messages/PreAcceptTest.java:
##########
@@ -196,7 +196,8 @@ void multiKeyTimestampUpdate() throws ExecutionException
             messageSink.assertHistorySizes(0, 1);
             Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
             PartialDeps expectedDeps = new PartialDeps(Ranges.of(range(0, 12)), KeyDeps.NONE, RangeDeps.NONE);
-            Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2, Timestamp.fromValues(1, 110, ID1), expectedDeps),
+            Timestamp expectedTs = Timestamp.fromValues(1, 110, ID1).withExtraFlags(txnId2.flags());

Review Comment:
   what change caused this?  I get the following locally (only changed this test, nothing else)
   
   ```
   Expected :PreAcceptOk{txnId:[1,50,2,3], witnessedAt:[1,110,2,1], deps:[Range(0, 12]]:{}, {}}
   Actual   :PreAcceptOk{txnId:[1,50,2,3], witnessedAt:[1,110,0,1], deps:[Range(0, 12]]:{}, {}}
   ```



##########
accord-core/src/main/java/accord/topology/TopologyManager.java:
##########
@@ -274,6 +299,22 @@ public void onEpochSyncComplete(Id node, long epoch)
         epochs.syncComplete(node, epoch);
     }
 
+    public synchronized void truncateTopologyUntil(long epoch)
+    {
+        Epochs current = epochs;
+        checkArgument(current.epoch() >= epoch);
+
+        if (current.minEpoch() >= epoch)
+            return;
+
+        int newLen = current.epochs.length - (int) (epoch - current.minEpoch());
+        Invariants.checkState(current.epochs[newLen - 1].syncComplete());
+
+        EpochState[] nextEpochs = new EpochState[newLen];
+        System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
+        epochs = new Epochs(nextEpochs, current.pendingSyncComplete, current.futureEpochFutures);

Review Comment:
   nit: `java.util.Arrays#copyOfRange(U[], int, int, java.lang.Class<? extends T[]>)`?



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            EpochState state = getOrCreate(epoch);
+            state.reads = ready.reads;

Review Comment:
   there are no locks here (yet, still looking at this one class)... what is the expected threading model?  If this is single producer multi consumer, then the `setSuccess` should publish this pointer update, but access would need a volatile read first to fetch the latest state; but we do `synchronized` instead
   
   I only ask due to this being `public`



##########
accord-core/src/main/java/accord/topology/TopologyManager.java:
##########
@@ -274,6 +299,22 @@ public void onEpochSyncComplete(Id node, long epoch)
         epochs.syncComplete(node, epoch);
     }
 
+    public synchronized void truncateTopologyUntil(long epoch)
+    {
+        Epochs current = epochs;
+        checkArgument(current.epoch() >= epoch);
+
+        if (current.minEpoch() >= epoch)
+            return;
+
+        int newLen = current.epochs.length - (int) (epoch - current.minEpoch());
+        Invariants.checkState(current.epochs[newLen - 1].syncComplete());

Review Comment:
   ```suggestion
           Invariants.checkState(current.epochs[newLen - 1].syncComplete(), "Epoch %d's sync is not complete", current.epochs[newLen - 1].epoch);
   ```
   ```suggestion
           Invariants.checkState(current.epochs[newLen - 1].syncComplete());
   ```



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            EpochState state = getOrCreate(epoch);
+            state.reads = ready.reads;
+            state.acknowledged.setSuccess(null);
+        }
+
+        AsyncResult<Void> acknowledgeFuture(long epoch)
+        {
+            return getOrCreate(epoch).acknowledged;
+        }
+
+        void truncateUntil(long epoch)
+        {
+            Invariants.checkArgument(epoch <= maxEpoch());
+            long minEpoch = minEpoch();
+            int toTrim = Ints.checkedCast(epoch - minEpoch);
+            if (toTrim <=0)
+                return;
+
+            epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
+        }
+    }
+
+    public AbstractConfigurationService(Node.Id localId)
+    {
+        this.localId = localId;
+    }
+
+    protected abstract EpochHistory createEpochHistory();
+
+    protected EpochState getOrCreateEpochState(long epoch)
+    {
+        return epochs.getOrCreate(epoch);
+    }
+
+    @Override
+    public synchronized void registerListener(Listener listener)
+    {
+        listeners.add(listener);
+    }
+
+    @Override
+    public synchronized Topology currentTopology()
+    {
+        return epochs.topologyFor(epochs.lastReceived);
+    }
+
+    @Override
+    public synchronized Topology getTopologyForEpoch(long epoch)
+    {
+        return epochs.topologyFor(epoch);
+    }
+
+    protected abstract void fetchTopologyInternal(long epoch);
+
+    @Override
+    public synchronized void fetchTopologyForEpoch(long epoch)
+    {
+        if (epoch <= epochs.lastReceived)
+            return;
+
+        fetchTopologyInternal(epoch);
+    }
+
+    protected abstract void localSyncComplete(Topology topology);
+
+    @Override
+    public synchronized void acknowledgeEpoch(EpochReady ready)
+    {
+        ready.metadata.addCallback(() -> epochs.acknowledge(ready));
+        ready.coordination.addCallback(() ->  localSyncComplete(epochs.getOrCreate(ready.epoch).topology));

Review Comment:
   this only triggers if the result was success, but should we have a state for failed bootstrap?  If we failed, what happens if a remote peer uses this epoch?



##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -364,7 +364,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top
                 bootstrapUpdates.add(shard.store.interruptBootstraps(epoch, newRanges.currentRanges()));
             }
             // TODO (desired): only sync affected shards
-            if (epoch > 1)
+            if (epoch > 1 && startSync)

Review Comment:
   I feel the `startSync` is to work around the bug where we don't want to bootstrap when a keyspace is added?  If so I added a new `accord.local.CommandStores#shouldBootstrap` for that... 
   
   One issue to deal with is that we could have a new keyspace at the same time we see a range movement; so we need to bootstrap the range movement but not the new keyspace



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            EpochState state = getOrCreate(epoch);
+            state.reads = ready.reads;
+            state.acknowledged.setSuccess(null);
+        }
+
+        AsyncResult<Void> acknowledgeFuture(long epoch)
+        {
+            return getOrCreate(epoch).acknowledged;
+        }
+
+        void truncateUntil(long epoch)
+        {
+            Invariants.checkArgument(epoch <= maxEpoch());
+            long minEpoch = minEpoch();
+            int toTrim = Ints.checkedCast(epoch - minEpoch);
+            if (toTrim <=0)
+                return;
+
+            epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
+        }
+    }
+
+    public AbstractConfigurationService(Node.Id localId)
+    {
+        this.localId = localId;
+    }
+
+    protected abstract EpochHistory createEpochHistory();
+
+    protected EpochState getOrCreateEpochState(long epoch)
+    {
+        return epochs.getOrCreate(epoch);
+    }
+
+    @Override
+    public synchronized void registerListener(Listener listener)
+    {
+        listeners.add(listener);
+    }
+
+    @Override
+    public synchronized Topology currentTopology()
+    {
+        return epochs.topologyFor(epochs.lastReceived);
+    }
+
+    @Override
+    public synchronized Topology getTopologyForEpoch(long epoch)
+    {
+        return epochs.topologyFor(epoch);
+    }
+
+    protected abstract void fetchTopologyInternal(long epoch);
+
+    @Override
+    public synchronized void fetchTopologyForEpoch(long epoch)
+    {
+        if (epoch <= epochs.lastReceived)
+            return;
+
+        fetchTopologyInternal(epoch);
+    }
+
+    protected abstract void localSyncComplete(Topology topology);
+
+    @Override
+    public synchronized void acknowledgeEpoch(EpochReady ready)
+    {
+        ready.metadata.addCallback(() -> epochs.acknowledge(ready));
+        ready.coordination.addCallback(() ->  localSyncComplete(epochs.getOrCreate(ready.epoch).topology));
+    }
+
+    protected void topologyUpdatePreListenerNotify(Topology topology) {}
+    protected void topologyUpdatePostListenerNotify(Topology topology) {}
+
+    public synchronized AsyncResult<Void> reportTopology(Topology topology, boolean startSync)
+    {
+        long lastReceived = epochs.lastReceived;
+        if (topology.epoch() <= lastReceived)
+            return AsyncResults.success(null);
+
+        if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
+        {
+            fetchTopologyForEpoch(lastReceived + 1);
+            epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology, startSync));

Review Comment:
   if `latestRecieved = 3` and `epoch() = 5`, then this would "fetch" `epoch = 4` but not `epoch = 5`?



##########
accord-core/src/main/java/accord/impl/AbstractConfigurationService.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState,
+                                                   EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>>
+                      implements ConfigurationService
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id localId;
+
+    protected final EpochHistory epochs = createEpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    public abstract static class AbstractEpochState
+    {
+        protected final long epoch;
+        protected final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+        protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+        protected AsyncResult<Void> reads = null;
+
+        protected Topology topology = null;
+
+        public AbstractEpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState>
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        protected long lastAcknowledged = 0;
+
+        protected abstract EpochState createEpochState(long epoch);
+
+        public long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        public long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = createEpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(createEpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(createEpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public void receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public void acknowledge(EpochReady ready)
+        {
+            long epoch = ready.epoch;
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            EpochState state = getOrCreate(epoch);
+            state.reads = ready.reads;
+            state.acknowledged.setSuccess(null);
+        }
+
+        AsyncResult<Void> acknowledgeFuture(long epoch)
+        {
+            return getOrCreate(epoch).acknowledged;
+        }
+
+        void truncateUntil(long epoch)
+        {
+            Invariants.checkArgument(epoch <= maxEpoch());

Review Comment:
   ```suggestion
               Invariants.checkArgument(epoch <= maxEpoch(), "epoch %d > %d", epoch, maxEpoch());
   ```



##########
accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.coordinate.FetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Status;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.ReadData;
+import accord.messages.WaitAndReadData;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.primitives.Routables.Slice.Minimal;
+
+public abstract class AbstractFetchCoordinator extends FetchCoordinator

Review Comment:
   this class looks like a copy/paste of `accord.impl.list.ListStore.SyncCoordinator`?  



##########
accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.coordinate.FetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Status;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.ReadData;
+import accord.messages.WaitAndReadData;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.primitives.Routables.Slice.Minimal;
+
+public abstract class AbstractFetchCoordinator extends FetchCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractFetchCoordinator.class);
+
+    static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult
+    {
+        final AbstractFetchCoordinator coordinator;
+
+        FetchResult(AbstractFetchCoordinator coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        @Override
+        public void abort(Ranges abort)
+        {
+            coordinator.abort(abort);
+        }
+    }
+
+    static class Key
+    {
+        final Node.Id id;
+        final Ranges ranges;
+
+        Key(Node.Id id, Ranges ranges)
+        {
+            this.id = id;
+            this.ranges = ranges;
+        }
+
+        @Override
+        public int hashCode()
+                         {
+                            return id.hashCode() + ranges.hashCode();
+                                                                     }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj) return true;
+            if (!(obj instanceof Key)) return false;
+            Key that = (Key) obj;
+            return id.equals(that.id) && ranges.equals(that.ranges);
+        }
+    }
+
+    final DataStore.FetchRanges fetchRanges;
+    final CommandStore commandStore;
+    final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>();
+    final FetchResult result = new FetchResult(this);
+    final List<AsyncResult<Void>> persisting = new ArrayList<>();
+
+    protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore)
+    {
+        super(node, ranges, syncPoint, fetchRanges);
+        this.fetchRanges = fetchRanges;
+        this.commandStore = commandStore;
+    }
+
+    protected abstract PartialTxn rangeReadTxn(Ranges ranges);
+
+    protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges);
+
+    @Override
+    public void contact(Node.Id to, Ranges ranges)
+    {
+        Key key = new Key(to, ranges);
+        inflight.put(key, starting(to, ranges));
+        Ranges ownedRanges = ownedRangesForNode(to);
+        Invariants.checkArgument(ownedRanges.containsAll(ranges));
+        PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
+        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>()
+        {
+            @Override
+            public void onSuccess(Node.Id from, ReadData.ReadReply reply)
+            {
+                if (!reply.isOk())
+                {
+                    fail(to, new RuntimeException(reply.toString()));
+                    inflight.remove(key).cancel();
+                    switch ((ReadData.ReadNack) reply)
+                    {
+                        default: throw new AssertionError("Unhandled enum");

Review Comment:
   ```suggestion
                           default: throw new AssertionError("Unhandled enum: " + reply);
   ```



##########
accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.coordinate.FetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Status;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.ReadData;
+import accord.messages.WaitAndReadData;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.primitives.Routables.Slice.Minimal;
+
+public abstract class AbstractFetchCoordinator extends FetchCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractFetchCoordinator.class);
+
+    static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult
+    {
+        final AbstractFetchCoordinator coordinator;
+
+        FetchResult(AbstractFetchCoordinator coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        @Override
+        public void abort(Ranges abort)
+        {
+            coordinator.abort(abort);
+        }
+    }
+
+    static class Key
+    {
+        final Node.Id id;
+        final Ranges ranges;
+
+        Key(Node.Id id, Ranges ranges)
+        {
+            this.id = id;
+            this.ranges = ranges;
+        }
+
+        @Override
+        public int hashCode()
+                         {
+                            return id.hashCode() + ranges.hashCode();
+                                                                     }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj) return true;
+            if (!(obj instanceof Key)) return false;
+            Key that = (Key) obj;
+            return id.equals(that.id) && ranges.equals(that.ranges);
+        }
+    }
+
+    final DataStore.FetchRanges fetchRanges;
+    final CommandStore commandStore;
+    final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>();
+    final FetchResult result = new FetchResult(this);
+    final List<AsyncResult<Void>> persisting = new ArrayList<>();
+
+    protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore)
+    {
+        super(node, ranges, syncPoint, fetchRanges);
+        this.fetchRanges = fetchRanges;
+        this.commandStore = commandStore;
+    }
+
+    protected abstract PartialTxn rangeReadTxn(Ranges ranges);
+
+    protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges);
+
+    @Override
+    public void contact(Node.Id to, Ranges ranges)
+    {
+        Key key = new Key(to, ranges);
+        inflight.put(key, starting(to, ranges));
+        Ranges ownedRanges = ownedRangesForNode(to);
+        Invariants.checkArgument(ownedRanges.containsAll(ranges));
+        PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
+        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>()
+        {
+            @Override
+            public void onSuccess(Node.Id from, ReadData.ReadReply reply)
+            {
+                if (!reply.isOk())
+                {
+                    fail(to, new RuntimeException(reply.toString()));
+                    inflight.remove(key).cancel();
+                    switch ((ReadData.ReadNack) reply)
+                    {
+                        default: throw new AssertionError("Unhandled enum");
+                        case Invalid:
+                        case Redundant:

Review Comment:
   trying to wrap my head around this... doesn't this just mean concurrent txn (original coordinator + recovery coordinator)?  So this is just saying that "we are reusing message reply, and these states are not valid for us" right?



##########
accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.Data;
+import accord.api.DataStore;
+import accord.coordinate.FetchCoordinator;
+import accord.local.CommandStore;
+import accord.local.Node;
+import accord.local.Status;
+import accord.messages.Callback;
+import accord.messages.MessageType;
+import accord.messages.ReadData;
+import accord.messages.WaitAndReadData;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import static accord.primitives.Routables.Slice.Minimal;
+
+public abstract class AbstractFetchCoordinator extends FetchCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractFetchCoordinator.class);
+
+    static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult
+    {
+        final AbstractFetchCoordinator coordinator;
+
+        FetchResult(AbstractFetchCoordinator coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        @Override
+        public void abort(Ranges abort)
+        {
+            coordinator.abort(abort);
+        }
+    }
+
+    static class Key
+    {
+        final Node.Id id;
+        final Ranges ranges;
+
+        Key(Node.Id id, Ranges ranges)
+        {
+            this.id = id;
+            this.ranges = ranges;
+        }
+
+        @Override
+        public int hashCode()
+                         {
+                            return id.hashCode() + ranges.hashCode();
+                                                                     }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj) return true;
+            if (!(obj instanceof Key)) return false;
+            Key that = (Key) obj;
+            return id.equals(that.id) && ranges.equals(that.ranges);
+        }
+    }
+
+    final DataStore.FetchRanges fetchRanges;
+    final CommandStore commandStore;
+    final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>();
+    final FetchResult result = new FetchResult(this);
+    final List<AsyncResult<Void>> persisting = new ArrayList<>();
+
+    protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore)
+    {
+        super(node, ranges, syncPoint, fetchRanges);
+        this.fetchRanges = fetchRanges;
+        this.commandStore = commandStore;
+    }
+
+    protected abstract PartialTxn rangeReadTxn(Ranges ranges);
+
+    protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges);
+
+    @Override
+    public void contact(Node.Id to, Ranges ranges)
+    {
+        Key key = new Key(to, ranges);
+        inflight.put(key, starting(to, ranges));
+        Ranges ownedRanges = ownedRangesForNode(to);
+        Invariants.checkArgument(ownedRanges.containsAll(ranges));
+        PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
+        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>()
+        {
+            @Override
+            public void onSuccess(Node.Id from, ReadData.ReadReply reply)
+            {
+                if (!reply.isOk())
+                {
+                    fail(to, new RuntimeException(reply.toString()));
+                    inflight.remove(key).cancel();
+                    switch ((ReadData.ReadNack) reply)
+                    {
+                        default: throw new AssertionError("Unhandled enum");
+                        case Invalid:
+                        case Redundant:
+                        case NotCommitted:
+                            throw new AssertionError();
+                        case Error:
+                            // TODO (required): ensure errors are propagated to coordinators and can be logged
+                    }
+                    return;
+                }
+
+                FetchResponse ok = (FetchResponse) reply;
+                Ranges received;
+                if (ok.unavailable != null)
+                {
+                    unavailable(to, ok.unavailable);
+                    if (ok.data == null)
+                    {
+                        inflight.remove(key).cancel();
+                        return;
+                    }
+                    received = ranges.difference(ok.unavailable);
+                }
+                else
+                {
+                    received = ranges;
+                }
+
+                // TODO (now): make sure it works if invoked in either order
+                inflight.remove(key).started(ok.maxApplied);
+                onReadOk(to, commandStore, ok.data, received);
+                // received must be invoked after submitting the persistence future, as it triggers onDone
+                // which creates a ReducingFuture over {@code persisting}
+            }
+
+            @Override
+            public void onFailure(Node.Id from, Throwable failure)
+            {
+                inflight.remove(key).cancel();
+                fail(from, failure);
+            }
+
+            @Override
+            public void onCallbackFailure(Node.Id from, Throwable failure)
+            {
+                // TODO (soon)
+                logger.error("Fetch coordination failure from " + from, failure);
+            }
+        });
+    }
+
+    @Override
+    protected synchronized void success(Node.Id to, Ranges ranges)
+    {
+        super.success(to, ranges);
+    }
+
+    @Override
+    protected synchronized void fail(Node.Id to, Ranges ranges, Throwable failure)
+    {
+        super.fail(to, ranges, failure);
+    }

Review Comment:
   why override to add `synchronized`?  



##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -364,7 +364,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top
                 bootstrapUpdates.add(shard.store.interruptBootstraps(epoch, newRanges.currentRanges()));
             }
             // TODO (desired): only sync affected shards
-            if (epoch > 1)
+            if (epoch > 1 && startSync)

Review Comment:
   rewritten in https://issues.apache.org/jira/browse/CASSANDRA-18519, the logic is now
   
   ```
   Map<Boolean, Ranges> partitioned = add.partitioningBy(range -> shouldBootstrap(node, prev.local, newLocalTopology, range));
   if (partitioned.containsKey(true))
       bootstrapUpdates.add(shardHolder.store.bootstrapper(node, partitioned.get(true), newLocalTopology.epoch()));
   if (partitioned.containsKey(false))
       bootstrapUpdates.add(() -> shardHolder.store.initialise(epoch, partitioned.get(false)));
   ```



##########
accord-core/src/main/java/accord/local/Node.java:
##########
@@ -150,9 +149,10 @@ public Node(Id id, MessageSink messageSink, ConfigurationService configService,
         configService.registerListener(this);
     }
 
+    // FIXME: remove, only used byy Maelstrom

Review Comment:
   ```suggestion
       // TODO (cleanup, testing): remove, only used byy Maelstrom
   ```



##########
accord-core/src/main/java/accord/topology/TopologyManager.java:
##########
@@ -274,6 +299,22 @@ public void onEpochSyncComplete(Id node, long epoch)
         epochs.syncComplete(node, epoch);
     }
 
+    public synchronized void truncateTopologyUntil(long epoch)
+    {
+        Epochs current = epochs;
+        checkArgument(current.epoch() >= epoch);

Review Comment:
   ```suggestion
           checkArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch , current.epoch());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org