You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by el...@apache.org on 2018/12/05 03:03:10 UTC
incubator-ratis git commit: RATIS-378. Consolidate common logic
across logservice servers
Repository: incubator-ratis
Updated Branches:
refs/heads/master d72d9c6eb -> 046782430
RATIS-378. Consolidate common logic across logservice servers
Signed-off-by: Rajeshbabu Chintaguntla <ra...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/04678243
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/04678243
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/04678243
Branch: refs/heads/master
Commit: 046782430a9aee3094869ca2b63bcf0194c70e93
Parents: d72d9c6
Author: Josh Elser <el...@apache.org>
Authored: Wed Nov 28 12:59:14 2018 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Dec 4 21:38:23 2018 -0500
----------------------------------------------------------------------
.../logservice/client/LogServiceClient.java | 2 +-
.../ratis/logservice/common/Constants.java | 6 +-
.../ratis/logservice/server/BaseServer.java | 94 ++++++++++
.../ratis/logservice/server/LogServer.java | 138 ++++++++++++++
.../ratis/logservice/server/MasterServer.java | 178 -------------------
.../logservice/server/MetaStateMachine.java | 21 ++-
.../ratis/logservice/server/MetadataServer.java | 146 +++++++++++++++
.../ratis/logservice/server/ServerOpts.java | 119 +++++++++++++
.../ratis/logservice/util/LogServiceUtils.java | 1 -
.../logservice/worker/LogServiceWorker.java | 161 -----------------
.../ratis/logservice/server/TestMetaServer.java | 14 +-
.../logservice/util/LogServiceCluster.java | 57 +++---
12 files changed, 558 insertions(+), 379 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
index 5bdfb93..1e42ab4 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
@@ -68,7 +68,7 @@ public class LogServiceClient implements AutoCloseable {
public LogServiceClient(String metaQuorum, LogServiceConfiguration config) {
Set<RaftPeer> peers = getPeersFromQuorum(metaQuorum);
RaftProperties properties = new RaftProperties();
- RaftGroup meta = RaftGroup.valueOf(Constants.metaGroupID, peers);
+ RaftGroup meta = RaftGroup.valueOf(Constants.META_GROUP_ID, peers);
client = RaftClient.newBuilder()
.setRaftGroup(meta)
.setClientId(ClientId.randomId())
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
index ba50154..2da892b 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
@@ -24,8 +24,10 @@ import java.util.UUID;
public class Constants {
- final public static RaftGroupId metaGroupID = RaftGroupId.valueOf(new UUID(0,1));
+ public static final UUID META_GROUP_UUID = new UUID(0,1);
+ public static final RaftGroupId META_GROUP_ID = RaftGroupId.valueOf(META_GROUP_UUID);
- final public static RaftGroupId serversGroupID = RaftGroupId.valueOf(new UUID(0,2));
+ public static final UUID SERVERS_GROUP_UUID = new UUID(0,2);
+ public static final RaftGroupId SERVERS_GROUP_ID = RaftGroupId.valueOf(SERVERS_GROUP_UUID);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/BaseServer.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/BaseServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/BaseServer.java
new file mode 100644
index 0000000..717d5b6
--- /dev/null
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/BaseServer.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.server;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.apache.ratis.util.NetUtils;
+
+/**
+ * A base class to encapsulate functionality around a long-lived Java process which runs a state machine.
+ */
+public abstract class BaseServer implements Closeable {
+
+ private final ServerOpts opts;
+
+ public BaseServer(ServerOpts opts) {
+ this.opts = Objects.requireNonNull(opts);
+ }
+
+ public ServerOpts getServerOpts() {
+ return opts;
+ }
+
+ static ServerOpts buildOpts(String hostname, String metaQuorum, int port, String workingDir) {
+ ServerOpts opts = new ServerOpts();
+ opts.setHost(hostname);
+ opts.setMetaQuorum(metaQuorum);
+ opts.setPort(port);
+ opts.setWorkingDir(workingDir);
+ return opts;
+ }
+
+ public abstract static class Builder<T extends BaseServer> {
+ private ServerOpts opts = new ServerOpts();
+
+ protected ServerOpts getOpts() {
+ return opts;
+ }
+
+ public abstract T build();
+
+ public Builder<T> validate() {
+ if (opts.getPort() == -1) {
+ InetSocketAddress addr = NetUtils.createLocalServerAddress();
+ opts.setPort(addr.getPort());
+ }
+ if (opts.getHost() == null) {
+ opts.setHost(LogServiceUtils.getHostName());
+ }
+ if (opts.getWorkingDir() == null) {
+ throw new IllegalArgumentException("Working directory was not specified");
+ }
+ return this;
+ }
+
+ public Builder<T> setMetaQuorum(String meta) {
+ opts.setMetaQuorum(meta);
+ return this;
+ }
+
+ public Builder<T> setPort(int port) {
+ opts.setPort(port);
+ return this;
+ }
+
+ public Builder<T> setWorkingDir(String workingDir) {
+ opts.setWorkingDir(workingDir);
+ return this;
+ }
+
+ public Builder<T> setHostName(String hostName) {
+ opts.setHost(hostName);
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
new file mode 100644
index 0000000..d68a9f7
--- /dev/null
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+
+public class LogServer extends BaseServer {
+ private static final Logger LOG = LoggerFactory.getLogger(LogServer.class);
+
+ private RaftServer raftServer = null;
+ private RaftClient metaClient = null;
+
+ public LogServer(ServerOpts opts) {
+ super(opts);
+ }
+
+ public RaftServer getServer() {
+ return raftServer;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public void start() throws IOException {
+ final ServerOpts opts = getServerOpts();
+ Set<RaftPeer> peers = LogServiceUtils.getPeersFromQuorum(opts.getMetaQuorum());
+ RaftProperties properties = new RaftProperties();
+ properties.set("raft.client.rpc.request.timeout", "100000");
+ GrpcConfigKeys.Server.setPort(properties, opts.getPort());
+ NettyConfigKeys.Server.setPort(properties, opts.getPort());
+ InetSocketAddress addr = new InetSocketAddress(opts.getHost(), opts.getPort());
+ if(opts.getWorkingDir() != null) {
+ RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(opts.getWorkingDir())));
+ }
+ String id = opts.getHost() +"_" + opts.getPort();
+ RaftPeer peer = new RaftPeer(RaftPeerId.valueOf(id), addr);
+ final RaftGroupId logServerGroupId = RaftGroupId.valueOf(opts.getLogServerGroupId());
+ RaftGroup all = RaftGroup.valueOf(logServerGroupId, peer);
+ RaftGroup meta = RaftGroup.valueOf(RaftGroupId.valueOf(opts.getMetaGroupId()), peers);
+ raftServer = RaftServer.newBuilder()
+ .setStateMachineRegistry(new StateMachine.Registry() {
+ private final StateMachine managementMachine = new ManagementStateMachine();
+ private final StateMachine logMachine = new LogStateMachine();
+ @Override
+ public StateMachine apply(RaftGroupId raftGroupId) {
+ // TODO this looks wrong. Why isn't this metaGroupId?
+ if(raftGroupId.equals(logServerGroupId)) {
+ return managementMachine;
+ }
+ return logMachine;
+ }
+ })
+ .setProperties(properties)
+ .setServerId(RaftPeerId.valueOf(id))
+ .setGroup(all)
+ .build();
+ raftServer.start();
+
+ metaClient = RaftClient.newBuilder()
+ .setRaftGroup(meta)
+ .setClientId(ClientId.randomId())
+ .setProperties(properties)
+ .build();
+ metaClient.send(() -> MetaServiceProtoUtil.toPingRequestProto(peer).toByteString());
+ }
+
+ public static void main(String[] args) throws IOException {
+ ServerOpts opts = new ServerOpts();
+ JCommander.newBuilder()
+ .addObject(opts)
+ .build()
+ .parse(args);
+
+ try (LogServer worker = new LogServer(opts)) {
+ worker.start();
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ }
+
+
+  /**
+   * Shuts this log server down by releasing the metadata-quorum client and the embedded Raft server.
+   *
+   * <p>Both fields are only assigned inside {@code start()}, so either may still be {@code null}
+   * when {@code start()} was never called or failed part-way; each is closed only when present.
+   * The Raft server is closed in a {@code finally} block so that a failure while closing the
+   * client cannot leave the server running.
+   *
+   * @throws IOException if closing the client or the Raft server fails
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      // The client created in start() was previously never released.
+      if (metaClient != null) {
+        metaClient.close();
+      }
+    } finally {
+      if (raftServer != null) {
+        raftServer.close();
+      }
+    }
+  }
+
+ public static class Builder extends BaseServer.Builder<LogServer> {
+ public LogServer build() {
+ validate();
+ return new LogServer(getOpts());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java
deleted file mode 100644
index 8e9f7d3..0000000
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ratis.logservice.server;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.logservice.common.Constants;
-import org.apache.ratis.logservice.util.LogServiceUtils;
-import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.LifeCycle;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.net.*;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.Set;
-
-import static org.apache.ratis.logservice.common.Constants.metaGroupID;
-import static org.apache.ratis.logservice.util.LogServiceUtils.getPeersFromQuorum;
-
-/**
- * Master quorum is responsible for tracking all available quorum members
- */
-public class MasterServer implements Closeable {
-
-
- // RaftServer internal server. Has meta raft group and MetaStateMachine
- private RaftServer server;
-
- private String id;
-
- private String host;
-
- @Parameter(names = "-port", description = "Port number")
- private int port = 9999;
-
- @Parameter(names = "-dir", description = "Working directory")
- private String workingDir = null;
-
- private StateMachine metaStateMachine;
-
- private LifeCycle lifeCycle;
-
- public MasterServer(String hostname, int port, String workingDir) {
- this.port = port;
- this.host = hostname;
- this.workingDir = workingDir;
- id = host + "_" + port;
- this.lifeCycle = new LifeCycle(this.id);
-
- }
-
- public MasterServer() {
-
- }
-
- public void start(String metaGroupId) throws IOException {
- if (host == null) {
- host = LogServiceUtils.getHostName();
- }
- this.lifeCycle = new LifeCycle(this.id);
- RaftProperties properties = new RaftProperties();
- if(workingDir != null) {
- RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(workingDir)));
- }
- GrpcConfigKeys.Server.setPort(properties, port);
- NettyConfigKeys.Server.setPort(properties, port);
- Set<RaftPeer> peers = getPeersFromQuorum(metaGroupId);
- RaftGroup metaGroup = RaftGroup.valueOf(Constants.metaGroupID, peers);
- metaStateMachine = new MetaStateMachine();
- server = RaftServer.newBuilder()
- .setGroup(metaGroup)
- .setServerId(RaftPeerId.valueOf(id))
- .setStateMachineRegistry(raftGroupId -> {
- if(raftGroupId.equals(metaGroupID)) {
- return metaStateMachine;
- }
- return null;
- })
- .setProperties(properties).build();
- lifeCycle.startAndTransition(() -> {
- server.start();
- }, IOException.class);
- }
-
- public static void main(String[] args) throws IOException {
- MasterServer master = new MasterServer();
- JCommander.newBuilder()
- .addObject(master)
- .build()
- .parse(args);
- master.start(null);
-
-
- }
- public static MasterServer.Builder newBuilder() {
- return new MasterServer.Builder();
- }
-
- @Override
- public void close() throws IOException {
- server.close();
- }
-
- public String getId() {
- return id;
- }
-
- public String getAddress() {
- return host + ":" + port;
- }
-
- public void cleanUp() throws IOException {
- FileUtils.deleteFully(new File(workingDir));
- }
-
- public static class Builder {
- private String host = null;
- private int port = 9999;
- private String workingDir = null;
-
- /**
- * @return a {@link MasterServer} object.
- */
- public MasterServer build() {
- if (host == null) {
- host = LogServiceUtils.getHostName();
- }
- return new MasterServer(host, port, workingDir);
- }
-
- /**
- * Set the server hostname.
- */
- public Builder setHost(String host) {
- this.host = host;
- return this;
- }
-
- /**
- * Set server port
- */
- public Builder setPort(int port) {
- this.port = port;
- return this;
- }
-
- public Builder setWorkingDir(String workingDir) {
- this.workingDir = workingDir;
- return this;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 6a386b8..24f5263 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -18,9 +18,6 @@
package org.apache.ratis.logservice.server;
-import static org.apache.ratis.logservice.common.Constants.metaGroupID;
-import static org.apache.ratis.logservice.common.Constants.serversGroupID;
-
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
@@ -85,7 +82,7 @@ public class MetaStateMachine extends BaseStateMachine {
//Persisted map between log and RaftGroup
private Map<LogName, RaftGroup> map = new ConcurrentHashMap<>();
// List of the currently known peers.
- private final Set<RaftPeer> peers = new HashSet();
+ private final Set<RaftPeer> peers = new HashSet<>();
// keep a copy of raftServer to get group information.
private RaftServer raftServer;
@@ -101,6 +98,13 @@ public class MetaStateMachine extends BaseStateMachine {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+ private RaftGroupId metadataGroupId;
+ private RaftGroupId logServerGroupId;
+
+ public MetaStateMachine(RaftGroupId metadataGroupId, RaftGroupId logServerGroupId) {
+ this.metadataGroupId = metadataGroupId;
+ this.logServerGroupId = logServerGroupId;
+ }
@Override
public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage storage) throws IOException {
@@ -170,7 +174,9 @@ public class MetaStateMachine extends BaseStateMachine {
public CompletableFuture<Message> query(Message request) {
if (currentGroup == null) {
try {
- List<RaftGroup> x = StreamSupport.stream(raftServer.getGroups().spliterator(), false).filter(group -> group.getGroupId().equals(metaGroupID)).collect(Collectors.toList());
+ List<RaftGroup> x = StreamSupport.stream(raftServer.getGroups().spliterator(), false)
+ .filter(group -> group.getGroupId().equals(metadataGroupId))
+ .collect(Collectors.toList());
if (x.size() == 1) {
currentGroup = x.get(0);
}
@@ -221,7 +227,7 @@ public class MetaStateMachine extends BaseStateMachine {
RaftClient client = RaftClient.newBuilder()
.setProperties(properties)
.setClientId(ClientId.randomId())
- .setRaftGroup(RaftGroup.valueOf(serversGroupID, peer))
+ .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer))
.build();
try {
client.groupRemove(raftGroup.getGroupId(), true, peer.getId());
@@ -301,7 +307,8 @@ public class MetaStateMachine extends BaseStateMachine {
avail.add(pg);
});
peers.forEach(i -> {
- RaftClient client = RaftClient.newBuilder().setProperties(properties).setRaftGroup(RaftGroup.valueOf(serversGroupID, i)).build();
+ RaftClient client = RaftClient.newBuilder().setProperties(properties)
+ .setRaftGroup(RaftGroup.valueOf(logServerGroupId, i)).build();
try {
client.groupAdd(raftGroup, i.getId());
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
new file mode 100644
index 0000000..1d658fe
--- /dev/null
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.server;
+
+import com.beust.jcommander.JCommander;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.ratis.logservice.common.Constants.META_GROUP_ID;
+import static org.apache.ratis.logservice.util.LogServiceUtils.getPeersFromQuorum;
+
+/**
+ * The metadata quorum is responsible for tracking all available quorum members.
+ */
+public class MetadataServer extends BaseServer {
+ private static final Logger LOG = LoggerFactory.getLogger(MetadataServer.class);
+
+ // RaftServer internal server. Has meta raft group and MetaStateMachine
+ private RaftServer server;
+
+ private String id;
+
+ private StateMachine metaStateMachine;
+
+ private LifeCycle lifeCycle;
+
+ public MetadataServer(ServerOpts opts) {
+ super(opts);
+ LOG.debug("Metadata Server options: {}", opts);
+ this.id = opts.getHost() + "_" + opts.getPort();
+ this.lifeCycle = new LifeCycle(this.id);
+ }
+
+  /**
+   * Builds and starts the Raft server that hosts the metadata state machine.
+   *
+   * <p>The Raft group is assembled from the peers parsed out of the configured metadata quorum
+   * and the configured metadata group id. The state machine registry hands out the
+   * {@link MetaStateMachine} for that same configured group id and {@code null} for any other
+   * group.
+   *
+   * <p>If no hostname was configured, the local hostname is looked up and stored back into the
+   * server options, and the server id is rebuilt from it.
+   *
+   * @throws IOException if the underlying Raft server cannot be built or started
+   */
+  public void start() throws IOException {
+    final ServerOpts opts = getServerOpts();
+    if (opts.getHost() == null) {
+      opts.setHost(LogServiceUtils.getHostName());
+      // The constructor derived the id while the host was still unset; rebuild it so the
+      // peer id does not begin with the literal string "null".
+      this.id = opts.getHost() + "_" + opts.getPort();
+    }
+    this.lifeCycle = new LifeCycle(this.id);
+    RaftProperties properties = new RaftProperties();
+    if (opts.getWorkingDir() != null) {
+      RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(opts.getWorkingDir())));
+    }
+    GrpcConfigKeys.Server.setPort(properties, opts.getPort());
+    NettyConfigKeys.Server.setPort(properties, opts.getPort());
+    Set<RaftPeer> peers = getPeersFromQuorum(opts.getMetaQuorum());
+    final RaftGroupId raftMetaGroupId = RaftGroupId.valueOf(opts.getMetaGroupId());
+    RaftGroup metaGroup = RaftGroup.valueOf(raftMetaGroupId, peers);
+    metaStateMachine = new MetaStateMachine(raftMetaGroupId, RaftGroupId.valueOf(opts.getLogServerGroupId()));
+    server = RaftServer.newBuilder()
+        .setGroup(metaGroup)
+        .setServerId(RaftPeerId.valueOf(id))
+        .setStateMachineRegistry(raftGroupId -> {
+          // Compare against the configured group id (the one the group above was built with),
+          // not the compile-time default, so a custom --metadataServerGroupId still resolves
+          // to the metadata state machine.
+          if (raftGroupId.equals(raftMetaGroupId)) {
+            return metaStateMachine;
+          }
+          return null;
+        })
+        .setProperties(properties).build();
+    lifeCycle.startAndTransition(() -> {
+      server.start();
+    }, IOException.class);
+  }
+
+ public static void main(String[] args) throws IOException {
+ ServerOpts opts = new ServerOpts();
+ JCommander.newBuilder()
+ .addObject(opts)
+ .build()
+ .parse(args);
+
+ try (MetadataServer master = new MetadataServer(opts)) {
+ master.start();
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ }
+
+ public static MetadataServer.Builder newBuilder() {
+ return new MetadataServer.Builder();
+ }
+
+  /**
+   * Closes the embedded Raft server, if one was created.
+   *
+   * <p>The server field is only assigned inside {@code start()}, so it is still {@code null}
+   * when {@code start()} was never called or failed before building the server; in that case
+   * this method does nothing instead of throwing a {@link NullPointerException}.
+   *
+   * @throws IOException if closing the Raft server fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (server != null) {
+      server.close();
+    }
+  }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getAddress() {
+ return getServerOpts().getHost() + ":" + getServerOpts().getPort();
+ }
+
+ public void cleanUp() throws IOException {
+ FileUtils.deleteFully(new File(getServerOpts().getWorkingDir()));
+ }
+
+ public static class Builder extends BaseServer.Builder<MetadataServer> {
+ /**
+ * @return a {@link MetadataServer} object.
+ */
+ public MetadataServer build() {
+ validate();
+ return new MetadataServer(getOpts());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ServerOpts.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ServerOpts.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ServerOpts.java
new file mode 100644
index 0000000..18e4ba6
--- /dev/null
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ServerOpts.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.server;
+
+import java.util.UUID;
+
+import org.apache.ratis.logservice.common.Constants;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.Parameter;
+
+/**
+ * Options that are common across metadata and log server classes.
+ */
+public class ServerOpts {
+ /**
+ * Converts a string into a UUID
+ */
+ static class UUIDConverter implements IStringConverter<UUID> {
+ @Override public UUID convert(String value) {
+ return UUID.fromString(value);
+ }
+ }
+
+ @Parameter(names = {"-h", "--hostname"}, description = "Hostname")
+ private String host;
+
+ @Parameter(names = {"-p", "--port"}, description = "Port number")
+ private int port = -1;
+
+ @Parameter(names = {"-d", "--dir"}, description = "Working directory")
+ private String workingDir = null;
+
+ @Parameter(names = {"-q", "--metaQuorum"}, description = "Metadata Service Quorum")
+ private String metaQuorum;
+
+ @Parameter(names = {"--metadataServerGroupId"}, description = "UUID corresponding to the RAFT metadata servers group",
+ converter = UUIDConverter.class)
+ private UUID metaGroupId = Constants.META_GROUP_UUID;
+
+ @Parameter(names = {"--logServerGroupId"}, description = "UUID corresponding to the RAFT log servers group",
+ converter = UUIDConverter.class)
+ private UUID logServerGroupId = Constants.SERVERS_GROUP_UUID;
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getWorkingDir() {
+ return workingDir;
+ }
+
+ public void setWorkingDir(String workingDir) {
+ this.workingDir = workingDir;
+ }
+
+ public String getMetaQuorum() {
+ return metaQuorum;
+ }
+
+ public void setMetaQuorum(String metaQuorum) {
+ this.metaQuorum = metaQuorum;
+ }
+
+ public UUID getMetaGroupId() {
+ return metaGroupId;
+ }
+
+ public void setMetaGroupId(UUID metaGroupId) {
+ this.metaGroupId = metaGroupId;
+ }
+
+ public UUID getLogServerGroupId() {
+ return logServerGroupId;
+ }
+
+ public void setLogServerGroupId(UUID logServerGroupId) {
+ this.logServerGroupId = logServerGroupId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Hostname=").append(host);
+ sb.append(", port=").append(port);
+ sb.append(", dir=").append(workingDir);
+ sb.append(", metaQuorum=").append(metaQuorum);
+ sb.append(", metaGroupId=").append(metaGroupId);
+ sb.append(", logServerGroupId=").append(logServerGroupId);
+ return sb.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
index a25a7df..c44853f 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
@@ -23,7 +23,6 @@ import org.apache.ratis.protocol.RaftPeerId;
import java.net.DatagramSocket;
import java.net.InetAddress;
-import java.net.SocketException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java
deleted file mode 100644
index e8fd895..0000000
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ratis.logservice.worker;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.logservice.server.LogStateMachine;
-import org.apache.ratis.logservice.server.ManagementStateMachine;
-import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
-import org.apache.ratis.logservice.util.LogServiceUtils;
-import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.NetUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.Set;
-
-import static org.apache.ratis.logservice.common.Constants.metaGroupID;
-import static org.apache.ratis.logservice.common.Constants.serversGroupID;
-
-public class LogServiceWorker implements Cloneable{
-
- @Parameter(names = "-port", description = "Port number")
- private int port;
-
- @Parameter(names = "-dir", description = "Working directory")
- private String workingDir;
-
- @Parameter(names = "-meta", description = "Meta Quorum ID")
- private String metaIdentity;
- RaftServer raftServer = null;
- RaftClient metaClient = null;
-
- public LogServiceWorker() {
-
- }
- public LogServiceWorker(String meta, int port, String workingDir) {
- this.metaIdentity = meta;
- this.port = port;
- this.workingDir = workingDir;
- }
-
- public RaftServer getServer() {
- return raftServer;
- }
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- public void start() throws IOException {
- Set<RaftPeer> peers = LogServiceUtils.getPeersFromQuorum(metaIdentity);
- String host = LogServiceUtils.getHostName();
- RaftProperties properties = new RaftProperties();
- properties.set("raft.client.rpc.request.timeout", "100000");
- GrpcConfigKeys.Server.setPort(properties, port);
- NettyConfigKeys.Server.setPort(properties, port);
- InetSocketAddress addr = new InetSocketAddress(host,port);
- if(workingDir != null) {
- RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(workingDir)));
- }
- String id = host +"_" + port;
- RaftPeer peer = new RaftPeer(RaftPeerId.valueOf(id), addr);
- RaftGroup all = RaftGroup.valueOf(serversGroupID, peer);
- RaftGroup meta = RaftGroup.valueOf(metaGroupID, peers);
- raftServer = RaftServer.newBuilder()
- .setStateMachineRegistry(new StateMachine.Registry() {
- final StateMachine managementMachine = new ManagementStateMachine();
- final StateMachine logMachine = new LogStateMachine();
- @Override
- public StateMachine apply(RaftGroupId raftGroupId) {
- if(raftGroupId.equals(serversGroupID)) {
- return managementMachine;
- }
- return logMachine;
- }
- })
- .setProperties(properties)
- .setServerId(RaftPeerId.valueOf(id))
- .setGroup(all)
- .build();
- raftServer.start();
-
- metaClient = RaftClient.newBuilder()
- .setRaftGroup(meta)
- .setClientId(ClientId.randomId())
- .setProperties(properties)
- .build();
- metaClient.send(() -> MetaServiceProtoUtil.toPingRequestProto(peer).toByteString());
- }
-
- public static void main(String[] args) throws IOException {
- LogServiceWorker worker = new LogServiceWorker();
- JCommander.newBuilder()
- .addObject(worker)
- .build()
- .parse(args);
- worker.start();
-
-
- }
-
-
- public void close() throws IOException {
- raftServer.close();
- }
-
- public static class Builder {
- String meta;
- int port = -1;
- private String workingDir;
-
- public LogServiceWorker build() {
- if(port == -1) {
- InetSocketAddress addr = NetUtils.createLocalServerAddress();
- port = addr.getPort();
- }
- return new LogServiceWorker(meta, port, workingDir);
- }
- public Builder setMetaIdentity(String meta) {
- this.meta = meta;
- return this;
- }
- public Builder setPort(int port) {
- this.port = port;
- return this;
- }
-
- public Builder setWorkingDir(String workingDir) {
- this.workingDir = workingDir;
- return this;
- }
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 2f5c3d7..cbdd2b5 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -22,8 +22,8 @@ import org.apache.ratis.logservice.api.*;
import org.apache.ratis.logservice.client.LogServiceClient;
import org.apache.ratis.logservice.common.LogAlreadyExistException;
import org.apache.ratis.logservice.common.LogNotFoundException;
+import org.apache.ratis.logservice.server.LogServer;
import org.apache.ratis.logservice.util.LogServiceCluster;
-import org.apache.ratis.logservice.worker.LogServiceWorker;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.junit.AfterClass;
@@ -48,13 +48,15 @@ public class TestMetaServer {
public static void beforeClass() {
cluster = new LogServiceCluster(3);
cluster.createWorkers(3);
- List<LogServiceWorker> workers = cluster.getWorkers();
+ List<LogServer> workers = cluster.getWorkers();
assert(workers.size() == 3);
}
@AfterClass
public static void afterClass() {
- cluster.close();
+ if (cluster != null) {
+ cluster.close();
+ }
}
/**
@@ -80,14 +82,14 @@ public class TestMetaServer {
ByteBuffer testMessage = ByteBuffer.wrap("Hello world!".getBytes());
List<LogInfo> listLogs = client.listLogs();
assert(listLogs.stream().filter(log -> log.getLogName().getName().startsWith("testReadWrite")).count() == 1);
- List<LogServiceWorker> workers = cluster.getWorkers();
- for(LogServiceWorker worker : workers) {
+ List<LogServer> workers = cluster.getWorkers();
+ for(LogServer worker : workers) {
RaftServerImpl server = ((RaftServerProxy)worker.getServer())
.getImpl(listLogs.get(0).getRaftGroup().getGroupId());
// TODO: perform all additional checks on state machine level
}
writer.write(testMessage);
- for(LogServiceWorker worker : workers) {
+ for(LogServer worker : workers) {
RaftServerImpl server = ((RaftServerProxy)worker.getServer())
.getImpl(listLogs.get(0).getRaftGroup().getGroupId());
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java
----------------------------------------------------------------------
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java
index d1c688b..29999dd 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java
@@ -19,16 +19,13 @@
package org.apache.ratis.logservice.util;
-import org.apache.commons.io.FileUtils;
import org.apache.ratis.BaseTest;
import org.apache.ratis.logservice.api.LogName;
import org.apache.ratis.logservice.api.LogStream;
-import org.apache.ratis.logservice.api.LogInfo;
import org.apache.ratis.logservice.client.LogServiceClient;
-import org.apache.ratis.logservice.worker.LogServiceWorker;
-import org.apache.ratis.logservice.server.MasterServer;
+import org.apache.ratis.logservice.server.LogServer;
+import org.apache.ratis.logservice.server.MetadataServer;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -40,8 +37,8 @@ import java.util.stream.IntStream;
*/
public class LogServiceCluster implements AutoCloseable {
- private List<MasterServer> masters;
- private List<LogServiceWorker> workers = new ArrayList<>();
+ private List<MetadataServer> masters;
+ private List<LogServer> workers = new ArrayList<>();
private String baseTestDir = BaseTest.getRootTestDir().getAbsolutePath();
/**
@@ -50,9 +47,11 @@ public class LogServiceCluster implements AutoCloseable {
*/
public void createWorkers(int numWorkers) {
String meta = getMetaIdentity();
- List<LogServiceWorker> newWorkers = IntStream.range(0, numWorkers).parallel().mapToObj(i ->
- LogServiceWorker.newBuilder()
- .setMetaIdentity(meta)
+ List<LogServer> newWorkers = IntStream.range(0, numWorkers).parallel().mapToObj(i ->
+ LogServer.newBuilder()
+ .setHostName("localhost")
+ .setPort(10000 + i)
+ .setMetaQuorum(meta)
.setWorkingDir(baseTestDir + "/workers/" + i)
.build()).collect(Collectors.toList());
newWorkers.parallelStream().forEach( worker -> {
@@ -70,6 +69,7 @@ public class LogServiceCluster implements AutoCloseable {
* @return the string that represent the meta quorum ID that can can be used to manually create a worker nodes
*/
public String getMetaIdentity() {
+ // Nb. Can only be called after the masters have been instantiated.
return masters.stream().map(object -> object.getAddress()).collect(Collectors.joining(","));
}
@@ -80,17 +80,28 @@ public class LogServiceCluster implements AutoCloseable {
*/
public LogServiceCluster(int numServers) {
+ // Have to construct the meta quorum by hand -- `getMetaIdentity()` itself
+ // uses the masters to build the quorum (chicken-and-egg problem).
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < numServers; i++) {
+ if (sb.length() > 0) {
+ sb.append(",");
+ }
+ sb.append("localhost:").append(9000 + i);
+ }
+ String metaQuorum = sb.toString();
this.masters = IntStream.range(0, numServers).parallel().mapToObj(i ->
- MasterServer.newBuilder()
- .setHost(LogServiceUtils.getHostName())
+ MetadataServer.newBuilder()
+ .setHostName("localhost")
.setPort(9000 + i)
.setWorkingDir(baseTestDir + "/masters/" + i)
+ .setMetaQuorum(metaQuorum)
.build())
.collect(Collectors.toList());
masters.parallelStream().forEach(master -> {
try {
master.cleanUp();
- master.start(getMetaIdentity());
+ master.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -103,15 +114,16 @@ public class LogServiceCluster implements AutoCloseable {
* @param logName
* @throws IOException
*/
- public LogStream createLog(LogName logName) throws IOException {
- LogServiceClient client = new LogServiceClient(getMetaIdentity());
- return client.createLog(logName);
+ public LogStream createLog(LogName logName) throws Exception {
+ try (LogServiceClient client = new LogServiceClient(getMetaIdentity())) {
+ return client.createLog(logName);
+ }
}
/**
* @return the current set of the workers
*/
- public List<LogServiceWorker> getWorkers() {
+ public List<LogServer> getWorkers() {
return workers;
}
@@ -119,7 +131,7 @@ public class LogServiceCluster implements AutoCloseable {
*
* @return the current set of the masters
*/
- public List<MasterServer> getMasters() {
+ public List<MetadataServer> getMasters() {
return masters;
}
@@ -146,10 +158,10 @@ public class LogServiceCluster implements AutoCloseable {
});
}
- public LogStream getLog(LogName logName) throws IOException {
- LogServiceClient client = new LogServiceClient(getMetaIdentity());
- return client.getLog(logName);
-
+ public LogStream getLog(LogName logName) throws Exception {
+ try (LogServiceClient client = new LogServiceClient(getMetaIdentity())) {
+ return client.getLog(logName);
+ }
}
/**
@@ -157,6 +169,5 @@ public class LogServiceCluster implements AutoCloseable {
*/
public void cleanUp() {
// FileUtils.deleteDirectory();
-
}
}