You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/16 10:41:46 UTC
svn commit: r1301395 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
Author: ivank
Date: Fri Mar 16 09:41:45 2012
New Revision: 1301395
URL: http://svn.apache.org/viewvc?rev=1301395&view=rev
Log:
BOOKKEEPER-182: Entry log file is overwritten when fail to read lastLogId. (sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1301395&r1=1301394&r2=1301395&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Mar 16 09:41:45 2012
@@ -56,6 +56,8 @@ Trunk (unreleased changes)
BOOKKEEPER-184: CompactionTest failing on Jenkins (sijie via ivank)
+ BOOKKEEPER-182: Entry log file is overwritten when fail to read lastLogId. (sijie via ivank)
+
hedwig-server/
BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1301395&r1=1301394&r2=1301395&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Fri Mar 16 09:41:45 2012
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.bookie;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -34,6 +35,7 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -174,13 +176,14 @@ public class EntryLogger {
// this header buffer is cleared before writing it into the new logChannel.
LOGFILE_HEADER.put("BKLO".getBytes());
// Find the largest logId
+ logId = -1;
for(File f: dirs) {
long lastLogId = getLastLogId(f);
- if (lastLogId >= logId) {
- logId = lastLogId+1;
+ if (lastLogId > logId) {
+ logId = lastLogId;
}
}
- createLogId(logId);
+ createNewLog();
}
/**
@@ -189,16 +192,31 @@ public class EntryLogger {
private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
/**
- * Creates a new log file with the given id.
+ * Creates a new log file
*/
- private void createLogId(long logId) throws IOException {
+ private void createNewLog() throws IOException {
List<File> list = Arrays.asList(dirs);
Collections.shuffle(list);
- File firstDir = list.get(0);
if (logChannel != null) {
logChannel.flush(true);
}
- logChannel = new BufferedChannel(new RandomAccessFile(new File(firstDir, Long.toHexString(logId)+".log"), "rw").getChannel(), 64*1024);
+
+ // It would be better not to overwrite existing entry log files
+ File newLogFile = null;
+ do {
+ String logFileName = Long.toHexString(++logId) + ".log";
+ for (File dir : list) {
+ newLogFile = new File(dir, logFileName);
+ if (newLogFile.exists()) {
+ LOG.warn("Found existed entry log " + newLogFile
+ + " when trying to create it as a new log.");
+ newLogFile = null;
+ break;
+ }
+ }
+ } while (newLogFile == null);
+
+ logChannel = new BufferedChannel(new RandomAccessFile(newLogFile, "rw").getChannel(), 64*1024);
logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
channels.put(logId, logChannel);
for(File f: dirs) {
@@ -252,10 +270,41 @@ public class EntryLogger {
}
}
+ private long getLastLogId(File dir) {
+ long id = readLastLogId(dir);
+ // read success
+ if (id > 0) {
+ return id;
+ }
+ // read failed, scan the ledger directories to find biggest log id
+ File[] logFiles = dir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.getName().endsWith(".log");
+ }
+ });
+ List<Long> logs = new ArrayList<Long>();
+ for (File lf : logFiles) {
+ String idString = lf.getName().split("\\.")[0];
+ try {
+ long lid = Long.parseLong(idString, 16);
+ logs.add(lid);
+ } catch (NumberFormatException nfe) {
+ }
+ }
+ // no log file found in this directory
+ if (0 == logs.size()) {
+ return -1;
+ }
+ // order the collections
+ Collections.sort(logs);
+ return logs.get(logs.size() - 1);
+ }
+
/**
* reads id from the "lastId" file in the given directory.
*/
- private long getLastLogId(File f) {
+ private long readLastLogId(File f) {
FileInputStream fis;
try {
fis = new FileInputStream(new File(f, "lastId"));
@@ -278,10 +327,6 @@ public class EntryLogger {
}
}
- private void openNewChannel() throws IOException {
- createLogId(++logId);
- }
-
synchronized void flush() throws IOException {
if (logChannel != null) {
logChannel.flush(true);
@@ -289,7 +334,7 @@ public class EntryLogger {
}
synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
if (logChannel.position() + entry.remaining() + 4 > logSizeLimit) {
- openNewChannel();
+ createNewLog();
}
ByteBuffer buff = ByteBuffer.allocate(4);
buff.putInt(entry.remaining());
@@ -351,7 +396,9 @@ public class EntryLogger {
return fc;
}
File file = findFile(entryLogId);
- FileChannel newFc = new RandomAccessFile(file, "rw").getChannel();
+ // getChannel is only used to open an existing entry log file,
+ // so it is better to open it in read-only mode
+ FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
// If the file already exists before creating a BufferedChannel layer above it,
// set the FileChannel's position to the end so the write buffer knows where to start.
newFc.position(newFc.size());
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java?rev=1301395&r1=1301394&r2=1301395&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java Fri Mar 16 09:41:45 2012
@@ -83,10 +83,70 @@ public class EntryLogTest extends TestCa
ByteBuffer bb = ByteBuffer.wrap(new byte[64]);
bb.putLong(ledger);
bb.putLong(entry);
- bb.put(("ledger"+ledger).getBytes());
+ bb.put(("ledger-" + ledger + "-" + entry).getBytes());
bb.flip();
return bb;
}
+
+ @Test
+ public void testMissingLogId() throws Exception {
+ File tmpDir = File.createTempFile("entryLogTest", ".dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+
+ ServerConfiguration conf = new ServerConfiguration();
+ conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+ // create some entries
+ int numLogs = 3;
+ int numEntries = 10;
+ long[][] positions = new long[2*numLogs][];
+ for (int i=0; i<numLogs; i++) {
+ positions[i] = new long[numEntries];
+
+ EntryLogger logger = new EntryLogger(conf);
+ for (int j=0; j<numEntries; j++) {
+ positions[i][j] = logger.addEntry(i, generateEntry(i, j));
+ }
+ logger.flush();
+ }
+ // delete last log id
+ File lastLogId = new File(curDir, "lastId");
+ lastLogId.delete();
+
+ // write more entries
+ for (int i=numLogs; i<2*numLogs; i++) {
+ positions[i] = new long[numEntries];
+
+ EntryLogger logger = new EntryLogger(conf);
+ for (int j=0; j<numEntries; j++) {
+ positions[i][j] = logger.addEntry(i, generateEntry(i, j));
+ }
+ logger.flush();
+ }
+
+ EntryLogger newLogger = new EntryLogger(conf);
+ for (int i=0; i<(2*numLogs+1); i++) {
+ File logFile = new File(curDir, Long.toHexString(i) + ".log");
+ assertTrue(logFile.exists());
+ }
+ for (int i=0; i<2*numLogs; i++) {
+ for (int j=0; j<numEntries; j++) {
+ String expectedValue = "ledger-" + i + "-" + j;
+ byte[] value = newLogger.readEntry(i, j, positions[i][j]);
+ ByteBuffer buf = ByteBuffer.wrap(value);
+ long ledgerId = buf.getLong();
+ long entryId = buf.getLong();
+ byte[] data = new byte[buf.remaining()];
+ buf.get(data);
+ assertEquals(i, ledgerId);
+ assertEquals(j, entryId);
+ assertEquals(expectedValue, new String(data));
+ }
+ }
+ }
+
@After
public void tearDown() throws Exception {