You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/20 01:42:58 UTC

[incubator-ratis] branch master updated: RATIS-1166. Link DataStream with LogEntryProto. (#290)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6887001  RATIS-1166. Link DataStream with LogEntryProto. (#290)
6887001 is described below

commit 6887001ef8ac750b5280e2745cfb5303d10bc949
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Nov 20 09:42:48 2020 +0800

    RATIS-1166. Link DataStream with LogEntryProto. (#290)
---
 .../ratis/netty/server/DataStreamManagement.java   | 39 ++++++++++++---
 ratis-proto/src/main/proto/Raft.proto              |  6 +++
 .../org/apache/ratis/server/DataStreamMap.java     | 34 +++++++++++++
 .../java/org/apache/ratis/server/RaftServer.java   | 15 +++++-
 .../ratis/server/impl/DataStreamMapImpl.java       | 57 ++++++++++++++++++++++
 .../apache/ratis/server/impl/RaftServerImpl.java   | 18 ++++++-
 .../apache/ratis/server/impl/RaftServerProxy.java  |  4 +-
 .../apache/ratis/server/impl/ServerProtoUtils.java | 17 +++++--
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 20 +++++---
 .../test/java/org/apache/ratis/RaftTestUtil.java   |  2 +-
 .../ratis/server/impl/RaftServerTestUtil.java      |  5 ++
 .../ratis/datastream/DataStreamBaseTest.java       | 50 +++++++++++++++----
 .../ratis/datastream/TestDataStreamNetty.java      |  8 ++-
 13 files changed, 239 insertions(+), 36 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index f88a52a..c2a9e20 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -26,14 +26,16 @@ import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
 import org.apache.ratis.protocol.exceptions.DataStreamException;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServer.Division;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
@@ -41,6 +43,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -214,14 +218,26 @@ public class DataStreamManagement {
         RaftServerConfigKeys.DataStream.asyncThreadPoolSize(properties));
   }
 
+  private CompletableFuture<DataStream> computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
+    final Division division = server.getDivision(request.getRaftGroupId());
+    final ClientInvocationId invocationId = ClientInvocationId.valueOf(request);
+    final MemoizedSupplier<CompletableFuture<DataStream>> supplier = JavaUtils.memoize(
+        () -> division.getStateMachine().data().stream(request));
+    final CompletableFuture<DataStream> f = division.getDataStreamMap()
+        .computeIfAbsent(invocationId, key -> supplier.get());
+    if (!supplier.isInitialized()) {
+      throw new AlreadyExistsException("A DataStream already exists for " + invocationId);
+    }
+    return f;
+  }
+
   private StreamInfo newStreamInfo(ByteBuf buf,
       CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
     try {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
-      final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
       final boolean isPrimary = server.getId().equals(request.getServerId());
-      return new StreamInfo(request, isPrimary, stateMachine.data().stream(request),
+      return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request),
           isPrimary? getDataStreamOutput.apply(request): Collections.emptyList());
     } catch (Throwable e) {
       throw new CompletionException(e);
@@ -400,13 +416,20 @@ public class DataStreamManagement {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    final StreamInfo info = request.getType() != Type.STREAM_HEADER? streams.get(key)
-        : streams.computeIfAbsent(key, id -> newStreamInfo(buf, getDataStreamOutput));
-
-    if (info == null) {
-      throw new IllegalStateException("Failed to get StreamInfo for " + request);
+    final StreamInfo info;
+    if (request.getType() == Type.STREAM_HEADER) {
+      final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getDataStreamOutput));
+      info = streams.computeIfAbsent(key, id -> supplier.get());
+      if (!supplier.isInitialized()) {
+        throw new IllegalStateException("Failed to create a new stream for " + request
+            + " since a stream already exists: " + info);
+      }
+    } else {
+      info = Optional.ofNullable(streams.get(key)).orElseThrow(
+          () -> new IllegalStateException("Failed to get StreamInfo for " + request));
     }
 
+
     if (request.getType() == Type.START_TRANSACTION) {
       // for peers to start transaction
       composeAsync(info.getPrevious(), executor, v -> startTransaction(info, request, ctx))
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 1749118..7a90fb5 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -70,6 +70,12 @@ message StateMachineLogEntryProto {
    */
   StateMachineEntryProto stateMachineEntry = 2;
 
+  enum Type {
+    WRITE = 0;
+    DATASTREAM = 1;
+  }
+
+  Type type = 13;
   // clientId and callId are used to rebuild the retry cache.
   bytes clientId = 14;
   uint64 callId = 15;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamMap.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamMap.java
new file mode 100644
index 0000000..87905df
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamMap.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.statemachine.StateMachine.DataStream;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/** A {@link ClientInvocationId}-to-{@link DataStream} map. */
+public interface DataStreamMap {
+  /** Similar to {@link java.util.Map#computeIfAbsent(Object, Function)}. */
+  CompletableFuture<DataStream> computeIfAbsent(ClientInvocationId invocationId,
+      Function<ClientInvocationId, CompletableFuture<DataStream>> newDataStream);
+
+  /** Similar to {@link java.util.Map#remove(java.lang.Object). */
+  CompletableFuture<DataStream> remove(ClientInvocationId invocationId);
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 30ca21a..e8f42c1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -37,6 +37,19 @@ public interface RaftServer extends Closeable, RpcType.Get,
     RaftServerProtocol, RaftServerAsynchronousProtocol,
     RaftClientProtocol, RaftClientAsynchronousProtocol,
     AdminProtocol, AdminAsynchronousProtocol {
+  /** A division of a {@link RaftServer} for a particular group. */
+  interface Division {
+    /** @return the {@link RaftGroupMemberId} for this division. */
+    RaftGroupMemberId getMemberId();
+
+    /** @return the {@link RaftGroup} for this division. */
+    RaftGroup getGroup();
+
+    /** @return the {@link StateMachine} for this division. */
+    StateMachine getStateMachine();
+
+    DataStreamMap getDataStreamMap();
+  }
 
   /** @return the server ID. */
   RaftPeerId getId();
@@ -47,7 +60,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
   /** @return the groups the server is part of. */
   Iterable<RaftGroup> getGroups() throws IOException;
 
-  StateMachine getStateMachine(RaftGroupId groupId) throws IOException;
+  Division getDivision(RaftGroupId groupId) throws IOException;
 
   /** @return the server properties. */
   RaftProperties getProperties();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamMapImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamMapImpl.java
new file mode 100644
index 0000000..a57a6f5
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamMapImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+class DataStreamMapImpl implements DataStreamMap {
+  public static final Logger LOG = LoggerFactory.getLogger(DataStreamMapImpl.class);
+
+  private final String name;
+  private final ConcurrentMap<ClientInvocationId, CompletableFuture<DataStream>> map = new ConcurrentHashMap<>();
+
+  DataStreamMapImpl(Object name) {
+    this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
+  }
+
+  @Override
+  public CompletableFuture<DataStream> remove(ClientInvocationId invocationId) {
+    return map.remove(invocationId);
+  }
+
+  @Override
+  public CompletableFuture<DataStream> computeIfAbsent(ClientInvocationId invocationId,
+      Function<ClientInvocationId, CompletableFuture<DataStream>> newDataStream) {
+    return map.computeIfAbsent(invocationId, newDataStream);
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 9318a39..efe0aa8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -30,6 +30,8 @@ import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
 import org.apache.ratis.protocol.exceptions.StaleReadException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
 import org.apache.ratis.server.RaftServerRpc;
@@ -76,7 +78,8 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
 import com.codahale.metrics.Timer;
 
-public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronousProtocol,
+public class RaftServerImpl implements RaftServer.Division,
+    RaftServerProtocol, RaftServerAsynchronousProtocol,
     RaftClientProtocol, RaftClientAsynchronousProtocol {
   public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class);
 
@@ -100,6 +103,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       RaftPeer.newBuilder().setId(getId()).setAddress(getServerRpc().getInetSocketAddress()).build());
   private final RoleInfo role;
 
+  private final DataStreamMap dataStreamMap;
+
   private final RetryCache retryCache;
   private final CommitInfoCache commitInfoCache = new CommitInfoCache();
 
@@ -137,6 +142,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     this.state = new ServerState(id, group, properties, this, stateMachine);
     this.retryCache = initRetryCache(properties);
     this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
+    this.dataStreamMap = new DataStreamMapImpl(id);
 
     this.jmxAdapter = new RaftServerJmxAdapter();
     this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(this);
@@ -183,10 +189,16 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return sleepDeviationThresholdMs;
   }
 
+  @Override
   public StateMachine getStateMachine() {
     return stateMachine;
   }
 
+  @Override
+  public DataStreamMap getDataStreamMap() {
+    return dataStreamMap;
+  }
+
   @VisibleForTesting
   public RetryCache getRetryCache() {
     return retryCache;
@@ -257,6 +269,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return state;
   }
 
+  @Override
   public RaftGroupMemberId getMemberId() {
     return getState().getMemberId();
   }
@@ -273,7 +286,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return getState().getRaftConf();
   }
 
-  RaftGroup getGroup() {
+  @Override
+  public RaftGroup getGroup() {
     return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getPeers());
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index d4b611c..ed0c2ca 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -304,8 +304,8 @@ public class RaftServerProxy implements RaftServer {
   }
 
   @Override
-  public StateMachine getStateMachine(RaftGroupId groupId) throws IOException {
-    return getImpl(groupId).getStateMachine();
+  public Division getDivision(RaftGroupId groupId) throws IOException {
+    return getImpl(groupId);
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index a3c29c2..7bb58a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -235,14 +235,25 @@ public interface ServerProtoUtils {
     if (logData == null) {
       logData = request.getMessage().getContent();
     }
-    return toStateMachineLogEntryProto(request.getClientId(), request.getCallId(), logData, stateMachineData);
+    return toStateMachineLogEntryProto(request.getClientId(), request.getCallId(),
+        toStateMachineLogEntryProtoType(request.getType().getTypeCase()), logData, stateMachineData);
   }
 
-  static StateMachineLogEntryProto toStateMachineLogEntryProto(
-      ClientId clientId, long callId, ByteString logData, ByteString stateMachineData) {
+  static StateMachineLogEntryProto.Type toStateMachineLogEntryProtoType(RaftClientRequestProto.TypeCase typeCase) {
+    switch (typeCase) {
+      case WRITE: return StateMachineLogEntryProto.Type.WRITE;
+      case DATASTREAM: return StateMachineLogEntryProto.Type.DATASTREAM;
+      default:
+        throw new IllegalStateException("Unexpected type case " + typeCase);
+    }
+  }
+
+  static StateMachineLogEntryProto toStateMachineLogEntryProto(ClientId clientId, long callId,
+      StateMachineLogEntryProto.Type type, ByteString logData, ByteString stateMachineData) {
     final StateMachineLogEntryProto.Builder b = StateMachineLogEntryProto.newBuilder()
         .setClientId(clientId.toByteString())
         .setCallId(callId)
+        .setType(type)
         .setLogData(logData);
     if (stateMachineData != null) {
       b.setStateMachineEntry(toStateMachineEntryProtoBuilder(stateMachineData));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 01f98a5..6069469 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -20,6 +20,8 @@ package org.apache.ratis.server.raftlog.segmented;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -158,7 +160,6 @@ class SegmentedRaftLogWorker implements Runnable {
   private long lastWrittenIndex;
   /** the largest index of the entry that has been flushed */
   private final RaftLogIndex flushIndex = new RaftLogIndex("flushIndex", 0);
-  /** the largest index of the entry in a closed segment */
   /** the index up to which cache can be evicted - max of snapshotIndex and
    * largest index in a closed segment */
   private final RaftLogIndex safeCacheEvictIndex = new RaftLogIndex("safeCacheEvictIndex", 0);
@@ -167,7 +168,6 @@ class SegmentedRaftLogWorker implements Runnable {
 
   private final long segmentMaxSize;
   private final long preallocatedSize;
-  private final int bufferSize;
   private final RaftServerImpl server;
   private int flushBatchSize;
 
@@ -191,7 +191,6 @@ class SegmentedRaftLogWorker implements Runnable {
 
     this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
-    this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
     this.flushBatchSize = 0;
 
@@ -208,6 +207,7 @@ class SegmentedRaftLogWorker implements Runnable {
     this.raftLogQueueingTimer = metricRegistry.getRaftLogQueueTimer();
     this.raftLogEnqueueingDelayTimer = metricRegistry.getRaftLogEnqueueDelayTimer();
 
+    final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
   }
 
@@ -460,8 +460,14 @@ class SegmentedRaftLogWorker implements Runnable {
 
     WriteLog(LogEntryProto entry) {
       this.entry = ServerProtoUtils.removeStateMachineData(entry);
-      if (this.entry == entry || stateMachine == null) {
-        this.stateMachineFuture = null;
+      if (this.entry == entry) {
+        final StateMachineLogEntryProto proto = entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null;
+        if (stateMachine != null && proto != null && proto.getType() == StateMachineLogEntryProto.Type.DATASTREAM) {
+          this.stateMachineFuture = server.getDataStreamMap().remove(ClientInvocationId.valueOf(proto))
+              .thenApply(stream -> stateMachine.data().link(stream, entry));
+        } else {
+          this.stateMachineFuture = null;
+        }
       } else {
         try {
           // this.entry != entry iff the entry has state machine data
@@ -605,19 +611,17 @@ class SegmentedRaftLogWorker implements Runnable {
 
   private class TruncateLog extends Task {
     private final TruncationSegments segments;
-    private final long truncateIndex;
     private CompletableFuture<Void> stateMachineFuture = null;
 
     TruncateLog(TruncationSegments ts, long index) {
       this.segments = ts;
-      this.truncateIndex = index;
       if (stateMachine != null) {
         // TruncateLog and WriteLog instance is created while taking a RaftLog write lock.
         // StateMachine call is made inside the constructor so that it is lock
         // protected. This is to make sure that stateMachine can determine which
         // indexes to truncate as stateMachine calls would happen in the sequence
         // of log operations.
-        stateMachineFuture = stateMachine.data().truncate(truncateIndex);
+        stateMachineFuture = stateMachine.data().truncate(index);
       }
     }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 1a8a1fb..6dd124b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -364,7 +364,7 @@ public interface RaftTestUtil {
       this.op = Objects.requireNonNull(op);
       final ByteString bytes = ProtoUtils.toByteString(op);
       this.smLogEntryProto = ServerProtoUtils.toStateMachineLogEntryProto(
-          clientId, callId, bytes, hasStateMachineData? bytes: null);
+          clientId, callId, StateMachineLogEntryProto.Type.WRITE, bytes, hasStateMachineData? bytes: null);
     }
 
     @Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 34e8112..440b3bb 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Log4jUtils;
 import org.apache.ratis.util.TimeDuration;
@@ -127,4 +128,8 @@ public class RaftServerTestUtil {
   public static RaftServerImpl getRaftServerImpl(RaftServerProxy proxy, RaftGroupId groupId) {
     return JavaUtils.callAsUnchecked(() -> proxy.getImpl(groupId));
   }
+
+  public static DataStreamMap newDataStreamMap(Object name) {
+    return new DataStreamMapImpl(name);
+  }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 0ca5377..17f675a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -39,14 +39,17 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -172,6 +175,35 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
+  static class MyDivision implements RaftServer.Division {
+    private final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
+    private final DataStreamMap streamMap;
+
+    MyDivision(Object name) {
+      this.streamMap = RaftServerTestUtil.newDataStreamMap(name);
+    }
+
+    @Override
+    public RaftGroupMemberId getMemberId() {
+      return null;
+    }
+
+    @Override
+    public RaftGroup getGroup() {
+      return null;
+    }
+
+    @Override
+    public MultiDataStreamStateMachine getStateMachine() {
+      return stateMachine;
+    }
+
+    @Override
+    public DataStreamMap getDataStreamMap() {
+      return streamMap;
+    }
+  }
+
   static class Server {
     private final RaftPeer peer;
     private final RaftServer raftServer;
@@ -187,8 +219,8 @@ abstract class DataStreamBaseTest extends BaseTest {
       return peer;
     }
 
-    MultiDataStreamStateMachine getStateMachine(RaftGroupId groupId) throws IOException {
-      return (MultiDataStreamStateMachine)raftServer.getStateMachine(groupId);
+    MyDivision getDivision(RaftGroupId groupId) throws IOException {
+      return (MyDivision) raftServer.getDivision(groupId);
     }
 
     void start() {
@@ -215,7 +247,7 @@ abstract class DataStreamBaseTest extends BaseTest {
 
   protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
     return new RaftServer() {
-      private final ConcurrentMap<RaftGroupId, MultiDataStreamStateMachine> stateMachines = new ConcurrentHashMap<>();
+      private final ConcurrentMap<RaftGroupId, MyDivision> divisions = new ConcurrentHashMap<>();
 
       @Override
       public RaftPeerId getId() {
@@ -223,8 +255,8 @@ abstract class DataStreamBaseTest extends BaseTest {
       }
 
       @Override
-      public MultiDataStreamStateMachine getStateMachine(RaftGroupId groupId) {
-        return stateMachines.computeIfAbsent(groupId, key -> new MultiDataStreamStateMachine());
+      public MyDivision getDivision(RaftGroupId groupId) {
+        return divisions.computeIfAbsent(groupId, MyDivision::new);
       }
 
       @Override
@@ -232,7 +264,6 @@ abstract class DataStreamBaseTest extends BaseTest {
         return properties;
       }
 
-
       @Override
       public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
         return null;
@@ -270,7 +301,7 @@ abstract class DataStreamBaseTest extends BaseTest {
 
       @Override
       public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
-        final MultiDataStreamStateMachine stateMachine = getStateMachine(request.getRaftGroupId());
+        final MultiDataStreamStateMachine stateMachine = getDivision(request.getRaftGroupId()).getStateMachine();
         final SingleDataStream stream = stateMachine.getSingleDataStream(request);
         Assert.assertFalse(stream.getWritableByteChannel().isOpen());
         return CompletableFuture.completedFuture(RaftClientReply.newBuilder()
@@ -538,8 +569,9 @@ abstract class DataStreamBaseTest extends BaseTest {
     Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
 
     // check stream
-    final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
-    final SingleDataStream stream = s.getSingleDataStream(header);
+    final MyDivision d = server.getDivision(header.getRaftGroupId());
+    Assert.assertNotNull(d.getDataStreamMap().remove(ClientInvocationId.valueOf(header)));
+    final SingleDataStream stream = d.getStateMachine().getSingleDataStream(header);
     Assert.assertEquals(dataSize, stream.getByteWritten());
     Assert.assertEquals(dataSize, stream.getForcedPosition());
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index dd035f1..1655a0c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -43,6 +43,8 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 import static org.mockito.Mockito.mock;
@@ -126,9 +128,11 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
       when(raftServer.getProperties()).thenReturn(properties);
       when(raftServer.getId()).thenReturn(peerId);
       if (getStateMachineException == null) {
-        when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new MultiDataStreamStateMachine());
+        final ConcurrentMap<RaftGroupId, MyDivision> divisions = new ConcurrentHashMap<>();
+        when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenAnswer(
+            invocation -> divisions.computeIfAbsent((RaftGroupId)invocation.getArguments()[0], MyDivision::new));
       } else {
-        when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
+        when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
       }
 
       raftServers.add(raftServer);