You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/20 01:47:27 UTC
[iotdb] 01/01: add CRaft
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b5ac3f7667fc60669f133f4a46cc1c84369e5b7f
Author: jt <jt...@163.com>
AuthorDate: Mon Dec 20 09:46:28 2021 +0800
add CRaft
---
cluster/distribute-ecs.sh | 10 +
.../org/apache/iotdb/cluster/expr/ExprBench.java | 4 +-
.../org/apache/iotdb/cluster/expr/ExprMember.java | 5 +-
.../org/apache/iotdb/cluster/expr/ExprServer.java | 5 +-
.../apache/iotdb/cluster/expr/VotingLogList.java | 8 +-
.../iotdb/cluster/log/FragmentedLogDispatcher.java | 110 ++++
.../java/org/apache/iotdb/cluster/log/Log.java | 21 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 32 +-
.../org/apache/iotdb/cluster/log/LogParser.java | 6 +
.../org/apache/iotdb/cluster/log/VotingLog.java | 11 +-
.../iotdb/cluster/log/applier/MetaLogApplier.java | 3 +-
.../iotdb/cluster/log/logtypes/FragmentedLog.java | 140 ++++
.../iotdb/cluster/log/manage/RaftLogManager.java | 16 +-
.../log/sequencing/AsynchronousSequencer.java | 9 +-
.../handlers/caller/AppendNodeEntryHandler.java | 22 +-
.../server/heartbeat/MetaHeartbeatServer.java | 5 +-
.../iotdb/cluster/server/member/RaftMember.java | 135 ++--
.../apache/iotdb/cluster/server/monitor/Timer.java | 8 +
.../iotdb/cluster/utils/encode/rs/Galois.java | 722 +++++++++++++++++++++
.../iotdb/cluster/utils/encode/rs/Matrix.java | 312 +++++++++
.../iotdb/cluster/utils/encode/rs/ReedSolomon.java | 359 ++++++++++
.../cluster/utils/encode/rs/SampleDecoder.java | 98 +++
.../cluster/utils/encode/rs/SampleEncoder.java | 91 +++
23 files changed, 2041 insertions(+), 91 deletions(-)
diff --git a/cluster/distribute-ecs.sh b/cluster/distribute-ecs.sh
new file mode 100644
index 0000000..84874f2
--- /dev/null
+++ b/cluster/distribute-ecs.sh
@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+# Copy the locally built iotdb* jars into the lib directory of every experiment host.
+# src_lib_path is deliberately left unquoted where it is used so that the shell expands the glob.
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+
+ips=(ecs1 ecs2 ecs3 ecs4 ecs5)
+target_lib_path=/root/iotdb_expr/lib
+
+for ip in "${ips[@]}"
+ do
+ # -p: succeed when the directory already exists from an earlier run
+ ssh "root@$ip" "mkdir -p $target_lib_path"
+ scp -r $src_lib_path "root@$ip:$target_lib_path"
+ done
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index d0891a3..8115e44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -43,6 +43,7 @@ public class ExprBench {
private long maxLatency = 0;
private int threadNum = 64;
private int workloadSize = 64 * 1024;
+ private int printInterval = 1000;
private SyncClientPool clientPool;
private Node target;
private int maxRequestNum;
@@ -84,7 +85,7 @@ public class ExprBench {
e.printStackTrace();
}
- if (currRequsetNum % 1000 == 0) {
+ if (currRequsetNum % printInterval == 0) {
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println(
String.format(
@@ -118,6 +119,10 @@ public class ExprBench {
 bench.maxRequestNum = Integer.parseInt(args[2]);
 bench.threadNum = Integer.parseInt(args[3]);
 bench.workloadSize = Integer.parseInt(args[4]) * 1024;
+ // args[5] is optional so 5-argument invocations keep working; it is clamped to at least 1
+ // because the request loop uses it as a modulus (currRequsetNum % printInterval)
+ bench.printInterval =
+ args.length > 5 ? Math.max(1, Integer.parseInt(args[5])) : bench.printInterval;
 bench.benchmark();
 }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 04e6b97..bcc26ce 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -133,7 +133,10 @@ public class ExprMember extends MetaGroupMember {
latch.await();
return StatusUtils.OK;
}
- return processNonPartitionedMetaPlan(plan);
+ long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.getOperationStartTime();
+ TSStatus tsStatus = processNonPartitionedMetaPlan(plan);
+ Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
+ return tsStatus;
} catch (Exception e) {
logger.error("Exception in processing plan", e);
return StatusUtils.INTERNAL_ERROR.deepCopy().setMessage(e.getMessage());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
index 6aac585..ab2468e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -61,8 +61,7 @@ public class ExprServer extends MetaClusterServer {
@Override
protected TServerTransport getServerSocket() throws TTransportException {
- return new TServerSocket(
- new InetSocketAddress(thisNode.getInternalIp(), thisNode.getMetaPort()));
+ return new TServerSocket(new InetSocketAddress("0.0.0.0", thisNode.getMetaPort()));
}
public static void main(String[] args)
@@ -82,6 +81,7 @@ public class ExprServer extends MetaClusterServer {
 boolean enableCommitReturn = Boolean.parseBoolean(args[7]);
 int maxBatchSize = Integer.parseInt(args[8]);
 int defaultLogBufferSize = Integer.parseInt(args[9]);
+ boolean useCRaft = args.length > 10 && Boolean.parseBoolean(args[10]); // optional, off by default
 ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr));
 ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
@@ -98,6 +98,7 @@ public class ExprServer extends MetaClusterServer {
ExprMember.ENABLE_WEAK_ACCEPTANCE = enableWeakAcceptance;
ExprMember.ENABLE_COMMIT_RETURN = enableCommitReturn;
Log.DEFAULT_BUFFER_SIZE = defaultLogBufferSize * 1024 + 512;
+ RaftMember.USE_CRAFT = useCRaft;
ExprServer server = new ExprServer();
server.start();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 2723ac8..a1567d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -79,7 +79,7 @@ public class VotingLogList {
if (votingLog.getStronglyAcceptedNodeIds().size()
+ votingLog.getWeaklyAcceptedNodeIds().size()
>= quorumSize) {
- votingLog.acceptedTime = System.nanoTime();
+ votingLog.acceptedTime.set(System.nanoTime());
}
} else if (votingLog.getLog().getCurrLogIndex() > index) {
break;
@@ -93,9 +93,9 @@ public class VotingLogList {
if (lastEntryIndexToCommit != -1) {
for (VotingLog acceptedLog : acceptedLogs) {
- synchronized (acceptedLog) {
- acceptedLog.acceptedTime = System.nanoTime();
- acceptedLog.notifyAll();
+ synchronized (acceptedLog.getStronglyAcceptedNodeIds()) {
+ acceptedLog.acceptedTime.set(System.nanoTime());
+ acceptedLog.getStronglyAcceptedNodeIds().notifyAll();
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
new file mode 100644
index 0000000..f8954ef
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link LogDispatcher} for CRaft. Instead of sending the same entry to every follower, it
+ * sends the follower served by queue {@code i} only fragment {@code i} of a {@link
+ * FragmentedLog}. Entries of any other log type are handed to {@link LogDispatcher} unchanged.
+ */
+public class FragmentedLogDispatcher extends LogDispatcher {
+
+  private static final Logger logger = LoggerFactory.getLogger(FragmentedLogDispatcher.class);
+
+  public FragmentedLogDispatcher(RaftMember member) {
+    super(member);
+  }
+
+  /**
+   * Enqueues a single-fragment copy of {@code request} into every follower queue. The copies share
+   * the accepted-node sets and accepted time of {@code request}'s {@link VotingLog}, so an
+   * acceptance recorded through any copy is visible through all of them.
+   *
+   * @param request normally carries a {@link FragmentedLog} holding every fragment; a request
+   *     with any other log type is forwarded to {@link LogDispatcher#offer(SendLogRequest)}
+   */
+  @Override
+  public void offer(SendLogRequest request) {
+    Log log = request.getVotingLog().getLog();
+    if (!(log instanceof FragmentedLog)) {
+      // not produced by the CRaft write path, so there is nothing to split; casting would throw
+      super.offer(request);
+      return;
+    }
+    FragmentedLog fullLog = (FragmentedLog) log;
+
+    long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
+    log.setEnqueueTime(System.nanoTime());
+    for (int i = 0; i < nodesLogQueues.size(); i++) {
+      BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i);
+      SendLogRequest fragmentedRequest = newFragmentRequest(request, fullLog, i);
+      try {
+        boolean addSucceeded;
+        if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
+          addSucceeded =
+              nodeLogQueue.offer(
+                  fragmentedRequest,
+                  ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS(),
+                  TimeUnit.MILLISECONDS);
+        } else {
+          addSucceeded = nodeLogQueue.add(fragmentedRequest);
+        }
+
+        if (!addSucceeded) {
+          logger.debug(
+              "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
+        } else {
+          // the per-follower copy is what sits in the queue, so that is the one to timestamp
+          fragmentedRequest.setEnqueueTime(System.nanoTime());
+        }
+      } catch (IllegalStateException e) {
+        logger.debug(
+            "Log queue[{}] of {} is full, ignore the request to this node", i, member.getName());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
+
+    if (Timer.ENABLE_INSTRUMENTING) {
+      Statistic.LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE.calOperationCostTimeFromStart(
+          log.getCreateTime());
+    }
+  }
+
+  /**
+   * Builds the request for one follower: it shares the voting state of {@code request} but carries
+   * only fragment {@code fragmentIndex}.
+   */
+  private SendLogRequest newFragmentRequest(
+      SendLogRequest request, FragmentedLog fullLog, int fragmentIndex) {
+    SendLogRequest fragmentedRequest = new SendLogRequest(request);
+    VotingLog fragmentedVotingLog = new VotingLog(request.getVotingLog());
+    fragmentedVotingLog.setLog(new FragmentedLog(fullLog, fragmentIndex));
+    fragmentedRequest.setVotingLog(fragmentedVotingLog);
+    if (request.getAppendEntryRequest() != null) {
+      // SendLogRequest's copy constructor shares the AppendEntryRequest, but every dispatcher
+      // thread stores its own serialized fragment in AppendEntryRequest.entry before sending, so
+      // each follower needs a private (deep) copy or the threads overwrite each other's entry
+      fragmentedRequest.setAppendEntryRequest(
+          new AppendEntryRequest(request.getAppendEntryRequest()));
+    }
+    return fragmentedRequest;
+  }
+
+  @Override
+  LogDispatcher.DispatcherThread newDispatcherThread(
+      Node node, BlockingQueue<SendLogRequest> logBlockingQueue) {
+    return new DispatcherThread(node, logBlockingQueue);
+  }
+
+  /** Serializes each queued log on the dispatcher thread itself, instead of via a future. */
+  class DispatcherThread extends LogDispatcher.DispatcherThread {
+
+    DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
+      super(receiver, logBlockingDeque);
+    }
+
+    @Override
+    protected void serializeEntries() {
+      for (SendLogRequest request : currBatch) {
+        Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+            request.getVotingLog().getLog().getEnqueueTime());
+        long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
+        request.getAppendEntryRequest().entry = request.getVotingLog().getLog().serialize();
+        Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
+      }
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 4845bba..01d8541 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -43,8 +43,10 @@ public abstract class Log implements Comparable<Log> {
@SuppressWarnings("java:S3077")
private volatile Exception exception;
+ private long receiveTime;
private long createTime;
private long enqueueTime;
+ private long sequenceStartTime;
private int byteSize = 0;
@@ -63,7 +65,8 @@ public abstract class Log implements Comparable<Log> {
CLOSE_FILE,
REMOVE_NODE,
EMPTY_CONTENT,
- TEST_LARGE_CONTENT
+ TEST_LARGE_CONTENT,
+ FRAGMENTED
}
public long getCurrLogIndex() {
@@ -147,4 +150,20 @@ public abstract class Log implements Comparable<Log> {
public void setByteSize(int byteSize) {
this.byteSize = byteSize;
}
+
+ public long getReceiveTime() {
+ return receiveTime;
+ }
+
+ public void setReceiveTime(long receiveTime) {
+ this.receiveTime = receiveTime;
+ }
+
+ public long getSequenceStartTime() {
+ return sequenceStartTime;
+ }
+
+ public void setSequenceStartTime(long sequenceStartTime) {
+ this.sequenceStartTime = sequenceStartTime;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index beaaa77..17b853c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -187,6 +188,15 @@ public class LogDispatcher {
this.setQuorumSize(quorumSize);
}
+ public SendLogRequest(SendLogRequest request) {
+ this.setVotingLog(request.votingLog);
+ this.setLeaderShipStale(request.leaderShipStale);
+ this.setNewLeaderTerm(request.newLeaderTerm);
+ this.setAppendEntryRequest(request.appendEntryRequest);
+ this.setQuorumSize(request.quorumSize);
+ this.setEnqueueTime(request.enqueueTime);
+ }
+
public VotingLog getVotingLog() {
return votingLog;
}
@@ -245,7 +255,7 @@ public class LogDispatcher {
Node receiver;
private BlockingQueue<SendLogRequest> logBlockingDeque;
- private List<SendLogRequest> currBatch = new ArrayList<>();
+ protected List<SendLogRequest> currBatch = new ArrayList<>();
private Peer peer;
Client client;
@@ -275,13 +285,7 @@ public class LogDispatcher {
logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
}
Statistic.LOG_DISPATCHER_LOG_BATCH_SIZE.add(currBatch.size());
- for (SendLogRequest request : currBatch) {
- Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
- request.getVotingLog().getLog().getEnqueueTime());
- long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
- request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
- Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
- }
+ serializeEntries();
sendBatchLogs(currBatch);
currBatch.clear();
}
@@ -293,6 +297,16 @@ public class LogDispatcher {
logger.info("Dispatcher exits");
}
+ protected void serializeEntries() throws ExecutionException, InterruptedException {
+ for (SendLogRequest request : currBatch) {
+ Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+ request.getVotingLog().getLog().getEnqueueTime());
+ long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
+ request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
+ Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
+ }
+ }
+
private void appendEntriesAsync(
List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch)
throws TException {
@@ -424,7 +438,7 @@ public class LogDispatcher {
peer,
logRequest.quorumSize);
// TODO add async interface
- int retries = 5;
+ int retries = 50;
try {
long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
for (int i = 0; i < retries; i++) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index e35943b..26f8a42 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.log.Log.Types;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
@@ -101,6 +102,11 @@ public class LogParser {
largeLog.deserialize(buffer);
log = largeLog;
break;
+ case FRAGMENTED:
+ FragmentedLog fragmentedLog = new FragmentedLog();
+ fragmentedLog.deserialize(buffer);
+ log = fragmentedLog;
+ break;
default:
throw new IllegalArgumentException(type.toString());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index d56f3f0..46b5c02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -22,18 +22,27 @@ package org.apache.iotdb.cluster.log;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
public class VotingLog {
protected Log log;
protected Set<Integer> stronglyAcceptedNodeIds;
protected Set<Integer> weaklyAcceptedNodeIds;
- public long acceptedTime;
+ public AtomicLong acceptedTime;
public volatile ByteBuffer serializedCache;
public VotingLog(Log log, int groupSize) {
this.log = log;
stronglyAcceptedNodeIds = new HashSet<>(groupSize);
weaklyAcceptedNodeIds = new HashSet<>(groupSize);
+ acceptedTime = new AtomicLong();
+ }
+
+ public VotingLog(VotingLog another) {
+ this.log = another.log;
+ this.stronglyAcceptedNodeIds = another.stronglyAcceptedNodeIds;
+ this.weaklyAcceptedNodeIds = another.weaklyAcceptedNodeIds;
+ this.acceptedTime = another.acceptedTime;
}
public Log getLog() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 1c501f8..af3163c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.exception.ChangeMembershipException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.server.NodeCharacter;
@@ -52,7 +53,7 @@ public class MetaLogApplier extends BaseApplier {
applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
} else if (log instanceof RemoveNodeLog) {
applyRemoveNodeLog((RemoveNodeLog) log);
- } else if (log instanceof EmptyContentLog) {
+ } else if (log instanceof EmptyContentLog || log instanceof FragmentedLog) {
// Do nothing
} else {
logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
new file mode 100644
index 0000000..7186098
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.logtypes;
+
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.utils.encode.rs.ReedSolomon;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.apache.iotdb.cluster.log.Log.Types.FRAGMENTED;
+
+/**
+ * A log entry split with Reed-Solomon erasure coding for CRaft. The serialized bytes of a base log
+ * are cut into {@code dataShardNum} data shards of {@code shardLength} bytes each (the unused tail
+ * is zero) and {@code parityShardNum} parity shards are computed from them. {@code
+ * fragmentPresent} tells which shards this instance carries and therefore serializes.
+ */
+public class FragmentedLog extends Log {
+
+  /** Data shards followed by parity shards, each {@code shardLength} bytes long. */
+  private byte[][] logFragments;
+  /** Length of the serialized base log, i.e. the data shards without their zero tail. */
+  private int logLength;
+  /** {@code fragmentPresent[i]} is true if {@code logFragments[i]} is carried by this log. */
+  private boolean[] fragmentPresent;
+  private int shardLength;
+  private int dataShardNum;
+  private int parityShardNum;
+
+  /** Creates an empty log to be filled by {@link #deserialize(ByteBuffer)}. */
+  public FragmentedLog() {}
+
+  /**
+   * Serializes {@code base} and encodes it for a group of {@code nodeNum} nodes: one fragment per
+   * follower ({@code nodeNum - 1} in total), {@code nodeNum / 2 - 1} of which are parity shards.
+   * Every fragment is marked present.
+   *
+   * @param base the log to encode
+   * @param nodeNum size of the group, leader included
+   * @throws IllegalArgumentException if {@code nodeNum < 2}, which gives a negative parity count
+   */
+  public FragmentedLog(Log base, int nodeNum) {
+    if (nodeNum < 2) {
+      throw new IllegalArgumentException(
+          "A FragmentedLog needs a group of at least 2 nodes, got " + nodeNum);
+    }
+    ByteBuffer baseBuffer = base.serialize();
+    logLength = baseBuffer.remaining();
+    int followerNum = nodeNum - 1;
+    parityShardNum = nodeNum / 2 - 1;
+    dataShardNum = followerNum - parityShardNum;
+    // the + 1 makes dataShardNum * shardLength > logLength, so the data shards always fit the log
+    shardLength = logLength / dataShardNum + 1;
+
+    int shardNum = dataShardNum + parityShardNum;
+    logFragments = new byte[shardNum][shardLength];
+    fragmentPresent = new boolean[shardNum];
+    Arrays.fill(fragmentPresent, true);
+
+    // Fill the data shards in order. A short log can run out before the last data shards are
+    // reached; those simply stay all-zero and nothing is copied into them.
+    ByteBuffer source = baseBuffer.duplicate();
+    for (int i = 0; i < dataShardNum && source.hasRemaining(); i++) {
+      source.get(logFragments[i], 0, Math.min(shardLength, source.remaining()));
+    }
+
+    ReedSolomon reedSolomon = new ReedSolomon(dataShardNum, parityShardNum);
+    reedSolomon.encodeParity(logFragments, 0, shardLength);
+  }
+
+  /**
+   * Creates the copy of {@code parent} sent to a single follower. It shares the shard buffers of
+   * {@code parent} but marks only {@code fragmentIndex} as present, so only that shard is
+   * serialized.
+   *
+   * @throws ArrayIndexOutOfBoundsException if {@code fragmentIndex} is not a valid shard index
+   */
+  public FragmentedLog(FragmentedLog parent, int fragmentIndex) {
+    setCurrLogIndex(parent.getCurrLogIndex());
+    setCurrLogTerm(parent.getCurrLogTerm());
+    setCreateTime(parent.getCreateTime());
+    setEnqueueTime(parent.getEnqueueTime());
+    setReceiveTime(parent.getReceiveTime());
+
+    this.logFragments = parent.logFragments;
+    this.fragmentPresent = new boolean[logFragments.length];
+    fragmentPresent[fragmentIndex] = true;
+    this.logLength = parent.logLength;
+    this.dataShardNum = parent.dataShardNum;
+    this.parityShardNum = parent.parityShardNum;
+    this.shardLength = parent.shardLength;
+  }
+
+  /**
+   * Writes the type byte, index, term, the four layout fields, then for each shard a presence flag
+   * followed by the shard bytes when present.
+   */
+  @Override
+  public ByteBuffer serialize() {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS(DEFAULT_BUFFER_SIZE);
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      dataOutputStream.writeByte((byte) FRAGMENTED.ordinal());
+
+      dataOutputStream.writeLong(getCurrLogIndex());
+      dataOutputStream.writeLong(getCurrLogTerm());
+
+      dataOutputStream.writeInt(logLength);
+      dataOutputStream.writeInt(dataShardNum);
+      dataOutputStream.writeInt(parityShardNum);
+      dataOutputStream.writeInt(shardLength);
+
+      for (int i = 0; i < fragmentPresent.length; i++) {
+        dataOutputStream.writeBoolean(fragmentPresent[i]);
+        if (fragmentPresent[i]) {
+          dataOutputStream.write(logFragments[i]);
+        }
+      }
+    } catch (IOException e) {
+      // the stream writes into an in-memory buffer, so this is not expected; fail loudly instead
+      // of returning a truncated entry
+      throw new IllegalStateException("Cannot serialize FragmentedLog", e);
+    }
+
+    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+  }
+
+  /**
+   * Reads the fields written by {@link #serialize()}. Unlike {@code serialize()}, it does not read
+   * the leading type byte, which the caller must already have consumed.
+   */
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    setCurrLogIndex(buffer.getLong());
+    setCurrLogTerm(buffer.getLong());
+
+    logLength = buffer.getInt();
+    dataShardNum = buffer.getInt();
+    parityShardNum = buffer.getInt();
+    shardLength = buffer.getInt();
+
+    int shardNum = dataShardNum + parityShardNum;
+    // absent shards are allocated too (all-zero), so every slot holds shardLength bytes
+    logFragments = new byte[shardNum][shardLength];
+    fragmentPresent = new boolean[shardNum];
+    for (int i = 0; i < shardNum; i++) {
+      // DataOutputStream.writeBoolean encodes true as 1 and false as 0
+      fragmentPresent[i] = buffer.get() != 0;
+      if (fragmentPresent[i]) {
+        buffer.get(logFragments[i], 0, shardLength);
+      }
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 4b445df..46ee69c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -65,7 +65,7 @@ public abstract class RaftLogManager {
/** manage committed entries in disk for safety */
private StableEntryManager stableEntryManager;
- private long commitIndex;
+ private volatile long commitIndex;
/**
* The committed logs whose index is smaller than this are all have been applied, for example,
@@ -693,7 +693,7 @@ public abstract class RaftLogManager {
long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
if (unappliedLogSize > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
- logger.debug(
+ logger.info(
"There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
unappliedLogSize);
try {
@@ -995,7 +995,7 @@ public abstract class RaftLogManager {
|| nextToCheckIndex > getCommittedEntryManager().getLastIndex()
|| (blockAppliedCommitIndex > 0 && blockAppliedCommitIndex < nextToCheckIndex)) {
// avoid spinning
- Thread.sleep(5);
+ Thread.sleep(0);
return;
}
Log log = getCommittedEntryManager().getEntry(nextToCheckIndex);
@@ -1007,10 +1007,12 @@ public abstract class RaftLogManager {
nextToCheckIndex);
return;
}
- synchronized (log) {
- while (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
- // wait until the log is applied or a newer snapshot is installed
- log.wait(5);
+ if (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
+ synchronized (log) {
+ while (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
+ // wait until the log is applied or a newer snapshot is installed
+ log.wait(1);
+ }
}
}
synchronized (changeApplyCommitIndexCond) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 0def4d0..4390def 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_SENDER_SEQUENCE_LOG;
+
public class AsynchronousSequencer implements LogSequencer {
private static final Logger logger = LoggerFactory.getLogger(AsynchronousSequencer.class);
@@ -56,7 +58,7 @@ public class AsynchronousSequencer implements LogSequencer {
public AsynchronousSequencer(RaftMember member, RaftLogManager logManager) {
this.member = member;
this.logManager = logManager;
- unsequencedLogQueue = new ArrayBlockingQueue<>(4096);
+ unsequencedLogQueue = new ArrayBlockingQueue<>(40960, true);
for (int i = 0; i < SEQUENCER_PARALLELISM; i++) {
SEQUENCER_POOL.submit(this::sequenceTask);
}
@@ -83,7 +85,9 @@ public class AsynchronousSequencer implements LogSequencer {
long startTime;
synchronized (logManager) {
for (SendLogRequest sendLogRequest : sendLogRequests) {
+ long sequenceStartTime = RAFT_SENDER_SEQUENCE_LOG.getOperationStartTime();
Log log = sendLogRequest.getVotingLog().getLog();
+ log.setSequenceStartTime(sequenceStartTime);
log.setCurrLogTerm(member.getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
if (log instanceof PhysicalPlanLog) {
@@ -101,10 +105,13 @@ public class AsynchronousSequencer implements LogSequencer {
sendLogRequest.setAppendEntryRequest(appendEntryRequest);
startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+ Statistic.LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE.calOperationCostTimeFromStart(
+ log.getReceiveTime());
log.setCreateTime(System.nanoTime());
member.getVotingLogList().insert(sendLogRequest.getVotingLog());
member.getLogDispatcher().offer(sendLogRequest);
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+ RAFT_SENDER_SEQUENCE_LOG.calOperationCostTimeFromStart(sequenceStartTime);
}
}
sendLogRequests.clear();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index b6c0df2..749a3ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -88,13 +88,13 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
 long resp = response.status;
 if (resp == RESPONSE_STRONG_ACCEPT) {
- synchronized (log) {
+ synchronized (log.getStronglyAcceptedNodeIds()) {
+ log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier); // count this vote before the quorum check
 if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
 >= quorumSize) {
- log.acceptedTime = System.nanoTime();
+ log.acceptedTime.set(System.nanoTime());
 }
- log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
- log.notifyAll();
+ log.getStronglyAcceptedNodeIds().notifyAll();
 }
 member
 .getVotingLogList()
@@ -117,17 +117,17 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
receiverTerm.set(resp);
}
leaderShipStale.set(true);
- synchronized (log) {
- log.notifyAll();
+ synchronized (log.getStronglyAcceptedNodeIds()) {
+ log.getStronglyAcceptedNodeIds().notifyAll();
}
} else if (resp == RESPONSE_WEAK_ACCEPT) {
- synchronized (log) {
+ synchronized (log.getStronglyAcceptedNodeIds()) {
+ log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
>= quorumSize) {
- log.acceptedTime = System.nanoTime();
+ log.acceptedTime.set(System.nanoTime());
}
- log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
- log.notifyAll();
+ log.getStronglyAcceptedNodeIds().notifyAll();
}
} else {
// e.g., Response.RESPONSE_LOG_MISMATCH
@@ -155,12 +155,12 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
}
private void onFail() {
- synchronized (log) {
+ synchronized (log.getStronglyAcceptedNodeIds()) {
failedDecreasingCounter--;
if (failedDecreasingCounter <= 0) {
// quorum members have failed, there is no need to wait for others
log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
- log.notifyAll();
+ log.getStronglyAcceptedNodeIds().notifyAll();
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
index afde282..95e38b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
@@ -68,13 +68,12 @@ public class MetaHeartbeatServer extends HeartbeatServer {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
return new TNonblockingServerSocket(
new InetSocketAddress(
- config.getInternalIp(),
- config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET),
+ "0.0.0.0", config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET),
getConnectionTimeoutInMS());
} else {
return new TServerSocket(
new InetSocketAddress(
- metaClusterServer.getMember().getThisNode().getInternalIp(),
+ "0.0.0.0",
metaClusterServer.getMember().getThisNode().getMetaPort()
+ ClusterUtils.META_HEARTBEAT_PORT_OFFSET));
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index cd54aa6..4a24d13 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.expr.VotingLogList;
import org.apache.iotdb.cluster.log.CommitLogCallback;
import org.apache.iotdb.cluster.log.CommitLogTask;
+import org.apache.iotdb.cluster.log.FragmentedLogDispatcher;
import org.apache.iotdb.cluster.log.HardState;
import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
import org.apache.iotdb.cluster.log.Log;
@@ -39,6 +40,7 @@ import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;
@@ -133,6 +135,7 @@ public abstract class RaftMember {
public static boolean USE_INDIRECT_LOG_DISPATCHER = false;
public static boolean ENABLE_WEAK_ACCEPTANCE = true;
public static boolean ENABLE_COMMIT_RETURN = false;
+ public static boolean USE_CRAFT = false;
protected static final LogSequencerFactory SEQUENCER_FACTORY =
ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing()
@@ -1195,6 +1198,11 @@ public abstract class RaftMember {
((PhysicalPlanLog) log).setPlan(plan);
}
+ if (USE_CRAFT && allNodes.size() > 2) {
+ log = new FragmentedLog(log, allNodes.size());
+ }
+ log.setReceiveTime(System.nanoTime());
+
// just like processPlanLocally,we need to check the size of log
// if (log.serialize().capacity() + Integer.BYTES
// >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
@@ -1606,6 +1614,8 @@ public abstract class RaftMember {
if (logDispatcher == null) {
if (USE_INDIRECT_LOG_DISPATCHER) {
logDispatcher = new IndirectLogDispatcher(this);
+ } else if (USE_CRAFT && allNodes.size() > 2) {
+ logDispatcher = new FragmentedLogDispatcher(this);
} else {
logDispatcher = new LogDispatcher(this);
}
@@ -1613,63 +1623,92 @@ public abstract class RaftMember {
return logDispatcher;
}
+ @SuppressWarnings({"java:S2445"}) // safe synchronized
+ private void waitAppendResultLoop(VotingLog log, int quorumSize) {
+ int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+ int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+ int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+ long nextTimeToPrint = 1000;
+
+ long waitStart = System.currentTimeMillis();
+ long alreadyWait = 0;
+ while (stronglyAcceptedNodeNum < quorumSize
+ && (!ENABLE_WEAK_ACCEPTANCE
+ || (totalAccepted < quorumSize)
+ || votingLogList.size() > config.getMaxNumOfLogsInMem())
+ && alreadyWait < RaftServer.getWriteOperationTimeoutMS()
+ && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+ long singleWaitTime = 0;
+ long singleWaitStart = System.nanoTime();
+ try {
+ synchronized (log.getStronglyAcceptedNodeIds()) {
+ log.getStronglyAcceptedNodeIds().wait(1);
+ }
+ singleWaitTime = (System.nanoTime() - singleWaitStart) / 1000000;
+ logger.debug("{} ends waiting", log);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Unexpected interruption when sending a log", e);
+ }
+ alreadyWait = System.currentTimeMillis() - waitStart;
+ if (alreadyWait > nextTimeToPrint) {
+ logger.info(
+ "Still not receive enough votes for {}, strongly accepted {}, weakly "
+ + "accepted {}, voting logs {}, wait {}ms, wait to sequence {}ms, wait to enqueue "
+ + "{}ms, wait to accept "
+ + "{}ms",
+ log,
+ log.getStronglyAcceptedNodeIds(),
+ log.getWeaklyAcceptedNodeIds(),
+ votingLogList.size(),
+ singleWaitTime,
+ (log.getLog().getSequenceStartTime() - singleWaitStart) / 1000000,
+ (log.getLog().getEnqueueTime() - singleWaitStart) / 1000000,
+ (log.acceptedTime.get() - singleWaitStart) / 1000000);
+ nextTimeToPrint *= 2;
+ }
+ stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+ weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+ totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+ }
+
+ if (alreadyWait > 15000) {
+ logger.info(
+ "Slow entry {}, strongly accepted {}, weakly " + "accepted {}, waited time {}ms",
+ log,
+ log.getStronglyAcceptedNodeIds(),
+ log.getWeaklyAcceptedNodeIds(),
+ alreadyWait);
+ }
+ }
+
/**
* wait until "voteCounter" counts down to zero, which means the quorum has received the log, or
* one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
*/
- @SuppressWarnings({"java:S2445"}) // safe synchronized
protected AppendLogResult waitAppendResult(
VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) {
// wait for the followers to vote
long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
- long nextTimeToPrint = 15000;
int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
- synchronized (log) {
- long waitStart = System.currentTimeMillis();
- long alreadyWait = 0;
- while (stronglyAcceptedNodeNum < quorumSize
- && (!ENABLE_WEAK_ACCEPTANCE
- || (totalAccepted < allNodes.size() - 1)
- || votingLogList.size() > config.getMaxNumOfLogsInMem())
- && alreadyWait < RaftServer.getWriteOperationTimeoutMS()
- && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
- try {
- log.wait(0);
- logger.debug("{} ends waiting", log);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn("Unexpected interruption when sending a log", e);
- }
- alreadyWait = System.currentTimeMillis() - waitStart;
- if (alreadyWait > nextTimeToPrint) {
- logger.info(
- "Still not receive enough votes for {}, strongly accepted {}, weakly "
- + "accepted {}",
- log,
- log.getStronglyAcceptedNodeIds(),
- log.getWeaklyAcceptedNodeIds());
- nextTimeToPrint *= 2;
- }
- stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
- weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
- totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
- }
-
- if (alreadyWait > 15000) {
- logger.info(
- "Slow entry {}, strongly accepted {}, weakly " + "accepted {}, waited time {}ms",
- log,
- log.getStronglyAcceptedNodeIds(),
- log.getWeaklyAcceptedNodeIds(),
- alreadyWait);
- }
+ if (stronglyAcceptedNodeNum < quorumSize
+ && (!ENABLE_WEAK_ACCEPTANCE
+ || (totalAccepted < quorumSize)
+ || votingLogList.size() > config.getMaxNumOfLogsInMem())
+ && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+ waitAppendResultLoop(log, quorumSize);
}
- if (log.acceptedTime != 0) {
- Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime);
+
+ stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+ weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+ totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
+
+ if (log.acceptedTime.get() != 0) {
+ Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
}
Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.calOperationCostTimeFromStart(startTime);
@@ -1702,16 +1741,14 @@ public abstract class RaftMember {
if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
synchronized (logManager) {
+ Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+ startTime);
if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
- Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
- startTime);
-
startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
logManager.commitTo(log.getCurrLogIndex());
Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
-
- startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
}
+ startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
}
Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
}
@@ -1725,7 +1762,7 @@ public abstract class RaftMember {
while (!log.isApplied()) {
// wait until the log is applied
try {
- log.wait(0);
+ log.wait(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LogExecutionException(e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index c794211..c72550e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -86,6 +86,8 @@ public class Timer {
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
// raft member - sender
+ RAFT_SENDER_SEQUENCE_LOG(
+ RAFT_MEMBER_SENDER, "sequence log", TIME_SCALE, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY),
RAFT_SENDER_APPEND_LOG(
RAFT_MEMBER_SENDER,
"locally append log",
@@ -257,6 +259,12 @@ public class Timer {
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_LOG_BATCH_SIZE(
LOG_DISPATCHER, "batch size", 1, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE(
+ LOG_DISPATCHER,
+ "from receive to create",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_FROM_CREATE_TO_SENT(
LOG_DISPATCHER,
"from create to sent",
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Galois.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Galois.java
new file mode 100644
index 0000000..5f0b8ac
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Galois.java
@@ -0,0 +1,722 @@
+/**
+ * 8-bit Galois Field
+ *
+ * <p>Copyright 2015, Backblaze, Inc. All rights reserved.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * 8-bit Galois Field
 *
 * <p>This class implements multiplication, division, addition, subtraction, and exponentiation.
 *
 * <p>The multiplication operation is in the inner loop of erasure coding, so it's been optimized.
 * Having the class be "final" helps a little, and having the EXP_TABLE repeat the data, so there's
 * no need to bound the sum of two logarithms to 255 helps a lot.
 *
 * <p>The logarithm and exponent tables are derived from {@link #GENERATING_POLYNOMIAL} by {@link
 * #generateLogTable(int)} and {@link #generateExpTable(short[])} when the class is initialized, so
 * they can never drift out of sync with those generators.
 */
public final class Galois {

  /** The number of elements in the field. */
  public static final int FIELD_SIZE = 256;

  /**
   * The polynomial used to generate the logarithm table.
   *
   * <p>There are a number of polynomials that work to generate a Galois field of 256 elements. The
   * choice is arbitrary, and we just use the first one.
   *
   * <p>The possibilities are: 29, 43, 45, 77, 95, 99, 101, 105, 113, 135, 141, 169, 195, 207, 231,
   * and 245 (see {@link #allPossiblePolynomials()}).
   */
  public static final int GENERATING_POLYNOMIAL = 29;

  /**
   * Mapping from members of the Galois Field to their integer logarithms (base 2). The entry for 0
   * is -1 and meaningless, because there is no log of 0.
   *
   * <p>This array is shorts, not bytes, so that they can be used directly to index arrays without
   * casting. The values (except the non-value at index 0) range from 0 to 254.
   */
  public static final short[] LOG_TABLE = generateLogTable(GENERATING_POLYNOMIAL);

  /**
   * Inverse of the logarithm table: maps integer logarithms to members of the field.
   *
   * <p>The 255 distinct entries (logarithms 0 to 254) are stored twice, so {@code EXP_TABLE[i] ==
   * EXP_TABLE[i + 255]} for {@code 0 <= i < 255}. This lets {@link #multiply(byte, byte)} index
   * with the raw sum of two logarithms (at most 508) without reducing it modulo 255.
   */
  static final byte[] EXP_TABLE = generateExpTable(LOG_TABLE);

  /**
   * Adds two elements of the field. If you're in an inner loop, you should inline this function:
   * it's just XOR.
   */
  public static byte add(byte a, byte b) {
    return (byte) (a ^ b);
  }

  /**
   * Inverse of addition. If you're in an inner loop, you should inline this function: it's just
   * XOR.
   */
  public static byte subtract(byte a, byte b) {
    return (byte) (a ^ b);
  }

  /** Multiplies two elements of the field. */
  public static byte multiply(byte a, byte b) {
    if (a == 0 || b == 0) {
      return 0;
    }
    int logA = LOG_TABLE[a & 0xFF];
    int logB = LOG_TABLE[b & 0xFF];
    // logA + logB <= 508, which the doubled EXP_TABLE covers without a modulo.
    return EXP_TABLE[logA + logB];
  }

  /**
   * Inverse of multiplication.
   *
   * @return {@code a / b}; 0 whenever {@code a} is 0 (including when {@code b} is 0)
   * @throws IllegalArgumentException if {@code b} is 0 and {@code a} is not
   */
  public static byte divide(byte a, byte b) {
    if (a == 0) {
      return 0;
    }
    if (b == 0) {
      throw new IllegalArgumentException("Argument 'divisor' is 0");
    }
    int logA = LOG_TABLE[a & 0xFF];
    int logB = LOG_TABLE[b & 0xFF];
    int logResult = logA - logB;
    if (logResult < 0) {
      logResult += 255;
    }
    return EXP_TABLE[logResult];
  }

  /**
   * Computes a**n.
   *
   * <p>For {@code n > 0} the result is the same as multiplying a times itself n times. For {@code n
   * < 0} and non-zero {@code a} it is the |n|-th power of the multiplicative inverse of {@code a}.
   * {@code exp(a, 0)} is 1 for every {@code a} (including 0). For {@code a == 0} and any non-zero
   * {@code n}, 0 is returned.
   *
   * @param a A member of the field.
   * @param n A plain-old integer, any sign and magnitude.
   * @return a raised to the n-th power.
   */
  public static byte exp(byte a, int n) {
    if (n == 0) {
      return 1;
    }
    if (a == 0) {
      return 0;
    }
    int logA = LOG_TABLE[a & 0xFF];
    // Non-zero elements form a cyclic group of order 255. Reducing the product in 64-bit
    // arithmetic avoids int overflow for large |n|, and floorMod maps negative exponents into
    // [0, 254] (the power of the inverse) instead of producing a negative index.
    int logResult = (int) Math.floorMod((long) logA * n, 255L);
    return EXP_TABLE[logResult];
  }

  /**
   * Generates a logarithm table given a starting polynomial.
   *
   * <p>Starting from 1, the element is repeatedly multiplied by x (i.e. 2). It is reduced by the
   * polynomial whenever it overflows 8 bits, and each element is recorded with the step at which it
   * appears.
   *
   * @param polynomial the low 8 bits of the field polynomial (the x^8 term is implicit), in [0,
   *     255]
   * @return a table mapping each non-zero field element to its logarithm; entry 0 is -1
   * @throws IllegalArgumentException if the polynomial is out of range or does not generate the
   *     whole field (some logarithm would repeat)
   */
  public static short[] generateLogTable(int polynomial) {
    if (polynomial < 0 || FIELD_SIZE <= polynomial) {
      throw new IllegalArgumentException("Polynomial out of range [0, 255]: " + polynomial);
    }
    short[] result = new short[FIELD_SIZE];
    for (int i = 0; i < FIELD_SIZE; i++) {
      result[i] = -1; // -1 means "not set"
    }
    int b = 1;
    for (int log = 0; log < FIELD_SIZE - 1; log++) {
      if (result[b] != -1) {
        throw new IllegalArgumentException("BUG: duplicate logarithm (bad polynomial?)");
      }
      result[b] = (short) log;
      // Multiply by x; on overflow past 8 bits drop the x^8 term and reduce by the polynomial.
      b = (b << 1);
      if (FIELD_SIZE <= b) {
        b = ((b - FIELD_SIZE) ^ polynomial);
      }
    }
    return result;
  }

  /**
   * Generates the inverse log table, with its 255 distinct entries repeated once (510 entries in
   * total) so that sums of two logarithms can index it directly.
   *
   * @param logTable a table as produced by {@link #generateLogTable(int)}; entries 1..255 must be
   *     logarithms in [0, 254]
   * @return the exponent table
   */
  public static byte[] generateExpTable(short[] logTable) {
    final byte[] result = new byte[FIELD_SIZE * 2 - 2];
    for (int i = 1; i < FIELD_SIZE; i++) {
      int log = logTable[i];
      result[log] = (byte) i;
      result[log + FIELD_SIZE - 1] = (byte) i;
    }
    return result;
  }

  /**
   * Returns a list of all polynomials that can be used to generate the field.
   *
   * <p>This is never used in the code; it's just here for completeness.
   */
  public static Integer[] allPossiblePolynomials() {
    List<Integer> result = new ArrayList<>();
    for (int i = 0; i < FIELD_SIZE; i++) {
      try {
        generateLogTable(i);
        result.add(i);
      } catch (RuntimeException ignored) {
        // this polynomial does not generate the whole field; skip it
      }
    }
    return result.toArray(new Integer[0]);
  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Matrix.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Matrix.java
new file mode 100644
index 0000000..df345b1
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/Matrix.java
@@ -0,0 +1,312 @@
+/**
+ * Matrix Algebra over an 8-bit Galois Field
+ *
+ * <p>Copyright 2015, Backblaze, Inc.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+/**
+ * A matrix over the 8-bit Galois field.
+ *
+ * <p>This class is not performance-critical, so the implementations are simple and straightforward.
+ */
+public class Matrix {
+
+ /** The number of rows in the matrix. */
+ private final int rows;
+
+ /** The number of columns in the matrix. */
+ private final int columns;
+
+ /**
+ * The data in the matrix, in row major form.
+ *
+ * <p>To get element (r, c): data[r][c]
+ *
+ * <p>Because this this is computer science, and not math, the indices for both the row and column
+ * start at 0.
+ */
+ private final byte[][] data;
+
+ /**
+ * Initialize a matrix of zeros.
+ *
+ * @param initRows The number of rows in the matrix.
+ * @param initColumns The number of columns in the matrix.
+ */
+ public Matrix(int initRows, int initColumns) {
+ rows = initRows;
+ columns = initColumns;
+ data = new byte[rows][];
+ for (int r = 0; r < rows; r++) {
+ data[r] = new byte[columns];
+ }
+ }
+
+ /** Initializes a matrix with the given row-major data. */
+ public Matrix(byte[][] initData) {
+ rows = initData.length;
+ columns = initData[0].length;
+ data = new byte[rows][];
+ for (int r = 0; r < rows; r++) {
+ if (initData[r].length != columns) {
+ throw new IllegalArgumentException("Not all rows have the same number of columns");
+ }
+ data[r] = new byte[columns];
+ for (int c = 0; c < columns; c++) {
+ data[r][c] = initData[r][c];
+ }
+ }
+ }
+
+ /** Returns an identity matrix of the given size. */
+ public static Matrix identity(int size) {
+ Matrix result = new Matrix(size, size);
+ for (int i = 0; i < size; i++) {
+ result.set(i, i, (byte) 1);
+ }
+ return result;
+ }
+
+ /**
+ * Returns a human-readable string of the matrix contents.
+ *
+ * <p>Example: [[1, 2], [3, 4]]
+ */
+ @Override
+ public String toString() {
+ StringBuilder result = new StringBuilder();
+ result.append('[');
+ for (int r = 0; r < rows; r++) {
+ if (r != 0) {
+ result.append(", ");
+ }
+ result.append('[');
+ for (int c = 0; c < columns; c++) {
+ if (c != 0) {
+ result.append(", ");
+ }
+ result.append(data[r][c] & 0xFF);
+ }
+ result.append(']');
+ }
+ result.append(']');
+ return result.toString();
+ }
+
+ /**
+ * Returns a human-readable string of the matrix contents.
+ *
+ * <p>Example: 00 01 02 03 04 05 06 07 08 09 0a 0b
+ */
+ public String toBigString() {
+ StringBuilder result = new StringBuilder();
+ for (int r = 0; r < rows; r++) {
+ for (int c = 0; c < columns; c++) {
+ int value = get(r, c);
+ if (value < 0) {
+ value += 256;
+ }
+ result.append(String.format("%02x ", value));
+ }
+ result.append("\n");
+ }
+ return result.toString();
+ }
+
+ /** Returns the number of columns in this matrix. */
+ public int getColumns() {
+ return columns;
+ }
+
+ /** Returns the number of rows in this matrix. */
+ public int getRows() {
+ return rows;
+ }
+
+ /** Returns the value at row r, column c. */
+ public byte get(int r, int c) {
+ if (r < 0 || rows <= r) {
+ throw new IllegalArgumentException("Row index out of range: " + r);
+ }
+ if (c < 0 || columns <= c) {
+ throw new IllegalArgumentException("Column index out of range: " + c);
+ }
+ return data[r][c];
+ }
+
+ /** Sets the value at row r, column c. */
+ public void set(int r, int c, byte value) {
+ if (r < 0 || rows <= r) {
+ throw new IllegalArgumentException("Row index out of range: " + r);
+ }
+ if (c < 0 || columns <= c) {
+ throw new IllegalArgumentException("Column index out of range: " + c);
+ }
+ data[r][c] = value;
+ }
+
+ /** Returns true iff this matrix is identical to the other. */
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Matrix)) {
+ return false;
+ }
+ for (int r = 0; r < rows; r++) {
+ if (!data[r].equals(((Matrix) other).data[r])) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /** Multiplies this matrix (the one on the left) by another matrix (the one on the right). */
+ public Matrix times(Matrix right) {
+ if (getColumns() != right.getRows()) {
+ throw new IllegalArgumentException(
+ "Columns on left ("
+ + getColumns()
+ + ") "
+ + "is different than rows on right ("
+ + right.getRows()
+ + ")");
+ }
+ Matrix result = new Matrix(getRows(), right.getColumns());
+ for (int r = 0; r < getRows(); r++) {
+ for (int c = 0; c < right.getColumns(); c++) {
+ byte value = 0;
+ for (int i = 0; i < getColumns(); i++) {
+ value ^= Galois.multiply(get(r, i), right.get(i, c));
+ }
+ result.set(r, c, value);
+ }
+ }
+ return result;
+ }
+
+ /** Returns the concatenation of this matrix and the matrix on the right. */
+ public Matrix augment(Matrix right) {
+ if (rows != right.rows) {
+ throw new IllegalArgumentException("Matrices don't have the same number of rows");
+ }
+ Matrix result = new Matrix(rows, columns + right.columns);
+ for (int r = 0; r < rows; r++) {
+ for (int c = 0; c < columns; c++) {
+ result.data[r][c] = data[r][c];
+ }
+ for (int c = 0; c < right.columns; c++) {
+ result.data[r][columns + c] = right.data[r][c];
+ }
+ }
+ return result;
+ }
+
+ /** Returns a part of this matrix. */
+ public Matrix submatrix(int rmin, int cmin, int rmax, int cmax) {
+ Matrix result = new Matrix(rmax - rmin, cmax - cmin);
+ for (int r = rmin; r < rmax; r++) {
+ for (int c = cmin; c < cmax; c++) {
+ result.data[r - rmin][c - cmin] = data[r][c];
+ }
+ }
+ return result;
+ }
+
+ /** Returns one row of the matrix as a byte array. */
+ public byte[] getRow(int row) {
+ byte[] result = new byte[columns];
+ for (int c = 0; c < columns; c++) {
+ result[c] = get(row, c);
+ }
+ return result;
+ }
+
+ /** Exchanges two rows in the matrix. */
+ public void swapRows(int r1, int r2) {
+ if (r1 < 0 || rows <= r1 || r2 < 0 || rows <= r2) {
+ throw new IllegalArgumentException("Row index out of range");
+ }
+ byte[] tmp = data[r1];
+ data[r1] = data[r2];
+ data[r2] = tmp;
+ }
+
+ /**
+ * Returns the inverse of this matrix.
+ *
+ * @throws IllegalArgumentException when the matrix is singular and doesn't have an inverse.
+ */
+ public Matrix invert() {
+ // Sanity check.
+ if (rows != columns) {
+ throw new IllegalArgumentException("Only square matrices can be inverted");
+ }
+
+ // Create a working matrix by augmenting this one with
+ // an identity matrix on the right.
+ Matrix work = augment(identity(rows));
+
+ // Do Gaussian elimination to transform the left half into
+ // an identity matrix.
+ work.gaussianElimination();
+
+ // The right half is now the inverse.
+ return work.submatrix(0, rows, columns, columns * 2);
+ }
+
+ /**
+ * Does the work of matrix inversion.
+ *
+ * <p>Assumes that this is an r by 2r matrix.
+ */
+ private void gaussianElimination() {
+ // Clear out the part below the main diagonal and scale the main
+ // diagonal to be 1.
+ for (int r = 0; r < rows; r++) {
+ // If the element on the diagonal is 0, find a row below
+ // that has a non-zero and swap them.
+ if (data[r][r] == (byte) 0) {
+ for (int rowBelow = r + 1; rowBelow < rows; rowBelow++) {
+ if (data[rowBelow][r] != 0) {
+ swapRows(r, rowBelow);
+ break;
+ }
+ }
+ }
+ // If we couldn't find one, the matrix is singular.
+ if (data[r][r] == (byte) 0) {
+ throw new IllegalArgumentException("Matrix is singular");
+ }
+ // Scale to 1.
+ if (data[r][r] != (byte) 1) {
+ byte scale = Galois.divide((byte) 1, data[r][r]);
+ for (int c = 0; c < columns; c++) {
+ data[r][c] = Galois.multiply(data[r][c], scale);
+ }
+ }
+ // Make everything below the 1 be a 0 by subtracting
+ // a multiple of it. (Subtraction and addition are
+ // both exclusive or in the Galois field.)
+ for (int rowBelow = r + 1; rowBelow < rows; rowBelow++) {
+ if (data[rowBelow][r] != (byte) 0) {
+ byte scale = data[rowBelow][r];
+ for (int c = 0; c < columns; c++) {
+ data[rowBelow][c] ^= Galois.multiply(scale, data[r][c]);
+ }
+ }
+ }
+ }
+
+ // Now clear the part above the main diagonal.
+ for (int d = 0; d < rows; d++) {
+ for (int rowAbove = 0; rowAbove < d; rowAbove++) {
+ if (data[rowAbove][d] != (byte) 0) {
+ byte scale = data[rowAbove][d];
+ for (int c = 0; c < columns; c++) {
+ data[rowAbove][c] ^= Galois.multiply(scale, data[d][c]);
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/ReedSolomon.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/ReedSolomon.java
new file mode 100644
index 0000000..e312f3c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/ReedSolomon.java
@@ -0,0 +1,359 @@
+/**
+ * Reed-Solomon Coding over 8-bit values.
+ *
+ * <p>Copyright 2015, Backblaze, Inc.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+/** Reed-Solomon Coding over 8-bit values. */
+public class ReedSolomon {
+
+ private final int dataShardCount;
+ private final int parityShardCount;
+ private final int totalShardCount;
+ private final Matrix matrix;
+
+ /**
+ * Rows from the matrix for encoding parity, each one as its own byte array to allow for efficient
+ * access while encoding.
+ */
+ private final byte[][] parityRows;
+
+ /** Initializes a new encoder/decoder. */
+ public ReedSolomon(int dataShardCount, int parityShardCount) {
+ this.dataShardCount = dataShardCount;
+ this.parityShardCount = parityShardCount;
+ this.totalShardCount = dataShardCount + parityShardCount;
+ matrix = buildMatrix(dataShardCount, this.totalShardCount);
+ parityRows = new byte[parityShardCount][];
+ for (int i = 0; i < parityShardCount; i++) {
+ parityRows[i] = matrix.getRow(dataShardCount + i);
+ }
+ }
+
+ /** Returns the number of data shards. */
+ public int getDataShardCount() {
+ return dataShardCount;
+ }
+
+ /** Returns the number of parity shards. */
+ public int getParityShardCount() {
+ return parityShardCount;
+ }
+
+ /** Returns the total number of shards. */
+ public int getTotalShardCount() {
+ return totalShardCount;
+ }
+
+ /**
+ * Encodes parity for a set of data shards.
+ *
+ * @param shards An array containing data shards followed by parity shards. Each shard is a byte
+ * array, and they must all be the same size.
+ * @param offset The index of the first byte in each shard to encode.
+ * @param byteCount The number of bytes to encode in each shard.
+ */
+ public void encodeParity(byte[][] shards, int offset, int byteCount) {
+ // Check arguments.
+ checkBuffersAndSizes(shards, offset, byteCount);
+
+ // Build the array of output buffers.
+ byte[][] outputs = new byte[parityShardCount][];
+ for (int i = 0; i < parityShardCount; i++) {
+ outputs[i] = shards[dataShardCount + i];
+ }
+
+ // Do the coding.
+ codeSomeShards(parityRows, shards, outputs, parityShardCount, offset, byteCount);
+ }
+
+ /**
+ * Returns true if the parity shards contain the right data.
+ *
+ * @param shards An array containing data shards followed by parity shards. Each shard is a byte
+ * array, and they must all be the same size.
+ * @param firstByte The index of the first byte in each shard to check.
+ * @param byteCount The number of bytes to check in each shard.
+ */
+ public boolean isParityCorrect(byte[][] shards, int firstByte, int byteCount) {
+ // Check arguments.
+ checkBuffersAndSizes(shards, firstByte, byteCount);
+
+ // Build the array of buffers being checked.
+ byte[][] toCheck = new byte[parityShardCount][];
+ for (int i = 0; i < parityShardCount; i++) {
+ toCheck[i] = shards[dataShardCount + i];
+ }
+
+ // Do the checking.
+ return checkSomeShards(parityRows, shards, toCheck, parityShardCount, firstByte, byteCount);
+ }
+
+ /**
+ * Given a list of shards, some of which contain data, fills in the ones that don't have data.
+ *
+ * <p>Quickly does nothing if all of the shards are present.
+ *
+ * <p>If any shards are missing (based on the flags in shardsPresent), the data in those shards is
+ * recomputed and filled in.
+ */
+ public void decodeMissing(
+ byte[][] shards, boolean[] shardPresent, final int offset, final int byteCount) {
+ // Check arguments.
+ checkBuffersAndSizes(shards, offset, byteCount);
+
+ // Quick check: are all of the shards present? If so, there's
+ // nothing to do.
+ int numberPresent = 0;
+ for (int i = 0; i < totalShardCount; i++) {
+ if (shardPresent[i]) {
+ numberPresent += 1;
+ }
+ }
+ if (numberPresent == totalShardCount) {
+ // Cool. All of the shards data data. We don't
+ // need to do anything.
+ return;
+ }
+
+ // More complete sanity check
+ if (numberPresent < dataShardCount) {
+ throw new IllegalArgumentException("Not enough shards present");
+ }
+
+ // Pull out the rows of the matrix that correspond to the
+ // shards that we have and build a square matrix. This
+ // matrix could be used to generate the shards that we have
+ // from the original data.
+ //
+ // Also, pull out an array holding just the shards that
+ // correspond to the rows of the submatrix. These shards
+ // will be the input to the decoding process that re-creates
+ // the missing data shards.
+ Matrix subMatrix = new Matrix(dataShardCount, dataShardCount);
+ byte[][] subShards = new byte[dataShardCount][];
+ {
+ int subMatrixRow = 0;
+ for (int matrixRow = 0;
+ matrixRow < totalShardCount && subMatrixRow < dataShardCount;
+ matrixRow++) {
+ if (shardPresent[matrixRow]) {
+ for (int c = 0; c < dataShardCount; c++) {
+ subMatrix.set(subMatrixRow, c, matrix.get(matrixRow, c));
+ }
+ subShards[subMatrixRow] = shards[matrixRow];
+ subMatrixRow += 1;
+ }
+ }
+ }
+
+ // Invert the matrix, so we can go from the encoded shards
+ // back to the original data. Then pull out the row that
+ // generates the shard that we want to decode. Note that
+ // since this matrix maps back to the orginal data, it can
+ // be used to create a data shard, but not a parity shard.
+ Matrix dataDecodeMatrix = subMatrix.invert();
+
+ // Re-create any data shards that were missing.
+ //
+ // The input to the coding is all of the shards we actually
+ // have, and the output is the missing data shards. The computation
+ // is done using the special decode matrix we just built.
+ byte[][] outputs = new byte[parityShardCount][];
+ byte[][] matrixRows = new byte[parityShardCount][];
+ int outputCount = 0;
+ for (int iShard = 0; iShard < dataShardCount; iShard++) {
+ if (!shardPresent[iShard]) {
+ outputs[outputCount] = shards[iShard];
+ matrixRows[outputCount] = dataDecodeMatrix.getRow(iShard);
+ outputCount += 1;
+ }
+ }
+ codeSomeShards(matrixRows, subShards, outputs, outputCount, offset, byteCount);
+
+ // Now that we have all of the data shards intact, we can
+ // compute any of the parity that is missing.
+ //
+ // The input to the coding is ALL of the data shards, including
+ // any that we just calculated. The output is whichever of the
+ // data shards were missing.
+ outputCount = 0;
+ for (int iShard = dataShardCount; iShard < totalShardCount; iShard++) {
+ if (!shardPresent[iShard]) {
+ outputs[outputCount] = shards[iShard];
+ matrixRows[outputCount] = parityRows[iShard - dataShardCount];
+ outputCount += 1;
+ }
+ }
+ codeSomeShards(matrixRows, shards, outputs, outputCount, offset, byteCount);
+ }
+
+ /** Checks the consistency of arguments passed to public methods. */
+ private void checkBuffersAndSizes(byte[][] shards, int offset, int byteCount) {
+ // The number of buffers should be equal to the number of
+ // data shards plus the number of parity shards.
+ if (shards.length != totalShardCount) {
+ throw new IllegalArgumentException("wrong number of shards: " + shards.length);
+ }
+
+ // All of the shard buffers should be the same length.
+ int shardLength = shards[0].length;
+ for (int i = 1; i < shards.length; i++) {
+ if (shards[i].length != shardLength) {
+ throw new IllegalArgumentException("Shards are different sizes");
+ }
+ }
+
+ // The offset and byteCount must be non-negative and fit in the buffers.
+ if (offset < 0) {
+ throw new IllegalArgumentException("offset is negative: " + offset);
+ }
+ if (byteCount < 0) {
+ throw new IllegalArgumentException("byteCount is negative: " + byteCount);
+ }
+ if (shardLength < offset + byteCount) {
+ throw new IllegalArgumentException("buffers to small: " + byteCount + offset);
+ }
+ }
+
+ /**
+ * Multiplies a subset of rows from a coding matrix by a full set of input shards to produce some
+ * output shards.
+ *
+ * @param matrixRows The rows from the matrix to use.
+ * @param inputs An array of byte arrays, each of which is one input shard. The inputs array may
+ * have extra buffers after the ones that are used. They will be ignored. The number of inputs
+ * used is determined by the length of the each matrix row.
+ * @param outputs Byte arrays where the computed shards are stored. The outputs array may also
+ * have extra, unused, elements at the end. The number of outputs computed, and the number of
+ * matrix rows used, is determined by outputCount.
+ * @param outputCount The number of outputs to compute.
+ * @param offset The index in the inputs and output of the first byte to process.
+ * @param byteCount The number of bytes to process.
+ */
+ private void codeSomeShards(
+ final byte[][] matrixRows,
+ final byte[][] inputs,
+ final byte[][] outputs,
+ final int outputCount,
+ final int offset,
+ final int byteCount) {
+
+ // This is the inner loop. It needs to be fast. Be careful
+ // if you change it.
+ //
+ // Note that dataShardCount is final in the class, so the
+ // compiler can load it just once, before the loop. Explicitly
+ // adding a local variable does not make it faster.
+ //
+ // I have tried inlining Galois.multiply(), but it doesn't
+ // make things any faster. The JIT compiler is known to inline
+ // methods, so it's probably already doing so.
+ //
+ // This method has been timed and compared with a C implementation.
+ // This Java version is only about 10% slower than C.
+
+ for (int iByte = offset; iByte < offset + byteCount; iByte++) {
+ for (int iRow = 0; iRow < outputCount; iRow++) {
+ byte[] matrixRow = matrixRows[iRow];
+ int value = 0;
+ for (int c = 0; c < dataShardCount; c++) {
+ value ^= Galois.multiply(matrixRow[c], inputs[c][iByte]);
+ }
+ outputs[iRow][iByte] = (byte) value;
+ }
+ }
+ }
+
+ /**
+ * Multiplies a subset of rows from a coding matrix by a full set of input shards to produce some
+ * output shards, and checks that the the data is those shards matches what's expected.
+ *
+ * @param matrixRows The rows from the matrix to use.
+ * @param inputs An array of byte arrays, each of which is one input shard. The inputs array may
+ * have extra buffers after the ones that are used. They will be ignored. The number of inputs
+ * used is determined by the length of the each matrix row.
+ * @param toCheck Byte arrays where the computed shards are stored. The outputs array may also
+ * have extra, unused, elements at the end. The number of outputs computed, and the number of
+ * matrix rows used, is determined by outputCount.
+ * @param checkCount The number of outputs to compute.
+ * @param offset The index in the inputs and output of the first byte to process.
+ * @param byteCount The number of bytes to process.
+ */
+ private boolean checkSomeShards(
+ final byte[][] matrixRows,
+ final byte[][] inputs,
+ final byte[][] toCheck,
+ final int checkCount,
+ final int offset,
+ final int byteCount) {
+
+ // This is the inner loop. It needs to be fast. Be careful
+ // if you change it.
+ //
+ // Note that dataShardCount is final in the class, so the
+ // compiler can load it just once, before the loop. Explicitly
+ // adding a local variable does not make it faster.
+ //
+ // I have tried inlining Galois.multiply(), but it doesn't
+ // make things any faster. The JIT compiler is known to inline
+ // methods, so it's probably already doing so.
+ //
+ // This method has been timed and compared with a C implementation.
+ // This Java version is only about 10% slower than C.
+
+ for (int iByte = offset; iByte < offset + byteCount; iByte++) {
+ for (int iRow = 0; iRow < checkCount; iRow++) {
+ byte[] matrixRow = matrixRows[iRow];
+ int value = 0;
+ for (int c = 0; c < dataShardCount; c++) {
+ value ^= Galois.multiply(matrixRow[c], inputs[c][iByte]);
+ }
+ if (toCheck[iRow][iByte] != (byte) value) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Create the matrix to use for encoding, given the number of data shards and the number of total
+ * shards.
+ *
+ * <p>The top square of the matrix is guaranteed to be an identity matrix, which means that the
+ * data shards are unchanged after encoding.
+ */
+ private static Matrix buildMatrix(int dataShards, int totalShards) {
+ // Start with a Vandermonde matrix. This matrix would work,
+ // in theory, but doesn't have the property that the data
+ // shards are unchanged after encoding.
+ Matrix vandermonde = vandermonde(totalShards, dataShards);
+
+ // Multiple by the inverse of the top square of the matrix.
+ // This will make the top square be the identity matrix, but
+ // preserve the property that any square subset of rows is
+ // invertible.
+ Matrix top = vandermonde.submatrix(0, 0, dataShards, dataShards);
+ return vandermonde.times(top.invert());
+ }
+
+ /**
+ * Create a Vandermonde matrix, which is guaranteed to have the property that any subset of rows
+ * that forms a square matrix is invertible.
+ *
+ * @param rows Number of rows in the result.
+ * @param cols Number of columns in the result.
+ * @return A Matrix.
+ */
+ private static Matrix vandermonde(int rows, int cols) {
+ Matrix result = new Matrix(rows, cols);
+ for (int r = 0; r < rows; r++) {
+ for (int c = 0; c < cols; c++) {
+ result.set(r, c, Galois.exp((byte) r, c));
+ }
+ }
+ return result;
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleDecoder.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleDecoder.java
new file mode 100644
index 0000000..87a1ead
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleDecoder.java
@@ -0,0 +1,98 @@
+/**
+ * Command-line program that decodes a file using Reed-Solomon 4+2.
+ *
+ * <p>Copyright 2015, Backblaze, Inc. All rights reserved.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Command-line program that decodes a file using Reed-Solomon 4+2.
+ *
+ * <p>The file name given should be the name of the file to decode, say "foo.txt". This program will
+ * expected to find "foo.txt.0" through "foo.txt.5", with at most two missing. It will then write
+ * "foo.txt.decoded".
+ */
+public class SampleDecoder {
+
+ public static final int DATA_SHARDS = 4;
+ public static final int PARITY_SHARDS = 2;
+ public static final int TOTAL_SHARDS = 6;
+
+ public static final int BYTES_IN_INT = 4;
+
+ public static void main(String[] arguments) throws IOException {
+
+ // Parse the command line
+ if (arguments.length != 1) {
+ System.out.println("Usage: SampleDecoder <fileName>");
+ return;
+ }
+ final File originalFile = new File(arguments[0]);
+ if (!originalFile.exists()) {
+ System.out.println("Cannot read input file: " + originalFile);
+ return;
+ }
+
+ // Read in any of the shards that are present.
+ // (There should be checking here to make sure the input
+ // shards are the same size, but there isn't.)
+ final byte[][] shards = new byte[TOTAL_SHARDS][];
+ final boolean[] shardPresent = new boolean[TOTAL_SHARDS];
+ int shardSize = 0;
+ int shardCount = 0;
+ for (int i = 0; i < TOTAL_SHARDS; i++) {
+ File shardFile = new File(originalFile.getParentFile(), originalFile.getName() + "." + i);
+ if (shardFile.exists()) {
+ shardSize = (int) shardFile.length();
+ shards[i] = new byte[shardSize];
+ shardPresent[i] = true;
+ shardCount += 1;
+ InputStream in = new FileInputStream(shardFile);
+ in.read(shards[i], 0, shardSize);
+ in.close();
+ System.out.println("Read " + shardFile);
+ }
+ }
+
+ // We need at least DATA_SHARDS to be able to reconstruct the file.
+ if (shardCount < DATA_SHARDS) {
+ System.out.println("Not enough shards present");
+ return;
+ }
+
+ // Make empty buffers for the missing shards.
+ for (int i = 0; i < TOTAL_SHARDS; i++) {
+ if (!shardPresent[i]) {
+ shards[i] = new byte[shardSize];
+ }
+ }
+
+ // Use Reed-Solomon to fill in the missing shards
+ ReedSolomon reedSolomon = new ReedSolomon(DATA_SHARDS, PARITY_SHARDS);
+ reedSolomon.decodeMissing(shards, shardPresent, 0, shardSize);
+
+ // Combine the data shards into one buffer for convenience.
+ // (This is not efficient, but it is convenient.)
+ byte[] allBytes = new byte[shardSize * DATA_SHARDS];
+ for (int i = 0; i < DATA_SHARDS; i++) {
+ System.arraycopy(shards[i], 0, allBytes, shardSize * i, shardSize);
+ }
+
+ // Extract the file length
+ int fileSize = ByteBuffer.wrap(allBytes).getInt();
+
+ // Write the decoded file
+ File decodedFile = new File(originalFile.getParentFile(), originalFile.getName() + ".decoded");
+ OutputStream out = new FileOutputStream(decodedFile);
+ out.write(allBytes, BYTES_IN_INT, fileSize);
+ System.out.println("Wrote " + decodedFile);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleEncoder.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleEncoder.java
new file mode 100644
index 0000000..1f98707
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/encode/rs/SampleEncoder.java
@@ -0,0 +1,91 @@
+/**
+ * Command-line program that encodes one file using Reed-Solomon 4+2.
+ *
+ * <p>Copyright 2015, Backblaze, Inc.
+ */
+package org.apache.iotdb.cluster.utils.encode.rs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Command-line program encodes one file using Reed-Solomon 4+2.
+ *
+ * <p>The one argument should be a file name, say "foo.txt". This program will create six files in
+ * the same directory, breaking the input file into four data shards, and two parity shards. The
+ * output files are called "foo.txt.0", "foo.txt.1", ..., and "foo.txt.5". Numbers 4 and 5 are the
+ * parity shards.
+ *
+ * <p>The data stored is the file size (four byte int), followed by the contents of the file, and
+ * then padded to a multiple of four bytes with zeros. The padding is because all four data shards
+ * must be the same size.
+ */
+public class SampleEncoder {
+
+ public static final int DATA_SHARDS = 4;
+ public static final int PARITY_SHARDS = 2;
+ public static final int TOTAL_SHARDS = 6;
+
+ public static final int BYTES_IN_INT = 4;
+
+ public static void main(String[] arguments) throws IOException {
+
+ // Parse the command line
+ if (arguments.length != 1) {
+ System.out.println("Usage: SampleEncoder <fileName>");
+ return;
+ }
+ final File inputFile = new File(arguments[0]);
+ if (!inputFile.exists()) {
+ System.out.println("Cannot read input file: " + inputFile);
+ return;
+ }
+
+ // Get the size of the input file. (Files bigger that
+ // Integer.MAX_VALUE will fail here!)
+ final int fileSize = (int) inputFile.length();
+
+ // Figure out how big each shard will be. The total size stored
+ // will be the file size (8 bytes) plus the file.
+ final int storedSize = fileSize + BYTES_IN_INT;
+ final int shardSize = (storedSize + DATA_SHARDS - 1) / DATA_SHARDS;
+
+ // Create a buffer holding the file size, followed by
+ // the contents of the file.
+ final int bufferSize = shardSize * DATA_SHARDS;
+ final byte[] allBytes = new byte[bufferSize];
+ ByteBuffer.wrap(allBytes).putInt(fileSize);
+ InputStream in = new FileInputStream(inputFile);
+ int bytesRead = in.read(allBytes, BYTES_IN_INT, fileSize);
+ if (bytesRead != fileSize) {
+ throw new IOException("not enough bytes read");
+ }
+ in.close();
+
+ // Make the buffers to hold the shards.
+ byte[][] shards = new byte[TOTAL_SHARDS][shardSize];
+
+ // Fill in the data shards
+ for (int i = 0; i < DATA_SHARDS; i++) {
+ System.arraycopy(allBytes, i * shardSize, shards[i], 0, shardSize);
+ }
+
+ // Use Reed-Solomon to calculate the parity.
+ ReedSolomon reedSolomon = new ReedSolomon(DATA_SHARDS, PARITY_SHARDS);
+ reedSolomon.encodeParity(shards, 0, shardSize);
+
+ // Write out the resulting files.
+ for (int i = 0; i < TOTAL_SHARDS; i++) {
+ File outputFile = new File(inputFile.getParentFile(), inputFile.getName() + "." + i);
+ OutputStream out = new FileOutputStream(outputFile);
+ out.write(shards[i]);
+ out.close();
+ System.out.println("wrote " + outputFile);
+ }
+ }
+}