You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:18 UTC
[33/54] [abbrv] incubator-ratis git commit: Renamed the packages from
raft to ratis in preparation for Apache Incubation - Moved all Java packages
from org.apache.raft to org.apache.ratis. - Moved native package to
org_apache_ratis, and native lib to l
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogCache.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogCache.java
deleted file mode 100644
index dde6c7a..0000000
--- a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogCache.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import org.apache.raft.RaftTestUtil.SimpleOperation;
-import org.apache.raft.server.storage.RaftLogCache.TruncationSegments;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.ProtoUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Iterator;
-
-public class TestRaftLogCache {
- private RaftLogCache cache;
-
- @Before
- public void setup() {
- cache = new RaftLogCache();
- }
-
- private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
- LogSegment s = LogSegment.newOpenSegment(start);
- for (long i = start; i <= end; i++) {
- SimpleOperation m = new SimpleOperation("m" + i);
- LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- 0, i);
- s.appendToOpenSegment(entry);
- }
- if (!isOpen) {
- s.close();
- }
- return s;
- }
-
- private void checkCache(long start, long end, int segmentSize) {
- Assert.assertEquals(start, cache.getStartIndex());
- Assert.assertEquals(end, cache.getEndIndex());
-
- for (long index = start; index <= end; index++) {
- LogEntryProto entry = cache.getEntry(index);
- Assert.assertEquals(index, entry.getIndex());
- }
-
- long[] offsets = new long[]{start, start + 1, start + (end - start) / 2,
- end - 1, end};
- for (long offset : offsets) {
- checkCacheEntries(offset, (int) (end - offset + 1), end);
- checkCacheEntries(offset, 1, end);
- checkCacheEntries(offset, 20, end);
- checkCacheEntries(offset, segmentSize, end);
- checkCacheEntries(offset, segmentSize - 1, end);
- }
- }
-
- private void checkCacheEntries(long offset, int size, long end) {
- LogEntryProto[] entries = cache.getEntries(offset, offset + size);
- long realEnd = offset + size > end + 1 ? end + 1 : offset + size;
- Assert.assertEquals(realEnd - offset, entries.length);
- for (long i = offset; i < realEnd; i++) {
- Assert.assertEquals(i, entries[(int) (i - offset)].getIndex());
- }
- }
-
- @Test
- public void testAddSegments() throws Exception {
- LogSegment s1 = prepareLogSegment(1, 100, false);
- cache.addSegment(s1);
- checkCache(1, 100, 100);
-
- try {
- LogSegment s = prepareLogSegment(102, 103, true);
- cache.addSegment(s);
- Assert.fail("should fail since there is gap between two segments");
- } catch (IllegalStateException ignored) {
- }
-
- LogSegment s2 = prepareLogSegment(101, 200, true);
- cache.addSegment(s2);
- checkCache(1, 200, 100);
-
- try {
- LogSegment s = prepareLogSegment(201, 202, true);
- cache.addSegment(s);
- Assert.fail("should fail since there is still an open segment in cache");
- } catch (IllegalStateException ignored) {
- }
-
- cache.rollOpenSegment(false);
- checkCache(1, 200, 100);
-
- try {
- LogSegment s = prepareLogSegment(202, 203, true);
- cache.addSegment(s);
- Assert.fail("should fail since there is gap between two segments");
- } catch (IllegalStateException ignored) {
- }
-
- LogSegment s3 = prepareLogSegment(201, 300, true);
- cache.addSegment(s3);
- Assert.assertNotNull(cache.getOpenSegment());
- checkCache(1, 300, 100);
-
- cache.rollOpenSegment(true);
- Assert.assertNotNull(cache.getOpenSegment());
- checkCache(1, 300, 100);
- }
-
- @Test
- public void testAppendEntry() throws Exception {
- LogSegment closedSegment = prepareLogSegment(0, 99, false);
- cache.addSegment(closedSegment);
-
- final SimpleOperation m = new SimpleOperation("m");
- try {
- LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- 0, 0);
- cache.appendEntry(entry);
- Assert.fail("the open segment is null");
- } catch (IllegalStateException ignored) {
- }
-
- LogSegment openSegment = prepareLogSegment(100, 100, true);
- cache.addSegment(openSegment);
- for (long index = 101; index < 200; index++) {
- LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- 0, index);
- cache.appendEntry(entry);
- }
-
- Assert.assertNotNull(cache.getOpenSegment());
- checkCache(0, 199, 100);
- }
-
- @Test
- public void testTruncate() throws Exception {
- long start = 0;
- for (int i = 0; i < 5; i++) { // 5 closed segments
- LogSegment s = prepareLogSegment(start, start + 99, false);
- cache.addSegment(s);
- start += 100;
- }
- // add another open segment
- LogSegment s = prepareLogSegment(start, start + 99, true);
- cache.addSegment(s);
-
- long end = cache.getEndIndex();
- Assert.assertEquals(599, end);
- int numOfSegments = 6;
- // start truncation
- for (int i = 0; i < 10; i++) { // truncate 10 times
- // each time truncate 37 entries
- end -= 37;
- TruncationSegments ts = cache.truncate(end + 1);
- checkCache(0, end, 100);
-
- // check TruncationSegments
- int currentNum= (int) (end / 100 + 1);
- if (currentNum < numOfSegments) {
- Assert.assertEquals(1, ts.toDelete.length);
- numOfSegments = currentNum;
- } else {
- Assert.assertEquals(0, ts.toDelete.length);
- }
- }
-
- // 230 entries remaining. truncate at the segment boundary
- TruncationSegments ts = cache.truncate(200);
- checkCache(0, 199, 100);
- Assert.assertEquals(1, ts.toDelete.length);
- Assert.assertEquals(200, ts.toDelete[0].startIndex);
- Assert.assertEquals(229, ts.toDelete[0].endIndex);
- Assert.assertEquals(0, ts.toDelete[0].targetLength);
- Assert.assertFalse(ts.toDelete[0].isOpen);
- Assert.assertNull(ts.toTruncate);
-
- // add another open segment and truncate it as a whole
- LogSegment newOpen = prepareLogSegment(200, 249, true);
- cache.addSegment(newOpen);
- ts = cache.truncate(200);
- checkCache(0, 199, 100);
- Assert.assertEquals(1, ts.toDelete.length);
- Assert.assertEquals(200, ts.toDelete[0].startIndex);
- Assert.assertEquals(249, ts.toDelete[0].endIndex);
- Assert.assertEquals(0, ts.toDelete[0].targetLength);
- Assert.assertTrue(ts.toDelete[0].isOpen);
- Assert.assertNull(ts.toTruncate);
-
- // add another open segment and truncate part of it
- newOpen = prepareLogSegment(200, 249, true);
- cache.addSegment(newOpen);
- ts = cache.truncate(220);
- checkCache(0, 219, 100);
- Assert.assertNull(cache.getOpenSegment());
- Assert.assertEquals(0, ts.toDelete.length);
- Assert.assertTrue(ts.toTruncate.isOpen);
- Assert.assertEquals(219, ts.toTruncate.newEndIndex);
- Assert.assertEquals(200, ts.toTruncate.startIndex);
- Assert.assertEquals(249, ts.toTruncate.endIndex);
- }
-
- private void testIterator(long startIndex) {
- Iterator<LogEntryProto> iterator = cache.iterator(startIndex);
- LogEntryProto prev = null;
- while (iterator.hasNext()) {
- LogEntryProto entry = iterator.next();
- Assert.assertEquals(cache.getEntry(entry.getIndex()), entry);
- if (prev != null) {
- Assert.assertEquals(prev.getIndex() + 1, entry.getIndex());
- }
- prev = entry;
- }
- if (startIndex <= cache.getEndIndex()) {
- Assert.assertNotNull(prev);
- Assert.assertEquals(cache.getEndIndex(), prev.getIndex());
- }
- }
-
- @Test
- public void testIterator() throws Exception {
- long start = 0;
- for (int i = 0; i < 2; i++) { // 2 closed segments
- LogSegment s = prepareLogSegment(start, start + 99, false);
- cache.addSegment(s);
- start += 100;
- }
- // add another open segment
- LogSegment s = prepareLogSegment(start, start + 99, true);
- cache.addSegment(s);
-
- for (long startIndex = 0; startIndex < 300; startIndex += 50) {
- testIterator(startIndex);
- }
- testIterator(299);
-
- Iterator<LogEntryProto> iterator = cache.iterator(300);
- Assert.assertFalse(iterator.hasNext());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogReadWrite.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogReadWrite.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogReadWrite.java
deleted file mode 100644
index fa17696..0000000
--- a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogReadWrite.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.RaftTestUtil.SimpleOperation;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.ChecksumException;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.impl.RaftServerConstants.StartupOption;
-import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.ProtoUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.raft.server.RaftServerConfigKeys.*;
-
-/**
- * Test basic functionality of LogReader, LogInputStream, and LogOutputStream.
- */
-public class TestRaftLogReadWrite {
- private static final Logger LOG = LoggerFactory.getLogger(TestRaftLogReadWrite.class);
-
- private File storageDir;
- private RaftProperties properties;
- private int segmentMaxSize;
-
- @Before
- public void setup() throws Exception {
- storageDir = RaftTestUtil.getTestDir(TestRaftLogReadWrite.class);
- properties = new RaftProperties();
- properties.set(RAFT_SERVER_STORAGE_DIR_KEY,
- FileUtils.fileAsURI(storageDir).toString());
- }
-
- @After
- public void tearDown() throws Exception {
- if (storageDir != null) {
- FileUtils.fullyDelete(storageDir.getParentFile());
- }
- }
-
- private LogEntryProto[] readLog(File file, long startIndex, long endIndex,
- boolean isOpen) throws IOException {
- List<LogEntryProto> list = new ArrayList<>();
- try (LogInputStream in =
- new LogInputStream(file, startIndex, endIndex, isOpen)) {
- LogEntryProto entry;
- while ((entry = in.nextEntry()) != null) {
- list.add(entry);
- }
- }
- return list.toArray(new LogEntryProto[list.size()]);
- }
-
- private long writeMessages(LogEntryProto[] entries, LogOutputStream out)
- throws IOException {
- long size = 0;
- for (int i = 0; i < entries.length; i++) {
- SimpleOperation m = new SimpleOperation("m" + i);
- entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
- final int s = entries[i].getSerializedSize();
- size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4;
- out.write(entries[i]);
- }
- return size;
- }
-
- /**
- * Test basic functionality: write several log entries, then read
- */
- @Test
- public void testReadWriteLog() throws IOException {
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
- long size = SegmentedRaftLog.HEADER_BYTES.length;
-
- final LogEntryProto[] entries = new LogEntryProto[100];
- try (LogOutputStream out =
- new LogOutputStream(openSegment, false, properties)) {
- size += writeMessages(entries, out);
- } finally {
- storage.close();
- }
-
- Assert.assertEquals(size, openSegment.length());
-
- LogEntryProto[] readEntries = readLog(openSegment, 0,
- RaftServerConstants.INVALID_LOG_INDEX, true);
- Assert.assertArrayEquals(entries, readEntries);
- }
-
- @Test
- public void testAppendLog() throws IOException {
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
- LogEntryProto[] entries = new LogEntryProto[200];
- try (LogOutputStream out =
- new LogOutputStream(openSegment, false, properties)) {
- for (int i = 0; i < 100; i++) {
- SimpleOperation m = new SimpleOperation("m" + i);
- entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
- out.write(entries[i]);
- }
- }
-
- try (LogOutputStream out =
- new LogOutputStream(openSegment, true, properties)) {
- for (int i = 100; i < 200; i++) {
- SimpleOperation m = new SimpleOperation("m" + i);
- entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
- out.write(entries[i]);
- }
- }
-
- LogEntryProto[] readEntries = readLog(openSegment, 0,
- RaftServerConstants.INVALID_LOG_INDEX, true);
- Assert.assertArrayEquals(entries, readEntries);
-
- storage.close();
- }
-
- /**
- * Simulate the scenario that the peer is shutdown without truncating
- * log segment file padding. Make sure the reader can correctly handle this.
- */
- @Test
- public void testReadWithPadding() throws IOException {
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
- long size = SegmentedRaftLog.HEADER_BYTES.length;
-
- LogEntryProto[] entries = new LogEntryProto[100];
- LogOutputStream out = new LogOutputStream(openSegment, false, properties);
- size += writeMessages(entries, out);
- out.flush();
-
- // make sure the file contains padding
- Assert.assertEquals(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
- openSegment.length());
-
- // check if the reader can correctly read the log file
- LogEntryProto[] readEntries = readLog(openSegment, 0,
- RaftServerConstants.INVALID_LOG_INDEX, true);
- Assert.assertArrayEquals(entries, readEntries);
-
- out.close();
- Assert.assertEquals(size, openSegment.length());
- }
-
- /**
- * corrupt the padding by inserting non-zero bytes. Make sure the reader
- * throws exception.
- */
- @Test
- public void testReadWithCorruptPadding() throws IOException {
- properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 4 * 1024 * 1024);
- properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 16 * 1024 * 1024);
-
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
-
- LogEntryProto[] entries = new LogEntryProto[10];
- LogOutputStream out = new LogOutputStream(openSegment, false, properties);
- for (int i = 0; i < 10; i++) {
- SimpleOperation m = new SimpleOperation("m" + i);
- entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
- out.write(entries[i]);
- }
- out.flush();
-
- // make sure the file contains padding
- Assert.assertEquals(4 * 1024 * 1024, openSegment.length());
-
- try (FileOutputStream fout = new FileOutputStream(openSegment, true)) {
- ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{-1, 1});
- fout.getChannel()
- .write(byteBuffer, 16 * 1024 * 1024 - 10);
- }
-
- List<LogEntryProto> list = new ArrayList<>();
- try (LogInputStream in = new LogInputStream(openSegment, 0,
- RaftServerConstants.INVALID_LOG_INDEX, true)) {
- LogEntryProto entry;
- while ((entry = in.nextEntry()) != null) {
- list.add(entry);
- }
- Assert.fail("should fail since we corrupt the padding");
- } catch (IOException e) {
- boolean findVerifyTerminator = false;
- for (StackTraceElement s : e.getStackTrace()) {
- if (s.getMethodName().equals("verifyTerminator")) {
- findVerifyTerminator = true;
- break;
- }
- }
- Assert.assertTrue(findVerifyTerminator);
- }
- Assert.assertArrayEquals(entries,
- list.toArray(new LogEntryProto[list.size()]));
- }
-
- /**
- * Test the log reader to make sure it can detect the checksum mismatch.
- */
- @Test
- public void testReadWithEntryCorruption() throws IOException {
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- File openSegment = storage.getStorageDir().getOpenLogFile(0);
- try (LogOutputStream out =
- new LogOutputStream(openSegment, false, properties)) {
- for (int i = 0; i < 100; i++) {
- LogEntryProto entry = ProtoUtils.toLogEntryProto(
- new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
- out.write(entry);
- }
- } finally {
- storage.close();
- }
-
- // corrupt the log file
- try (RandomAccessFile raf = new RandomAccessFile(openSegment.getCanonicalFile(),
- "rw")) {
- raf.seek(100);
- int correctValue = raf.read();
- raf.seek(100);
- raf.write(correctValue + 1);
- }
-
- try {
- readLog(openSegment, 0, RaftServerConstants.INVALID_LOG_INDEX, true);
- Assert.fail("The read of corrupted log file should fail");
- } catch (ChecksumException e) {
- LOG.info("Caught ChecksumException as expected", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogSegment.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogSegment.java
deleted file mode 100644
index 470f80f..0000000
--- a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogSegment.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.RaftTestUtil.SimpleOperation;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.impl.RaftServerConstants.StartupOption;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.ProtoUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.raft.server.RaftServerConfigKeys.*;
-import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-import static org.apache.raft.server.storage.LogSegment.getEntrySize;
-
-/**
- * Test basic functionality of {@link LogSegment}
- */
-public class TestRaftLogSegment {
- private File storageDir;
- private final RaftProperties properties = new RaftProperties();
-
- @Before
- public void setup() throws Exception {
- storageDir = RaftTestUtil.getTestDir(TestRaftLogSegment.class);
- properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY,
- storageDir.getCanonicalPath());
- }
-
- @After
- public void tearDown() throws Exception {
- if (storageDir != null) {
- FileUtils.fullyDelete(storageDir.getParentFile());
- }
- }
-
- private File prepareLog(boolean isOpen, long start, int size, long term)
- throws IOException {
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- File file = isOpen ? storage.getStorageDir().getOpenLogFile(start) :
- storage.getStorageDir().getClosedLogFile(start, start + size - 1);
-
- LogEntryProto[] entries = new LogEntryProto[size];
- try (LogOutputStream out = new LogOutputStream(file, false, properties)) {
- for (int i = 0; i < size; i++) {
- SimpleOperation op = new SimpleOperation("m" + i);
- entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
- term, i + start);
- out.write(entries[i]);
- }
- }
- storage.close();
- return file;
- }
-
- private void checkLogSegment(LogSegment segment, long start, long end,
- boolean isOpen, long totalSize, long term) {
- Assert.assertEquals(start, segment.getStartIndex());
- Assert.assertEquals(end, segment.getEndIndex());
- Assert.assertEquals(isOpen, segment.isOpen());
- Assert.assertEquals(totalSize, segment.getTotalSize());
-
- long offset = SegmentedRaftLog.HEADER_BYTES.length;
- for (long i = start; i <= end; i++) {
- LogSegment.LogRecord record = segment.getLogRecord(i);
- Assert.assertEquals(i, record.entry.getIndex());
- Assert.assertEquals(term, record.entry.getTerm());
- Assert.assertEquals(offset, record.offset);
-
- offset += getEntrySize(record.entry);
- }
- }
-
- @Test
- public void testLoadLogSegment() throws Exception {
- // load an open segment
- File openSegmentFile = prepareLog(true, 0, 100, 0);
- LogSegment openSegment = LogSegment.loadSegment(openSegmentFile, 0,
- INVALID_LOG_INDEX, true, null);
- checkLogSegment(openSegment, 0, 99, true, openSegmentFile.length(), 0);
-
- // load a closed segment (1000-1099)
- File closedSegmentFile = prepareLog(false, 1000, 100, 1);
- LogSegment closedSegment = LogSegment.loadSegment(closedSegmentFile, 1000,
- 1099, false, null);
- checkLogSegment(closedSegment, 1000, 1099, false,
- closedSegment.getTotalSize(), 1);
- }
-
- @Test
- public void testAppendEntries() throws Exception {
- final long start = 1000;
- LogSegment segment = LogSegment.newOpenSegment(start);
- long size = SegmentedRaftLog.HEADER_BYTES.length;
- final long max = 8 * 1024 * 1024;
- checkLogSegment(segment, start, start - 1, true, size, 0);
-
- // append till full
- long term = 0;
- int i = 0;
- List<LogEntryProto> list = new ArrayList<>();
- while (size < max) {
- SimpleOperation op = new SimpleOperation("m" + i);
- LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
- term, i++ + start);
- size += getEntrySize(entry);
- list.add(entry);
- }
-
- segment.appendToOpenSegment(list.toArray(new LogEntryProto[list.size()]));
- Assert.assertTrue(segment.getTotalSize() >= max);
- checkLogSegment(segment, start, i - 1 + start, true, size, term);
- }
-
- @Test
- public void testAppendWithGap() throws Exception {
- LogSegment segment = LogSegment.newOpenSegment(1000);
- SimpleOperation op = new SimpleOperation("m");
- final SMLogEntryProto m = op.getLogEntryContent();
- try {
- LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001);
- segment.appendToOpenSegment(entry);
- Assert.fail("should fail since the entry's index needs to be 1000");
- } catch (Exception e) {
- Assert.assertTrue(e instanceof IllegalArgumentException);
- }
-
- LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000);
- segment.appendToOpenSegment(entry);
-
- try {
- entry = ProtoUtils.toLogEntryProto(m, 0, 1002);
- segment.appendToOpenSegment(entry);
- Assert.fail("should fail since the entry's index needs to be 1001");
- } catch (Exception e) {
- Assert.assertTrue(e instanceof IllegalArgumentException);
- }
-
- LogEntryProto[] entries = new LogEntryProto[2];
- for (int i = 0; i < 2; i++) {
- entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2);
- }
- try {
- segment.appendToOpenSegment(entries);
- Assert.fail("should fail since there is gap between entries");
- } catch (Exception e) {
- Assert.assertTrue(e instanceof IllegalArgumentException);
- }
- }
-
- @Test
- public void testTruncate() throws Exception {
- final long term = 1;
- final long start = 1000;
- LogSegment segment = LogSegment.newOpenSegment(start);
- for (int i = 0; i < 100; i++) {
- LogEntryProto entry = ProtoUtils.toLogEntryProto(
- new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
- segment.appendToOpenSegment(entry);
- }
-
- // truncate an open segment (remove 1080~1099)
- long newSize = segment.getLogRecord(start + 80).offset;
- segment.truncate(start + 80);
- Assert.assertEquals(80, segment.numOfEntries());
- checkLogSegment(segment, start, start + 79, false, newSize, term);
-
- // truncate a closed segment (remove 1050~1079)
- newSize = segment.getLogRecord(start + 50).offset;
- segment.truncate(start + 50);
- Assert.assertEquals(50, segment.numOfEntries());
- checkLogSegment(segment, start, start + 49, false, newSize, term);
-
- // truncate all the remaining entries
- segment.truncate(start);
- Assert.assertEquals(0, segment.numOfEntries());
- checkLogSegment(segment, start, start - 1, false,
- SegmentedRaftLog.HEADER_BYTES.length, term);
- }
-
- private RaftProperties getProperties(long maxSegmentSize,
- long preallocatedSize) {
- RaftProperties p = new RaftProperties();
- p.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
- maxSegmentSize);
- p.setLong(RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY,
- preallocatedSize);
- return p;
- }
-
- @Test
- public void testPreallocateSegment() throws Exception {
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- final File file = storage.getStorageDir().getOpenLogFile(0);
- final int[] maxSizes = new int[]{1024, 1025, 1024 * 1024 - 1, 1024 * 1024,
- 1024 * 1024 + 1, 2 * 1024 * 1024 - 1, 2 * 1024 * 1024,
- 2 * 1024 * 1024 + 1, 8 * 1024 * 1024};
- final int[] preallocated = new int[]{512, 1024, 1025, 1024 * 1024,
- 1024 * 1024 + 1, 2 * 1024 * 1024};
-
- // make sure preallocation is correct with different max/pre-allocated size
- for (int max : maxSizes) {
- for (int a : preallocated) {
- try (LogOutputStream ignored =
- new LogOutputStream(file, false, getProperties(max, a))) {
- Assert.assertEquals(file.length(), Math.min(max, a));
- }
- try (LogInputStream in =
- new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
- LogEntryProto entry = in.nextEntry();
- Assert.assertNull(entry);
- }
- }
- }
-
- // test the scenario where an entry's size is larger than the max size
- final byte[] content = new byte[1024 * 2];
- Arrays.fill(content, (byte) 1);
- final long size;
- try (LogOutputStream out = new LogOutputStream(file, false,
- getProperties(1024, 1024))) {
- SimpleOperation op = new SimpleOperation(new String(content));
- LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
- 0, 0);
- size = LogSegment.getEntrySize(entry);
- out.write(entry);
- }
- Assert.assertEquals(file.length(),
- size + SegmentedRaftLog.HEADER_BYTES.length);
- try (LogInputStream in = new LogInputStream(file, 0,
- INVALID_LOG_INDEX, true)) {
- LogEntryProto entry = in.nextEntry();
- Assert.assertArrayEquals(content,
- entry.getSmLogEntry().getData().toByteArray());
- Assert.assertNull(in.nextEntry());
- }
- }
-
- /**
- * Keep appending and check if pre-allocation is correct
- */
- @Test
- public void testPreallocationAndAppend() throws Exception {
- final long max = 2 * 1024 * 1024;
- properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, max);
- properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024);
- properties.setLong(RAFT_LOG_WRITE_BUFFER_SIZE_KEY, 10 * 1024);
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- final File file = storage.getStorageDir().getOpenLogFile(0);
-
- final byte[] content = new byte[1024];
- Arrays.fill(content, (byte) 1);
- SimpleOperation op = new SimpleOperation(new String(content));
- LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
- 0, 0);
- final long entrySize = LogSegment.getEntrySize(entry);
-
- long totalSize = SegmentedRaftLog.HEADER_BYTES.length;
- long preallocated = 16 * 1024;
- try (LogOutputStream out = new LogOutputStream(file, false, properties)) {
- Assert.assertEquals(preallocated, file.length());
- while (totalSize + entrySize < max) {
- totalSize += entrySize;
- out.write(entry);
- if (totalSize > preallocated) {
- Assert.assertEquals("totalSize==" + totalSize,
- preallocated + 16 * 1024, file.length());
- preallocated += 16 * 1024;
- }
- }
- }
-
- Assert.assertEquals(totalSize, file.length());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftStorage.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftStorage.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftStorage.java
deleted file mode 100644
index 1b14199..0000000
--- a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftStorage.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.io.nativeio.NativeIO;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.impl.RaftServerConstants.StartupOption;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.RaftStorageDirectory.StorageState;
-import org.apache.raft.statemachine.SimpleStateMachineStorage;
-import org.apache.raft.util.FileUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * Test RaftStorage and RaftStorageDirectory
- */
-public class TestRaftStorage {
- private File storageDir;
- private final RaftProperties properties = new RaftProperties();
-
- @Before
- public void setup() throws Exception {
- storageDir = RaftTestUtil.getTestDir(TestRaftStorage.class);
- properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY,
- storageDir.getCanonicalPath());
- }
-
- @After
- public void tearDown() throws Exception {
- if (storageDir != null) {
- FileUtils.fullyDelete(storageDir.getParentFile());
- }
- }
-
- @Test
- public void testNotExistent() throws IOException {
- FileUtils.fullyDelete(storageDir);
-
- // we will format the empty directory
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- Assert.assertEquals(StorageState.NORMAL, storage.getState());
-
- try {
- new RaftStorage(properties, StartupOption.FORMAT).close();
- Assert.fail("the format should fail since the storage is still locked");
- } catch (IOException e) {
- Assert.assertTrue(e.getMessage().contains("directory is already locked"));
- }
-
- storage.close();
- FileUtils.fullyDelete(storageDir);
- Assert.assertTrue(storageDir.createNewFile());
- try {
- new RaftStorage(properties, StartupOption.REGULAR);
- Assert.fail();
- } catch (IOException e) {
- Assert.assertTrue(
- e.getMessage().contains(StorageState.NON_EXISTENT.name()));
- }
- }
-
- /**
- * make sure the RaftStorage format works
- */
- @Test
- public void testStorage() throws Exception {
- RaftStorageDirectory sd = new RaftStorageDirectory(storageDir);
- try {
- StorageState state = sd.analyzeStorage(true);
- Assert.assertEquals(StorageState.NOT_FORMATTED, state);
- Assert.assertTrue(sd.isCurrentEmpty());
- } finally {
- sd.unlock();
- }
-
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- Assert.assertEquals(StorageState.NORMAL, storage.getState());
- storage.close();
-
- Assert.assertEquals(StorageState.NORMAL, sd.analyzeStorage(false));
- File m = sd.getMetaFile();
- Assert.assertTrue(m.exists());
- MetaFile metaFile = new MetaFile(m);
- Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
- Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
-
- metaFile.set(123, "peer1");
- metaFile.readFile();
- Assert.assertEquals(123, metaFile.getTerm());
- Assert.assertEquals("peer1", metaFile.getVotedFor());
-
- MetaFile metaFile2 = new MetaFile(m);
- Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded"));
- Assert.assertEquals(123, metaFile.getTerm());
- Assert.assertEquals("peer1", metaFile.getVotedFor());
-
- // test format
- storage = new RaftStorage(properties, StartupOption.FORMAT);
- Assert.assertEquals(StorageState.NORMAL, storage.getState());
- metaFile = new MetaFile(sd.getMetaFile());
- Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
- Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
- storage.close();
- }
-
- @Test
- public void testMetaFile() throws Exception {
- RaftStorage storage = new RaftStorage(properties, StartupOption.FORMAT);
- File m = storage.getStorageDir().getMetaFile();
- Assert.assertTrue(m.exists());
- MetaFile metaFile = new MetaFile(m);
- Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
- Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
-
- metaFile.set(123, "peer1");
- metaFile.readFile();
- Assert.assertEquals(123, metaFile.getTerm());
- Assert.assertEquals("peer1", metaFile.getVotedFor());
-
- MetaFile metaFile2 = new MetaFile(m);
- Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded"));
- Assert.assertEquals(123, metaFile.getTerm());
- Assert.assertEquals("peer1", metaFile.getVotedFor());
-
- storage.close();
- }
-
- /**
- * check if RaftStorage deletes tmp metafile when startup
- */
- @Test
- public void testCleanMetaTmpFile() throws Exception {
- RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR);
- Assert.assertEquals(StorageState.NORMAL, storage.getState());
- storage.close();
-
- RaftStorageDirectory sd = new RaftStorageDirectory(storageDir);
- File metaFile = sd.getMetaFile();
- NativeIO.renameTo(metaFile, sd.getMetaTmpFile());
-
- Assert.assertEquals(StorageState.NOT_FORMATTED, sd.analyzeStorage(false));
-
- try {
- new RaftStorage(properties, StartupOption.REGULAR);
- Assert.fail("should throw IOException since storage dir is not formatted");
- } catch (IOException e) {
- Assert.assertTrue(
- e.getMessage().contains(StorageState.NOT_FORMATTED.name()));
- }
-
- // let the storage dir contain both raft-meta and raft-meta.tmp
- new RaftStorage(properties, StartupOption.FORMAT).close();
- Assert.assertTrue(sd.getMetaFile().exists());
- Assert.assertTrue(sd.getMetaTmpFile().createNewFile());
- Assert.assertTrue(sd.getMetaTmpFile().exists());
- try {
- storage = new RaftStorage(properties, StartupOption.REGULAR);
- Assert.assertEquals(StorageState.NORMAL, storage.getState());
- Assert.assertFalse(sd.getMetaTmpFile().exists());
- Assert.assertTrue(sd.getMetaFile().exists());
- } finally {
- storage.close();
- }
- }
-
- @Test
- public void testSnapshotFileName() throws Exception {
- final long term = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
- final long index = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
- final String name = SimpleStateMachineStorage.getSnapshotFileName(term, index);
- System.out.println("name = " + name);
- final File file = new File(storageDir, name);
- final TermIndex ti = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(file);
- System.out.println("file = " + file);
- Assert.assertEquals(term, ti.getTerm());
- Assert.assertEquals(index, ti.getIndex());
- System.out.println("ti = " + ti);
-
- final File foo = new File(storageDir, "foo");
- try {
- SimpleStateMachineStorage.getTermIndexFromSnapshotFile(foo);
- Assert.fail();
- } catch(IllegalArgumentException iae) {
- System.out.println("Good " + iae);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestSegmentedRaftLog.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestSegmentedRaftLog.java
deleted file mode 100644
index 264ba8e..0000000
--- a/raft-server/src/test/java/org/apache/raft/server/storage/TestSegmentedRaftLog.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import org.apache.log4j.Level;
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.RaftTestUtil.SimpleOperation;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.impl.ConfigurationManager;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.RaftUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Supplier;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY;
-
-public class TestSegmentedRaftLog {
- static {
- RaftUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG);
- }
-
- private static final String peerId = "s0";
-
- private static class SegmentRange {
- final long start;
- final long end;
- final long term;
- final boolean isOpen;
-
- SegmentRange(long s, long e, long term, boolean isOpen) {
- this.start = s;
- this.end = e;
- this.term = term;
- this.isOpen = isOpen;
- }
- }
-
- private File storageDir;
- private RaftProperties properties;
- private RaftStorage storage;
- private final ConfigurationManager cm = new ConfigurationManager(
- MiniRaftCluster.initConfiguration(MiniRaftCluster.generateIds(3, 0)));
-
- @Before
- public void setup() throws Exception {
- storageDir = RaftTestUtil.getTestDir(TestSegmentedRaftLog.class);
- properties = new RaftProperties();
- properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY,
- storageDir.getCanonicalPath());
- storage = new RaftStorage(properties, RaftServerConstants.StartupOption.REGULAR);
- }
-
- @After
- public void tearDown() throws Exception {
- if (storageDir != null) {
- FileUtils.fullyDelete(storageDir.getParentFile());
- }
- }
-
- private LogEntryProto[] prepareLog(List<SegmentRange> list) throws IOException {
- List<LogEntryProto> entryList = new ArrayList<>();
- for (SegmentRange range : list) {
- File file = range.isOpen ?
- storage.getStorageDir().getOpenLogFile(range.start) :
- storage.getStorageDir().getClosedLogFile(range.start, range.end);
-
- final int size = (int) (range.end - range.start + 1);
- LogEntryProto[] entries = new LogEntryProto[size];
- try (LogOutputStream out = new LogOutputStream(file, false, properties)) {
- for (int i = 0; i < size; i++) {
- SimpleOperation m = new SimpleOperation("m" + (i + range.start));
- entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- range.term, i + range.start);
- out.write(entries[i]);
- }
- }
- Collections.addAll(entryList, entries);
- }
- return entryList.toArray(new LogEntryProto[entryList.size()]);
- }
-
- private List<SegmentRange> prepareRanges(int number, int segmentSize,
- long startIndex) {
- List<SegmentRange> list = new ArrayList<>(number);
- for (int i = 0; i < number; i++) {
- list.add(new SegmentRange(startIndex, startIndex + segmentSize - 1, i,
- i == number - 1));
- startIndex += segmentSize;
- }
- return list;
- }
-
- @Test
- public void testLoadLogSegments() throws Exception {
- // first generate log files
- List<SegmentRange> ranges = prepareRanges(5, 100, 0);
- LogEntryProto[] entries = prepareLog(ranges);
-
- // create RaftLog object and load log file
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- // check if log entries are loaded correctly
- for (LogEntryProto e : entries) {
- LogEntryProto entry = raftLog.get(e.getIndex());
- Assert.assertEquals(e, entry);
- }
-
- Assert.assertArrayEquals(entries, raftLog.getEntries(0, 500));
- Assert.assertEquals(entries[entries.length - 1], raftLog.getLastEntry());
- }
- }
-
- List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist,
- Supplier<String> stringSupplier) {
- List<LogEntryProto> eList = new ArrayList<>();
- for (SegmentRange range : slist) {
- for (long index = range.start; index <= range.end; index++) {
- SimpleOperation m = stringSupplier == null ?
- new SimpleOperation("m" + index) :
- new SimpleOperation(stringSupplier.get());
- eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
- range.term, index));
- }
- }
- return eList;
- }
-
- /**
- * Append entry one by one and check if log state is correct.
- */
- @Test
- public void testAppendEntry() throws Exception {
- List<SegmentRange> ranges = prepareRanges(5, 200, 0);
- List<LogEntryProto> entries = prepareLogEntries(ranges, null);
-
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- // append entries to the raftlog
- entries.forEach(raftLog::appendEntry);
- raftLog.logSync();
- }
-
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- // check if the raft log is correct
- checkEntries(raftLog, entries, 0, entries.size());
- }
- }
-
- /**
- * Keep appending entries, make sure the rolling is correct.
- */
- @Test
- public void testAppendAndRoll() throws Exception {
- properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024);
- properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 128 * 1024);
-
- List<SegmentRange> ranges = prepareRanges(1, 1024, 0);
- final byte[] content = new byte[1024];
- List<LogEntryProto> entries = prepareLogEntries(ranges,
- () -> new String(content));
-
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- // append entries to the raftlog
- entries.forEach(raftLog::appendEntry);
- raftLog.logSync();
- }
-
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- // check if the raft log is correct
- checkEntries(raftLog, entries, 0, entries.size());
- Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
- }
- }
-
- @Test
- public void testTruncate() throws Exception {
- // prepare the log for truncation
- List<SegmentRange> ranges = prepareRanges(5, 200, 0);
- List<LogEntryProto> entries = prepareLogEntries(ranges, null);
-
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- // append entries to the raftlog
- entries.forEach(raftLog::appendEntry);
- raftLog.logSync();
- }
-
- for (long fromIndex = 900; fromIndex >= 0; fromIndex -= 150) {
- testTruncate(entries, fromIndex);
- }
- }
-
- private void testTruncate(List<LogEntryProto> entries, long fromIndex)
- throws Exception {
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- // truncate the log
- raftLog.truncate(fromIndex);
- raftLog.logSync();
-
- checkEntries(raftLog, entries, 0, (int) fromIndex);
- }
-
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- // check if the raft log is correct
- if (fromIndex > 0) {
- Assert.assertEquals(entries.get((int) (fromIndex - 1)),
- raftLog.getLastEntry());
- } else {
- Assert.assertNull(raftLog.getLastEntry());
- }
- checkEntries(raftLog, entries, 0, (int) fromIndex);
- }
- }
-
- private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected,
- int offset, int size) {
- if (size > 0) {
- for (int i = offset; i < size + offset; i++) {
- LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
- Assert.assertEquals(expected.get(i), entry);
- }
- LogEntryProto[] entriesFromLog = raftLog.getEntries(
- expected.get(offset).getIndex(),
- expected.get(offset + size - 1).getIndex() + 1);
- LogEntryProto[] expectedArray = expected.subList(offset, offset + size)
- .toArray(SegmentedRaftLog.EMPTY_LOGENTRY_ARRAY);
- Assert.assertArrayEquals(expectedArray, entriesFromLog);
- }
- }
-
- /**
- * Test append with inconsistent entries
- */
- @Test
- public void testAppendEntriesWithInconsistency() throws Exception {
- // prepare the log for truncation
- List<SegmentRange> ranges = prepareRanges(5, 200, 0);
- List<LogEntryProto> entries = prepareLogEntries(ranges, null);
-
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- // append entries to the raftlog
- entries.forEach(raftLog::appendEntry);
- raftLog.logSync();
- }
-
- // append entries whose first 100 entries are the same with existing log,
- // and the next 100 are with different term
- SegmentRange r1 = new SegmentRange(550, 599, 2, false);
- SegmentRange r2 = new SegmentRange(600, 649, 3, false);
- SegmentRange r3 = new SegmentRange(650, 749, 10, false);
- List<LogEntryProto> newEntries = prepareLogEntries(
- Arrays.asList(r1, r2, r3), null);
-
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()]));
- raftLog.logSync();
-
- checkEntries(raftLog, entries, 0, 650);
- checkEntries(raftLog, newEntries, 100, 100);
- Assert.assertEquals(newEntries.get(newEntries.size() - 1),
- raftLog.getLastEntry());
- Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
- raftLog.getLatestFlushedIndex());
- }
-
- // load the raftlog again and check
- try (SegmentedRaftLog raftLog =
- new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
- raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
- checkEntries(raftLog, entries, 0, 650);
- checkEntries(raftLog, newEntries, 100, 100);
- Assert.assertEquals(newEntries.get(newEntries.size() - 1),
- raftLog.getLastEntry());
- Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
- raftLog.getLatestFlushedIndex());
-
- RaftLogCache cache = raftLog.getRaftLogCache();
- Assert.assertEquals(5, cache.getNumOfSegments());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java b/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java
deleted file mode 100644
index fbdcb8b..0000000
--- a/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.log4j.Level;
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.RaftTestUtil.SimpleMessage;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.impl.RaftServerTestUtil;
-import org.apache.raft.server.simulation.RequestHandler;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.server.storage.RaftStorageDirectory;
-import org.apache.raft.server.storage.RaftStorageDirectory.LogPathAndIndex;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.RaftUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY;
-import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
-
-public abstract class RaftSnapshotBaseTest {
- static {
- RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
- RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
- RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
- RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
- }
-
- static final Logger LOG = LoggerFactory.getLogger(RaftSnapshotBaseTest.class);
- private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
-
- static File getSnapshotFile(MiniRaftCluster cluster, int i) {
- final RaftServerImpl leader = cluster.getLeader();
- final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader);
- return sm.getStateMachineStorage().getSnapshotFile(
- leader.getState().getCurrentTerm(), i);
- }
-
- static void assertLeaderContent(MiniRaftCluster cluster)
- throws InterruptedException {
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
- Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2,
- leader.getState().getLog().getLastCommittedIndex());
- final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent();
-
- for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- Assert.assertEquals(i+1, entries[i].getIndex());
- Assert.assertArrayEquals(
- new SimpleMessage("m" + i).getContent().toByteArray(),
- entries[i].getSmLogEntry().getData().toByteArray());
- }
- }
-
- private MiniRaftCluster cluster;
-
- public abstract MiniRaftCluster initCluster(int numServer, RaftProperties prop)
- throws IOException;
-
- @Before
- public void setup() throws IOException {
- final RaftProperties prop = new RaftProperties();
- prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
- SimpleStateMachine4Testing.class, StateMachine.class);
- prop.setLong(RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY,
- SNAPSHOT_TRIGGER_THRESHOLD);
- prop.setBoolean(RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY, true);
- this.cluster = initCluster(1, prop);
- cluster.start();
- }
-
- @After
- public void tearDown() {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- /**
- * Keep generating writing traffic and make sure snapshots are taken.
- * We then restart the whole raft peer and check if it can correctly load
- * snapshots + raft log.
- */
- @Test
- public void testRestartPeer() throws Exception {
- RaftTestUtil.waitForLeader(cluster);
- final String leaderId = cluster.getLeader().getId();
- int i = 0;
- try(final RaftClient client = cluster.createClient("client", leaderId)) {
- for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- RaftClientReply reply = client.send(new SimpleMessage("m" + i));
- Assert.assertTrue(reply.isSuccess());
- }
- }
-
- // wait for the snapshot to be done
- final File snapshotFile = getSnapshotFile(cluster, i);
-
- int retries = 0;
- do {
- Thread.sleep(1000);
- } while (!snapshotFile.exists() && retries++ < 10);
-
- Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists());
-
- // restart the peer and check if it can correctly load snapshot
- cluster.restart(false);
- try {
- // 200 messages + two leader elections --> last committed = 201
- assertLeaderContent(cluster);
- } finally {
- cluster.shutdown();
- }
- }
-
- /**
- * Basic test for install snapshot: start a one node cluster and let it
- * generate a snapshot. Then delete the log and restart the node, and add more
- * nodes as followers.
- */
- @Test
- public void testBasicInstallSnapshot() throws Exception {
- List<LogPathAndIndex> logs = new ArrayList<>();
- try {
- RaftTestUtil.waitForLeader(cluster);
- final String leaderId = cluster.getLeader().getId();
-
- int i = 0;
- try(final RaftClient client = cluster.createClient("client", leaderId)) {
- for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- RaftClientReply reply = client.send(new SimpleMessage("m" + i));
- Assert.assertTrue(reply.isSuccess());
- }
- }
-
- // wait for the snapshot to be done
- RaftStorageDirectory storageDirectory = cluster.getLeader().getState()
- .getStorage().getStorageDir();
- final File snapshotFile = getSnapshotFile(cluster, i);
- logs = storageDirectory.getLogSegmentFiles();
-
- int retries = 0;
- do {
- Thread.sleep(1000);
- } while (!snapshotFile.exists() && retries++ < 10);
-
- Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists());
- } finally {
- cluster.shutdown();
- }
-
- // delete the log segments from the leader
- for (LogPathAndIndex path : logs) {
- FileUtils.deleteFile(path.path.toFile());
- }
-
- // restart the peer
- LOG.info("Restarting the cluster");
- cluster.restart(false);
- try {
- assertLeaderContent(cluster);
-
- // generate some more traffic
- try(final RaftClient client = cluster.createClient("client",
- cluster.getLeader().getId())) {
- Assert.assertTrue(client.send(new SimpleMessage("test")).isSuccess());
- }
-
- // add two more peers
- MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
- new String[]{"s3", "s4"}, true);
- // trigger setConfiguration
- SetConfigurationRequest request = new SetConfigurationRequest("client",
- cluster.getLeader().getId(), DEFAULT_SEQNUM, change.allPeersInNewConf);
- LOG.info("Start changing the configuration: {}", request);
- cluster.getLeader().setConfiguration(request);
-
- RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
- } finally {
- cluster.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java b/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java
deleted file mode 100644
index b8dd3f3..0000000
--- a/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.RaftTestUtil.SimpleMessage;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.io.MD5Hash;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.impl.RaftServerTestUtil;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.LogInputStream;
-import org.apache.raft.server.storage.LogOutputStream;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.LifeCycle;
-import org.apache.raft.util.MD5FileUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A {@link StateMachine} implementation example that simply stores all the log
- * entries in a list. Mainly used for test.
- *
- * For snapshot it simply merges all the log segments together.
- */
-public class SimpleStateMachine4Testing extends BaseStateMachine {
-  /**
-   * Minimum distance between the index of the last applied entry and the end
-   * index of the last checkpoint before the checkpointer takes a new snapshot.
-   * Mutable (volatile), presumably so that tests can adjust it.
-   */
-  static volatile int SNAPSHOT_THRESHOLD = 100;
-  static final Logger LOG = LoggerFactory.getLogger(SimpleStateMachine4Testing.class);
-  /** Property key that enables the background checkpointer thread. */
-  public static final String RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY
-      = "raft.test.simple.state.machine.take.snapshot";
-  public static final boolean RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT = false;
-
-  /** @return the state machine of the given server, cast to this class. */
-  public static SimpleStateMachine4Testing get(RaftServerImpl s) {
-    return (SimpleStateMachine4Testing)RaftServerTestUtil.getStateMachine(s);
-  }
-
-  /** All applied (or snapshot-loaded) entries, in the order they were added. */
-  private final List<LogEntryProto> list =
-      Collections.synchronizedList(new ArrayList<>());
-  private final Daemon checkpointer;
-  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
-  private final TermIndexTracker termIndexTracker = new TermIndexTracker();
-  private final RaftProperties properties = new RaftProperties();
-
-  private volatile boolean running = true;
-  // Written by the checkpointer thread and by loadSnapshot(..), hence volatile.
-  private volatile long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
-
-  /**
-   * Creates the state machine and its (not yet started) checkpointer thread.
-   * Once started, the checkpointer wakes up every second and takes a snapshot
-   * whenever at least {@link #SNAPSHOT_THRESHOLD} indices have been applied
-   * since the last checkpoint.
-   */
-  SimpleStateMachine4Testing() {
-    checkpointer = new Daemon(() -> {
-      while (running) {
-        try {
-          if (shouldTakeSnapshot()) {
-            endIndexLastCkpt = takeSnapshot();
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Received IOException in Checkpointer", ioe);
-        }
-        // Sleep outside the try block above so that a failed snapshot attempt
-        // is retried after the usual delay instead of immediately.
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          // close() interrupts this thread; keep the interrupt flag and stop.
-          Thread.currentThread().interrupt();
-          return;
-        }
-      }
-    });
-  }
-
-  /**
-   * @return true if the list is non-empty and its last entry is at least
-   *         {@link #SNAPSHOT_THRESHOLD} indices beyond the last checkpoint.
-   */
-  private boolean shouldTakeSnapshot() {
-    final LogEntryProto last;
-    // isEmpty/size/get must be one atomic step with respect to concurrent adds;
-    // an empty list must not be indexed at all (index -1 would kill the thread).
-    synchronized (list) {
-      if (list.isEmpty()) {
-        return false;
-      }
-      last = list.get(list.size() - 1);
-    }
-    return last.getIndex() - endIndexLastCkpt >= SNAPSHOT_THRESHOLD;
-  }
-
-  /**
-   * Initializes the base state machine and the snapshot storage, loads the
-   * latest snapshot (if any) and starts the checkpointer thread when
-   * {@link #RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY} is set to true.
-   */
-  @Override
-  public synchronized void initialize(String id, RaftProperties properties,
-      RaftStorage raftStorage) throws IOException {
-    LOG.info("Initializing {}:{}", getClass().getSimpleName(), id);
-    lifeCycle.startAndTransition(() -> {
-      super.initialize(id, properties, raftStorage);
-      storage.init(raftStorage);
-      loadSnapshot(storage.findLatestSnapshot());
-
-      if (properties.getBoolean(
-          RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY,
-          RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT)) {
-        checkpointer.start();
-      }
-    });
-  }
-
-  /** Moves the life cycle through PAUSING to PAUSED. */
-  @Override
-  public synchronized void pause() {
-    lifeCycle.transition(LifeCycle.State.PAUSING);
-    lifeCycle.transition(LifeCycle.State.PAUSED);
-  }
-
-  /** Re-runs {@link #initialize(String, RaftProperties, RaftStorage)}. */
-  @Override
-  public synchronized void reinitialize(String id, RaftProperties properties,
-      RaftStorage storage) throws IOException {
-    // NOTE(review): initialize(..) appends the snapshot entries to the existing
-    // list and may call checkpointer.start() again -- confirm this is intended.
-    LOG.info("Reinitializing {}:{}", getClass().getSimpleName(), id);
-    initialize(id, properties, storage);
-  }
-
-  /**
-   * Appends the log entry of the transaction to the in-memory list and records
-   * its term/index as the latest applied one.
-   *
-   * @return a completed future carrying the message "&lt;index&gt; OK".
-   */
-  @Override
-  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    LogEntryProto entry = trx.getLogEntry().get();
-    Preconditions.checkNotNull(entry);
-    list.add(entry);
-    termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
-    return CompletableFuture.completedFuture(
-        new SimpleMessage(entry.getIndex() + " OK"));
-  }
-
-  /**
-   * Writes every entry up to the latest applied index into a snapshot file,
-   * stores an MD5 file next to it and asks the storage to reload its latest
-   * snapshot. I/O failures in these three steps are logged, not propagated.
-   *
-   * @return the index of the last entry covered by the snapshot, or
-   *         {@link RaftServerConstants#INVALID_LOG_INDEX} if the latest
-   *         term or index is not positive.
-   */
-  @Override
-  public long takeSnapshot() throws IOException {
-    TermIndex termIndex = termIndexTracker.getLatestTermIndex();
-    if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
-      return RaftServerConstants.INVALID_LOG_INDEX;
-    }
-    final long endIndex = termIndex.getIndex();
-
-    // TODO: snapshot should be written to a tmp file, then renamed
-    File snapshotFile = storage.getSnapshotFile(termIndex.getTerm(),
-        termIndex.getIndex());
-    LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(),
-        termIndex.getIndex(), snapshotFile);
-    // Iterate over a copy taken in a single synchronized call: iterating the
-    // synchronized list directly is not safe while applyTransaction(..) adds.
-    final LogEntryProto[] entries = list.toArray(new LogEntryProto[0]);
-    try (LogOutputStream out = new LogOutputStream(snapshotFile, false, properties)) {
-      for (final LogEntryProto entry : entries) {
-        if (entry.getIndex() > endIndex) {
-          break;
-        } else {
-          out.write(entry);
-        }
-      }
-      out.flush();
-    } catch (IOException e) {
-      LOG.warn("Failed to take snapshot", e);
-    }
-
-    try {
-      final MD5Hash digest = MD5FileUtil.computeMd5ForFile(snapshotFile);
-      MD5FileUtil.saveMD5File(snapshotFile, digest);
-    } catch (IOException e) {
-      LOG.warn("Hit IOException when computing MD5 for snapshot file "
-          + snapshotFile, e);
-    }
-
-    try {
-      this.storage.loadLatestSnapshot();
-    } catch (IOException e) {
-      LOG.warn("Hit IOException when loading latest snapshot for snapshot file "
-          + snapshotFile, e);
-    }
-    // TODO: purge log segments
-    return endIndex;
-  }
-
-  @Override
-  public SimpleStateMachineStorage getStateMachineStorage() {
-    return storage;
-  }
-
-  /**
-   * Reads all entries of the given snapshot into the in-memory list and resets
-   * the checkpoint index and the term/index tracker to the snapshot position.
-   *
-   * @param snapshot the snapshot to load; may be null.
-   * @return the snapshot index, or {@link RaftServerConstants#INVALID_LOG_INDEX}
-   *         if the snapshot is null or its file does not exist.
-   * @throws IOException if reading the snapshot file fails.
-   */
-  public synchronized long loadSnapshot(SingleFileSnapshotInfo snapshot)
-      throws IOException {
-    if (snapshot == null || !snapshot.getFile().getPath().toFile().exists()) {
-      LOG.info("The snapshot file {} does not exist",
-          snapshot == null ? null : snapshot.getFile());
-      return RaftServerConstants.INVALID_LOG_INDEX;
-    } else {
-      LOG.info("Loading snapshot with t:{}, i:{}, file:{}", snapshot.getTerm(),
-          snapshot.getIndex(), snapshot.getFile().getPath());
-      final long endIndex = snapshot.getIndex();
-      try (LogInputStream in = new LogInputStream(
-          snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
-        LogEntryProto entry;
-        while ((entry = in.nextEntry()) != null) {
-          list.add(entry);
-          termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
-        }
-      }
-      // The last loaded entry must be exactly the one the snapshot ends at.
-      Preconditions.checkState(
-          !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(),
-          "endIndex=%s, list=%s", endIndex, list);
-      this.endIndexLastCkpt = endIndex;
-      termIndexTracker.init(snapshot.getTermIndex());
-      this.storage.loadLatestSnapshot();
-      return endIndex;
-    }
-  }
-
-  /** Answers every query with a reply carrying the message "query success". */
-  @Override
-  public CompletableFuture<RaftClientReply> query(
-      RaftClientRequest request) {
-    return CompletableFuture.completedFuture(
-        new RaftClientReply(request, new SimpleMessage("query success")));
-  }
-
-  /** Wraps the content of the request message into a state machine log entry. */
-  @Override
-  public TransactionContext startTransaction(RaftClientRequest request)
-      throws IOException {
-    return new TransactionContext(this, request, SMLogEntryProto.newBuilder()
-        .setData(request.getMessage().getContent())
-        .build());
-  }
-
-  @Override
-  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
-    // do nothing
-  }
-
-  /** Stops the checkpointer loop and interrupts its thread. */
-  @Override
-  public void close() {
-    lifeCycle.checkStateAndClose(() -> {
-      running = false;
-      checkpointer.interrupt();
-    });
-  }
-
-  /** @return a copy of all entries currently held by this state machine. */
-  public LogEntryProto[] getContent() {
-    return list.toArray(new LogEntryProto[list.size()]);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java b/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java
deleted file mode 100644
index b08fe11..0000000
--- a/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.protocol.TermIndex;
-
-import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * Tracks the term index that is applied to the StateMachine for simple state machines with
- * no concurrent snapshotting capabilities.
- */
-class TermIndexTracker {
-  /** Initial value: both term and index are {@code INVALID_LOG_INDEX}. */
-  static final TermIndex INIT_TERMINDEX =
-      TermIndex.newTermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX);
-
-  private TermIndex current = INIT_TERMINDEX;
-
-  //TODO: developer note: everything is synchronized for now for convenience.
-
-  /**
-   * Initialize the tracker with a term index (likely from a snapshot).
-   * Unlike {@link #update(TermIndex)}, the value is stored without any check.
-   */
-  public synchronized void init(TermIndex termIndex) {
-    this.current = termIndex;
-  }
-
-  /** Set the tracker back to {@link #INIT_TERMINDEX}. */
-  public synchronized void reset() {
-    init(INIT_TERMINDEX);
-  }
-
-  /**
-   * Update the tracker with a new TermIndex. It means that the StateMachine has
-   * this index in memory.
-   *
-   * @throws IllegalArgumentException if the given term index is null or
-   *         compares less than the currently tracked one.
-   */
-  public synchronized void update(TermIndex termIndex) {
-    // Two separate checks so that the failure message names the actual cause.
-    Preconditions.checkArgument(termIndex != null,
-        "termIndex must not be null (current=%s)", current);
-    Preconditions.checkArgument(termIndex.compareTo(current) >= 0,
-        "termIndex=%s is behind current=%s", termIndex, current);
-    this.current = termIndex;
-  }
-
-  /**
-   * Return latest term and index that is inserted to this tracker as an atomic
-   * entity.
-   */
-  public synchronized TermIndex getLatestTermIndex() {
-    return current;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java b/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java
deleted file mode 100644
index 546bfb8..0000000
--- a/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import org.apache.log4j.Level;
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.impl.RaftServerTestUtil;
-import org.apache.raft.server.simulation.MiniRaftClusterWithSimulatedRpc;
-import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.raft.util.RaftUtils;
-import org.junit.*;
-import org.junit.rules.Timeout;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.*;
-
-/**
- * Test StateMachine related functionality
- */
-public class TestStateMachine {
-  static {
-    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
-  }
-
-  public static final int NUM_SERVERS = 5;
-
-  private final RaftProperties properties = new RaftProperties();
-  {
-    // TODO: fix and run with in-memory log. It fails with NPE
-    properties.setBoolean(RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY, false);
-  }
-
-  private MiniRaftClusterWithSimulatedRpc cluster;
-
-  /** Fails any test in this class that runs longer than 60 seconds. */
-  @Rule
-  public Timeout globalTimeout = new Timeout(60 * 1000);
-
-  @Before
-  public void setup() throws IOException {
-  }
-
-  /** Creates a cluster of {@link #NUM_SERVERS} servers and starts it. */
-  private void startCluster() {
-    cluster = new MiniRaftClusterWithSimulatedRpc(NUM_SERVERS, properties);
-    // No leader can exist before the cluster is started.
-    Assert.assertNull(getCluster().getLeader());
-    getCluster().start();
-  }
-
-  /** Shuts the cluster down if the test started one. */
-  @After
-  public void tearDown() {
-    final MiniRaftCluster current = getCluster();
-    if (current != null) {
-      current.shutdown();
-    }
-  }
-
-  public MiniRaftClusterWithSimulatedRpc getCluster() {
-    return cluster;
-  }
-
-  public RaftProperties getProperties() {
-    return properties;
-  }
-
-  /**
-   * A state machine that attaches a running transaction id as the state
-   * machine context in {@link #startTransaction(RaftClientRequest)} and
-   * verifies in {@link #applyTransaction(TransactionContext)} that the context
-   * is present on the leader and absent elsewhere.
-   */
-  static class SMTransactionContext extends SimpleStateMachine4Testing {
-    public static SMTransactionContext get(RaftServerImpl s) {
-      return (SMTransactionContext)RaftServerTestUtil.getStateMachine(s);
-    }
-
-    /** First assertion failure or exception seen in applyTransaction, if any. */
-    AtomicReference<Throwable> throwable = new AtomicReference<>(null);
-    /** Number of transactions started; also the source of transaction ids. */
-    AtomicLong transactions = new AtomicLong(0);
-    /** Set once startTransaction has been called on this instance. */
-    AtomicBoolean isLeader = new AtomicBoolean(false);
-    /** Number of transactions applied without a failed check. */
-    AtomicLong numApplied = new AtomicLong(0);
-    /** Transaction ids seen in applyTransaction while {@link #isLeader} was set. */
-    ConcurrentLinkedQueue<Long> applied = new ConcurrentLinkedQueue<>();
-
-    @Override
-    public TransactionContext startTransaction(RaftClientRequest request) throws IOException {
-      // only leader will get this call
-      isLeader.set(true);
-      // send the next transaction id as the "context" from SM
-      return new TransactionContext(this, request, SMLogEntryProto.newBuilder()
-          .setData(request.getMessage().getContent())
-          .build(), transactions.incrementAndGet());
-    }
-
-    @Override
-    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-      try {
-        assertTrue(trx.getLogEntry().isPresent());
-        assertTrue(trx.getSMLogEntry().isPresent());
-        Optional<Object> context = trx.getStateMachineContext();
-        if (isLeader.get()) {
-          assertTrue(trx.getClientRequest().isPresent());
-          assertTrue(context.isPresent());
-          assertTrue(context.get() instanceof Long);
-          Long val = (Long)context.get();
-          assertTrue(val <= transactions.get());
-          applied.add(val);
-        } else {
-          assertFalse(trx.getClientRequest().isPresent());
-          assertFalse(context.isPresent());
-        }
-        numApplied.incrementAndGet();
-      } catch (Throwable t) {
-        // Keep the first failure: it is the root cause, later ones are noise.
-        throwable.compareAndSet(null, t);
-      }
-      return CompletableFuture.completedFuture(null);
-    }
-
-    /** Rethrows the failure recorded by applyTransaction, if there is one. */
-    void rethrowIfException() throws Throwable {
-      Throwable t = throwable.get();
-      if (t != null) {
-        throw t;
-      }
-    }
-  }
-
-  @Test
-  public void testTransactionContextIsPassedBack() throws Throwable {
-    // tests that the TrxContext set by the StateMachine in Leader is passed back to the SM
-    properties.setClass(
-        MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        SMTransactionContext.class, StateMachine.class);
-    startCluster();
-
-    int numTrx = 100;
-    final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx);
-    try(final RaftClient client = cluster.createClient("client", null)) {
-      for (RaftTestUtil.SimpleMessage message : messages) {
-        client.send(message);
-      }
-    }
-
-    // TODO: there should be a better way to ensure all data is replicated and applied
-    Thread.sleep(cluster.getMaxTimeout() + 100);
-
-    for (RaftServerImpl raftServer : cluster.getServers()) {
-      final SMTransactionContext sm = SMTransactionContext.get(raftServer);
-      sm.rethrowIfException();
-      assertEquals(numTrx, sm.numApplied.get());
-    }
-
-    // check leader
-    RaftServerImpl raftServer = cluster.getLeader();
-    // assert every transaction has obtained context in leader
-    final SMTransactionContext sm = SMTransactionContext.get(raftServer);
-    List<Long> ll = sm.applied.stream().collect(Collectors.toList());
-    Collections.sort(ll);
-    // JUnit argument order is (message, expected, actual).
-    assertEquals(ll.toString(), numTrx, ll.size());
-    for (int i=0; i < numTrx; i++) {
-      assertEquals(ll.toString(), Long.valueOf(i+1), ll.get(i));
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/raft-server/src/test/resources/log4j.properties b/raft-server/src/test/resources/log4j.properties
deleted file mode 100644
index ced0687..0000000
--- a/raft-server/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# log4j configuration used during build and unit tests
-
-log4j.rootLogger=info,stdout
-log4j.threshold=ALL
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-client/pom.xml
----------------------------------------------------------------------
diff --git a/ratis-client/pom.xml b/ratis-client/pom.xml
new file mode 100644
index 0000000..58d9817
--- /dev/null
+++ b/ratis-client/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>ratis-project-dist</artifactId>
+ <groupId>org.apache.ratis</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../ratis-project-dist</relativePath>
+ </parent>
+
+ <artifactId>ratis-client</artifactId>
+ <name>Ratis Client</name>
+
+ <dependencies>
+ <dependency>
+ <artifactId>ratis-proto-shaded</artifactId>
+ <groupId>org.apache.ratis</groupId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <artifactId>ratis-common</artifactId>
+ <groupId>org.apache.ratis</groupId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
new file mode 100644
index 0000000..e4e0b84
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** A client who sends requests to a raft service. */
+public interface RaftClient extends Closeable {
+ /** Logger shared through this interface. */
+ Logger LOG = LoggerFactory.getLogger(RaftClient.class);
+ /** Default sequence number (0). NOTE(review): presumably used for requests that carry no explicit sequence number -- confirm against callers. */
+ long DEFAULT_SEQNUM = 0;
+
+ /** @return the id of this client. */
+ String getId();
+
+ /**
+ * Send the given message to the raft service.
+ * The message may change the state of the service.
+ * For readonly messages, use {@link #sendReadOnly(Message)} instead.
+ *
+ * @param message the message to send.
+ * @return the reply to the request.
+ * @throws IOException declared by the contract; the exact failure conditions
+ * depend on the implementation.
+ */
+ RaftClientReply send(Message message) throws IOException;
+
+ /**
+ * Send the given readonly message to the raft service.
+ *
+ * @param message the readonly message to send.
+ * @return the reply to the request.
+ * @throws IOException declared by the contract; the exact failure conditions
+ * depend on the implementation.
+ */
+ RaftClientReply sendReadOnly(Message message) throws IOException;
+
+ /**
+ * Send set configuration request to the raft service.
+ *
+ * @param peersInNewConf the peers of the new configuration.
+ * @return the reply to the request.
+ * @throws IOException declared by the contract; the exact failure conditions
+ * depend on the implementation.
+ */
+ RaftClientReply setConfiguration(RaftPeer[] peersInNewConf) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
new file mode 100644
index 0000000..e1e1593
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client;
+
+/**
+ * Configuration keys of the raft client together with their default values.
+ */
+public interface RaftClientConfigKeys {
+ /** Property key of the RPC timeout; the name indicates the value is in milliseconds. */
+ String RAFT_RPC_TIMEOUT_MS_KEY = "raft.rpc.timeout.ms";
+ /** Default for {@link #RAFT_RPC_TIMEOUT_MS_KEY}: 300 (milliseconds, going by the key name). */
+ int RAFT_RPC_TIMEOUT_MS_DEFAULT = 300;
+}