You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/28 06:14:19 UTC
[incubator-ratis] branch branch-0.3 updated: RATIS-475. Writing
data from LogStream fails with IllegalStateException
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 06129ee RATIS-475. Writing data from LogStream fails with IllegalStateException
06129ee is described below
commit 06129ee706bd351f4022c500efc8df3cf43d4a4e
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
AuthorDate: Wed Mar 27 16:08:39 2019 -0400
RATIS-475. Writing data from LogStream fails with IllegalStateException
Signed-off-by: Josh Elser <el...@apache.org>
---
.../apache/ratis/logservice/server/LogServer.java | 6 +-
.../ratis/logservice/tool/VerificationTool.java | 142 +++++++++++++++++++++
2 files changed, 144 insertions(+), 4 deletions(-)
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index d68a9f7..d25b0b3 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -79,15 +79,13 @@ public class LogServer extends BaseServer {
RaftGroup meta = RaftGroup.valueOf(RaftGroupId.valueOf(opts.getMetaGroupId()), peers);
raftServer = RaftServer.newBuilder()
.setStateMachineRegistry(new StateMachine.Registry() {
- private final StateMachine managementMachine = new ManagementStateMachine();
- private final StateMachine logMachine = new LogStateMachine();
@Override
public StateMachine apply(RaftGroupId raftGroupId) {
// TODO this looks wrong. Why isn't this metaGroupId?
if(raftGroupId.equals(logServerGroupId)) {
- return managementMachine;
+ return new ManagementStateMachine();
}
- return logMachine;
+ return new LogStateMachine();
}
})
.setProperties(properties)
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
new file mode 100644
index 0000000..a4b5c96
--- /dev/null
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
@@ -0,0 +1,142 @@
+package org.apache.ratis.logservice.tool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogReader;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogWriter;
+import org.apache.ratis.logservice.client.LogServiceClient;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.ratis.logservice.server.LogStateMachine;
+import org.jline.utils.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VerificationTool {
+
+  /** Logger for this tool, named after the tool itself so its output is attributed correctly. */
+  public static final Logger LOG = LoggerFactory.getLogger(VerificationTool.class);
+
+  /** Value of the -q / --metaQuorum option; handed to the LogServiceClient constructor. */
+  @Parameter(names = {"-q", "--metaQuorum"}, description = "Metadata Service Quorum")
+  private String metaQuorum;
+  /** Prefix of every log name used by the tool; the log index is appended to it. */
+  public static String LOG_NAME_PREFIX = "testlog";
+  /** Prefix of every record payload; the record index is appended to it. */
+  public static String MESSAGE_PREFIX = "message";
+
+  /** Number of logs that are written and then read back. */
+  private int numLogs = 10;
+  /** Number of records written to, and expected in, each log. */
+  private int numRecords = 1000;
+
+  /**
+   * Entry point of the tool. Parses the command line, submits one writer task per log, waits for
+   * all of them, then submits one reader task per log and waits again before stopping the
+   * executor.
+   *
+   * @param args command-line arguments, parsed by JCommander into this tool's fields
+   */
+  public static void main(String[] args) {
+    final VerificationTool verifier = new VerificationTool();
+    JCommander.newBuilder().addObject(verifier).build().parse(args);
+    System.out.println(verifier.metaQuorum);
+
+    final LogServiceClient logClient = new LogServiceClient(verifier.metaQuorum);
+    final ExecutorService pool = Executors.newCachedThreadPool();
+    final int logCount = verifier.numLogs;
+    final int recordCount = verifier.numRecords;
+
+    // Phase 1: one writer task per log.
+    List<Future<?>> pending = new ArrayList<>(logCount);
+    for (int idx = 0; idx < logCount; idx++) {
+      pending.add(pool.submit(new BulkWriter(LOG_NAME_PREFIX + idx, logClient, recordCount)));
+    }
+    waitForCompletion(pending);
+
+    // Phase 2: one reader task per log, submitted only after every writer has been waited on.
+    pending = new ArrayList<>(logCount);
+    for (int idx = 0; idx < logCount; idx++) {
+      pending.add(pool.submit(new BulkReader(LOG_NAME_PREFIX + idx, logClient, recordCount)));
+    }
+    waitForCompletion(pending);
+
+    pool.shutdownNow();
+  }
+
+ private static void waitForCompletion(List<Future<?>> futures) {
+ for (Future<?> future : futures) {
+ try {
+ Object object = future.get();
+ if (object != null) {
+ LOG.error("Operation failed with error ", object);
+ System.exit(-1);
+ }
+ } catch (Exception e) {
+ LOG.error("Got exception ", e);
+ System.exit(-1);
+ }
+
+ }
+ }
+
+  /**
+   * Task that creates one log and appends {@code numRecords} records to it. Record {@code i}
+   * carries the UTF-8 bytes of {@code MESSAGE_PREFIX + i}.
+   */
+  static class BulkWriter implements Runnable {
+    private final String logName;
+    private final LogServiceClient logServiceClient;
+    private final int numRecords;
+
+    /**
+     * @param logName name of the log to create
+     * @param logServiceClient client used to create the log
+     * @param numRecords number of records to write
+     */
+    BulkWriter(String logName, LogServiceClient logServiceClient, int numRecords) {
+      this.logName = logName;
+      this.logServiceClient = logServiceClient;
+      this.numRecords = numRecords;
+    }
+
+    /**
+     * Creates the log, writes the records in index order and closes the writer.
+     *
+     * @throws RuntimeException wrapping any IOException raised while creating, writing or closing
+     */
+    @Override
+    public void run() {
+      try {
+        LogStream logStream = this.logServiceClient.createLog(LogName.of(logName));
+        LogWriter writer = logStream.createWriter();
+        try {
+          for (int i = 0; i < this.numRecords; i++) {
+            String message = MESSAGE_PREFIX + i;
+            System.out.println(logName + " Writing " + message);
+            writer.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
+          }
+        } finally {
+          // Close the writer even when a write fails so it is not leaked.
+          writer.close();
+        }
+        // Report through the tool's SLF4J logger rather than the jline console helper.
+        LOG.info("{} log entries written to log {} successfully.", numRecords, logName);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+ static class BulkReader implements Runnable {
+ private String logName;
+ private LogServiceClient logServiceClient;
+ private int numRecords;
+
+ BulkReader(String logName, LogServiceClient logServiceClient, int numRecords) {
+ this.logName = logName;
+ this.logServiceClient = logServiceClient;
+ this.numRecords = numRecords;
+ }
+
+ public void run() {
+ try {
+ LogStream logStream = this.logServiceClient.getLog(LogName.of(logName));
+ LogReader reader = logStream.createReader();
+ long size = logStream.getLength();
+ if(size != this.numRecords) {
+ Log.error("There is mismatch is number of records. Expected Records: "+
+ this.numRecords +", Actual Records: " + size);
+ System.exit(-1);
+ }
+ for (int i = 0; i < size; i++) {
+ ByteBuffer buffer = reader.readNext();
+ String message = new String(buffer.array(), buffer.arrayOffset(),
+ buffer.remaining(), StandardCharsets.UTF_8);
+ System.out.println(logName + " Read " + message);
+ if(!message.equals(MESSAGE_PREFIX + i)) {
+ Log.error("Message is not correct. Expected: "+(MESSAGE_PREFIX + i)
+ +". Actual:" +message);
+ System.exit(-1);
+ }
+ }
+ Log.info("" + numRecords + " log entries read from log "+ this.logName + " successfully.");
+ reader.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}