You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/07/26 03:33:43 UTC
[24/55] [abbrv] kylin git commit: KYLIN-1894 GlobalDictionary may
corrupt when server suddenly crash
KYLIN-1894 GlobalDictionary may corrupt when server suddenly crash
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0db336f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0db336f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0db336f
Branch: refs/heads/1.5.x-CDH5.7
Commit: f0db336fc238f88af952de8672806c62c9c57aee
Parents: 9200475
Author: sunyerui <su...@gmail.com>
Authored: Fri Jul 15 12:30:06 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jul 18 16:41:21 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/dict/AppendTrieDictionary.java | 65 ++---
.../org/apache/kylin/dict/CachedTreeMap.java | 83 +++++-
.../kylin/dict/AppendTrieDictionaryTest.java | 6 +-
.../apache/kylin/dict/CachedTreeMapTest.java | 265 +++++++++++++++++++
4 files changed, 364 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0db336f/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index 32038bf..4cce586 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -99,13 +99,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
}
}
- public void update(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, byte[] dictMapBytes) throws IOException {
- ByteArrayInputStream buf = new ByteArrayInputStream(dictMapBytes);
- DataInputStream input = new DataInputStream(buf);
- update(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, input);
- }
-
- public void update(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, DataInput input) throws IOException {
+ public void update(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, CachedTreeMap dictMap) throws IOException {
this.baseDir = baseDir;
this.baseId = baseId;
this.maxId = maxId;
@@ -114,11 +108,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
this.bytesConverter = bytesConverter;
int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
- if (dictSliceMap == null) {
- dictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
- }
- dictSliceMap.clear();
- ((Writable) dictSliceMap).readFields(input);
+ dictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
+ ((CachedTreeMap)dictSliceMap).loadEntry(dictMap);
}
public byte[] writeDictMap() throws IOException {
@@ -777,7 +768,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
private AppendTrieDictionary dict;
- private TreeMap<DictSliceKey, DictNode> dictSliceMap;
+ private TreeMap<DictSliceKey, DictNode> mutableDictSliceMap;
private static int MAX_ENTRY_IN_SLICE = 10_000_000;
private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0;
@@ -803,9 +794,9 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
MAX_ENTRY_IN_SLICE = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
// create a new cached map with baseDir
- dictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).persistent(true).immutable(false).build();
+ mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).persistent(true).immutable(false).build();
if (dictMapBytes != null) {
- ((Writable) dictSliceMap).readFields(new DataInputStream(new ByteArrayInputStream(dictMapBytes)));
+ ((Writable) mutableDictSliceMap).readFields(new DataInputStream(new ByteArrayInputStream(dictMapBytes)));
}
}
@@ -819,23 +810,23 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
}
maxValueLength = Math.max(maxValueLength, value.length);
- if (dictSliceMap.isEmpty()) {
+ if (mutableDictSliceMap.isEmpty()) {
DictNode root = new DictNode(new byte[0], false);
- dictSliceMap.put(DictSliceKey.wrap(new byte[0]), root);
+ mutableDictSliceMap.put(DictSliceKey.wrap(new byte[0]), root);
}
- DictSliceKey sliceKey = dictSliceMap.floorKey(DictSliceKey.wrap(value));
+ DictSliceKey sliceKey = mutableDictSliceMap.floorKey(DictSliceKey.wrap(value));
if (sliceKey == null) {
- sliceKey = dictSliceMap.firstKey();
+ sliceKey = mutableDictSliceMap.firstKey();
}
- DictNode root = dictSliceMap.get(sliceKey);
+ DictNode root = mutableDictSliceMap.get(sliceKey);
addValueR(root, value, 0);
if (root.childrenCount > MAX_ENTRY_IN_SLICE * MAX_ENTRY_OVERHEAD_FACTOR) {
- dictSliceMap.remove(sliceKey);
+ mutableDictSliceMap.remove(sliceKey);
DictNode newRoot = splitNodeTree(root);
DictNode.mergeSingleByteNode(root, 1);
DictNode.mergeSingleByteNode(newRoot, 0);
- dictSliceMap.put(DictSliceKey.wrap(root.firstValue()), root);
- dictSliceMap.put(DictSliceKey.wrap(newRoot.firstValue()), newRoot);
+ mutableDictSliceMap.put(DictSliceKey.wrap(root.firstValue()), root);
+ mutableDictSliceMap.put(DictSliceKey.wrap(newRoot.firstValue()), newRoot);
}
}
@@ -956,18 +947,11 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
}
public AppendTrieDictionary<T> build(int baseId) throws IOException {
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(buf);
- ((Writable) dictSliceMap).write(out);
- byte[] dictMapBytes = buf.toByteArray();
- buf.close();
- out.close();
-
if (dict == null) {
dict = new AppendTrieDictionary<T>();
}
- dict.update(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, dictMapBytes);
- dict.flushIndex();
+ dict.flushIndex((CachedTreeMap) mutableDictSliceMap);
+ dict.update(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)mutableDictSliceMap);
return dict;
}
@@ -1047,24 +1031,25 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id");
}
- public void flushIndex() throws IOException {
- Path filePath = new Path(baseDir + "/.index");
+ public void flushIndex(CachedTreeMap dictSliceMap) throws IOException {
+ Path filePath = new Path(dictSliceMap.getCurrentDir() + "/.index");
Configuration conf = new Configuration();
- try (FSDataOutputStream indexOut = (FileSystem.get(filePath.toUri(), conf)).create(filePath, true, 8 * 1024 * 1024, (short) 2, 8 * 1024 * 1024 * 8)) {
+ try (FSDataOutputStream indexOut = (FileSystem.get(filePath.toUri(), conf)).create(filePath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8)) {
indexOut.writeInt(baseId);
indexOut.writeInt(maxId);
indexOut.writeInt(maxValueLength);
indexOut.writeInt(nValues);
indexOut.writeUTF(bytesConverter.getClass().getName());
- ((Writable) dictSliceMap).write(indexOut);
+ dictSliceMap.write(indexOut);
}
+ dictSliceMap.commit(false);
}
@Override
public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
Configuration conf = new Configuration();
AppendTrieDictionary newDict = new AppendTrieDictionary();
- newDict.update(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter, writeDictMap());
+ newDict.update(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)dictSliceMap);
logger.info("Copy AppendDict from {} to {}", this.baseDir, newDict.baseDir);
Path srcPath = new Path(this.baseDir);
Path dstPath = new Path(newDict.baseDir);
@@ -1081,7 +1066,6 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(baseDir);
- flushIndex();
}
@Override
@@ -1103,7 +1087,10 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
throw new IOException(e);
}
}
- update(baseDir, baseId, maxId, maxValueLength, nValues, converter, input);
+ CachedTreeMap dictMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder()
+ .baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
+ dictMap.readFields(input);
+ update(baseDir, baseId, maxId, maxValueLength, nValues, converter, dictMap);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0db336f/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
index ec29bb5..1ea3c1c 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -58,6 +59,8 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
private final TreeSet<String> fileList;
private final Configuration conf;
private final String baseDir;
+ private final String tmpDir;
+ private final FileSystem fs;
private final boolean persistent;
private final boolean immutable;
private long writeValueTime = 0;
@@ -110,7 +113,7 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
return this;
}
- public CachedTreeMap build() {
+ public CachedTreeMap build() throws IOException {
if (baseDir == null) {
throw new RuntimeException("CachedTreeMap need a baseDir to cache data");
}
@@ -122,13 +125,19 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
}
}
- private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String baseDir, boolean persistent, boolean immutable) {
+ private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String baseDir, boolean persistent, boolean immutable) throws IOException {
super();
this.keyClazz = keyClazz;
this.valueClazz = valueClazz;
this.fileList = new TreeSet<>();
this.conf = new Configuration();
- this.baseDir = baseDir;
+ if (baseDir.endsWith("/")) {
+ this.baseDir = baseDir.substring(0, baseDir.length()-1);
+ } else {
+ this.baseDir = baseDir;
+ }
+ this.tmpDir = this.baseDir + ".tmp";
+ this.fs = FileSystem.get(new Path(baseDir).toUri(), conf);
this.persistent = persistent;
this.immutable = immutable;
CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new RemovalListener<K, V>() {
@@ -140,17 +149,27 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
writeValue(notification.getKey(), notification.getValue());
break;
case EXPLICIT:
- // skip delete files to recover from error during dict appending
- // deleteValue(notification.getKey());
+ deleteValue(notification.getKey());
break;
default:
throw new RuntimeException("unexpected evict reason " + notification.getCause());
}
}
- }).maximumSize(maxCount);
- // For immutable values, use soft reference to free memory when gc, and just load again when need it
+ });
+ // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc
if (this.immutable) {
builder.softValues();
+ } else {
+ builder.maximumSize(maxCount);
+ // For mutable map, copy all data into tmp and modify on tmp data, avoiding suddenly server crash made data corrupt
+ if (fs.exists(new Path(tmpDir))) {
+ fs.delete(new Path(tmpDir), true);
+ }
+ if (fs.exists(new Path(this.baseDir))) {
+ FileUtil.copy(fs, new Path(this.baseDir), fs, new Path(tmpDir), false, true, conf);
+ } else {
+ fs.mkdirs(new Path(this.baseDir));
+ }
}
this.valueCache = builder.build(new CacheLoader<K, V>() {
@Override
@@ -163,10 +182,47 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
}
private String generateFileName(K key) {
- String file = baseDir + "/cached_" + key.toString();
+ String file = (immutable ? baseDir : tmpDir) + "/cached_" + key.toString();
return file;
}
+ public String getCurrentDir() {
+ return immutable ? baseDir : tmpDir;
+ }
+
+ public void commit(boolean stillMutable) throws IOException {
+ assert !immutable : "Only support commit method with immutable false";
+
+ Path basePath = new Path(baseDir);
+ Path backupPath = new Path(baseDir+".bak");
+ Path tmpPath = new Path(tmpDir);
+ try {
+ fs.rename(basePath, backupPath);
+ } catch (IOException e) {
+ logger.info("CachedTreeMap commit backup basedir failed, " + e, e);
+ throw e;
+ }
+
+ try {
+ if (stillMutable) {
+ FileUtil.copy(fs, tmpPath, fs, basePath, false, true, conf);
+ } else {
+ fs.rename(tmpPath, basePath);
+ }
+ fs.delete(backupPath, true);
+ } catch (IOException e) {
+ fs.rename(backupPath, basePath);
+ logger.info("CachedTreeMap commit move/copy tmpdir failed, " + e, e);
+ throw e;
+ }
+ }
+
+ public void loadEntry(CachedTreeMap other) {
+ for (Object key : other.keySet()) {
+ super.put((K)key, null);
+ }
+ }
+
private void writeValue(K key, V value) {
if (immutable) {
return;
@@ -174,10 +230,10 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
long t0 = System.currentTimeMillis();
String fileName = generateFileName(key);
Path filePath = new Path(fileName);
- try (FSDataOutputStream out = (FileSystem.get(filePath.toUri(), conf)).create(filePath, true, BUFFER_SIZE, (short) 2, BUFFER_SIZE * 8)) {
+ try (FSDataOutputStream out = fs.create(filePath, true, BUFFER_SIZE, (short) 5, BUFFER_SIZE * 8)) {
value.write(out);
if (!persistent) {
- FileSystem.get(filePath.toUri(), conf).deleteOnExit(filePath);
+ fs.deleteOnExit(filePath);
}
} catch (Exception e) {
logger.error(String.format("write value into %s exception: %s", fileName, e), e);
@@ -192,7 +248,7 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
long t0 = System.currentTimeMillis();
String fileName = generateFileName(key);
Path filePath = new Path(fileName);
- try (FSDataInputStream input = (FileSystem.get(filePath.toUri(), conf)).open(filePath, BUFFER_SIZE)) {
+ try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
V value = valueClazz.newInstance();
value.readFields(input);
return value;
@@ -211,7 +267,6 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
String fileName = generateFileName(key);
Path filePath = new Path(fileName);
try {
- FileSystem fs = FileSystem.get(filePath.toUri(), conf);
if (fs.exists(filePath)) {
fs.delete(filePath, true);
}
@@ -224,6 +279,7 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
@Override
public V put(K key, V value) {
+ assert !immutable : "Only support put method with immutable false";
super.put(key, null);
valueCache.put(key, value);
return null;
@@ -245,6 +301,7 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
@Override
public V remove(Object key) {
+ assert !immutable : "Only support remove method with immutable false";
super.remove(key);
valueCache.invalidate(key);
return null;
@@ -300,6 +357,7 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
@Override
public void remove() {
+ assert !immutable : "Only support remove method with immutable false";
keyIterator.remove();
valueCache.invalidate(currentKey);
}
@@ -344,7 +402,6 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
for (String file : fileList) {
try {
Path filePath = new Path(file);
- FileSystem fs = FileSystem.get(filePath.toUri(), conf);
fs.delete(filePath, true);
} catch (Throwable t) {
//do nothing?
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0db336f/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index 0ea5ebe..b81a439 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -56,7 +56,7 @@ public class AppendTrieDictionaryTest {
KylinConfig config = KylinConfig.getInstanceFromEnv();
config.setAppendDictEntrySize(50000);
config.setAppendDictCacheSize(3);
- config.setProperty("kylin.hdfs.working.dir", "/tmp");
+ config.setProperty("kylin.hdfs.working.dir", "/tmp/kylin_append_dict");
}
@AfterClass
@@ -122,7 +122,7 @@ public class AppendTrieDictionaryTest {
@Test
public void testHugeKeySet() throws IOException {
BytesConverter converter = new StringBytesConverter();
- AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create("/tmp");
+ AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create("/tmp/kylin_append_dict");
AppendTrieDictionary<String> dict = null;
InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
@@ -152,7 +152,7 @@ public class AppendTrieDictionaryTest {
}
BytesConverter converter = new StringBytesConverter();
- AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create("/tmp");
+ AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create("/tmp/kylin_append_dict");
AppendTrieDictionary<String> dict = null;
TreeMap<Integer, String> checkMap = new TreeMap<>();
int firstAppend = rnd.nextInt(strList.size() / 2);
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0db336f/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
new file mode 100644
index 0000000..d2af621
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
@@ -0,0 +1,265 @@
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by sunyerui on 16/7/12.
+ */
+public class CachedTreeMapTest {
+
+ public static class Key implements WritableComparable {
+ int keyInt;
+
+ public static Key of(int keyInt) {
+ Key newKey = new Key();
+ newKey.keyInt = keyInt;
+ return newKey;
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ return keyInt - ((Key)o).keyInt;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(keyInt);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ keyInt = in.readInt();
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(keyInt);
+ }
+ }
+
+ public static boolean VALUE_WRITE_ERROR_TOGGLE = false;
+ public static class Value implements Writable {
+ String valueStr;
+
+ public static Value of(String valueStr) {
+ Value newValue = new Value();
+ newValue.valueStr = valueStr;
+ return newValue;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (VALUE_WRITE_ERROR_TOGGLE) {
+ out.write(new byte[0]);
+ return;
+ }
+ out.writeUTF(valueStr);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ valueStr = in.readUTF();
+ }
+ }
+
+ public static class CachedFileFilter implements FileFilter {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().startsWith("cached_");
+ }
+ }
+
+ public static final String baseDir = "/tmp/kylin_cachedtreemap_test/";
+ public static final String backupDir = "/tmp/kylin_cachedtreemap_test.bak/";
+ public static final String tmpDir = "/tmp/kylin_cachedtreemap_test.tmp/";
+
+ @After
+ public void afterTest() {
+ File dir = new File(baseDir);
+ if (dir.exists()) {
+ for (File f : dir.listFiles()) {
+ f.delete();
+ }
+ dir.delete();
+ }
+
+ dir = new File(tmpDir);
+ if (dir.exists()) {
+ for (File f : dir.listFiles()) {
+ f.delete();
+ }
+ dir.delete();
+ }
+
+ dir = new File(backupDir);
+ if (dir.exists()) {
+ for (File f : dir.listFiles()) {
+ f.delete();
+ }
+ dir.delete();
+ }
+
+ VALUE_WRITE_ERROR_TOGGLE = false;
+ }
+
+ @Test
+ public void testCachedTreeMap() throws IOException {
+ CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ map.put(Key.of(1), Value.of("a"));
+ map.put(Key.of(2), Value.of("b"));
+ map.put(Key.of(3), Value.of("c"));
+ map.put(Key.of(4), Value.of("d"));
+ map.put(Key.of(5), Value.of("e"));
+
+ File dir = new File(tmpDir);
+ assertEquals(3, dir.listFiles(new CachedFileFilter()).length);
+
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDir+"/.index"));
+ map.write(out);
+ out.flush();
+ out.close();
+ map.commit(false);
+
+ dir = new File(baseDir);
+ assertEquals(5, dir.listFiles(new CachedFileFilter()).length);
+
+ DataInputStream in = new DataInputStream(new FileInputStream(baseDir+".index"));
+ CachedTreeMap map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ map2.readFields(in);
+ assertEquals(5, map2.size());
+ assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr);
+
+ try {
+ map2.put(Key.of(6), Value.of("f"));
+ fail("Should be error when put value into immutable map");
+ } catch (AssertionError error) {
+ }
+
+ assertFalse(new File(tmpDir).exists());
+ assertFalse(new File(backupDir).exists());
+ }
+
+ @Test
+ public void testWriteFailed() throws IOException {
+ // normal case
+ CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ map.put(Key.of(1), Value.of("a"));
+ map.put(Key.of(2), Value.of("b"));
+ map.put(Key.of(3), Value.of("c"));
+ map.remove(Key.of(3));
+ map.put(Key.of(4), Value.of("d"));
+
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDir+".index"));
+ map.write(out);
+ out.flush();
+ out.close();
+ map.commit(false);
+
+ DataInputStream in = new DataInputStream(new FileInputStream(baseDir+".index"));
+ CachedTreeMap map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ map2.readFields(in);
+ assertEquals(3, map2.size());
+ assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
+
+ // suppose write value failed and didn't commit data
+ map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ VALUE_WRITE_ERROR_TOGGLE = true;
+ map.put(Key.of(1), Value.of("aa"));
+ map.put(Key.of(2), Value.of("bb"));
+ VALUE_WRITE_ERROR_TOGGLE = false;
+ map.put(Key.of(3), Value.of("cc"));
+ map.put(Key.of(4), Value.of("dd"));
+ out = new DataOutputStream(new FileOutputStream(tmpDir+".index"));
+ map.write(out);
+ out.flush();
+ out.close();
+ // suppose write value failed and didn't commit data
+ //map.commit(false);
+
+ // read map data should not be modified
+ in = new DataInputStream(new FileInputStream(baseDir+".index"));
+ map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ map2.readFields(in);
+ assertEquals(3, map2.size());
+ assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
+
+ assertTrue(new File(tmpDir).exists());
+ assertFalse(new File(backupDir).exists());
+ }
+
+ @Test
+ public void testCommit() throws IOException {
+ CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ map.put(Key.of(1), Value.of("a"));
+ map.put(Key.of(2), Value.of("b"));
+ map.put(Key.of(3), Value.of("c"));
+ map.put(Key.of(4), Value.of("d"));
+
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDir+".index"));
+ map.write(out);
+ out.flush();
+ out.close();
+ map.commit(true);
+
+ assertTrue(new File(tmpDir).exists());
+ assertFalse(new File(backupDir).exists());
+
+ DataInputStream in = new DataInputStream(new FileInputStream(baseDir+".index"));
+ CachedTreeMap map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ map2.readFields(in);
+ assertEquals(4, map2.size());
+ assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
+
+ // continue modify map, but not commit
+ map.put(Key.of(1), Value.of("aa"));
+ map.put(Key.of(2), Value.of("bb"));
+ map.put(Key.of(3), Value.of("cc"));
+ map.put(Key.of(5), Value.of("e"));
+ map.put(Key.of(6), Value.of("f"));
+ out = new DataOutputStream(new FileOutputStream(tmpDir+".index"));
+ map.write(out);
+ out.flush();
+ out.close();
+
+ assertTrue(new File(tmpDir).exists());
+ assertEquals(6, new File(tmpDir).listFiles(new CachedFileFilter()).length);
+ assertEquals(4, new File(baseDir).listFiles(new CachedFileFilter()).length);
+
+ in = new DataInputStream(new FileInputStream(baseDir+".index"));
+ map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ map2.readFields(in);
+ assertEquals(4, map2.size());
+ assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
+
+ // commit data
+ map.commit(false);
+ assertFalse(new File(tmpDir).exists());
+ assertEquals(6, new File(baseDir).listFiles(new CachedFileFilter()).length);
+
+ in = new DataInputStream(new FileInputStream(baseDir+".index"));
+ map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir("file://"+baseDir)
+ .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
+ map2.readFields(in);
+ assertEquals(6, map2.size());
+ assertEquals("aa", ((Value)map2.get(Key.of(1))).valueStr);
+ assertEquals("f", ((Value)map2.get(Key.of(6))).valueStr);
+ }
+}
+