You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/28 06:14:19 UTC

[incubator-ratis] branch branch-0.3 updated: RATIS-475. Writing data from LogStream fails with IllegalStateException

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 06129ee  RATIS-475. Writing data from LogStream fails with IllegalStateException
06129ee is described below

commit 06129ee706bd351f4022c500efc8df3cf43d4a4e
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
AuthorDate: Wed Mar 27 16:08:39 2019 -0400

    RATIS-475. Writing data from LogStream fails with IllegalStateException
    
    Signed-off-by: Josh Elser <el...@apache.org>
---
 .../apache/ratis/logservice/server/LogServer.java  |   6 +-
 .../ratis/logservice/tool/VerificationTool.java    | 142 +++++++++++++++++++++
 2 files changed, 144 insertions(+), 4 deletions(-)

diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index d68a9f7..d25b0b3 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -79,15 +79,13 @@ public class LogServer extends BaseServer {
         RaftGroup meta = RaftGroup.valueOf(RaftGroupId.valueOf(opts.getMetaGroupId()), peers);
         raftServer = RaftServer.newBuilder()
                 .setStateMachineRegistry(new StateMachine.Registry() {
-                    private final StateMachine managementMachine = new ManagementStateMachine();
-                    private final StateMachine logMachine  = new LogStateMachine();
                     @Override
                     public StateMachine apply(RaftGroupId raftGroupId) {
                         // TODO this looks wrong. Why isn't this metaGroupId?
                         if(raftGroupId.equals(logServerGroupId)) {
-                            return managementMachine;
+                            return new ManagementStateMachine();
                         }
-                        return logMachine;
+                        return new LogStateMachine();
                     }
                 })
                 .setProperties(properties)
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
new file mode 100644
index 0000000..a4b5c96
--- /dev/null
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
@@ -0,0 +1,142 @@
+package org.apache.ratis.logservice.tool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogReader;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogWriter;
+import org.apache.ratis.logservice.client.LogServiceClient;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.ratis.logservice.server.LogStateMachine;
+import org.jline.utils.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VerificationTool {
+
+    public static final Logger LOG = LoggerFactory.getLogger(LogStateMachine.class);
+
+    @Parameter(names = {"-q", "--metaQuorum"}, description = "Metadata Service Quorum")
+    private String metaQuorum;
+    public static String LOG_NAME_PREFIX = "testlog";
+    public static String MESSAGE_PREFIX = "message";
+
+    private int numLogs = 10;
+    private int numRecords = 1000;
+
+    public static void main(String[] args) {
+        VerificationTool tool = new VerificationTool();
+        JCommander.newBuilder()
+                .addObject(tool)
+                .build()
+                .parse(args);
+        System.out.println(tool.metaQuorum);
+        LogServiceClient client = new LogServiceClient(tool.metaQuorum);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        List<Future<?>> futures = new ArrayList<Future<?>>(tool.numLogs);
+        for (int i = 0; i < tool.numLogs; i++) {
+            BulkWriter writer = new BulkWriter(LOG_NAME_PREFIX + i, client, tool.numRecords);
+            futures.add(executor.submit(writer));
+        }
+        waitForCompletion(futures);
+        futures = new ArrayList<Future<?>>(tool.numLogs);
+        for (int i = 0; i < tool.numLogs; i++) {
+            BulkReader reader = new BulkReader(LOG_NAME_PREFIX + i, client, tool.numRecords);
+            futures.add(executor.submit(reader));
+        }
+        waitForCompletion(futures);
+        executor.shutdownNow();
+    }
+
+    private static void waitForCompletion(List<Future<?>> futures) {
+        for (Future<?> future : futures) {
+            try {
+                Object object = future.get();
+                if (object != null) {
+                    LOG.error("Operation failed with error ", object);
+                    System.exit(-1);
+                }
+            } catch (Exception e) {
+                LOG.error("Got exception ", e);
+                System.exit(-1);
+            }
+
+        }
+    }
+
+    static class BulkWriter implements Runnable {
+        private String logName;
+        private LogServiceClient logServiceClient;
+        private int numRecords;
+
+        BulkWriter(String logName, LogServiceClient logServiceClient, int numRecords) {
+            this.logName = logName;
+            this.logServiceClient = logServiceClient;
+            this.numRecords = numRecords;
+        }
+
+        public void run() {
+            try {
+                LogStream logStream = this.logServiceClient.createLog(LogName.of(logName));
+                LogWriter writer = logStream.createWriter();
+                for (int i = 0; i < this.numRecords; i++) {
+                    String message = MESSAGE_PREFIX + i;
+                    System.out.println(logName + " Writing " + message);
+                    writer.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
+                }
+                writer.close();
+                Log.info("" + numRecords + "log entries written to log "+ this.logName + " successfully.");
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    static class BulkReader implements Runnable {
+        private String logName;
+        private LogServiceClient logServiceClient;
+        private int numRecords;
+
+        BulkReader(String logName, LogServiceClient logServiceClient, int numRecords) {
+            this.logName = logName;
+            this.logServiceClient = logServiceClient;
+            this.numRecords = numRecords;
+        }
+
+        public void run() {
+            try {
+                LogStream logStream = this.logServiceClient.getLog(LogName.of(logName));
+                LogReader reader = logStream.createReader();
+                long size = logStream.getLength();
+                if(size != this.numRecords) {
+                    Log.error("There is mismatch is number of records. Expected Records: "+
+                            this.numRecords +", Actual Records: " + size);
+                    System.exit(-1);
+                }
+                for (int i = 0; i < size; i++) {
+                    ByteBuffer buffer = reader.readNext();
+                    String message = new String(buffer.array(), buffer.arrayOffset(),
+                            buffer.remaining(), StandardCharsets.UTF_8);
+                    System.out.println(logName + " Read " + message);
+                    if(!message.equals(MESSAGE_PREFIX + i)) {
+                        Log.error("Message is not correct. Expected: "+(MESSAGE_PREFIX + i)
+                                +". Actual:" +message);
+                        System.exit(-1);
+                    }
+                }
+                Log.info("" + numRecords + " log entries read from log "+ this.logName + " successfully.");
+                reader.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}