Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/20 01:47:27 UTC

[iotdb] 01/01: add CRaft

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b5ac3f7667fc60669f133f4a46cc1c84369e5b7f
Author: jt <jt...@163.com>
AuthorDate: Mon Dec 20 09:46:28 2021 +0800

    add CRaft
---
 cluster/distribute-ecs.sh                          |  10 +
 .../org/apache/iotdb/cluster/expr/ExprBench.java   |   4 +-
 .../org/apache/iotdb/cluster/expr/ExprMember.java  |   5 +-
 .../org/apache/iotdb/cluster/expr/ExprServer.java  |   5 +-
 .../apache/iotdb/cluster/expr/VotingLogList.java   |   8 +-
 .../iotdb/cluster/log/FragmentedLogDispatcher.java | 110 ++++
 .../java/org/apache/iotdb/cluster/log/Log.java     |  21 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  32 +-
 .../org/apache/iotdb/cluster/log/LogParser.java    |   6 +
 .../org/apache/iotdb/cluster/log/VotingLog.java    |  11 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |   3 +-
 .../iotdb/cluster/log/logtypes/FragmentedLog.java  | 140 ++++
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  16 +-
 .../log/sequencing/AsynchronousSequencer.java      |   9 +-
 .../handlers/caller/AppendNodeEntryHandler.java    |  22 +-
 .../server/heartbeat/MetaHeartbeatServer.java      |   5 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 135 ++--
 .../apache/iotdb/cluster/server/monitor/Timer.java |   8 +
 .../iotdb/cluster/utils/encode/rs/Galois.java      | 722 +++++++++++++++++++++
 .../iotdb/cluster/utils/encode/rs/Matrix.java      | 312 +++++++++
 .../iotdb/cluster/utils/encode/rs/ReedSolomon.java | 359 ++++++++++
 .../cluster/utils/encode/rs/SampleDecoder.java     |  98 +++
 .../cluster/utils/encode/rs/SampleEncoder.java     |  91 +++
 23 files changed, 2041 insertions(+), 91 deletions(-)

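Taken together, the diff below appears to add a CRaft-style replication path: the new FragmentedLog type Reed-Solomon-encodes a serialized entry into data and parity shards, the new FragmentedLogDispatcher enqueues a per-follower copy that carries only that follower's shard, and RaftMember.USE_CRAFT (set from ExprServer's added command-line argument) switches groups of more than two nodes onto this path. The new utils/encode/rs package supplies the GF(256) Galois arithmetic and the ReedSolomon coder used to compute parity, while LogParser and MetaLogApplier learn the new FRAGMENTED log type. A small sketch of the resulting shard layout follows the FragmentedLog.java diff further down.
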
diff --git a/cluster/distribute-ecs.sh b/cluster/distribute-ecs.sh
new file mode 100644
index 0000000..84874f2
--- /dev/null
+++ b/cluster/distribute-ecs.sh
@@ -0,0 +1,10 @@
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+
+ips=(ecs1 ecs2 ecs3 ecs4 ecs5)
+target_lib_path=/root/iotdb_expr/lib
+
+for ip in ${ips[*]}
+  do
+    ssh root@$ip "mkdir $target_lib_path"
+    scp -r $src_lib_path root@$ip:$target_lib_path
+  done
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index d0891a3..8115e44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -43,6 +43,7 @@ public class ExprBench {
   private long maxLatency = 0;
   private int threadNum = 64;
   private int workloadSize = 64 * 1024;
+  private int printInterval = 1000;
   private SyncClientPool clientPool;
   private Node target;
   private int maxRequestNum;
@@ -84,7 +85,7 @@ public class ExprBench {
                 e.printStackTrace();
               }
 
-              if (currRequsetNum % 1000 == 0) {
+              if (currRequsetNum % printInterval == 0) {
                 long elapsedTime = System.currentTimeMillis() - startTime;
                 System.out.println(
                     String.format(
@@ -118,6 +119,7 @@ public class ExprBench {
     bench.maxRequestNum = Integer.parseInt(args[2]);
     bench.threadNum = Integer.parseInt(args[3]);
     bench.workloadSize = Integer.parseInt(args[4]) * 1024;
+    bench.printInterval = Integer.parseInt(args[5]);
     bench.benchmark();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 04e6b97..bcc26ce 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -133,7 +133,10 @@ public class ExprMember extends MetaGroupMember {
         latch.await();
         return StatusUtils.OK;
       }
-      return processNonPartitionedMetaPlan(plan);
+      long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.getOperationStartTime();
+      TSStatus tsStatus = processNonPartitionedMetaPlan(plan);
+      Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
+      return tsStatus;
     } catch (Exception e) {
       logger.error("Exception in processing plan", e);
       return StatusUtils.INTERNAL_ERROR.deepCopy().setMessage(e.getMessage());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
index 6aac585..ab2468e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -61,8 +61,7 @@ public class ExprServer extends MetaClusterServer {
 
   @Override
   protected TServerTransport getServerSocket() throws TTransportException {
-    return new TServerSocket(
-        new InetSocketAddress(thisNode.getInternalIp(), thisNode.getMetaPort()));
+    return new TServerSocket(new InetSocketAddress("0.0.0.0", thisNode.getMetaPort()));
   }
 
   public static void main(String[] args)
@@ -82,6 +81,7 @@ public class ExprServer extends MetaClusterServer {
     boolean enableCommitReturn = Boolean.parseBoolean(args[7]);
     int maxBatchSize = Integer.parseInt(args[8]);
     int defaultLogBufferSize = Integer.parseInt(args[9]);
+    boolean useCRaft = Boolean.parseBoolean(args[10]);
 
     ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr));
     ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
@@ -98,6 +98,7 @@ public class ExprServer extends MetaClusterServer {
     ExprMember.ENABLE_WEAK_ACCEPTANCE = enableWeakAcceptance;
     ExprMember.ENABLE_COMMIT_RETURN = enableCommitReturn;
     Log.DEFAULT_BUFFER_SIZE = defaultLogBufferSize * 1024 + 512;
+    RaftMember.USE_CRAFT = useCRaft;
 
     ExprServer server = new ExprServer();
     server.start();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 2723ac8..a1567d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -79,7 +79,7 @@ public class VotingLogList {
           if (votingLog.getStronglyAcceptedNodeIds().size()
                   + votingLog.getWeaklyAcceptedNodeIds().size()
               >= quorumSize) {
-            votingLog.acceptedTime = System.nanoTime();
+            votingLog.acceptedTime.set(System.nanoTime());
           }
         } else if (votingLog.getLog().getCurrLogIndex() > index) {
           break;
@@ -93,9 +93,9 @@ public class VotingLogList {
 
     if (lastEntryIndexToCommit != -1) {
       for (VotingLog acceptedLog : acceptedLogs) {
-        synchronized (acceptedLog) {
-          acceptedLog.acceptedTime = System.nanoTime();
-          acceptedLog.notifyAll();
+        synchronized (acceptedLog.getStronglyAcceptedNodeIds()) {
+          acceptedLog.acceptedTime.set(System.nanoTime());
+          acceptedLog.getStronglyAcceptedNodeIds().notifyAll();
         }
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
new file mode 100644
index 0000000..f8954ef
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FragmentedLogDispatcher extends LogDispatcher {
+
+  private static final Logger logger = LoggerFactory.getLogger(FragmentedLogDispatcher.class);
+
+  public FragmentedLogDispatcher(RaftMember member) {
+    super(member);
+  }
+
+  public void offer(SendLogRequest request) {
+    // do serialization here to avoid taking LogManager for too long
+
+    long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
+    request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
+    for (int i = 0; i < nodesLogQueues.size(); i++) {
+      BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i);
+      SendLogRequest fragmentedRequest = new SendLogRequest(request);
+      fragmentedRequest.setVotingLog(new VotingLog(request.getVotingLog()));
+      fragmentedRequest
+          .getVotingLog()
+          .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i));
+      try {
+        boolean addSucceeded;
+        if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
+          addSucceeded =
+              nodeLogQueue.offer(
+                  fragmentedRequest,
+                  ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(),
+                  TimeUnit.MILLISECONDS);
+        } else {
+          addSucceeded = nodeLogQueue.add(fragmentedRequest);
+        }
+
+        if (!addSucceeded) {
+          logger.debug(
+              "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
+        } else {
+          request.setEnqueueTime(System.nanoTime());
+        }
+      } catch (IllegalStateException e) {
+        logger.debug(
+            "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
+
+    if (Timer.ENABLE_INSTRUMENTING) {
+      Statistic.LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE.calOperationCostTimeFromStart(
+          request.getVotingLog().getLog().getCreateTime());
+    }
+  }
+
+  LogDispatcher.DispatcherThread newDispatcherThread(
+      Node node, BlockingQueue<SendLogRequest> logBlockingQueue) {
+    return new DispatcherThread(node, logBlockingQueue);
+  }
+
+  class DispatcherThread extends LogDispatcher.DispatcherThread {
+
+    DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
+      super(receiver, logBlockingDeque);
+    }
+
+    @Override
+    protected void serializeEntries() {
+      for (SendLogRequest request : currBatch) {
+        Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+            request.getVotingLog().getLog().getEnqueueTime());
+        long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
+        request.getAppendEntryRequest().entry = request.getVotingLog().getLog().serialize();
+        Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
+      }
+    }
+  }
+}
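
Unlike the base dispatcher, this offer() builds a separate SendLogRequest for every follower queue, each wrapping FragmentedLog(fullLog, i) so that follower i receives (and re-serializes) only shard i of the already-encoded entry, plus the entry's index and term. With the shard counts computed in FragmentedLog below (a 5-node group gives 3 data shards and 1 parity shard, shardLength = logLength / 3 + 1), a 900 KB entry is cut into roughly 300 KB shards, so the leader ships about 4 × 300 KB ≈ 1.2 MB per entry instead of 4 × 900 KB = 3.6 MB, at the price that no single follower holds the complete entry.
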
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 4845bba..01d8541 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -43,8 +43,10 @@ public abstract class Log implements Comparable<Log> {
   @SuppressWarnings("java:S3077")
   private volatile Exception exception;
 
+  private long receiveTime;
   private long createTime;
   private long enqueueTime;
+  private long sequenceStartTime;
 
   private int byteSize = 0;
 
@@ -63,7 +65,8 @@ public abstract class Log implements Comparable<Log> {
     CLOSE_FILE,
     REMOVE_NODE,
     EMPTY_CONTENT,
-    TEST_LARGE_CONTENT
+    TEST_LARGE_CONTENT,
+    FRAGMENTED
   }
 
   public long getCurrLogIndex() {
@@ -147,4 +150,20 @@ public abstract class Log implements Comparable<Log> {
   public void setByteSize(int byteSize) {
     this.byteSize = byteSize;
   }
+
+  public long getReceiveTime() {
+    return receiveTime;
+  }
+
+  public void setReceiveTime(long receiveTime) {
+    this.receiveTime = receiveTime;
+  }
+
+  public long getSequenceStartTime() {
+    return sequenceStartTime;
+  }
+
+  public void setSequenceStartTime(long sequenceStartTime) {
+    this.sequenceStartTime = sequenceStartTime;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index beaaa77..17b853c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -187,6 +188,15 @@ public class LogDispatcher {
       this.setQuorumSize(quorumSize);
     }
 
+    public SendLogRequest(SendLogRequest request) {
+      this.setVotingLog(request.votingLog);
+      this.setLeaderShipStale(request.leaderShipStale);
+      this.setNewLeaderTerm(request.newLeaderTerm);
+      this.setAppendEntryRequest(request.appendEntryRequest);
+      this.setQuorumSize(request.quorumSize);
+      this.setEnqueueTime(request.enqueueTime);
+    }
+
     public VotingLog getVotingLog() {
       return votingLog;
     }
@@ -245,7 +255,7 @@ public class LogDispatcher {
 
     Node receiver;
     private BlockingQueue<SendLogRequest> logBlockingDeque;
-    private List<SendLogRequest> currBatch = new ArrayList<>();
+    protected List<SendLogRequest> currBatch = new ArrayList<>();
     private Peer peer;
     Client client;
 
@@ -275,13 +285,7 @@ public class LogDispatcher {
             logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
           }
           Statistic.LOG_DISPATCHER_LOG_BATCH_SIZE.add(currBatch.size());
-          for (SendLogRequest request : currBatch) {
-            Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
-                request.getVotingLog().getLog().getEnqueueTime());
-            long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
-            request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
-            Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
-          }
+          serializeEntries();
           sendBatchLogs(currBatch);
           currBatch.clear();
         }
@@ -293,6 +297,16 @@ public class LogDispatcher {
       logger.info("Dispatcher exits");
     }
 
+    protected void serializeEntries() throws ExecutionException, InterruptedException {
+      for (SendLogRequest request : currBatch) {
+        Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+            request.getVotingLog().getLog().getEnqueueTime());
+        long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
+        request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
+        Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
+      }
+    }
+
     private void appendEntriesAsync(
         List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch)
         throws TException {
@@ -424,7 +438,7 @@ public class LogDispatcher {
               peer,
               logRequest.quorumSize);
       // TODO add async interface
-      int retries = 5;
+      int retries = 50;
       try {
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
         for (int i = 0; i < retries; i++) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index e35943b..26f8a42 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.log.Log.Types;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
@@ -101,6 +102,11 @@ public class LogParser {
         largeLog.deserialize(buffer);
         log = largeLog;
         break;
+      case FRAGMENTED:
+        FragmentedLog fragmentedLog = new FragmentedLog();
+        fragmentedLog.deserialize(buffer);
+        log = fragmentedLog;
+        break;
       default:
         throw new IllegalArgumentException(type.toString());
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index d56f3f0..46b5c02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -22,18 +22,27 @@ package org.apache.iotdb.cluster.log;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class VotingLog {
   protected Log log;
   protected Set<Integer> stronglyAcceptedNodeIds;
   protected Set<Integer> weaklyAcceptedNodeIds;
-  public long acceptedTime;
+  public AtomicLong acceptedTime;
   public volatile ByteBuffer serializedCache;
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
     stronglyAcceptedNodeIds = new HashSet<>(groupSize);
     weaklyAcceptedNodeIds = new HashSet<>(groupSize);
+    acceptedTime = new AtomicLong();
+  }
+
+  public VotingLog(VotingLog another) {
+    this.log = another.log;
+    this.stronglyAcceptedNodeIds = another.stronglyAcceptedNodeIds;
+    this.weaklyAcceptedNodeIds = another.weaklyAcceptedNodeIds;
+    this.acceptedTime = another.acceptedTime;
   }
 
   public Log getLog() {
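
The change of acceptedTime from a plain long to an AtomicLong, together with the new copy constructor, fits a pattern that runs through this commit: VotingLogList above and AppendNodeEntryHandler/RaftMember below now synchronize and notify on log.getStronglyAcceptedNodeIds() rather than on the VotingLog object itself, and FragmentedLogDispatcher's per-follower copies share one entry's vote sets and accepted time through this constructor, so acceptedTime is written and read outside any single monitor.
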
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 1c501f8..af3163c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.server.NodeCharacter;
@@ -52,7 +53,7 @@ public class MetaLogApplier extends BaseApplier {
         applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
       } else if (log instanceof RemoveNodeLog) {
         applyRemoveNodeLog((RemoveNodeLog) log);
-      } else if (log instanceof EmptyContentLog) {
+      } else if (log instanceof EmptyContentLog || log instanceof FragmentedLog) {
         // Do nothing
       } else {
         logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
new file mode 100644
index 0000000..7186098
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.logtypes;
+
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.utils.encode.rs.ReedSolomon;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.iotdb.cluster.log.Log.Types.FRAGMENTED;
+
+public class FragmentedLog extends Log {
+
+  private byte[][] logFragments;
+  private int logLength;
+  private boolean[] fragmentPresent;
+  private int shardLength;
+  private int dataShardNum;
+  private int parityShardNum;
+
+  public FragmentedLog() {}
+
+  public FragmentedLog(Log base, int nodeNum) {
+
+    ByteBuffer baseBuffer = base.serialize();
+    logLength = baseBuffer.limit();
+    int followerNum = nodeNum - 1;
+    parityShardNum = nodeNum / 2 - 1;
+    dataShardNum = followerNum - parityShardNum;
+    shardLength = logLength / dataShardNum + 1;
+
+    logFragments = new byte[parityShardNum + dataShardNum][];
+    fragmentPresent = new boolean[parityShardNum + dataShardNum];
+    for (int i = 0; i < parityShardNum + dataShardNum; i++) {
+      logFragments[i] = new byte[shardLength];
+      fragmentPresent[i] = true;
+    }
+
+    int start = 0;
+    int end = Math.min(start + shardLength, logLength);
+    for (int i = 0; i < dataShardNum; i++) {
+      System.arraycopy(baseBuffer.array(), start, logFragments[i], 0, end - start);
+      start += shardLength;
+      end = Math.min(start + shardLength, logLength);
+    }
+
+    ReedSolomon reedSolomon = new ReedSolomon(dataShardNum, parityShardNum);
+    reedSolomon.encodeParity(logFragments, 0, shardLength);
+  }
+
+  public FragmentedLog(FragmentedLog parent, int fragmentIndex) {
+    setCurrLogIndex(parent.getCurrLogIndex());
+    setCurrLogTerm(parent.getCurrLogTerm());
+    setCreateTime(parent.getCreateTime());
+    setEnqueueTime(parent.getEnqueueTime());
+    setReceiveTime(parent.getReceiveTime());
+
+    this.logFragments = parent.logFragments;
+    this.fragmentPresent = new boolean[logFragments.length];
+    fragmentPresent[fragmentIndex] = true;
+    this.logLength = parent.logLength;
+    this.dataShardNum = parent.dataShardNum;
+    this.parityShardNum = parent.parityShardNum;
+    this.shardLength = parent.shardLength;
+  }
+
+  @Override
+  public ByteBuffer serialize() {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS(DEFAULT_BUFFER_SIZE);
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      dataOutputStream.writeByte((byte) FRAGMENTED.ordinal());
+
+      dataOutputStream.writeLong(getCurrLogIndex());
+      dataOutputStream.writeLong(getCurrLogTerm());
+
+      dataOutputStream.writeInt(logLength);
+      dataOutputStream.writeInt(dataShardNum);
+      dataOutputStream.writeInt(parityShardNum);
+      dataOutputStream.writeInt(shardLength);
+
+      for (int i = 0, fragmentPresentLength = fragmentPresent.length;
+          i < fragmentPresentLength;
+          i++) {
+        boolean present = fragmentPresent[i];
+        dataOutputStream.writeBoolean(present);
+        if (present) {
+          dataOutputStream.write(logFragments[i]);
+        }
+      }
+    } catch (IOException e) {
+      // unreachable
+    }
+
+    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    setCurrLogIndex(buffer.getLong());
+    setCurrLogTerm(buffer.getLong());
+
+    logLength = buffer.getInt();
+    dataShardNum = buffer.getInt();
+    parityShardNum = buffer.getInt();
+    shardLength = buffer.getInt();
+
+    logFragments = new byte[dataShardNum + parityShardNum][shardLength];
+    fragmentPresent = new boolean[dataShardNum + parityShardNum];
+
+    for (int i = 0, fragmentPresentLength = fragmentPresent.length;
+        i < fragmentPresentLength;
+        i++) {
+      boolean present = buffer.get() == 1;
+      fragmentPresent[i] = present;
+      if (present) {
+        buffer.get(logFragments[i], 0, shardLength);
+      }
+    }
+  }
+}
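
To make the layout built by the FragmentedLog(Log base, int nodeNum) constructor above concrete, here is a minimal sketch (hypothetical class name FragmentLayoutDemo; it assumes the ReedSolomon class added later in this commit is on the classpath) that repeats the same shard arithmetic for a 5-node group and a 1000-byte entry:

import org.apache.iotdb.cluster.utils.encode.rs.ReedSolomon;

import java.util.Arrays;

public class FragmentLayoutDemo {
  public static void main(String[] args) {
    byte[] serializedEntry = new byte[1000]; // stand-in for base.serialize()
    Arrays.fill(serializedEntry, (byte) 7);

    int nodeNum = 5;                                 // group size, leader included
    int followerNum = nodeNum - 1;                   // 4
    int parityShardNum = nodeNum / 2 - 1;            // 1
    int dataShardNum = followerNum - parityShardNum; // 3
    int shardLength = serializedEntry.length / dataShardNum + 1; // 334

    // Spread the entry over the data shards, mirroring the constructor above.
    byte[][] shards = new byte[dataShardNum + parityShardNum][shardLength];
    for (int i = 0; i < dataShardNum; i++) {
      int start = i * shardLength;
      int length = Math.min(shardLength, serializedEntry.length - start);
      if (length > 0) {
        System.arraycopy(serializedEntry, start, shards[i], 0, length);
      }
    }

    // Fill the parity shard with the coder added by this commit.
    new ReedSolomon(dataShardNum, parityShardNum).encodeParity(shards, 0, shardLength);

    System.out.printf(
        "data shards = %d, parity shards = %d, shard length = %d bytes%n",
        dataShardNum, parityShardNum, shardLength);
  }
}

For a 1000-byte entry this prints data shards = 3, parity shards = 1, shard length = 334 bytes; any 3 of the 4 shards are enough to rebuild the entry, and each follower is sent exactly one of them.
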
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 4b445df..46ee69c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -65,7 +65,7 @@ public abstract class RaftLogManager {
   /** manage committed entries in disk for safety */
   private StableEntryManager stableEntryManager;
 
-  private long commitIndex;
+  private volatile long commitIndex;
 
   /**
    * The committed logs whose index is smaller than this are all have been applied, for example,
@@ -693,7 +693,7 @@ public abstract class RaftLogManager {
 
     long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
     if (unappliedLogSize > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
-      logger.debug(
+      logger.info(
           "There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
           unappliedLogSize);
       try {
@@ -995,7 +995,7 @@ public abstract class RaftLogManager {
           || nextToCheckIndex > getCommittedEntryManager().getLastIndex()
           || (blockAppliedCommitIndex > 0 && blockAppliedCommitIndex < nextToCheckIndex)) {
         // avoid spinning
-        Thread.sleep(5);
+        Thread.sleep(0);
         return;
       }
       Log log = getCommittedEntryManager().getEntry(nextToCheckIndex);
@@ -1007,10 +1007,12 @@ public abstract class RaftLogManager {
             nextToCheckIndex);
         return;
       }
-      synchronized (log) {
-        while (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
-          // wait until the log is applied or a newer snapshot is installed
-          log.wait(5);
+      if (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
+        synchronized (log) {
+          while (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
+            // wait until the log is applied or a newer snapshot is installed
+            log.wait(1);
+          }
         }
       }
       synchronized (changeApplyCommitIndexCond) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 0def4d0..4390def 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_SENDER_SEQUENCE_LOG;
+
 public class AsynchronousSequencer implements LogSequencer {
 
   private static final Logger logger = LoggerFactory.getLogger(AsynchronousSequencer.class);
@@ -56,7 +58,7 @@ public class AsynchronousSequencer implements LogSequencer {
   public AsynchronousSequencer(RaftMember member, RaftLogManager logManager) {
     this.member = member;
     this.logManager = logManager;
-    unsequencedLogQueue = new ArrayBlockingQueue<>(4096);
+    unsequencedLogQueue = new ArrayBlockingQueue<>(40960, true);
     for (int i = 0; i < SEQUENCER_PARALLELISM; i++) {
       SEQUENCER_POOL.submit(this::sequenceTask);
     }
@@ -83,7 +85,9 @@ public class AsynchronousSequencer implements LogSequencer {
     long startTime;
     synchronized (logManager) {
       for (SendLogRequest sendLogRequest : sendLogRequests) {
+        long sequenceStartTime = RAFT_SENDER_SEQUENCE_LOG.getOperationStartTime();
         Log log = sendLogRequest.getVotingLog().getLog();
+        log.setSequenceStartTime(sequenceStartTime);
         log.setCurrLogTerm(member.getTerm().get());
         log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
         if (log instanceof PhysicalPlanLog) {
@@ -101,10 +105,13 @@ public class AsynchronousSequencer implements LogSequencer {
         sendLogRequest.setAppendEntryRequest(appendEntryRequest);
 
         startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+        Statistic.LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE.calOperationCostTimeFromStart(
+            log.getReceiveTime());
         log.setCreateTime(System.nanoTime());
         member.getVotingLogList().insert(sendLogRequest.getVotingLog());
         member.getLogDispatcher().offer(sendLogRequest);
         Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+        RAFT_SENDER_SEQUENCE_LOG.calOperationCostTimeFromStart(sequenceStartTime);
       }
     }
     sendLogRequests.clear();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index b6c0df2..749a3ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -88,13 +88,13 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     long resp = response.status;
 
     if (resp == RESPONSE_STRONG_ACCEPT) {
-      synchronized (log) {
+      synchronized (log.getStronglyAcceptedNodeIds()) {
         if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
             >= quorumSize) {
-          log.acceptedTime = System.nanoTime();
+          log.acceptedTime.set(System.nanoTime());
         }
         log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
-        log.notifyAll();
+        log.getStronglyAcceptedNodeIds().notifyAll();
       }
       member
           .getVotingLogList()
@@ -117,17 +117,17 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
         receiverTerm.set(resp);
       }
       leaderShipStale.set(true);
-      synchronized (log) {
-        log.notifyAll();
+      synchronized (log.getStronglyAcceptedNodeIds()) {
+        log.getStronglyAcceptedNodeIds().notifyAll();
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
-      synchronized (log) {
+      synchronized (log.getStronglyAcceptedNodeIds()) {
+        log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
         if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
             >= quorumSize) {
-          log.acceptedTime = System.nanoTime();
+          log.acceptedTime.set(System.nanoTime());
         }
-        log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
-        log.notifyAll();
+        log.getStronglyAcceptedNodeIds().notifyAll();
       }
     } else {
       // e.g., Response.RESPONSE_LOG_MISMATCH
@@ -155,12 +155,12 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   }
 
   private void onFail() {
-    synchronized (log) {
+    synchronized (log.getStronglyAcceptedNodeIds()) {
       failedDecreasingCounter--;
       if (failedDecreasingCounter <= 0) {
         // quorum members have failed, there is no need to wait for others
         log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
-        log.notifyAll();
+        log.getStronglyAcceptedNodeIds().notifyAll();
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
index afde282..95e38b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
@@ -68,13 +68,12 @@ public class MetaHeartbeatServer extends HeartbeatServer {
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       return new TNonblockingServerSocket(
           new InetSocketAddress(
-              config.getInternalIp(),
-              config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET),
+              "0.0.0.0", config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET),
           getConnectionTimeoutInMS());
     } else {
       return new TServerSocket(
           new InetSocketAddress(
-              metaClusterServer.getMember().getThisNode().getInternalIp(),
+              "0.0.0.0",
               metaClusterServer.getMember().getThisNode().getMetaPort()
                   + ClusterUtils.META_HEARTBEAT_PORT_OFFSET));
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index cd54aa6..4a24d13 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.expr.VotingLogList;
 import org.apache.iotdb.cluster.log.CommitLogCallback;
 import org.apache.iotdb.cluster.log.CommitLogTask;
+import org.apache.iotdb.cluster.log.FragmentedLogDispatcher;
 import org.apache.iotdb.cluster.log.HardState;
 import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
 import org.apache.iotdb.cluster.log.Log;
@@ -39,6 +40,7 @@ import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;
@@ -133,6 +135,7 @@ public abstract class RaftMember {
   public static boolean USE_INDIRECT_LOG_DISPATCHER = false;
   public static boolean ENABLE_WEAK_ACCEPTANCE = true;
   public static boolean ENABLE_COMMIT_RETURN = false;
+  public static boolean USE_CRAFT = false;
 
   protected static final LogSequencerFactory SEQUENCER_FACTORY =
       ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing()
@@ -1195,6 +1198,11 @@ public abstract class RaftMember {
       ((PhysicalPlanLog) log).setPlan(plan);
     }
 
+    if (USE_CRAFT && allNodes.size() > 2) {
+      log = new FragmentedLog(log, allNodes.size());
+    }
+    log.setReceiveTime(System.nanoTime());
+
     // just like processPlanLocally,we need to check the size of log
     //    if (log.serialize().capacity() + Integer.BYTES
     //        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
@@ -1606,6 +1614,8 @@ public abstract class RaftMember {
     if (logDispatcher == null) {
       if (USE_INDIRECT_LOG_DISPATCHER) {
         logDispatcher = new IndirectLogDispatcher(this);
+      } else if (USE_CRAFT && allNodes.size() > 2) {
+        logDispatcher = new FragmentedLogDispatcher(this);
       } else {
         logDispatcher = new LogDispatcher(this);
       }
@@ -1613,63 +1623,92 @@ public abstract class RaftMember {
     return logDispatcher;
   }
 
+  @SuppressWarnings({"java:S2445"}) // safe synchronized
+  private void waitAppendResultLoop(VotingLog log, int quorumSize) {
+    int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+    int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+    int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+    long nextTimeToPrint = 1000;
+
+    long waitStart = System.currentTimeMillis();
+    long alreadyWait = 0;
+    while (stronglyAcceptedNodeNum < quorumSize
+        && (!ENABLE_WEAK_ACCEPTANCE
+            || (totalAccepted < quorumSize)
+            || votingLogList.size() > config.getMaxNumOfLogsInMem())
+        && alreadyWait < RaftServer.getWriteOperationTimeoutMS()
+        && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+      long singleWaitTime = 0;
+      long singleWaitStart = System.nanoTime();
+      try {
+        synchronized (log.getStronglyAcceptedNodeIds()) {
+          log.getStronglyAcceptedNodeIds().wait(1);
+        }
+        singleWaitTime = (System.nanoTime() - singleWaitStart) / 1000000;
+        logger.debug("{} ends waiting", log);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn("Unexpected interruption when sending a log", e);
+      }
+      alreadyWait = System.currentTimeMillis() - waitStart;
+      if (alreadyWait > nextTimeToPrint) {
+        logger.info(
+            "Still not receive enough votes for {}, strongly accepted {}, weakly "
+                + "accepted {}, voting logs {}, wait {}ms, wait to sequence {}ms, wait to enqueue "
+                + "{}ms, wait to accept "
+                + "{}ms",
+            log,
+            log.getStronglyAcceptedNodeIds(),
+            log.getWeaklyAcceptedNodeIds(),
+            votingLogList.size(),
+            singleWaitTime,
+            (log.getLog().getSequenceStartTime() - singleWaitStart) / 1000000,
+            (log.getLog().getEnqueueTime() - singleWaitStart) / 1000000,
+            (log.acceptedTime.get() - singleWaitStart) / 1000000);
+        nextTimeToPrint *= 2;
+      }
+      stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+      weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+      totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+    }
+
+    if (alreadyWait > 15000) {
+      logger.info(
+          "Slow entry {}, strongly accepted {}, weakly " + "accepted {}, waited time {}ms",
+          log,
+          log.getStronglyAcceptedNodeIds(),
+          log.getWeaklyAcceptedNodeIds(),
+          alreadyWait);
+    }
+  }
+
   /**
    * wait until "voteCounter" counts down to zero, which means the quorum has received the log, or
    * one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
    */
-  @SuppressWarnings({"java:S2445"}) // safe synchronized
   protected AppendLogResult waitAppendResult(
       VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) {
     // wait for the followers to vote
     long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
-    long nextTimeToPrint = 15000;
 
     int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
     int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
     int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
 
-    synchronized (log) {
-      long waitStart = System.currentTimeMillis();
-      long alreadyWait = 0;
-      while (stronglyAcceptedNodeNum < quorumSize
-          && (!ENABLE_WEAK_ACCEPTANCE
-              || (totalAccepted < allNodes.size() - 1)
-              || votingLogList.size() > config.getMaxNumOfLogsInMem())
-          && alreadyWait < RaftServer.getWriteOperationTimeoutMS()
-          && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
-        try {
-          log.wait(0);
-          logger.debug("{} ends waiting", log);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          logger.warn("Unexpected interruption when sending a log", e);
-        }
-        alreadyWait = System.currentTimeMillis() - waitStart;
-        if (alreadyWait > nextTimeToPrint) {
-          logger.info(
-              "Still not receive enough votes for {}, strongly accepted {}, weakly "
-                  + "accepted {}",
-              log,
-              log.getStronglyAcceptedNodeIds(),
-              log.getWeaklyAcceptedNodeIds());
-          nextTimeToPrint *= 2;
-        }
-        stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
-        weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
-        totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
-      }
-
-      if (alreadyWait > 15000) {
-        logger.info(
-            "Slow entry {}, strongly accepted {}, weakly " + "accepted {}, waited time {}ms",
-            log,
-            log.getStronglyAcceptedNodeIds(),
-            log.getWeaklyAcceptedNodeIds(),
-            alreadyWait);
-      }
+    if (stronglyAcceptedNodeNum < quorumSize
+        && (!ENABLE_WEAK_ACCEPTANCE
+            || (totalAccepted < quorumSize)
+            || votingLogList.size() > config.getMaxNumOfLogsInMem())
+        && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+      waitAppendResultLoop(log, quorumSize);
     }
-    if (log.acceptedTime != 0) {
-      Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime);
+
+    stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+    weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+    totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+
+    if (log.acceptedTime.get() != 0) {
+      Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
     }
     Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.calOperationCostTimeFromStart(startTime);
 
@@ -1702,16 +1741,14 @@ public abstract class RaftMember {
     if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
       startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
       synchronized (logManager) {
+        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+            startTime);
         if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
-          Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
-              startTime);
-
           startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
           logManager.commitTo(log.getCurrLogIndex());
           Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
-
-          startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
         }
+        startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
       }
       Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
     }
@@ -1725,7 +1762,7 @@ public abstract class RaftMember {
       while (!log.isApplied()) {
         // wait until the log is applied
         try {
-          log.wait(0);
+          log.wait(1);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new LogExecutionException(e);
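
The large hunk above mostly extracts the old busy-wait from waitAppendResult into the new waitAppendResultLoop. Beyond the extraction, the loop now waits on log.getStronglyAcceptedNodeIds() with a 1 ms timeout instead of an unbounded log.wait(0), the weak-acceptance condition compares the total accepted count against quorumSize rather than allNodes.size() - 1, and the periodic "still not receive enough votes" message additionally reports the voting-log list size and timing deltas taken from the new sequenceStartTime, enqueueTime, and acceptedTime fields.
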
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index c794211..c72550e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -86,6 +86,8 @@ public class Timer {
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     // raft member - sender
+    RAFT_SENDER_SEQUENCE_LOG(
+        RAFT_MEMBER_SENDER, "sequence log", TIME_SCALE, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY),
     RAFT_SENDER_APPEND_LOG(
         RAFT_MEMBER_SENDER,
         "locally append log",
@@ -257,6 +259,12 @@ public class Timer {
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_LOG_BATCH_SIZE(
         LOG_DISPATCHER, "batch size", 1, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE(
+        LOG_DISPATCHER,
+        "from receive to create",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_FROM_CREATE_TO_SENT(
         LOG_DISPATCHER,
         "from create to sent",
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Galois.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Galois.java
new file mode 100644
index 0000000..5f0b8ac
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Galois.java
@@ -0,0 +1,722 @@
+/**
+ * 8-bit Galois Field
+ *
+ * <p>Copyright 2015, Backblaze, Inc. All rights reserved.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 8-bit Galois Field
+ *
+ * <p>This class implements multiplication, division, addition, subtraction, and exponentiation.
+ *
+ * <p>The multiplication operation is in the inner loop of erasure coding, so it's been optimized.
+ * Having the class be "final" helps a little, and having the EXP_TABLE repeat the data, so there's
+ * no need to bound the sum of two logarithms to 255 helps a lot.
+ */
+public final class Galois {
+
+  /** The number of elements in the field. */
+  public static final int FIELD_SIZE = 256;
+
+  /**
+   * The polynomial used to generate the logarithm table.
+   *
+   * <p>There are a number of polynomials that work to generate a Galois field of 256 elements. The
+   * choice is arbitrary, and we just use the first one.
+   *
+   * <p>The possibilities are: 29, 43, 45, 77, 95, 99, 101, 105, 113, 135, 141, 169, 195, 207, 231,
+   * and 245.
+   */
+  public static final int GENERATING_POLYNOMIAL = 29;
+
+  /**
+   * Mapping from members of the Galois Field to their integer logarithms. The entry for 0 is
+   * meaningless because there is no log of 0.
+   *
+   * <p>This array is shorts, not bytes, so that they can be used directly to index arrays without
+   * casting. The values (except the non-value at index 0) are all really bytes, so they range from
+   * 0 to 255.
+   *
+   * <p>This table was generated by java_tables.py, and the unit tests check it against the Java
+   * implementation.
+   */
+  public static final short[] LOG_TABLE =
+      new short[] {
+        -1, 0, 1, 25, 2, 50, 26, 198,
+        3, 223, 51, 238, 27, 104, 199, 75,
+        4, 100, 224, 14, 52, 141, 239, 129,
+        28, 193, 105, 248, 200, 8, 76, 113,
+        5, 138, 101, 47, 225, 36, 15, 33,
+        53, 147, 142, 218, 240, 18, 130, 69,
+        29, 181, 194, 125, 106, 39, 249, 185,
+        201, 154, 9, 120, 77, 228, 114, 166,
+        6, 191, 139, 98, 102, 221, 48, 253,
+        226, 152, 37, 179, 16, 145, 34, 136,
+        54, 208, 148, 206, 143, 150, 219, 189,
+        241, 210, 19, 92, 131, 56, 70, 64,
+        30, 66, 182, 163, 195, 72, 126, 110,
+        107, 58, 40, 84, 250, 133, 186, 61,
+        202, 94, 155, 159, 10, 21, 121, 43,
+        78, 212, 229, 172, 115, 243, 167, 87,
+        7, 112, 192, 247, 140, 128, 99, 13,
+        103, 74, 222, 237, 49, 197, 254, 24,
+        227, 165, 153, 119, 38, 184, 180, 124,
+        17, 68, 146, 217, 35, 32, 137, 46,
+        55, 63, 209, 91, 149, 188, 207, 205,
+        144, 135, 151, 178, 220, 252, 190, 97,
+        242, 86, 211, 171, 20, 42, 93, 158,
+        132, 60, 57, 83, 71, 109, 65, 162,
+        31, 45, 67, 216, 183, 123, 164, 118,
+        196, 23, 73, 236, 127, 12, 111, 246,
+        108, 161, 59, 82, 41, 157, 85, 170,
+        251, 96, 134, 177, 187, 204, 62, 90,
+        203, 89, 95, 176, 156, 169, 160, 81,
+        11, 245, 22, 235, 122, 117, 44, 215,
+        79, 174, 213, 233, 230, 231, 173, 232,
+        116, 214, 244, 234, 168, 80, 88, 175
+      };
+
+  /**
+   * Inverse of the logarithm table. Maps integer logarithms to members of the field. There is no
+   * entry for 255 because the highest log is 254.
+   *
+   * <p>This table was generated by java_tables.py
+   */
+  static final byte[] EXP_TABLE =
+      new byte[] {
+        1,
+        2,
+        4,
+        8,
+        16,
+        32,
+        64,
+        -128,
+        29,
+        58,
+        116,
+        -24,
+        -51,
+        -121,
+        19,
+        38,
+        76,
+        -104,
+        45,
+        90,
+        -76,
+        117,
+        -22,
+        -55,
+        -113,
+        3,
+        6,
+        12,
+        24,
+        48,
+        96,
+        -64,
+        -99,
+        39,
+        78,
+        -100,
+        37,
+        74,
+        -108,
+        53,
+        106,
+        -44,
+        -75,
+        119,
+        -18,
+        -63,
+        -97,
+        35,
+        70,
+        -116,
+        5,
+        10,
+        20,
+        40,
+        80,
+        -96,
+        93,
+        -70,
+        105,
+        -46,
+        -71,
+        111,
+        -34,
+        -95,
+        95,
+        -66,
+        97,
+        -62,
+        -103,
+        47,
+        94,
+        -68,
+        101,
+        -54,
+        -119,
+        15,
+        30,
+        60,
+        120,
+        -16,
+        -3,
+        -25,
+        -45,
+        -69,
+        107,
+        -42,
+        -79,
+        127,
+        -2,
+        -31,
+        -33,
+        -93,
+        91,
+        -74,
+        113,
+        -30,
+        -39,
+        -81,
+        67,
+        -122,
+        17,
+        34,
+        68,
+        -120,
+        13,
+        26,
+        52,
+        104,
+        -48,
+        -67,
+        103,
+        -50,
+        -127,
+        31,
+        62,
+        124,
+        -8,
+        -19,
+        -57,
+        -109,
+        59,
+        118,
+        -20,
+        -59,
+        -105,
+        51,
+        102,
+        -52,
+        -123,
+        23,
+        46,
+        92,
+        -72,
+        109,
+        -38,
+        -87,
+        79,
+        -98,
+        33,
+        66,
+        -124,
+        21,
+        42,
+        84,
+        -88,
+        77,
+        -102,
+        41,
+        82,
+        -92,
+        85,
+        -86,
+        73,
+        -110,
+        57,
+        114,
+        -28,
+        -43,
+        -73,
+        115,
+        -26,
+        -47,
+        -65,
+        99,
+        -58,
+        -111,
+        63,
+        126,
+        -4,
+        -27,
+        -41,
+        -77,
+        123,
+        -10,
+        -15,
+        -1,
+        -29,
+        -37,
+        -85,
+        75,
+        -106,
+        49,
+        98,
+        -60,
+        -107,
+        55,
+        110,
+        -36,
+        -91,
+        87,
+        -82,
+        65,
+        -126,
+        25,
+        50,
+        100,
+        -56,
+        -115,
+        7,
+        14,
+        28,
+        56,
+        112,
+        -32,
+        -35,
+        -89,
+        83,
+        -90,
+        81,
+        -94,
+        89,
+        -78,
+        121,
+        -14,
+        -7,
+        -17,
+        -61,
+        -101,
+        43,
+        86,
+        -84,
+        69,
+        -118,
+        9,
+        18,
+        36,
+        72,
+        -112,
+        61,
+        122,
+        -12,
+        -11,
+        -9,
+        -13,
+        -5,
+        -21,
+        -53,
+        -117,
+        11,
+        22,
+        44,
+        88,
+        -80,
+        125,
+        -6,
+        -23,
+        -49,
+        -125,
+        27,
+        54,
+        108,
+        -40,
+        -83,
+        71,
+        -114,
+        // Repeat the table a second time, so multiply()
+        // does not have to check bounds.
+        1,
+        2,
+        4,
+        8,
+        16,
+        32,
+        64,
+        -128,
+        29,
+        58,
+        116,
+        -24,
+        -51,
+        -121,
+        19,
+        38,
+        76,
+        -104,
+        45,
+        90,
+        -76,
+        117,
+        -22,
+        -55,
+        -113,
+        3,
+        6,
+        12,
+        24,
+        48,
+        96,
+        -64,
+        -99,
+        39,
+        78,
+        -100,
+        37,
+        74,
+        -108,
+        53,
+        106,
+        -44,
+        -75,
+        119,
+        -18,
+        -63,
+        -97,
+        35,
+        70,
+        -116,
+        5,
+        10,
+        20,
+        40,
+        80,
+        -96,
+        93,
+        -70,
+        105,
+        -46,
+        -71,
+        111,
+        -34,
+        -95,
+        95,
+        -66,
+        97,
+        -62,
+        -103,
+        47,
+        94,
+        -68,
+        101,
+        -54,
+        -119,
+        15,
+        30,
+        60,
+        120,
+        -16,
+        -3,
+        -25,
+        -45,
+        -69,
+        107,
+        -42,
+        -79,
+        127,
+        -2,
+        -31,
+        -33,
+        -93,
+        91,
+        -74,
+        113,
+        -30,
+        -39,
+        -81,
+        67,
+        -122,
+        17,
+        34,
+        68,
+        -120,
+        13,
+        26,
+        52,
+        104,
+        -48,
+        -67,
+        103,
+        -50,
+        -127,
+        31,
+        62,
+        124,
+        -8,
+        -19,
+        -57,
+        -109,
+        59,
+        118,
+        -20,
+        -59,
+        -105,
+        51,
+        102,
+        -52,
+        -123,
+        23,
+        46,
+        92,
+        -72,
+        109,
+        -38,
+        -87,
+        79,
+        -98,
+        33,
+        66,
+        -124,
+        21,
+        42,
+        84,
+        -88,
+        77,
+        -102,
+        41,
+        82,
+        -92,
+        85,
+        -86,
+        73,
+        -110,
+        57,
+        114,
+        -28,
+        -43,
+        -73,
+        115,
+        -26,
+        -47,
+        -65,
+        99,
+        -58,
+        -111,
+        63,
+        126,
+        -4,
+        -27,
+        -41,
+        -77,
+        123,
+        -10,
+        -15,
+        -1,
+        -29,
+        -37,
+        -85,
+        75,
+        -106,
+        49,
+        98,
+        -60,
+        -107,
+        55,
+        110,
+        -36,
+        -91,
+        87,
+        -82,
+        65,
+        -126,
+        25,
+        50,
+        100,
+        -56,
+        -115,
+        7,
+        14,
+        28,
+        56,
+        112,
+        -32,
+        -35,
+        -89,
+        83,
+        -90,
+        81,
+        -94,
+        89,
+        -78,
+        121,
+        -14,
+        -7,
+        -17,
+        -61,
+        -101,
+        43,
+        86,
+        -84,
+        69,
+        -118,
+        9,
+        18,
+        36,
+        72,
+        -112,
+        61,
+        122,
+        -12,
+        -11,
+        -9,
+        -13,
+        -5,
+        -21,
+        -53,
+        -117,
+        11,
+        22,
+        44,
+        88,
+        -80,
+        125,
+        -6,
+        -23,
+        -49,
+        -125,
+        27,
+        54,
+        108,
+        -40,
+        -83,
+        71,
+        -114
+      };
+
+  /**
+   * Adds two elements of the field. If you're in an inner loop, you should inline this function:
+   * it's just XOR.
+   */
+  public static byte add(byte a, byte b) {
+    return (byte) (a ^ b);
+  }
+
+  /**
+   * Inverse of addition. If you're in an inner loop, you should inline this function: it's just
+   * XOR.
+   */
+  public static byte subtract(byte a, byte b) {
+    return (byte) (a ^ b);
+  }
+
+  /** Multiplies two elements of the field. */
+  public static byte multiply(byte a, byte b) {
+    if (a == 0 || b == 0) {
+      return 0;
+    } else {
+      int logA = LOG_TABLE[a & 0xFF];
+      int logB = LOG_TABLE[b & 0xFF];
+      int logResult = logA + logB;
+      return EXP_TABLE[logResult];
+    }
+  }
+
+  /** Inverse of multiplication. */
+  public static byte divide(byte a, byte b) {
+    if (a == 0) {
+      return 0;
+    }
+    if (b == 0) {
+      throw new IllegalArgumentException("Argument 'divisor' is 0");
+    }
+    int logA = LOG_TABLE[a & 0xFF];
+    int logB = LOG_TABLE[b & 0xFF];
+    int logResult = logA - logB;
+    if (logResult < 0) {
+      logResult += 255;
+    }
+    return EXP_TABLE[logResult];
+  }
+
+  /**
+   * Computes a**n.
+   *
+   * <p>The result will be the same as multiplying a times itself n times.
+   *
+   * @param a A member of the field.
+   * @param n A plain-old integer.
+   * @return The result of multiplying a by itself n times.
+   */
+  public static byte exp(byte a, int n) {
+    if (n == 0) {
+      return 1;
+    } else if (a == 0) {
+      return 0;
+    } else {
+      int logA = LOG_TABLE[a & 0xFF];
+      int logResult = logA * n;
+      while (255 <= logResult) {
+        logResult -= 255;
+      }
+      return EXP_TABLE[logResult];
+    }
+  }
+
+  /** Generates a logarithm table given a starting polynomial. */
+  public static short[] generateLogTable(int polynomial) {
+    short[] result = new short[FIELD_SIZE];
+    for (int i = 0; i < FIELD_SIZE; i++) {
+      result[i] = -1; // -1 means "not set"
+    }
+    int b = 1;
+    for (int log = 0; log < FIELD_SIZE - 1; log++) {
+      if (result[b] != -1) {
+        throw new RuntimeException("BUG: duplicate logarithm (bad polynomial?)");
+      }
+      result[b] = (short) log;
+      b = (b << 1);
+      if (FIELD_SIZE <= b) {
+        b = ((b - FIELD_SIZE) ^ polynomial);
+      }
+    }
+    return result;
+  }
+
+  /** Generates the inverse log table. */
+  public static byte[] generateExpTable(short[] logTable) {
+    final byte[] result = new byte[FIELD_SIZE * 2 - 2];
+    for (int i = 1; i < FIELD_SIZE; i++) {
+      int log = logTable[i];
+      result[log] = (byte) i;
+      result[log + FIELD_SIZE - 1] = (byte) i;
+    }
+    return result;
+  }
+
+  /**
+   * Returns a list of all polynomials that can be used to generate the field.
+   *
+   * <p>This is never used in the code; it's just here for completeness.
+   */
+  public static Integer[] allPossiblePolynomials() {
+    List<Integer> result = new ArrayList<Integer>();
+    for (int i = 0; i < FIELD_SIZE; i++) {
+      try {
+        generateLogTable(i);
+        result.add(i);
+      } catch (RuntimeException e) {
+        // this one didn't work
+      }
+    }
+    return result.toArray(new Integer[result.size()]);
+  }
+}
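
As a quick illustration of the GF(256) helpers above (a sketch only, not part of the commit; the class and method names come from the diff, everything else is made up), the field operations behave as follows when run with assertions enabled (java -ea):

    import org.apache.iotdb.cluster.utils.encode.rs.Galois;

    public class GaloisExample {
      public static void main(String[] args) {
        byte a = (byte) 0x57;
        byte b = (byte) 0x13;
        // Addition and subtraction are both XOR, so subtracting b undoes adding b.
        assert Galois.subtract(Galois.add(a, b), b) == a;
        // Division is the inverse of multiplication for a non-zero divisor.
        assert Galois.divide(Galois.multiply(a, b), b) == a;
        // exp(a, 2) is the same as a * a.
        assert Galois.exp(a, 2) == Galois.multiply(a, a);
      }
    }
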
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Matrix.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Matrix.java
new file mode 100644
index 0000000..df345b1
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Matrix.java
@@ -0,0 +1,312 @@
+/**
+ * Matrix Algebra over an 8-bit Galois Field
+ *
+ * <p>Copyright 2015, Backblaze, Inc.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+/**
+ * A matrix over the 8-bit Galois field.
+ *
+ * <p>This class is not performance-critical, so the implementations are simple and straightforward.
+ */
+public class Matrix {
+
+  /** The number of rows in the matrix. */
+  private final int rows;
+
+  /** The number of columns in the matrix. */
+  private final int columns;
+
+  /**
+   * The data in the matrix, in row major form.
+   *
+   * <p>To get element (r, c): data[r][c]
+   *
+   * <p>Because this is computer science, and not math, the indices for both the row and column
+   * start at 0.
+   */
+  private final byte[][] data;
+
+  /**
+   * Initialize a matrix of zeros.
+   *
+   * @param initRows The number of rows in the matrix.
+   * @param initColumns The number of columns in the matrix.
+   */
+  public Matrix(int initRows, int initColumns) {
+    rows = initRows;
+    columns = initColumns;
+    data = new byte[rows][];
+    for (int r = 0; r < rows; r++) {
+      data[r] = new byte[columns];
+    }
+  }
+
+  /** Initializes a matrix with the given row-major data. */
+  public Matrix(byte[][] initData) {
+    rows = initData.length;
+    columns = initData[0].length;
+    data = new byte[rows][];
+    for (int r = 0; r < rows; r++) {
+      if (initData[r].length != columns) {
+        throw new IllegalArgumentException("Not all rows have the same number of columns");
+      }
+      data[r] = new byte[columns];
+      for (int c = 0; c < columns; c++) {
+        data[r][c] = initData[r][c];
+      }
+    }
+  }
+
+  /** Returns an identity matrix of the given size. */
+  public static Matrix identity(int size) {
+    Matrix result = new Matrix(size, size);
+    for (int i = 0; i < size; i++) {
+      result.set(i, i, (byte) 1);
+    }
+    return result;
+  }
+
+  /**
+   * Returns a human-readable string of the matrix contents.
+   *
+   * <p>Example: [[1, 2], [3, 4]]
+   */
+  @Override
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append('[');
+    for (int r = 0; r < rows; r++) {
+      if (r != 0) {
+        result.append(", ");
+      }
+      result.append('[');
+      for (int c = 0; c < columns; c++) {
+        if (c != 0) {
+          result.append(", ");
+        }
+        result.append(data[r][c] & 0xFF);
+      }
+      result.append(']');
+    }
+    result.append(']');
+    return result.toString();
+  }
+
+  /**
+   * Returns a human-readable string of the matrix contents.
+   *
+   * <p>Example: 00 01 02 03 04 05 06 07 08 09 0a 0b
+   */
+  public String toBigString() {
+    StringBuilder result = new StringBuilder();
+    for (int r = 0; r < rows; r++) {
+      for (int c = 0; c < columns; c++) {
+        int value = get(r, c);
+        if (value < 0) {
+          value += 256;
+        }
+        result.append(String.format("%02x ", value));
+      }
+      result.append("\n");
+    }
+    return result.toString();
+  }
+
+  /** Returns the number of columns in this matrix. */
+  public int getColumns() {
+    return columns;
+  }
+
+  /** Returns the number of rows in this matrix. */
+  public int getRows() {
+    return rows;
+  }
+
+  /** Returns the value at row r, column c. */
+  public byte get(int r, int c) {
+    if (r < 0 || rows <= r) {
+      throw new IllegalArgumentException("Row index out of range: " + r);
+    }
+    if (c < 0 || columns <= c) {
+      throw new IllegalArgumentException("Column index out of range: " + c);
+    }
+    return data[r][c];
+  }
+
+  /** Sets the value at row r, column c. */
+  public void set(int r, int c, byte value) {
+    if (r < 0 || rows <= r) {
+      throw new IllegalArgumentException("Row index out of range: " + r);
+    }
+    if (c < 0 || columns <= c) {
+      throw new IllegalArgumentException("Column index out of range: " + c);
+    }
+    data[r][c] = value;
+  }
+
+  /** Returns true iff this matrix is identical to the other. */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Matrix)) {
+      return false;
+    }
+    for (int r = 0; r < rows; r++) {
+      if (!java.util.Arrays.equals(data[r], ((Matrix) other).data[r])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Multiplies this matrix (the one on the left) by another matrix (the one on the right). */
+  public Matrix times(Matrix right) {
+    if (getColumns() != right.getRows()) {
+      throw new IllegalArgumentException(
+          "Columns on left ("
+              + getColumns()
+              + ") "
+              + "is different than rows on right ("
+              + right.getRows()
+              + ")");
+    }
+    Matrix result = new Matrix(getRows(), right.getColumns());
+    for (int r = 0; r < getRows(); r++) {
+      for (int c = 0; c < right.getColumns(); c++) {
+        byte value = 0;
+        for (int i = 0; i < getColumns(); i++) {
+          value ^= Galois.multiply(get(r, i), right.get(i, c));
+        }
+        result.set(r, c, value);
+      }
+    }
+    return result;
+  }
+
+  /** Returns the concatenation of this matrix and the matrix on the right. */
+  public Matrix augment(Matrix right) {
+    if (rows != right.rows) {
+      throw new IllegalArgumentException("Matrices don't have the same number of rows");
+    }
+    Matrix result = new Matrix(rows, columns + right.columns);
+    for (int r = 0; r < rows; r++) {
+      for (int c = 0; c < columns; c++) {
+        result.data[r][c] = data[r][c];
+      }
+      for (int c = 0; c < right.columns; c++) {
+        result.data[r][columns + c] = right.data[r][c];
+      }
+    }
+    return result;
+  }
+
+  /** Returns a part of this matrix. */
+  public Matrix submatrix(int rmin, int cmin, int rmax, int cmax) {
+    Matrix result = new Matrix(rmax - rmin, cmax - cmin);
+    for (int r = rmin; r < rmax; r++) {
+      for (int c = cmin; c < cmax; c++) {
+        result.data[r - rmin][c - cmin] = data[r][c];
+      }
+    }
+    return result;
+  }
+
+  /** Returns one row of the matrix as a byte array. */
+  public byte[] getRow(int row) {
+    byte[] result = new byte[columns];
+    for (int c = 0; c < columns; c++) {
+      result[c] = get(row, c);
+    }
+    return result;
+  }
+
+  /** Exchanges two rows in the matrix. */
+  public void swapRows(int r1, int r2) {
+    if (r1 < 0 || rows <= r1 || r2 < 0 || rows <= r2) {
+      throw new IllegalArgumentException("Row index out of range");
+    }
+    byte[] tmp = data[r1];
+    data[r1] = data[r2];
+    data[r2] = tmp;
+  }
+
+  /**
+   * Returns the inverse of this matrix.
+   *
+   * @throws IllegalArgumentException when the matrix is singular and doesn't have an inverse.
+   */
+  public Matrix invert() {
+    // Sanity check.
+    if (rows != columns) {
+      throw new IllegalArgumentException("Only square matrices can be inverted");
+    }
+
+    // Create a working matrix by augmenting this one with
+    // an identity matrix on the right.
+    Matrix work = augment(identity(rows));
+
+    // Do Gaussian elimination to transform the left half into
+    // an identity matrix.
+    work.gaussianElimination();
+
+    // The right half is now the inverse.
+    return work.submatrix(0, rows, columns, columns * 2);
+  }
+
+  /**
+   * Does the work of matrix inversion.
+   *
+   * <p>Assumes that this is an r by 2r matrix.
+   */
+  private void gaussianElimination() {
+    // Clear out the part below the main diagonal and scale the main
+    // diagonal to be 1.
+    for (int r = 0; r < rows; r++) {
+      // If the element on the diagonal is 0, find a row below
+      // that has a non-zero and swap them.
+      if (data[r][r] == (byte) 0) {
+        for (int rowBelow = r + 1; rowBelow < rows; rowBelow++) {
+          if (data[rowBelow][r] != 0) {
+            swapRows(r, rowBelow);
+            break;
+          }
+        }
+      }
+      // If we couldn't find one, the matrix is singular.
+      if (data[r][r] == (byte) 0) {
+        throw new IllegalArgumentException("Matrix is singular");
+      }
+      // Scale to 1.
+      if (data[r][r] != (byte) 1) {
+        byte scale = Galois.divide((byte) 1, data[r][r]);
+        for (int c = 0; c < columns; c++) {
+          data[r][c] = Galois.multiply(data[r][c], scale);
+        }
+      }
+      // Make everything below the 1 be a 0 by subtracting
+      // a multiple of it.  (Subtraction and addition are
+      // both exclusive or in the Galois field.)
+      for (int rowBelow = r + 1; rowBelow < rows; rowBelow++) {
+        if (data[rowBelow][r] != (byte) 0) {
+          byte scale = data[rowBelow][r];
+          for (int c = 0; c < columns; c++) {
+            data[rowBelow][c] ^= Galois.multiply(scale, data[r][c]);
+          }
+        }
+      }
+    }
+
+    // Now clear the part above the main diagonal.
+    for (int d = 0; d < rows; d++) {
+      for (int rowAbove = 0; rowAbove < d; rowAbove++) {
+        if (data[rowAbove][d] != (byte) 0) {
+          byte scale = data[rowAbove][d];
+          for (int c = 0; c < columns; c++) {
+            data[rowAbove][c] ^= Galois.multiply(scale, data[d][c]);
+          }
+        }
+      }
+    }
+  }
+}
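
To make the Matrix semantics concrete, a minimal sketch (not part of the commit; the 2x2 values are arbitrary, chosen to have a non-zero determinant over GF(256)):

    import org.apache.iotdb.cluster.utils.encode.rs.Matrix;

    public class MatrixExample {
      public static void main(String[] args) {
        Matrix m = new Matrix(new byte[][] {{1, 2}, {3, 4}});
        Matrix inverse = m.invert();
        // Multiplying a matrix by its inverse yields the identity matrix.
        System.out.println(m.times(inverse));   // [[1, 0], [0, 1]]
        System.out.println(Matrix.identity(2)); // [[1, 0], [0, 1]]
      }
    }
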
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/ReedSolomon.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/ReedSolomon.java
new file mode 100644
index 0000000..e312f3c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/ReedSolomon.java
@@ -0,0 +1,359 @@
+/**
+ * Reed-Solomon Coding over 8-bit values.
+ *
+ * <p>Copyright 2015, Backblaze, Inc.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+/** Reed-Solomon Coding over 8-bit values. */
+public class ReedSolomon {
+
+  private final int dataShardCount;
+  private final int parityShardCount;
+  private final int totalShardCount;
+  private final Matrix matrix;
+
+  /**
+   * Rows from the matrix for encoding parity, each one as its own byte array to allow for efficient
+   * access while encoding.
+   */
+  private final byte[][] parityRows;
+
+  /** Initializes a new encoder/decoder. */
+  public ReedSolomon(int dataShardCount, int parityShardCount) {
+    this.dataShardCount = dataShardCount;
+    this.parityShardCount = parityShardCount;
+    this.totalShardCount = dataShardCount + parityShardCount;
+    matrix = buildMatrix(dataShardCount, this.totalShardCount);
+    parityRows = new byte[parityShardCount][];
+    for (int i = 0; i < parityShardCount; i++) {
+      parityRows[i] = matrix.getRow(dataShardCount + i);
+    }
+  }
+
+  /** Returns the number of data shards. */
+  public int getDataShardCount() {
+    return dataShardCount;
+  }
+
+  /** Returns the number of parity shards. */
+  public int getParityShardCount() {
+    return parityShardCount;
+  }
+
+  /** Returns the total number of shards. */
+  public int getTotalShardCount() {
+    return totalShardCount;
+  }
+
+  /**
+   * Encodes parity for a set of data shards.
+   *
+   * @param shards An array containing data shards followed by parity shards. Each shard is a byte
+   *     array, and they must all be the same size.
+   * @param offset The index of the first byte in each shard to encode.
+   * @param byteCount The number of bytes to encode in each shard.
+   */
+  public void encodeParity(byte[][] shards, int offset, int byteCount) {
+    // Check arguments.
+    checkBuffersAndSizes(shards, offset, byteCount);
+
+    // Build the array of output buffers.
+    byte[][] outputs = new byte[parityShardCount][];
+    for (int i = 0; i < parityShardCount; i++) {
+      outputs[i] = shards[dataShardCount + i];
+    }
+
+    // Do the coding.
+    codeSomeShards(parityRows, shards, outputs, parityShardCount, offset, byteCount);
+  }
+
+  /**
+   * Returns true if the parity shards contain the right data.
+   *
+   * @param shards An array containing data shards followed by parity shards. Each shard is a byte
+   *     array, and they must all be the same size.
+   * @param firstByte The index of the first byte in each shard to check.
+   * @param byteCount The number of bytes to check in each shard.
+   */
+  public boolean isParityCorrect(byte[][] shards, int firstByte, int byteCount) {
+    // Check arguments.
+    checkBuffersAndSizes(shards, firstByte, byteCount);
+
+    // Build the array of buffers being checked.
+    byte[][] toCheck = new byte[parityShardCount][];
+    for (int i = 0; i < parityShardCount; i++) {
+      toCheck[i] = shards[dataShardCount + i];
+    }
+
+    // Do the checking.
+    return checkSomeShards(parityRows, shards, toCheck, parityShardCount, firstByte, byteCount);
+  }
+
+  /**
+   * Given a list of shards, some of which contain data, fills in the ones that don't have data.
+   *
+   * <p>Quickly does nothing if all of the shards are present.
+   *
+   * <p>If any shards are missing (based on the flags in shardPresent), the data in those shards is
+   * recomputed and filled in.
+   */
+  public void decodeMissing(
+      byte[][] shards, boolean[] shardPresent, final int offset, final int byteCount) {
+    // Check arguments.
+    checkBuffersAndSizes(shards, offset, byteCount);
+
+    // Quick check: are all of the shards present?  If so, there's
+    // nothing to do.
+    int numberPresent = 0;
+    for (int i = 0; i < totalShardCount; i++) {
+      if (shardPresent[i]) {
+        numberPresent += 1;
+      }
+    }
+    if (numberPresent == totalShardCount) {
+      // All of the shards already have data.  We don't
+      // need to do anything.
+      return;
+    }
+
+    // More complete sanity check
+    if (numberPresent < dataShardCount) {
+      throw new IllegalArgumentException("Not enough shards present");
+    }
+
+    // Pull out the rows of the matrix that correspond to the
+    // shards that we have and build a square matrix.  This
+    // matrix could be used to generate the shards that we have
+    // from the original data.
+    //
+    // Also, pull out an array holding just the shards that
+    // correspond to the rows of the submatrix.  These shards
+    // will be the input to the decoding process that re-creates
+    // the missing data shards.
+    Matrix subMatrix = new Matrix(dataShardCount, dataShardCount);
+    byte[][] subShards = new byte[dataShardCount][];
+    {
+      int subMatrixRow = 0;
+      for (int matrixRow = 0;
+          matrixRow < totalShardCount && subMatrixRow < dataShardCount;
+          matrixRow++) {
+        if (shardPresent[matrixRow]) {
+          for (int c = 0; c < dataShardCount; c++) {
+            subMatrix.set(subMatrixRow, c, matrix.get(matrixRow, c));
+          }
+          subShards[subMatrixRow] = shards[matrixRow];
+          subMatrixRow += 1;
+        }
+      }
+    }
+
+    // Invert the matrix, so we can go from the encoded shards
+    // back to the original data.  Then pull out the row that
+    // generates the shard that we want to decode.  Note that
+    // since this matrix maps back to the original data, it can
+    // be used to create a data shard, but not a parity shard.
+    Matrix dataDecodeMatrix = subMatrix.invert();
+
+    // Re-create any data shards that were missing.
+    //
+    // The input to the coding is all of the shards we actually
+    // have, and the output is the missing data shards.  The computation
+    // is done using the special decode matrix we just built.
+    byte[][] outputs = new byte[parityShardCount][];
+    byte[][] matrixRows = new byte[parityShardCount][];
+    int outputCount = 0;
+    for (int iShard = 0; iShard < dataShardCount; iShard++) {
+      if (!shardPresent[iShard]) {
+        outputs[outputCount] = shards[iShard];
+        matrixRows[outputCount] = dataDecodeMatrix.getRow(iShard);
+        outputCount += 1;
+      }
+    }
+    codeSomeShards(matrixRows, subShards, outputs, outputCount, offset, byteCount);
+
+    // Now that we have all of the data shards intact, we can
+    // compute any of the parity that is missing.
+    //
+    // The input to the coding is ALL of the data shards, including
+    // any that we just calculated.  The output is whichever of the
+    // parity shards were missing.
+    outputCount = 0;
+    for (int iShard = dataShardCount; iShard < totalShardCount; iShard++) {
+      if (!shardPresent[iShard]) {
+        outputs[outputCount] = shards[iShard];
+        matrixRows[outputCount] = parityRows[iShard - dataShardCount];
+        outputCount += 1;
+      }
+    }
+    codeSomeShards(matrixRows, shards, outputs, outputCount, offset, byteCount);
+  }
+
+  /** Checks the consistency of arguments passed to public methods. */
+  private void checkBuffersAndSizes(byte[][] shards, int offset, int byteCount) {
+    // The number of buffers should be equal to the number of
+    // data shards plus the number of parity shards.
+    if (shards.length != totalShardCount) {
+      throw new IllegalArgumentException("wrong number of shards: " + shards.length);
+    }
+
+    // All of the shard buffers should be the same length.
+    int shardLength = shards[0].length;
+    for (int i = 1; i < shards.length; i++) {
+      if (shards[i].length != shardLength) {
+        throw new IllegalArgumentException("Shards are different sizes");
+      }
+    }
+
+    // The offset and byteCount must be non-negative and fit in the buffers.
+    if (offset < 0) {
+      throw new IllegalArgumentException("offset is negative: " + offset);
+    }
+    if (byteCount < 0) {
+      throw new IllegalArgumentException("byteCount is negative: " + byteCount);
+    }
+    if (shardLength < offset + byteCount) {
+      throw new IllegalArgumentException("buffers to small: " + byteCount + offset);
+    }
+  }
+
+  /**
+   * Multiplies a subset of rows from a coding matrix by a full set of input shards to produce some
+   * output shards.
+   *
+   * @param matrixRows The rows from the matrix to use.
+   * @param inputs An array of byte arrays, each of which is one input shard. The inputs array may
+   *     have extra buffers after the ones that are used. They will be ignored. The number of inputs
+   *     used is determined by the length of each matrix row.
+   * @param outputs Byte arrays where the computed shards are stored. The outputs array may also
+   *     have extra, unused, elements at the end. The number of outputs computed, and the number of
+   *     matrix rows used, is determined by outputCount.
+   * @param outputCount The number of outputs to compute.
+   * @param offset The index in the inputs and output of the first byte to process.
+   * @param byteCount The number of bytes to process.
+   */
+  private void codeSomeShards(
+      final byte[][] matrixRows,
+      final byte[][] inputs,
+      final byte[][] outputs,
+      final int outputCount,
+      final int offset,
+      final int byteCount) {
+
+    // This is the inner loop.  It needs to be fast.  Be careful
+    // if you change it.
+    //
+    // Note that dataShardCount is final in the class, so the
+    // compiler can load it just once, before the loop.  Explicitly
+    // adding a local variable does not make it faster.
+    //
+    // I have tried inlining Galois.multiply(), but it doesn't
+    // make things any faster.  The JIT compiler is known to inline
+    // methods, so it's probably already doing so.
+    //
+    // This method has been timed and compared with a C implementation.
+    // This Java version is only about 10% slower than C.
+
+    for (int iByte = offset; iByte < offset + byteCount; iByte++) {
+      for (int iRow = 0; iRow < outputCount; iRow++) {
+        byte[] matrixRow = matrixRows[iRow];
+        int value = 0;
+        for (int c = 0; c < dataShardCount; c++) {
+          value ^= Galois.multiply(matrixRow[c], inputs[c][iByte]);
+        }
+        outputs[iRow][iByte] = (byte) value;
+      }
+    }
+  }
+
+  /**
+   * Multiplies a subset of rows from a coding matrix by a full set of input shards to produce some
+   * output shards, and checks that the data in those shards matches what's expected.
+   *
+   * @param matrixRows The rows from the matrix to use.
+   * @param inputs An array of byte arrays, each of which is one input shard. The inputs array may
+   *     have extra buffers after the ones that are used. They will be ignored. The number of inputs
+   *     used is determined by the length of each matrix row.
+   * @param toCheck Byte arrays holding the expected shard contents to compare against. The array
+   *     may also have extra, unused, elements at the end. The number of shards checked, and the
+   *     number of matrix rows used, is determined by checkCount.
+   * @param checkCount The number of shards to check.
+   * @param offset The index in the inputs and output of the first byte to process.
+   * @param byteCount The number of bytes to process.
+   */
+  private boolean checkSomeShards(
+      final byte[][] matrixRows,
+      final byte[][] inputs,
+      final byte[][] toCheck,
+      final int checkCount,
+      final int offset,
+      final int byteCount) {
+
+    // This is the inner loop.  It needs to be fast.  Be careful
+    // if you change it.
+    //
+    // Note that dataShardCount is final in the class, so the
+    // compiler can load it just once, before the loop.  Explicitly
+    // adding a local variable does not make it faster.
+    //
+    // I have tried inlining Galois.multiply(), but it doesn't
+    // make things any faster.  The JIT compiler is known to inline
+    // methods, so it's probably already doing so.
+    //
+    // This method has been timed and compared with a C implementation.
+    // This Java version is only about 10% slower than C.
+
+    for (int iByte = offset; iByte < offset + byteCount; iByte++) {
+      for (int iRow = 0; iRow < checkCount; iRow++) {
+        byte[] matrixRow = matrixRows[iRow];
+        int value = 0;
+        for (int c = 0; c < dataShardCount; c++) {
+          value ^= Galois.multiply(matrixRow[c], inputs[c][iByte]);
+        }
+        if (toCheck[iRow][iByte] != (byte) value) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Create the matrix to use for encoding, given the number of data shards and the number of total
+   * shards.
+   *
+   * <p>The top square of the matrix is guaranteed to be an identity matrix, which means that the
+   * data shards are unchanged after encoding.
+   */
+  private static Matrix buildMatrix(int dataShards, int totalShards) {
+    // Start with a Vandermonde matrix.  This matrix would work,
+    // in theory, but doesn't have the property that the data
+    // shards are unchanged after encoding.
+    Matrix vandermonde = vandermonde(totalShards, dataShards);
+
+    // Multiply by the inverse of the top square of the matrix.
+    // This will make the top square be the identity matrix, but
+    // preserve the property that any square subset of rows is
+    // invertible.
+    Matrix top = vandermonde.submatrix(0, 0, dataShards, dataShards);
+    return vandermonde.times(top.invert());
+  }
+
+  /**
+   * Create a Vandermonde matrix, which is guaranteed to have the property that any subset of rows
+   * that forms a square matrix is invertible.
+   *
+   * @param rows Number of rows in the result.
+   * @param cols Number of columns in the result.
+   * @return A Matrix.
+   */
+  private static Matrix vandermonde(int rows, int cols) {
+    Matrix result = new Matrix(rows, cols);
+    for (int r = 0; r < rows; r++) {
+      for (int c = 0; c < cols; c++) {
+        result.set(r, c, Galois.exp((byte) r, c));
+      }
+    }
+    return result;
+  }
+}
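
The intended encode/decode cycle of ReedSolomon, end to end, looks roughly like this (a sketch: the shard size, the random data, and the erasure pattern are made up; only the ReedSolomon API comes from the diff above):

    import java.util.Arrays;
    import java.util.Random;
    import org.apache.iotdb.cluster.utils.encode.rs.ReedSolomon;

    public class ReedSolomonRoundTrip {
      public static void main(String[] args) {
        ReedSolomon rs = new ReedSolomon(4, 2);  // 4 data shards + 2 parity shards
        int shardSize = 16;
        byte[][] shards = new byte[6][shardSize];
        Random random = new Random(42);
        for (int i = 0; i < 4; i++) {
          random.nextBytes(shards[i]);           // fill the data shards
        }
        rs.encodeParity(shards, 0, shardSize);   // computes shards[4] and shards[5]

        byte[] lostDataShard = shards[1].clone();
        shards[1] = new byte[shardSize];         // simulate losing one data shard
        shards[5] = new byte[shardSize];         // ... and one parity shard
        boolean[] present = {true, false, true, true, true, false};
        rs.decodeMissing(shards, present, 0, shardSize);

        System.out.println(Arrays.equals(shards[1], lostDataShard));  // true
        System.out.println(rs.isParityCorrect(shards, 0, shardSize)); // true
      }
    }

With 4 data and 2 parity shards, any two of the six shards can be lost and still reconstructed, which is the property the sample encoder and decoder below demonstrate on files.
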
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleDecoder.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleDecoder.java
new file mode 100644
index 0000000..87a1ead
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleDecoder.java
@@ -0,0 +1,98 @@
+/**
+ * Command-line program that decodes a file using Reed-Solomon 4+2.
+ *
+ * <p>Copyright 2015, Backblaze, Inc. All rights reserved.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Command-line program that decodes a file using Reed-Solomon 4+2.
+ *
+ * <p>The file name given should be the name of the file to decode, say "foo.txt". This program will
+ * expect to find "foo.txt.0" through "foo.txt.5", with at most two missing. It will then write
+ * "foo.txt.decoded".
+ */
+public class SampleDecoder {
+
+  public static final int DATA_SHARDS = 4;
+  public static final int PARITY_SHARDS = 2;
+  public static final int TOTAL_SHARDS = 6;
+
+  public static final int BYTES_IN_INT = 4;
+
+  public static void main(String[] arguments) throws IOException {
+
+    // Parse the command line
+    if (arguments.length != 1) {
+      System.out.println("Usage: SampleDecoder <fileName>");
+      return;
+    }
+    final File originalFile = new File(arguments[0]);
+    if (!originalFile.exists()) {
+      System.out.println("Cannot read input file: " + originalFile);
+      return;
+    }
+
+    // Read in any of the shards that are present.
+    // (There should be checking here to make sure the input
+    // shards are the same size, but there isn't.)
+    final byte[][] shards = new byte[TOTAL_SHARDS][];
+    final boolean[] shardPresent = new boolean[TOTAL_SHARDS];
+    int shardSize = 0;
+    int shardCount = 0;
+    for (int i = 0; i < TOTAL_SHARDS; i++) {
+      File shardFile = new File(originalFile.getParentFile(), originalFile.getName() + "." + i);
+      if (shardFile.exists()) {
+        shardSize = (int) shardFile.length();
+        shards[i] = new byte[shardSize];
+        shardPresent[i] = true;
+        shardCount += 1;
+        InputStream in = new FileInputStream(shardFile);
+        in.read(shards[i], 0, shardSize);
+        in.close();
+        System.out.println("Read " + shardFile);
+      }
+    }
+
+    // We need at least DATA_SHARDS to be able to reconstruct the file.
+    if (shardCount < DATA_SHARDS) {
+      System.out.println("Not enough shards present");
+      return;
+    }
+
+    // Make empty buffers for the missing shards.
+    for (int i = 0; i < TOTAL_SHARDS; i++) {
+      if (!shardPresent[i]) {
+        shards[i] = new byte[shardSize];
+      }
+    }
+
+    // Use Reed-Solomon to fill in the missing shards
+    ReedSolomon reedSolomon = new ReedSolomon(DATA_SHARDS, PARITY_SHARDS);
+    reedSolomon.decodeMissing(shards, shardPresent, 0, shardSize);
+
+    // Combine the data shards into one buffer for convenience.
+    // (This is not efficient, but it is convenient.)
+    byte[] allBytes = new byte[shardSize * DATA_SHARDS];
+    for (int i = 0; i < DATA_SHARDS; i++) {
+      System.arraycopy(shards[i], 0, allBytes, shardSize * i, shardSize);
+    }
+
+    // Extract the file length
+    int fileSize = ByteBuffer.wrap(allBytes).getInt();
+
+    // Write the decoded file
+    File decodedFile = new File(originalFile.getParentFile(), originalFile.getName() + ".decoded");
+    OutputStream out = new FileOutputStream(decodedFile);
+    out.write(allBytes, BYTES_IN_INT, fileSize);
+    out.close();
+    System.out.println("Wrote " + decodedFile);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleEncoder.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleEncoder.java
new file mode 100644
index 0000000..1f98707
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleEncoder.java
@@ -0,0 +1,91 @@
+/**
+ * Command-line program encodes one file using Reed-Solomon 4+2.
+ *
+ * <p>Copyright 2015, Backblaze, Inc.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Command-line program encodes one file using Reed-Solomon 4+2.
+ *
+ * <p>The one argument should be a file name, say "foo.txt". This program will create six files in
+ * the same directory, breaking the input file into four data shards, and two parity shards. The
+ * output files are called "foo.txt.0", "foo.txt.1", ..., and "foo.txt.5". Numbers 4 and 5 are the
+ * parity shards.
+ *
+ * <p>The data stored is the file size (four byte int), followed by the contents of the file, and
+ * then padded to a multiple of four bytes with zeros. The padding is because all four data shards
+ * must be the same size.
+ */
+public class SampleEncoder {
+
+  public static final int DATA_SHARDS = 4;
+  public static final int PARITY_SHARDS = 2;
+  public static final int TOTAL_SHARDS = 6;
+
+  public static final int BYTES_IN_INT = 4;
+
+  public static void main(String[] arguments) throws IOException {
+
+    // Parse the command line
+    if (arguments.length != 1) {
+      System.out.println("Usage: SampleEncoder <fileName>");
+      return;
+    }
+    final File inputFile = new File(arguments[0]);
+    if (!inputFile.exists()) {
+      System.out.println("Cannot read input file: " + inputFile);
+      return;
+    }
+
+    // Get the size of the input file.  (Files bigger than
+    // Integer.MAX_VALUE will fail here!)
+    final int fileSize = (int) inputFile.length();
+
+    // Figure out how big each shard will be.  The total size stored
+    // will be the file size (4 bytes) plus the file.
+    final int storedSize = fileSize + BYTES_IN_INT;
+    final int shardSize = (storedSize + DATA_SHARDS - 1) / DATA_SHARDS;
+
+    // Create a buffer holding the file size, followed by
+    // the contents of the file.
+    final int bufferSize = shardSize * DATA_SHARDS;
+    final byte[] allBytes = new byte[bufferSize];
+    ByteBuffer.wrap(allBytes).putInt(fileSize);
+    InputStream in = new FileInputStream(inputFile);
+    int bytesRead = in.read(allBytes, BYTES_IN_INT, fileSize);
+    if (bytesRead != fileSize) {
+      throw new IOException("not enough bytes read");
+    }
+    in.close();
+
+    // Make the buffers to hold the shards.
+    byte[][] shards = new byte[TOTAL_SHARDS][shardSize];
+
+    // Fill in the data shards
+    for (int i = 0; i < DATA_SHARDS; i++) {
+      System.arraycopy(allBytes, i * shardSize, shards[i], 0, shardSize);
+    }
+
+    // Use Reed-Solomon to calculate the parity.
+    ReedSolomon reedSolomon = new ReedSolomon(DATA_SHARDS, PARITY_SHARDS);
+    reedSolomon.encodeParity(shards, 0, shardSize);
+
+    // Write out the resulting files.
+    for (int i = 0; i < TOTAL_SHARDS; i++) {
+      File outputFile = new File(inputFile.getParentFile(), inputFile.getName() + "." + i);
+      OutputStream out = new FileOutputStream(outputFile);
+      out.write(shards[i]);
+      out.close();
+      System.out.println("wrote " + outputFile);
+    }
+  }
+}
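
Putting the two samples together (a hypothetical driver, not part of the commit; the file name and the deleted shard indices are made up, the classes are the ones added above):

    import java.io.File;
    import java.io.IOException;
    import org.apache.iotdb.cluster.utils.encode.rs.SampleDecoder;
    import org.apache.iotdb.cluster.utils.encode.rs.SampleEncoder;

    public class SampleRoundTrip {
      public static void main(String[] args) throws IOException {
        // Split foo.txt into foo.txt.0 .. foo.txt.5 (shards 4 and 5 are parity).
        SampleEncoder.main(new String[] {"foo.txt"});
        // Drop any two shards; 4+2 coding tolerates two erasures.
        new File("foo.txt.1").delete();
        new File("foo.txt.4").delete();
        // Rebuild the original content as foo.txt.decoded.
        SampleDecoder.main(new String[] {"foo.txt"});
      }
    }
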