You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/20 01:42:58 UTC
[incubator-ratis] branch master updated: RATIS-1166. Link
DataStream with LogEntryProto. (#290)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6887001 RATIS-1166. Link DataStream with LogEntryProto. (#290)
6887001 is described below
commit 6887001ef8ac750b5280e2745cfb5303d10bc949
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Nov 20 09:42:48 2020 +0800
RATIS-1166. Link DataStream with LogEntryProto. (#290)
---
.../ratis/netty/server/DataStreamManagement.java | 39 ++++++++++++---
ratis-proto/src/main/proto/Raft.proto | 6 +++
.../org/apache/ratis/server/DataStreamMap.java | 34 +++++++++++++
.../java/org/apache/ratis/server/RaftServer.java | 15 +++++-
.../ratis/server/impl/DataStreamMapImpl.java | 57 ++++++++++++++++++++++
.../apache/ratis/server/impl/RaftServerImpl.java | 18 ++++++-
.../apache/ratis/server/impl/RaftServerProxy.java | 4 +-
.../apache/ratis/server/impl/ServerProtoUtils.java | 17 +++++--
.../raftlog/segmented/SegmentedRaftLogWorker.java | 20 +++++---
.../test/java/org/apache/ratis/RaftTestUtil.java | 2 +-
.../ratis/server/impl/RaftServerTestUtil.java | 5 ++
.../ratis/datastream/DataStreamBaseTest.java | 50 +++++++++++++++----
.../ratis/datastream/TestDataStreamNetty.java | 8 ++-
13 files changed, 239 insertions(+), 36 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index f88a52a..c2a9e20 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -26,14 +26,16 @@ import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServer.Division;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
@@ -41,6 +43,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -214,14 +218,26 @@ public class DataStreamManagement {
RaftServerConfigKeys.DataStream.asyncThreadPoolSize(properties));
}
+ private CompletableFuture<DataStream> computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
+ final Division division = server.getDivision(request.getRaftGroupId());
+ final ClientInvocationId invocationId = ClientInvocationId.valueOf(request);
+ final MemoizedSupplier<CompletableFuture<DataStream>> supplier = JavaUtils.memoize(
+ () -> division.getStateMachine().data().stream(request));
+ final CompletableFuture<DataStream> f = division.getDataStreamMap()
+ .computeIfAbsent(invocationId, key -> supplier.get());
+ if (!supplier.isInitialized()) {
+ throw new AlreadyExistsException("A DataStream already exists for " + invocationId);
+ }
+ return f;
+ }
+
private StreamInfo newStreamInfo(ByteBuf buf,
CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
try {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
- final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
final boolean isPrimary = server.getId().equals(request.getServerId());
- return new StreamInfo(request, isPrimary, stateMachine.data().stream(request),
+ return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request),
isPrimary? getDataStreamOutput.apply(request): Collections.emptyList());
} catch (Throwable e) {
throw new CompletionException(e);
@@ -400,13 +416,20 @@ public class DataStreamManagement {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
- final StreamInfo info = request.getType() != Type.STREAM_HEADER? streams.get(key)
- : streams.computeIfAbsent(key, id -> newStreamInfo(buf, getDataStreamOutput));
-
- if (info == null) {
- throw new IllegalStateException("Failed to get StreamInfo for " + request);
+ final StreamInfo info;
+ if (request.getType() == Type.STREAM_HEADER) {
+ final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getDataStreamOutput));
+ info = streams.computeIfAbsent(key, id -> supplier.get());
+ if (!supplier.isInitialized()) {
+ throw new IllegalStateException("Failed to create a new stream for " + request
+ + " since a stream already exists: " + info);
+ }
+ } else {
+ info = Optional.ofNullable(streams.get(key)).orElseThrow(
+ () -> new IllegalStateException("Failed to get StreamInfo for " + request));
}
+
if (request.getType() == Type.START_TRANSACTION) {
// for peers to start transaction
composeAsync(info.getPrevious(), executor, v -> startTransaction(info, request, ctx))
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 1749118..7a90fb5 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -70,6 +70,12 @@ message StateMachineLogEntryProto {
*/
StateMachineEntryProto stateMachineEntry = 2;
+ enum Type {
+ WRITE = 0;
+ DATASTREAM = 1;
+ }
+
+ Type type = 13;
// clientId and callId are used to rebuild the retry cache.
bytes clientId = 14;
uint64 callId = 15;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamMap.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamMap.java
new file mode 100644
index 0000000..87905df
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamMap.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.statemachine.StateMachine.DataStream;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/** A {@link ClientInvocationId}-to-{@link DataStream} map. */
+public interface DataStreamMap {
+ /** Similar to {@link java.util.Map#computeIfAbsent(Object, Function)}. */
+ CompletableFuture<DataStream> computeIfAbsent(ClientInvocationId invocationId,
+ Function<ClientInvocationId, CompletableFuture<DataStream>> newDataStream);
+
+ /** Similar to {@link java.util.Map#remove(java.lang.Object). */
+ CompletableFuture<DataStream> remove(ClientInvocationId invocationId);
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 30ca21a..e8f42c1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -37,6 +37,19 @@ public interface RaftServer extends Closeable, RpcType.Get,
RaftServerProtocol, RaftServerAsynchronousProtocol,
RaftClientProtocol, RaftClientAsynchronousProtocol,
AdminProtocol, AdminAsynchronousProtocol {
+ /** A division of a {@link RaftServer} for a particular group. */
+ interface Division {
+ /** @return the {@link RaftGroupMemberId} for this division. */
+ RaftGroupMemberId getMemberId();
+
+ /** @return the {@link RaftGroup} for this division. */
+ RaftGroup getGroup();
+
+ /** @return the {@link StateMachine} for this division. */
+ StateMachine getStateMachine();
+
+ DataStreamMap getDataStreamMap();
+ }
/** @return the server ID. */
RaftPeerId getId();
@@ -47,7 +60,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the groups the server is part of. */
Iterable<RaftGroup> getGroups() throws IOException;
- StateMachine getStateMachine(RaftGroupId groupId) throws IOException;
+ Division getDivision(RaftGroupId groupId) throws IOException;
/** @return the server properties. */
RaftProperties getProperties();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamMapImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamMapImpl.java
new file mode 100644
index 0000000..a57a6f5
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamMapImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+class DataStreamMapImpl implements DataStreamMap {
+ public static final Logger LOG = LoggerFactory.getLogger(DataStreamMapImpl.class);
+
+ private final String name;
+ private final ConcurrentMap<ClientInvocationId, CompletableFuture<DataStream>> map = new ConcurrentHashMap<>();
+
+ DataStreamMapImpl(Object name) {
+ this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
+ }
+
+ @Override
+ public CompletableFuture<DataStream> remove(ClientInvocationId invocationId) {
+ return map.remove(invocationId);
+ }
+
+ @Override
+ public CompletableFuture<DataStream> computeIfAbsent(ClientInvocationId invocationId,
+ Function<ClientInvocationId, CompletableFuture<DataStream>> newDataStream) {
+ return map.computeIfAbsent(invocationId, newDataStream);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 9318a39..efe0aa8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -30,6 +30,8 @@ import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
import org.apache.ratis.protocol.exceptions.StaleReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
@@ -76,7 +78,8 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
import com.codahale.metrics.Timer;
-public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronousProtocol,
+public class RaftServerImpl implements RaftServer.Division,
+ RaftServerProtocol, RaftServerAsynchronousProtocol,
RaftClientProtocol, RaftClientAsynchronousProtocol {
public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class);
@@ -100,6 +103,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
RaftPeer.newBuilder().setId(getId()).setAddress(getServerRpc().getInetSocketAddress()).build());
private final RoleInfo role;
+ private final DataStreamMap dataStreamMap;
+
private final RetryCache retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
@@ -137,6 +142,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = initRetryCache(properties);
this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
+ this.dataStreamMap = new DataStreamMapImpl(id);
this.jmxAdapter = new RaftServerJmxAdapter();
this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(this);
@@ -183,10 +189,16 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return sleepDeviationThresholdMs;
}
+ @Override
public StateMachine getStateMachine() {
return stateMachine;
}
+ @Override
+ public DataStreamMap getDataStreamMap() {
+ return dataStreamMap;
+ }
+
@VisibleForTesting
public RetryCache getRetryCache() {
return retryCache;
@@ -257,6 +269,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return state;
}
+ @Override
public RaftGroupMemberId getMemberId() {
return getState().getMemberId();
}
@@ -273,7 +286,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return getState().getRaftConf();
}
- RaftGroup getGroup() {
+ @Override
+ public RaftGroup getGroup() {
return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getPeers());
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index d4b611c..ed0c2ca 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -304,8 +304,8 @@ public class RaftServerProxy implements RaftServer {
}
@Override
- public StateMachine getStateMachine(RaftGroupId groupId) throws IOException {
- return getImpl(groupId).getStateMachine();
+ public Division getDivision(RaftGroupId groupId) throws IOException {
+ return getImpl(groupId);
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index a3c29c2..7bb58a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -235,14 +235,25 @@ public interface ServerProtoUtils {
if (logData == null) {
logData = request.getMessage().getContent();
}
- return toStateMachineLogEntryProto(request.getClientId(), request.getCallId(), logData, stateMachineData);
+ return toStateMachineLogEntryProto(request.getClientId(), request.getCallId(),
+ toStateMachineLogEntryProtoType(request.getType().getTypeCase()), logData, stateMachineData);
}
- static StateMachineLogEntryProto toStateMachineLogEntryProto(
- ClientId clientId, long callId, ByteString logData, ByteString stateMachineData) {
+ static StateMachineLogEntryProto.Type toStateMachineLogEntryProtoType(RaftClientRequestProto.TypeCase typeCase) {
+ switch (typeCase) {
+ case WRITE: return StateMachineLogEntryProto.Type.WRITE;
+ case DATASTREAM: return StateMachineLogEntryProto.Type.DATASTREAM;
+ default:
+ throw new IllegalStateException("Unexpected type case " + typeCase);
+ }
+ }
+
+ static StateMachineLogEntryProto toStateMachineLogEntryProto(ClientId clientId, long callId,
+ StateMachineLogEntryProto.Type type, ByteString logData, ByteString stateMachineData) {
final StateMachineLogEntryProto.Builder b = StateMachineLogEntryProto.newBuilder()
.setClientId(clientId.toByteString())
.setCallId(callId)
+ .setType(type)
.setLogData(logData);
if (stateMachineData != null) {
b.setStateMachineEntry(toStateMachineEntryProtoBuilder(stateMachineData));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 01f98a5..6069469 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -20,6 +20,8 @@ package org.apache.ratis.server.raftlog.segmented;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -158,7 +160,6 @@ class SegmentedRaftLogWorker implements Runnable {
private long lastWrittenIndex;
/** the largest index of the entry that has been flushed */
private final RaftLogIndex flushIndex = new RaftLogIndex("flushIndex", 0);
- /** the largest index of the entry in a closed segment */
/** the index up to which cache can be evicted - max of snapshotIndex and
* largest index in a closed segment */
private final RaftLogIndex safeCacheEvictIndex = new RaftLogIndex("safeCacheEvictIndex", 0);
@@ -167,7 +168,6 @@ class SegmentedRaftLogWorker implements Runnable {
private final long segmentMaxSize;
private final long preallocatedSize;
- private final int bufferSize;
private final RaftServerImpl server;
private int flushBatchSize;
@@ -191,7 +191,6 @@ class SegmentedRaftLogWorker implements Runnable {
this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
- this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
this.flushBatchSize = 0;
@@ -208,6 +207,7 @@ class SegmentedRaftLogWorker implements Runnable {
this.raftLogQueueingTimer = metricRegistry.getRaftLogQueueTimer();
this.raftLogEnqueueingDelayTimer = metricRegistry.getRaftLogEnqueueDelayTimer();
+ final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
}
@@ -460,8 +460,14 @@ class SegmentedRaftLogWorker implements Runnable {
WriteLog(LogEntryProto entry) {
this.entry = ServerProtoUtils.removeStateMachineData(entry);
- if (this.entry == entry || stateMachine == null) {
- this.stateMachineFuture = null;
+ if (this.entry == entry) {
+ final StateMachineLogEntryProto proto = entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null;
+ if (stateMachine != null && proto != null && proto.getType() == StateMachineLogEntryProto.Type.DATASTREAM) {
+ this.stateMachineFuture = server.getDataStreamMap().remove(ClientInvocationId.valueOf(proto))
+ .thenApply(stream -> stateMachine.data().link(stream, entry));
+ } else {
+ this.stateMachineFuture = null;
+ }
} else {
try {
// this.entry != entry iff the entry has state machine data
@@ -605,19 +611,17 @@ class SegmentedRaftLogWorker implements Runnable {
private class TruncateLog extends Task {
private final TruncationSegments segments;
- private final long truncateIndex;
private CompletableFuture<Void> stateMachineFuture = null;
TruncateLog(TruncationSegments ts, long index) {
this.segments = ts;
- this.truncateIndex = index;
if (stateMachine != null) {
// TruncateLog and WriteLog instance is created while taking a RaftLog write lock.
// StateMachine call is made inside the constructor so that it is lock
// protected. This is to make sure that stateMachine can determine which
// indexes to truncate as stateMachine calls would happen in the sequence
// of log operations.
- stateMachineFuture = stateMachine.data().truncate(truncateIndex);
+ stateMachineFuture = stateMachine.data().truncate(index);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 1a8a1fb..6dd124b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -364,7 +364,7 @@ public interface RaftTestUtil {
this.op = Objects.requireNonNull(op);
final ByteString bytes = ProtoUtils.toByteString(op);
this.smLogEntryProto = ServerProtoUtils.toStateMachineLogEntryProto(
- clientId, callId, bytes, hasStateMachineData? bytes: null);
+ clientId, callId, StateMachineLogEntryProto.Type.WRITE, bytes, hasStateMachineData? bytes: null);
}
@Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 34e8112..440b3bb 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.TimeDuration;
@@ -127,4 +128,8 @@ public class RaftServerTestUtil {
public static RaftServerImpl getRaftServerImpl(RaftServerProxy proxy, RaftGroupId groupId) {
return JavaUtils.callAsUnchecked(() -> proxy.getImpl(groupId));
}
+
+ public static DataStreamMap newDataStreamMap(Object name) {
+ return new DataStreamMapImpl(name);
+ }
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 0ca5377..17f675a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -39,14 +39,17 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -172,6 +175,35 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
+ static class MyDivision implements RaftServer.Division {
+ private final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
+ private final DataStreamMap streamMap;
+
+ MyDivision(Object name) {
+ this.streamMap = RaftServerTestUtil.newDataStreamMap(name);
+ }
+
+ @Override
+ public RaftGroupMemberId getMemberId() {
+ return null;
+ }
+
+ @Override
+ public RaftGroup getGroup() {
+ return null;
+ }
+
+ @Override
+ public MultiDataStreamStateMachine getStateMachine() {
+ return stateMachine;
+ }
+
+ @Override
+ public DataStreamMap getDataStreamMap() {
+ return streamMap;
+ }
+ }
+
static class Server {
private final RaftPeer peer;
private final RaftServer raftServer;
@@ -187,8 +219,8 @@ abstract class DataStreamBaseTest extends BaseTest {
return peer;
}
- MultiDataStreamStateMachine getStateMachine(RaftGroupId groupId) throws IOException {
- return (MultiDataStreamStateMachine)raftServer.getStateMachine(groupId);
+ MyDivision getDivision(RaftGroupId groupId) throws IOException {
+ return (MyDivision) raftServer.getDivision(groupId);
}
void start() {
@@ -215,7 +247,7 @@ abstract class DataStreamBaseTest extends BaseTest {
protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
return new RaftServer() {
- private final ConcurrentMap<RaftGroupId, MultiDataStreamStateMachine> stateMachines = new ConcurrentHashMap<>();
+ private final ConcurrentMap<RaftGroupId, MyDivision> divisions = new ConcurrentHashMap<>();
@Override
public RaftPeerId getId() {
@@ -223,8 +255,8 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
- public MultiDataStreamStateMachine getStateMachine(RaftGroupId groupId) {
- return stateMachines.computeIfAbsent(groupId, key -> new MultiDataStreamStateMachine());
+ public MyDivision getDivision(RaftGroupId groupId) {
+ return divisions.computeIfAbsent(groupId, MyDivision::new);
}
@Override
@@ -232,7 +264,6 @@ abstract class DataStreamBaseTest extends BaseTest {
return properties;
}
-
@Override
public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
return null;
@@ -270,7 +301,7 @@ abstract class DataStreamBaseTest extends BaseTest {
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
- final MultiDataStreamStateMachine stateMachine = getStateMachine(request.getRaftGroupId());
+ final MultiDataStreamStateMachine stateMachine = getDivision(request.getRaftGroupId()).getStateMachine();
final SingleDataStream stream = stateMachine.getSingleDataStream(request);
Assert.assertFalse(stream.getWritableByteChannel().isOpen());
return CompletableFuture.completedFuture(RaftClientReply.newBuilder()
@@ -538,8 +569,9 @@ abstract class DataStreamBaseTest extends BaseTest {
Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
// check stream
- final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
- final SingleDataStream stream = s.getSingleDataStream(header);
+ final MyDivision d = server.getDivision(header.getRaftGroupId());
+ Assert.assertNotNull(d.getDataStreamMap().remove(ClientInvocationId.valueOf(header)));
+ final SingleDataStream stream = d.getStateMachine().getSingleDataStream(header);
Assert.assertEquals(dataSize, stream.getByteWritten());
Assert.assertEquals(dataSize, stream.getForcedPosition());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index dd035f1..1655a0c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -43,6 +43,8 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import static org.mockito.Mockito.mock;
@@ -126,9 +128,11 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
when(raftServer.getProperties()).thenReturn(properties);
when(raftServer.getId()).thenReturn(peerId);
if (getStateMachineException == null) {
- when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new MultiDataStreamStateMachine());
+ final ConcurrentMap<RaftGroupId, MyDivision> divisions = new ConcurrentHashMap<>();
+ when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenAnswer(
+ invocation -> divisions.computeIfAbsent((RaftGroupId)invocation.getArguments()[0], MyDivision::new));
} else {
- when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
+ when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
}
raftServers.add(raftServer);