You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/08/07 22:26:10 UTC
incubator-ratis git commit: RATIS-101. Use ByteString instead of
byte[] in RaftId, ClientId, RaftGroupId and RaftPeerId.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 1e7a06ef1 -> e2bdc2478
RATIS-101. Use ByteString instead of byte[] in RaftId, ClientId, RaftGroupId and RaftPeerId.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e2bdc247
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e2bdc247
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e2bdc247
Branch: refs/heads/master
Commit: e2bdc2478f45832a12582a0f94f32735047f151c
Parents: 1e7a06e
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Aug 7 14:08:16 2017 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Aug 7 14:09:06 2017 -0700
----------------------------------------------------------------------
.../ratis/client/impl/ClientProtoUtils.java | 101 ++++++++++---------
.../org/apache/ratis/protocol/ClientId.java | 8 +-
.../org/apache/ratis/protocol/RaftGroupId.java | 8 +-
.../java/org/apache/ratis/protocol/RaftId.java | 43 ++++----
.../org/apache/ratis/protocol/RaftPeerId.java | 23 ++---
.../java/org/apache/ratis/util/ProtoUtils.java | 35 ++++---
.../ratis/grpc/client/AppendStreamer.java | 17 ++--
.../ratis/server/impl/RaftServerImpl.java | 2 +-
.../ratis/server/impl/ServerProtoUtils.java | 67 ++++++------
.../java/org/apache/ratis/RaftTestUtil.java | 18 ++--
.../org/apache/ratis/protocol/TestRaftId.java | 48 +++++++++
.../server/simulation/RaftServerReply.java | 7 +-
.../server/simulation/RaftServerRequest.java | 7 +-
13 files changed, 218 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 146622b..2968884 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -17,9 +17,9 @@
*/
package org.apache.ratis.client.impl;
+import org.apache.ratis.protocol.*;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.protocol.*;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReflectionUtils;
@@ -29,54 +29,68 @@ import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.Exce
import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
public class ClientProtoUtils {
+
public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
- byte[] requestorId, byte[] replyId, byte[] groupId, long callId, boolean success) {
+ ByteString requestorId, ByteString replyId, RaftGroupId groupId,
+ long callId, boolean success) {
return RaftRpcReplyProto.newBuilder()
- .setRequestorId(ProtoUtils.toByteString(requestorId))
- .setReplyId(ProtoUtils.toByteString(replyId))
- .setRaftGroupId(RaftGroupIdProto.newBuilder().setId(ProtoUtils.toByteString(groupId)))
+ .setRequestorId(requestorId)
+ .setReplyId(replyId)
+ .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
.setCallId(callId)
.setSuccess(success);
}
public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- byte[] requesterId, byte[] replyId, byte[] raftGroupId, long callId) {
+ ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId) {
return RaftRpcRequestProto.newBuilder()
- .setRequestorId(ProtoUtils.toByteString(requesterId))
- .setReplyId(ProtoUtils.toByteString(replyId))
- .setRaftGroupId(RaftGroupIdProto.newBuilder().setId(ProtoUtils.toByteString(raftGroupId)))
+ .setRequestorId(requesterId)
+ .setReplyId(replyId)
+ .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
.setCallId(callId);
}
+ public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
+ ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId) {
+ return toRaftRpcRequestProtoBuilder(
+ requesterId.toByteString(), replyId.toByteString(), groupId, callId);
+ }
+
+ private static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
+ RaftClientRequest request) {
+ return toRaftRpcRequestProtoBuilder(
+ request.getClientId(),
+ request.getServerId(),
+ request.getRaftGroupId(),
+ request.getCallId());
+ }
+
public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
- ClientId clientId = new ClientId(
- p.getRpcRequest().getRequestorId().toByteArray());
- RaftGroupId groupId =
- new RaftGroupId(p.getRpcRequest().getRaftGroupId().getId().toByteArray());
- RaftPeerId serverId = RaftPeerId.valueOf(p.getRpcRequest().getReplyId());
- return new RaftClientRequest(clientId, serverId, groupId,
- p.getRpcRequest().getCallId(),
+ final RaftRpcRequestProto request = p.getRpcRequest();
+ return new RaftClientRequest(
+ new ClientId(request.getRequestorId()),
+ RaftPeerId.valueOf(request.getReplyId()),
+ ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
+ request.getCallId(),
toMessage(p.getMessage()), p.getReadOnly());
}
public static RaftClientRequestProto toRaftClientRequestProto(
RaftClientRequest request) {
return RaftClientRequestProto.newBuilder()
- .setRpcRequest(toRaftRpcRequestProtoBuilder(request.getClientId().toBytes(),
- request.getServerId().toBytes(), request.getRaftGroupId().toBytes(),
- request.getCallId()))
- .setMessage(toClientMessageEntryProto(request.getMessage()))
+ .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
+ .setMessage(toClientMessageEntryProtoBuilder(request.getMessage()))
.setReadOnly(request.isReadOnly())
.build();
}
- public static RaftClientRequestProto genRaftClientRequestProto(
+ public static RaftClientRequestProto toRaftClientRequestProto(
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
ByteString content, boolean readOnly) {
return RaftClientRequestProto.newBuilder()
- .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(),
- serverId.toBytes(), groupId.toBytes(), callId))
- .setMessage(ClientMessageEntryProto.newBuilder().setContent(content))
+ .setRpcRequest(toRaftRpcRequestProtoBuilder(
+ clientId, serverId, groupId, callId))
+ .setMessage(toClientMessageEntryProtoBuilder(content))
.setReadOnly(readOnly)
.build();
}
@@ -85,11 +99,11 @@ public class ClientProtoUtils {
RaftClientReply reply) {
final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder();
if (reply != null) {
- b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toBytes(),
- reply.getServerId().toBytes(), reply.getRaftGroupId().toBytes(),
+ b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toByteString(),
+ reply.getServerId().toByteString(), reply.getRaftGroupId(),
reply.getCallId(), reply.isSuccess()));
if (reply.getMessage() != null) {
- b.setMessage(toClientMessageEntryProto(reply.getMessage()));
+ b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage()));
}
if (reply.isNotLeader()) {
NotLeaderException nle = reply.getNotLeaderException();
@@ -134,8 +148,8 @@ public class ClientProtoUtils {
smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
smeProto.getStacktrace());
}
- ClientId clientId = new ClientId(rp.getRequestorId().toByteArray());
- RaftGroupId groupId = new RaftGroupId(rp.getRaftGroupId().getId().toByteArray());
+ ClientId clientId = new ClientId(rp.getRequestorId());
+ final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
groupId, rp.getCallId(), rp.getSuccess(),
toMessage(replyProto.getMessage()), e);
@@ -167,9 +181,12 @@ public class ClientProtoUtils {
return p::getContent;
}
- private static ClientMessageEntryProto toClientMessageEntryProto(Message message) {
- return ClientMessageEntryProto.newBuilder()
- .setContent(message.getContent()).build();
+ private static ClientMessageEntryProto.Builder toClientMessageEntryProtoBuilder(ByteString message) {
+ return ClientMessageEntryProto.newBuilder().setContent(message);
+ }
+
+ private static ClientMessageEntryProto.Builder toClientMessageEntryProtoBuilder(Message message) {
+ return toClientMessageEntryProtoBuilder(message.getContent());
}
public static SetConfigurationRequest toSetConfigurationRequest(
@@ -177,20 +194,16 @@ public class ClientProtoUtils {
final RaftRpcRequestProto m = p.getRpcRequest();
final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList());
return new SetConfigurationRequest(
- new ClientId(m.getRequestorId().toByteArray()),
+ new ClientId(m.getRequestorId()),
RaftPeerId.valueOf(m.getReplyId()),
- new RaftGroupId(m.getRaftGroupId().getId().toByteArray()),
+ ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
p.getRpcRequest().getCallId(), peers);
}
public static SetConfigurationRequestProto toSetConfigurationRequestProto(
SetConfigurationRequest request) {
return SetConfigurationRequestProto.newBuilder()
- .setRpcRequest(toRaftRpcRequestProtoBuilder(
- request.getClientId().toBytes(),
- request.getServerId().toBytes(),
- request.getRaftGroupId().toBytes(),
- request.getCallId()))
+ .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
.addAllPeers(ProtoUtils.toRaftPeerProtos(
Arrays.asList(request.getPeersInNewConf())))
.build();
@@ -201,20 +214,16 @@ public class ClientProtoUtils {
final RaftRpcRequestProto m = p.getRpcRequest();
final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList());
return new ReinitializeRequest(
- new ClientId(m.getRequestorId().toByteArray()),
+ new ClientId(m.getRequestorId()),
RaftPeerId.valueOf(m.getReplyId()),
- new RaftGroupId(m.getRaftGroupId().getId().toByteArray()),
+ ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
p.getRpcRequest().getCallId(), peers);
}
public static ReinitializeRequestProto toReinitializeRequestProto(
ReinitializeRequest request) {
return ReinitializeRequestProto.newBuilder()
- .setRpcRequest(toRaftRpcRequestProtoBuilder(
- request.getClientId().toBytes(),
- request.getServerId().toBytes(),
- request.getRaftGroupId().toBytes(),
- request.getCallId()))
+ .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
.addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInGroup().getPeers()))
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
index a058a21..9b42076 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+
import java.util.UUID;
/**
@@ -29,7 +31,7 @@ public class ClientId extends RaftId {
return new ClientId(uuid);
}
- public ClientId(byte[] data) {
+ public ClientId(ByteString data) {
super(data);
}
@@ -38,7 +40,7 @@ public class ClientId extends RaftId {
}
@Override
- String createUuidString() {
- return "client-" + super.createUuidString();
+ String createUuidString(UUID uuid) {
+ return "client-" + super.createUuidString(uuid);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
index f75ac8f..a7ea70d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+
import java.util.UUID;
public class RaftGroupId extends RaftId {
@@ -30,12 +32,12 @@ public class RaftGroupId extends RaftId {
super(id);
}
- public RaftGroupId(byte[] data) {
+ public RaftGroupId(ByteString data) {
super(data);
}
@Override
- String createUuidString() {
- return "group-" + super.createUuidString();
+ String createUuidString(UUID uuid) {
+ return "group-" + super.createUuidString(uuid);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
index 5556c17..ebf9f75 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -28,46 +29,50 @@ import java.util.function.Supplier;
public abstract class RaftId {
public static final int BYTE_LENGTH = 16;
- static UUID toUuid(byte[] data) {
- Objects.requireNonNull(data, "data == null");
- Preconditions.assertTrue(data.length == BYTE_LENGTH,
- "data.length = %s != BYTE_LENGTH = %s", data.length, BYTE_LENGTH);
- ByteBuffer buffer = ByteBuffer.wrap(data);
- return new UUID(buffer.getLong(), buffer.getLong());
+ private static void checkLength(int length, String name) { // name labels the checked value in the failure message
+ Preconditions.assertTrue(length == BYTE_LENGTH,
+ "%s = %s != BYTE_LENGTH = %s", name, length, BYTE_LENGTH);
+ }
- static byte[] toBytes(UUID uuid) {
+ private static UUID toUuid(ByteString bytes) {
+ Objects.requireNonNull(bytes, "bytes == null");
+ checkLength(bytes.size(), "bytes.size()");
+ final ByteBuffer buf = bytes.asReadOnlyByteBuffer();
+ return new UUID(buf.getLong(), buf.getLong());
+ }
+
+ private static ByteString toByteString(UUID uuid) {
Objects.requireNonNull(uuid, "uuid == null");
- ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
+ final ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
- return buf.array();
+ return ByteString.copyFrom(buf.array());
}
private final UUID uuid;
- private final byte[] uuidBytes;
+ private final Supplier<ByteString> uuidBytes;
private final Supplier<String> uuidString;
- private RaftId(UUID uuid, byte[] bytes) {
+ private RaftId(UUID uuid, Supplier<ByteString> uuidBytes) {
this.uuid = uuid;
- this.uuidBytes = bytes;
- this.uuidString = JavaUtils.memoize(this::createUuidString);
+ this.uuidBytes = uuidBytes;
+ this.uuidString = JavaUtils.memoize(() -> createUuidString(uuid));
}
RaftId(UUID uuid) {
- this(uuid, toBytes(uuid));
+ this(uuid, JavaUtils.memoize(() -> toByteString(uuid)));
}
- public RaftId(byte[] uuidBytes) {
- this(toUuid(uuidBytes), uuidBytes);
+ public RaftId(ByteString uuidBytes) {
+ this(toUuid(uuidBytes), () -> uuidBytes);
}
- String createUuidString() {
+ String createUuidString(UUID uuid) {
return uuid.toString().toUpperCase();
}
- public byte[] toBytes() {
- return uuidBytes;
+ public ByteString toByteString() {
+ return uuidBytes.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
index 4e1a5d8..06ad836 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
@@ -21,7 +21,6 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,8 +33,7 @@ public class RaftPeerId {
private static final Map<String, RaftPeerId> stringMap = new ConcurrentHashMap<>();
public static RaftPeerId valueOf(ByteString id) {
- return byteStringMap.computeIfAbsent(id,
- key -> new RaftPeerId(key.toByteArray()));
+ return byteStringMap.computeIfAbsent(id, RaftPeerId::new);
}
public static RaftPeerId valueOf(String id) {
@@ -49,24 +47,24 @@ public class RaftPeerId {
/** UTF-8 string as id */
private final String idString;
/** The corresponding bytes of {@link #idString}. */
- private final byte[] id;
+ private final ByteString id;
private RaftPeerId(String id) {
this.idString = Objects.requireNonNull(id, "id == null");
Preconditions.assertTrue(!id.isEmpty(), "id is an empty string.");
- this.id = id.getBytes(StandardCharsets.UTF_8);
+ this.id = ByteString.copyFrom(idString, StandardCharsets.UTF_8);
}
- private RaftPeerId(byte[] id) {
+ private RaftPeerId(ByteString id) {
this.id = Objects.requireNonNull(id, "id == null");
- Preconditions.assertTrue(id.length > 0, "id is an empty array.");
- this.idString = new String(id, StandardCharsets.UTF_8);
+ Preconditions.assertTrue(id.size() > 0, "id is empty.");
+ this.idString = id.toString(StandardCharsets.UTF_8);
}
/**
- * @return id in byte[].
+ * @return id in {@link ByteString}.
*/
- public byte[] toBytes() {
+ public ByteString toByteString() {
return id;
}
@@ -78,12 +76,11 @@ public class RaftPeerId {
@Override
public boolean equals(Object other) {
return other == this ||
- (other instanceof RaftPeerId &&
- Arrays.equals(id, ((RaftPeerId) other).id));
+ (other instanceof RaftPeerId && idString.equals(((RaftPeerId)other).idString));
}
@Override
public int hashCode() {
- return Arrays.hashCode(id);
+ return idString.hashCode();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 7d73251..9a2d530 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -17,26 +17,20 @@
*/
package org.apache.ratis.util;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftPeerProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
public class ProtoUtils {
public static ByteString toByteString(Object obj) {
@@ -73,7 +67,7 @@ public class ProtoUtils {
public static RaftPeerProto toRaftPeerProto(RaftPeer peer) {
RaftPeerProto.Builder builder = RaftPeerProto.newBuilder()
- .setId(toByteString(peer.getId().toBytes()));
+ .setId(peer.getId().toByteString());
if (peer.getAddress() != null) {
builder.setAddress(peer.getAddress());
}
@@ -109,6 +103,15 @@ public class ProtoUtils {
};
}
+ public static RaftGroupId toRaftGroupId(RaftGroupIdProto proto) {
+ return new RaftGroupId(proto.getId());
+ }
+
+ public static RaftGroupIdProto.Builder toRaftGroupIdProtoBuilder(RaftGroupId id) {
+ return RaftGroupIdProto.newBuilder().setId(id.toByteString());
+ }
+
+
public static boolean isConfigurationLogEntry(LogEntryProto entry) {
return entry.getLogEntryBodyCase() ==
LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
@@ -119,7 +122,7 @@ public class ProtoUtils {
ClientId clientId, long callId) {
return LogEntryProto.newBuilder().setTerm(term).setIndex(index)
.setSmLogEntry(operation)
- .setClientId(toByteString(clientId.toBytes())).setCallId(callId)
+ .setClientId(clientId.toByteString()).setCallId(callId)
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index e7d2cd0..36a588e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -17,10 +17,8 @@
*/
package org.apache.ratis.grpc.client;
+import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.*;
@@ -28,8 +26,7 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.util.Daemon;
-import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,8 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.ratis.client.impl.ClientProtoUtils.*;
-
public class AppendStreamer implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class);
@@ -158,7 +153,7 @@ public class AppendStreamer implements Closeable {
}
if (isRunning()) {
// wrap the current buffer into a RaftClientRequestProto
- final RaftClientRequestProto request = genRaftClientRequestProto(
+ final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto(
clientId, leaderId, groupId, seqNum, content, false);
dataQueue.offer(request);
this.notifyAll();
@@ -274,7 +269,7 @@ public class AppendStreamer implements Closeable {
}
} else {
// this may be a NotLeaderException
- RaftClientReply r = toRaftClientReply(reply);
+ RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply);
if (r.isNotLeader()) {
LOG.debug("{} received a NotLeaderException from {}", this,
r.getServerId());
@@ -365,8 +360,8 @@ public class AppendStreamer implements Closeable {
RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder()
.setMessage(oldRequest.getMessage())
.setReadOnly(oldRequest.getReadOnly())
- .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(),
- newLeader.toBytes(), groupId.toBytes(), r.getCallId()))
+ .setRpcRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(
+ clientId, newLeader, groupId, r.getCallId()))
.build();
dataQueue.offerFirst(newRequest);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 1a73cba..9a5e8bc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -861,7 +861,7 @@ public class RaftServerImpl implements RaftServerProtocol,
private void replyPendingRequest(LogEntryProto logEntry,
CompletableFuture<Message> stateMachineFuture) {
// update the retry cache
- final ClientId clientId = new ClientId(logEntry.getClientId().toByteArray());
+ final ClientId clientId = new ClientId(logEntry.getClientId());
final long callId = logEntry.getCallId();
final RaftPeerId serverId = getId();
final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 5b11599..845a6ca 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -18,7 +18,6 @@
package org.apache.ratis.server.impl;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
import java.util.Arrays;
import java.util.List;
@@ -28,19 +27,8 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.shaded.proto.RaftProtos;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftConfigurationProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.TermIndexProto;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.*;
import org.apache.ratis.util.ProtoUtils;
@@ -108,23 +96,32 @@ public class ServerProtoUtils {
.build();
}
+ static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
+ RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, boolean success) {
+ return ClientProtoUtils.toRaftRpcReplyProtoBuilder(
+ requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID, success);
+ }
+
public static RequestVoteReplyProto toRequestVoteReplyProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
boolean success, long term, boolean shouldShutdown) {
- final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder();
- b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder(
- requestorId.toBytes(), replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, success))
+ return RequestVoteReplyProto.newBuilder()
+ .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, groupId, success))
.setTerm(term)
- .setShouldShutdown(shouldShutdown);
- return b.build();
+ .setShouldShutdown(shouldShutdown)
+ .build();
+ }
+
+ static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
+ RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId) {
+ return ClientProtoUtils.toRaftRpcRequestProtoBuilder(
+ requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID);
}
public static RequestVoteRequestProto toRequestVoteRequestProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term, TermIndex lastEntry) {
- RaftProtos.RaftRpcRequestProto.Builder rpb = ClientProtoUtils
- .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID);
final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder()
- .setServerRequest(rpb)
+ .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId))
.setCandidateTerm(term);
if (lastEntry != null) {
b.setCandidateLastEntry(toTermIndexProto(lastEntry));
@@ -135,8 +132,8 @@ public class ServerProtoUtils {
public static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
long term, int requestIndex, InstallSnapshotResult result) {
- final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(),
- replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, result == InstallSnapshotResult.SUCCESS);
+ final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId,
+ replyId, groupId, result == InstallSnapshotResult.SUCCESS);
final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
.newBuilder().setServerReply(rb).setTerm(term).setResult(result)
.setRequestIndex(requestIndex);
@@ -148,9 +145,7 @@ public class ServerProtoUtils {
long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
long totalSize, boolean done) {
return InstallSnapshotRequestProto.newBuilder()
- .setServerRequest(
- ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(),
- replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID))
+ .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId))
.setRequestId(requestId)
.setRequestIndex(requestIndex)
// .setRaftConfiguration() TODO: save and pass RaftConfiguration
@@ -163,13 +158,13 @@ public class ServerProtoUtils {
public static AppendEntriesReplyProto toAppendEntriesReplyProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term,
- long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) {
- RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(),
- replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID, appendResult == SUCCESS);
- final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder();
- b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex)
- .setResult(appendResult);
- return b.build();
+ long nextIndex, AppendResult result) {
+ return AppendEntriesReplyProto.newBuilder()
+ .setServerReply(toRaftRpcReplyProtoBuilder(
+ requestorId, replyId, groupId, result == AppendResult.SUCCESS))
+ .setTerm(term)
+ .setNextIndex(nextIndex)
+ .setResult(result).build();
}
public static AppendEntriesRequestProto toAppendEntriesRequestProto(
@@ -178,9 +173,7 @@ public class ServerProtoUtils {
TermIndex previous) {
final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
.newBuilder()
- .setServerRequest(
- ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(),
- replyId.toBytes(), groupId.toBytes(), DEFAULT_CALLID))
+ .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId))
.setLeaderTerm(leaderTerm)
.setLeaderCommit(leaderCommit)
.setInitializing(initializing);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index ec39073..f785a30 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -39,8 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
@@ -48,8 +47,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;
-import static org.apache.ratis.util.ProtoUtils.toByteString;
-
public class RaftTestUtil {
public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new LogEntryProto[0];
static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class);
@@ -168,6 +165,10 @@ public class RaftTestUtil {
}
}
+ public static ByteString toByteString(String string) {
+ return ByteString.copyFrom(string, StandardCharsets.UTF_8);
+ }
+
public static class SimpleMessage implements Message {
public static SimpleMessage[] create(int numMessages) {
return create(numMessages, "m");
@@ -211,7 +212,7 @@ public class RaftTestUtil {
@Override
public ByteString getContent() {
- return toByteString(messageId.getBytes(Charset.forName("UTF-8")));
+ return toByteString(messageId);
}
}
@@ -240,12 +241,7 @@ public class RaftTestUtil {
}
public SMLogEntryProto getLogEntryContent() {
- try {
- return SMLogEntryProto.newBuilder()
- .setData(toByteString(op.getBytes("UTF-8"))).build();
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
+ return SMLogEntryProto.newBuilder().setData(toByteString(op)).build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java
new file mode 100644
index 0000000..b5806d5
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRaftId {
+ @Test
+ public void testClientId() {
+ final ClientId id = ClientId.createId();
+ final ByteString bytes = id.toByteString();
+ Assert.assertEquals(bytes, id.toByteString());
+ Assert.assertEquals(id, new ClientId(bytes));
+ }
+
+ @Test
+ public void testRaftGroupId() {
+ final RaftGroupId id = RaftGroupId.createId();
+ final ByteString bytes = id.toByteString();
+ Assert.assertEquals(bytes, id.toByteString());
+ Assert.assertEquals(id, new RaftGroupId(bytes));
+ }
+
+ @Test
+ public void testRaftPeerId() {
+ final RaftPeerId id = RaftPeerId.valueOf("abc");
+ final ByteString bytes = id.toByteString();
+ Assert.assertEquals(bytes, id.toByteString());
+ Assert.assertEquals(id, RaftPeerId.valueOf(bytes));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
index 10d6272..df0545d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftRpcMessage;
import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.util.ProtoUtils;
import java.util.Objects;
@@ -102,11 +103,11 @@ public class RaftServerReply implements RaftRpcMessage {
@Override
public RaftGroupId getRaftGroupId() {
if (isAppendEntries()) {
- return new RaftGroupId(appendEntries.getServerReply().getRaftGroupId().toByteArray());
+ return ProtoUtils.toRaftGroupId(appendEntries.getServerReply().getRaftGroupId());
} else if (isRequestVote()) {
- return new RaftGroupId(requestVote.getServerReply().getRaftGroupId().toByteArray());
+ return ProtoUtils.toRaftGroupId(requestVote.getServerReply().getRaftGroupId());
} else {
- return new RaftGroupId(installSnapshot.getServerReply().getRaftGroupId().toByteArray());
+ return ProtoUtils.toRaftGroupId(installSnapshot.getServerReply().getRaftGroupId());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e2bdc247/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
index f9d3d31..e38296c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftRpcMessage;
import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.util.ProtoUtils;
class RaftServerRequest implements RaftRpcMessage {
private final AppendEntriesRequestProto appendEntries;
@@ -100,11 +101,11 @@ class RaftServerRequest implements RaftRpcMessage {
@Override
public RaftGroupId getRaftGroupId() {
if (isAppendEntries()) {
- return new RaftGroupId(appendEntries.getServerRequest().getRaftGroupId().getId().toByteArray());
+ return ProtoUtils.toRaftGroupId(appendEntries.getServerRequest().getRaftGroupId());
} else if (isRequestVote()) {
- return new RaftGroupId(requestVote.getServerRequest().getRaftGroupId().getId().toByteArray());
+ return ProtoUtils.toRaftGroupId(requestVote.getServerRequest().getRaftGroupId());
} else {
- return new RaftGroupId(installSnapshot.getServerRequest().getRaftGroupId().getId().toByteArray());
+ return ProtoUtils.toRaftGroupId(installSnapshot.getServerRequest().getRaftGroupId());
}
}
}