Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/20 01:47:26 UTC

[iotdb] branch expr_plus created (now b5ac3f7)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at b5ac3f7  add CRaft

This branch includes the following new commits:

     new b5ac3f7  add CRaft

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: add CRaft

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b5ac3f7667fc60669f133f4a46cc1c84369e5b7f
Author: jt <jt...@163.com>
AuthorDate: Mon Dec 20 09:46:28 2021 +0800

    add CRaft
---
 cluster/distribute-ecs.sh                          |  10 +
 .../org/apache/iotdb/cluster/expr/ExprBench.java   |   4 +-
 .../org/apache/iotdb/cluster/expr/ExprMember.java  |   5 +-
 .../org/apache/iotdb/cluster/expr/ExprServer.java  |   5 +-
 .../apache/iotdb/cluster/expr/VotingLogList.java   |   8 +-
 .../iotdb/cluster/log/FragmentedLogDispatcher.java | 110 ++++
 .../java/org/apache/iotdb/cluster/log/Log.java     |  21 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  32 +-
 .../org/apache/iotdb/cluster/log/LogParser.java    |   6 +
 .../org/apache/iotdb/cluster/log/VotingLog.java    |  11 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |   3 +-
 .../iotdb/cluster/log/logtypes/FragmentedLog.java  | 140 ++++
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  16 +-
 .../log/sequencing/AsynchronousSequencer.java      |   9 +-
 .../handlers/caller/AppendNodeEntryHandler.java    |  22 +-
 .../server/heartbeat/MetaHeartbeatServer.java      |   5 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 135 ++--
 .../apache/iotdb/cluster/server/monitor/Timer.java |   8 +
 .../iotdb/cluster/utils/encode/rs/Galois.java      | 722 +++++++++++++++++++++
 .../iotdb/cluster/utils/encode/rs/Matrix.java      | 312 +++++++++
 .../iotdb/cluster/utils/encode/rs/ReedSolomon.java | 359 ++++++++++
 .../cluster/utils/encode/rs/SampleDecoder.java     |  98 +++
 .../cluster/utils/encode/rs/SampleEncoder.java     |  91 +++
 23 files changed, 2041 insertions(+), 91 deletions(-)
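
For orientation: the "CRaft" change below erasure-codes each Raft entry
with Reed-Solomon so that every follower receives a single shard instead
of the full entry (see FragmentedLog and FragmentedLogDispatcher in the
diff). A standalone sketch, not part of the commit, of the shard
arithmetic used by the FragmentedLog constructor, assuming a 64 KB entry
(class names in these sketches are illustrative):

    public class CraftShardMathSketch {
      public static void main(String[] args) {
        int logLength = 64 * 1024;                           // illustrative entry size
        for (int nodeNum : new int[] {3, 5, 7, 9}) {
          int followerNum = nodeNum - 1;
          int parityShardNum = nodeNum / 2 - 1;              // as in FragmentedLog
          int dataShardNum = followerNum - parityShardNum;   // shards enough to rebuild the entry
          int shardLength = logLength / dataShardNum + 1;    // bytes carried by each follower
          System.out.printf("nodes=%d data=%d parity=%d shardBytes=%d%n",
              nodeNum, dataShardNum, parityShardNum, shardLength);
        }
      }
    }

Any dataShardNum of the followerNum shards are enough to reconstruct the
entry, which is the property the Reed-Solomon code provides.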

diff --git a/cluster/distribute-ecs.sh b/cluster/distribute-ecs.sh
new file mode 100644
index 0000000..84874f2
--- /dev/null
+++ b/cluster/distribute-ecs.sh
@@ -0,0 +1,10 @@
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+
+ips=(ecs1 ecs2 ecs3 ecs4 ecs5)
+target_lib_path=/root/iotdb_expr/lib
+
+for ip in ${ips[*]}
+  do
+    ssh root@$ip "mkdir $target_lib_path"
+    scp -r $src_lib_path root@$ip:$target_lib_path
+  done
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index d0891a3..8115e44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -43,6 +43,7 @@ public class ExprBench {
   private long maxLatency = 0;
   private int threadNum = 64;
   private int workloadSize = 64 * 1024;
+  private int printInterval = 1000;
   private SyncClientPool clientPool;
   private Node target;
   private int maxRequestNum;
@@ -84,7 +85,7 @@ public class ExprBench {
                 e.printStackTrace();
               }
 
-              if (currRequsetNum % 1000 == 0) {
+              if (currRequsetNum % printInterval == 0) {
                 long elapsedTime = System.currentTimeMillis() - startTime;
                 System.out.println(
                     String.format(
@@ -118,6 +119,7 @@ public class ExprBench {
     bench.maxRequestNum = Integer.parseInt(args[2]);
     bench.threadNum = Integer.parseInt(args[3]);
     bench.workloadSize = Integer.parseInt(args[4]) * 1024;
+    bench.printInterval = Integer.parseInt(args[5]);
     bench.benchmark();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 04e6b97..bcc26ce 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -133,7 +133,10 @@ public class ExprMember extends MetaGroupMember {
         latch.await();
         return StatusUtils.OK;
       }
-      return processNonPartitionedMetaPlan(plan);
+      long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.getOperationStartTime();
+      TSStatus tsStatus = processNonPartitionedMetaPlan(plan);
+      Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
+      return tsStatus;
     } catch (Exception e) {
       logger.error("Exception in processing plan", e);
       return StatusUtils.INTERNAL_ERROR.deepCopy().setMessage(e.getMessage());
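
The hunk above wraps processNonPartitionedMetaPlan in the timing idiom
this commit applies in many places: take a start timestamp, run the
operation, then add the elapsed time to a named statistic. A minimal
stand-in for that idiom (plain JDK code, not the real Timer.Statistic
class), shown only to make the pattern explicit:

    import java.util.concurrent.atomic.AtomicLong;

    public class OperationStatisticSketch {
      private final AtomicLong totalNanos = new AtomicLong();
      private final AtomicLong operations = new AtomicLong();

      // rough stand-in for Statistic.getOperationStartTime()
      long getOperationStartTime() {
        return System.nanoTime();
      }

      // rough stand-in for Statistic.calOperationCostTimeFromStart(startTime)
      void calOperationCostTimeFromStart(long startTime) {
        totalNanos.addAndGet(System.nanoTime() - startTime);
        operations.incrementAndGet();
      }

      public static void main(String[] args) throws InterruptedException {
        OperationStatisticSketch executeNonQuery = new OperationStatisticSketch();
        long start = executeNonQuery.getOperationStartTime();
        Thread.sleep(5); // stands in for processNonPartitionedMetaPlan(plan)
        executeNonQuery.calOperationCostTimeFromStart(start);
        System.out.println(
            "ops=" + executeNonQuery.operations + ", totalNanos=" + executeNonQuery.totalNanos);
      }
    }
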
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
index 6aac585..ab2468e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -61,8 +61,7 @@ public class ExprServer extends MetaClusterServer {
 
   @Override
   protected TServerTransport getServerSocket() throws TTransportException {
-    return new TServerSocket(
-        new InetSocketAddress(thisNode.getInternalIp(), thisNode.getMetaPort()));
+    return new TServerSocket(new InetSocketAddress("0.0.0.0", thisNode.getMetaPort()));
   }
 
   public static void main(String[] args)
@@ -82,6 +81,7 @@ public class ExprServer extends MetaClusterServer {
     boolean enableCommitReturn = Boolean.parseBoolean(args[7]);
     int maxBatchSize = Integer.parseInt(args[8]);
     int defaultLogBufferSize = Integer.parseInt(args[9]);
+    boolean useCRaft = Boolean.parseBoolean(args[10]);
 
     ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr));
     ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
@@ -98,6 +98,7 @@ public class ExprServer extends MetaClusterServer {
     ExprMember.ENABLE_WEAK_ACCEPTANCE = enableWeakAcceptance;
     ExprMember.ENABLE_COMMIT_RETURN = enableCommitReturn;
     Log.DEFAULT_BUFFER_SIZE = defaultLogBufferSize * 1024 + 512;
+    RaftMember.USE_CRAFT = useCRaft;
 
     ExprServer server = new ExprServer();
     server.start();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 2723ac8..a1567d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -79,7 +79,7 @@ public class VotingLogList {
           if (votingLog.getStronglyAcceptedNodeIds().size()
                   + votingLog.getWeaklyAcceptedNodeIds().size()
               >= quorumSize) {
-            votingLog.acceptedTime = System.nanoTime();
+            votingLog.acceptedTime.set(System.nanoTime());
           }
         } else if (votingLog.getLog().getCurrLogIndex() > index) {
           break;
@@ -93,9 +93,9 @@ public class VotingLogList {
 
     if (lastEntryIndexToCommit != -1) {
       for (VotingLog acceptedLog : acceptedLogs) {
-        synchronized (acceptedLog) {
-          acceptedLog.acceptedTime = System.nanoTime();
-          acceptedLog.notifyAll();
+        synchronized (acceptedLog.getStronglyAcceptedNodeIds()) {
+          acceptedLog.acceptedTime.set(System.nanoTime());
+          acceptedLog.getStronglyAcceptedNodeIds().notifyAll();
         }
       }
     }
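
This hunk (together with the matching ones in AppendNodeEntryHandler and
RaftMember further down) moves the wait/notify monitor from the VotingLog
object to its strongly-accepted-node-id set and turns acceptedTime into
an AtomicLong, presumably so that fragment copies, which share those sets
through the new VotingLog copy constructor below, also share votes and
wake-ups. A standalone sketch of that protocol using only JDK types:

    import java.util.HashSet;
    import java.util.Set;

    public class AcceptanceMonitorSketch {
      private final Set<Integer> stronglyAcceptedNodeIds = new HashSet<>();

      // called from a response-handler thread when a follower strongly accepts
      void onStrongAccept(int nodeId) {
        synchronized (stronglyAcceptedNodeIds) {
          stronglyAcceptedNodeIds.add(nodeId);
          stronglyAcceptedNodeIds.notifyAll();
        }
      }

      // called from the leader thread that waits for a quorum
      boolean awaitQuorum(int quorumSize, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (stronglyAcceptedNodeIds) {
          while (stronglyAcceptedNodeIds.size() < quorumSize
              && System.currentTimeMillis() < deadline) {
            stronglyAcceptedNodeIds.wait(1);
          }
          return stronglyAcceptedNodeIds.size() >= quorumSize;
        }
      }

      public static void main(String[] args) throws InterruptedException {
        AcceptanceMonitorSketch sketch = new AcceptanceMonitorSketch();
        new Thread(() -> { sketch.onStrongAccept(1); sketch.onStrongAccept(2); }).start();
        System.out.println("quorum reached: " + sketch.awaitQuorum(2, 1000));
      }
    }
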
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
new file mode 100644
index 0000000..f8954ef
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class FragmentedLogDispatcher extends LogDispatcher {
+
+  private static final Logger logger = LoggerFactory.getLogger(FragmentedLogDispatcher.class);
+
+  public FragmentedLogDispatcher(RaftMember member) {
+    super(member);
+  }
+
+  public void offer(SendLogRequest request) {
+    // do serialization here to avoid taking LogManager for too long
+
+    long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
+    request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
+    for (int i = 0; i < nodesLogQueues.size(); i++) {
+      BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i);
+      SendLogRequest fragmentedRequest = new SendLogRequest(request);
+      fragmentedRequest.setVotingLog(new VotingLog(request.getVotingLog()));
+      fragmentedRequest
+          .getVotingLog()
+          .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i));
+      try {
+        boolean addSucceeded;
+        if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
+          addSucceeded =
+              nodeLogQueue.offer(
+                  fragmentedRequest,
+                  ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(),
+                  TimeUnit.MILLISECONDS);
+        } else {
+          addSucceeded = nodeLogQueue.add(fragmentedRequest);
+        }
+
+        if (!addSucceeded) {
+          logger.debug(
+              "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
+        } else {
+          request.setEnqueueTime(System.nanoTime());
+        }
+      } catch (IllegalStateException e) {
+        logger.debug(
+            "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
+
+    if (Timer.ENABLE_INSTRUMENTING) {
+      Statistic.LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE.calOperationCostTimeFromStart(
+          request.getVotingLog().getLog().getCreateTime());
+    }
+  }
+
+  LogDispatcher.DispatcherThread newDispatcherThread(
+      Node node, BlockingQueue<SendLogRequest> logBlockingQueue) {
+    return new DispatcherThread(node, logBlockingQueue);
+  }
+
+  class DispatcherThread extends LogDispatcher.DispatcherThread {
+
+    DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
+      super(receiver, logBlockingDeque);
+    }
+
+    @Override
+    protected void serializeEntries() {
+      for (SendLogRequest request : currBatch) {
+        Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+            request.getVotingLog().getLog().getEnqueueTime());
+        long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
+        request.getAppendEntryRequest().entry = request.getVotingLog().getLog().serialize();
+        Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
+      }
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 4845bba..01d8541 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -43,8 +43,10 @@ public abstract class Log implements Comparable<Log> {
   @SuppressWarnings("java:S3077")
   private volatile Exception exception;
 
+  private long receiveTime;
   private long createTime;
   private long enqueueTime;
+  private long sequenceStartTime;
 
   private int byteSize = 0;
 
@@ -63,7 +65,8 @@ public abstract class Log implements Comparable<Log> {
     CLOSE_FILE,
     REMOVE_NODE,
     EMPTY_CONTENT,
-    TEST_LARGE_CONTENT
+    TEST_LARGE_CONTENT,
+    FRAGMENTED
   }
 
   public long getCurrLogIndex() {
@@ -147,4 +150,20 @@ public abstract class Log implements Comparable<Log> {
   public void setByteSize(int byteSize) {
     this.byteSize = byteSize;
   }
+
+  public long getReceiveTime() {
+    return receiveTime;
+  }
+
+  public void setReceiveTime(long receiveTime) {
+    this.receiveTime = receiveTime;
+  }
+
+  public long getSequenceStartTime() {
+    return sequenceStartTime;
+  }
+
+  public void setSequenceStartTime(long sequenceStartTime) {
+    this.sequenceStartTime = sequenceStartTime;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index beaaa77..17b853c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -187,6 +188,15 @@ public class LogDispatcher {
       this.setQuorumSize(quorumSize);
     }
 
+    public SendLogRequest(SendLogRequest request) {
+      this.setVotingLog(request.votingLog);
+      this.setLeaderShipStale(request.leaderShipStale);
+      this.setNewLeaderTerm(request.newLeaderTerm);
+      this.setAppendEntryRequest(request.appendEntryRequest);
+      this.setQuorumSize(request.quorumSize);
+      this.setEnqueueTime(request.enqueueTime);
+    }
+
     public VotingLog getVotingLog() {
       return votingLog;
     }
@@ -245,7 +255,7 @@ public class LogDispatcher {
 
     Node receiver;
     private BlockingQueue<SendLogRequest> logBlockingDeque;
-    private List<SendLogRequest> currBatch = new ArrayList<>();
+    protected List<SendLogRequest> currBatch = new ArrayList<>();
     private Peer peer;
     Client client;
 
@@ -275,13 +285,7 @@ public class LogDispatcher {
             logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
           }
           Statistic.LOG_DISPATCHER_LOG_BATCH_SIZE.add(currBatch.size());
-          for (SendLogRequest request : currBatch) {
-            Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
-                request.getVotingLog().getLog().getEnqueueTime());
-            long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
-            request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
-            Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
-          }
+          serializeEntries();
           sendBatchLogs(currBatch);
           currBatch.clear();
         }
@@ -293,6 +297,16 @@ public class LogDispatcher {
       logger.info("Dispatcher exits");
     }
 
+    protected void serializeEntries() throws ExecutionException, InterruptedException {
+      for (SendLogRequest request : currBatch) {
+        Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+            request.getVotingLog().getLog().getEnqueueTime());
+        long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
+        request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
+        Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
+      }
+    }
+
     private void appendEntriesAsync(
         List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch)
         throws TException {
@@ -424,7 +438,7 @@ public class LogDispatcher {
               peer,
               logRequest.quorumSize);
       // TODO add async interface
-      int retries = 5;
+      int retries = 50;
       try {
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
         for (int i = 0; i < retries; i++) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index e35943b..26f8a42 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.log.Log.Types;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
@@ -101,6 +102,11 @@ public class LogParser {
         largeLog.deserialize(buffer);
         log = largeLog;
         break;
+      case FRAGMENTED:
+        FragmentedLog fragmentedLog = new FragmentedLog();
+        fragmentedLog.deserialize(buffer);
+        log = fragmentedLog;
+        break;
       default:
         throw new IllegalArgumentException(type.toString());
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index d56f3f0..46b5c02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -22,18 +22,27 @@ package org.apache.iotdb.cluster.log;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class VotingLog {
   protected Log log;
   protected Set<Integer> stronglyAcceptedNodeIds;
   protected Set<Integer> weaklyAcceptedNodeIds;
-  public long acceptedTime;
+  public AtomicLong acceptedTime;
   public volatile ByteBuffer serializedCache;
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
     stronglyAcceptedNodeIds = new HashSet<>(groupSize);
     weaklyAcceptedNodeIds = new HashSet<>(groupSize);
+    acceptedTime = new AtomicLong();
+  }
+
+  public VotingLog(VotingLog another) {
+    this.log = another.log;
+    this.stronglyAcceptedNodeIds = another.stronglyAcceptedNodeIds;
+    this.weaklyAcceptedNodeIds = another.weaklyAcceptedNodeIds;
+    this.acceptedTime = another.acceptedTime;
   }
 
   public Log getLog() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 1c501f8..af3163c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.server.NodeCharacter;
@@ -52,7 +53,7 @@ public class MetaLogApplier extends BaseApplier {
         applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
       } else if (log instanceof RemoveNodeLog) {
         applyRemoveNodeLog((RemoveNodeLog) log);
-      } else if (log instanceof EmptyContentLog) {
+      } else if (log instanceof EmptyContentLog || log instanceof FragmentedLog) {
         // Do nothing
       } else {
         logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
new file mode 100644
index 0000000..7186098
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.logtypes;
+
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.utils.encode.rs.ReedSolomon;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.iotdb.cluster.log.Log.Types.FRAGMENTED;
+
+public class FragmentedLog extends Log {
+
+  private byte[][] logFragments;
+  private int logLength;
+  private boolean[] fragmentPresent;
+  private int shardLength;
+  private int dataShardNum;
+  private int parityShardNum;
+
+  public FragmentedLog() {}
+
+  public FragmentedLog(Log base, int nodeNum) {
+
+    ByteBuffer baseBuffer = base.serialize();
+    logLength = baseBuffer.limit();
+    int followerNum = nodeNum - 1;
+    parityShardNum = nodeNum / 2 - 1;
+    dataShardNum = followerNum - parityShardNum;
+    shardLength = logLength / dataShardNum + 1;
+
+    logFragments = new byte[parityShardNum + dataShardNum][];
+    fragmentPresent = new boolean[parityShardNum + dataShardNum];
+    for (int i = 0; i < parityShardNum + dataShardNum; i++) {
+      logFragments[i] = new byte[shardLength];
+      fragmentPresent[i] = true;
+    }
+
+    int start = 0;
+    int end = Math.min(start + shardLength, logLength);
+    for (int i = 0; i < dataShardNum; i++) {
+      System.arraycopy(baseBuffer.array(), start, logFragments[i], 0, end - start);
+      start += shardLength;
+      end = Math.min(start + shardLength, logLength);
+    }
+
+    ReedSolomon reedSolomon = new ReedSolomon(dataShardNum, parityShardNum);
+    reedSolomon.encodeParity(logFragments, 0, shardLength);
+  }
+
+  public FragmentedLog(FragmentedLog parent, int fragmentIndex) {
+    setCurrLogIndex(parent.getCurrLogIndex());
+    setCurrLogTerm(parent.getCurrLogTerm());
+    setCreateTime(parent.getCreateTime());
+    setEnqueueTime(parent.getEnqueueTime());
+    setReceiveTime(parent.getReceiveTime());
+
+    this.logFragments = parent.logFragments;
+    this.fragmentPresent = new boolean[logFragments.length];
+    fragmentPresent[fragmentIndex] = true;
+    this.logLength = parent.logLength;
+    this.dataShardNum = parent.dataShardNum;
+    this.parityShardNum = parent.parityShardNum;
+    this.shardLength = parent.shardLength;
+  }
+
+  @Override
+  public ByteBuffer serialize() {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS(DEFAULT_BUFFER_SIZE);
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      dataOutputStream.writeByte((byte) FRAGMENTED.ordinal());
+
+      dataOutputStream.writeLong(getCurrLogIndex());
+      dataOutputStream.writeLong(getCurrLogTerm());
+
+      dataOutputStream.writeInt(logLength);
+      dataOutputStream.writeInt(dataShardNum);
+      dataOutputStream.writeInt(parityShardNum);
+      dataOutputStream.writeInt(shardLength);
+
+      for (int i = 0, fragmentPresentLength = fragmentPresent.length;
+          i < fragmentPresentLength;
+          i++) {
+        boolean present = fragmentPresent[i];
+        dataOutputStream.writeBoolean(present);
+        if (present) {
+          dataOutputStream.write(logFragments[i]);
+        }
+      }
+    } catch (IOException e) {
+      // unreachable
+    }
+
+    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    setCurrLogIndex(buffer.getLong());
+    setCurrLogTerm(buffer.getLong());
+
+    logLength = buffer.getInt();
+    dataShardNum = buffer.getInt();
+    parityShardNum = buffer.getInt();
+    shardLength = buffer.getInt();
+
+    logFragments = new byte[dataShardNum + parityShardNum][shardLength];
+    fragmentPresent = new boolean[dataShardNum + parityShardNum];
+
+    for (int i = 0, fragmentPresentLength = fragmentPresent.length;
+        i < fragmentPresentLength;
+        i++) {
+      boolean present = buffer.get() == 1;
+      fragmentPresent[i] = present;
+      if (present) {
+        buffer.get(logFragments[i], 0, shardLength);
+      }
+    }
+  }
+}
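
Following serialize() above, a follower-bound copy of a FragmentedLog
carries the type byte, the index and term, four ints, one present flag
per fragment slot, and the bytes of the single fragment it holds. A
standalone back-of-the-envelope sketch of the resulting payload,
assuming a 64 KB entry replicated in a 5-node group:

    public class FragmentSizeSketch {
      public static void main(String[] args) {
        int logLength = 64 * 1024;                          // illustrative entry size
        int nodeNum = 5;
        int parityShardNum = nodeNum / 2 - 1;               // 1
        int dataShardNum = (nodeNum - 1) - parityShardNum;  // 3
        int shardLength = logLength / dataShardNum + 1;     // 21846
        int fragmentSlots = dataShardNum + parityShardNum;  // 4 present flags
        int presentFragments = 1;                           // one shard per follower copy
        long payload = 1L /* type */ + 8 /* index */ + 8 /* term */
            + 4 * 4 /* logLength, dataShardNum, parityShardNum, shardLength */
            + fragmentSlots /* booleans */
            + (long) presentFragments * shardLength;
        System.out.println(
            "fragment payload ~" + payload + " B vs " + logLength + " B for the full entry");
      }
    }

Each follower thus receives and stores roughly 1/dataShardNum of the
entry instead of a full copy.
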
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 4b445df..46ee69c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -65,7 +65,7 @@ public abstract class RaftLogManager {
   /** manage committed entries in disk for safety */
   private StableEntryManager stableEntryManager;
 
-  private long commitIndex;
+  private volatile long commitIndex;
 
   /**
    * The committed logs whose index is smaller than this are all have been applied, for example,
@@ -693,7 +693,7 @@ public abstract class RaftLogManager {
 
     long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
     if (unappliedLogSize > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
-      logger.debug(
+      logger.info(
           "There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
           unappliedLogSize);
       try {
@@ -995,7 +995,7 @@ public abstract class RaftLogManager {
           || nextToCheckIndex > getCommittedEntryManager().getLastIndex()
           || (blockAppliedCommitIndex > 0 && blockAppliedCommitIndex < nextToCheckIndex)) {
         // avoid spinning
-        Thread.sleep(5);
+        Thread.sleep(0);
         return;
       }
       Log log = getCommittedEntryManager().getEntry(nextToCheckIndex);
@@ -1007,10 +1007,12 @@ public abstract class RaftLogManager {
             nextToCheckIndex);
         return;
       }
-      synchronized (log) {
-        while (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
-          // wait until the log is applied or a newer snapshot is installed
-          log.wait(5);
+      if (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
+        synchronized (log) {
+          while (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
+            // wait until the log is applied or a newer snapshot is installed
+            log.wait(1);
+          }
         }
       }
       synchronized (changeApplyCommitIndexCond) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 0def4d0..4390def 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_SENDER_SEQUENCE_LOG;
+
 public class AsynchronousSequencer implements LogSequencer {
 
   private static final Logger logger = LoggerFactory.getLogger(AsynchronousSequencer.class);
@@ -56,7 +58,7 @@ public class AsynchronousSequencer implements LogSequencer {
   public AsynchronousSequencer(RaftMember member, RaftLogManager logManager) {
     this.member = member;
     this.logManager = logManager;
-    unsequencedLogQueue = new ArrayBlockingQueue<>(4096);
+    unsequencedLogQueue = new ArrayBlockingQueue<>(40960, true);
     for (int i = 0; i < SEQUENCER_PARALLELISM; i++) {
       SEQUENCER_POOL.submit(this::sequenceTask);
     }
@@ -83,7 +85,9 @@ public class AsynchronousSequencer implements LogSequencer {
     long startTime;
     synchronized (logManager) {
       for (SendLogRequest sendLogRequest : sendLogRequests) {
+        long sequenceStartTime = RAFT_SENDER_SEQUENCE_LOG.getOperationStartTime();
         Log log = sendLogRequest.getVotingLog().getLog();
+        log.setSequenceStartTime(sequenceStartTime);
         log.setCurrLogTerm(member.getTerm().get());
         log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
         if (log instanceof PhysicalPlanLog) {
@@ -101,10 +105,13 @@ public class AsynchronousSequencer implements LogSequencer {
         sendLogRequest.setAppendEntryRequest(appendEntryRequest);
 
         startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+        Statistic.LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE.calOperationCostTimeFromStart(
+            log.getReceiveTime());
         log.setCreateTime(System.nanoTime());
         member.getVotingLogList().insert(sendLogRequest.getVotingLog());
         member.getLogDispatcher().offer(sendLogRequest);
         Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+        RAFT_SENDER_SEQUENCE_LOG.calOperationCostTimeFromStart(sequenceStartTime);
       }
     }
     sendLogRequests.clear();
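
The sequencer above stamps each entry with the timestamps added to Log in
this commit, which feed the new Timer statistics: receiveTime is set in
RaftMember when the plan arrives, sequenceStartTime here before the index
and term are assigned, createTime just before the entry is offered to the
dispatcher, and enqueueTime when the dispatcher places it into a follower
queue. A standalone sketch of how those timestamps relate (the sleeps
only stand in for real work):

    public class LogTimelineSketch {
      public static void main(String[] args) throws InterruptedException {
        long receiveTime = System.nanoTime();       // RaftMember: plan received
        Thread.sleep(1);
        long sequenceStartTime = System.nanoTime(); // AsynchronousSequencer: sequencing starts
        Thread.sleep(1);
        long createTime = System.nanoTime();        // just before LogDispatcher.offer()
        Thread.sleep(1);
        long enqueueTime = System.nanoTime();       // request placed into a node queue

        System.out.printf("receive -> create:  %d us (LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE)%n",
            (createTime - receiveTime) / 1_000);
        System.out.printf("sequence -> enqueue: %d us (roughly RAFT_SENDER_SEQUENCE_LOG)%n",
            (enqueueTime - sequenceStartTime) / 1_000);
      }
    }
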
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index b6c0df2..749a3ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -88,13 +88,13 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     long resp = response.status;
 
     if (resp == RESPONSE_STRONG_ACCEPT) {
-      synchronized (log) {
+      synchronized (log.getStronglyAcceptedNodeIds()) {
         if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
             >= quorumSize) {
-          log.acceptedTime = System.nanoTime();
+          log.acceptedTime.set(System.nanoTime());
         }
         log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
-        log.notifyAll();
+        log.getStronglyAcceptedNodeIds().notifyAll();
       }
       member
           .getVotingLogList()
@@ -117,17 +117,17 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
         receiverTerm.set(resp);
       }
       leaderShipStale.set(true);
-      synchronized (log) {
-        log.notifyAll();
+      synchronized (log.getStronglyAcceptedNodeIds()) {
+        log.getStronglyAcceptedNodeIds().notifyAll();
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
-      synchronized (log) {
+      synchronized (log.getStronglyAcceptedNodeIds()) {
+        log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
         if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
             >= quorumSize) {
-          log.acceptedTime = System.nanoTime();
+          log.acceptedTime.set(System.nanoTime());
         }
-        log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
-        log.notifyAll();
+        log.getStronglyAcceptedNodeIds().notifyAll();
       }
     } else {
       // e.g., Response.RESPONSE_LOG_MISMATCH
@@ -155,12 +155,12 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   }
 
   private void onFail() {
-    synchronized (log) {
+    synchronized (log.getStronglyAcceptedNodeIds()) {
       failedDecreasingCounter--;
       if (failedDecreasingCounter <= 0) {
         // quorum members have failed, there is no need to wait for others
         log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
-        log.notifyAll();
+        log.getStronglyAcceptedNodeIds().notifyAll();
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
index afde282..95e38b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
@@ -68,13 +68,12 @@ public class MetaHeartbeatServer extends HeartbeatServer {
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       return new TNonblockingServerSocket(
           new InetSocketAddress(
-              config.getInternalIp(),
-              config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET),
+              "0.0.0.0", config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET),
           getConnectionTimeoutInMS());
     } else {
       return new TServerSocket(
           new InetSocketAddress(
-              metaClusterServer.getMember().getThisNode().getInternalIp(),
+              "0.0.0.0",
               metaClusterServer.getMember().getThisNode().getMetaPort()
                   + ClusterUtils.META_HEARTBEAT_PORT_OFFSET));
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index cd54aa6..4a24d13 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.expr.VotingLogList;
 import org.apache.iotdb.cluster.log.CommitLogCallback;
 import org.apache.iotdb.cluster.log.CommitLogTask;
+import org.apache.iotdb.cluster.log.FragmentedLogDispatcher;
 import org.apache.iotdb.cluster.log.HardState;
 import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
 import org.apache.iotdb.cluster.log.Log;
@@ -39,6 +40,7 @@ import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;
@@ -133,6 +135,7 @@ public abstract class RaftMember {
   public static boolean USE_INDIRECT_LOG_DISPATCHER = false;
   public static boolean ENABLE_WEAK_ACCEPTANCE = true;
   public static boolean ENABLE_COMMIT_RETURN = false;
+  public static boolean USE_CRAFT = false;
 
   protected static final LogSequencerFactory SEQUENCER_FACTORY =
       ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing()
@@ -1195,6 +1198,11 @@ public abstract class RaftMember {
       ((PhysicalPlanLog) log).setPlan(plan);
     }
 
+    if (USE_CRAFT && allNodes.size() > 2) {
+      log = new FragmentedLog(log, allNodes.size());
+    }
+    log.setReceiveTime(System.nanoTime());
+
     // just like processPlanLocally,we need to check the size of log
     //    if (log.serialize().capacity() + Integer.BYTES
     //        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
@@ -1606,6 +1614,8 @@ public abstract class RaftMember {
     if (logDispatcher == null) {
       if (USE_INDIRECT_LOG_DISPATCHER) {
         logDispatcher = new IndirectLogDispatcher(this);
+      } else if (USE_CRAFT && allNodes.size() > 2) {
+        logDispatcher = new FragmentedLogDispatcher(this);
       } else {
         logDispatcher = new LogDispatcher(this);
       }
@@ -1613,63 +1623,92 @@ public abstract class RaftMember {
     return logDispatcher;
   }
 
+  @SuppressWarnings({"java:S2445"}) // safe synchronized
+  private void waitAppendResultLoop(VotingLog log, int quorumSize) {
+    int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+    int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+    int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+    long nextTimeToPrint = 1000;
+
+    long waitStart = System.currentTimeMillis();
+    long alreadyWait = 0;
+    while (stronglyAcceptedNodeNum < quorumSize
+        && (!ENABLE_WEAK_ACCEPTANCE
+            || (totalAccepted < quorumSize)
+            || votingLogList.size() > config.getMaxNumOfLogsInMem())
+        && alreadyWait < RaftServer.getWriteOperationTimeoutMS()
+        && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+      long singleWaitTime = 0;
+      long singleWaitStart = System.nanoTime();
+      try {
+        synchronized (log.getStronglyAcceptedNodeIds()) {
+          log.getStronglyAcceptedNodeIds().wait(1);
+        }
+        singleWaitTime = (System.nanoTime() - singleWaitStart) / 1000000;
+        logger.debug("{} ends waiting", log);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn("Unexpected interruption when sending a log", e);
+      }
+      alreadyWait = System.currentTimeMillis() - waitStart;
+      if (alreadyWait > nextTimeToPrint) {
+        logger.info(
+            "Still not receive enough votes for {}, strongly accepted {}, weakly "
+                + "accepted {}, voting logs {}, wait {}ms, wait to sequence {}ms, wait to enqueue "
+                + "{}ms, wait to accept "
+                + "{}ms",
+            log,
+            log.getStronglyAcceptedNodeIds(),
+            log.getWeaklyAcceptedNodeIds(),
+            votingLogList.size(),
+            singleWaitTime,
+            (log.getLog().getSequenceStartTime() - singleWaitStart) / 1000000,
+            (log.getLog().getEnqueueTime() - singleWaitStart) / 1000000,
+            (log.acceptedTime.get() - singleWaitStart) / 1000000);
+        nextTimeToPrint *= 2;
+      }
+      stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+      weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+      totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+    }
+
+    if (alreadyWait > 15000) {
+      logger.info(
+          "Slow entry {}, strongly accepted {}, weakly " + "accepted {}, waited time {}ms",
+          log,
+          log.getStronglyAcceptedNodeIds(),
+          log.getWeaklyAcceptedNodeIds(),
+          alreadyWait);
+    }
+  }
+
   /**
    * wait until "voteCounter" counts down to zero, which means the quorum has received the log, or
    * one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
    */
-  @SuppressWarnings({"java:S2445"}) // safe synchronized
   protected AppendLogResult waitAppendResult(
       VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) {
     // wait for the followers to vote
     long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
-    long nextTimeToPrint = 15000;
 
     int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
     int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
     int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
 
-    synchronized (log) {
-      long waitStart = System.currentTimeMillis();
-      long alreadyWait = 0;
-      while (stronglyAcceptedNodeNum < quorumSize
-          && (!ENABLE_WEAK_ACCEPTANCE
-              || (totalAccepted < allNodes.size() - 1)
-              || votingLogList.size() > config.getMaxNumOfLogsInMem())
-          && alreadyWait < RaftServer.getWriteOperationTimeoutMS()
-          && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
-        try {
-          log.wait(0);
-          logger.debug("{} ends waiting", log);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          logger.warn("Unexpected interruption when sending a log", e);
-        }
-        alreadyWait = System.currentTimeMillis() - waitStart;
-        if (alreadyWait > nextTimeToPrint) {
-          logger.info(
-              "Still not receive enough votes for {}, strongly accepted {}, weakly "
-                  + "accepted {}",
-              log,
-              log.getStronglyAcceptedNodeIds(),
-              log.getWeaklyAcceptedNodeIds());
-          nextTimeToPrint *= 2;
-        }
-        stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
-        weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
-        totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
-      }
-
-      if (alreadyWait > 15000) {
-        logger.info(
-            "Slow entry {}, strongly accepted {}, weakly " + "accepted {}, waited time {}ms",
-            log,
-            log.getStronglyAcceptedNodeIds(),
-            log.getWeaklyAcceptedNodeIds(),
-            alreadyWait);
-      }
+    if (stronglyAcceptedNodeNum < quorumSize
+        && (!ENABLE_WEAK_ACCEPTANCE
+            || (totalAccepted < quorumSize)
+            || votingLogList.size() > config.getMaxNumOfLogsInMem())
+        && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+      waitAppendResultLoop(log, quorumSize);
     }
-    if (log.acceptedTime != 0) {
-      Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime);
+
+    stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+    weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+    totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+
+    if (log.acceptedTime.get() != 0) {
+      Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
     }
     Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.calOperationCostTimeFromStart(startTime);
 
@@ -1702,16 +1741,14 @@ public abstract class RaftMember {
     if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
       startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
       synchronized (logManager) {
+        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+            startTime);
         if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
-          Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
-              startTime);
-
           startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
           logManager.commitTo(log.getCurrLogIndex());
           Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
-
-          startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
         }
+        startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
       }
       Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
     }
@@ -1725,7 +1762,7 @@ public abstract class RaftMember {
       while (!log.isApplied()) {
         // wait until the log is applied
         try {
-          log.wait(0);
+          log.wait(1);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new LogExecutionException(e);
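
One behavioral change worth noting in the hunk above: with weak
acceptance enabled, waiting now stops once strong plus weak accepts reach
quorumSize, whereas the removed code waited until they reached
allNodes.size() - 1. A standalone sketch of just that wait predicate
(the timeout and forced-failure checks are omitted):

    public class QuorumPredicateSketch {
      static boolean stillWaiting(
          int strong, int weak, int quorumSize, boolean enableWeakAcceptance,
          int votingLogsInMem, int maxLogsInMem) {
        int totalAccepted = strong + weak;
        return strong < quorumSize
            && (!enableWeakAcceptance
                || totalAccepted < quorumSize
                || votingLogsInMem > maxLogsInMem);
      }

      public static void main(String[] args) {
        // weak accepts now count toward the quorum when weak acceptance is enabled
        System.out.println(stillWaiting(1, 1, 2, true, 10, 1000));  // false: stop waiting
        System.out.println(stillWaiting(1, 1, 2, false, 10, 1000)); // true: keep waiting
      }
    }
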
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index c794211..c72550e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -86,6 +86,8 @@ public class Timer {
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     // raft member - sender
+    RAFT_SENDER_SEQUENCE_LOG(
+        RAFT_MEMBER_SENDER, "sequence log", TIME_SCALE, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY),
     RAFT_SENDER_APPEND_LOG(
         RAFT_MEMBER_SENDER,
         "locally append log",
@@ -257,6 +259,12 @@ public class Timer {
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_LOG_BATCH_SIZE(
         LOG_DISPATCHER, "batch size", 1, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE(
+        LOG_DISPATCHER,
+        "from receive to create",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_FROM_CREATE_TO_SENT(
         LOG_DISPATCHER,
         "from create to sent",
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Galois.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Galois.java
new file mode 100644
index 0000000..5f0b8ac
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Galois.java
@@ -0,0 +1,722 @@
+/**
+ * 8-bit Galois Field
+ *
+ * <p>Copyright 2015, Backblaze, Inc. All rights reserved.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 8-bit Galois Field
+ *
+ * <p>This class implements multiplication, division, addition, subtraction, and exponentiation.
+ *
+ * <p>The multiplication operation is in the inner loop of erasure coding, so it's been optimized.
+ * Having the class be "final" helps a little, and having the EXP_TABLE repeat the data, so there's
+ * no need to bound the sum of two logarithms to 255 helps a lot.
+ */
+public final class Galois {
+
+  /** The number of elements in the field. */
+  public static final int FIELD_SIZE = 256;
+
+  /**
+   * The polynomial used to generate the logarithm table.
+   *
+   * <p>There are a number of polynomials that work to generate a Galois field of 256 elements. The
+   * choice is arbitrary, and we just use the first one.
+   *
+   * <p>The possibilities are: 29, 43, 45, 77, 95, 99, 101, 105, 113, 135, 141, 169, 195, 207, 231,
+   * and 245.
+   */
+  public static final int GENERATING_POLYNOMIAL = 29;
+
+  /**
+   * Mapping from members of the Galois Field to their integer logarithms. The entry for 0 is
+   * meaningless because there is no log of 0.
+   *
+   * <p>This array is shorts, not bytes, so that they can be used directly to index arrays without
+   * casting. The values (except the non-value at index 0) are all really bytes, so they range from
+   * 0 to 255.
+   *
+   * <p>This table was generated by java_tables.py, and the unit tests check it against the Java
+   * implementation.
+   */
+  public static final short[] LOG_TABLE =
+      new short[] {
+        -1, 0, 1, 25, 2, 50, 26, 198,
+        3, 223, 51, 238, 27, 104, 199, 75,
+        4, 100, 224, 14, 52, 141, 239, 129,
+        28, 193, 105, 248, 200, 8, 76, 113,
+        5, 138, 101, 47, 225, 36, 15, 33,
+        53, 147, 142, 218, 240, 18, 130, 69,
+        29, 181, 194, 125, 106, 39, 249, 185,
+        201, 154, 9, 120, 77, 228, 114, 166,
+        6, 191, 139, 98, 102, 221, 48, 253,
+        226, 152, 37, 179, 16, 145, 34, 136,
+        54, 208, 148, 206, 143, 150, 219, 189,
+        241, 210, 19, 92, 131, 56, 70, 64,
+        30, 66, 182, 163, 195, 72, 126, 110,
+        107, 58, 40, 84, 250, 133, 186, 61,
+        202, 94, 155, 159, 10, 21, 121, 43,
+        78, 212, 229, 172, 115, 243, 167, 87,
+        7, 112, 192, 247, 140, 128, 99, 13,
+        103, 74, 222, 237, 49, 197, 254, 24,
+        227, 165, 153, 119, 38, 184, 180, 124,
+        17, 68, 146, 217, 35, 32, 137, 46,
+        55, 63, 209, 91, 149, 188, 207, 205,
+        144, 135, 151, 178, 220, 252, 190, 97,
+        242, 86, 211, 171, 20, 42, 93, 158,
+        132, 60, 57, 83, 71, 109, 65, 162,
+        31, 45, 67, 216, 183, 123, 164, 118,
+        196, 23, 73, 236, 127, 12, 111, 246,
+        108, 161, 59, 82, 41, 157, 85, 170,
+        251, 96, 134, 177, 187, 204, 62, 90,
+        203, 89, 95, 176, 156, 169, 160, 81,
+        11, 245, 22, 235, 122, 117, 44, 215,
+        79, 174, 213, 233, 230, 231, 173, 232,
+        116, 214, 244, 234, 168, 80, 88, 175
+      };
+
+  /**
+   * Inverse of the logarithm table. Maps integer logarithms to members of the field. There is no
+   * entry for 255 because the highest log is 254.
+   *
+   * <p>This table was generated by java_tables.py
+   */
+  static final byte[] EXP_TABLE =
+      new byte[] {
+        1,
+        2,
+        4,
+        8,
+        16,
+        32,
+        64,
+        -128,
+        29,
+        58,
+        116,
+        -24,
+        -51,
+        -121,
+        19,
+        38,
+        76,
+        -104,
+        45,
+        90,
+        -76,
+        117,
+        -22,
+        -55,
+        -113,
+        3,
+        6,
+        12,
+        24,
+        48,
+        96,
+        -64,
+        -99,
+        39,
+        78,
+        -100,
+        37,
+        74,
+        -108,
+        53,
+        106,
+        -44,
+        -75,
+        119,
+        -18,
+        -63,
+        -97,
+        35,
+        70,
+        -116,
+        5,
+        10,
+        20,
+        40,
+        80,
+        -96,
+        93,
+        -70,
+        105,
+        -46,
+        -71,
+        111,
+        -34,
+        -95,
+        95,
+        -66,
+        97,
+        -62,
+        -103,
+        47,
+        94,
+        -68,
+        101,
+        -54,
+        -119,
+        15,
+        30,
+        60,
+        120,
+        -16,
+        -3,
+        -25,
+        -45,
+        -69,
+        107,
+        -42,
+        -79,
+        127,
+        -2,
+        -31,
+        -33,
+        -93,
+        91,
+        -74,
+        113,
+        -30,
+        -39,
+        -81,
+        67,
+        -122,
+        17,
+        34,
+        68,
+        -120,
+        13,
+        26,
+        52,
+        104,
+        -48,
+        -67,
+        103,
+        -50,
+        -127,
+        31,
+        62,
+        124,
+        -8,
+        -19,
+        -57,
+        -109,
+        59,
+        118,
+        -20,
+        -59,
+        -105,
+        51,
+        102,
+        -52,
+        -123,
+        23,
+        46,
+        92,
+        -72,
+        109,
+        -38,
+        -87,
+        79,
+        -98,
+        33,
+        66,
+        -124,
+        21,
+        42,
+        84,
+        -88,
+        77,
+        -102,
+        41,
+        82,
+        -92,
+        85,
+        -86,
+        73,
+        -110,
+        57,
+        114,
+        -28,
+        -43,
+        -73,
+        115,
+        -26,
+        -47,
+        -65,
+        99,
+        -58,
+        -111,
+        63,
+        126,
+        -4,
+        -27,
+        -41,
+        -77,
+        123,
+        -10,
+        -15,
+        -1,
+        -29,
+        -37,
+        -85,
+        75,
+        -106,
+        49,
+        98,
+        -60,
+        -107,
+        55,
+        110,
+        -36,
+        -91,
+        87,
+        -82,
+        65,
+        -126,
+        25,
+        50,
+        100,
+        -56,
+        -115,
+        7,
+        14,
+        28,
+        56,
+        112,
+        -32,
+        -35,
+        -89,
+        83,
+        -90,
+        81,
+        -94,
+        89,
+        -78,
+        121,
+        -14,
+        -7,
+        -17,
+        -61,
+        -101,
+        43,
+        86,
+        -84,
+        69,
+        -118,
+        9,
+        18,
+        36,
+        72,
+        -112,
+        61,
+        122,
+        -12,
+        -11,
+        -9,
+        -13,
+        -5,
+        -21,
+        -53,
+        -117,
+        11,
+        22,
+        44,
+        88,
+        -80,
+        125,
+        -6,
+        -23,
+        -49,
+        -125,
+        27,
+        54,
+        108,
+        -40,
+        -83,
+        71,
+        -114,
+        // Repeat the table a second time, so multiply()
+        // does not have to check bounds.
+        1,
+        2,
+        4,
+        8,
+        16,
+        32,
+        64,
+        -128,
+        29,
+        58,
+        116,
+        -24,
+        -51,
+        -121,
+        19,
+        38,
+        76,
+        -104,
+        45,
+        90,
+        -76,
+        117,
+        -22,
+        -55,
+        -113,
+        3,
+        6,
+        12,
+        24,
+        48,
+        96,
+        -64,
+        -99,
+        39,
+        78,
+        -100,
+        37,
+        74,
+        -108,
+        53,
+        106,
+        -44,
+        -75,
+        119,
+        -18,
+        -63,
+        -97,
+        35,
+        70,
+        -116,
+        5,
+        10,
+        20,
+        40,
+        80,
+        -96,
+        93,
+        -70,
+        105,
+        -46,
+        -71,
+        111,
+        -34,
+        -95,
+        95,
+        -66,
+        97,
+        -62,
+        -103,
+        47,
+        94,
+        -68,
+        101,
+        -54,
+        -119,
+        15,
+        30,
+        60,
+        120,
+        -16,
+        -3,
+        -25,
+        -45,
+        -69,
+        107,
+        -42,
+        -79,
+        127,
+        -2,
+        -31,
+        -33,
+        -93,
+        91,
+        -74,
+        113,
+        -30,
+        -39,
+        -81,
+        67,
+        -122,
+        17,
+        34,
+        68,
+        -120,
+        13,
+        26,
+        52,
+        104,
+        -48,
+        -67,
+        103,
+        -50,
+        -127,
+        31,
+        62,
+        124,
+        -8,
+        -19,
+        -57,
+        -109,
+        59,
+        118,
+        -20,
+        -59,
+        -105,
+        51,
+        102,
+        -52,
+        -123,
+        23,
+        46,
+        92,
+        -72,
+        109,
+        -38,
+        -87,
+        79,
+        -98,
+        33,
+        66,
+        -124,
+        21,
+        42,
+        84,
+        -88,
+        77,
+        -102,
+        41,
+        82,
+        -92,
+        85,
+        -86,
+        73,
+        -110,
+        57,
+        114,
+        -28,
+        -43,
+        -73,
+        115,
+        -26,
+        -47,
+        -65,
+        99,
+        -58,
+        -111,
+        63,
+        126,
+        -4,
+        -27,
+        -41,
+        -77,
+        123,
+        -10,
+        -15,
+        -1,
+        -29,
+        -37,
+        -85,
+        75,
+        -106,
+        49,
+        98,
+        -60,
+        -107,
+        55,
+        110,
+        -36,
+        -91,
+        87,
+        -82,
+        65,
+        -126,
+        25,
+        50,
+        100,
+        -56,
+        -115,
+        7,
+        14,
+        28,
+        56,
+        112,
+        -32,
+        -35,
+        -89,
+        83,
+        -90,
+        81,
+        -94,
+        89,
+        -78,
+        121,
+        -14,
+        -7,
+        -17,
+        -61,
+        -101,
+        43,
+        86,
+        -84,
+        69,
+        -118,
+        9,
+        18,
+        36,
+        72,
+        -112,
+        61,
+        122,
+        -12,
+        -11,
+        -9,
+        -13,
+        -5,
+        -21,
+        -53,
+        -117,
+        11,
+        22,
+        44,
+        88,
+        -80,
+        125,
+        -6,
+        -23,
+        -49,
+        -125,
+        27,
+        54,
+        108,
+        -40,
+        -83,
+        71,
+        -114
+      };
+
+  /**
+   * Adds two elements of the field. If you're in an inner loop, you should inline this function:
+   * it's just XOR.
+   */
+  public static byte add(byte a, byte b) {
+    return (byte) (a ^ b);
+  }
+
+  /**
+   * Inverse of addition. If you're in an inner loop, you should inline this function: it's just
+   * XOR.
+   */
+  public static byte subtract(byte a, byte b) {
+    return (byte) (a ^ b);
+  }
+
+  /** Multiplies two elements of the field. */
+  public static byte multiply(byte a, byte b) {
+    if (a == 0 || b == 0) {
+      return 0;
+    } else {
+      int logA = LOG_TABLE[a & 0xFF];
+      int logB = LOG_TABLE[b & 0xFF];
+      int logResult = logA + logB;
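+      // logA and logB are each at most 254, so logResult is at most 508 and always
+      // stays inside the doubled 510-entry EXP_TABLE; no reduction modulo 255 is needed.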
+      return EXP_TABLE[logResult];
+    }
+  }
+
+  /** Inverse of multiplication. */
+  public static byte divide(byte a, byte b) {
+    if (a == 0) {
+      return 0;
+    }
+    if (b == 0) {
+      throw new IllegalArgumentException("Argument 'divisor' is 0");
+    }
+    int logA = LOG_TABLE[a & 0xFF];
+    int logB = LOG_TABLE[b & 0xFF];
+    int logResult = logA - logB;
+    if (logResult < 0) {
+      logResult += 255;
+    }
+    return EXP_TABLE[logResult];
+  }
+
+  /**
+   * Computes a**n.
+   *
+   * <p>The result will be the same as multiplying a times itself n times.
+   *
+   * @param a A member of the field.
+   * @param n A plain-old integer.
+   * @return The result of multiplying a by itself n times.
+   */
+  public static byte exp(byte a, int n) {
+    if (n == 0) {
+      return 1;
+    } else if (a == 0) {
+      return 0;
+    } else {
+      int logA = LOG_TABLE[a & 0xFF];
+      int logResult = logA * n;
+      while (255 <= logResult) {
+        logResult -= 255;
+      }
+      return EXP_TABLE[logResult];
+    }
+  }
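+
+  // Worked example: exp((byte) 2, 3) looks up LOG_TABLE[2] == 1, scales it to 1 * 3 == 3,
+  // and returns EXP_TABLE[3] == 8, the same as multiplying 2 by itself three times in the field.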
+
+  /** Generates a logarithm table given a starting polynomial. */
+  public static short[] generateLogTable(int polynomial) {
+    short[] result = new short[FIELD_SIZE];
+    for (int i = 0; i < FIELD_SIZE; i++) {
+      result[i] = -1; // -1 means "not set"
+    }
+    int b = 1;
+    for (int log = 0; log < FIELD_SIZE - 1; log++) {
+      if (result[b] != -1) {
+        throw new RuntimeException("BUG: duplicate logarithm (bad polynomial?)");
+      }
+      result[b] = (short) log;
+      b = (b << 1);
+      if (FIELD_SIZE <= b) {
+        b = ((b - FIELD_SIZE) ^ polynomial);
+      }
+    }
+    return result;
+  }
+
+  /** Generates the inverse log table. */
+  public static byte[] generateExpTable(short[] logTable) {
+    final byte[] result = new byte[FIELD_SIZE * 2 - 2];
+    for (int i = 1; i < FIELD_SIZE; i++) {
+      int log = logTable[i];
+      result[log] = (byte) i;
+      result[log + FIELD_SIZE - 1] = (byte) i;
+    }
+    return result;
+  }
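+
+  // Regeneration sketch: the hard-coded tables above correspond to generator polynomial 29
+  // (0x1D), which is why EXP_TABLE[8] == 29. Assuming that polynomial, equivalent tables
+  // could be rebuilt with:
+  //   short[] logTable = generateLogTable(29);
+  //   byte[] expTable = generateExpTable(logTable);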
+
+  /**
+   * Returns a list of all polynomials that can be used to generate the field.
+   *
+   * <p>This is never used in the code; it's just here for completeness.
+   */
+  public static Integer[] allPossiblePolynomials() {
+    List<Integer> result = new ArrayList<Integer>();
+    for (int i = 0; i < FIELD_SIZE; i++) {
+      try {
+        generateLogTable(i);
+        result.add(i);
+      } catch (RuntimeException e) {
+        // this one didn't work
+      }
+    }
+    return result.toArray(new Integer[result.size()]);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Matrix.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Matrix.java
new file mode 100644
index 0000000..df345b1
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Matrix.java
@@ -0,0 +1,312 @@
+/**
+ * Matrix Algebra over an 8-bit Galois Field
+ *
+ * <p>Copyright 2015, Backblaze, Inc.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+/**
+ * A matrix over the 8-bit Galois field.
+ *
+ * <p>This class is not performance-critical, so the implementations are simple and straightforward.
+ */
+public class Matrix {
+
+  /** The number of rows in the matrix. */
+  private final int rows;
+
+  /** The number of columns in the matrix. */
+  private final int columns;
+
+  /**
+   * The data in the matrix, in row major form.
+   *
+   * <p>To get element (r, c): data[r][c]
+   *
+   * <p>Because this is computer science, and not math, the indices for both the row and column
+   * start at 0.
+   */
+  private final byte[][] data;
+
+  /**
+   * Initialize a matrix of zeros.
+   *
+   * @param initRows The number of rows in the matrix.
+   * @param initColumns The number of columns in the matrix.
+   */
+  public Matrix(int initRows, int initColumns) {
+    rows = initRows;
+    columns = initColumns;
+    data = new byte[rows][];
+    for (int r = 0; r < rows; r++) {
+      data[r] = new byte[columns];
+    }
+  }
+
+  /** Initializes a matrix with the given row-major data. */
+  public Matrix(byte[][] initData) {
+    rows = initData.length;
+    columns = initData[0].length;
+    data = new byte[rows][];
+    for (int r = 0; r < rows; r++) {
+      if (initData[r].length != columns) {
+        throw new IllegalArgumentException("Not all rows have the same number of columns");
+      }
+      data[r] = new byte[columns];
+      for (int c = 0; c < columns; c++) {
+        data[r][c] = initData[r][c];
+      }
+    }
+  }
+
+  /** Returns an identity matrix of the given size. */
+  public static Matrix identity(int size) {
+    Matrix result = new Matrix(size, size);
+    for (int i = 0; i < size; i++) {
+      result.set(i, i, (byte) 1);
+    }
+    return result;
+  }
+
+  /**
+   * Returns a human-readable string of the matrix contents.
+   *
+   * <p>Example: [[1, 2], [3, 4]]
+   */
+  @Override
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append('[');
+    for (int r = 0; r < rows; r++) {
+      if (r != 0) {
+        result.append(", ");
+      }
+      result.append('[');
+      for (int c = 0; c < columns; c++) {
+        if (c != 0) {
+          result.append(", ");
+        }
+        result.append(data[r][c] & 0xFF);
+      }
+      result.append(']');
+    }
+    result.append(']');
+    return result.toString();
+  }
+
+  /**
+   * Returns a human-readable string of the matrix contents.
+   *
+   * <p>Example: 00 01 02 03 04 05 06 07 08 09 0a 0b
+   */
+  public String toBigString() {
+    StringBuilder result = new StringBuilder();
+    for (int r = 0; r < rows; r++) {
+      for (int c = 0; c < columns; c++) {
+        int value = get(r, c);
+        if (value < 0) {
+          value += 256;
+        }
+        result.append(String.format("%02x ", value));
+      }
+      result.append("\n");
+    }
+    return result.toString();
+  }
+
+  /** Returns the number of columns in this matrix. */
+  public int getColumns() {
+    return columns;
+  }
+
+  /** Returns the number of rows in this matrix. */
+  public int getRows() {
+    return rows;
+  }
+
+  /** Returns the value at row r, column c. */
+  public byte get(int r, int c) {
+    if (r < 0 || rows <= r) {
+      throw new IllegalArgumentException("Row index out of range: " + r);
+    }
+    if (c < 0 || columns <= c) {
+      throw new IllegalArgumentException("Column index out of range: " + c);
+    }
+    return data[r][c];
+  }
+
+  /** Sets the value at row r, column c. */
+  public void set(int r, int c, byte value) {
+    if (r < 0 || rows <= r) {
+      throw new IllegalArgumentException("Row index out of range: " + r);
+    }
+    if (c < 0 || columns <= c) {
+      throw new IllegalArgumentException("Column index out of range: " + c);
+    }
+    data[r][c] = value;
+  }
+
+  /** Returns true iff this matrix is identical to the other. */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Matrix)) {
+      return false;
+    }
+    for (int r = 0; r < rows; r++) {
+      if (!java.util.Arrays.equals(data[r], ((Matrix) other).data[r])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Multiplies this matrix (the one on the left) by another matrix (the one on the right). */
+  public Matrix times(Matrix right) {
+    if (getColumns() != right.getRows()) {
+      throw new IllegalArgumentException(
+          "Columns on left ("
+              + getColumns()
+              + ") "
+              + "is different than rows on right ("
+              + right.getRows()
+              + ")");
+    }
+    Matrix result = new Matrix(getRows(), right.getColumns());
+    for (int r = 0; r < getRows(); r++) {
+      for (int c = 0; c < right.getColumns(); c++) {
+        byte value = 0;
+        for (int i = 0; i < getColumns(); i++) {
+          value ^= Galois.multiply(get(r, i), right.get(i, c));
+        }
+        result.set(r, c, value);
+      }
+    }
+    return result;
+  }
+
+  /** Returns the concatenation of this matrix and the matrix on the right. */
+  public Matrix augment(Matrix right) {
+    if (rows != right.rows) {
+      throw new IllegalArgumentException("Matrices don't have the same number of rows");
+    }
+    Matrix result = new Matrix(rows, columns + right.columns);
+    for (int r = 0; r < rows; r++) {
+      for (int c = 0; c < columns; c++) {
+        result.data[r][c] = data[r][c];
+      }
+      for (int c = 0; c < right.columns; c++) {
+        result.data[r][columns + c] = right.data[r][c];
+      }
+    }
+    return result;
+  }
+
+  /** Returns a part of this matrix. */
+  public Matrix submatrix(int rmin, int cmin, int rmax, int cmax) {
+    Matrix result = new Matrix(rmax - rmin, cmax - cmin);
+    for (int r = rmin; r < rmax; r++) {
+      for (int c = cmin; c < cmax; c++) {
+        result.data[r - rmin][c - cmin] = data[r][c];
+      }
+    }
+    return result;
+  }
+
+  /** Returns one row of the matrix as a byte array. */
+  public byte[] getRow(int row) {
+    byte[] result = new byte[columns];
+    for (int c = 0; c < columns; c++) {
+      result[c] = get(row, c);
+    }
+    return result;
+  }
+
+  /** Exchanges two rows in the matrix. */
+  public void swapRows(int r1, int r2) {
+    if (r1 < 0 || rows <= r1 || r2 < 0 || rows <= r2) {
+      throw new IllegalArgumentException("Row index out of range");
+    }
+    byte[] tmp = data[r1];
+    data[r1] = data[r2];
+    data[r2] = tmp;
+  }
+
+  /**
+   * Returns the inverse of this matrix.
+   *
+   * @throws IllegalArgumentException when the matrix is singular and doesn't have an inverse.
+   */
+  public Matrix invert() {
+    // Sanity check.
+    if (rows != columns) {
+      throw new IllegalArgumentException("Only square matrices can be inverted");
+    }
+
+    // Create a working matrix by augmenting this one with
+    // an identity matrix on the right.
+    Matrix work = augment(identity(rows));
+
+    // Do Gaussian elimination to transform the left half into
+    // an identity matrix.
+    work.gaussianElimination();
+
+    // The right half is now the inverse.
+    return work.submatrix(0, rows, columns, columns * 2);
+  }
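+
+  // Property sketch: for any non-singular square matrix m over this field,
+  // m.times(m.invert()) equals Matrix.identity(m.getRows()); the Reed-Solomon decoder
+  // relies on this when rebuilding missing shards from an inverted sub-matrix.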
+
+  /**
+   * Does the work of matrix inversion.
+   *
+   * <p>Assumes that this is an r by 2r matrix.
+   */
+  private void gaussianElimination() {
+    // Clear out the part below the main diagonal and scale the main
+    // diagonal to be 1.
+    for (int r = 0; r < rows; r++) {
+      // If the element on the diagonal is 0, find a row below
+      // that has a non-zero and swap them.
+      if (data[r][r] == (byte) 0) {
+        for (int rowBelow = r + 1; rowBelow < rows; rowBelow++) {
+          if (data[rowBelow][r] != 0) {
+            swapRows(r, rowBelow);
+            break;
+          }
+        }
+      }
+      // If we couldn't find one, the matrix is singular.
+      if (data[r][r] == (byte) 0) {
+        throw new IllegalArgumentException("Matrix is singular");
+      }
+      // Scale to 1.
+      if (data[r][r] != (byte) 1) {
+        byte scale = Galois.divide((byte) 1, data[r][r]);
+        for (int c = 0; c < columns; c++) {
+          data[r][c] = Galois.multiply(data[r][c], scale);
+        }
+      }
+      // Make everything below the 1 be a 0 by subtracting
+      // a multiple of it.  (Subtraction and addition are
+      // both exclusive or in the Galois field.)
+      for (int rowBelow = r + 1; rowBelow < rows; rowBelow++) {
+        if (data[rowBelow][r] != (byte) 0) {
+          byte scale = data[rowBelow][r];
+          for (int c = 0; c < columns; c++) {
+            data[rowBelow][c] ^= Galois.multiply(scale, data[r][c]);
+          }
+        }
+      }
+    }
+
+    // Now clear the part above the main diagonal.
+    for (int d = 0; d < rows; d++) {
+      for (int rowAbove = 0; rowAbove < d; rowAbove++) {
+        if (data[rowAbove][d] != (byte) 0) {
+          byte scale = data[rowAbove][d];
+          for (int c = 0; c < columns; c++) {
+            data[rowAbove][c] ^= Galois.multiply(scale, data[d][c]);
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/ReedSolomon.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/ReedSolomon.java
new file mode 100644
index 0000000..e312f3c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/ReedSolomon.java
@@ -0,0 +1,359 @@
+/**
+ * Reed-Solomon Coding over 8-bit values.
+ *
+ * <p>Copyright 2015, Backblaze, Inc.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+/** Reed-Solomon Coding over 8-bit values. */
+public class ReedSolomon {
+
+  private final int dataShardCount;
+  private final int parityShardCount;
+  private final int totalShardCount;
+  private final Matrix matrix;
+
+  /**
+   * Rows from the matrix for encoding parity, each one as its own byte array to allow for efficient
+   * access while encoding.
+   */
+  private final byte[][] parityRows;
+
+  /** Initializes a new encoder/decoder. */
+  public ReedSolomon(int dataShardCount, int parityShardCount) {
+    this.dataShardCount = dataShardCount;
+    this.parityShardCount = parityShardCount;
+    this.totalShardCount = dataShardCount + parityShardCount;
+    matrix = buildMatrix(dataShardCount, this.totalShardCount);
+    parityRows = new byte[parityShardCount][];
+    for (int i = 0; i < parityShardCount; i++) {
+      parityRows[i] = matrix.getRow(dataShardCount + i);
+    }
+  }
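+
+  // Minimal usage sketch (variable names are illustrative; see SampleEncoder and
+  // SampleDecoder for complete programs):
+  //   ReedSolomon rs = new ReedSolomon(4, 2);          // 4 data shards + 2 parity shards
+  //   rs.encodeParity(shards, 0, shardSize);           // fills shards[4] and shards[5]
+  //   rs.decodeMissing(shards, present, 0, shardSize); // rebuilds up to 2 missing shards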
+
+  /** Returns the number of data shards. */
+  public int getDataShardCount() {
+    return dataShardCount;
+  }
+
+  /** Returns the number of parity shards. */
+  public int getParityShardCount() {
+    return parityShardCount;
+  }
+
+  /** Returns the total number of shards. */
+  public int getTotalShardCount() {
+    return totalShardCount;
+  }
+
+  /**
+   * Encodes parity for a set of data shards.
+   *
+   * @param shards An array containing data shards followed by parity shards. Each shard is a byte
+   *     array, and they must all be the same size.
+   * @param offset The index of the first byte in each shard to encode.
+   * @param byteCount The number of bytes to encode in each shard.
+   */
+  public void encodeParity(byte[][] shards, int offset, int byteCount) {
+    // Check arguments.
+    checkBuffersAndSizes(shards, offset, byteCount);
+
+    // Build the array of output buffers.
+    byte[][] outputs = new byte[parityShardCount][];
+    for (int i = 0; i < parityShardCount; i++) {
+      outputs[i] = shards[dataShardCount + i];
+    }
+
+    // Do the coding.
+    codeSomeShards(parityRows, shards, outputs, parityShardCount, offset, byteCount);
+  }
+
+  /**
+   * Returns true if the parity shards contain the right data.
+   *
+   * @param shards An array containing data shards followed by parity shards. Each shard is a byte
+   *     array, and they must all be the same size.
+   * @param firstByte The index of the first byte in each shard to check.
+   * @param byteCount The number of bytes to check in each shard.
+   */
+  public boolean isParityCorrect(byte[][] shards, int firstByte, int byteCount) {
+    // Check arguments.
+    checkBuffersAndSizes(shards, firstByte, byteCount);
+
+    // Build the array of buffers being checked.
+    byte[][] toCheck = new byte[parityShardCount][];
+    for (int i = 0; i < parityShardCount; i++) {
+      toCheck[i] = shards[dataShardCount + i];
+    }
+
+    // Do the checking.
+    return checkSomeShards(parityRows, shards, toCheck, parityShardCount, firstByte, byteCount);
+  }
+
+  /**
+   * Given a list of shards, some of which contain data, fills in the ones that don't have data.
+   *
+   * <p>Quickly does nothing if all of the shards are present.
+   *
+   * <p>If any shards are missing (based on the flags in shardPresent), the data in those shards is
+   * recomputed and filled in.
+   */
+  public void decodeMissing(
+      byte[][] shards, boolean[] shardPresent, final int offset, final int byteCount) {
+    // Check arguments.
+    checkBuffersAndSizes(shards, offset, byteCount);
+
+    // Quick check: are all of the shards present?  If so, there's
+    // nothing to do.
+    int numberPresent = 0;
+    for (int i = 0; i < totalShardCount; i++) {
+      if (shardPresent[i]) {
+        numberPresent += 1;
+      }
+    }
+    if (numberPresent == totalShardCount) {
+      // All of the shards have data.  We don't
+      // need to do anything.
+      return;
+    }
+
+    // More complete sanity check
+    if (numberPresent < dataShardCount) {
+      throw new IllegalArgumentException("Not enough shards present");
+    }
+
+    // Pull out the rows of the matrix that correspond to the
+    // shards that we have and build a square matrix.  This
+    // matrix could be used to generate the shards that we have
+    // from the original data.
+    //
+    // Also, pull out an array holding just the shards that
+    // correspond to the rows of the submatrix.  These shards
+    // will be the input to the decoding process that re-creates
+    // the missing data shards.
+    Matrix subMatrix = new Matrix(dataShardCount, dataShardCount);
+    byte[][] subShards = new byte[dataShardCount][];
+    {
+      int subMatrixRow = 0;
+      for (int matrixRow = 0;
+          matrixRow < totalShardCount && subMatrixRow < dataShardCount;
+          matrixRow++) {
+        if (shardPresent[matrixRow]) {
+          for (int c = 0; c < dataShardCount; c++) {
+            subMatrix.set(subMatrixRow, c, matrix.get(matrixRow, c));
+          }
+          subShards[subMatrixRow] = shards[matrixRow];
+          subMatrixRow += 1;
+        }
+      }
+    }
+
+    // Invert the matrix, so we can go from the encoded shards
+    // back to the original data.  Then pull out the row that
+    // generates the shard that we want to decode.  Note that
+    // since this matrix maps back to the original data, it can
+    // be used to create a data shard, but not a parity shard.
+    Matrix dataDecodeMatrix = subMatrix.invert();
+
+    // Re-create any data shards that were missing.
+    //
+    // The input to the coding is all of the shards we actually
+    // have, and the output is the missing data shards.  The computation
+    // is done using the special decode matrix we just built.
+    byte[][] outputs = new byte[parityShardCount][];
+    byte[][] matrixRows = new byte[parityShardCount][];
+    int outputCount = 0;
+    for (int iShard = 0; iShard < dataShardCount; iShard++) {
+      if (!shardPresent[iShard]) {
+        outputs[outputCount] = shards[iShard];
+        matrixRows[outputCount] = dataDecodeMatrix.getRow(iShard);
+        outputCount += 1;
+      }
+    }
+    codeSomeShards(matrixRows, subShards, outputs, outputCount, offset, byteCount);
+
+    // Now that we have all of the data shards intact, we can
+    // compute any of the parity that is missing.
+    //
+    // The input to the coding is ALL of the data shards, including
+    // any that we just calculated.  The output is whichever of the
+    // parity shards were missing.
+    outputCount = 0;
+    for (int iShard = dataShardCount; iShard < totalShardCount; iShard++) {
+      if (!shardPresent[iShard]) {
+        outputs[outputCount] = shards[iShard];
+        matrixRows[outputCount] = parityRows[iShard - dataShardCount];
+        outputCount += 1;
+      }
+    }
+    codeSomeShards(matrixRows, shards, outputs, outputCount, offset, byteCount);
+  }
+
+  /** Checks the consistency of arguments passed to public methods. */
+  private void checkBuffersAndSizes(byte[][] shards, int offset, int byteCount) {
+    // The number of buffers should be equal to the number of
+    // data shards plus the number of parity shards.
+    if (shards.length != totalShardCount) {
+      throw new IllegalArgumentException("wrong number of shards: " + shards.length);
+    }
+
+    // All of the shard buffers should be the same length.
+    int shardLength = shards[0].length;
+    for (int i = 1; i < shards.length; i++) {
+      if (shards[i].length != shardLength) {
+        throw new IllegalArgumentException("Shards are different sizes");
+      }
+    }
+
+    // The offset and byteCount must be non-negative and fit in the buffers.
+    if (offset < 0) {
+      throw new IllegalArgumentException("offset is negative: " + offset);
+    }
+    if (byteCount < 0) {
+      throw new IllegalArgumentException("byteCount is negative: " + byteCount);
+    }
+    if (shardLength < offset + byteCount) {
+      throw new IllegalArgumentException("buffers to small: " + byteCount + offset);
+    }
+  }
+
+  /**
+   * Multiplies a subset of rows from a coding matrix by a full set of input shards to produce some
+   * output shards.
+   *
+   * @param matrixRows The rows from the matrix to use.
+   * @param inputs An array of byte arrays, each of which is one input shard. The inputs array may
+   *     have extra buffers after the ones that are used. They will be ignored. The number of inputs
+   *     used is determined by the length of each matrix row.
+   * @param outputs Byte arrays where the computed shards are stored. The outputs array may also
+   *     have extra, unused, elements at the end. The number of outputs computed, and the number of
+   *     matrix rows used, is determined by outputCount.
+   * @param outputCount The number of outputs to compute.
+   * @param offset The index in the inputs and output of the first byte to process.
+   * @param byteCount The number of bytes to process.
+   */
+  private void codeSomeShards(
+      final byte[][] matrixRows,
+      final byte[][] inputs,
+      final byte[][] outputs,
+      final int outputCount,
+      final int offset,
+      final int byteCount) {
+
+    // This is the inner loop.  It needs to be fast.  Be careful
+    // if you change it.
+    //
+    // Note that dataShardCount is final in the class, so the
+    // compiler can load it just once, before the loop.  Explicitly
+    // adding a local variable does not make it faster.
+    //
+    // I have tried inlining Galois.multiply(), but it doesn't
+    // make things any faster.  The JIT compiler is known to inline
+    // methods, so it's probably already doing so.
+    //
+    // This method has been timed and compared with a C implementation.
+    // This Java version is only about 10% slower than C.
+
+    for (int iByte = offset; iByte < offset + byteCount; iByte++) {
+      for (int iRow = 0; iRow < outputCount; iRow++) {
+        byte[] matrixRow = matrixRows[iRow];
+        int value = 0;
+        for (int c = 0; c < dataShardCount; c++) {
+          value ^= Galois.multiply(matrixRow[c], inputs[c][iByte]);
+        }
+        outputs[iRow][iByte] = (byte) value;
+      }
+    }
+  }
+
+  /**
+   * Multiplies a subset of rows from a coding matrix by a full set of input shards to produce some
+   * output shards, and checks that the data in those shards matches what's expected.
+   *
+   * @param matrixRows The rows from the matrix to use.
+   * @param inputs An array of byte arrays, each of which is one input shard. The inputs array may
+   *     have extra buffers after the ones that are used. They will be ignored. The number of inputs
+   *     used is determined by the length of each matrix row.
+   * @param toCheck Byte arrays holding the expected shard contents to compare against. The array
+   *     may also have extra, unused, elements at the end. The number of shards checked, and the
+   *     number of matrix rows used, is determined by checkCount.
+   * @param checkCount The number of shards to check.
+   * @param offset The index in the inputs and output of the first byte to process.
+   * @param byteCount The number of bytes to process.
+   */
+  private boolean checkSomeShards(
+      final byte[][] matrixRows,
+      final byte[][] inputs,
+      final byte[][] toCheck,
+      final int checkCount,
+      final int offset,
+      final int byteCount) {
+
+    // This is the inner loop.  It needs to be fast.  Be careful
+    // if you change it.
+    //
+    // Note that dataShardCount is final in the class, so the
+    // compiler can load it just once, before the loop.  Explicitly
+    // adding a local variable does not make it faster.
+    //
+    // I have tried inlining Galois.multiply(), but it doesn't
+    // make things any faster.  The JIT compiler is known to inline
+    // methods, so it's probably already doing so.
+    //
+    // This method has been timed and compared with a C implementation.
+    // This Java version is only about 10% slower than C.
+
+    for (int iByte = offset; iByte < offset + byteCount; iByte++) {
+      for (int iRow = 0; iRow < checkCount; iRow++) {
+        byte[] matrixRow = matrixRows[iRow];
+        int value = 0;
+        for (int c = 0; c < dataShardCount; c++) {
+          value ^= Galois.multiply(matrixRow[c], inputs[c][iByte]);
+        }
+        if (toCheck[iRow][iByte] != (byte) value) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Create the matrix to use for encoding, given the number of data shards and the number of total
+   * shards.
+   *
+   * <p>The top square of the matrix is guaranteed to be an identity matrix, which means that the
+   * data shards are unchanged after encoding.
+   */
+  private static Matrix buildMatrix(int dataShards, int totalShards) {
+    // Start with a Vandermonde matrix.  This matrix would work,
+    // in theory, but doesn't have the property that the data
+    // shards are unchanged after encoding.
+    Matrix vandermonde = vandermonde(totalShards, dataShards);
+
+    // Multiply by the inverse of the top square of the matrix.
+    // This will make the top square be the identity matrix, but
+    // preserve the property that any square subset of rows is
+    // invertible.
+    Matrix top = vandermonde.submatrix(0, 0, dataShards, dataShards);
+    return vandermonde.times(top.invert());
+  }
+
+  /**
+   * Create a Vandermonde matrix, which is guaranteed to have the property that any subset of rows
+   * that forms a square matrix is invertible.
+   *
+   * @param rows Number of rows in the result.
+   * @param cols Number of columns in the result.
+   * @return A Matrix.
+   */
+  private static Matrix vandermonde(int rows, int cols) {
+    Matrix result = new Matrix(rows, cols);
+    for (int r = 0; r < rows; r++) {
+      for (int c = 0; c < cols; c++) {
+        result.set(r, c, Galois.exp((byte) r, c));
+      }
+    }
+    return result;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleDecoder.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleDecoder.java
new file mode 100644
index 0000000..87a1ead
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleDecoder.java
@@ -0,0 +1,98 @@
+/**
+ * Command-line program that decodes a file using Reed-Solomon 4+2.
+ *
+ * <p>Copyright 2015, Backblaze, Inc. All rights reserved.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Command-line program that decodes a file using Reed-Solomon 4+2.
+ *
+ * <p>The file name given should be the name of the file to decode, say "foo.txt". This program
+ * expects to find "foo.txt.0" through "foo.txt.5", with at most two missing. It will then write
+ * "foo.txt.decoded".
+ */
+public class SampleDecoder {
+
+  public static final int DATA_SHARDS = 4;
+  public static final int PARITY_SHARDS = 2;
+  public static final int TOTAL_SHARDS = 6;
+
+  public static final int BYTES_IN_INT = 4;
+
+  public static void main(String[] arguments) throws IOException {
+
+    // Parse the command line
+    if (arguments.length != 1) {
+      System.out.println("Usage: SampleDecoder <fileName>");
+      return;
+    }
+    final File originalFile = new File(arguments[0]);
+    if (!originalFile.exists()) {
+      System.out.println("Cannot read input file: " + originalFile);
+      return;
+    }
+
+    // Read in any of the shards that are present.
+    // (There should be checking here to make sure the input
+    // shards are the same size, but there isn't.)
+    final byte[][] shards = new byte[TOTAL_SHARDS][];
+    final boolean[] shardPresent = new boolean[TOTAL_SHARDS];
+    int shardSize = 0;
+    int shardCount = 0;
+    for (int i = 0; i < TOTAL_SHARDS; i++) {
+      File shardFile = new File(originalFile.getParentFile(), originalFile.getName() + "." + i);
+      if (shardFile.exists()) {
+        shardSize = (int) shardFile.length();
+        shards[i] = new byte[shardSize];
+        shardPresent[i] = true;
+        shardCount += 1;
+        InputStream in = new FileInputStream(shardFile);
+        in.read(shards[i], 0, shardSize);
+        in.close();
+        System.out.println("Read " + shardFile);
+      }
+    }
+
+    // We need at least DATA_SHARDS to be able to reconstruct the file.
+    if (shardCount < DATA_SHARDS) {
+      System.out.println("Not enough shards present");
+      return;
+    }
+
+    // Make empty buffers for the missing shards.
+    for (int i = 0; i < TOTAL_SHARDS; i++) {
+      if (!shardPresent[i]) {
+        shards[i] = new byte[shardSize];
+      }
+    }
+
+    // Use Reed-Solomon to fill in the missing shards
+    ReedSolomon reedSolomon = new ReedSolomon(DATA_SHARDS, PARITY_SHARDS);
+    reedSolomon.decodeMissing(shards, shardPresent, 0, shardSize);
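+    // At this point every entry of shards[] holds valid content again, including any
+    // shards that were reconstructed from the parity.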
+
+    // Combine the data shards into one buffer for convenience.
+    // (This is not efficient, but it is convenient.)
+    byte[] allBytes = new byte[shardSize * DATA_SHARDS];
+    for (int i = 0; i < DATA_SHARDS; i++) {
+      System.arraycopy(shards[i], 0, allBytes, shardSize * i, shardSize);
+    }
+
+    // Extract the file length
+    int fileSize = ByteBuffer.wrap(allBytes).getInt();
+
+    // Write the decoded file
+    File decodedFile = new File(originalFile.getParentFile(), originalFile.getName() + ".decoded");
+    OutputStream out = new FileOutputStream(decodedFile);
+    out.write(allBytes, BYTES_IN_INT, fileSize);
+    System.out.println("Wrote " + decodedFile);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleEncoder.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleEncoder.java
new file mode 100644
index 0000000..1f98707
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleEncoder.java
@@ -0,0 +1,91 @@
+/**
+ * Command-line program encodes one file using Reed-Solomon 4+2.
+ *
+ * <p>Copyright 2015, Backblaze, Inc.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Command-line program encodes one file using Reed-Solomon 4+2.
+ *
+ * <p>The one argument should be a file name, say "foo.txt". This program will create six files in
+ * the same directory, breaking the input file into four data shards, and two parity shards. The
+ * output files are called "foo.txt.0", "foo.txt.1", ..., and "foo.txt.5". Numbers 4 and 5 are the
+ * parity shards.
+ *
+ * <p>The data stored is the file size (four byte int), followed by the contents of the file, and
+ * then padded to a multiple of four bytes with zeros. The padding is because all four data shards
+ * must be the same size.
+ */
+public class SampleEncoder {
+
+  public static final int DATA_SHARDS = 4;
+  public static final int PARITY_SHARDS = 2;
+  public static final int TOTAL_SHARDS = 6;
+
+  public static final int BYTES_IN_INT = 4;
+
+  public static void main(String[] arguments) throws IOException {
+
+    // Parse the command line
+    if (arguments.length != 1) {
+      System.out.println("Usage: SampleEncoder <fileName>");
+      return;
+    }
+    final File inputFile = new File(arguments[0]);
+    if (!inputFile.exists()) {
+      System.out.println("Cannot read input file: " + inputFile);
+      return;
+    }
+
+    // Get the size of the input file.  (Files bigger than
+    // Integer.MAX_VALUE will fail here!)
+    final int fileSize = (int) inputFile.length();
+
+    // Figure out how big each shard will be.  The total size stored
+    // will be the file size (a four-byte int) plus the file contents.
+    final int storedSize = fileSize + BYTES_IN_INT;
+    final int shardSize = (storedSize + DATA_SHARDS - 1) / DATA_SHARDS;
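+    // For example, a 10-byte file stores 14 bytes (4-byte length plus contents), so each
+    // of the 4 data shards is (14 + 3) / 4 = 4 bytes, leaving 2 bytes of zero padding.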
+
+    // Create a buffer holding the file size, followed by
+    // the contents of the file.
+    final int bufferSize = shardSize * DATA_SHARDS;
+    final byte[] allBytes = new byte[bufferSize];
+    ByteBuffer.wrap(allBytes).putInt(fileSize);
+    InputStream in = new FileInputStream(inputFile);
+    int bytesRead = in.read(allBytes, BYTES_IN_INT, fileSize);
+    if (bytesRead != fileSize) {
+      throw new IOException("not enough bytes read");
+    }
+    in.close();
+
+    // Make the buffers to hold the shards.
+    byte[][] shards = new byte[TOTAL_SHARDS][shardSize];
+
+    // Fill in the data shards
+    for (int i = 0; i < DATA_SHARDS; i++) {
+      System.arraycopy(allBytes, i * shardSize, shards[i], 0, shardSize);
+    }
+
+    // Use Reed-Solomon to calculate the parity.
+    ReedSolomon reedSolomon = new ReedSolomon(DATA_SHARDS, PARITY_SHARDS);
+    reedSolomon.encodeParity(shards, 0, shardSize);
+
+    // Write out the resulting files.
+    for (int i = 0; i < TOTAL_SHARDS; i++) {
+      File outputFile = new File(inputFile.getParentFile(), inputFile.getName() + "." + i);
+      OutputStream out = new FileOutputStream(outputFile);
+      out.write(shards[i]);
+      out.close();
+      System.out.println("wrote " + outputFile);
+    }
+  }
+}