You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/12/29 10:27:29 UTC
[ignite-3] 02/02: IGNITE-13885 compiling
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 9df47739b0f1fcbc94211442047b760ff38acb80
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Tue Dec 29 13:26:54 2020 +0300
IGNITE-13885 compiling
---
.../com/alipay/sofa/jraft/RaftGroupService.java | 7 +-
.../jraft/core/DefaultJRaftServiceFactory.java | 4 +-
.../java/com/alipay/sofa/jraft/core/NodeImpl.java | 32 +-
.../sofa/jraft/core/ReadOnlyServiceImpl.java | 1 -
.../sofa/jraft/entity/LocalFileMetaOutter.java | 13 +-
.../sofa/jraft/entity/LocalStorageOutter.java | 54 +-
.../com/alipay/sofa/jraft/entity/RaftOutter.java | 16 +
.../sofa/jraft/entity/codec/v1/V1Decoder.java | 1 +
.../com/alipay/sofa/jraft/rpc/CliRequests.java | 4 +-
.../sofa/jraft/rpc/MessageBuilderFactory.java | 9 +
.../sofa/jraft/rpc/RaftRpcServerFactory.java | 4 -
.../com/alipay/sofa/jraft/rpc/RpcRequests.java | 74 +-
.../sofa/jraft/rpc/impl/AbstractClientService.java | 9 +-
.../sofa/jraft/rpc/impl/BoltRaftRpcFactory.java | 98 --
.../alipay/sofa/jraft/rpc/impl/BoltRpcClient.java | 193 ----
.../alipay/sofa/jraft/rpc/impl/BoltRpcServer.java | 179 ---
.../impl/LocalRaftRpcFactory.java} | 35 +-
.../alipay/sofa/jraft/rpc/impl/LocalRpcClient.java | 64 ++
.../impl/LocalRpcServer.java} | 29 +-
.../ClientServiceConnectionEventProcessor.java | 56 -
.../rpc/message/DefaultMessageBuilderFactory.java | 50 +
.../com/alipay/sofa/jraft/storage/FileService.java | 8 +-
.../sofa/jraft/storage/impl/LocalLogStorage.java | 311 +++++
.../sofa/jraft/storage/impl/RocksDBLogStorage.java | 743 ------------
.../alipay/sofa/jraft/storage/io/ProtoBufFile.java | 59 +-
.../alipay/sofa/jraft/storage/log/AbortFile.java | 74 --
.../sofa/jraft/storage/log/CheckpointFile.java | 119 --
.../com/alipay/sofa/jraft/storage/log/LibC.java | 60 -
.../storage/log/RocksDBSegmentLogStorage.java | 1216 --------------------
.../alipay/sofa/jraft/storage/log/SegmentFile.java | 905 ---------------
.../snapshot/local/LocalSnapshotMetaTable.java | 9 +-
.../alipay/sofa/jraft/util/AsciiStringUtil.java | 11 +-
.../com/alipay/sofa/jraft/util/ByteString.java | 14 +
.../java/com/alipay/sofa/jraft/util/BytesUtil.java | 68 +-
.../com/alipay/sofa/jraft/util/JDKMarshaller.java | 32 +
.../com/alipay/sofa/jraft/util/Marshaller.java | 11 +
.../main/java/com/alipay/sofa/jraft/util/Mpsc.java | 24 +-
.../com/alipay/sofa/jraft/util/SegmentList.java | 14 +-
.../com/alipay/sofa/jraft/util/SignalHelper.java | 114 --
.../sofa/jraft/util/StorageOptionsFactory.java | 350 ------
.../alipay/sofa/jraft/util/internal/ThrowUtil.java | 12 +-
.../util/internal/UnsafeIntegerFieldUpdater.java | 49 -
.../util/internal/UnsafeReferenceFieldUpdater.java | 50 -
.../sofa/jraft/util/internal/UnsafeUtf8Util.java | 481 --------
.../sofa/jraft/util/internal/UnsafeUtil.java | 1194 +++++++++----------
.../alipay/sofa/jraft/util/internal/Updaters.java | 36 +-
46 files changed, 1418 insertions(+), 5478 deletions(-)
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java
index df9f9d1..09ed56e 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RpcOptions;
-import com.alipay.sofa.jraft.rpc.ProtobufMsgFactory;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.util.Endpoint;
@@ -40,9 +39,9 @@ public class RaftGroupService {
private static final Logger LOG = LoggerFactory.getLogger(RaftGroupService.class);
- static {
- ProtobufMsgFactory.load();
- }
+// static {
+// ProtobufMsgFactory.load();
+// }
private volatile boolean started = false;
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java
index 5707ba5..68258f6 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java
@@ -17,6 +17,7 @@
package com.alipay.sofa.jraft.core;
import com.alipay.sofa.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import com.alipay.sofa.jraft.storage.impl.LocalLogStorage;
import org.apache.commons.lang.StringUtils;
import com.alipay.sofa.jraft.JRaftServiceFactory;
@@ -26,7 +27,6 @@ import com.alipay.sofa.jraft.storage.LogStorage;
import com.alipay.sofa.jraft.storage.RaftMetaStorage;
import com.alipay.sofa.jraft.storage.SnapshotStorage;
import com.alipay.sofa.jraft.storage.impl.LocalRaftMetaStorage;
-import com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage;
import com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.SPI;
@@ -47,7 +47,7 @@ public class DefaultJRaftServiceFactory implements JRaftServiceFactory {
@Override
public LogStorage createLogStorage(final String uri, final RaftOptions raftOptions) {
Requires.requireTrue(StringUtils.isNotBlank(uri), "Blank log storage uri.");
- return new RocksDBLogStorage(uri, raftOptions);
+ return new LocalLogStorage(uri, raftOptions);
}
@Override
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
index 09139d5..1621b30 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
@@ -103,16 +103,12 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl;
import com.alipay.sofa.jraft.util.Describer;
import com.alipay.sofa.jraft.util.DisruptorBuilder;
import com.alipay.sofa.jraft.util.DisruptorMetricSet;
-import com.alipay.sofa.jraft.util.JRaftServiceLoader;
-import com.alipay.sofa.jraft.util.JRaftSignalHandler;
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
-import com.alipay.sofa.jraft.util.Platform;
import com.alipay.sofa.jraft.util.RepeatedTimer;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
-import com.alipay.sofa.jraft.util.SignalHelper;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.ThreadId;
@@ -140,20 +136,20 @@ public class NodeImpl implements Node, RaftServerService {
private static final Logger LOG = LoggerFactory
.getLogger(NodeImpl.class);
- static {
- try {
- if (SignalHelper.supportSignal()) {
- // TODO support windows signal
- if (!Platform.isWindows()) {
- final List<JRaftSignalHandler> handlers = JRaftServiceLoader.load(JRaftSignalHandler.class) //
- .sort();
- SignalHelper.addSignal(SignalHelper.SIG_USR2, handlers);
- }
- }
- } catch (final Throwable t) {
- LOG.error("Fail to add signal.", t);
- }
- }
+// static {
+// try {
+// if (SignalHelper.supportSignal()) {
+// // TODO support windows signal
+// if (!Platform.isWindows()) {
+// final List<JRaftSignalHandler> handlers = JRaftServiceLoader.load(JRaftSignalHandler.class) //
+// .sort();
+// SignalHelper.addSignal(SignalHelper.SIG_USR2, handlers);
+// }
+// }
+// } catch (final Throwable t) {
+// LOG.error("Fail to add signal.", t);
+// }
+// }
public final static RaftTimerFactory TIMER_FACTORY = JRaftUtils
.raftTimerFactory();
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java
index 671671e..b55f208 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java
@@ -54,7 +54,6 @@ import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.Utils;
-import com.google.protobuf.ZeroByteStringHelper;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalFileMetaOutter.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalFileMetaOutter.java
index 2220da9..f808749 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalFileMetaOutter.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalFileMetaOutter.java
@@ -19,6 +19,7 @@
package com.alipay.sofa.jraft.entity;
+import com.alipay.sofa.jraft.rpc.Message;
import com.alipay.sofa.jraft.util.ByteString;
public final class LocalFileMetaOutter {
@@ -67,15 +68,25 @@ public final class LocalFileMetaOutter {
}
}
- public interface LocalFileMeta {
+ public interface LocalFileMeta extends Message {
+ static Builder newBuilder() {
+ return null;
+ }
+
ByteString getUserMeta();
FileSource getSource();
java.lang.String getChecksum();
+ boolean hasChecksum();
+
interface Builder {
LocalFileMeta build();
+
+ Builder setUserMeta(ByteString data);
+
+ void mergeFrom(Message fileMeta);
}
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java
index e747173..81e54b4 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/LocalStorageOutter.java
@@ -19,6 +19,11 @@
package com.alipay.sofa.jraft.entity;
+import com.alipay.sofa.jraft.rpc.Message;
+import com.alipay.sofa.jraft.storage.RaftMetaStorage;
+import com.alipay.sofa.jraft.util.DisruptorBuilder;
+import java.nio.ByteBuffer;
+
public final class LocalStorageOutter {
public interface ConfigurationPB {
java.util.List<java.lang.String> getPeersList();
@@ -38,13 +43,34 @@ public final class LocalStorageOutter {
long getFirstLogIndex();
}
- public interface StablePBMeta {
+ public interface StablePBMeta extends Message {
+ static Builder newBuilder() {
+ return null;
+ }
+
long getTerm();
java.lang.String getVotedfor();
+
+ interface Builder {
+
+ Builder setTerm(long term);
+
+ Builder setVotedfor(String votedFor);
+
+ StablePBMeta build();
+ }
}
- public interface LocalSnapshotPbMeta {
+ public interface LocalSnapshotPbMeta extends Message {
+ static Builder newBuilder() {
+ return null;
+ }
+
+ static LocalSnapshotPbMeta parseFrom(ByteBuffer buf) {
+ throw new UnsupportedOperationException();
+ }
+
com.alipay.sofa.jraft.entity.RaftOutter.SnapshotMeta getMeta();
java.util.List<com.alipay.sofa.jraft.entity.LocalStorageOutter.LocalSnapshotPbMeta.File> getFilesList();
@@ -53,10 +79,34 @@ public final class LocalStorageOutter {
com.alipay.sofa.jraft.entity.LocalStorageOutter.LocalSnapshotPbMeta.File getFiles(int index);
+ byte[] toByteArray();
+
+ boolean hasMeta();
+
interface File {
+ static Builder newBuilder() {
+ return null;
+ }
+
java.lang.String getName();
LocalFileMetaOutter.LocalFileMeta getMeta();
+
+ public interface Builder {
+ Builder setName(String key);
+
+ Builder setMeta(LocalFileMetaOutter.LocalFileMeta meta);
+
+ File build();
+ }
+ }
+
+ public interface Builder {
+ Builder setMeta(RaftOutter.SnapshotMeta meta);
+
+ Builder addFiles(File file);
+
+ LocalSnapshotPbMeta build();
}
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java
index bf8cf92..83d40ea 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/RaftOutter.java
@@ -81,6 +81,10 @@ public final class RaftOutter {
}
public interface SnapshotMeta {
+ static Builder newBuilder() {
+ return null;
+ }
+
long getLastIncludedIndex();
long getLastIncludedTerm();
@@ -111,6 +115,18 @@ public final class RaftOutter {
interface Builder {
SnapshotMeta build();
+
+ Builder setLastIncludedIndex(long lastAppliedIndex);
+
+ Builder setLastIncludedTerm(long lastAppliedTerm);
+
+ Builder addPeers(String peerId);
+
+ Builder addLearners(String learnerId);
+
+ Builder addOldPeers(String oldPeerId);
+
+ Builder addOldLearners(String oldLearnerId);
}
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/codec/v1/V1Decoder.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/codec/v1/V1Decoder.java
index ff811b2..7b92669 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/codec/v1/V1Decoder.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/entity/codec/v1/V1Decoder.java
@@ -32,6 +32,7 @@ import com.alipay.sofa.jraft.util.Bits;
* V1 log entry decoder
* @author boyan(boyan@antfin.com)
*
+ * TODO checksum
*/
@Deprecated
public final class V1Decoder implements LogEntryDecoder {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
index 47aecff..4f1f46d 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/CliRequests.java
@@ -19,8 +19,6 @@
package com.alipay.sofa.jraft.rpc;
-import com.alipay.sofa.jraft.entity.LeaderChangeContext;
-
public final class CliRequests {
public interface AddPeerRequest extends Message {
java.lang.String getGroupId();
@@ -40,7 +38,7 @@ public final class CliRequests {
}
public static Builder newBuilder() {
- return null;
+ return MessageBuilderFactory.DEFAULT.create();
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
new file mode 100644
index 0000000..9d1a965
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/MessageBuilderFactory.java
@@ -0,0 +1,9 @@
+package com.alipay.sofa.jraft.rpc;
+
+import com.alipay.sofa.jraft.rpc.message.DefaultMessageBuilderFactory;
+
+public interface MessageBuilderFactory {
+ public static MessageBuilderFactory DEFAULT = new DefaultMessageBuilderFactory();
+
+ CliRequests.AddPeerRequest.Builder create();
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcServerFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcServerFactory.java
index 38dc6b6..2e8eb85 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcServerFactory.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RaftRpcServerFactory.java
@@ -47,10 +47,6 @@ import com.alipay.sofa.jraft.util.RpcFactoryHelper;
*/
public class RaftRpcServerFactory {
- static {
- ProtobufMsgFactory.load();
- }
-
/**
* Creates a raft RPC server with default request executors.
*
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
index 8a34b6d..22f2ff9 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/RpcRequests.java
@@ -26,12 +26,13 @@ import com.alipay.sofa.jraft.option.BootstrapOptions;
import com.alipay.sofa.jraft.option.ReplicatorOptions;
import com.alipay.sofa.jraft.util.ByteString;
import com.alipay.sofa.jraft.util.DisruptorBuilder;
+import java.io.ByteArrayOutputStream;
public final class RpcRequests {
private RpcRequests() {
}
- public interface PingRequest {
+ public interface PingRequest extends Message {
/**
* <code>required int64 send_timestamp = 1;</code>
*/
@@ -137,7 +138,7 @@ public final class RpcRequests {
}
}
- public interface TimeoutNowRequest {
+ public interface TimeoutNowRequest extends Message {
static Builder newBuilder() {
return null;
}
@@ -168,6 +169,10 @@ public final class RpcRequests {
return null;
}
+ static Message getDefaultInstance() {
+ return null;
+ }
+
/**
* <code>required int64 term = 1;</code>
*/
@@ -192,7 +197,7 @@ public final class RpcRequests {
}
}
- public interface RequestVoteRequest {
+ public interface RequestVoteRequest extends Message {
java.lang.String getGroupId();
java.lang.String getServerId();
@@ -263,7 +268,8 @@ public final class RpcRequests {
}
}
- public interface AppendEntriesRequestHeader {
+ // TODO asch not needed
+ public interface AppendEntriesRequestHeader extends Message {
/**
* <code>required string group_id = 1;</code>
*/
@@ -304,28 +310,18 @@ public final class RpcRequests {
long getPrevLogIndex();
- /**
- * <code>repeated .jraft.EntryMeta entries = 7;</code>
- */
java.util.List<com.alipay.sofa.jraft.entity.RaftOutter.EntryMeta> getEntriesList();
- /**
- * <code>repeated .jraft.EntryMeta entries = 7;</code>
- */
com.alipay.sofa.jraft.entity.RaftOutter.EntryMeta getEntries(int index);
- /**
- * <code>repeated .jraft.EntryMeta entries = 7;</code>
- */
int getEntriesCount();
long getCommittedIndex();
- /**
- * <code>optional bytes data = 9;</code>
- */
ByteString getData();
+ boolean hasData();
+
interface Builder {
AppendEntriesRequest build();
@@ -379,7 +375,11 @@ public final class RpcRequests {
}
}
- public interface GetFileRequest {
+ public interface GetFileRequest extends Message {
+ static Builder newBuilder() {
+ return null;
+ }
+
long getReaderId();
java.lang.String getFilename();
@@ -392,22 +392,56 @@ public final class RpcRequests {
interface Builder {
GetFileRequest build();
+
+ long getReaderId();
+
+ String getFilename();
+
+ long getOffset();
+
+ Builder setCount(long cnt);
+
+ long getCount();
+
+ Builder setOffset(long offset);
+
+ Builder setReadPartly(boolean readPartly);
+
+ Builder setFilename(String fileName);
+
+ Builder setReaderId(long readerId);
}
}
public interface GetFileResponse extends HasErrorResponse {
+ static Message getDefaultInstance() {
+ return null;
+ }
+
+ static Builder newBuilder() {
+ return null;
+ }
+
boolean getEof();
long getReadSize();
ErrorResponse getErrorResponse();
+ ByteString getData();
+
interface Builder {
GetFileResponse build();
+
+ Builder setReadSize(int read);
+
+ Builder setEof(boolean eof);
+
+ Builder setData(ByteString data);
}
}
- public interface ReadIndexRequest {
+ public interface ReadIndexRequest extends Message {
static Builder newBuilder() {
return null;
}
@@ -444,6 +478,10 @@ public final class RpcRequests {
return null;
}
+ static Message getDefaultInstance() {
+ return null;
+ }
+
long getIndex();
boolean getSuccess();
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
index dc4f745..80e7d1e 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
@@ -33,7 +33,6 @@ import com.alipay.sofa.jraft.option.RpcOptions;
import com.alipay.sofa.jraft.rpc.ClientService;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
-import com.alipay.sofa.jraft.rpc.ProtobufMsgFactory;
import com.alipay.sofa.jraft.rpc.RaftRpcFactory;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse;
@@ -58,10 +57,10 @@ import com.alipay.sofa.jraft.rpc.Message;
public abstract class AbstractClientService implements ClientService {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractClientService.class);
-
- static {
- ProtobufMsgFactory.load();
- }
+//
+// static {
+// ProtobufMsgFactory.load();
+// }
protected volatile RpcClient rpcClient;
protected ThreadPoolExecutor rpcExecutor;
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRaftRpcFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRaftRpcFactory.java
deleted file mode 100644
index 9fda8b0..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRaftRpcFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.rpc.impl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alipay.remoting.CustomSerializerManager;
-import com.alipay.remoting.InvokeContext;
-import com.alipay.remoting.rpc.RpcConfigManager;
-import com.alipay.remoting.rpc.RpcConfigs;
-import com.alipay.sofa.jraft.option.RpcOptions;
-import com.alipay.sofa.jraft.rpc.ProtobufSerializer;
-import com.alipay.sofa.jraft.rpc.RaftRpcFactory;
-import com.alipay.sofa.jraft.rpc.RpcClient;
-import com.alipay.sofa.jraft.rpc.RpcServer;
-import com.alipay.sofa.jraft.util.Endpoint;
-import com.alipay.sofa.jraft.util.Requires;
-import com.alipay.sofa.jraft.util.SPI;
-import com.alipay.sofa.jraft.util.SystemPropertyUtil;
-
-/**
- *
- * @author jiachun.fjc
- */
-@SPI
-public class BoltRaftRpcFactory implements RaftRpcFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(BoltRaftRpcFactory.class);
-
- static final int CHANNEL_WRITE_BUF_LOW_WATER_MARK = SystemPropertyUtil.getInt(
- "bolt.channel_write_buf_low_water_mark",
- 256 * 1024);
- static final int CHANNEL_WRITE_BUF_HIGH_WATER_MARK = SystemPropertyUtil.getInt(
- "bolt.channel_write_buf_high_water_mark",
- 512 * 1024);
-
- @Override
- public void registerProtobufSerializer(final String className, final Object... args) {
- CustomSerializerManager.registerCustomSerializer(className, ProtobufSerializer.INSTANCE);
- }
-
- @Override
- public RpcClient createRpcClient(final ConfigHelper<RpcClient> helper) {
- final com.alipay.remoting.rpc.RpcClient boltImpl = new com.alipay.remoting.rpc.RpcClient();
- final RpcClient rpcClient = new BoltRpcClient(boltImpl);
- if (helper != null) {
- helper.config(rpcClient);
- }
- return rpcClient;
- }
-
- @Override
- public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper<RpcServer> helper) {
- final int port = Requires.requireNonNull(endpoint, "endpoint").getPort();
- Requires.requireTrue(port > 0 && port < 0xFFFF, "port out of range:" + port);
- final com.alipay.remoting.rpc.RpcServer boltImpl = new com.alipay.remoting.rpc.RpcServer(port, true, false);
- final RpcServer rpcServer = new BoltRpcServer(boltImpl);
- if (helper != null) {
- helper.config(rpcServer);
- }
- return rpcServer;
- }
-
- @Override
- public ConfigHelper<RpcClient> defaultJRaftClientConfigHelper(final RpcOptions opts) {
- return ins -> {
- final BoltRpcClient client = (BoltRpcClient) ins;
- final InvokeContext ctx = new InvokeContext();
- ctx.put(InvokeContext.BOLT_CRC_SWITCH, opts.isEnableRpcChecksum());
- client.setDefaultInvokeCtx(ctx);
- };
- }
-
- @Override
- public void ensurePipeline() {
- // enable `bolt.rpc.dispatch-msg-list-in-default-executor` system property
- if (RpcConfigManager.dispatch_msg_list_in_default_executor()) {
- System.setProperty(RpcConfigs.DISPATCH_MSG_LIST_IN_DEFAULT_EXECUTOR, "false");
- LOG.warn("JRaft SET {} to be false for replicator pipeline optimistic.",
- RpcConfigs.DISPATCH_MSG_LIST_IN_DEFAULT_EXECUTOR);
- }
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java
deleted file mode 100644
index 1728caa..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.rpc.impl;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-import com.alipay.remoting.ConnectionEventType;
-import com.alipay.remoting.RejectedExecutionPolicy;
-import com.alipay.remoting.config.switches.GlobalSwitch;
-import com.alipay.sofa.jraft.ReplicatorGroup;
-import com.alipay.sofa.jraft.error.InvokeTimeoutException;
-import com.alipay.sofa.jraft.error.RemotingException;
-import com.alipay.sofa.jraft.option.RpcOptions;
-import com.alipay.sofa.jraft.rpc.InvokeCallback;
-import com.alipay.sofa.jraft.rpc.InvokeContext;
-import com.alipay.sofa.jraft.rpc.RpcClient;
-import com.alipay.sofa.jraft.rpc.impl.core.ClientServiceConnectionEventProcessor;
-import com.alipay.sofa.jraft.util.Endpoint;
-import com.alipay.sofa.jraft.util.Requires;
-
-/**
- * Bolt rpc client impl.
- *
- * @author jiachun.fjc
- */
-public class BoltRpcClient implements RpcClient {
-
- public static final String BOLT_CTX = "BOLT_CTX";
- public static final String BOLT_REJECTED_EXECUTION_POLICY = "BOLT_REJECTED_EXECUTION_POLICY";
-
- private final com.alipay.remoting.rpc.RpcClient rpcClient;
- private com.alipay.remoting.InvokeContext defaultInvokeCtx;
-
- public BoltRpcClient(com.alipay.remoting.rpc.RpcClient rpcClient) {
- this.rpcClient = Requires.requireNonNull(rpcClient, "rpcClient");
- }
-
- @Override
- public boolean init(final RpcOptions opts) {
- this.rpcClient.switches().turnOn(GlobalSwitch.CODEC_FLUSH_CONSOLIDATION);
- this.rpcClient.initWriteBufferWaterMark(BoltRaftRpcFactory.CHANNEL_WRITE_BUF_LOW_WATER_MARK,
- BoltRaftRpcFactory.CHANNEL_WRITE_BUF_HIGH_WATER_MARK);
- this.rpcClient.enableReconnectSwitch();
- this.rpcClient.startup();
- return true;
- }
-
- @Override
- public void shutdown() {
- this.rpcClient.shutdown();
- }
-
- @Override
- public boolean checkConnection(final Endpoint endpoint) {
- Requires.requireNonNull(endpoint, "endpoint");
- return this.rpcClient.checkConnection(endpoint.toString());
- }
-
- @Override
- public boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent) {
- Requires.requireNonNull(endpoint, "endpoint");
- return this.rpcClient.checkConnection(endpoint.toString(), true, true);
- }
-
- @Override
- public void closeConnection(final Endpoint endpoint) {
- Requires.requireNonNull(endpoint, "endpoint");
- this.rpcClient.closeConnection(endpoint.toString());
- }
-
- @Override
- public void registerConnectEventListener(final ReplicatorGroup replicatorGroup) {
- this.rpcClient.addConnectionEventProcessor(ConnectionEventType.CONNECT,
- new ClientServiceConnectionEventProcessor(replicatorGroup));
- }
-
- @Override
- public Object invokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
- final long timeoutMs) throws InterruptedException, RemotingException {
- Requires.requireNonNull(endpoint, "endpoint");
- try {
- return this.rpcClient.invokeSync(endpoint.toString(), request, getBoltInvokeCtx(ctx), (int) timeoutMs);
- } catch (final com.alipay.remoting.rpc.exception.InvokeTimeoutException e) {
- throw new InvokeTimeoutException(e);
- } catch (final com.alipay.remoting.exception.RemotingException e) {
- throw new RemotingException(e);
- }
- }
-
- @Override
- public void invokeAsync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
- final InvokeCallback callback, final long timeoutMs) throws InterruptedException,
- RemotingException {
- Requires.requireNonNull(endpoint, "endpoint");
- try {
- this.rpcClient.invokeWithCallback(endpoint.toString(), request, getBoltInvokeCtx(ctx),
- getBoltCallback(callback, ctx), (int) timeoutMs);
- } catch (final com.alipay.remoting.rpc.exception.InvokeTimeoutException e) {
- throw new InvokeTimeoutException(e);
- } catch (final com.alipay.remoting.exception.RemotingException e) {
- throw new RemotingException(e);
- }
- }
-
- public com.alipay.remoting.rpc.RpcClient getRpcClient() {
- return rpcClient;
- }
-
- public com.alipay.remoting.InvokeContext getDefaultInvokeCtx() {
- return defaultInvokeCtx;
- }
-
- public void setDefaultInvokeCtx(com.alipay.remoting.InvokeContext defaultInvokeCtx) {
- this.defaultInvokeCtx = defaultInvokeCtx;
- }
-
- private RejectedExecutionPolicy getRejectedPolicy(final InvokeContext ctx) {
- return ctx == null ? RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION : ctx.getOrDefault(
- BOLT_REJECTED_EXECUTION_POLICY, RejectedExecutionPolicy.CALLER_HANDLE_EXCEPTION);
- }
-
- private com.alipay.remoting.InvokeContext getBoltInvokeCtx(final InvokeContext ctx) {
- if (ctx == null) {
- return this.defaultInvokeCtx;
- }
-
- com.alipay.remoting.InvokeContext boltCtx = ctx.get(BOLT_CTX);
- if (boltCtx != null) {
- return boltCtx;
- }
-
- boltCtx = new com.alipay.remoting.InvokeContext();
- for (Map.Entry<String, Object> entry : ctx.entrySet()) {
- boltCtx.put(entry.getKey(), entry.getValue());
- }
- final Boolean crcSwitch = ctx.get(InvokeContext.CRC_SWITCH);
- if (crcSwitch != null) {
- boltCtx.put(com.alipay.remoting.InvokeContext.BOLT_CRC_SWITCH, crcSwitch);
- }
- return boltCtx;
- }
-
- private BoltCallback getBoltCallback(final InvokeCallback callback, final InvokeContext ctx) {
- Requires.requireNonNull(callback, "callback");
- return new BoltCallback(callback, getRejectedPolicy(ctx));
- }
-
- private static class BoltCallback implements com.alipay.remoting.RejectionProcessableInvokeCallback {
-
- private final InvokeCallback callback;
- private final RejectedExecutionPolicy rejectedPolicy;
-
- private BoltCallback(final InvokeCallback callback, final RejectedExecutionPolicy rejectedPolicy) {
- this.callback = callback;
- this.rejectedPolicy = rejectedPolicy;
- }
-
- @Override
- public void onResponse(final Object result) {
- this.callback.complete(result, null);
- }
-
- @Override
- public void onException(final Throwable err) {
- this.callback.complete(null, err);
- }
-
- @Override
- public Executor getExecutor() {
- return this.callback.executor();
- }
-
- @Override
- public RejectedExecutionPolicy rejectedExecutionPolicy() {
- return this.rejectedPolicy;
- }
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcServer.java
deleted file mode 100644
index bf5690b..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcServer.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.rpc.impl;
-
-import java.util.concurrent.Executor;
-
-import com.alipay.remoting.AsyncContext;
-import com.alipay.remoting.BizContext;
-import com.alipay.remoting.ConnectionEventType;
-import com.alipay.remoting.config.switches.GlobalSwitch;
-import com.alipay.remoting.rpc.protocol.AsyncUserProcessor;
-import com.alipay.sofa.jraft.rpc.Connection;
-import com.alipay.sofa.jraft.rpc.RpcContext;
-import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import com.alipay.sofa.jraft.rpc.RpcServer;
-import com.alipay.sofa.jraft.util.Requires;
-
-/**
- * Bolt RPC server impl.
- *
- * @author jiachun.fjc
- */
-public class BoltRpcServer implements RpcServer {
-
- private final com.alipay.remoting.rpc.RpcServer rpcServer;
-
- public BoltRpcServer(final com.alipay.remoting.rpc.RpcServer rpcServer) {
- this.rpcServer = Requires.requireNonNull(rpcServer, "rpcServer");
- }
-
- @Override
- public boolean init(final Void opts) {
- this.rpcServer.switches().turnOn(GlobalSwitch.CODEC_FLUSH_CONSOLIDATION);
- this.rpcServer.initWriteBufferWaterMark(BoltRaftRpcFactory.CHANNEL_WRITE_BUF_LOW_WATER_MARK,
- BoltRaftRpcFactory.CHANNEL_WRITE_BUF_HIGH_WATER_MARK);
- this.rpcServer.startup();
- return this.rpcServer.isStarted();
- }
-
- @Override
- public void shutdown() {
- this.rpcServer.shutdown();
- }
-
- @Override
- public void registerConnectionClosedEventListener(final ConnectionClosedEventListener listener) {
- this.rpcServer.addConnectionEventProcessor(ConnectionEventType.CLOSE, (remoteAddress, conn) -> {
- final Connection proxyConn = conn == null ? null : new Connection() {
-
- @Override
- public Object getAttribute(final String key) {
- return conn.getAttribute(key);
- }
-
- @Override
- public Object setAttributeIfAbsent(final String key, final Object value) {
- return conn.setAttributeIfAbsent(key, value);
- }
-
- @Override
- public void setAttribute(final String key, final Object value) {
- conn.setAttribute(key, value);
- }
-
- @Override
- public void close() {
- conn.close();
- }
- };
-
- listener.onClosed(remoteAddress, proxyConn);
- });
- }
-
- @Override
- public int boundPort() {
- return this.rpcServer.port();
- }
-
- @Override
- public void registerProcessor(final RpcProcessor processor) {
- this.rpcServer.registerUserProcessor(new AsyncUserProcessor<Object>() {
-
- @SuppressWarnings("unchecked")
- @Override
- public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx, final Object request) {
- final RpcContext rpcCtx = new RpcContext() {
-
- @Override
- public void sendResponse(final Object responseObj) {
- asyncCtx.sendResponse(responseObj);
- }
-
- @Override
- public Connection getConnection() {
- com.alipay.remoting.Connection conn = bizCtx.getConnection();
- if (conn == null) {
- return null;
- }
- return new BoltConnection(conn);
- }
-
- @Override
- public String getRemoteAddress() {
- return bizCtx.getRemoteAddress();
- }
- };
-
- processor.handleRequest(rpcCtx, request);
- }
-
- @Override
- public String interest() {
- return processor.interest();
- }
-
- @Override
- public ExecutorSelector getExecutorSelector() {
- final RpcProcessor.ExecutorSelector realSelector = processor.executorSelector();
- if (realSelector == null) {
- return null;
- }
- return realSelector::select;
- }
-
- @Override
- public Executor getExecutor() {
- return processor.executor();
- }
- });
- }
-
- public com.alipay.remoting.rpc.RpcServer getServer() {
- return this.rpcServer;
- }
-
- private static class BoltConnection implements Connection {
-
- private final com.alipay.remoting.Connection conn;
-
- private BoltConnection(final com.alipay.remoting.Connection conn) {
- this.conn = Requires.requireNonNull(conn, "conn");
- }
-
- @Override
- public Object getAttribute(final String key) {
- return this.conn.getAttribute(key);
- }
-
- @Override
- public Object setAttributeIfAbsent(final String key, final Object value) {
- return this.conn.setAttributeIfAbsent(key, value);
- }
-
- @Override
- public void setAttribute(final String key, final Object value) {
- this.conn.setAttribute(key, value);
- }
-
- @Override
- public void close() {
- this.conn.close();
- }
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeLongFieldUpdater.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java
similarity index 51%
rename from modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeLongFieldUpdater.java
rename to modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java
index d3f5705..acaa136 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeLongFieldUpdater.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRaftRpcFactory.java
@@ -14,36 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.alipay.sofa.jraft.util.internal;
+package com.alipay.sofa.jraft.rpc.impl;
-import java.lang.reflect.Field;
-import sun.misc.Unsafe;
+import com.alipay.sofa.jraft.rpc.RaftRpcFactory;
+import com.alipay.sofa.jraft.rpc.RpcClient;
+import com.alipay.sofa.jraft.rpc.RpcServer;
+import com.alipay.sofa.jraft.util.Endpoint;
+import com.alipay.sofa.jraft.util.SPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
* @author jiachun.fjc
*/
-final class UnsafeLongFieldUpdater<U> implements LongFieldUpdater<U> {
+@SPI
+public class LocalRaftRpcFactory implements RaftRpcFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalRaftRpcFactory.class);
- private final long offset;
- private final Unsafe unsafe;
+ @Override public void registerProtobufSerializer(String className, Object... args) {
- UnsafeLongFieldUpdater(Unsafe unsafe, Class<? super U> tClass, String fieldName) throws NoSuchFieldException {
- final Field field = tClass.getDeclaredField(fieldName);
- if (unsafe == null) {
- throw new NullPointerException("unsafe");
- }
- this.unsafe = unsafe;
- this.offset = unsafe.objectFieldOffset(field);
}
- @Override
- public void set(final U obj, final long newValue) {
- this.unsafe.putLong(obj, this.offset, newValue);
+ @Override public RpcClient createRpcClient(ConfigHelper<RpcClient> helper) {
+ return null;
}
- @Override
- public long get(final U obj) {
- return this.unsafe.getLong(obj, this.offset);
+ @Override public RpcServer createRpcServer(Endpoint endpoint, ConfigHelper<RpcServer> helper) {
+ return null;
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
new file mode 100644
index 0000000..a6c063b
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.rpc.impl;
+
+import com.alipay.sofa.jraft.ReplicatorGroup;
+import com.alipay.sofa.jraft.error.RemotingException;
+import com.alipay.sofa.jraft.option.RpcOptions;
+import com.alipay.sofa.jraft.rpc.InvokeCallback;
+import com.alipay.sofa.jraft.rpc.InvokeContext;
+import com.alipay.sofa.jraft.rpc.RpcClient;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+/**
+ * Local RPC client stub: every operation is currently a no-op placeholder.
+ *
+ * @author jiachun.fjc
+ */
+public class LocalRpcClient implements RpcClient {
+ @Override public boolean checkConnection(Endpoint endpoint) {
+ return false;
+ }
+
+ @Override public boolean checkConnection(Endpoint endpoint, boolean createIfAbsent) {
+ return false;
+ }
+
+ @Override public void closeConnection(Endpoint endpoint) {
+
+ }
+
+ @Override public void registerConnectEventListener(ReplicatorGroup replicatorGroup) {
+
+ }
+
+ @Override public Object invokeSync(Endpoint endpoint, Object request, InvokeContext ctx, long timeoutMs) throws InterruptedException, RemotingException {
+ return null;
+ }
+
+ @Override public void invokeAsync(Endpoint endpoint, Object request, InvokeContext ctx, InvokeCallback callback, long timeoutMs) throws InterruptedException, RemotingException {
+
+ }
+
+ @Override public boolean init(RpcOptions opts) {
+ return false;
+ }
+
+ @Override public void shutdown() {
+
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/DebugStatistics.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
similarity index 59%
rename from modules/raft/src/main/java/com/alipay/sofa/jraft/util/DebugStatistics.java
rename to modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
index b2901a7..4423a42 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/DebugStatistics.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
@@ -14,23 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.alipay.sofa.jraft.util;
+package com.alipay.sofa.jraft.rpc.impl;
-import org.rocksdb.Statistics;
+import com.alipay.sofa.jraft.rpc.RpcProcessor;
+import com.alipay.sofa.jraft.rpc.RpcServer;
/**
+ * Local RPC server stub: every operation is currently a no-op placeholder.
*
* @author jiachun.fjc
*/
-public class DebugStatistics extends Statistics {
+public class LocalRpcServer implements RpcServer {
+ @Override public void registerConnectionClosedEventListener(ConnectionClosedEventListener listener) {
- public String getString() {
- return super.toString();
}
- @Override
- public String toString() {
- // no crash when debug
- return getClass().getName() + "@" + Integer.toHexString(hashCode());
+ @Override public void registerProcessor(RpcProcessor<?> processor) {
+
+ }
+
+ @Override public int boundPort() {
+ return 0;
+ }
+
+ @Override public boolean init(Void opts) {
+ return false;
+ }
+
+ @Override public void shutdown() {
+
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/ClientServiceConnectionEventProcessor.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/ClientServiceConnectionEventProcessor.java
deleted file mode 100644
index d83a60c..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/ClientServiceConnectionEventProcessor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.rpc.impl.core;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alipay.remoting.Connection;
-import com.alipay.remoting.ConnectionEventProcessor;
-import com.alipay.remoting.ConnectionEventType;
-import com.alipay.sofa.jraft.ReplicatorGroup;
-import com.alipay.sofa.jraft.entity.PeerId;
-
-/**
- * Client RPC service connection event processor for {@link ConnectionEventType#CONNECT}
- *
- * @author boyan (boyan@alibaba-inc.com)
- *
- * 2018-Apr-12 10:21:22 AM
- */
-public class ClientServiceConnectionEventProcessor implements ConnectionEventProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(ClientServiceConnectionEventProcessor.class);
-
- private final ReplicatorGroup rgGroup;
-
- public ClientServiceConnectionEventProcessor(ReplicatorGroup rgGroup) {
- super();
- this.rgGroup = rgGroup;
- }
-
- @Override
- public void onEvent(final String remoteAddr, final Connection conn) {
- final PeerId peer = new PeerId();
- if (peer.parse(remoteAddr)) {
- LOG.info("Peer {} is connected", peer);
- this.rgGroup.checkReplicator(peer, true);
- } else {
- LOG.error("Fail to parse peer: {}", remoteAddr);
- }
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
new file mode 100644
index 0000000..ce11a1b
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/message/DefaultMessageBuilderFactory.java
@@ -0,0 +1,50 @@
+package com.alipay.sofa.jraft.rpc.message;
+
+import com.alipay.sofa.jraft.rpc.CliRequests;
+import com.alipay.sofa.jraft.rpc.MessageBuilderFactory;
+/** Default {@link MessageBuilderFactory}; create() hands out a builder for {@link CliRequests.AddPeerRequest}. */
+public class DefaultMessageBuilderFactory implements MessageBuilderFactory {
+ @Override public CliRequests.AddPeerRequest.Builder create() {
+ return new AddPeerRequestImpl();
+ }
+ /** Request and its builder in one mutable object: build() returns this same instance, not a copy. */
+ private static class AddPeerRequestImpl implements CliRequests.AddPeerRequest, CliRequests.AddPeerRequest.Builder {
+ private String groupId;
+ private String leaderId;
+ private String peerId;
+
+ @Override public String getGroupId() {
+ return groupId;
+ }
+
+ @Override public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override public String getPeerId() {
+ return peerId;
+ }
+
+ @Override public Builder setGroupId(String groupId) {
+ this.groupId = groupId;
+
+ return this;
+ }
+
+ @Override public Builder setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ return this;
+ }
+
+ @Override public Builder setPeerId(String peerId) {
+ this.peerId = peerId;
+
+ return this;
+ }
+
+ @Override public CliRequests.AddPeerRequest build() {
+ return this;
+ }
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/FileService.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/FileService.java
index 1aa2b25..d206760 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/FileService.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/FileService.java
@@ -16,14 +16,14 @@
*/
package com.alipay.sofa.jraft.storage;
+import com.alipay.sofa.jraft.util.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
-import io.netty.util.internal.ThreadLocalRandom;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,9 +37,7 @@ import com.alipay.sofa.jraft.util.ByteBufferCollector;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
import com.alipay.sofa.jraft.util.Utils;
-import com.google.protobuf.ByteString;
import com.alipay.sofa.jraft.rpc.Message;
-import com.google.protobuf.ZeroByteStringHelper;
/**
* File reader service.
@@ -114,7 +112,7 @@ public final class FileService {
responseBuilder.setData(ByteString.EMPTY);
} else {
// TODO check hole
- responseBuilder.setData(ZeroByteStringHelper.wrap(buf));
+ responseBuilder.setData(new ByteString(buf));
}
return responseBuilder.build();
} catch (final RetryAgainException e) {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorage.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorage.java
new file mode 100644
index 0000000..4176210
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/impl/LocalLogStorage.java
@@ -0,0 +1,311 @@
+package com.alipay.sofa.jraft.storage.impl;
+
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.conf.ConfigurationEntry;
+import com.alipay.sofa.jraft.conf.ConfigurationManager;
+import com.alipay.sofa.jraft.entity.EnumOutter;
+import com.alipay.sofa.jraft.entity.LogEntry;
+import com.alipay.sofa.jraft.entity.LogId;
+import com.alipay.sofa.jraft.entity.codec.LogEntryDecoder;
+import com.alipay.sofa.jraft.entity.codec.LogEntryEncoder;
+import com.alipay.sofa.jraft.option.LogStorageOptions;
+import com.alipay.sofa.jraft.option.RaftOptions;
+import com.alipay.sofa.jraft.storage.LogStorage;
+import com.alipay.sofa.jraft.util.Describer;
+import com.alipay.sofa.jraft.util.Requires;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stores log in heap.
+ *
+ * TODO can use SegmentList.
+ */
+public class LocalLogStorage implements LogStorage, Describer {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalLogStorage.class);
+
+ private final String path;
+ private final boolean sync;
+ private final boolean openStatistics;
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Lock readLock = this.readWriteLock.readLock();
+ private final Lock writeLock = this.readWriteLock.writeLock();
+
+ private volatile long firstLogIndex = 1;
+
+ private final LinkedList<LogEntry> log = new LinkedList<>();
+
+ private LogEntryEncoder logEntryEncoder;
+ private LogEntryDecoder logEntryDecoder;
+
+ private volatile boolean initialized = false;
+
+ public LocalLogStorage(final String path, final RaftOptions raftOptions) {
+ super();
+ this.path = path;
+ this.sync = raftOptions.isSync();
+ this.openStatistics = raftOptions.isOpenStatistics();
+ }
+
+ @Override
+ public boolean init(final LogStorageOptions opts) {
+ Requires.requireNonNull(opts.getConfigurationManager(), "Null conf manager");
+ Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log entry codec factory");
+ this.writeLock.lock();
+ try {
+ if (initialized) {
+ LOG.warn("LocalLogStorage init() already.");
+ return true;
+ }
+ this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
+ this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
+ Requires.requireNonNull(this.logEntryDecoder, "Null log entry decoder");
+ Requires.requireNonNull(this.logEntryEncoder, "Null log entry encoder");
+ // Mark as initialized only after the codec checks pass, so a failed init() is not reported as done later.
+ this.initialized = true;
+ return true;
+ } catch (final Exception e) {
+ LOG.error("Fail to init LocalLogStorage, path={}.", this.path, e);
+ return false;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /**
+ * Save the first log index into conf column family.
+ */
+ private boolean saveFirstLogIndex(final long firstLogIndex) {
+ this.readLock.lock();
+ try {
+// final byte[] vs = new byte[8];
+// Bits.putLong(vs, 0, firstLogIndex);
+// checkState();
+// this.db.put(this.confHandle, this.writeOptions, FIRST_LOG_IDX_KEY, vs);
+
+ this.firstLogIndex = firstLogIndex;
+
+ return true;
+ } catch (final Exception e) {
+ LOG.error("Fail to save first log index {}.", firstLogIndex, e);
+ return false;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ this.writeLock.lock();
+ try {
+
+ this.initialized = false;
+ this.log.clear();
+ LOG.info("DB destroyed, the db path is: {}.", this.path);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+// private void closeDB() {
+// this.confHandle.close();
+// this.defaultHandle.close();
+// this.db.close();
+// }
+
+ @Override
+ public long getFirstLogIndex() {
+ this.readLock.lock();
+ try {
+// if (this.hasLoadFirstLogIndex) {
+// return this.firstLogIndex;
+// }
+// checkState();
+// it = this.db.newIterator(this.defaultHandle, this.totalOrderReadOptions);
+// it.seekToFirst();
+// if (it.isValid()) {
+// final long ret = Bits.getLong(it.key(), 0);
+// saveFirstLogIndex(ret);
+// setFirstLogIndex(ret);
+// return ret;
+// }
+ return this.firstLogIndex;
+ } finally {
+// if (it != null) {
+// it.close();
+// }
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public long getLastLogIndex() {
+ this.readLock.lock();
+ //checkState();
+ try {
+// it.seekToLast();
+// if (it.isValid()) {
+// return Bits.getLong(it.key(), 0);
+// }
+
+
+
+ return this.firstLogIndex - 1 + this.log.size();
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public LogEntry getEntry(final long index) {
+ this.readLock.lock();
+ try {
+ if (index < this.firstLogIndex || index >= this.firstLogIndex + this.log.size()) {
+ return null;
+ }
+ // The list holds entries [firstLogIndex, lastLogIndex], so the list offset is relative to firstLogIndex.
+ return this.log.get((int) (index - this.firstLogIndex));
+ } catch (Exception e) {
+ LOG.error("Fail to get log entry at index {}.", index, e);
+ } finally {
+ this.readLock.unlock();
+ }
+ return null;
+ }
+
+ @Override
+ public long getTerm(final long index) {
+ final LogEntry entry = getEntry(index);
+ if (entry != null) {
+ return entry.getId().getTerm();
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean appendEntry(final LogEntry entry) {
+ this.writeLock.lock();
+ try {
+ if (!initialized) {
+ LOG.warn("DB not initialized or destroyed.");
+ return false;
+ }
+ // Exclusive lock: the backing LinkedList is not thread-safe and the read lock is shared.
+ this.log.add(entry);
+
+ return true;
+ } catch (Exception e) {
+ LOG.error("Fail to append entry.", e);
+ return false;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Override
+ public int appendEntries(final List<LogEntry> entries) {
+ if (entries == null || entries.isEmpty()) {
+ return 0;
+ }
+ this.writeLock.lock();
+ try {
+ if (!initialized) {
+ LOG.warn("DB not initialized or destroyed.");
+ return 0;
+ }
+ // Exclusive lock: the backing LinkedList is not thread-safe and the read lock is shared.
+ this.log.addAll(entries);
+
+ return entries.size();
+ } catch (Exception e) {
+ LOG.error("Fail to append entry.", e);
+ return 0;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean truncatePrefix(final long firstIndexKept) {
+ this.writeLock.lock();
+ try {
+ final long startIndex = this.firstLogIndex;
+ // Exclusive lock: entries are removed from the non-thread-safe list.
+ for (long i = startIndex; i < firstIndexKept; i++) {
+ log.pollFirst();
+ }
+ // Never move the first index backwards: no entries are restored, so the offsets would shift.
+ this.firstLogIndex = Math.max(startIndex, firstIndexKept);
+ return true;
+ } finally {
+ this.writeLock.unlock();
+ }
+
+ }
+
+ @Override
+ public boolean truncateSuffix(final long lastIndexKept) {
+ this.writeLock.lock();
+ try {
+ long lastLogIndex = getLastLogIndex();
+ // Exclusive lock: entries are removed from the non-thread-safe list; poll on an empty list is a no-op.
+ while (lastLogIndex-- > lastIndexKept) {
+ log.pollLast();
+ }
+ return true;
+ } catch (Exception e) {
+ LOG.error("Fail to truncateSuffix {}.", lastIndexKept, e);
+ } finally {
+ this.writeLock.unlock();
+ }
+ return false;
+ }
+
+ @Override
+ // TODO it doesn't work.
+ public boolean reset(final long nextLogIndex) {
+ if (nextLogIndex <= 0) {
+ throw new IllegalArgumentException("Invalid next log index.");
+ }
+ this.writeLock.lock();
+ try {
+ LogEntry entry = getEntry(nextLogIndex);
+
+ try {
+ if (false) { // TODO should read snapshot.
+ if (entry == null) {
+ entry = new LogEntry();
+ entry.setType(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
+ entry.setId(new LogId(nextLogIndex, 0));
+ LOG.warn("Entry not found for nextLogIndex {} when reset.", nextLogIndex);
+ }
+ return appendEntry(entry);
+ } else {
+ return false;
+ }
+ } catch (final Exception e) {
+ LOG.error("Fail to reset next log index.", e);
+ return false;
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void describe(final Printer out) {
+ this.readLock.lock();
+ try {
+ // TODO
+ } catch (final Exception e) {
+ out.println(e);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java
deleted file mode 100644
index 50ed51c..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java
+++ /dev/null
@@ -1,743 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.storage.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-import org.rocksdb.Options;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.StringAppendOperator;
-import org.rocksdb.WriteBatch;
-import org.rocksdb.WriteOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alipay.sofa.jraft.conf.Configuration;
-import com.alipay.sofa.jraft.conf.ConfigurationEntry;
-import com.alipay.sofa.jraft.conf.ConfigurationManager;
-import com.alipay.sofa.jraft.entity.EnumOutter.EntryType;
-import com.alipay.sofa.jraft.entity.LogEntry;
-import com.alipay.sofa.jraft.entity.LogId;
-import com.alipay.sofa.jraft.entity.codec.LogEntryDecoder;
-import com.alipay.sofa.jraft.entity.codec.LogEntryEncoder;
-import com.alipay.sofa.jraft.option.LogStorageOptions;
-import com.alipay.sofa.jraft.option.RaftOptions;
-import com.alipay.sofa.jraft.storage.LogStorage;
-import com.alipay.sofa.jraft.util.Bits;
-import com.alipay.sofa.jraft.util.BytesUtil;
-import com.alipay.sofa.jraft.util.DebugStatistics;
-import com.alipay.sofa.jraft.util.Describer;
-import com.alipay.sofa.jraft.util.Requires;
-import com.alipay.sofa.jraft.util.StorageOptionsFactory;
-import com.alipay.sofa.jraft.util.Utils;
-
-/**
- * Log storage based on rocksdb.
- *
- * @author boyan (boyan@alibaba-inc.com)
- *
- * 2018-Apr-06 7:27:47 AM
- */
-public class RocksDBLogStorage implements LogStorage, Describer {
-
- private static final Logger LOG = LoggerFactory.getLogger(RocksDBLogStorage.class);
-
- static {
- RocksDB.loadLibrary();
- }
-
- /**
- * Write batch template.
- *
- * @author boyan (boyan@alibaba-inc.com)
- *
- * 2017-Nov-08 11:19:22 AM
- */
- private interface WriteBatchTemplate {
-
- void execute(WriteBatch batch) throws RocksDBException, IOException, InterruptedException;
- }
-
- /**
- * A write context
- * @author boyan(boyan@antfin.com)
- *
- */
- public interface WriteContext {
- /**
- * Start a sub job.
- */
- default void startJob() {
- }
-
- /**
- * Finish a sub job
- */
- default void finishJob() {
- }
-
- /**
- * Adds a callback that will be invoked after all sub jobs finish.
- */
- default void addFinishHook(final Runnable r) {
-
- }
-
- /**
- * Set an exception to context.
- * @param e exception
- */
- default void setError(final Exception e) {
- }
-
- /**
- * Wait for all sub jobs finish.
- */
- default void joinAll() throws InterruptedException, IOException {
- }
- }
-
- /**
- * An empty write context
- * @author boyan(boyan@antfin.com)
- *
- */
- protected static class EmptyWriteContext implements WriteContext {
- static EmptyWriteContext INSTANCE = new EmptyWriteContext();
- }
-
- private final String path;
- private final boolean sync;
- private final boolean openStatistics;
- private RocksDB db;
- private DBOptions dbOptions;
- private WriteOptions writeOptions;
- private final List<ColumnFamilyOptions> cfOptions = new ArrayList<>();
- private ColumnFamilyHandle defaultHandle;
- private ColumnFamilyHandle confHandle;
- private ReadOptions totalOrderReadOptions;
- private DebugStatistics statistics;
- private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- private final Lock readLock = this.readWriteLock.readLock();
- private final Lock writeLock = this.readWriteLock.writeLock();
-
- private volatile long firstLogIndex = 1;
-
- private volatile boolean hasLoadFirstLogIndex;
-
- private LogEntryEncoder logEntryEncoder;
- private LogEntryDecoder logEntryDecoder;
-
- public RocksDBLogStorage(final String path, final RaftOptions raftOptions) {
- super();
- this.path = path;
- this.sync = raftOptions.isSync();
- this.openStatistics = raftOptions.isOpenStatistics();
- }
-
- public static DBOptions createDBOptions() {
- return StorageOptionsFactory.getRocksDBOptions(RocksDBLogStorage.class);
- }
-
- public static ColumnFamilyOptions createColumnFamilyOptions() {
- final BlockBasedTableConfig tConfig = StorageOptionsFactory
- .getRocksDBTableFormatConfig(RocksDBLogStorage.class);
- return StorageOptionsFactory.getRocksDBColumnFamilyOptions(RocksDBLogStorage.class) //
- .useFixedLengthPrefixExtractor(8) //
- .setTableFormatConfig(tConfig) //
- .setMergeOperator(new StringAppendOperator());
- }
-
- @Override
- public boolean init(final LogStorageOptions opts) {
- Requires.requireNonNull(opts.getConfigurationManager(), "Null conf manager");
- Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log entry codec factory");
- this.writeLock.lock();
- try {
- if (this.db != null) {
- LOG.warn("RocksDBLogStorage init() already.");
- return true;
- }
- this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
- this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
- Requires.requireNonNull(this.logEntryDecoder, "Null log entry decoder");
- Requires.requireNonNull(this.logEntryEncoder, "Null log entry encoder");
- this.dbOptions = createDBOptions();
- if (this.openStatistics) {
- this.statistics = new DebugStatistics();
- this.dbOptions.setStatistics(this.statistics);
- }
-
- this.writeOptions = new WriteOptions();
- this.writeOptions.setSync(this.sync);
- this.totalOrderReadOptions = new ReadOptions();
- this.totalOrderReadOptions.setTotalOrderSeek(true);
-
- return initAndLoad(opts.getConfigurationManager());
- } catch (final RocksDBException e) {
- LOG.error("Fail to init RocksDBLogStorage, path={}.", this.path, e);
- return false;
- } finally {
- this.writeLock.unlock();
- }
-
- }
-
- private boolean initAndLoad(final ConfigurationManager confManager) throws RocksDBException {
- this.hasLoadFirstLogIndex = false;
- this.firstLogIndex = 1;
- final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
- final ColumnFamilyOptions cfOption = createColumnFamilyOptions();
- this.cfOptions.add(cfOption);
- // Column family to store configuration log entry.
- columnFamilyDescriptors.add(new ColumnFamilyDescriptor("Configuration".getBytes(), cfOption));
- // Default column family to store user data log entry.
- columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOption));
-
- openDB(columnFamilyDescriptors);
- load(confManager);
- return onInitLoaded();
- }
-
- /**
- * First log index and last log index key in configuration column family.
- */
- public static final byte[] FIRST_LOG_IDX_KEY = Utils.getBytes("meta/firstLogIndex");
-
- private void load(final ConfigurationManager confManager) {
- checkState();
- try (final RocksIterator it = this.db.newIterator(this.confHandle, this.totalOrderReadOptions)) {
- it.seekToFirst();
- while (it.isValid()) {
- final byte[] ks = it.key();
- final byte[] bs = it.value();
-
- // LogEntry index
- if (ks.length == 8) {
- final LogEntry entry = this.logEntryDecoder.decode(bs);
- if (entry != null) {
- if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
- final ConfigurationEntry confEntry = new ConfigurationEntry();
- confEntry.setId(new LogId(entry.getId().getIndex(), entry.getId().getTerm()));
- confEntry.setConf(new Configuration(entry.getPeers(), entry.getLearners()));
- if (entry.getOldPeers() != null) {
- confEntry.setOldConf(new Configuration(entry.getOldPeers(), entry.getOldLearners()));
- }
- if (confManager != null) {
- confManager.add(confEntry);
- }
- }
- } else {
- LOG.warn("Fail to decode conf entry at index {}, the log data is: {}.", Bits.getLong(ks, 0),
- BytesUtil.toHex(bs));
- }
- } else {
- if (Arrays.equals(FIRST_LOG_IDX_KEY, ks)) {
- setFirstLogIndex(Bits.getLong(bs, 0));
- truncatePrefixInBackground(0L, this.firstLogIndex);
- } else {
- LOG.warn("Unknown entry in configuration storage key={}, value={}.", BytesUtil.toHex(ks),
- BytesUtil.toHex(bs));
- }
- }
- it.next();
- }
- }
- }
-
- private void setFirstLogIndex(final long index) {
- this.firstLogIndex = index;
- this.hasLoadFirstLogIndex = true;
- }
-
- /**
- * Save the first log index into conf column family.
- */
- private boolean saveFirstLogIndex(final long firstLogIndex) {
- this.readLock.lock();
- try {
- final byte[] vs = new byte[8];
- Bits.putLong(vs, 0, firstLogIndex);
- checkState();
- this.db.put(this.confHandle, this.writeOptions, FIRST_LOG_IDX_KEY, vs);
- return true;
- } catch (final RocksDBException e) {
- LOG.error("Fail to save first log index {}.", firstLogIndex, e);
- return false;
- } finally {
- this.readLock.unlock();
- }
- }
-
- private void openDB(final List<ColumnFamilyDescriptor> columnFamilyDescriptors) throws RocksDBException {
- final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
-
- final File dir = new File(this.path);
- if (dir.exists() && !dir.isDirectory()) {
- throw new IllegalStateException("Invalid log path, it's a regular file: " + this.path);
- }
- this.db = RocksDB.open(this.dbOptions, this.path, columnFamilyDescriptors, columnFamilyHandles);
-
- assert (columnFamilyHandles.size() == 2);
- this.confHandle = columnFamilyHandles.get(0);
- this.defaultHandle = columnFamilyHandles.get(1);
- }
-
- private void checkState() {
- Requires.requireNonNull(this.db, "DB not initialized or destroyed");
- }
-
- /**
- * Execute write batch template.
- *
- * @param template write batch template
- */
- private boolean executeBatch(final WriteBatchTemplate template) {
- this.readLock.lock();
- if (this.db == null) {
- LOG.warn("DB not initialized or destroyed.");
- this.readLock.unlock();
- return false;
- }
- try (final WriteBatch batch = new WriteBatch()) {
- template.execute(batch);
- this.db.write(this.writeOptions, batch);
- } catch (final RocksDBException e) {
- LOG.error("Execute batch failed with rocksdb exception.", e);
- return false;
- } catch (final IOException e) {
- LOG.error("Execute batch failed with io exception.", e);
- return false;
- } catch (final InterruptedException e) {
- LOG.error("Execute batch failed with interrupt.", e);
- Thread.currentThread().interrupt();
- return false;
- } finally {
- this.readLock.unlock();
- }
- return true;
- }
-
- @Override
- public void shutdown() {
- this.writeLock.lock();
- try {
- // The shutdown order is matter.
- // 1. close column family handles
- closeDB();
- onShutdown();
- // 2. close column family options.
- for (final ColumnFamilyOptions opt : this.cfOptions) {
- opt.close();
- }
- // 3. close options
- this.dbOptions.close();
- if (this.statistics != null) {
- this.statistics.close();
- }
- this.writeOptions.close();
- this.totalOrderReadOptions.close();
- // 4. help gc.
- this.cfOptions.clear();
- this.dbOptions = null;
- this.statistics = null;
- this.writeOptions = null;
- this.totalOrderReadOptions = null;
- this.defaultHandle = null;
- this.confHandle = null;
- this.db = null;
- LOG.info("DB destroyed, the db path is: {}.", this.path);
- } finally {
- this.writeLock.unlock();
- }
- }
-
- private void closeDB() {
- this.confHandle.close();
- this.defaultHandle.close();
- this.db.close();
- }
-
- @Override
- public long getFirstLogIndex() {
- this.readLock.lock();
- RocksIterator it = null;
- try {
- if (this.hasLoadFirstLogIndex) {
- return this.firstLogIndex;
- }
- checkState();
- it = this.db.newIterator(this.defaultHandle, this.totalOrderReadOptions);
- it.seekToFirst();
- if (it.isValid()) {
- final long ret = Bits.getLong(it.key(), 0);
- saveFirstLogIndex(ret);
- setFirstLogIndex(ret);
- return ret;
- }
- return 1L;
- } finally {
- if (it != null) {
- it.close();
- }
- this.readLock.unlock();
- }
- }
-
- @Override
- public long getLastLogIndex() {
- this.readLock.lock();
- checkState();
- try (final RocksIterator it = this.db.newIterator(this.defaultHandle, this.totalOrderReadOptions)) {
- it.seekToLast();
- if (it.isValid()) {
- return Bits.getLong(it.key(), 0);
- }
- return 0L;
- } finally {
- this.readLock.unlock();
- }
- }
-
- @Override
- public LogEntry getEntry(final long index) {
- this.readLock.lock();
- try {
- if (this.hasLoadFirstLogIndex && index < this.firstLogIndex) {
- return null;
- }
- final byte[] keyBytes = getKeyBytes(index);
- final byte[] bs = onDataGet(index, getValueFromRocksDB(keyBytes));
- if (bs != null) {
- final LogEntry entry = this.logEntryDecoder.decode(bs);
- if (entry != null) {
- return entry;
- } else {
- LOG.error("Bad log entry format for index={}, the log data is: {}.", index, BytesUtil.toHex(bs));
- // invalid data remove? TODO
- return null;
- }
- }
- } catch (final RocksDBException | IOException e) {
- LOG.error("Fail to get log entry at index {}.", index, e);
- } finally {
- this.readLock.unlock();
- }
- return null;
- }
-
- protected byte[] getValueFromRocksDB(final byte[] keyBytes) throws RocksDBException {
- checkState();
- return this.db.get(this.defaultHandle, keyBytes);
- }
-
- protected byte[] getKeyBytes(final long index) {
- final byte[] ks = new byte[8];
- Bits.putLong(ks, 0, index);
- return ks;
- }
-
- @Override
- public long getTerm(final long index) {
- final LogEntry entry = getEntry(index);
- if (entry != null) {
- return entry.getId().getTerm();
- }
- return 0;
- }
-
- private void addConfBatch(final LogEntry entry, final WriteBatch batch) throws RocksDBException {
- final byte[] ks = getKeyBytes(entry.getId().getIndex());
- final byte[] content = this.logEntryEncoder.encode(entry);
- batch.put(this.defaultHandle, ks, content);
- batch.put(this.confHandle, ks, content);
- }
-
- private void addDataBatch(final LogEntry entry, final WriteBatch batch,
- final WriteContext ctx) throws RocksDBException, IOException, InterruptedException {
- final long logIndex = entry.getId().getIndex();
- final byte[] content = this.logEntryEncoder.encode(entry);
- batch.put(this.defaultHandle, getKeyBytes(logIndex), onDataAppend(logIndex, content, ctx));
- }
-
- @Override
- public boolean appendEntry(final LogEntry entry) {
- if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
- return executeBatch(batch -> addConfBatch(entry, batch));
- } else {
- this.readLock.lock();
- try {
- if (this.db == null) {
- LOG.warn("DB not initialized or destroyed.");
- return false;
- }
- final WriteContext writeCtx = newWriteContext();
- final long logIndex = entry.getId().getIndex();
- final byte[] valueBytes = this.logEntryEncoder.encode(entry);
- final byte[] newValueBytes = onDataAppend(logIndex, valueBytes, writeCtx);
- writeCtx.startJob();
- this.db.put(this.defaultHandle, this.writeOptions, getKeyBytes(logIndex), newValueBytes);
- writeCtx.joinAll();
- if (newValueBytes != valueBytes) {
- doSync();
- }
- return true;
- } catch (final RocksDBException | IOException e) {
- LOG.error("Fail to append entry.", e);
- return false;
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- } finally {
- this.readLock.unlock();
- }
- }
- }
-
- private void doSync() throws IOException, InterruptedException {
- onSync();
- }
-
- @Override
- public int appendEntries(final List<LogEntry> entries) {
- if (entries == null || entries.isEmpty()) {
- return 0;
- }
- final int entriesCount = entries.size();
- final boolean ret = executeBatch(batch -> {
- final WriteContext writeCtx = newWriteContext();
- for (int i = 0; i < entriesCount; i++) {
- final LogEntry entry = entries.get(i);
- if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
- addConfBatch(entry, batch);
- } else {
- writeCtx.startJob();
- addDataBatch(entry, batch, writeCtx);
- }
- }
- writeCtx.joinAll();
- doSync();
- });
-
- if (ret) {
- return entriesCount;
- } else {
- return 0;
- }
- }
-
- @Override
- public boolean truncatePrefix(final long firstIndexKept) {
- this.readLock.lock();
- try {
- final long startIndex = getFirstLogIndex();
- final boolean ret = saveFirstLogIndex(firstIndexKept);
- if (ret) {
- setFirstLogIndex(firstIndexKept);
- }
- truncatePrefixInBackground(startIndex, firstIndexKept);
- return ret;
- } finally {
- this.readLock.unlock();
- }
-
- }
-
- private void truncatePrefixInBackground(final long startIndex, final long firstIndexKept) {
- // delete logs in background.
- Utils.runInThread(() -> {
- this.readLock.lock();
- try {
- if (this.db == null) {
- return;
- }
- onTruncatePrefix(startIndex, firstIndexKept);
- this.db.deleteRange(this.defaultHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept));
- this.db.deleteRange(this.confHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept));
- } catch (final RocksDBException | IOException e) {
- LOG.error("Fail to truncatePrefix {}.", firstIndexKept, e);
- } finally {
- this.readLock.unlock();
- }
- });
- }
-
- @Override
- public boolean truncateSuffix(final long lastIndexKept) {
- this.readLock.lock();
- try {
- try {
- onTruncateSuffix(lastIndexKept);
- } finally {
- this.db.deleteRange(this.defaultHandle, this.writeOptions, getKeyBytes(lastIndexKept + 1),
- getKeyBytes(getLastLogIndex() + 1));
- this.db.deleteRange(this.confHandle, this.writeOptions, getKeyBytes(lastIndexKept + 1),
- getKeyBytes(getLastLogIndex() + 1));
- }
- return true;
- } catch (final RocksDBException | IOException e) {
- LOG.error("Fail to truncateSuffix {}.", lastIndexKept, e);
- } finally {
- this.readLock.unlock();
- }
- return false;
- }
-
- @Override
- public boolean reset(final long nextLogIndex) {
- if (nextLogIndex <= 0) {
- throw new IllegalArgumentException("Invalid next log index.");
- }
- this.writeLock.lock();
- try (final Options opt = new Options()) {
- LogEntry entry = getEntry(nextLogIndex);
- closeDB();
- try {
- RocksDB.destroyDB(this.path, opt);
- onReset(nextLogIndex);
- if (initAndLoad(null)) {
- if (entry == null) {
- entry = new LogEntry();
- entry.setType(EntryType.ENTRY_TYPE_NO_OP);
- entry.setId(new LogId(nextLogIndex, 0));
- LOG.warn("Entry not found for nextLogIndex {} when reset.", nextLogIndex);
- }
- return appendEntry(entry);
- } else {
- return false;
- }
- } catch (final RocksDBException e) {
- LOG.error("Fail to reset next log index.", e);
- return false;
- }
- } finally {
- this.writeLock.unlock();
- }
- }
-
- // Hooks for {@link RocksDBSegmentLogStorage}
-
- /**
- * Called after opening RocksDB and loading configuration into conf manager.
- */
- protected boolean onInitLoaded() {
- return true;
- }
-
- /**
- * Called after closing db.
- */
- protected void onShutdown() {
- }
-
- /**
- * Called after resetting db.
- *
- * @param nextLogIndex next log index
- */
- protected void onReset(final long nextLogIndex) {
- }
-
- /**
- * Called after truncating prefix logs in rocksdb.
- *
- * @param startIndex the start index
- * @param firstIndexKept the first index to kept
- */
- protected void onTruncatePrefix(final long startIndex, final long firstIndexKept) throws RocksDBException,
- IOException {
- }
-
- /**
- * Called when sync data into file system.
- */
- protected void onSync() throws IOException, InterruptedException {
- }
-
- protected boolean isSync() {
- return this.sync;
- }
-
- /**
- * Called after truncating suffix logs in rocksdb.
- *
- * @param lastIndexKept the last index to kept
- */
- protected void onTruncateSuffix(final long lastIndexKept) throws RocksDBException, IOException {
- }
-
- protected WriteContext newWriteContext() {
- return EmptyWriteContext.INSTANCE;
- }
-
- /**
- * Called before appending data entry.
- *
- * @param logIndex the log index
- * @param value the data value in log entry.
- * @return the new value
- */
- protected byte[] onDataAppend(final long logIndex, final byte[] value,
- final WriteContext ctx) throws IOException, InterruptedException {
- ctx.finishJob();
- return value;
- }
-
- /**
- * Called after getting data from rocksdb.
- *
- * @param logIndex the log index
- * @param value the value in rocksdb
- * @return the new value
- */
- protected byte[] onDataGet(final long logIndex, final byte[] value) throws IOException {
- return value;
- }
-
- @Override
- public void describe(final Printer out) {
- this.readLock.lock();
- try {
- if (this.db != null) {
- out.println(this.db.getProperty("rocksdb.stats"));
- }
- out.println("");
- if (this.statistics != null) {
- out.println(this.statistics.getString());
- }
- } catch (final RocksDBException e) {
- out.println(e);
- } finally {
- this.readLock.unlock();
- }
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/io/ProtoBufFile.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/io/ProtoBufFile.java
index 14e5135..4ce83f0 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/io/ProtoBufFile.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/io/ProtoBufFile.java
@@ -16,6 +16,8 @@
*/
package com.alipay.sofa.jraft.storage.io;
+import com.alipay.sofa.jraft.rpc.CliRequests;
+import com.alipay.sofa.jraft.util.Marshaller;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -24,7 +26,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import com.alipay.sofa.jraft.rpc.ProtobufMsgFactory;
import com.alipay.sofa.jraft.util.Bits;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.rpc.Message;
@@ -43,9 +44,9 @@ import com.alipay.sofa.jraft.rpc.Message;
*/
public class ProtoBufFile {
- static {
- ProtobufMsgFactory.load();
- }
+// static {
+// ProtobufMsgFactory.load();
+// }
/** file path */
private final String path;
@@ -68,18 +69,19 @@ public class ProtoBufFile {
try (final FileInputStream fin = new FileInputStream(file);
final BufferedInputStream input = new BufferedInputStream(fin)) {
readBytes(lenBytes, input);
- final int len = Bits.getInt(lenBytes, 0);
+ final int len = Bits.getInt(lenBytes, 0); // TODO asch endianess ?
if (len <= 0) {
throw new IOException("Invalid message fullName.");
}
final byte[] nameBytes = new byte[len];
readBytes(nameBytes, input);
- final String name = new String(nameBytes);
- readBytes(lenBytes, input);
- final int msgLen = Bits.getInt(lenBytes, 0);
- final byte[] msgBytes = new byte[msgLen];
- readBytes(msgBytes, input);
- return ProtobufMsgFactory.newMessageByProtoClassName(name, msgBytes);
+// final String name = new String(nameBytes);
+// readBytes(lenBytes, input);
+// final int msgLen = Bits.getInt(lenBytes, 0);
+// final byte[] msgBytes = new byte[msgLen];
+// readBytes(msgBytes, input);
+// return ProtobufMsgFactory.newMessageByProtoClassName(name, msgBytes);
+ return Marshaller.DEFAULT.unmarshall(nameBytes);
}
}
@@ -102,19 +104,12 @@ public class ProtoBufFile {
final File file = new File(this.path + ".tmp");
try (final FileOutputStream fOut = new FileOutputStream(file);
final BufferedOutputStream output = new BufferedOutputStream(fOut)) {
- final byte[] lenBytes = new byte[4];
+ byte[] bytes = Marshaller.DEFAULT.marshall(msg);
- // name len + name
- final String fullName = msg.getDescriptorForType().getFullName();
- final int nameLen = fullName.length();
- Bits.putInt(lenBytes, 0, nameLen);
- output.write(lenBytes);
- output.write(fullName.getBytes());
- // msg len + msg
- final int msgLen = msg.getSerializedSize();
- Bits.putInt(lenBytes, 0, msgLen);
+ final byte[] lenBytes = new byte[4];
+ Bits.putInt(lenBytes, 0, bytes.length);
output.write(lenBytes);
- msg.writeTo(output);
+ output.write(bytes);
output.flush();
}
if (sync) {
@@ -123,4 +118,24 @@ public class ProtoBufFile {
return Utils.atomicMoveFile(file, new File(this.path), sync);
}
+
+ public static void main(String[] args) throws IOException {
+ File file = File.createTempFile("store", "tmp");
+
+ ProtoBufFile tmp = new ProtoBufFile(file.getAbsolutePath());
+
+ CliRequests.AddPeerRequest.Builder b = CliRequests.AddPeerRequest.newBuilder();
+ b.setGroupId("grp1");
+ b.setLeaderId("zzz");
+ b.setPeerId("tmp");
+
+ CliRequests.AddPeerRequest req0 = b.build();
+ boolean saved = tmp.save(req0, false);
+
+ CliRequests.AddPeerRequest req1 = tmp.load();
+
+ System.out.println();
+
+ file.delete();
+ }
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java
deleted file mode 100644
index 84f5f6c..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.storage.log;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Date;
-
-/**
- * Abort file
- *
- * @author boyan(boyan@antfin.com)
- */
-public class AbortFile {
-
- private final String path;
-
- public String getPath() {
- return this.path;
- }
-
- public AbortFile(final String path) {
- super();
- this.path = path;
- }
-
- public boolean create() throws IOException {
- final File file = new File(this.path);
- if (file.createNewFile()) {
- writeDate();
- return true;
- } else {
- return false;
- }
- }
-
- @SuppressWarnings("deprecation")
- private void writeDate() throws IOException {
- final File file = new File(this.path);
- try (final FileWriter writer = new FileWriter(file, false)) {
- writer.write(new Date().toGMTString());
- writer.write(System.lineSeparator());
- }
- }
-
- public void touch() throws IOException {
- writeDate();
- }
-
- public boolean exists() {
- final File file = new File(this.path);
- return file.isFile() && file.exists();
- }
-
- public boolean destroy() {
- return new File(this.path) //
- .delete();
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/CheckpointFile.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/CheckpointFile.java
deleted file mode 100644
index 88a03a9..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/CheckpointFile.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.storage.log;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-
-import com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
-import com.alipay.sofa.jraft.storage.io.ProtoBufFile;
-import com.alipay.sofa.jraft.util.AsciiStringUtil;
-import com.alipay.sofa.jraft.util.Bits;
-import com.google.protobuf.ZeroByteStringHelper;
-
-/**
- * Segments checkpoint file.
- *
- * @author boyan(boyan@antfin.com)
- */
-public class CheckpointFile {
- /**
- * Checkpoint metadata info.
- *
- * @author boyan(boyan@antfin.com)
- */
- public static final class Checkpoint {
- // Segment file name
- public String segFilename;
- // Segment file current commit position.
- public int committedPos;
-
- public Checkpoint(final String segFilename, final int committedPos) {
- super();
- this.segFilename = segFilename;
- this.committedPos = committedPos;
- }
-
- /**
- * commitPos (4 bytes) + path(4 byte len + string bytes)
- */
- byte[] encode() {
- byte[] ps = AsciiStringUtil.unsafeEncode(this.segFilename);
- byte[] bs = new byte[8 + ps.length];
- Bits.putInt(bs, 0, this.committedPos);
- Bits.putInt(bs, 4, ps.length);
- System.arraycopy(ps, 0, bs, 8, ps.length);
- return bs;
- }
-
- boolean decode(final byte[] bs) {
- if (bs.length < 8) {
- return false;
- }
- this.committedPos = Bits.getInt(bs, 0);
- int len = Bits.getInt(bs, 4);
- this.segFilename = AsciiStringUtil.unsafeDecode(bs, 8, len);
- return this.committedPos >= 0 && !this.segFilename.isEmpty();
- }
-
- @Override
- public String toString() {
- return "Checkpoint [segFilename=" + this.segFilename + ", committedPos=" + this.committedPos + "]";
- }
- }
-
- public void destroy() {
- FileUtils.deleteQuietly(new File(this.path));
- }
-
- public String getPath() {
- return this.path;
- }
-
- private final String path;
-
- public CheckpointFile(final String path) {
- super();
- this.path = path;
- }
-
- public synchronized boolean save(final Checkpoint checkpoint) throws IOException {
- final ProtoBufFile file = new ProtoBufFile(this.path);
- final byte[] data = checkpoint.encode();
-
- final LocalFileMeta meta = LocalFileMeta.newBuilder() //
- .setUserMeta(ZeroByteStringHelper.wrap(data)) //
- .build();
-
- return file.save(meta, true);
- }
-
- public Checkpoint load() throws IOException {
- final ProtoBufFile file = new ProtoBufFile(this.path);
- final LocalFileMeta meta = file.load();
- if (meta != null) {
- final byte[] data = meta.getUserMeta().toByteArray();
- Checkpoint checkpoint = new Checkpoint(null, -1);
- if (checkpoint.decode(data)) {
- return checkpoint;
- }
- }
- return null;
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/LibC.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/LibC.java
deleted file mode 100644
index e68ba32..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/LibC.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.storage.log;
-
-import com.sun.jna.Library;
-import com.sun.jna.Native;
-import com.sun.jna.NativeLong;
-import com.sun.jna.Platform;
-import com.sun.jna.Pointer;
-
-/**
- * Moved from rocketmq.
- *
- * https://raw.githubusercontent.com/apache/rocketmq/master/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
- * @author boyan(boyan@antfin.com)
- *
- */
-public interface LibC extends Library {
- LibC INSTANCE = Native.load(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
-
- int MADV_WILLNEED = 3;
- int MADV_DONTNEED = 4;
-
- int MCL_CURRENT = 1;
- int MCL_FUTURE = 2;
- int MCL_ONFAULT = 4;
-
- /* sync memory asynchronously */
- int MS_ASYNC = 0x0001;
- /* invalidate mappings & caches */
- int MS_INVALIDATE = 0x0002;
- /* synchronous memory sync */
- int MS_SYNC = 0x0004;
-
- int mlock(Pointer var1, NativeLong var2);
-
- int munlock(Pointer var1, NativeLong var2);
-
- int madvise(Pointer var1, NativeLong var2, int var3);
-
- Pointer memset(Pointer p, int v, long len);
-
- int mlockall(int flags);
-
- int msync(Pointer p, NativeLong length, int flags);
-}
\ No newline at end of file
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java
deleted file mode 100644
index 0e1077f..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java
+++ /dev/null
@@ -1,1216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.storage.log;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import org.apache.commons.io.FileUtils;
-import org.rocksdb.RocksDBException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alipay.sofa.common.profile.StringUtil;
-import com.alipay.sofa.jraft.option.RaftOptions;
-import com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage;
-import com.alipay.sofa.jraft.storage.log.CheckpointFile.Checkpoint;
-import com.alipay.sofa.jraft.storage.log.SegmentFile.SegmentFileOptions;
-import com.alipay.sofa.jraft.util.ArrayDeque;
-import com.alipay.sofa.jraft.util.Bits;
-import com.alipay.sofa.jraft.util.CountDownEvent;
-import com.alipay.sofa.jraft.util.NamedThreadFactory;
-import com.alipay.sofa.jraft.util.Platform;
-import com.alipay.sofa.jraft.util.Requires;
-import com.alipay.sofa.jraft.util.SystemPropertyUtil;
-import com.alipay.sofa.jraft.util.ThreadPoolUtil;
-import com.alipay.sofa.jraft.util.Utils;
-
-/**
- * Log Storage implementation based on rocksdb and segment files.
- *
- * @author boyan(boyan@antfin.com)
- */
-public class RocksDBSegmentLogStorage extends RocksDBLogStorage {
-
- private static final int PRE_ALLOCATE_SEGMENT_COUNT = 2;
- private static final int MEM_SEGMENT_COUNT = 3;
-
- private static class AllocatedResult {
- SegmentFile segmentFile;
- IOException ie;
-
- public AllocatedResult(final SegmentFile segmentFile) {
- super();
- this.segmentFile = segmentFile;
- }
-
- public AllocatedResult(final IOException ie) {
- super();
- this.ie = ie;
- }
-
- }
-
- public static class BarrierWriteContext implements WriteContext {
- private final CountDownEvent events = new CountDownEvent();
- private volatile Exception e;
- private volatile List<Runnable> hooks;
-
- @Override
- public void startJob() {
- this.events.incrementAndGet();
- }
-
- @Override
- public synchronized void addFinishHook(final Runnable r) {
- if (this.hooks == null) {
- this.hooks = new CopyOnWriteArrayList<>();
- }
- this.hooks.add(r);
- }
-
- @Override
- public void finishJob() {
- this.events.countDown();
- }
-
- @Override
- public void setError(final Exception e) {
- this.e = e;
- }
-
- @Override
- public void joinAll() throws InterruptedException, IOException {
- this.events.await();
- if (this.hooks != null) {
- for (Runnable r : this.hooks) {
- r.run();
- }
- }
- if (this.e != null) {
- throw new IOException("Fail to apppend entries", this.e);
- }
- }
-
- }
-
- private static final String SEGMENT_FILE_POSFIX = ".s";
-
- private static final Logger LOG = LoggerFactory
- .getLogger(RocksDBSegmentLogStorage.class);
-
- /**
- * Default checkpoint interval in milliseconds.
- */
- private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = SystemPropertyUtil.getInt(
- "jraft.log_storage.segment.checkpoint.interval.ms",
- 5000);
-
- /**
- * Location metadata format:
- * 1. magic bytes
- * 2. reserved(2 B)
- * 3. segmentFileName(8 B)
- * 4. wrotePosition(4 B)
- */
- private static final int LOCATION_METADATA_SIZE = SegmentFile.RECORD_MAGIC_BYTES_SIZE + 2 + 8 + 4;
-
- /**
- * Max segment file size, 1G
- */
- private static final int MAX_SEGMENT_FILE_SIZE = SystemPropertyUtil.getInt(
- "jraft.log_storage.segment.max.size.bytes",
- 1024 * 1024 * 1024);
-
- // Default value size threshold to decide whether it will be stored in segments or rocksdb, default is 4K.
- // When the value size is less than 4K, it will be stored in rocksdb directly.
- private static int DEFAULT_VALUE_SIZE_THRESHOLD = SystemPropertyUtil.getInt(
- "jraft.log_storage.segment.value.threshold.bytes",
- 4 * 1024);
-
- /**
- * RocksDBSegmentLogStorage builder
- * @author boyan(boyan@antfin.com)
- *
- */
- public static class Builder {
- private String path;
- private RaftOptions raftOptions;
- private int valueSizeThreshold = DEFAULT_VALUE_SIZE_THRESHOLD;
- private int maxSegmentFileSize = MAX_SEGMENT_FILE_SIZE;
- private ThreadPoolExecutor writeExecutor;
- private int preAllocateSegmentCount = PRE_ALLOCATE_SEGMENT_COUNT;
- private int keepInMemorySegmentCount = MEM_SEGMENT_COUNT;
- private int checkpointIntervalMs = DEFAULT_CHECKPOINT_INTERVAL_MS;
-
- public String getPath() {
- return this.path;
- }
-
- public Builder setPath(final String path) {
- this.path = path;
- return this;
- }
-
- public RaftOptions getRaftOptions() {
- return this.raftOptions;
- }
-
- public Builder setRaftOptions(final RaftOptions raftOptions) {
- this.raftOptions = raftOptions;
- return this;
- }
-
- public int getValueSizeThreshold() {
- return this.valueSizeThreshold;
- }
-
- public Builder setValueSizeThreshold(final int valueSizeThreshold) {
- this.valueSizeThreshold = valueSizeThreshold;
- return this;
- }
-
- public int getMaxSegmentFileSize() {
- return this.maxSegmentFileSize;
- }
-
- public Builder setMaxSegmentFileSize(final int maxSegmentFileSize) {
- this.maxSegmentFileSize = maxSegmentFileSize;
- return this;
- }
-
- public ThreadPoolExecutor getWriteExecutor() {
- return this.writeExecutor;
- }
-
- public Builder setWriteExecutor(final ThreadPoolExecutor writeExecutor) {
- this.writeExecutor = writeExecutor;
- return this;
- }
-
- public int getPreAllocateSegmentCount() {
- return this.preAllocateSegmentCount;
- }
-
- public Builder setPreAllocateSegmentCount(final int preAllocateSegmentCount) {
- this.preAllocateSegmentCount = preAllocateSegmentCount;
- return this;
- }
-
- public int getKeepInMemorySegmentCount() {
- return this.keepInMemorySegmentCount;
- }
-
- public Builder setKeepInMemorySegmentCount(final int keepInMemorySegmentCount) {
- this.keepInMemorySegmentCount = keepInMemorySegmentCount;
- return this;
- }
-
- public int getCheckpointIntervalMs() {
- return this.checkpointIntervalMs;
- }
-
- public Builder setCheckpointIntervalMs(final int checkpointIntervalMs) {
- this.checkpointIntervalMs = checkpointIntervalMs;
- return this;
- }
-
- public RocksDBSegmentLogStorage build() {
- return new RocksDBSegmentLogStorage(this.path, this.raftOptions, this.valueSizeThreshold,
- this.maxSegmentFileSize, this.preAllocateSegmentCount, this.keepInMemorySegmentCount,
- this.checkpointIntervalMs, this.writeExecutor);
- }
-
- }
-
- private final int valueSizeThreshold;
- private final String segmentsPath;
- private final CheckpointFile checkpointFile;
- // used or using segments.
- private List<SegmentFile> segments;
- // pre-allocated and blank segments.
- private ArrayDeque<AllocatedResult> blankSegments;
- private final Lock allocateLock = new ReentrantLock();
- private final Condition fullCond = this.allocateLock.newCondition();
- private final Condition emptyCond = this.allocateLock.newCondition();
- // segment file sequence.
- private final AtomicLong nextFileSequence = new AtomicLong(0);
- private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- private final Lock writeLock = this.readWriteLock.writeLock();
- private final Lock readLock = this.readWriteLock.readLock();
- private ScheduledExecutorService checkpointExecutor;
- private final AbortFile abortFile;
- private final ThreadPoolExecutor writeExecutor;
- private Thread segmentAllocator;
- private final int maxSegmentFileSize;
- private int preAllocateSegmentCount = PRE_ALLOCATE_SEGMENT_COUNT;
- private int keepInMemorySegmentCount = MEM_SEGMENT_COUNT;
- private int checkpointIntervalMs = DEFAULT_CHECKPOINT_INTERVAL_MS;
-
- /**
- * Creates a RocksDBSegmentLogStorage builder.
- * @return a builder instance.
- */
- public static final Builder builder(final String uri, final RaftOptions raftOptions) {
- return new Builder().setPath(uri).setRaftOptions(raftOptions);
- }
-
- public RocksDBSegmentLogStorage(final String path, final RaftOptions raftOptions) {
- this(path, raftOptions, DEFAULT_VALUE_SIZE_THRESHOLD, MAX_SEGMENT_FILE_SIZE);
- }
-
- public RocksDBSegmentLogStorage(final String path, final RaftOptions raftOptions, final int valueSizeThreshold,
- final int maxSegmentFileSize) {
- this(path, raftOptions, valueSizeThreshold, maxSegmentFileSize, PRE_ALLOCATE_SEGMENT_COUNT, MEM_SEGMENT_COUNT,
- DEFAULT_CHECKPOINT_INTERVAL_MS, createDefaultWriteExecutor());
- }
-
- private static ThreadPoolExecutor createDefaultWriteExecutor() {
- return ThreadPoolUtil.newThreadPool("RocksDBSegmentLogStorage-write-pool", true, Utils.cpus(),
- Utils.cpus() * 3, 60, new ArrayBlockingQueue<>(10000), new NamedThreadFactory(
- "RocksDBSegmentLogStorageWriter"), new ThreadPoolExecutor.CallerRunsPolicy());
- }
-
- public RocksDBSegmentLogStorage(final String path, final RaftOptions raftOptions, final int valueSizeThreshold,
- final int maxSegmentFileSize, final int preAllocateSegmentCount,
- final int keepInMemorySegmentCount, final int checkpointIntervalMs,
- final ThreadPoolExecutor writeExecutor) {
- super(path, raftOptions);
- if (Platform.isMac()) {
- LOG.warn("RocksDBSegmentLogStorage is not recommended on mac os x, it's performance is poorer than RocksDBLogStorage.");
- }
- Requires.requireTrue(maxSegmentFileSize > 0, "maxSegmentFileSize is not greater than zero");
- Requires.requireTrue(preAllocateSegmentCount > 0, "preAllocateSegmentCount is not greater than zero");
- Requires.requireTrue(checkpointIntervalMs > 0, "checkpointIntervalMs is not greater than zero");
- Requires.requireTrue(keepInMemorySegmentCount > 0, "keepInMemorySegmentCount is not greater than zero");
- this.segmentsPath = path + File.separator + "segments";
- this.abortFile = new AbortFile(this.segmentsPath + File.separator + "abort");
- this.checkpointFile = new CheckpointFile(this.segmentsPath + File.separator + "checkpoint");
- this.valueSizeThreshold = valueSizeThreshold;
- this.maxSegmentFileSize = maxSegmentFileSize;
- this.writeExecutor = writeExecutor == null ? createDefaultWriteExecutor() : writeExecutor;
- this.preAllocateSegmentCount = preAllocateSegmentCount;
- this.checkpointIntervalMs = checkpointIntervalMs;
- this.keepInMemorySegmentCount = keepInMemorySegmentCount;
-
- }
-
- private SegmentFile getLastSegmentFile(final long logIndex, final int waitToWroteSize,
- final boolean createIfNecessary, final WriteContext ctx) throws IOException,
- InterruptedException {
- SegmentFile lastFile = null;
- while (true) {
- int segmentCount = 0;
- this.readLock.lock();
- try {
-
- if (!this.segments.isEmpty()) {
- segmentCount = this.segments.size();
- final SegmentFile currLastFile = getLastSegmentWithoutLock();
- if (waitToWroteSize <= 0 || !currLastFile.reachesFileEndBy(waitToWroteSize)) {
- lastFile = currLastFile;
- }
- }
- } finally {
- this.readLock.unlock();
- }
- if (lastFile == null && createIfNecessary) {
- lastFile = createNewSegmentFile(logIndex, segmentCount, ctx);
- if (lastFile != null) {
- return lastFile;
- } else {
- // Try again
- continue;
- }
- }
- return lastFile;
- }
- }
-
- private SegmentFile createNewSegmentFile(final long logIndex, final int oldSegmentCount,
- final WriteContext ctx) throws InterruptedException, IOException {
- SegmentFile segmentFile = null;
- this.writeLock.lock();
- try {
- // CAS by segments count.
- if (this.segments.size() != oldSegmentCount) {
- return segmentFile;
- }
- if (!this.segments.isEmpty()) {
- // Sync current last file and correct it's lastLogIndex.
- final SegmentFile currLastFile = getLastSegmentWithoutLock();
- currLastFile.setLastLogIndex(logIndex - 1);
- ctx.startJob();
- // Attach a finish hook to set last segment file to be read-only.
- ctx.addFinishHook(() -> currLastFile.setReadOnly(true));
- // Run sync in parallel
- this.writeExecutor.execute(() -> {
- try {
- currLastFile.sync(isSync());
- } catch (final IOException e) {
- ctx.setError(e);
- } finally {
- ctx.finishJob();
- }
-
- });
-
- }
- segmentFile = allocateSegmentFile(logIndex);
- return segmentFile;
- } finally {
- this.writeLock.unlock();
- }
-
- }
-
- private SegmentFile allocateSegmentFile(final long index) throws InterruptedException, IOException {
- this.allocateLock.lock();
- try {
- while (this.blankSegments.isEmpty()) {
- this.emptyCond.await();
- }
- final AllocatedResult result = this.blankSegments.pollFirst();
- if (result.ie != null) {
- throw result.ie;
- }
- this.fullCond.signal();
- result.segmentFile.setFirstLogIndex(index);
- this.segments.add(result.segmentFile);
- return result.segmentFile;
- } finally {
- this.allocateLock.unlock();
- }
- }
-
- private SegmentFile allocateNewSegmentFile() throws IOException {
- final String newSegPath = getNewSegmentFilePath();
- SegmentFile segmentFile = new SegmentFile(this.maxSegmentFileSize, newSegPath, this.writeExecutor);
- final SegmentFileOptions opts = SegmentFileOptions.builder() //
- .setSync(false) //
- .setRecover(false) //
- .setLastFile(true) //
- .setNewFile(true) //
- .setPos(0).build();
-
- try {
- if (!segmentFile.init(opts)) {
- throw new IOException("Fail to create new segment file");
- }
- segmentFile.hintLoad();
- LOG.info("Create a new segment file {}.", segmentFile.getPath());
- return segmentFile;
- } catch (IOException e) {
- // Delete the file if fails
- FileUtils.deleteQuietly(new File(newSegPath));
- throw e;
- }
- }
-
- private String getNewSegmentFilePath() {
- return this.segmentsPath + File.separator + String.format("%019d", this.nextFileSequence.getAndIncrement())
- + SEGMENT_FILE_POSFIX;
- }
-
- @Override
- protected void onSync() throws IOException, InterruptedException {
- final SegmentFile lastSegmentFile = getLastSegmentFileForRead();
- if (lastSegmentFile != null) {
- lastSegmentFile.sync(isSync());
- }
- }
-
- private static final Pattern SEGMENT_FILE_NAME_PATTERN = Pattern.compile("[0-9]+\\.s");
-
- @Override
- protected boolean onInitLoaded() {
- final long startMs = Utils.monotonicMs();
- this.writeLock.lock();
- try {
- final File segmentsDir = new File(this.segmentsPath);
- if (!ensureDir(segmentsDir)) {
- return false;
- }
- final Checkpoint checkpoint = loadCheckpoint();
-
- final File[] segmentFiles = segmentsDir
- .listFiles((final File dir, final String name) -> SEGMENT_FILE_NAME_PATTERN.matcher(name).matches());
-
- final boolean normalExit = !this.abortFile.exists();
- if (!normalExit) {
- LOG.info("{} {} did not exit normally, will try to recover last file.", getServiceName(),
- this.segmentsPath);
- }
- this.segments = new ArrayList<>(segmentFiles == null ? 10 : segmentFiles.length);
- this.blankSegments = new ArrayDeque<>();
- List<File> corruptedHeaderSegments = new ArrayList<>();
-
- if (segmentFiles != null && segmentFiles.length > 0) {
- // Sort by sequences.
- Arrays.sort(segmentFiles, Comparator.comparing(RocksDBSegmentLogStorage::getFileSequenceFromFileName));
-
- final String checkpointSegFile = getCheckpointSegFilePath(checkpoint);
-
- // mmap files
- for (int i = 0; i < segmentFiles.length; i++) {
- final File segFile = segmentFiles[i];
- this.nextFileSequence.set(getFileSequenceFromFileName(segFile) + 1);
- final SegmentFile segmentFile = new SegmentFile(this.maxSegmentFileSize, segFile.getAbsolutePath(),
- this.writeExecutor);
-
- if (!segmentFile.mmapFile(false)) {
- assert (segmentFile.isHeaderCorrupted());
- corruptedHeaderSegments.add(segFile);
- continue;
- }
-
- if (segmentFile.isBlank()) {
- this.blankSegments.add(new AllocatedResult(segmentFile));
- } else if (segmentFile.isHeaderCorrupted()) {
- corruptedHeaderSegments.add(segFile);
- } else {
- this.segments.add(segmentFile);
- }
- }
-
- // Processing corrupted header files
- //TODO(boyan) maybe we can find a better solution for such case that new allocated segment file is corrupted when power failure etc.
- if(!processCorruptedHeaderFiles(corruptedHeaderSegments)) {
- return false;
- }
-
- // init blank segments
- if(!initBlankFiles()) {
- return false;
- }
-
- // try to recover segments
- if(!recoverFiles(checkpoint, normalExit, checkpointSegFile)) {
- return false;
- }
- } else {
- if (checkpoint != null) {
- LOG.warn("Missing segment files, checkpoint is: {}", checkpoint);
- return false;
- }
- }
-
- LOG.info("{} Loaded {} segment files and {} blank segment files from path {}.", getServiceName(),
- this.segments.size(), this.blankSegments.size(), this.segmentsPath);
-
- LOG.info("{} segments: \n{}", getServiceName(), descSegments());
-
- startCheckpointTask();
-
- if (normalExit) {
- if (!this.abortFile.create()) {
- LOG.error("Fail to create abort file {}.", this.abortFile.getPath());
- return false;
- }
- } else {
- this.abortFile.touch();
- }
- startSegmentAllocator();
-
- return true;
- } catch (final Exception e) {
- LOG.error("Fail to load segment files from directory {}.", this.segmentsPath, e);
- return false;
- } finally {
- this.writeLock.unlock();
- LOG.info("{} init and load cost {} ms.", getServiceName(), Utils.monotonicMs() - startMs);
- }
- }
-
- private boolean recoverFiles(final Checkpoint checkpoint, final boolean normalExit, final String checkpointSegFile) {
- boolean needRecover = false;
- SegmentFile prevFile = null;
- for (int i = 0; i < this.segments.size(); i++) {
- final boolean isLastFile = i == this.segments.size() - 1;
- SegmentFile segmentFile = this.segments.get(i);
- int pos = segmentFile.getSize();
- if (StringUtil.equalsIgnoreCase(checkpointSegFile, segmentFile.getFilename())) {
- needRecover = true;
- assert (checkpoint != null);
- pos = checkpoint.committedPos;
- } else {
- if (needRecover) {
- pos = 0;
- }
- }
-
- final SegmentFileOptions opts = SegmentFileOptions.builder() //
- .setSync(isSync()) //
- .setRecover(needRecover && !normalExit) //
- .setLastFile(isLastFile) //
- .setNewFile(false) //
- .setPos(pos).build();
-
- if (!segmentFile.init(opts)) {
- LOG.error("Fail to load segment file {}.", segmentFile.getPath());
- segmentFile.shutdown();
- return false;
- }
- /**
- * It's wrote position is from start(HEADER_SIZE) but it's not the last file, SHOULD not happen.
- */
- if (segmentFile.getWrotePos() == SegmentFile.HEADER_SIZE && !isLastFile) {
- LOG.error("Detected corrupted segment file {}.", segmentFile.getPath());
- return false;
- }
-
- if (prevFile != null) {
- prevFile.setLastLogIndex(segmentFile.getFirstLogIndex() - 1);
- }
- prevFile = segmentFile;
- }
- if (getLastLogIndex() > 0 && prevFile != null) {
- prevFile.setLastLogIndex(getLastLogIndex());
- }
- return true;
- }
-
- private boolean initBlankFiles() {
- for (AllocatedResult ret : this.blankSegments) {
- final SegmentFile segmentFile = ret.segmentFile;
- final SegmentFileOptions opts = SegmentFileOptions.builder() //
- .setSync(false) //
- .setRecover(false) //
- .setLastFile(true) //
- .build();
-
- if (!segmentFile.init(opts)) {
- LOG.error("Fail to load blank segment file {}.", segmentFile.getPath());
- segmentFile.shutdown();
- return false;
- }
- }
- return true;
- }
-
- private boolean processCorruptedHeaderFiles(final List<File> corruptedHeaderSegments) throws IOException {
- if (corruptedHeaderSegments.size() == 1) {
- final File corruptedFile = corruptedHeaderSegments.get(0);
- if (getFileSequenceFromFileName(corruptedFile) != this.nextFileSequence.get() - 1) {
- LOG.error("Detected corrupted header segment file {}.", corruptedFile);
- return false;
- } else {
- // The file is the last file,it's the new blank segment but fail to save header, we can
- // remove it safely.
- LOG.warn("Truncate the last segment file {} which it's header is corrupted.",
- corruptedFile.getAbsolutePath());
- // We don't want to delete it, but rename it for safety.
- FileUtils.moveFile(corruptedFile, new File(corruptedFile.getAbsolutePath() + ".corrupted"));
- }
- } else if (corruptedHeaderSegments.size() > 1) {
- // FATAL: it should not happen.
- LOG.error("Detected corrupted header segment files: {}.", corruptedHeaderSegments);
- return false;
- }
-
- return true;
- }
-
- private void startSegmentAllocator() throws IOException {
- // Warmup
- if (this.blankSegments.isEmpty()) {
- doAllocateSegment0();
- }
- // Start the thread.
- this.segmentAllocator = new Thread() {
- @Override
- public void run() {
- doAllocateSegment();
- }
-
- };
- this.segmentAllocator.setDaemon(true);
- this.segmentAllocator.setName("SegmentAllocator");
- this.segmentAllocator.start();
- }
-
- private void doAllocateSegment() {
- LOG.info("SegmentAllocator is started.");
- while (!Thread.currentThread().isInterrupted()) {
- doAllocateSegmentInLock();
- doSwappOutSegments();
- }
- LOG.info("SegmentAllocator exit.");
- }
-
- private void doAllocateSegmentInLock() {
- this.allocateLock.lock();
- try {
- //TODO configure cap
- while (this.blankSegments.size() >= this.preAllocateSegmentCount) {
- this.fullCond.await();
- }
- doAllocateSegment0();
- this.emptyCond.signal();
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (final IOException e) {
- this.blankSegments.add(new AllocatedResult(e));
- this.emptyCond.signal();
- } finally {
- this.allocateLock.unlock();
- }
- }
-
- private void doSwappOutSegments() {
- this.readLock.lock();
- try {
- if (this.segments.size() <= this.keepInMemorySegmentCount) {
- return;
- }
- int segmentsInMemeCount = 0;
- int swappedOutCount = 0;
- final long beginTime = Utils.monotonicMs();
- final int lastIndex = this.segments.size() - 1;
- for (int i = lastIndex; i >= 0; i--) {
- SegmentFile segFile = this.segments.get(i);
- if (!segFile.isSwappedOut()) {
- segmentsInMemeCount++;
- if (segmentsInMemeCount >= this.keepInMemorySegmentCount && i != lastIndex) {
- segFile.hintUnload();
- segFile.swapOut();
- swappedOutCount++;
- }
- }
- }
- LOG.info("Swapped out {} segment files, cost {} ms.", swappedOutCount, Utils.monotonicMs() - beginTime);
- } catch (final Exception e) {
- LOG.error("Fail to swap out segments.", e);
- } finally {
- this.readLock.unlock();
- }
- }
-
- private void doAllocateSegment0() throws IOException {
- SegmentFile segFile = allocateNewSegmentFile();
- this.blankSegments.add(new AllocatedResult(segFile));
- }
-
- private static long getFileSequenceFromFileName(final File file) {
- final String name = file.getName();
- assert (name.endsWith(SEGMENT_FILE_POSFIX));
- int idx = name.indexOf(SEGMENT_FILE_POSFIX);
- return Long.valueOf(name.substring(0, idx));
- }
-
- private Checkpoint loadCheckpoint() {
- final Checkpoint checkpoint;
- try {
- checkpoint = this.checkpointFile.load();
- if (checkpoint != null) {
- LOG.info("Loaded checkpoint: {} from {}.", checkpoint, this.checkpointFile.getPath());
- }
- } catch (final IOException e) {
- LOG.error("Fail to load checkpoint file: {}", this.checkpointFile.getPath(), e);
- return null;
- }
- return checkpoint;
- }
-
- private boolean ensureDir(final File segmentsDir) {
- try {
- FileUtils.forceMkdir(segmentsDir);
- return true;
- } catch (final IOException e) {
- LOG.error("Fail to create segments directory: {}", this.segmentsPath, e);
- return false;
- }
- }
-
- private String getCheckpointSegFilePath(final Checkpoint checkpoint) {
- return checkpoint != null ? checkpoint.segFilename : null;
- }
-
- private void startCheckpointTask() {
- this.checkpointExecutor = Executors
- .newSingleThreadScheduledExecutor(new NamedThreadFactory(getServiceName() + "-Checkpoint-Thread-", true));
- this.checkpointExecutor.scheduleAtFixedRate(this::doCheckpoint, this.checkpointIntervalMs,
- this.checkpointIntervalMs, TimeUnit.MILLISECONDS);
- LOG.info("{} started checkpoint task.", getServiceName());
- }
-
- private StringBuilder descSegments() {
- final StringBuilder segmentsDesc = new StringBuilder("[\n");
- for (final SegmentFile segFile : this.segments) {
- segmentsDesc.append(" ").append(segFile.toString()).append("\n");
- }
- segmentsDesc.append("]");
- return segmentsDesc;
- }
-
- private String getServiceName() {
- return this.getClass().getSimpleName();
- }
-
- private void stopSegmentAllocator() {
- this.segmentAllocator.interrupt();
- try {
- this.segmentAllocator.join(500);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- protected void onShutdown() {
- stopCheckpointTask();
- stopSegmentAllocator();
-
- List<SegmentFile> shutdownFiles = Collections.emptyList();
- this.writeLock.lock();
- try {
- doCheckpoint();
- shutdownFiles = new ArrayList<>(this.segments);
- this.segments.clear();
- if (!this.abortFile.destroy()) {
- LOG.error("Fail to delete abort file {}.", this.abortFile.getPath());
- }
- } finally {
- this.writeLock.unlock();
- for (final SegmentFile segmentFile : shutdownFiles) {
- segmentFile.shutdown();
- }
- shutdownBlankSegments();
- }
- this.writeExecutor.shutdown();
- }
-
- private void shutdownBlankSegments() {
- this.allocateLock.lock();
- try {
- for (final AllocatedResult ret : this.blankSegments) {
- if (ret.segmentFile != null) {
- ret.segmentFile.shutdown();
- }
- }
- } finally {
- this.allocateLock.unlock();
- }
- }
-
- private void stopCheckpointTask() {
- if (this.checkpointExecutor != null) {
- this.checkpointExecutor.shutdownNow();
- try {
- this.checkpointExecutor.awaitTermination(10, TimeUnit.SECONDS);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- LOG.info("{} stopped checkpoint task.", getServiceName());
- }
- }
-
- private void doCheckpoint() {
- SegmentFile lastSegmentFile = null;
- try {
- lastSegmentFile = getLastSegmentFileForRead();
- if (lastSegmentFile != null) {
- this.checkpointFile.save(new Checkpoint(lastSegmentFile.getFilename(), lastSegmentFile
- .getCommittedPos()));
- }
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (final IOException e) {
- LOG.error("Fatal error, fail to do checkpoint, last segment file is {}.",
- lastSegmentFile != null ? lastSegmentFile.getPath() : "null", e);
- }
- }
-
- public SegmentFile getLastSegmentFileForRead() throws IOException, InterruptedException {
- return getLastSegmentFile(-1, 0, false, null);
- }
-
- @Override
- protected void onReset(final long nextLogIndex) {
- List<SegmentFile> destroyedFiles = new ArrayList<>();
- this.writeLock.lock();
- try {
- this.checkpointFile.destroy();
- destroyedFiles.addAll(this.segments);
- this.segments.clear();
- LOG.info("Destroyed segments and checkpoint in path {} by resetting.", this.segmentsPath);
- } finally {
- this.writeLock.unlock();
- for (SegmentFile segFile : destroyedFiles) {
- segFile.destroy();
- }
- }
- }
-
- private SegmentFile getLastSegmentWithoutLock() {
- return this.segments.get(this.segments.size() - 1);
- }
-
- @Override
- protected void onTruncatePrefix(final long startIndex, final long firstIndexKept) throws RocksDBException,
- IOException {
- List<SegmentFile> destroyedFiles = null;
- this.writeLock.lock();
- try {
- int fromIndex = binarySearchFileIndexByLogIndex(startIndex);
- int toIndex = binarySearchFileIndexByLogIndex(firstIndexKept);
-
- if (fromIndex < 0) {
- fromIndex = 0;
- }
- if (toIndex < 0) {
- // When all the segments contain logs that index is smaller than firstIndexKept,
- // truncate all segments.
- do {
- if (!this.segments.isEmpty()) {
- if (getLastSegmentWithoutLock().getLastLogIndex() < firstIndexKept) {
- toIndex = this.segments.size();
- break;
- }
- }
- LOG.warn("Segment file not found by logIndex={} to be truncate_prefix, current segments:\n{}.",
- firstIndexKept, descSegments());
- return;
- } while (false);
- }
-
- final List<SegmentFile> removedFiles = this.segments.subList(fromIndex, toIndex);
- destroyedFiles = new ArrayList<>(removedFiles);
- removedFiles.clear();
- doCheckpoint();
- } finally {
- this.writeLock.unlock();
- if (destroyedFiles != null) {
- for (final SegmentFile segmentFile : destroyedFiles) {
- segmentFile.destroy();
- }
- }
- }
- }
-
- private boolean isMetadata(final byte[] data) {
- for (int offset = 0; offset < SegmentFile.RECORD_MAGIC_BYTES_SIZE; offset++) {
- if (data[offset] != SegmentFile.RECORD_MAGIC_BYTES[offset]) {
- return false;
- }
- }
- return true;
- }
-
- private SegmentFile getFirstSegmentWithoutLock() {
- return this.segments.get(0);
- }
-
- @Override
- protected void onTruncateSuffix(final long lastIndexKept) throws RocksDBException, IOException {
- List<SegmentFile> destroyedFiles = null;
- this.writeLock.lock();
- try {
- final int keptFileIndex = binarySearchFileIndexByLogIndex(lastIndexKept);
- int toIndex = binarySearchFileIndexByLogIndex(getLastLogIndex());
-
- if (keptFileIndex < 0) {
- // When all the segments contain logs that index is greater than lastIndexKept,
- // truncate all segments.
- if (!this.segments.isEmpty()) {
- final long firstLogIndex = getFirstSegmentWithoutLock().getFirstLogIndex();
- if (firstLogIndex > lastIndexKept) {
- final List<SegmentFile> removedFiles = this.segments.subList(0, this.segments.size());
- destroyedFiles = new ArrayList<>(removedFiles);
- removedFiles.clear();
- }
- LOG.info(
- "Truncating all segments in {} because the first log index {} is greater than lastIndexKept={}",
- this.segmentsPath, firstLogIndex, lastIndexKept);
- }
-
- LOG.warn("Segment file not found by logIndex={} to be truncate_suffix, current segments:\n{}.",
- lastIndexKept, descSegments());
- return;
- }
-
- if (toIndex < 0) {
- toIndex = this.segments.size() - 1;
- }
-
- // Destroyed files after keptFile
- final List<SegmentFile> removedFiles = this.segments.subList(keptFileIndex + 1, toIndex + 1);
- destroyedFiles = new ArrayList<>(removedFiles);
- removedFiles.clear();
-
- // Process logs in keptFile(firstLogIndex=lastIndexKept)
- final SegmentFile keptFile = this.segments.get(keptFileIndex);
- if (keptFile.isBlank()) {
- return;
- }
- int logWrotePos = -1; // The truncate position in keptFile.
-
- // Try to find the right position to be truncated.
- {
- // First, find in right [lastIndexKept + 1, getLastLogIndex()]
- long nextIndex = lastIndexKept + 1;
- final long endIndex = Math.min(getLastLogIndex(), keptFile.getLastLogIndex());
- while (nextIndex <= endIndex) {
- final byte[] data = getValueFromRocksDB(getKeyBytes(nextIndex));
- if (data != null) {
- if (data.length == LOCATION_METADATA_SIZE) {
- if (!isMetadata(data)) {
- // Stored in rocksdb directly.
- nextIndex++;
- continue;
- }
- logWrotePos = getWrotePosition(data);
- break;
- } else {
- // Stored in rocksdb directly.
- nextIndex++;
- }
- } else {
- // No more data.
- break;
- }
- }
- }
-
- // Not found in [lastIndexKept + 1, getLastLogIndex()]
- if (logWrotePos < 0) {
- // Second, try to find in left [firstLogIndex, lastIndexKept) when lastIndexKept is not stored in segments.
- final byte[] keptData = getValueFromRocksDB(getKeyBytes(lastIndexKept));
- // The kept log's data is not stored in segments.
- if (!isMetadata(keptData)) {
- //lastIndexKept's log is stored in rocksdb directly, try to find the first previous log that stored in segment.
- long prevIndex = lastIndexKept - 1;
- final long startIndex = keptFile.getFirstLogIndex();
- while (prevIndex >= startIndex) {
- final byte[] data = getValueFromRocksDB(getKeyBytes(prevIndex));
- if (data != null) {
- if (data.length == LOCATION_METADATA_SIZE) {
- if (!isMetadata(data)) {
- // Stored in rocksdb directly.
- prevIndex--;
- continue;
- }
- // Found the position.
- logWrotePos = getWrotePosition(data);
- final byte[] logData = onDataGet(prevIndex, data);
- // Skip this log, it should be kept(logs that are less than lastIndexKept should be kept).
- // Find the next log position.
- logWrotePos += SegmentFile.getWriteBytes(logData);
- break;
- } else {
- // Stored in rocksdb directly.
- prevIndex--;
- }
- } else {
- LOG.warn(
- "Log entry not found at index={} when truncating logs suffix from lastIndexKept={}.",
- prevIndex, lastIndexKept);
- prevIndex--;
- }
- }
- }
- }
-
- if (logWrotePos >= 0 && logWrotePos < keptFile.getSize()) {
- // Truncate the file from wrotePos and set its lastLogIndex=lastIndexKept.
- keptFile.truncateSuffix(logWrotePos, lastIndexKept, isSync());
- }
- // Finally, do checkpoint.
- doCheckpoint();
-
- } finally {
- this.writeLock.unlock();
- if (destroyedFiles != null) {
- for (final SegmentFile segmentFile : destroyedFiles) {
- segmentFile.destroy();
- }
- }
- }
- }
-
- /**
- * Retrieve the log wrote position from metadata.
- *
- * @param data the metadata
- * @return the log wrote position
- */
- private int getWrotePosition(final byte[] data) {
- return Bits.getInt(data, SegmentFile.RECORD_MAGIC_BYTES_SIZE + 2 + 8);
- }
-
- @Override
- protected WriteContext newWriteContext() {
- return new BarrierWriteContext();
- }
-
- @Override
- protected byte[] onDataAppend(final long logIndex, final byte[] value, final WriteContext ctx) throws IOException,
- InterruptedException {
- final int waitToWroteBytes = SegmentFile.getWriteBytes(value);
- SegmentFile lastSegmentFile = getLastSegmentFile(logIndex, waitToWroteBytes, true, ctx);
- if (lastSegmentFile.reachesFileEndBy(waitToWroteBytes)) {
- throw new IOException("Too large value size: " + value.length + ", maxSegmentFileSize="
- + this.maxSegmentFileSize);
- }
- if (value.length < this.valueSizeThreshold) {
- // Small value will be stored in rocksdb directly.
- lastSegmentFile.setLastLogIndex(logIndex);
- ctx.finishJob();
- return value;
- }
- // Large value is stored in segment file and returns an encoded location info that will be stored in rocksdb.
- final int pos = lastSegmentFile.write(logIndex, value, ctx);
- final long firstLogIndex = lastSegmentFile.getFirstLogIndex();
- return encodeLocationMetadata(firstLogIndex, pos);
- }
-
- /**
- * Encode segment file firstLogIndex(fileName) and position to a byte array in the format of:
- * <ul>
- * <li> magic bytes(2 B)</li>
- * <li> reserved (2 B)</li>
- * <li> firstLogIndex(8 B)</li>
- * <li> wrote position(4 B)</li>
- * </ul>
- * @param firstLogIndex the first log index
- * @param pos the wrote position
- * @return segment info
- */
- private byte[] encodeLocationMetadata(final long firstLogIndex, final int pos) {
- final byte[] newData = new byte[LOCATION_METADATA_SIZE];
- System.arraycopy(SegmentFile.RECORD_MAGIC_BYTES, 0, newData, 0, SegmentFile.RECORD_MAGIC_BYTES_SIZE);
- // 2 bytes reserved
- Bits.putLong(newData, SegmentFile.RECORD_MAGIC_BYTES_SIZE + 2, firstLogIndex);
- Bits.putInt(newData, SegmentFile.RECORD_MAGIC_BYTES_SIZE + 2 + 8, pos);
- return newData;
- }
-
- private int binarySearchFileIndexByLogIndex(final long logIndex) {
- this.readLock.lock();
- try {
- if (this.segments.isEmpty()) {
- return -1;
- }
- if (this.segments.size() == 1) {
- final SegmentFile firstFile = this.segments.get(0);
- if (firstFile.contains(logIndex)) {
- return 0;
- } else {
- return -1;
- }
- }
-
- int low = 0;
- int high = this.segments.size() - 1;
-
- while (low <= high) {
- final int mid = (low + high) >>> 1;
-
- final SegmentFile file = this.segments.get(mid);
- if (file.getLastLogIndex() < logIndex) {
- low = mid + 1;
- } else if (file.getFirstLogIndex() > logIndex) {
- high = mid - 1;
- } else {
- return mid;
- }
- }
- return -(low + 1);
- } finally {
- this.readLock.unlock();
- }
- }
-
- private SegmentFile binarySearchFileByFirstLogIndex(final long logIndex) {
- this.readLock.lock();
- try {
- if (this.segments.isEmpty()) {
- return null;
- }
- if (this.segments.size() == 1) {
- final SegmentFile firstFile = this.segments.get(0);
- if (firstFile.getFirstLogIndex() == logIndex) {
- return firstFile;
- } else {
- return null;
- }
- }
-
- int low = 0;
- int high = this.segments.size() - 1;
-
- while (low <= high) {
- final int mid = (low + high) >>> 1;
-
- final SegmentFile file = this.segments.get(mid);
- if (file.getFirstLogIndex() < logIndex) {
- low = mid + 1;
- } else if (file.getFirstLogIndex() > logIndex) {
- high = mid - 1;
- } else {
- return file;
- }
- }
- return null;
- } finally {
- this.readLock.unlock();
- }
- }
-
- @Override
- protected byte[] onDataGet(final long logIndex, final byte[] value) throws IOException {
- if (value == null || value.length != LOCATION_METADATA_SIZE) {
- return value;
- }
-
- int offset = 0;
- for (; offset < SegmentFile.RECORD_MAGIC_BYTES_SIZE; offset++) {
- if (value[offset] != SegmentFile.RECORD_MAGIC_BYTES[offset]) {
- return value;
- }
- }
-
- // skip reserved
- offset += 2;
-
- final long firstLogIndex = Bits.getLong(value, offset);
- final int pos = Bits.getInt(value, offset + 8);
- final SegmentFile file = binarySearchFileByFirstLogIndex(firstLogIndex);
- if (file == null) {
- return null;
- }
- return file.read(logIndex, pos);
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java
deleted file mode 100644
index 2086930..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java
+++ /dev/null
@@ -1,905 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.storage.log;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alipay.sofa.jraft.Lifecycle;
-import com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage.WriteContext;
-import com.alipay.sofa.jraft.storage.log.SegmentFile.SegmentFileOptions;
-import com.alipay.sofa.jraft.util.Bits;
-import com.alipay.sofa.jraft.util.BytesUtil;
-import com.alipay.sofa.jraft.util.OnlyForTest;
-import com.alipay.sofa.jraft.util.Utils;
-import com.sun.jna.NativeLong;
-import com.sun.jna.Pointer;
-
-import sun.nio.ch.DirectBuffer;
-
-/**
- * A fixed size file. The content format is:
- * <pre>
- * magic bytes first log index reserved
- * [0x20 0x20] [... 8 bytes...] [8 bytes]
- *
- * [record, record, ...]
- * </pre>
- *
- * Every record format is:
- * <pre>
- * Magic bytes data length data
- * [0x57, 0x8A] [4 bytes] [bytes]
- *</pre>
- *
- * @author boyan(boyan@antfin.com)
- * @since 1.2.6
- */
-public class SegmentFile implements Lifecycle<SegmentFileOptions> {
-
- private static final int FSYNC_COST_MS_THRESHOLD = 1000;
- private static final int ONE_MINUTE = 60 * 1000;
- public static final int HEADER_SIZE = 18;
- private static final long BLANK_LOG_INDEX = -99;
-
- /**
- * Segment file header.
- * @author boyan(boyan@antfin.com)
- *
- */
- public static class SegmentHeader {
-
- private static final long RESERVED_FLAG = 0L;
- // The file first log index(inclusive)
- volatile long firstLogIndex = BLANK_LOG_INDEX;
- @SuppressWarnings("unused")
- long reserved;
- private static final byte MAGIC = 0x20;
-
- public SegmentHeader() {
- super();
- }
-
- ByteBuffer encode() {
- ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
- buffer.put(MAGIC);
- buffer.put(MAGIC);
- buffer.putLong(this.firstLogIndex);
- buffer.putLong(RESERVED_FLAG);
- buffer.flip();
- return buffer;
- }
-
- boolean decode(final ByteBuffer buffer) {
- if (buffer == null || buffer.remaining() < HEADER_SIZE) {
- LOG.error("Fail to decode segment header, invalid buffer length: {}",
- buffer == null ? 0 : buffer.remaining());
- return false;
- }
- if (buffer.get() != MAGIC) {
- LOG.error("Fail to decode segment header, invalid magic.");
- return false;
- }
- if (buffer.get() != MAGIC) {
- LOG.error("Fail to decode segment header, invalid magic.");
- return false;
- }
- this.firstLogIndex = buffer.getLong();
- return true;
- }
- }
-
- /**
- * Segment file options.
- *
- * @author boyan(boyan@antfin.com)
- */
- public static class SegmentFileOptions {
- // Whether to recover
- final boolean recover;
- // Recover start position
- final int pos;
- // True when this is the last file.
- final boolean isLastFile;
- // True when this is a newly created file.
- final boolean isNewFile;
- final boolean sync;
-
- private SegmentFileOptions(final boolean recover, final boolean isLastFile, final boolean isNewFile,
- final boolean sync, final int pos) {
- super();
- this.isNewFile = isNewFile;
- this.isLastFile = isLastFile;
- this.recover = recover;
- this.sync = sync;
- this.pos = pos;
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
- boolean recover = false;
- int pos = 0;
- boolean isLastFile = false;
- boolean isNewFile = false;
- boolean sync = true;
-
- public Builder setRecover(final boolean recover) {
- this.recover = recover;
- return this;
- }
-
- public Builder setPos(final int pos) {
- this.pos = pos;
- return this;
- }
-
- public Builder setLastFile(final boolean isLastFile) {
- this.isLastFile = isLastFile;
- return this;
- }
-
- public Builder setNewFile(final boolean isNewFile) {
- this.isNewFile = isNewFile;
- return this;
- }
-
- public Builder setSync(final boolean sync) {
- this.sync = sync;
- return this;
- }
-
- public SegmentFileOptions build() {
- return new SegmentFileOptions(this.recover, this.isLastFile, this.isNewFile, this.sync, this.pos);
- }
- }
-
- }
-
- private static final int BLANK_HOLE_SIZE = 64;
-
- private static final Logger LOG = LoggerFactory.getLogger(SegmentFile.class);
-
- // 4 Bytes for written data length
- private static final int RECORD_DATA_LENGTH_SIZE = 4;
-
- /**
- * Magic bytes for data buffer.
- */
- public static final byte[] RECORD_MAGIC_BYTES = new byte[] { (byte) 0x57, (byte) 0x8A };
-
- public static final int RECORD_MAGIC_BYTES_SIZE = RECORD_MAGIC_BYTES.length;
-
- private final SegmentHeader header;
-
- // The file last log index(inclusive)
- private volatile long lastLogIndex = Long.MAX_VALUE;
- // File size
- private int size;
- // File path
- private final String path;
- // mmap byte buffer.
- private MappedByteBuffer buffer;
- // Wrote position.
- private volatile int wrotePos;
- // Committed position
- private volatile int committedPos;
-
- private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
-
- private final Lock writeLock = this.readWriteLock.writeLock();
- private final Lock readLock = this.readWriteLock.readLock();
- private final ThreadPoolExecutor writeExecutor;
- private volatile boolean swappedOut;
- private volatile boolean readOnly;
- private long swappedOutTimestamp = -1L;
- private final String filename;
-
- public SegmentFile(final int size, final String path, final ThreadPoolExecutor writeExecutor) {
- super();
- this.header = new SegmentHeader();
- this.size = size;
- this.writeExecutor = writeExecutor;
- this.path = path;
- this.filename = FilenameUtils.getName(this.path);
- this.swappedOut = this.readOnly = false;
- }
-
- void setReadOnly(final boolean readOnly) {
- this.readOnly = readOnly;
- }
-
- void setFirstLogIndex(final long index) {
- this.header.firstLogIndex = index;
- }
-
- long getLastLogIndex() {
- return this.lastLogIndex;
- }
-
- @OnlyForTest
- public int getWrotePos() {
- return this.wrotePos;
- }
-
- int getCommittedPos() {
- return this.committedPos;
- }
-
- String getFilename() {
- return this.filename;
- }
-
- long getFirstLogIndex() {
- return this.header.firstLogIndex;
- }
-
- public boolean isSwappedOut() {
- return this.swappedOut;
- }
-
- int getSize() {
- return this.size;
- }
-
- /**
- * Returns true when this segment file is blank, i.e. no data has been written into it yet.
- * @return
- */
- boolean isBlank() {
- return this.header.firstLogIndex == BLANK_LOG_INDEX;
- }
-
- boolean isHeaderCorrupted() {
- return this.header == null;
- }
-
- String getPath() {
- return this.path;
- }
-
- public void setLastLogIndex(final long lastLogIndex) {
- this.writeLock.lock();
- try {
- this.lastLogIndex = lastLogIndex;
- } finally {
- this.writeLock.unlock();
- }
- }
-
- private void swapIn() {
- if (this.swappedOut) {
- this.writeLock.lock();
- try {
- if (!this.swappedOut) {
- return;
- }
- long startMs = Utils.monotonicMs();
- mmapFile(false);
- this.swappedOut = false;
- LOG.info("Swapped in segment file {} cost {} ms.", this.path, Utils.monotonicMs() - startMs);
- } finally {
- this.writeLock.unlock();
- }
- }
- }
-
- public void hintLoad() {
- final long address = ((DirectBuffer) (this.buffer)).address();
- Pointer pointer = new Pointer(address);
-
- long beginTime = Utils.monotonicMs();
- int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.size), LibC.MADV_WILLNEED);
- LOG.info("madvise(MADV_WILLNEED) {} {} {} ret = {} time consuming = {}", address, this.path, this.size, ret,
- Utils.monotonicMs() - beginTime);
- }
-
- public void hintUnload() {
- final long address = ((DirectBuffer) (this.buffer)).address();
- Pointer pointer = new Pointer(address);
-
- long beginTime = Utils.monotonicMs();
- int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.size), LibC.MADV_DONTNEED);
- LOG.info("madvise(MADV_DONTNEED) {} {} {} ret = {} time consuming = {}", address, this.path, this.size, ret,
- Utils.monotonicMs() - beginTime);
- }
-
- public void swapOut() {
- if (!this.swappedOut) {
- this.writeLock.lock();
- try {
- if (this.swappedOut) {
- return;
- }
- if (!this.readOnly) {
- LOG.warn("The segment file {} is not readonly, can't be swapped out.", this.path);
- return;
- }
- final long now = Utils.monotonicMs();
- if (this.swappedOutTimestamp > 0 && now - this.swappedOutTimestamp < ONE_MINUTE) {
- return;
- }
- this.swappedOut = true;
- unmap(this.buffer);
- this.buffer = null;
- this.swappedOutTimestamp = now;
- LOG.info("Swapped out segment file {} cost {} ms.", this.path, Utils.monotonicMs() - now);
- } finally {
- this.writeLock.unlock();
- }
- }
- }
-
- /**
- * Truncate data from wrotePos(inclusive) to the file end and set lastLogIndex=logIndex.
- * @param wrotePos the wrote position(inclusive)
- * @param logIndex the log index
- * @param sync whether to call fsync
- */
- public void truncateSuffix(final int wrotePos, final long logIndex, final boolean sync) {
- this.writeLock.lock();
- try {
- if (wrotePos >= this.wrotePos) {
- return;
- }
- swapInIfNeed();
- final int oldPos = this.wrotePos;
- clear(wrotePos, sync);
- this.wrotePos = wrotePos;
- this.lastLogIndex = logIndex;
- this.buffer.position(wrotePos);
- LOG.info(
- "Segment file {} truncate suffix from pos={}, then set lastLogIndex={}, oldWrotePos={}, newWrotePos={}",
- this.path, wrotePos, logIndex, oldPos, this.wrotePos);
- } finally {
- this.writeLock.unlock();
- }
- }
-
- /**
- * Returns true when the segment file contains the log index.
- *
- * @param logIndex the log index
- * @return true if the segment file contains the log index, otherwise return false
- */
- public boolean contains(final long logIndex) {
- this.readLock.lock();
- try {
- return logIndex >= this.header.firstLogIndex && logIndex <= this.lastLogIndex;
- } finally {
- this.readLock.unlock();
- }
- }
-
- /**
- * Clear data in [startPos, startPos+64).
- *
- * @param startPos the start position(inclusive)
- */
- public void clear(final int startPos, final boolean sync) {
- this.writeLock.lock();
- try {
- if (startPos < 0 || startPos > this.size) {
- return;
- }
- final int endPos = Math.min(this.size, startPos + BLANK_HOLE_SIZE);
- for (int i = startPos; i < endPos; i++) {
- this.buffer.put(i, (byte) 0);
- }
- if (sync) {
- fsync(this.buffer);
- }
- LOG.info("Segment file {} cleared data in [{}, {}).", this.path, startPos, endPos);
- } finally {
- this.writeLock.unlock();
- }
- }
-
- @Override
- public boolean init(final SegmentFileOptions opts) {
- if (opts.isNewFile) {
- return loadNewFile(opts);
- } else {
- return loadExistsFile(opts);
- }
-
- }
-
- private boolean loadNewFile(final SegmentFileOptions opts) {
- assert (opts.pos == 0);
- assert (!opts.recover);
-
- final File file = new File(this.path);
-
- if (file.exists()) {
- LOG.error("File {} already exists.", this.path);
- return false;
- }
- long startMs = Utils.monotonicMs();
- this.writeLock.lock();
- try (FileChannel fc = openFileChannel(true)) {
- this.buffer = fc.map(MapMode.READ_WRITE, 0, this.size);
- // Warmup mmap file
- this.buffer.position(0);
- this.buffer.limit(this.size);
- saveHeader(true);
-
- this.committedPos = this.wrotePos = HEADER_SIZE;
- this.buffer.position(this.wrotePos);
-
- assert (this.wrotePos == this.buffer.position());
-
- LOG.info("Created a new segment file {} cost {} ms, wrotePosition={}, bufferPosition={}, mappedSize={}.",
- this.path, Utils.monotonicMs() - startMs, this.wrotePos, this.buffer.position(), this.size);
- return true;
- } catch (final IOException e) {
- LOG.error("Fail to init segment file {}.", this.path, e);
- return false;
- } finally {
- this.writeLock.unlock();
- }
- }
-
- private boolean loadExistsFile(final SegmentFileOptions opts) {
- this.writeLock.lock();
- try {
- if (!mmapFile(false)) {
- return false;
- }
- if (!tryRecoverExistsFile(opts)) {
- return false;
- }
- this.readOnly = !opts.isLastFile;
- return true;
- } finally {
- this.writeLock.unlock();
- }
- }
-
- private boolean tryRecoverExistsFile(final SegmentFileOptions opts) {
- try {
- if (isBlank()) {
- // A blank segment, we don't need to recover.
- assert (!opts.recover);
- this.committedPos = this.wrotePos = HEADER_SIZE;
- this.buffer.position(this.wrotePos);
- LOG.info("Segment file {} is blank, truncate it from {}.", this.path, HEADER_SIZE);
- clear(this.wrotePos, opts.sync);
- } else {
- if (opts.recover) {
- if (!recover(opts)) {
- return false;
- }
- } else {
- this.wrotePos = opts.pos;
- this.buffer.position(this.wrotePos);
- }
- assert (this.wrotePos == this.buffer.position());
- this.committedPos = this.wrotePos;
- }
- LOG.info("Loaded segment file {}, wrotePosition={}, bufferPosition={}, mappedSize={}.", this.path,
- this.wrotePos, this.buffer.position(), this.size);
- } catch (final Exception e) {
- LOG.error("Fail to load segment file {}.", this.path, e);
- return false;
- }
- return true;
- }
-
- boolean mmapFile(final boolean create) {
- if (this.buffer != null) {
- return true;
- }
- final File file = new File(this.path);
-
- if (file.exists()) {
- this.size = (int) file.length();
- } else {
- if (!create) {
- LOG.error("File {} is not exists.", this.path);
- return false;
- }
- }
- try (FileChannel fc = openFileChannel(create)) {
- this.buffer = fc.map(MapMode.READ_WRITE, 0, this.size);
- this.buffer.limit(this.size);
- if (!loadHeader()) {
- LOG.error("Fail to load segment header from file {}.", this.path);
- return false;
- }
- return true;
- } catch (final IOException e) {
- LOG.error("Fail to mmap segment file {}.", this.path, e);
- return false;
- }
- }
-
- private FileChannel openFileChannel(final boolean create) throws IOException {
- if (create) {
- return FileChannel.open(Paths.get(this.path), StandardOpenOption.CREATE, StandardOpenOption.READ,
- StandardOpenOption.WRITE);
- } else {
- return FileChannel.open(Paths.get(this.path), StandardOpenOption.READ, StandardOpenOption.WRITE);
- }
- }
-
- boolean loadHeader() {
- int oldPos = this.buffer.position();
- try {
- this.buffer.position(0);
- return this.header.decode(this.buffer.asReadOnlyBuffer());
- } finally {
- this.buffer.position(oldPos);
- }
- }
-
- void saveHeader(final boolean sync) {
- int oldPos = this.buffer.position();
- try {
- this.buffer.position(0);
- final ByteBuffer headerBuf = this.header.encode();
- assert (headerBuf.remaining() == HEADER_SIZE);
- this.buffer.put(headerBuf);
- if (sync) {
- fsync(this.buffer);
- }
- } finally {
- this.buffer.position(oldPos);
- }
- }
-
- @SuppressWarnings("NonAtomicOperationOnVolatileField")
- private boolean recover(final SegmentFileOptions opts) throws IOException {
- LOG.info("Start to recover segment file {} from position {}.", this.path, opts.pos);
- this.wrotePos = opts.pos;
- if (this.wrotePos < HEADER_SIZE) {
- this.wrotePos = HEADER_SIZE;
- }
- this.buffer.position(this.wrotePos);
- final long start = Utils.monotonicMs();
- while (this.wrotePos < this.size) {
- if (this.buffer.remaining() < RECORD_MAGIC_BYTES_SIZE) {
- LOG.error("Fail to recover segment file {}, missing magic bytes.", this.path);
- return false;
- }
- final byte[] magicBytes = new byte[RECORD_MAGIC_BYTES_SIZE];
- this.buffer.get(magicBytes);
-
- if (!Arrays.equals(RECORD_MAGIC_BYTES, magicBytes)) {
-
- boolean truncateDirty = false;
-
- int i = 0;
- for (final byte b : magicBytes) {
- i++;
- if (b != 0) {
- if (opts.isLastFile) {
- // It's the last file
- // Truncate the dirty data from wrotePos
- LOG.error("Corrupted magic bytes in segment file {} at pos={}, will truncate it.",
- this.path, this.wrotePos + i);
- truncateDirty = true;
- break;
- } else {
- // It's not the last file, but has invalid magic bytes, the data is corrupted.
- LOG.error("Fail to recover segment file {}, invalid magic bytes: {} at pos={}.", this.path,
- BytesUtil.toHex(magicBytes), this.wrotePos);
- return false;
- }
- }
- }
-
- if (truncateDirty) {
- truncateFile(opts.sync);
- } else {
- // Reach blank hole, rewind position.
- this.buffer.position(this.buffer.position() - RECORD_MAGIC_BYTES_SIZE);
- }
- // Reach end or dirty magic bytes, we should break out.
- break;
- }
-
- if (this.buffer.remaining() < RECORD_DATA_LENGTH_SIZE) {
- if (opts.isLastFile) {
- LOG.error("Corrupted data length in segment file {} at pos={}, will truncate it.", this.path,
- this.buffer.position());
- truncateFile(opts.sync);
- break;
- } else {
- LOG.error(
- "Fail to recover segment file {}, invalid data length remaining: {}, expected {} at pos={}.",
- this.path, this.buffer.remaining(), RECORD_DATA_LENGTH_SIZE, this.wrotePos);
- return false;
- }
- }
-
- final int dataLen = this.buffer.getInt();
- if (this.buffer.remaining() < dataLen) {
- if (opts.isLastFile) {
- LOG.error(
- "Corrupted data in segment file {} at pos={}, expectDataLength={}, but remaining is {}, will truncate it.",
- this.path, this.buffer.position(), dataLen, this.buffer.remaining());
- truncateFile(opts.sync);
- break;
- } else {
- LOG.error(
- "Fail to recover segment file {}, invalid data: expected {} bytes in buf but actual {} at pos={}.",
- this.path, dataLen, this.buffer.remaining(), this.wrotePos);
- return false;
- }
-
- }
- // Skip data
- this.buffer.position(this.buffer.position() + dataLen);
- this.wrotePos += RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + dataLen;
- }
- LOG.info("Recover segment file {} cost {} millis.", this.path, Utils.monotonicMs() - start);
- return true;
- }
-
- private void truncateFile(final boolean sync) throws IOException {
- // Truncate dirty data.
- clear(this.wrotePos, sync);
- this.buffer.position(this.wrotePos);
- LOG.warn("Truncated segment file {} from pos={}.", this.path, this.wrotePos);
- }
-
- public boolean reachesFileEndBy(final long waitToWroteBytes) {
- this.readLock.lock();
- try {
- return this.wrotePos + waitToWroteBytes > this.size;
- } finally {
- this.readLock.unlock();
- }
- }
-
- public boolean isFull() {
- return reachesFileEndBy(1);
- }
-
- static int getWriteBytes(final byte[] data) {
- return RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + data.length;
- }
-
- /**
- * Write the data and return its write position.
- *
- * @param logIndex the log index
- * @param data data to write
- * @return the wrote position
- */
- @SuppressWarnings("NonAtomicOperationOnVolatileField")
- public int write(final long logIndex, final byte[] data, final WriteContext ctx) {
- int pos = -1;
- MappedByteBuffer buf = null;
- this.writeLock.lock();
- try {
- assert (this.wrotePos == this.buffer.position());
- buf = this.buffer;
- pos = this.wrotePos;
- this.wrotePos += RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + data.length;
- this.buffer.position(this.wrotePos);
- // Update log index.
- if (isBlank() || pos == HEADER_SIZE) {
- this.header.firstLogIndex = logIndex;
- // we don't need to call fsync header here, the new header will be flushed with this wrote.
- saveHeader(false);
- }
- this.lastLogIndex = logIndex;
- return pos;
- } finally {
- this.writeLock.unlock();
- final int wroteIndex = pos;
- final MappedByteBuffer buffer = buf;
- this.writeExecutor.execute(() -> {
- try {
- put(buffer, wroteIndex, RECORD_MAGIC_BYTES);
- putInt(buffer, wroteIndex + RECORD_MAGIC_BYTES_SIZE, data.length);
- put(buffer, wroteIndex + RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE, data);
- } catch (final Exception e) {
- ctx.setError(e);
- } finally {
- ctx.finishJob();
- }
- });
- }
- }
-
- private static void putInt(final MappedByteBuffer buffer, final int index, final int n) {
- byte[] bs = new byte[RECORD_DATA_LENGTH_SIZE];
- Bits.putInt(bs, 0, n);
- for (int i = 0; i < bs.length; i++) {
- buffer.put(index + i, bs[i]);
- }
- }
-
- private static void put(final MappedByteBuffer buffer, final int index, final byte[] data) {
- for (int i = 0; i < data.length; i++) {
- buffer.put(index + i, data[i]);
- }
- }
-
- /**
- * Read data from the position.
- *
- * @param logIndex the log index
- * @param pos the position to read
- * @return read data
- */
- public byte[] read(final long logIndex, final int pos) throws IOException {
- assert (pos >= HEADER_SIZE);
- swapInIfNeed();
- this.readLock.lock();
- try {
- if (logIndex < this.header.firstLogIndex || logIndex > this.lastLogIndex) {
- LOG.warn(
- "Try to read data from segment file {} out of range, logIndex={}, readPos={}, firstLogIndex={}, lastLogIndex={}.",
- this.path, logIndex, pos, this.header.firstLogIndex, this.lastLogIndex);
- return null;
- }
- if (pos >= this.committedPos) {
- LOG.warn(
- "Try to read data from segment file {} out of comitted position, logIndex={}, readPos={}, wrotePos={}, this.committedPos={}.",
- this.path, logIndex, pos, this.wrotePos, this.committedPos);
- return null;
- }
- final ByteBuffer readBuffer = this.buffer.asReadOnlyBuffer();
- readBuffer.position(pos);
- if (readBuffer.remaining() < RECORD_MAGIC_BYTES_SIZE) {
- throw new IOException("Missing magic buffer.");
- }
- readBuffer.position(pos + RECORD_MAGIC_BYTES_SIZE);
- final int dataLen = readBuffer.getInt();
- //TODO(boyan) reuse data array?
- final byte[] data = new byte[dataLen];
- readBuffer.get(data);
- return data;
- } finally {
- this.readLock.unlock();
- }
- }
-
- private void swapInIfNeed() {
- if (this.swappedOut) {
- swapIn();
- }
- }
-
- /**
- * Forces any changes made to this segment file's content to be written to the
- * storage device containing the mapped file.
- */
- public void sync(final boolean sync) throws IOException {
- MappedByteBuffer buf = null;
- this.writeLock.lock();
- try {
- if (this.committedPos >= this.wrotePos) {
- return;
- }
- this.committedPos = this.wrotePos;
- buf = this.buffer;
- LOG.debug("Commit segment file {} at pos {}.", this.path, this.committedPos);
- } finally {
- this.writeLock.unlock();
- }
- if (sync) {
- fsync(buf);
- }
- }
-
- private void fsync(final MappedByteBuffer buffer) {
- if (buffer != null) {
- long startMs = Utils.monotonicMs();
- buffer.force();
- final long cost = Utils.monotonicMs() - startMs;
- if (cost >= FSYNC_COST_MS_THRESHOLD) {
- LOG.warn("Call fsync on file {} cost {} ms.", this.path, cost);
- }
- }
- }
-
- /**
- * Destroy the file.
- */
- public void destroy() {
- this.writeLock.lock();
- try {
- shutdown();
- FileUtils.deleteQuietly(new File(this.path));
- LOG.info("Deleted segment file {}.", this.path);
- } finally {
- this.writeLock.unlock();
- }
- }
-
- // See https://stackoverflow.com/questions/2972986/how-to-unmap-a-file-from-memory-mapped-using-filechannel-in-java
- // TODO move into utils
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private static void unmap(final MappedByteBuffer cb) {
- // JavaSpecVer: 1.6, 1.7, 1.8, 9, 10
- final boolean isOldJDK = System.getProperty("java.specification.version", "99").startsWith("1.");
- try {
- if (isOldJDK) {
- final Method cleaner = cb.getClass().getMethod("cleaner");
- cleaner.setAccessible(true);
- final Method clean = Class.forName("sun.misc.Cleaner").getMethod("clean");
- clean.setAccessible(true);
- clean.invoke(cleaner.invoke(cb));
- } else {
- Class unsafeClass;
- try {
- unsafeClass = Class.forName("sun.misc.Unsafe");
- } catch (final Exception ex) {
- // jdk.internal.misc.Unsafe doesn't yet have an invokeCleaner() method,
- // but that method should be added if sun.misc.Unsafe is removed.
- unsafeClass = Class.forName("jdk.internal.misc.Unsafe");
- }
- final Method clean = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class);
- clean.setAccessible(true);
- final Field theUnsafeField = unsafeClass.getDeclaredField("theUnsafe");
- theUnsafeField.setAccessible(true);
- final Object theUnsafe = theUnsafeField.get(null);
- clean.invoke(theUnsafe, cb);
- }
- } catch (final Exception ex) {
- LOG.error("Fail to un-mapped segment file.", ex);
- }
- }
-
- @Override
- public void shutdown() {
- this.writeLock.lock();
- try {
- if (this.buffer == null) {
- return;
- }
- hintUnload();
- unmap(this.buffer);
- this.buffer = null;
- LOG.info("Unloaded segment file {}, current status: {}.", this.path, toString());
- } finally {
- this.writeLock.unlock();
- }
- }
-
- @Override
- public String toString() {
- return "SegmentFile [firstLogIndex=" + this.header.firstLogIndex + ", lastLogIndex=" + this.lastLogIndex
- + ", size=" + this.size + ", path=" + this.path + ", wrotePos=" + this.wrotePos + ", committedPos="
- + this.committedPos + "]";
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
index ee99ac9..f26b337 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
@@ -31,8 +31,6 @@ import com.alipay.sofa.jraft.entity.LocalStorageOutter.LocalSnapshotPbMeta.File;
import com.alipay.sofa.jraft.entity.RaftOutter.SnapshotMeta;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.storage.io.ProtoBufFile;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.ZeroByteStringHelper;
/**
* Table to keep local snapshot metadata infos.
@@ -40,6 +38,7 @@ import com.google.protobuf.ZeroByteStringHelper;
* @author boyan (boyan@alibaba-inc.com)
*
* 2018-Mar-12 7:22:27 PM
+ * TODO asch broken
*/
public class LocalSnapshotMetaTable {
@@ -81,13 +80,13 @@ public class LocalSnapshotMetaTable {
return false;
}
try {
- final LocalSnapshotPbMeta pbMeta = LocalSnapshotPbMeta.parseFrom(ZeroByteStringHelper.wrap(buf));
+ final LocalSnapshotPbMeta pbMeta = LocalSnapshotPbMeta.parseFrom(buf);
if (pbMeta == null) {
LOG.error("Fail to load meta from buffer.");
return false;
}
return loadFromPbMeta(pbMeta);
- } catch (final InvalidProtocolBufferException e) {
+ } catch (final Exception e) {
LOG.error("Fail to parse LocalSnapshotPbMeta from byte buffer", e);
return false;
}
@@ -127,7 +126,7 @@ public class LocalSnapshotMetaTable {
* Returns true when has the snapshot metadata.
*/
public boolean hasMeta() {
- return this.meta != null && this.meta.isInitialized();
+ return this.meta != null;
}
/**
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/AsciiStringUtil.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/AsciiStringUtil.java
index 42e7ade..791b6aa 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/AsciiStringUtil.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/AsciiStringUtil.java
@@ -16,9 +16,6 @@
*/
package com.alipay.sofa.jraft.util;
-import com.alipay.sofa.jraft.util.internal.UnsafeUtil;
-import com.google.protobuf.ByteString;
-
/**
* @author jiachun.fjc
*/
@@ -38,7 +35,7 @@ public final class AsciiStringUtil {
for (int i = 0; i < len; i++) {
out[i] = (char) (in[i + offset] & 0xFF);
}
- return UnsafeUtil.moveToString(out);
+ return moveToString(out);
}
public static String unsafeDecode(final byte[] in) {
@@ -51,9 +48,13 @@ public final class AsciiStringUtil {
for (int i = 0; i < len; i++) {
out[i] = (char) (in.byteAt(i) & 0xFF);
}
- return UnsafeUtil.moveToString(out);
+ return moveToString(out);
}
private AsciiStringUtil() {
}
+
+ public static String moveToString(final char[] chars) { // Builds a String from the given characters.
+ return new String(chars); // String(char[]) copies the array, so later changes to chars do not affect the result
+ }
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/ByteString.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/ByteString.java
index dc76b81..b4d92a5 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/ByteString.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/ByteString.java
@@ -1,6 +1,10 @@
package com.alipay.sofa.jraft.util;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
public class ByteString {
public static final ByteString EMPTY = new ByteString(ByteBuffer.wrap(new byte[0]));
@@ -22,4 +26,14 @@ public class ByteString {
public ByteBuffer asReadOnlyByteBuffer() {
return buf.asReadOnlyBuffer();
}
+
+ public byte byteAt(int pos) { // Returns the byte stored at index pos of the backing buffer.
+ return buf.get(pos); // absolute read: does not move the position of buf
+ }
+
+ /** Writes the remaining bytes of the backing buffer to {@code outputStream} without consuming them. */
+ public void writeTo(OutputStream outputStream) throws IOException {
+     WritableByteChannel channel = Channels.newChannel(outputStream);
+     channel.write(buf.duplicate()); // write() advances the given buffer's position; a duplicate keeps buf intact
+ }
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/BytesUtil.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/BytesUtil.java
index f3364ee..01ccac2 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/BytesUtil.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/BytesUtil.java
@@ -20,8 +20,6 @@ import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
-import com.alipay.sofa.jraft.util.internal.UnsafeUtf8Util;
-import com.alipay.sofa.jraft.util.internal.UnsafeUtil;
/**
* @author jiachun.fjc
@@ -41,39 +39,39 @@ public final class BytesUtil {
return bytes == null || bytes.length == 0;
}
- /**
- * This method has better performance than String#getBytes(Charset),
- * See the benchmark class: Utf8Benchmark for details.
- */
- public static byte[] writeUtf8(final String in) {
- if (in == null) {
- return null;
- }
- if (UnsafeUtil.hasUnsafe()) {
- // Calculate the encoded length.
- final int len = UnsafeUtf8Util.encodedLength(in);
- final byte[] outBytes = new byte[len];
- UnsafeUtf8Util.encodeUtf8(in, outBytes, 0, len);
- return outBytes;
- } else {
- return in.getBytes(StandardCharsets.UTF_8);
- }
- }
-
- /**
- * This method has better performance than String#String(byte[], Charset),
- * See the benchmark class: Utf8Benchmark for details.
- */
- public static String readUtf8(final byte[] in) {
- if (in == null) {
- return null;
- }
- if (UnsafeUtil.hasUnsafe()) {
- return UnsafeUtf8Util.decodeUtf8(in, 0, in.length);
- } else {
- return new String(in, StandardCharsets.UTF_8);
- }
- }
+// /**
+// * This method has better performance than String#getBytes(Charset),
+// * See the benchmark class: Utf8Benchmark for details.
+// */
+// public static byte[] writeUtf8(final String in) {
+// if (in == null) {
+// return null;
+// }
+// if (UnsafeUtil.hasUnsafe()) {
+// // Calculate the encoded length.
+// final int len = UnsafeUtf8Util.encodedLength(in);
+// final byte[] outBytes = new byte[len];
+// UnsafeUtf8Util.encodeUtf8(in, outBytes, 0, len);
+// return outBytes;
+// } else {
+// return in.getBytes(StandardCharsets.UTF_8);
+// }
+// }
+//
+// /**
+// * This method has better performance than String#String(byte[], Charset),
+// * See the benchmark class: Utf8Benchmark for details.
+// */
+// public static String readUtf8(final byte[] in) {
+// if (in == null) {
+// return null;
+// }
+// if (UnsafeUtil.hasUnsafe()) {
+// return UnsafeUtf8Util.decodeUtf8(in, 0, in.length);
+// } else {
+// return new String(in, StandardCharsets.UTF_8);
+// }
+// }
public static byte[] nextBytes(final byte[] bytes) {
Requires.requireNonNull(bytes, "bytes");
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java
new file mode 100644
index 0000000..cf9bdce
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/JDKMarshaller.java
@@ -0,0 +1,32 @@
+package com.alipay.sofa.jraft.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** {@link Marshaller} implementation based on standard JDK object serialization. */
+public class JDKMarshaller implements Marshaller {
+    /** {@inheritDoc} */
+    @Override public byte[] marshall(Object o) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        // try-with-resources closes (and thereby flushes) the stream even when writeObject throws.
+        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(o);
+        }
+
+        return baos.toByteArray();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object unmarshall(byte[] raw) throws IOException {
+        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(raw))) {
+            return ois.readObject();
+        } catch (ClassNotFoundException e) {
+            // An unknown class is a data problem, not a JVM failure: report it as a checked I/O error.
+            throw new IOException("Failed to unmarshall object: class not found", e);
+        }
+    }
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java
new file mode 100644
index 0000000..6f28493
--- /dev/null
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Marshaller.java
@@ -0,0 +1,11 @@
+package com.alipay.sofa.jraft.util;
+
+import java.io.IOException;
+
+public interface Marshaller {
+    Marshaller DEFAULT = new JDKMarshaller(); // interface fields are implicitly public static final
+    /** Converts {@code o} to its byte representation. */
+    byte[] marshall(Object o) throws IOException;
+    /** Restores an object from {@code raw}; presumably bytes produced by {@link #marshall(Object)} (TODO confirm). */
+    <T> T unmarshall(byte[] raw) throws IOException;
+}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Mpsc.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Mpsc.java
index 76a5987..df8e5c1 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Mpsc.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/Mpsc.java
@@ -18,12 +18,13 @@ package com.alipay.sofa.jraft.util;
import java.util.Queue;
-import org.jctools.queues.MpscChunkedArrayQueue;
-import org.jctools.queues.MpscUnboundedArrayQueue;
-import org.jctools.queues.atomic.MpscGrowableAtomicArrayQueue;
-import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
+//import org.jctools.queues.MpscChunkedArrayQueue;
+//import org.jctools.queues.MpscUnboundedArrayQueue;
+//import org.jctools.queues.atomic.MpscGrowableAtomicArrayQueue;
+//import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
-import com.alipay.sofa.jraft.util.internal.UnsafeUtil;
+//import com.alipay.sofa.jraft.util.internal.UnsafeUtil;
+import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author jiachun.fjc
@@ -34,13 +35,16 @@ public final class Mpsc {
private static final int MIN_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE << 1;
public static Queue<Runnable> newMpscQueue() {
- return UnsafeUtil.hasUnsafe() ? new MpscUnboundedArrayQueue<>(MPSC_CHUNK_SIZE)
- : new MpscUnboundedAtomicArrayQueue<>(MPSC_CHUNK_SIZE);
+// return UnsafeUtil.hasUnsafe() ? new MpscUnboundedArrayQueue<>(MPSC_CHUNK_SIZE)
+// : new MpscUnboundedAtomicArrayQueue<>(MPSC_CHUNK_SIZE);
+ return new ConcurrentLinkedQueue<>();
}
public static Queue<Runnable> newMpscQueue(final int maxCapacity) {
- final int capacity = Math.max(MIN_MAX_MPSC_CAPACITY, maxCapacity);
- return UnsafeUtil.hasUnsafe() ? new MpscChunkedArrayQueue<>(MPSC_CHUNK_SIZE, capacity)
- : new MpscGrowableAtomicArrayQueue<>(MPSC_CHUNK_SIZE, capacity);
+// final int capacity = Math.max(MIN_MAX_MPSC_CAPACITY, maxCapacity);
+// return UnsafeUtil.hasUnsafe() ? new MpscChunkedArrayQueue<>(MPSC_CHUNK_SIZE, capacity)
+// : new MpscGrowableAtomicArrayQueue<>(MPSC_CHUNK_SIZE, capacity);
+
+ return new ConcurrentLinkedQueue<>();
}
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/SegmentList.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/SegmentList.java
index 1828b20..7f20772 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/SegmentList.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/SegmentList.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.function.Predicate;
import com.alipay.sofa.jraft.util.internal.ReferenceFieldUpdater;
-import com.alipay.sofa.jraft.util.internal.UnsafeUtil;
import com.alipay.sofa.jraft.util.internal.Updaters;
/**
@@ -375,11 +374,14 @@ public class SegmentList<T> {
private Object[] coll2Array(final Collection<T> coll) {
Object[] src;
- if (coll instanceof ArrayList && UnsafeUtil.hasUnsafe()) {
- src = LIST_ARRAY_GETTER.get((ArrayList<T>) coll);
- } else {
- src = coll.toArray();
- }
+// if (coll instanceof ArrayList && UnsafeUtil.hasUnsafe()) {
+// src = LIST_ARRAY_GETTER.get((ArrayList<T>) coll);
+// } else {
+// src = coll.toArray();
+// }
+
+ src = coll.toArray();
+
return src;
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/SignalHelper.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/SignalHelper.java
deleted file mode 100644
index e698485..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/SignalHelper.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.util;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * @author jiachun.fjc
- */
-public final class SignalHelper {
-
- private static final Logger LOG = LoggerFactory.getLogger(SignalHelper.class);
-
- private static final SignalAccessor SIGNAL_ACCESSOR = getSignalAccessor0();
-
- public static final String SIG_USR2 = "USR2";
-
- public static boolean supportSignal() {
- return SIGNAL_ACCESSOR != null;
- }
-
- /**
- * Registers user signal handlers.
- *
- * @param signalName a signal name
- * @param handlers user signal handlers
- * @return true if support on current platform
- */
- public static boolean addSignal(final String signalName, final List<JRaftSignalHandler> handlers) {
- if (SIGNAL_ACCESSOR != null) {
- SIGNAL_ACCESSOR.addSignal(signalName, handlers);
- return true;
- }
- return false;
- }
-
- private static SignalAccessor getSignalAccessor0() {
- return hasSignal0() ? new SignalAccessor() : null;
- }
-
- private static boolean hasSignal0() {
- try {
- Class.forName("sun.misc.Signal");
- return true;
- } catch (final Throwable t) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("sun.misc.Signal: unavailable, {}.", t);
- }
- }
- return false;
- }
-
- private SignalHelper() {
- }
-
- static class SignalAccessor {
-
- public void addSignal(final String signalName, final List<JRaftSignalHandler> handlers) {
- final sun.misc.Signal signal = new sun.misc.Signal(signalName);
- final SignalHandlerAdapter adapter = new SignalHandlerAdapter(signal, handlers);
- sun.misc.Signal.handle(signal, adapter);
- }
- }
-
- static class SignalHandlerAdapter implements sun.misc.SignalHandler {
-
- private final sun.misc.Signal target;
- private final List<JRaftSignalHandler> handlers;
-
- public static void addSignal(final SignalHandlerAdapter adapter) {
- sun.misc.Signal.handle(adapter.target, adapter);
- }
-
- public SignalHandlerAdapter(sun.misc.Signal target, List<JRaftSignalHandler> handlers) {
- this.target = target;
- this.handlers = handlers;
- }
-
- @Override
- public void handle(final sun.misc.Signal signal) {
- try {
- if (!this.target.equals(signal)) {
- return;
- }
-
- LOG.info("Handling signal {}.", signal);
-
- for (final JRaftSignalHandler h : this.handlers) {
- h.handle(signal.getName());
- }
- } catch (final Throwable t) {
- LOG.error("Fail to handle signal: {}.", signal, t);
- }
- }
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/StorageOptionsFactory.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/StorageOptionsFactory.java
deleted file mode 100644
index f137a30..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/StorageOptionsFactory.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.util;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.BloomFilter;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.DBOptions;
-import org.rocksdb.IndexType;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksObject;
-import org.rocksdb.util.SizeUnit;
-
-/**
- *
- * @author jiachun.fjc
- */
-public final class StorageOptionsFactory {
-
- static {
- RocksDB.loadLibrary();
- }
-
- private static final Map<String, DBOptions> rocksDBOptionsTable = new ConcurrentHashMap<>();
- private static final Map<String, ColumnFamilyOptions> columnFamilyOptionsTable = new ConcurrentHashMap<>();
- private static final Map<String, BlockBasedTableConfig> tableFormatConfigTable = new ConcurrentHashMap<>();
-
- /**
- * Releases all storage options from the responsibility of freeing the
- * underlying native C++ object.
- *
- * Note, that once an instance of options has been released, calling any
- * of its functions will lead to undefined behavior.
- */
- public static void releaseAllOptions() {
- for (final DBOptions opts : rocksDBOptionsTable.values()) {
- if (opts != null) {
- opts.close();
- }
- }
- for (final ColumnFamilyOptions opts : columnFamilyOptionsTable.values()) {
- if (opts != null) {
- opts.close();
- }
- }
- }
-
- /**
- * Users can register a custom rocksdb dboptions, then the related
- * classes will get their options by the key of their own class
- * name. If the user does not register an options, a default options
- * will be provided.
- *
- * @param cls the key of DBOptions
- * @param opts the DBOptions
- */
- public static void registerRocksDBOptions(final Class<?> cls, final DBOptions opts) {
- Requires.requireNonNull(cls, "cls");
- Requires.requireNonNull(opts, "opts");
- if (rocksDBOptionsTable.putIfAbsent(cls.getName(), opts) != null) {
- throw new IllegalStateException("DBOptions with class key [" + cls.getName()
- + "] has already been registered");
- }
- }
-
- /**
- * Get a new default DBOptions or a copy of the exist DBOptions.
- * Users should call DBOptions#close() to release resources themselves.
- *
- * @param cls the key of DBOptions
- * @return new default DBOptions or a copy of the exist DBOptions
- */
- public static DBOptions getRocksDBOptions(final Class<?> cls) {
- Requires.requireNonNull(cls, "cls");
- DBOptions opts = rocksDBOptionsTable.get(cls.getName());
- if (opts == null) {
- final DBOptions newOpts = getDefaultRocksDBOptions();
- opts = rocksDBOptionsTable.putIfAbsent(cls.getName(), newOpts);
- if (opts == null) {
- opts = newOpts;
- } else {
- newOpts.close();
- }
- }
- // NOTE: This does a shallow copy, which means env, rate_limiter,
- // sst_file_manager, info_log and other pointers will be cloned!
- return new DBOptions(checkInvalid(opts));
- }
-
- public static DBOptions getDefaultRocksDBOptions() {
- // Turn based on https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
- final DBOptions opts = new DBOptions();
-
- // If this value is set to true, then the database will be created if it is
- // missing during {@code RocksDB.open()}.
- opts.setCreateIfMissing(true);
-
- // If true, missing column families will be automatically created.
- opts.setCreateMissingColumnFamilies(true);
-
- // Number of open files that can be used by the DB. You may need to increase
- // this if your database has a large working set. Value -1 means files opened
- // are always kept open.
- opts.setMaxOpenFiles(-1);
-
- // The maximum number of concurrent background compactions. The default is 1,
- // but to fully utilize your CPU and storage you might want to increase this
- // to approximately number of cores in the system.
- opts.setMaxBackgroundCompactions(Math.min(Utils.cpus(), 4));
-
- // The maximum number of concurrent flush operations. It is usually good enough
- // to set this to 1.
- opts.setMaxBackgroundFlushes(1);
-
- return opts;
- }
-
- /**
- * Users can register a custom rocksdb ColumnFamilyOptions, then the
- * related classes will get their options by the key of their own class
- * name. If the user does not register an options, a default options
- * will be provided.
- *
- * @param cls the key of ColumnFamilyOptions
- * @param opts the ColumnFamilyOptions
- */
- public static void registerRocksDBColumnFamilyOptions(final Class<?> cls, final ColumnFamilyOptions opts) {
- Requires.requireNonNull(cls, "cls");
- Requires.requireNonNull(opts, "opts");
- if (columnFamilyOptionsTable.putIfAbsent(cls.getName(), opts) != null) {
- throw new IllegalStateException("ColumnFamilyOptions with class key [" + cls.getName()
- + "] has already been registered");
- }
- }
-
- /**
- * Get a new default ColumnFamilyOptions or a copy of the exist
- * ColumnFamilyOptions. Users should call ColumnFamilyOptions#close()
- * to release resources themselves.
- *
- * @param cls the key of ColumnFamilyOptions
- * @return new default ColumnFamilyOptions or a copy of the exist
- * ColumnFamilyOptions
- */
- public static ColumnFamilyOptions getRocksDBColumnFamilyOptions(final Class<?> cls) {
- Requires.requireNonNull(cls, "cls");
- ColumnFamilyOptions opts = columnFamilyOptionsTable.get(cls.getName());
- if (opts == null) {
- final ColumnFamilyOptions newOpts = getDefaultRocksDBColumnFamilyOptions();
- opts = columnFamilyOptionsTable.putIfAbsent(cls.getName(), newOpts);
- if (opts == null) {
- opts = newOpts;
- } else {
- newOpts.close();
- }
- }
- // NOTE: This does a shallow copy, which means comparator, merge_operator,
- // compaction_filter, compaction_filter_factory and other pointers will be
- // cloned!
- return new ColumnFamilyOptions(checkInvalid(opts));
- }
-
- public static ColumnFamilyOptions getDefaultRocksDBColumnFamilyOptions() {
- final ColumnFamilyOptions opts = new ColumnFamilyOptions();
-
- // Flushing options:
- // write_buffer_size sets the size of a single mem_table. Once mem_table exceeds
- // this size, it is marked immutable and a new one is created.
- opts.setWriteBufferSize(64 * SizeUnit.MB);
-
- // Flushing options:
- // max_write_buffer_number sets the maximum number of mem_tables, both active
- // and immutable. If the active mem_table fills up and the total number of
- // mem_tables is larger than max_write_buffer_number we stall further writes.
- // This may happen if the flush process is slower than the write rate.
- opts.setMaxWriteBufferNumber(3);
-
- // Flushing options:
- // min_write_buffer_number_to_merge is the minimum number of mem_tables to be
- // merged before flushing to storage. For example, if this option is set to 2,
- // immutable mem_tables are only flushed when there are two of them - a single
- // immutable mem_table will never be flushed. If multiple mem_tables are merged
- // together, less data may be written to storage since two updates are merged to
- // a single key. However, every Get() must traverse all immutable mem_tables
- // linearly to check if the key is there. Setting this option too high may hurt
- // read performance.
- opts.setMinWriteBufferNumberToMerge(1);
-
- // Level Style Compaction:
- // level0_file_num_compaction_trigger -- Once level 0 reaches this number of
- // files, L0->L1 compaction is triggered. We can therefore estimate level 0
- // size in stable state as
- // write_buffer_size * min_write_buffer_number_to_merge * level0_file_num_compaction_trigger.
- opts.setLevel0FileNumCompactionTrigger(10);
-
- // Soft limit on number of level-0 files. We start slowing down writes at this
- // point. A value 0 means that no writing slow down will be triggered by number
- // of files in level-0.
- opts.setLevel0SlowdownWritesTrigger(20);
-
- // Maximum number of level-0 files. We stop writes at this point.
- opts.setLevel0StopWritesTrigger(40);
-
- // Level Style Compaction:
- // max_bytes_for_level_base and max_bytes_for_level_multiplier
- // -- max_bytes_for_level_base is total size of level 1. As mentioned, we
- // recommend that this be around the size of level 0. Each subsequent level
- // is max_bytes_for_level_multiplier larger than previous one. The default
- // is 10 and we do not recommend changing that.
- opts.setMaxBytesForLevelBase(512 * SizeUnit.MB);
-
- // Level Style Compaction:
- // target_file_size_base and target_file_size_multiplier
- // -- Files in level 1 will have target_file_size_base bytes. Each next
- // level's file size will be target_file_size_multiplier bigger than previous
- // one. However, by default target_file_size_multiplier is 1, so files in all
- // L1..LMax levels are equal. Increasing target_file_size_base will reduce total
- // number of database files, which is generally a good thing. We recommend setting
- // target_file_size_base to be max_bytes_for_level_base / 10, so that there are
- // 10 files in level 1.
- opts.setTargetFileSizeBase(64 * SizeUnit.MB);
-
- // If prefix_extractor is set and memtable_prefix_bloom_size_ratio is not 0,
- // create prefix bloom for memtable with the size of
- // write_buffer_size * memtable_prefix_bloom_size_ratio.
- // If it is larger than 0.25, it is santinized to 0.25.
- opts.setMemtablePrefixBloomSizeRatio(0.125);
-
- // Seems like the rocksDB jni for Windows doesn't come linked with any of the
- // compression type
- if (!Platform.isWindows()) {
- opts.setCompressionType(CompressionType.LZ4_COMPRESSION) //
- .setCompactionStyle(CompactionStyle.LEVEL) //
- .optimizeLevelStyleCompaction();
- }
-
- // https://github.com/facebook/rocksdb/pull/5744
- opts.setForceConsistencyChecks(true);
-
- return opts;
- }
-
- /**
- * Users can register a custom rocksdb BlockBasedTableConfig, then the related
- * classes will get their options by the key of their own class name. If
- * the user does not register a config, a default config will be provided.
- *
- * @param cls the key of BlockBasedTableConfig
- * @param cfg the BlockBasedTableConfig
- */
- public static void registerRocksDBTableFormatConfig(final Class<?> cls, final BlockBasedTableConfig cfg) {
- Requires.requireNonNull(cls, "cls");
- Requires.requireNonNull(cfg, "cfg");
- if (tableFormatConfigTable.putIfAbsent(cls.getName(), cfg) != null) {
- throw new IllegalStateException("TableFormatConfig with class key [" + cls.getName()
- + "] has already been registered");
- }
- }
-
- /**
- * Get a new default TableFormatConfig or a copy of the exist ableFormatConfig.
- *
- * @param cls the key of TableFormatConfig
- * @return new default TableFormatConfig or a copy of the exist TableFormatConfig
- */
- public static BlockBasedTableConfig getRocksDBTableFormatConfig(final Class<?> cls) {
- Requires.requireNonNull(cls, "cls");
- BlockBasedTableConfig cfg = tableFormatConfigTable.get(cls.getName());
- if (cfg == null) {
- final BlockBasedTableConfig newCfg = getDefaultRocksDBTableConfig();
- cfg = tableFormatConfigTable.putIfAbsent(cls.getName(), newCfg);
- if (cfg == null) {
- cfg = newCfg;
- }
- }
- return copyTableFormatConfig(cfg);
- }
-
- public static BlockBasedTableConfig getDefaultRocksDBTableConfig() {
- // See https://github.com/sofastack/sofa-jraft/pull/156
- return new BlockBasedTableConfig() //
- // Begin to use partitioned index filters
- // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters#how-to-use-it
- .setIndexType(IndexType.kTwoLevelIndexSearch) //
- .setFilter(new BloomFilter(16, false)) //
- .setPartitionFilters(true) //
- .setMetadataBlockSize(8 * SizeUnit.KB) //
- .setCacheIndexAndFilterBlocks(false) //
- .setCacheIndexAndFilterBlocksWithHighPriority(true) //
- .setPinL0FilterAndIndexBlocksInCache(true) //
- // End of partitioned index filters settings.
- .setBlockSize(4 * SizeUnit.KB)//
- .setBlockCacheSize(512 * SizeUnit.MB) //
- .setCacheNumShardBits(8);
- }
-
- private static BlockBasedTableConfig copyTableFormatConfig(final BlockBasedTableConfig cfg) {
- return new BlockBasedTableConfig() //
- .setNoBlockCache(cfg.noBlockCache()) //
- .setBlockCacheSize(cfg.blockCacheSize()) //
- .setCacheNumShardBits(cfg.cacheNumShardBits()) //
- .setBlockSize(cfg.blockSize()) //
- .setBlockSizeDeviation(cfg.blockSizeDeviation()) //
- .setBlockRestartInterval(cfg.blockRestartInterval()) //
- .setWholeKeyFiltering(cfg.wholeKeyFiltering()) //
- .setCacheIndexAndFilterBlocks(cfg.cacheIndexAndFilterBlocks()) //
- .setCacheIndexAndFilterBlocksWithHighPriority(cfg.cacheIndexAndFilterBlocksWithHighPriority()) //
- .setPinL0FilterAndIndexBlocksInCache(cfg.pinL0FilterAndIndexBlocksInCache()) //
- .setPartitionFilters(cfg.partitionFilters()) //
- .setMetadataBlockSize(cfg.metadataBlockSize()) //
- .setPinTopLevelIndexAndFilter(cfg.pinTopLevelIndexAndFilter()) //
- .setHashIndexAllowCollision(cfg.hashIndexAllowCollision()) //
- .setBlockCacheCompressedSize(cfg.blockCacheCompressedSize()) //
- .setBlockCacheCompressedNumShardBits(cfg.blockCacheCompressedNumShardBits()) //
- .setChecksumType(cfg.checksumType()) //
- .setIndexType(cfg.indexType()) //
- .setFormatVersion(cfg.formatVersion());
- }
-
- private static <T extends RocksObject> T checkInvalid(final T opts) {
- if (!opts.isOwningHandle()) {
- throw new IllegalStateException(
- "the instance of options [" + opts
- + "] has been released, calling any of its functions will lead to undefined behavior.");
- }
- return opts;
- }
-
- private StorageOptionsFactory() {
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/ThrowUtil.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/ThrowUtil.java
index c179a89..71b1829 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/ThrowUtil.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/ThrowUtil.java
@@ -30,11 +30,13 @@ public final class ThrowUtil {
* Raises an exception bypassing compiler checks for checked exceptions.
*/
public static void throwException(final Throwable t) {
- if (UnsafeUtil.hasUnsafe()) {
- UnsafeUtil.throwException(t);
- } else {
- ThrowUtil.throwException0(t);
- }
+// if (UnsafeUtil.hasUnsafe()) {
+// UnsafeUtil.throwException(t);
+// } else {
+// ThrowUtil.throwException0(t);
+// }
+
+ ThrowUtil.throwException0(t);
}
/**
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeIntegerFieldUpdater.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeIntegerFieldUpdater.java
deleted file mode 100644
index 4bc081c..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeIntegerFieldUpdater.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.util.internal;
-
-import java.lang.reflect.Field;
-import sun.misc.Unsafe;
-
-/**
- *
- * @author jiachun.fjc
- */
-final class UnsafeIntegerFieldUpdater<U> implements IntegerFieldUpdater<U> {
-
- private final long offset;
- private final Unsafe unsafe;
-
- UnsafeIntegerFieldUpdater(Unsafe unsafe, Class<? super U> tClass, String fieldName) throws NoSuchFieldException {
- final Field field = tClass.getDeclaredField(fieldName);
- if (unsafe == null) {
- throw new NullPointerException("unsafe");
- }
- this.unsafe = unsafe;
- this.offset = unsafe.objectFieldOffset(field);
- }
-
- @Override
- public void set(final U obj, final int newValue) {
- this.unsafe.putInt(obj, this.offset, newValue);
- }
-
- @Override
- public int get(final U obj) {
- return this.unsafe.getInt(obj, this.offset);
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeReferenceFieldUpdater.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeReferenceFieldUpdater.java
deleted file mode 100644
index 9c32c56..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeReferenceFieldUpdater.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.jraft.util.internal;
-
-import java.lang.reflect.Field;
-import sun.misc.Unsafe;
-
-/**
- *
- * @author jiachun.fjc
- */
-@SuppressWarnings("unchecked")
-final class UnsafeReferenceFieldUpdater<U, W> implements ReferenceFieldUpdater<U, W> {
-
- private final long offset;
- private final Unsafe unsafe;
-
- UnsafeReferenceFieldUpdater(Unsafe unsafe, Class<? super U> tClass, String fieldName) throws NoSuchFieldException {
- final Field field = tClass.getDeclaredField(fieldName);
- if (unsafe == null) {
- throw new NullPointerException("unsafe");
- }
- this.unsafe = unsafe;
- this.offset = unsafe.objectFieldOffset(field);
- }
-
- @Override
- public void set(final U obj, final W newValue) {
- this.unsafe.putObject(obj, this.offset, newValue);
- }
-
- @Override
- public W get(final U obj) {
- return (W) this.unsafe.getObject(obj, this.offset);
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeUtf8Util.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeUtf8Util.java
deleted file mode 100644
index d3706ea..0000000
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeUtf8Util.java
+++ /dev/null
@@ -1,481 +0,0 @@
-// Protocol Buffers - Google's data interchange format
-// Copyright 2008 Google Inc. All rights reserved.
-// https://developers.google.com/protocol-buffers/
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-package com.alipay.sofa.jraft.util.internal;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import static java.lang.Character.MAX_SURROGATE;
-import static java.lang.Character.MIN_HIGH_SURROGATE;
-import static java.lang.Character.MIN_LOW_SURROGATE;
-import static java.lang.Character.MIN_SUPPLEMENTARY_CODE_POINT;
-import static java.lang.Character.MIN_SURROGATE;
-import static java.lang.Character.isSurrogatePair;
-import static java.lang.Character.toCodePoint;
-
-/**
- *
- * Refer to the implementation of protobuf: <A>https://github.com/protocolbuffers/protobuf/blob/master/java/core/src/main/java/com/google/protobuf/Utf8.java<A/>.
- */
-public final class UnsafeUtf8Util {
-
- /**
- * Maximum number of bytes per Java UTF-16 char in UTF-8.
- *
- * @see java.nio.charset.CharsetEncoder#maxBytesPerChar()
- */
- public static final int MAX_BYTES_PER_CHAR = 3;
-
- public static String decodeUtf8(byte[] bytes, int index, int size) {
- if ((index | size | bytes.length - index - size) < 0) {
- throw new ArrayIndexOutOfBoundsException("buffer length=" + bytes.length + ", index=" + index + ", size="
- + size);
- }
-
- int offset = index;
- final int limit = offset + size;
-
- // The longest possible resulting String is the same as the number of input bytes, when it is
- // all ASCII. For other cases, this over-allocates and we will truncate in the end.
- char[] resultArr = new char[size];
- int resultPos = 0;
-
- // Optimize for 100% ASCII (Hotspot loves small simple top-level loops like this).
- // This simple loop stops when we encounter a byte >= 0x80 (i.e. non-ASCII).
- while (offset < limit) {
- byte b = UnsafeUtil.getByte(bytes, offset);
- if (!DecodeUtil.isOneByte(b)) {
- break;
- }
- offset++;
- DecodeUtil.handleOneByte(b, resultArr, resultPos++);
- }
-
- while (offset < limit) {
- byte byte1 = UnsafeUtil.getByte(bytes, offset++);
- if (DecodeUtil.isOneByte(byte1)) {
- DecodeUtil.handleOneByte(byte1, resultArr, resultPos++);
- // It's common for there to be multiple ASCII characters in a run mixed in, so add an
- // extra optimized loop to take care of these runs.
- while (offset < limit) {
- byte b = UnsafeUtil.getByte(bytes, offset);
- if (!DecodeUtil.isOneByte(b)) {
- break;
- }
- offset++;
- DecodeUtil.handleOneByte(b, resultArr, resultPos++);
- }
- } else if (DecodeUtil.isTwoBytes(byte1)) {
- if (offset >= limit) {
- throw invalidUtf8();
- }
- DecodeUtil.handleTwoBytes(byte1, /* byte2 */UnsafeUtil.getByte(bytes, offset++), resultArr,
- resultPos++);
- } else if (DecodeUtil.isThreeBytes(byte1)) {
- if (offset >= limit - 1) {
- throw invalidUtf8();
- }
- DecodeUtil.handleThreeBytes(byte1,
- /* byte2 */UnsafeUtil.getByte(bytes, offset++),
- /* byte3 */UnsafeUtil.getByte(bytes, offset++), resultArr, resultPos++);
- } else {
- if (offset >= limit - 2) {
- throw invalidUtf8();
- }
- DecodeUtil.handleFourBytes(byte1,
- /* byte2 */UnsafeUtil.getByte(bytes, offset++),
- /* byte3 */UnsafeUtil.getByte(bytes, offset++),
- /* byte4 */UnsafeUtil.getByte(bytes, offset++), resultArr, resultPos++);
- // 4-byte case requires two chars.
- resultPos++;
- }
- }
-
- if (resultPos < resultArr.length) {
- resultArr = Arrays.copyOf(resultArr, resultPos);
- }
- return UnsafeUtil.moveToString(resultArr);
- }
-
- public static String decodeUtf8Direct(ByteBuffer buffer, int index, int size) {
- // Bitwise OR combines the sign bits so any negative value fails the check.
- if ((index | size | buffer.limit() - index - size) < 0) {
- throw new ArrayIndexOutOfBoundsException("buffer limit=" + buffer.limit() + ", index=" + index + ", limit="
- + size);
- }
- long address = UnsafeUtil.addressOffset(buffer) + index;
- final long addressLimit = address + size;
-
- // The longest possible resulting String is the same as the number of input bytes, when it is
- // all ASCII. For other cases, this over-allocates and we will truncate in the end.
- char[] resultArr = new char[size];
- int resultPos = 0;
-
- // Optimize for 100% ASCII (Hotspot loves small simple top-level loops like this).
- // This simple loop stops when we encounter a byte >= 0x80 (i.e. non-ASCII).
- while (address < addressLimit) {
- byte b = UnsafeUtil.getByte(address);
- if (!DecodeUtil.isOneByte(b)) {
- break;
- }
- address++;
- DecodeUtil.handleOneByte(b, resultArr, resultPos++);
- }
-
- while (address < addressLimit) {
- byte byte1 = UnsafeUtil.getByte(address++);
- if (DecodeUtil.isOneByte(byte1)) {
- DecodeUtil.handleOneByte(byte1, resultArr, resultPos++);
- // It's common for there to be multiple ASCII characters in a run mixed in, so add an
- // extra optimized loop to take care of these runs.
- while (address < addressLimit) {
- byte b = UnsafeUtil.getByte(address);
- if (!DecodeUtil.isOneByte(b)) {
- break;
- }
- address++;
- DecodeUtil.handleOneByte(b, resultArr, resultPos++);
- }
- } else if (DecodeUtil.isTwoBytes(byte1)) {
- if (address >= addressLimit) {
- throw invalidUtf8();
- }
- DecodeUtil.handleTwoBytes(byte1, /* byte2 */UnsafeUtil.getByte(address++), resultArr, resultPos++);
- } else if (DecodeUtil.isThreeBytes(byte1)) {
- if (address >= addressLimit - 1) {
- throw invalidUtf8();
- }
- DecodeUtil.handleThreeBytes(byte1,
- /* byte2 */UnsafeUtil.getByte(address++),
- /* byte3 */UnsafeUtil.getByte(address++), resultArr, resultPos++);
- } else {
- if (address >= addressLimit - 2) {
- throw invalidUtf8();
- }
- DecodeUtil.handleFourBytes(byte1,
- /* byte2 */UnsafeUtil.getByte(address++),
- /* byte3 */UnsafeUtil.getByte(address++),
- /* byte4 */UnsafeUtil.getByte(address++), resultArr, resultPos++);
- // 4-byte case requires two chars.
- resultPos++;
- }
- }
-
- if (resultPos < resultArr.length) {
- resultArr = Arrays.copyOf(resultArr, resultPos);
- }
- return UnsafeUtil.moveToString(resultArr);
- }
-
- public static int encodeUtf8(CharSequence in, byte[] out, int offset, int length) {
- long outIx = offset;
- final long outLimit = outIx + length;
- final int inLimit = in.length();
- if (inLimit > length || out.length - length < offset) {
- // Not even enough room for an ASCII-encoded string.
- throw new ArrayIndexOutOfBoundsException("Failed writing " + in.charAt(inLimit - 1) + " at index "
- + (offset + length));
- }
-
- // Designed to take advantage of
- // https://wikis.oracle.com/display/HotSpotInternals/RangeCheckElimination
- int inIx = 0;
- for (char c; inIx < inLimit && (c = in.charAt(inIx)) < 0x80; ++inIx) {
- UnsafeUtil.putByte(out, outIx++, (byte) c);
- }
- if (inIx == inLimit) {
- // We're done, it was ASCII encoded.
- return (int) outIx;
- }
-
- for (char c; inIx < inLimit; ++inIx) {
- c = in.charAt(inIx);
- if (c < 0x80 && outIx < outLimit) {
- UnsafeUtil.putByte(out, outIx++, (byte) c);
- } else if (c < 0x800 && outIx <= outLimit - 2L) { // 11 bits, two UTF-8 bytes
- UnsafeUtil.putByte(out, outIx++, (byte) ((0xF << 6) | (c >>> 6)));
- UnsafeUtil.putByte(out, outIx++, (byte) (0x80 | (0x3F & c)));
- } else if ((c < MIN_SURROGATE || MAX_SURROGATE < c) && outIx <= outLimit - 3L) {
- // Maximum single-char code point is 0xFFFF, 16 bits, three UTF-8 bytes
- UnsafeUtil.putByte(out, outIx++, (byte) ((0xF << 5) | (c >>> 12)));
- UnsafeUtil.putByte(out, outIx++, (byte) (0x80 | (0x3F & (c >>> 6))));
- UnsafeUtil.putByte(out, outIx++, (byte) (0x80 | (0x3F & c)));
- } else if (outIx <= outLimit - 4L) {
- // Minimum code point represented by a surrogate pair is 0x10000, 17 bits, four UTF-8
- // bytes
- final char low;
- if (inIx + 1 == inLimit || !isSurrogatePair(c, (low = in.charAt(++inIx)))) {
- throw new IllegalArgumentException("Unpaired surrogate at index " + (inIx - 1) + " of " + inLimit);
- }
- int codePoint = toCodePoint(c, low);
- UnsafeUtil.putByte(out, outIx++, (byte) ((0xF << 4) | (codePoint >>> 18)));
- UnsafeUtil.putByte(out, outIx++, (byte) (0x80 | (0x3F & (codePoint >>> 12))));
- UnsafeUtil.putByte(out, outIx++, (byte) (0x80 | (0x3F & (codePoint >>> 6))));
- UnsafeUtil.putByte(out, outIx++, (byte) (0x80 | (0x3F & codePoint)));
- } else {
- if ((MIN_SURROGATE <= c && c <= MAX_SURROGATE)
- && (inIx + 1 == inLimit || !isSurrogatePair(c, in.charAt(inIx + 1)))) {
- // We are surrogates and we're not a surrogate pair.
- throw new IllegalArgumentException("Unpaired surrogate at index " + inIx + " of " + inLimit);
- }
- // Not enough space in the output buffer.
- throw new ArrayIndexOutOfBoundsException("Failed writing " + c + " at index " + outIx);
- }
- }
-
- // All bytes have been encoded.
- return (int) outIx;
- }
-
- public static void encodeUtf8Direct(CharSequence in, ByteBuffer out) {
- final long address = UnsafeUtil.addressOffset(out);
- long outIx = address + out.position();
- final long outLimit = address + out.limit();
- final int inLimit = in.length();
- if (inLimit > outLimit - outIx) {
- // Not even enough room for an ASCII-encoded string.
- throw new ArrayIndexOutOfBoundsException("Failed writing " + in.charAt(inLimit - 1) + " at index "
- + out.limit());
- }
-
- // Designed to take advantage of
- // https://wikis.oracle.com/display/HotSpotInternals/RangeCheckElimination
- int inIx = 0;
- for (char c; inIx < inLimit && (c = in.charAt(inIx)) < 0x80; ++inIx) {
- UnsafeUtil.putByte(outIx++, (byte) c);
- }
- if (inIx == inLimit) {
- // We're done, it was ASCII encoded.
- out.position((int) (outIx - address));
- return;
- }
-
- for (char c; inIx < inLimit; ++inIx) {
- c = in.charAt(inIx);
- if (c < 0x80 && outIx < outLimit) {
- UnsafeUtil.putByte(outIx++, (byte) c);
- } else if (c < 0x800 && outIx <= outLimit - 2L) { // 11 bits, two UTF-8 bytes
- UnsafeUtil.putByte(outIx++, (byte) ((0xF << 6) | (c >>> 6)));
- UnsafeUtil.putByte(outIx++, (byte) (0x80 | (0x3F & c)));
- } else if ((c < MIN_SURROGATE || MAX_SURROGATE < c) && outIx <= outLimit - 3L) {
- // Maximum single-char code point is 0xFFFF, 16 bits, three UTF-8 bytes
- UnsafeUtil.putByte(outIx++, (byte) ((0xF << 5) | (c >>> 12)));
- UnsafeUtil.putByte(outIx++, (byte) (0x80 | (0x3F & (c >>> 6))));
- UnsafeUtil.putByte(outIx++, (byte) (0x80 | (0x3F & c)));
- } else if (outIx <= outLimit - 4L) {
- // Minimum code point represented by a surrogate pair is 0x10000, 17 bits, four UTF-8
- // bytes
- final char low;
- if (inIx + 1 == inLimit || !isSurrogatePair(c, (low = in.charAt(++inIx)))) {
- throw new IllegalArgumentException("Unpaired surrogate at index " + (inIx - 1) + " of " + inLimit);
- }
- int codePoint = toCodePoint(c, low);
- UnsafeUtil.putByte(outIx++, (byte) ((0xF << 4) | (codePoint >>> 18)));
- UnsafeUtil.putByte(outIx++, (byte) (0x80 | (0x3F & (codePoint >>> 12))));
- UnsafeUtil.putByte(outIx++, (byte) (0x80 | (0x3F & (codePoint >>> 6))));
- UnsafeUtil.putByte(outIx++, (byte) (0x80 | (0x3F & codePoint)));
- } else {
- if ((MIN_SURROGATE <= c && c <= MAX_SURROGATE)
- && (inIx + 1 == inLimit || !isSurrogatePair(c, in.charAt(inIx + 1)))) {
- // We are surrogates and we're not a surrogate pair.
- throw new IllegalArgumentException("Unpaired surrogate at index " + inIx + " of " + inLimit);
- }
- // Not enough space in the output buffer.
- throw new ArrayIndexOutOfBoundsException("Failed writing " + c + " at index " + outIx);
- }
- }
-
- // All bytes have been encoded.
- out.position((int) (outIx - address));
- }
-
- /**
- * Returns the number of bytes in the UTF-8-encoded form of {@code sequence}. For a string,
- * this method is equivalent to {@code string.getBytes(UTF_8).length}, but is more efficient in
- * both time and space.
- *
- * @throws IllegalArgumentException if {@code sequence} contains ill-formed UTF-16 (unpaired
- * surrogates)
- */
- public static int encodedLength(CharSequence sequence) {
- // Warning to maintainers: this implementation is highly optimized.
- int utf16Length = sequence.length();
- int utf8Length = utf16Length;
- int i = 0;
-
- // This loop optimizes for pure ASCII.
- while (i < utf16Length && sequence.charAt(i) < 0x80) {
- i++;
- }
-
- // This loop optimizes for chars less than 0x800.
- for (; i < utf16Length; i++) {
- char c = sequence.charAt(i);
- if (c < 0x800) {
- utf8Length += ((0x7f - c) >>> 31); // branch free!
- } else {
- utf8Length += encodedLengthGeneral(sequence, i);
- break;
- }
- }
-
- if (utf8Length < utf16Length) {
- // Necessary and sufficient condition for overflow because of maximum 3x expansion
- throw new IllegalArgumentException("UTF-8 length does not fit in int: " + (utf8Length + (1L << 32)));
- }
- return utf8Length;
- }
-
- private static int encodedLengthGeneral(CharSequence sequence, int start) {
- int utf16Length = sequence.length();
- int utf8Length = 0;
- for (int i = start; i < utf16Length; i++) {
- char c = sequence.charAt(i);
- if (c < 0x800) {
- utf8Length += (0x7f - c) >>> 31; // branch free!
- } else {
- utf8Length += 2;
- // jdk7+: if (Character.isSurrogate(c)) {
- if (Character.MIN_SURROGATE <= c && c <= Character.MAX_SURROGATE) {
- // Check that we have a well-formed surrogate pair.
- int cp = Character.codePointAt(sequence, i);
- if (cp < MIN_SUPPLEMENTARY_CODE_POINT) {
- throw new IllegalArgumentException("Unpaired surrogate at index " + i + " of " + utf16Length);
- }
- i++;
- }
- }
- }
- return utf8Length;
- }
-
- /**
- * Utility methods for decoding bytes into {@link String}. Callers are responsible for extracting
- * bytes (possibly using Unsafe methods), and checking remaining bytes. All other UTF-8 validity
- * checks and codepoint conversion happen in this class.
- */
- private static class DecodeUtil {
-
- /**
- * Returns whether this is a single-byte codepoint (i.e., ASCII) with the form '0XXXXXXX'.
- */
- private static boolean isOneByte(byte b) {
- return b >= 0;
- }
-
- /**
- * Returns whether this is a two-byte codepoint with the form '10XXXXXX'.
- */
- private static boolean isTwoBytes(byte b) {
- return b < (byte) 0xE0;
- }
-
- /**
- * Returns whether this is a three-byte codepoint with the form '110XXXXX'.
- */
- private static boolean isThreeBytes(byte b) {
- return b < (byte) 0xF0;
- }
-
- private static void handleOneByte(byte byte1, char[] resultArr, int resultPos) {
- resultArr[resultPos] = (char) byte1;
- }
-
- private static void handleTwoBytes(byte byte1, byte byte2, char[] resultArr, int resultPos) {
- // Simultaneously checks for illegal trailing-byte in leading position (<= '11000000') and
- // overlong 2-byte, '11000001'.
- if (byte1 < (byte) 0xC2 || isNotTrailingByte(byte2)) {
- throw invalidUtf8();
- }
- resultArr[resultPos] = (char) (((byte1 & 0x1F) << 6) | trailingByteValue(byte2));
- }
-
- private static void handleThreeBytes(byte byte1, byte byte2, byte byte3, char[] resultArr, int resultPos) {
- if (isNotTrailingByte(byte2)
- // overlong? 5 most significant bits must not all be zero
- || (byte1 == (byte) 0xE0 && byte2 < (byte) 0xA0)
- // check for illegal surrogate codepoints
- || (byte1 == (byte) 0xED && byte2 >= (byte) 0xA0) || isNotTrailingByte(byte3)) {
- throw invalidUtf8();
- }
- resultArr[resultPos] = (char) (((byte1 & 0x0F) << 12) | (trailingByteValue(byte2) << 6) | trailingByteValue(byte3));
- }
-
- private static void handleFourBytes(byte byte1, byte byte2, byte byte3, byte byte4, char[] resultArr,
- int resultPos) {
- if (isNotTrailingByte(byte2)
- // Check that 1 <= plane <= 16. Tricky optimized form of:
- // valid 4-byte leading byte?
- // if (byte1 > (byte) 0xF4 ||
- // overlong? 4 most significant bits must not all be zero
- // byte1 == (byte) 0xF0 && byte2 < (byte) 0x90 ||
- // codepoint larger than the highest code point (U+10FFFF)?
- // byte1 == (byte) 0xF4 && byte2 > (byte) 0x8F)
- || (((byte1 << 28) + (byte2 - (byte) 0x90)) >> 30) != 0 || isNotTrailingByte(byte3)
- || isNotTrailingByte(byte4)) {
- throw invalidUtf8();
- }
- int codePoint = ((byte1 & 0x07) << 18) | (trailingByteValue(byte2) << 12) | (trailingByteValue(byte3) << 6)
- | trailingByteValue(byte4);
- resultArr[resultPos] = DecodeUtil.highSurrogate(codePoint);
- resultArr[resultPos + 1] = DecodeUtil.lowSurrogate(codePoint);
- }
-
- /**
- * Returns whether the byte is not a valid continuation of the form '10XXXXXX'.
- */
- private static boolean isNotTrailingByte(byte b) {
- return b > (byte) 0xBF;
- }
-
- /**
- * Returns the actual value of the trailing byte (removes the prefix '10') for composition.
- */
- private static int trailingByteValue(byte b) {
- return b & 0x3F;
- }
-
- private static char highSurrogate(int codePoint) {
- return (char) ((MIN_HIGH_SURROGATE - (MIN_SUPPLEMENTARY_CODE_POINT >>> 10)) + (codePoint >>> 10));
- }
-
- private static char lowSurrogate(int codePoint) {
- return (char) (MIN_LOW_SURROGATE + (codePoint & 0x3ff));
- }
- }
-
- static IllegalStateException invalidUtf8() {
- return new IllegalStateException("Message had invalid UTF-8.");
- }
-
- private UnsafeUtf8Util() {
- }
-}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeUtil.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeUtil.java
index 02440f7..836d5f3 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeUtil.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/UnsafeUtil.java
@@ -30,600 +30,600 @@ import org.slf4j.LoggerFactory;
*
* @author jiachun.fjc
*/
-@SuppressWarnings("ConstantConditions")
-public final class UnsafeUtil {
-
- private static final Logger LOG = LoggerFactory.getLogger(UnsafeUtil.class);
-
- private static final Object UNSAFE = getUnsafe0();
-
- private static final UnsafeAccessor UNSAFE_ACCESSOR = getUnsafeAccessor0();
-
- private static final long BYTE_ARRAY_BASE_OFFSET = arrayBaseOffset(byte[].class);
- // Micro-optimization: we can assume a scale of 1 and skip the multiply
- // private static final long BYTE_ARRAY_INDEX_SCALE = 1;
-
- private static final long BOOLEAN_ARRAY_BASE_OFFSET = arrayBaseOffset(boolean[].class);
- private static final long BOOLEAN_ARRAY_INDEX_SCALE = arrayIndexScale(boolean[].class);
-
- private static final long INT_ARRAY_BASE_OFFSET = arrayBaseOffset(int[].class);
- private static final long INT_ARRAY_INDEX_SCALE = arrayIndexScale(int[].class);
-
- private static final long LONG_ARRAY_BASE_OFFSET = arrayBaseOffset(long[].class);
- private static final long LONG_ARRAY_INDEX_SCALE = arrayIndexScale(long[].class);
-
- private static final long FLOAT_ARRAY_BASE_OFFSET = arrayBaseOffset(float[].class);
- private static final long FLOAT_ARRAY_INDEX_SCALE = arrayIndexScale(float[].class);
-
- private static final long DOUBLE_ARRAY_BASE_OFFSET = arrayBaseOffset(double[].class);
- private static final long DOUBLE_ARRAY_INDEX_SCALE = arrayIndexScale(double[].class);
-
- private static final long OBJECT_ARRAY_BASE_OFFSET = arrayBaseOffset(Object[].class);
- private static final long OBJECT_ARRAY_INDEX_SCALE = arrayIndexScale(Object[].class);
-
- private static final long BUFFER_ADDRESS_OFFSET = objectFieldOffset(bufferAddressField());
-
- private static final long STRING_VALUE_OFFSET = objectFieldOffset(stringValueField());
-
- /**
- * Whether or not can use the unsafe api.
- */
- public static boolean hasUnsafe() {
- return UNSAFE != null;
- }
-
- /**
- * Get a {@link UnsafeAccessor} appropriate for the platform.
- */
- public static UnsafeAccessor getUnsafeAccessor() {
- return UNSAFE_ACCESSOR;
- }
-
- public static byte getByte(final Object target, final long offset) {
- return UNSAFE_ACCESSOR.getByte(target, offset);
- }
-
- public static void putByte(final Object target, final long offset, final byte value) {
- UNSAFE_ACCESSOR.putByte(target, offset, value);
- }
-
- public static int getInt(final Object target, final long offset) {
- return UNSAFE_ACCESSOR.getInt(target, offset);
- }
-
- public static void putInt(final Object target, final long offset, final int value) {
- UNSAFE_ACCESSOR.putInt(target, offset, value);
- }
-
- public static long getLong(final Object target, final long offset) {
- return UNSAFE_ACCESSOR.getLong(target, offset);
- }
-
- public static void putLong(final Object target, final long offset, final long value) {
- UNSAFE_ACCESSOR.putLong(target, offset, value);
- }
-
- public static boolean getBoolean(final Object target, final long offset) {
- return UNSAFE_ACCESSOR.getBoolean(target, offset);
- }
-
- public static void putBoolean(final Object target, final long offset, final boolean value) {
- UNSAFE_ACCESSOR.putBoolean(target, offset, value);
- }
-
- public static float getFloat(final Object target, final long offset) {
- return UNSAFE_ACCESSOR.getFloat(target, offset);
- }
-
- public static void putFloat(final Object target, final long offset, final float value) {
- UNSAFE_ACCESSOR.putFloat(target, offset, value);
- }
-
- public static double getDouble(final Object target, final long offset) {
- return UNSAFE_ACCESSOR.getDouble(target, offset);
- }
-
- public static void putDouble(final Object target, final long offset, final double value) {
- UNSAFE_ACCESSOR.putDouble(target, offset, value);
- }
-
- public static Object getObject(final Object target, final long offset) {
- return UNSAFE_ACCESSOR.getObject(target, offset);
- }
-
- public static void putObject(final Object target, final long offset, final Object value) {
- UNSAFE_ACCESSOR.putObject(target, offset, value);
- }
-
- public static byte getByte(final byte[] target, final long index) {
- return UNSAFE_ACCESSOR.getByte(target, BYTE_ARRAY_BASE_OFFSET + index);
- }
-
- public static void putByte(final byte[] target, final long index, final byte value) {
- UNSAFE_ACCESSOR.putByte(target, BYTE_ARRAY_BASE_OFFSET + index, value);
- }
-
- public static int getInt(final int[] target, final long index) {
- return UNSAFE_ACCESSOR.getInt(target, INT_ARRAY_BASE_OFFSET + (index * INT_ARRAY_INDEX_SCALE));
- }
-
- public static void putInt(final int[] target, final long index, final int value) {
- UNSAFE_ACCESSOR.putInt(target, INT_ARRAY_BASE_OFFSET + (index * INT_ARRAY_INDEX_SCALE), value);
- }
-
- public static long getLong(final long[] target, final long index) {
- return UNSAFE_ACCESSOR.getLong(target, LONG_ARRAY_BASE_OFFSET + (index * LONG_ARRAY_INDEX_SCALE));
- }
-
- public static void putLong(final long[] target, final long index, final long value) {
- UNSAFE_ACCESSOR.putLong(target, LONG_ARRAY_BASE_OFFSET + (index * LONG_ARRAY_INDEX_SCALE), value);
- }
-
- public static boolean getBoolean(final boolean[] target, final long index) {
- return UNSAFE_ACCESSOR.getBoolean(target, BOOLEAN_ARRAY_BASE_OFFSET + (index * BOOLEAN_ARRAY_INDEX_SCALE));
- }
-
- public static void putBoolean(final boolean[] target, final long index, final boolean value) {
- UNSAFE_ACCESSOR.putBoolean(target, BOOLEAN_ARRAY_BASE_OFFSET + (index * BOOLEAN_ARRAY_INDEX_SCALE), value);
- }
-
- public static float getFloat(final float[] target, final long index) {
- return UNSAFE_ACCESSOR.getFloat(target, FLOAT_ARRAY_BASE_OFFSET + (index * FLOAT_ARRAY_INDEX_SCALE));
- }
-
- public static void putFloat(final float[] target, final long index, final float value) {
- UNSAFE_ACCESSOR.putFloat(target, FLOAT_ARRAY_BASE_OFFSET + (index * FLOAT_ARRAY_INDEX_SCALE), value);
- }
-
- public static double getDouble(final double[] target, final long index) {
- return UNSAFE_ACCESSOR.getDouble(target, DOUBLE_ARRAY_BASE_OFFSET + (index * DOUBLE_ARRAY_INDEX_SCALE));
- }
-
- public static void putDouble(final double[] target, final long index, final double value) {
- UNSAFE_ACCESSOR.putDouble(target, DOUBLE_ARRAY_BASE_OFFSET + (index * DOUBLE_ARRAY_INDEX_SCALE), value);
- }
-
- public static Object getObject(final Object[] target, final long index) {
- return UNSAFE_ACCESSOR.getObject(target, OBJECT_ARRAY_BASE_OFFSET + (index * OBJECT_ARRAY_INDEX_SCALE));
- }
-
- public static void putObject(final Object[] target, final long index, final Object value) {
- UNSAFE_ACCESSOR.putObject(target, OBJECT_ARRAY_BASE_OFFSET + (index * OBJECT_ARRAY_INDEX_SCALE), value);
- }
-
- public static byte getByte(final long address) {
- return UNSAFE_ACCESSOR.getByte(address);
- }
-
- public static void putByte(final long address, final byte value) {
- UNSAFE_ACCESSOR.putByte(address, value);
- }
-
- public static int getInt(final long address) {
- return UNSAFE_ACCESSOR.getInt(address);
- }
-
- public static void putInt(final long address, final int value) {
- UNSAFE_ACCESSOR.putInt(address, value);
- }
-
- public static long getLong(final long address) {
- return UNSAFE_ACCESSOR.getLong(address);
- }
-
- public static void putLong(final long address, final long value) {
- UNSAFE_ACCESSOR.putLong(address, value);
- }
-
- public static byte getByteVolatile(final byte[] target, final long index) {
- return UNSAFE_ACCESSOR.getByteVolatile(target, BYTE_ARRAY_BASE_OFFSET + index);
- }
-
- public static void putByteVolatile(final byte[] target, final long index, final byte value) {
- UNSAFE_ACCESSOR.putByteVolatile(target, BYTE_ARRAY_BASE_OFFSET + index, value);
- }
-
- public static int getIntVolatile(final int[] target, final long index) {
- return UNSAFE_ACCESSOR.getIntVolatile(target, INT_ARRAY_BASE_OFFSET + (index * INT_ARRAY_INDEX_SCALE));
- }
-
- public static void putIntVolatile(final int[] target, final long index, final int value) {
- UNSAFE_ACCESSOR.putIntVolatile(target, INT_ARRAY_BASE_OFFSET + (index * INT_ARRAY_INDEX_SCALE), value);
- }
-
- public static long getLongVolatile(final long[] target, final long index) {
- return UNSAFE_ACCESSOR.getLongVolatile(target, LONG_ARRAY_BASE_OFFSET + (index * LONG_ARRAY_INDEX_SCALE));
- }
-
- public static void putLongVolatile(final long[] target, final long index, final long value) {
- UNSAFE_ACCESSOR.putLongVolatile(target, LONG_ARRAY_BASE_OFFSET + (index * LONG_ARRAY_INDEX_SCALE), value);
- }
-
- public static boolean getBooleanVolatile(final boolean[] target, final long index) {
- return UNSAFE_ACCESSOR.getBooleanVolatile(target, BOOLEAN_ARRAY_BASE_OFFSET
- + (index * BOOLEAN_ARRAY_INDEX_SCALE));
- }
-
- public static void putBooleanVolatile(final boolean[] target, final long index, final boolean value) {
- UNSAFE_ACCESSOR.putBooleanVolatile(target, BOOLEAN_ARRAY_BASE_OFFSET + (index * BOOLEAN_ARRAY_INDEX_SCALE),
- value);
- }
-
- public static float getFloatVolatile(final float[] target, final long index) {
- return UNSAFE_ACCESSOR.getFloatVolatile(target, FLOAT_ARRAY_BASE_OFFSET + (index * FLOAT_ARRAY_INDEX_SCALE));
- }
-
- public static void putFloatVolatile(final float[] target, final long index, final float value) {
- UNSAFE_ACCESSOR.putFloatVolatile(target, FLOAT_ARRAY_BASE_OFFSET + (index * FLOAT_ARRAY_INDEX_SCALE), value);
- }
-
- public static double getDoubleVolatile(final double[] target, final long index) {
- return UNSAFE_ACCESSOR.getDoubleVolatile(target, DOUBLE_ARRAY_BASE_OFFSET + (index * DOUBLE_ARRAY_INDEX_SCALE));
- }
-
- public static void putDoubleVolatile(final double[] target, final long index, final double value) {
- UNSAFE_ACCESSOR.putDoubleVolatile(target, DOUBLE_ARRAY_BASE_OFFSET + (index * DOUBLE_ARRAY_INDEX_SCALE), value);
- }
-
- public static Object getObjectVolatile(final Object[] target, final long index) {
- return UNSAFE_ACCESSOR.getObjectVolatile(target, OBJECT_ARRAY_BASE_OFFSET + (index * OBJECT_ARRAY_INDEX_SCALE));
- }
-
- public static void putObjectVolatile(final Object[] target, final long index, final Object value) {
- UNSAFE_ACCESSOR.putObjectVolatile(target, OBJECT_ARRAY_BASE_OFFSET + (index * OBJECT_ARRAY_INDEX_SCALE), value);
- }
-
- /**
- * Reports the offset of the first element in the storage allocation of a
- * given array class.
- */
- public static int arrayBaseOffset(final Class<?> clazz) {
- return hasUnsafe() ? UNSAFE_ACCESSOR.arrayBaseOffset(clazz) : -1;
- }
-
- /**
- * Reports the scale factor for addressing elements in the storage
- * allocation of a given array class.
- */
- public static int arrayIndexScale(final Class<?> clazz) {
- return hasUnsafe() ? UNSAFE_ACCESSOR.arrayIndexScale(clazz) : -1;
- }
-
- /**
- * Returns the offset of the provided field, or {@code -1} if {@code sun.misc.Unsafe} is not
- * available.
- */
- public static long objectFieldOffset(final Field field) {
- return field == null || hasUnsafe() ? UNSAFE_ACCESSOR.objectFieldOffset(field) : -1;
- }
-
- /**
- * Returns the offset of the provided class and fieldName, or {@code -1} if {@code sun.misc.Unsafe} is not
- * available.
- */
- public static long objectFieldOffset(final Class<?> clazz, final String fieldName) {
- try {
- return objectFieldOffset(clazz.getDeclaredField(fieldName));
- } catch (final NoSuchFieldException e) {
- UNSAFE_ACCESSOR.throwException(e);
- }
- return -1; // never get here
- }
-
- /**
- * Gets the offset of the {@code address} field of the given
- * direct {@link ByteBuffer}.
- */
- public static long addressOffset(final ByteBuffer buffer) {
- return UNSAFE_ACCESSOR.getLong(buffer, BUFFER_ADDRESS_OFFSET);
- }
-
- public static void throwException(final Throwable t) {
- UNSAFE_ACCESSOR.throwException(t);
- }
-
- /**
- * Returns a new {@link String} backed by the given {@code chars}.
- * The char array should not be mutated any more after calling
- * this function.
- */
- public static String moveToString(final char[] chars) {
- if (STRING_VALUE_OFFSET == -1) {
- // In the off-chance that this JDK does not implement String as we'd expect, just do a copy.
- return new String(chars);
- }
- final String str;
- try {
- str = (String) UNSAFE_ACCESSOR.allocateInstance(String.class);
- } catch (final InstantiationException e) {
- // This should never happen, but return a copy as a fallback just in case.
- return new String(chars);
- }
- UNSAFE_ACCESSOR.putObject(str, STRING_VALUE_OFFSET, chars);
- return str;
- }
-
- /**
- * Returns the system {@link ClassLoader}.
- */
- public static ClassLoader getSystemClassLoader() {
- if (System.getSecurityManager() == null) {
- return ClassLoader.getSystemClassLoader();
- } else {
- return AccessController.doPrivileged((PrivilegedAction<ClassLoader>) ClassLoader::getSystemClassLoader);
- }
- }
-
- /**
- * Finds the address field within a direct {@link Buffer}.
- */
- private static Field bufferAddressField() {
- return field(Buffer.class, "address", long.class);
- }
-
- /**
- * Finds the value field within a {@link String}.
- */
- private static Field stringValueField() {
- return field(String.class, "value", char[].class);
- }
-
- /**
- * Gets the field with the given name within the class, or
- * {@code null} if not found. If found, the field is made accessible.
- */
- private static Field field(final Class<?> clazz, final String fieldName, final Class<?> expectedType) {
- Field field;
- try {
- field = clazz.getDeclaredField(fieldName);
- field.setAccessible(true);
- if (!field.getType().equals(expectedType)) {
- return null;
- }
- } catch (final Throwable t) {
- // Failed to access the fields.
- field = null;
- }
- return field;
- }
-
- private static UnsafeAccessor getUnsafeAccessor0() {
- return hasUnsafe() ? new UnsafeAccessor(UNSAFE) : null;
- }
-
- private static Object getUnsafe0() {
- Object unsafe;
- try {
- final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
- final Field unsafeField = unsafeClass.getDeclaredField("theUnsafe");
- unsafeField.setAccessible(true);
- unsafe = unsafeField.get(null);
- } catch (final Throwable t) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("sun.misc.Unsafe.theUnsafe: unavailable, {}.", t);
- }
- unsafe = null;
- }
- return unsafe;
- }
-
- public static class UnsafeAccessor {
-
- private final sun.misc.Unsafe unsafe;
-
- UnsafeAccessor(Object unsafe) {
- this.unsafe = (sun.misc.Unsafe) unsafe;
- }
-
- /**
- * Returns the {@link sun.misc.Unsafe}'s instance.
- */
- public sun.misc.Unsafe getUnsafe() {
- return unsafe;
- }
-
- public byte getByte(final Object target, final long offset) {
- return this.unsafe.getByte(target, offset);
- }
-
- public void putByte(final Object target, final long offset, final byte value) {
- this.unsafe.putByte(target, offset, value);
- }
-
- public short getShort(final Object target, final long offset) {
- return this.unsafe.getShort(target, offset);
- }
-
- public void putShort(final Object target, final long offset, final short value) {
- this.unsafe.putShort(target, offset, value);
- }
-
- public int getInt(final Object target, final long offset) {
- return this.unsafe.getInt(target, offset);
- }
-
- public void putInt(final Object target, final long offset, final int value) {
- this.unsafe.putInt(target, offset, value);
- }
-
- public long getLong(final Object target, final long offset) {
- return this.unsafe.getLong(target, offset);
- }
-
- public void putLong(final Object target, final long offset, final long value) {
- this.unsafe.putLong(target, offset, value);
- }
-
- public boolean getBoolean(final Object target, final long offset) {
- return this.unsafe.getBoolean(target, offset);
- }
-
- public void putBoolean(final Object target, final long offset, final boolean value) {
- this.unsafe.putBoolean(target, offset, value);
- }
-
- public float getFloat(final Object target, final long offset) {
- return this.unsafe.getFloat(target, offset);
- }
-
- public void putFloat(final Object target, final long offset, final float value) {
- this.unsafe.putFloat(target, offset, value);
- }
-
- public double getDouble(final Object target, final long offset) {
- return this.unsafe.getDouble(target, offset);
- }
-
- public void putDouble(final Object target, final long offset, final double value) {
- this.unsafe.putDouble(target, offset, value);
- }
-
- public Object getObject(final Object target, final long offset) {
- return this.unsafe.getObject(target, offset);
- }
-
- public void putObject(final Object target, final long offset, final Object value) {
- this.unsafe.putObject(target, offset, value);
- }
-
- public byte getByte(final long address) {
- return this.unsafe.getByte(address);
- }
-
- public void putByte(final long address, final byte value) {
- this.unsafe.putByte(address, value);
- }
-
- public short getShort(final long address) {
- return this.unsafe.getShort(address);
- }
-
- public void putShort(final long address, final short value) {
- this.unsafe.putShort(address, value);
- }
-
- public int getInt(final long address) {
- return this.unsafe.getInt(address);
- }
-
- public void putInt(final long address, final int value) {
- this.unsafe.putInt(address, value);
- }
-
- public long getLong(final long address) {
- return this.unsafe.getLong(address);
- }
-
- public void putLong(final long address, final long value) {
- this.unsafe.putLong(address, value);
- }
-
- public void copyMemory(final Object srcBase, final long srcOffset, final Object dstBase, final long dstOffset,
- final long bytes) {
- this.unsafe.copyMemory(srcBase, srcOffset, dstBase, dstOffset, bytes);
- }
-
- public void copyMemory(final long srcAddress, final long dstAddress, final long bytes) {
- this.unsafe.copyMemory(srcAddress, dstAddress, bytes);
- }
-
- public byte getByteVolatile(final Object target, final long offset) {
- return this.unsafe.getByteVolatile(target, offset);
- }
-
- public void putByteVolatile(final Object target, final long offset, final byte value) {
- this.unsafe.putByteVolatile(target, offset, value);
- }
-
- public short getShortVolatile(final Object target, final long offset) {
- return this.unsafe.getShortVolatile(target, offset);
- }
-
- public void putShortVolatile(final Object target, final long offset, final short value) {
- this.unsafe.putShortVolatile(target, offset, value);
- }
-
- public int getIntVolatile(final Object target, final long offset) {
- return this.unsafe.getIntVolatile(target, offset);
- }
-
- public void putIntVolatile(final Object target, final long offset, final int value) {
- this.unsafe.putIntVolatile(target, offset, value);
- }
-
- public long getLongVolatile(final Object target, final long offset) {
- return this.unsafe.getLongVolatile(target, offset);
- }
-
- public void putLongVolatile(final Object target, final long offset, final long value) {
- this.unsafe.putLongVolatile(target, offset, value);
- }
-
- public boolean getBooleanVolatile(final Object target, final long offset) {
- return this.unsafe.getBooleanVolatile(target, offset);
- }
-
- public void putBooleanVolatile(final Object target, final long offset, final boolean value) {
- this.unsafe.putBooleanVolatile(target, offset, value);
- }
-
- public float getFloatVolatile(final Object target, final long offset) {
- return this.unsafe.getFloatVolatile(target, offset);
- }
-
- public void putFloatVolatile(final Object target, final long offset, final float value) {
- this.unsafe.putFloatVolatile(target, offset, value);
- }
-
- public double getDoubleVolatile(final Object target, final long offset) {
- return this.unsafe.getDoubleVolatile(target, offset);
- }
-
- public void putDoubleVolatile(final Object target, final long offset, final double value) {
- this.unsafe.putDoubleVolatile(target, offset, value);
- }
-
- public Object getObjectVolatile(final Object target, final long offset) {
- return this.unsafe.getObjectVolatile(target, offset);
- }
-
- public void putObjectVolatile(final Object target, final long offset, final Object value) {
- this.unsafe.putObjectVolatile(target, offset, value);
- }
-
- /**
- * Reports the offset of the first element in the storage allocation of a
- * given array class.
- */
- public int arrayBaseOffset(final Class<?> clazz) {
- return this.unsafe != null ? this.unsafe.arrayBaseOffset(clazz) : -1;
- }
-
- /**
- * Reports the scale factor for addressing elements in the storage
- * allocation of a given array class.
- */
- public int arrayIndexScale(final Class<?> clazz) {
- return this.unsafe != null ? this.unsafe.arrayIndexScale(clazz) : -1;
- }
-
- /**
- * Returns the offset of the provided field, or {@code -1} if {@code sun.misc.Unsafe} is not
- * available.
- */
- public long objectFieldOffset(final Field field) {
- return field == null || this.unsafe == null ? -1 : this.unsafe.objectFieldOffset(field);
- }
-
- public Object allocateInstance(final Class<?> clazz) throws InstantiationException {
- return this.unsafe.allocateInstance(clazz);
- }
-
- public void throwException(final Throwable t) {
- this.unsafe.throwException(t);
- }
- }
-
- private UnsafeUtil() {
- }
-}
+//@SuppressWarnings("ConstantConditions")
+//public final class UnsafeUtil {
+//
+// private static final Logger LOG = LoggerFactory.getLogger(UnsafeUtil.class);
+//
+// private static final Object UNSAFE = getUnsafe0();
+//
+// private static final UnsafeAccessor UNSAFE_ACCESSOR = getUnsafeAccessor0();
+//
+// private static final long BYTE_ARRAY_BASE_OFFSET = arrayBaseOffset(byte[].class);
+// // Micro-optimization: we can assume a scale of 1 and skip the multiply
+// // private static final long BYTE_ARRAY_INDEX_SCALE = 1;
+//
+// private static final long BOOLEAN_ARRAY_BASE_OFFSET = arrayBaseOffset(boolean[].class);
+// private static final long BOOLEAN_ARRAY_INDEX_SCALE = arrayIndexScale(boolean[].class);
+//
+// private static final long INT_ARRAY_BASE_OFFSET = arrayBaseOffset(int[].class);
+// private static final long INT_ARRAY_INDEX_SCALE = arrayIndexScale(int[].class);
+//
+// private static final long LONG_ARRAY_BASE_OFFSET = arrayBaseOffset(long[].class);
+// private static final long LONG_ARRAY_INDEX_SCALE = arrayIndexScale(long[].class);
+//
+// private static final long FLOAT_ARRAY_BASE_OFFSET = arrayBaseOffset(float[].class);
+// private static final long FLOAT_ARRAY_INDEX_SCALE = arrayIndexScale(float[].class);
+//
+// private static final long DOUBLE_ARRAY_BASE_OFFSET = arrayBaseOffset(double[].class);
+// private static final long DOUBLE_ARRAY_INDEX_SCALE = arrayIndexScale(double[].class);
+//
+// private static final long OBJECT_ARRAY_BASE_OFFSET = arrayBaseOffset(Object[].class);
+// private static final long OBJECT_ARRAY_INDEX_SCALE = arrayIndexScale(Object[].class);
+//
+// private static final long BUFFER_ADDRESS_OFFSET = objectFieldOffset(bufferAddressField());
+//
+// private static final long STRING_VALUE_OFFSET = objectFieldOffset(stringValueField());
+//
+// /**
+// * Whether or not can use the unsafe api.
+// */
+// public static boolean hasUnsafe() {
+// return UNSAFE != null;
+// }
+//
+// /**
+// * Get a {@link UnsafeAccessor} appropriate for the platform.
+// */
+// public static UnsafeAccessor getUnsafeAccessor() {
+// return UNSAFE_ACCESSOR;
+// }
+//
+// public static byte getByte(final Object target, final long offset) {
+// return UNSAFE_ACCESSOR.getByte(target, offset);
+// }
+//
+// public static void putByte(final Object target, final long offset, final byte value) {
+// UNSAFE_ACCESSOR.putByte(target, offset, value);
+// }
+//
+// public static int getInt(final Object target, final long offset) {
+// return UNSAFE_ACCESSOR.getInt(target, offset);
+// }
+//
+// public static void putInt(final Object target, final long offset, final int value) {
+// UNSAFE_ACCESSOR.putInt(target, offset, value);
+// }
+//
+// public static long getLong(final Object target, final long offset) {
+// return UNSAFE_ACCESSOR.getLong(target, offset);
+// }
+//
+// public static void putLong(final Object target, final long offset, final long value) {
+// UNSAFE_ACCESSOR.putLong(target, offset, value);
+// }
+//
+// public static boolean getBoolean(final Object target, final long offset) {
+// return UNSAFE_ACCESSOR.getBoolean(target, offset);
+// }
+//
+// public static void putBoolean(final Object target, final long offset, final boolean value) {
+// UNSAFE_ACCESSOR.putBoolean(target, offset, value);
+// }
+//
+// public static float getFloat(final Object target, final long offset) {
+// return UNSAFE_ACCESSOR.getFloat(target, offset);
+// }
+//
+// public static void putFloat(final Object target, final long offset, final float value) {
+// UNSAFE_ACCESSOR.putFloat(target, offset, value);
+// }
+//
+// public static double getDouble(final Object target, final long offset) {
+// return UNSAFE_ACCESSOR.getDouble(target, offset);
+// }
+//
+// public static void putDouble(final Object target, final long offset, final double value) {
+// UNSAFE_ACCESSOR.putDouble(target, offset, value);
+// }
+//
+// public static Object getObject(final Object target, final long offset) {
+// return UNSAFE_ACCESSOR.getObject(target, offset);
+// }
+//
+// public static void putObject(final Object target, final long offset, final Object value) {
+// UNSAFE_ACCESSOR.putObject(target, offset, value);
+// }
+//
+// public static byte getByte(final byte[] target, final long index) {
+// return UNSAFE_ACCESSOR.getByte(target, BYTE_ARRAY_BASE_OFFSET + index);
+// }
+//
+// public static void putByte(final byte[] target, final long index, final byte value) {
+// UNSAFE_ACCESSOR.putByte(target, BYTE_ARRAY_BASE_OFFSET + index, value);
+// }
+//
+// public static int getInt(final int[] target, final long index) {
+// return UNSAFE_ACCESSOR.getInt(target, INT_ARRAY_BASE_OFFSET + (index * INT_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putInt(final int[] target, final long index, final int value) {
+// UNSAFE_ACCESSOR.putInt(target, INT_ARRAY_BASE_OFFSET + (index * INT_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static long getLong(final long[] target, final long index) {
+// return UNSAFE_ACCESSOR.getLong(target, LONG_ARRAY_BASE_OFFSET + (index * LONG_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putLong(final long[] target, final long index, final long value) {
+// UNSAFE_ACCESSOR.putLong(target, LONG_ARRAY_BASE_OFFSET + (index * LONG_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static boolean getBoolean(final boolean[] target, final long index) {
+// return UNSAFE_ACCESSOR.getBoolean(target, BOOLEAN_ARRAY_BASE_OFFSET + (index * BOOLEAN_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putBoolean(final boolean[] target, final long index, final boolean value) {
+// UNSAFE_ACCESSOR.putBoolean(target, BOOLEAN_ARRAY_BASE_OFFSET + (index * BOOLEAN_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static float getFloat(final float[] target, final long index) {
+// return UNSAFE_ACCESSOR.getFloat(target, FLOAT_ARRAY_BASE_OFFSET + (index * FLOAT_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putFloat(final float[] target, final long index, final float value) {
+// UNSAFE_ACCESSOR.putFloat(target, FLOAT_ARRAY_BASE_OFFSET + (index * FLOAT_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static double getDouble(final double[] target, final long index) {
+// return UNSAFE_ACCESSOR.getDouble(target, DOUBLE_ARRAY_BASE_OFFSET + (index * DOUBLE_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putDouble(final double[] target, final long index, final double value) {
+// UNSAFE_ACCESSOR.putDouble(target, DOUBLE_ARRAY_BASE_OFFSET + (index * DOUBLE_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static Object getObject(final Object[] target, final long index) {
+// return UNSAFE_ACCESSOR.getObject(target, OBJECT_ARRAY_BASE_OFFSET + (index * OBJECT_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putObject(final Object[] target, final long index, final Object value) {
+// UNSAFE_ACCESSOR.putObject(target, OBJECT_ARRAY_BASE_OFFSET + (index * OBJECT_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static byte getByte(final long address) {
+// return UNSAFE_ACCESSOR.getByte(address);
+// }
+//
+// public static void putByte(final long address, final byte value) {
+// UNSAFE_ACCESSOR.putByte(address, value);
+// }
+//
+// public static int getInt(final long address) {
+// return UNSAFE_ACCESSOR.getInt(address);
+// }
+//
+// public static void putInt(final long address, final int value) {
+// UNSAFE_ACCESSOR.putInt(address, value);
+// }
+//
+// public static long getLong(final long address) {
+// return UNSAFE_ACCESSOR.getLong(address);
+// }
+//
+// public static void putLong(final long address, final long value) {
+// UNSAFE_ACCESSOR.putLong(address, value);
+// }
+//
+// public static byte getByteVolatile(final byte[] target, final long index) {
+// return UNSAFE_ACCESSOR.getByteVolatile(target, BYTE_ARRAY_BASE_OFFSET + index);
+// }
+//
+// public static void putByteVolatile(final byte[] target, final long index, final byte value) {
+// UNSAFE_ACCESSOR.putByteVolatile(target, BYTE_ARRAY_BASE_OFFSET + index, value);
+// }
+//
+// public static int getIntVolatile(final int[] target, final long index) {
+// return UNSAFE_ACCESSOR.getIntVolatile(target, INT_ARRAY_BASE_OFFSET + (index * INT_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putIntVolatile(final int[] target, final long index, final int value) {
+// UNSAFE_ACCESSOR.putIntVolatile(target, INT_ARRAY_BASE_OFFSET + (index * INT_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static long getLongVolatile(final long[] target, final long index) {
+// return UNSAFE_ACCESSOR.getLongVolatile(target, LONG_ARRAY_BASE_OFFSET + (index * LONG_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putLongVolatile(final long[] target, final long index, final long value) {
+// UNSAFE_ACCESSOR.putLongVolatile(target, LONG_ARRAY_BASE_OFFSET + (index * LONG_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static boolean getBooleanVolatile(final boolean[] target, final long index) {
+// return UNSAFE_ACCESSOR.getBooleanVolatile(target, BOOLEAN_ARRAY_BASE_OFFSET
+// + (index * BOOLEAN_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putBooleanVolatile(final boolean[] target, final long index, final boolean value) {
+// UNSAFE_ACCESSOR.putBooleanVolatile(target, BOOLEAN_ARRAY_BASE_OFFSET + (index * BOOLEAN_ARRAY_INDEX_SCALE),
+// value);
+// }
+//
+// public static float getFloatVolatile(final float[] target, final long index) {
+// return UNSAFE_ACCESSOR.getFloatVolatile(target, FLOAT_ARRAY_BASE_OFFSET + (index * FLOAT_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putFloatVolatile(final float[] target, final long index, final float value) {
+// UNSAFE_ACCESSOR.putFloatVolatile(target, FLOAT_ARRAY_BASE_OFFSET + (index * FLOAT_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static double getDoubleVolatile(final double[] target, final long index) {
+// return UNSAFE_ACCESSOR.getDoubleVolatile(target, DOUBLE_ARRAY_BASE_OFFSET + (index * DOUBLE_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putDoubleVolatile(final double[] target, final long index, final double value) {
+// UNSAFE_ACCESSOR.putDoubleVolatile(target, DOUBLE_ARRAY_BASE_OFFSET + (index * DOUBLE_ARRAY_INDEX_SCALE), value);
+// }
+//
+// public static Object getObjectVolatile(final Object[] target, final long index) {
+// return UNSAFE_ACCESSOR.getObjectVolatile(target, OBJECT_ARRAY_BASE_OFFSET + (index * OBJECT_ARRAY_INDEX_SCALE));
+// }
+//
+// public static void putObjectVolatile(final Object[] target, final long index, final Object value) {
+// UNSAFE_ACCESSOR.putObjectVolatile(target, OBJECT_ARRAY_BASE_OFFSET + (index * OBJECT_ARRAY_INDEX_SCALE), value);
+// }
+//
+// /**
+// * Reports the offset of the first element in the storage allocation of a
+// * given array class.
+// */
+// public static int arrayBaseOffset(final Class<?> clazz) {
+// return hasUnsafe() ? UNSAFE_ACCESSOR.arrayBaseOffset(clazz) : -1;
+// }
+//
+// /**
+// * Reports the scale factor for addressing elements in the storage
+// * allocation of a given array class.
+// */
+// public static int arrayIndexScale(final Class<?> clazz) {
+// return hasUnsafe() ? UNSAFE_ACCESSOR.arrayIndexScale(clazz) : -1;
+// }
+//
+// /**
+// * Returns the offset of the provided field, or {@code -1} if {@code sun.misc.Unsafe} is not
+// * available.
+// */
+// public static long objectFieldOffset(final Field field) {
+// return field == null || hasUnsafe() ? UNSAFE_ACCESSOR.objectFieldOffset(field) : -1;
+// }
+//
+// /**
+// * Returns the offset of the provided class and fieldName, or {@code -1} if {@code sun.misc.Unsafe} is not
+// * available.
+// */
+// public static long objectFieldOffset(final Class<?> clazz, final String fieldName) {
+// try {
+// return objectFieldOffset(clazz.getDeclaredField(fieldName));
+// } catch (final NoSuchFieldException e) {
+// UNSAFE_ACCESSOR.throwException(e);
+// }
+// return -1; // never get here
+// }
+//
+// /**
+// * Gets the offset of the {@code address} field of the given
+// * direct {@link ByteBuffer}.
+// */
+// public static long addressOffset(final ByteBuffer buffer) {
+// return UNSAFE_ACCESSOR.getLong(buffer, BUFFER_ADDRESS_OFFSET);
+// }
+//
+// public static void throwException(final Throwable t) {
+// UNSAFE_ACCESSOR.throwException(t);
+// }
+//
+// /**
+// * Returns a new {@link String} backed by the given {@code chars}.
+// * The char array should not be mutated any more after calling
+// * this function.
+// */
+// public static String moveToString(final char[] chars) {
+// if (STRING_VALUE_OFFSET == -1) {
+// // In the off-chance that this JDK does not implement String as we'd expect, just do a copy.
+// return new String(chars);
+// }
+// final String str;
+// try {
+// str = (String) UNSAFE_ACCESSOR.allocateInstance(String.class);
+// } catch (final InstantiationException e) {
+// // This should never happen, but return a copy as a fallback just in case.
+// return new String(chars);
+// }
+// UNSAFE_ACCESSOR.putObject(str, STRING_VALUE_OFFSET, chars);
+// return str;
+// }
+//
+// /**
+// * Returns the system {@link ClassLoader}.
+// */
+// public static ClassLoader getSystemClassLoader() {
+// if (System.getSecurityManager() == null) {
+// return ClassLoader.getSystemClassLoader();
+// } else {
+// return AccessController.doPrivileged((PrivilegedAction<ClassLoader>) ClassLoader::getSystemClassLoader);
+// }
+// }
+//
+// /**
+// * Finds the address field within a direct {@link Buffer}.
+// */
+// private static Field bufferAddressField() {
+// return field(Buffer.class, "address", long.class);
+// }
+//
+// /**
+// * Finds the value field within a {@link String}.
+// */
+// private static Field stringValueField() {
+// return field(String.class, "value", char[].class);
+// }
+//
+// /**
+// * Gets the field with the given name within the class, or
+// * {@code null} if not found. If found, the field is made accessible.
+// */
+// private static Field field(final Class<?> clazz, final String fieldName, final Class<?> expectedType) {
+// Field field;
+// try {
+// field = clazz.getDeclaredField(fieldName);
+// field.setAccessible(true);
+// if (!field.getType().equals(expectedType)) {
+// return null;
+// }
+// } catch (final Throwable t) {
+// // Failed to access the fields.
+// field = null;
+// }
+// return field;
+// }
+//
+// private static UnsafeAccessor getUnsafeAccessor0() {
+// return hasUnsafe() ? new UnsafeAccessor(UNSAFE) : null;
+// }
+//
+// private static Object getUnsafe0() {
+// Object unsafe;
+// try {
+// final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
+// final Field unsafeField = unsafeClass.getDeclaredField("theUnsafe");
+// unsafeField.setAccessible(true);
+// unsafe = unsafeField.get(null);
+// } catch (final Throwable t) {
+// if (LOG.isWarnEnabled()) {
+// LOG.warn("sun.misc.Unsafe.theUnsafe: unavailable, {}.", t);
+// }
+// unsafe = null;
+// }
+// return unsafe;
+// }
+//
+// public static class UnsafeAccessor {
+//
+// private final sun.misc.Unsafe unsafe;
+//
+// UnsafeAccessor(Object unsafe) {
+// this.unsafe = (sun.misc.Unsafe) unsafe;
+// }
+//
+// /**
+// * Returns the {@link sun.misc.Unsafe}'s instance.
+// */
+// public sun.misc.Unsafe getUnsafe() {
+// return unsafe;
+// }
+//
+// public byte getByte(final Object target, final long offset) {
+// return this.unsafe.getByte(target, offset);
+// }
+//
+// public void putByte(final Object target, final long offset, final byte value) {
+// this.unsafe.putByte(target, offset, value);
+// }
+//
+// public short getShort(final Object target, final long offset) {
+// return this.unsafe.getShort(target, offset);
+// }
+//
+// public void putShort(final Object target, final long offset, final short value) {
+// this.unsafe.putShort(target, offset, value);
+// }
+//
+// public int getInt(final Object target, final long offset) {
+// return this.unsafe.getInt(target, offset);
+// }
+//
+// public void putInt(final Object target, final long offset, final int value) {
+// this.unsafe.putInt(target, offset, value);
+// }
+//
+// public long getLong(final Object target, final long offset) {
+// return this.unsafe.getLong(target, offset);
+// }
+//
+// public void putLong(final Object target, final long offset, final long value) {
+// this.unsafe.putLong(target, offset, value);
+// }
+//
+// public boolean getBoolean(final Object target, final long offset) {
+// return this.unsafe.getBoolean(target, offset);
+// }
+//
+// public void putBoolean(final Object target, final long offset, final boolean value) {
+// this.unsafe.putBoolean(target, offset, value);
+// }
+//
+// public float getFloat(final Object target, final long offset) {
+// return this.unsafe.getFloat(target, offset);
+// }
+//
+// public void putFloat(final Object target, final long offset, final float value) {
+// this.unsafe.putFloat(target, offset, value);
+// }
+//
+// public double getDouble(final Object target, final long offset) {
+// return this.unsafe.getDouble(target, offset);
+// }
+//
+// public void putDouble(final Object target, final long offset, final double value) {
+// this.unsafe.putDouble(target, offset, value);
+// }
+//
+// public Object getObject(final Object target, final long offset) {
+// return this.unsafe.getObject(target, offset);
+// }
+//
+// public void putObject(final Object target, final long offset, final Object value) {
+// this.unsafe.putObject(target, offset, value);
+// }
+//
+// public byte getByte(final long address) {
+// return this.unsafe.getByte(address);
+// }
+//
+// public void putByte(final long address, final byte value) {
+// this.unsafe.putByte(address, value);
+// }
+//
+// public short getShort(final long address) {
+// return this.unsafe.getShort(address);
+// }
+//
+// public void putShort(final long address, final short value) {
+// this.unsafe.putShort(address, value);
+// }
+//
+// public int getInt(final long address) {
+// return this.unsafe.getInt(address);
+// }
+//
+// public void putInt(final long address, final int value) {
+// this.unsafe.putInt(address, value);
+// }
+//
+// public long getLong(final long address) {
+// return this.unsafe.getLong(address);
+// }
+//
+// public void putLong(final long address, final long value) {
+// this.unsafe.putLong(address, value);
+// }
+//
+// public void copyMemory(final Object srcBase, final long srcOffset, final Object dstBase, final long dstOffset,
+// final long bytes) {
+// this.unsafe.copyMemory(srcBase, srcOffset, dstBase, dstOffset, bytes);
+// }
+//
+// public void copyMemory(final long srcAddress, final long dstAddress, final long bytes) {
+// this.unsafe.copyMemory(srcAddress, dstAddress, bytes);
+// }
+//
+// public byte getByteVolatile(final Object target, final long offset) {
+// return this.unsafe.getByteVolatile(target, offset);
+// }
+//
+// public void putByteVolatile(final Object target, final long offset, final byte value) {
+// this.unsafe.putByteVolatile(target, offset, value);
+// }
+//
+// public short getShortVolatile(final Object target, final long offset) {
+// return this.unsafe.getShortVolatile(target, offset);
+// }
+//
+// public void putShortVolatile(final Object target, final long offset, final short value) {
+// this.unsafe.putShortVolatile(target, offset, value);
+// }
+//
+// public int getIntVolatile(final Object target, final long offset) {
+// return this.unsafe.getIntVolatile(target, offset);
+// }
+//
+// public void putIntVolatile(final Object target, final long offset, final int value) {
+// this.unsafe.putIntVolatile(target, offset, value);
+// }
+//
+// public long getLongVolatile(final Object target, final long offset) {
+// return this.unsafe.getLongVolatile(target, offset);
+// }
+//
+// public void putLongVolatile(final Object target, final long offset, final long value) {
+// this.unsafe.putLongVolatile(target, offset, value);
+// }
+//
+// public boolean getBooleanVolatile(final Object target, final long offset) {
+// return this.unsafe.getBooleanVolatile(target, offset);
+// }
+//
+// public void putBooleanVolatile(final Object target, final long offset, final boolean value) {
+// this.unsafe.putBooleanVolatile(target, offset, value);
+// }
+//
+// public float getFloatVolatile(final Object target, final long offset) {
+// return this.unsafe.getFloatVolatile(target, offset);
+// }
+//
+// public void putFloatVolatile(final Object target, final long offset, final float value) {
+// this.unsafe.putFloatVolatile(target, offset, value);
+// }
+//
+// public double getDoubleVolatile(final Object target, final long offset) {
+// return this.unsafe.getDoubleVolatile(target, offset);
+// }
+//
+// public void putDoubleVolatile(final Object target, final long offset, final double value) {
+// this.unsafe.putDoubleVolatile(target, offset, value);
+// }
+//
+// public Object getObjectVolatile(final Object target, final long offset) {
+// return this.unsafe.getObjectVolatile(target, offset);
+// }
+//
+// public void putObjectVolatile(final Object target, final long offset, final Object value) {
+// this.unsafe.putObjectVolatile(target, offset, value);
+// }
+//
+// /**
+// * Reports the offset of the first element in the storage allocation of a
+// * given array class.
+// */
+// public int arrayBaseOffset(final Class<?> clazz) {
+// return this.unsafe != null ? this.unsafe.arrayBaseOffset(clazz) : -1;
+// }
+//
+// /**
+// * Reports the scale factor for addressing elements in the storage
+// * allocation of a given array class.
+// */
+// public int arrayIndexScale(final Class<?> clazz) {
+// return this.unsafe != null ? this.unsafe.arrayIndexScale(clazz) : -1;
+// }
+//
+// /**
+// * Returns the offset of the provided field, or {@code -1} if {@code sun.misc.Unsafe} is not
+// * available.
+// */
+// public long objectFieldOffset(final Field field) {
+// return field == null || this.unsafe == null ? -1 : this.unsafe.objectFieldOffset(field);
+// }
+//
+// public Object allocateInstance(final Class<?> clazz) throws InstantiationException {
+// return this.unsafe.allocateInstance(clazz);
+// }
+//
+// public void throwException(final Throwable t) {
+// this.unsafe.throwException(t);
+// }
+// }
+//
+// private UnsafeUtil() {
+// }
+//}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/Updaters.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/Updaters.java
index 3732d41..a17e62f 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/Updaters.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/util/internal/Updaters.java
@@ -32,11 +32,13 @@ public class Updaters {
public static <U> IntegerFieldUpdater<U> newIntegerFieldUpdater(final Class<? super U> tClass,
final String fieldName) {
try {
- if (UnsafeUtil.hasUnsafe()) {
- return new UnsafeIntegerFieldUpdater<>(UnsafeUtil.getUnsafeAccessor().getUnsafe(), tClass, fieldName);
- } else {
- return new ReflectionIntegerFieldUpdater<>(tClass, fieldName);
- }
+// if (UnsafeUtil.hasUnsafe()) {
+// return new UnsafeIntegerFieldUpdater<>(UnsafeUtil.getUnsafeAccessor().getUnsafe(), tClass, fieldName);
+// } else {
+// return new ReflectionIntegerFieldUpdater<>(tClass, fieldName);
+// }
+
+ return new ReflectionIntegerFieldUpdater<>(tClass, fieldName);
} catch (final Throwable t) {
throw new RuntimeException(t);
}
@@ -50,11 +52,13 @@ public class Updaters {
*/
public static <U> LongFieldUpdater<U> newLongFieldUpdater(final Class<? super U> tClass, final String fieldName) {
try {
- if (UnsafeUtil.hasUnsafe()) {
- return new UnsafeLongFieldUpdater<>(UnsafeUtil.getUnsafeAccessor().getUnsafe(), tClass, fieldName);
- } else {
- return new ReflectionLongFieldUpdater<>(tClass, fieldName);
- }
+// if (UnsafeUtil.hasUnsafe()) {
+// return new UnsafeLongFieldUpdater<>(UnsafeUtil.getUnsafeAccessor().getUnsafe(), tClass, fieldName);
+// } else {
+// return new ReflectionLongFieldUpdater<>(tClass, fieldName);
+// }
+
+ return new ReflectionLongFieldUpdater<>(tClass, fieldName);
} catch (final Throwable t) {
throw new RuntimeException(t);
}
@@ -69,11 +73,13 @@ public class Updaters {
public static <U, W> ReferenceFieldUpdater<U, W> newReferenceFieldUpdater(final Class<? super U> tClass,
final String fieldName) {
try {
- if (UnsafeUtil.hasUnsafe()) {
- return new UnsafeReferenceFieldUpdater<>(UnsafeUtil.getUnsafeAccessor().getUnsafe(), tClass, fieldName);
- } else {
- return new ReflectionReferenceFieldUpdater<>(tClass, fieldName);
- }
+// if (UnsafeUtil.hasUnsafe()) {
+// return new UnsafeReferenceFieldUpdater<>(UnsafeUtil.getUnsafeAccessor().getUnsafe(), tClass, fieldName);
+// } else {
+// return new ReflectionReferenceFieldUpdater<>(tClass, fieldName);
+// }
+
+ return new ReflectionReferenceFieldUpdater<>(tClass, fieldName);
} catch (final Throwable t) {
throw new RuntimeException(t);
}