Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/12 17:31:50 UTC
svn commit: r713419 - in
/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb:
journal/Journal.java replication/ReplicationMaster.java
replication/ReplicationServer.java replication/ReplicationSlave.java
store/MessageDatabase.java
Author: chirino
Date: Wed Nov 12 08:31:48 2008
New Revision: 713419
URL: http://svn.apache.org/viewvc?rev=713419&view=rev
Log:
Fixed a couple of bugs which were cropping up in the perf test.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Wed Nov 12 08:31:48 2008
@@ -209,7 +209,6 @@
}
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
- reader.readLocationDetails(location);
while (reader.readLocationDetailsAndValidate(location)) {
location.setOffset(location.getOffset() + location.getSize());
}
@@ -426,6 +425,7 @@
if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
// It's an update to the current log file..
dataFile = dataFiles.getTail();
+ dataFile.incrementLength(length);
} else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
// It's an update to the next log file.
int nextNum = loc.getDataFileId();
@@ -438,8 +438,6 @@
} else {
throw new IOException("Invalid external append.");
}
-
- dataFile.incrementLength(length);
}
public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
@@ -484,7 +482,7 @@
// Load in location size and type.
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
- reader.readLocationDetails(cur);
+ reader.readLocationDetails(cur);
} finally {
accessorPool.closeDataFileAccessor(reader);
}
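The Journal.java fix above narrows the length accounting: dataFile.incrementLength(length) now runs only in the branch that appends to the existing tail file, instead of unconditionally after the if/else, where it also fired for appends that had rolled over into the next log file. Below is a minimal sketch of the corrected pattern; the LogFile class and externalAppend() method are hypothetical stand-ins for the real Journal/DataFile types, and the rollover branch is simplified to creating the new tail with the record length already counted.

    import java.io.IOException;

    class LengthAccountingSketch {
        static class LogFile {
            final int id;
            long length;
            LogFile(int id, long length) { this.id = id; this.length = length; }
            void incrementLength(long delta) { length += delta; }
        }

        LogFile tail = new LogFile(1, 0);

        void externalAppend(int fileId, long recordLength) throws IOException {
            if (tail.id == fileId) {
                // It's an update to the current log file: grow its length.
                tail.incrementLength(recordLength);
            } else if (tail.id + 1 == fileId) {
                // It's an update to the next log file: rotate, accounting
                // for the record when the new tail is created rather than
                // incrementing again afterwards.
                tail = new LogFile(fileId, recordLength);
            } else {
                throw new IOException("Invalid external append.");
            }
        }
    }

The first hunk is independent: the extra readLocationDetails(location) call before the readLocationDetailsAndValidate() loop was redundant, since the loop performs the read itself.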
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Wed Nov 12 08:31:48 2008
@@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,6 +107,7 @@
}
public void onClusterChange(ClusterState config) {
+ // For now, we don't really care about changes in the slave config..
}
@@ -115,7 +115,6 @@
* This is called by the Journal so that we can replicate the update to the
* slaves.
*/
- @Override
public void replicate(Location location, ByteSequence sequence, boolean sync) {
if( sessions.isEmpty() )
return;
@@ -259,7 +258,7 @@
for (DataFile df : journalFiles.values()) {
// Look at what the slave has so that only the missing bits are transferred.
String name = "journal-" + df.getDataFileId();
- PBFileInfo slaveInfo = slaveFiles.get(name);
+ PBFileInfo slaveInfo = slaveFiles.remove(name);
// Use the checksum info to see if the slave has the file already.. Checksums are less accurate for
// small amounts of data.. so ignore small files.
@@ -292,6 +291,13 @@
snapshotInfos.add(info);
rcPayload.setCopyFilesList(snapshotInfos);
+ ArrayList<String> deleteFiles = new ArrayList<String>();
+ slaveFiles.remove("database");
+ for (PBFileInfo unused : slaveFiles.values()) {
+ deleteFiles.add(unused.getName());
+ }
+ rcPayload.setDeleteFilesList(deleteFiles);
+
updateJournalReplicatedFiles();
}
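The two ReplicationMaster.java hunks work together: switching slaveFiles.get(name) to slaveFiles.remove(name) consumes each entry the master matches against its own journal files, so once the loop finishes (and the "database" page file is taken out), whatever remains in the map are files the slave holds that the master no longer has, and those names are sent back as the delete list. A sketch of that consume-and-collect pattern, with hypothetical names in place of the PBFileInfo plumbing:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class StaleFileSketch {
        static List<String> staleSlaveFiles(Map<String, Long> masterFiles,
                                            Map<String, Long> slaveFiles) {
            Map<String, Long> remaining = new HashMap<String, Long>(slaveFiles);
            for (String name : masterFiles.keySet()) {
                // remove() rather than get(): a matched entry must not
                // linger in the map, or it would be flagged as stale below.
                remaining.remove(name);
            }
            // The page file snapshot is negotiated separately.
            remaining.remove("database");
            return new ArrayList<String>(remaining.keySet());
        }
    }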
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java Wed Nov 12 08:31:48 2008
@@ -118,7 +118,6 @@
// If the slave service was not yet started.. start it up.
if (slave == null) {
LOG.info("Starting replication slave.");
- store.open();
slave = new ReplicationSlave(this);
slave.start();
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Wed Nov 12 08:31:48 2008
@@ -25,6 +25,7 @@
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -59,90 +60,164 @@
private final ReplicationServer replicationServer;
private Transport transport;
+ // Used to bulk transfer the master state over to the slave..
+ private final Object transferMutex = new Object();
+ private final LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
+ private final LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
+ private final HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, PBFileInfo>();
+ private PBSlaveInitResponse initResponse;
+ private boolean online;
+ private final AtomicBoolean started = new AtomicBoolean();
+
+ // Used to do real time journal updates..
+ int journalUpdateFileId;
+ RandomAccessFile journalUpateFile;
+ private String master;
+
public ReplicationSlave(ReplicationServer replicationServer) {
this.replicationServer = replicationServer;
+ master = replicationServer.getClusterState().getMaster();
}
public void start() throws Exception {
- transport = TransportFactory.connect(new URI(replicationServer.getClusterState().getMaster()));
- transport.setTransportListener(this);
- transport.start();
-
- // Make sure the replication directory exists.
- replicationServer.getTempReplicationDir().mkdirs();
-
- ReplicationFrame frame = new ReplicationFrame();
- frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
- PBSlaveInit payload = new PBSlaveInit();
- payload.setNodeId(replicationServer.getNodeId());
-
- // This callback is executed once the checkpoint is
- // completed and all data has been
- // synced to disk, but while a lock is still held on the
- // store so that no
- // updates are allowed.
+ if( started.compareAndSet(false, true)) {
+ doStart();
+ }
+ }
+
+ public void stop() throws Exception {
+ if( started.compareAndSet(true, false)) {
+ doStop();
+ }
+ }
- HashMap<String, PBFileInfo> infosMap = new HashMap<String, PBFileInfo>();
-
- // Add all the files that were being transferred..
- File tempReplicationDir = replicationServer.getTempReplicationDir();
- File[] list = tempReplicationDir.listFiles();
- if( list!=null ) {
- for (File file : list) {
- String name = file.getName();
- if( name.startsWith("database-") ) {
- int snapshot;
- try {
- snapshot = Integer.parseInt(name.substring("database-".length()));
- } catch (NumberFormatException e) {
- continue;
+ private void doStart() throws Exception, URISyntaxException, IOException {
+ synchronized (transferMutex) {
+
+ // Failure recovery might be trying to start us back up,
+ // but the Replication server may have already stopped us so there is no need to start up.
+ if( !started.get() ) {
+ return;
+ }
+
+ replicationServer.getStore().open();
+
+ transport = TransportFactory.connect(new URI(master));
+ transport.setTransportListener(this);
+ transport.start();
+
+ // Make sure the replication directory exists.
+ replicationServer.getTempReplicationDir().mkdirs();
+
+ ReplicationFrame frame = new ReplicationFrame();
+ frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
+ PBSlaveInit payload = new PBSlaveInit();
+ payload.setNodeId(replicationServer.getNodeId());
+
+ // This callback is executed once the checkpoint is
+ // completed and all data has been
+ // synced to disk, but while a lock is still held on the
+ // store so that no
+ // updates are allowed.
+
+ HashMap<String, PBFileInfo> infosMap = new HashMap<String, PBFileInfo>();
+
+ // Add all the files that were being transferred..
+ File tempReplicationDir = replicationServer.getTempReplicationDir();
+ File[] list = tempReplicationDir.listFiles();
+ if( list!=null ) {
+ for (File file : list) {
+ String name = file.getName();
+ if( name.startsWith("database-") ) {
+ int snapshot;
+ try {
+ snapshot = Integer.parseInt(name.substring("database-".length()));
+ } catch (NumberFormatException e) {
+ continue;
+ }
+
+ PBFileInfo info = replicationServer.createInfo("database", file, 0, file.length());
+ info.setSnapshotId(snapshot);
+ infosMap.put("database", info);
+ } else if( name.startsWith("journal-") ) {
+ PBFileInfo info = replicationServer.createInfo(name, file, 0, file.length());
+ infosMap.put(name, info);
}
-
- PBFileInfo info = replicationServer.createInfo("database", file, 0, file.length());
- info.setSnapshotId(snapshot);
- infosMap.put("database", info);
- } else if( name.startsWith("journal-") ) {
- PBFileInfo info = replicationServer.createInfo(name, file, 0, file.length());
- infosMap.put(name, info);
}
}
- }
-
- // Add all the db files that were not getting transferred..
- KahaDBStore store = replicationServer.getStore();
- Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
- for (DataFile df : journalFiles.values()) {
- String name = "journal-" + df.getDataFileId();
- // Did we have a transfer in progress for that file already?
- if( infosMap.containsKey(name) ) {
- continue;
- }
- infosMap.put(name, replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
- }
- if( !infosMap.containsKey("database") ) {
- File pageFile = store.getPageFile().getFile();
- if( pageFile.exists() ) {
- infosMap.put("database", replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
+
+ // Add all the db files that were not getting transferred..
+ KahaDBStore store = replicationServer.getStore();
+ Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
+ for (DataFile df : journalFiles.values()) {
+ String name = "journal-" + df.getDataFileId();
+ // Did we have a transfer in progress for that file already?
+ if( infosMap.containsKey(name) ) {
+ continue;
+ }
+ infosMap.put(name, replicationServer.createInfo(name, df.getFile(), 0, df.getLength()));
+ }
+ if( !infosMap.containsKey("database") ) {
+ File pageFile = store.getPageFile().getFile();
+ if( pageFile.exists() ) {
+ infosMap.put("database", replicationServer.createInfo("database", pageFile, 0, pageFile.length()));
+ }
}
+
+ ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>(infosMap.size());
+ for (PBFileInfo info : infosMap.values()) {
+ infos.add(info);
+ }
+ payload.setCurrentFilesList(infos);
+
+ frame.setPayload(payload);
+ LOG.info("Sending master slave init command: " + payload);
+ online = false;
+ transport.oneway(frame);
}
-
- ArrayList<PBFileInfo> infos = new ArrayList<PBFileInfo>(infosMap.size());
- for (PBFileInfo info : infosMap.values()) {
- infos.add(info);
- }
- payload.setCurrentFilesList(infos);
-
- frame.setPayload(payload);
- LOG.info("Sending master slave init command: " + payload);
- bulkSynchronizing = true;
- transport.oneway(frame);
-
}
- public void stop() throws Exception {
+ private void doStop() throws Exception, IOException {
+ synchronized (transferMutex) {
+ if( this.transport!=null ) {
+ this.transport.stop();
+ this.transport=null;
+ }
+
+ // Stop any current transfer sessions.
+ for (TransferSession session : this.transferSessions) {
+ session.stop();
+ }
+
+ this.transferQueue.clear();
+
+ this.initResponse=null;
+ this.bulkFiles.clear();
+ this.online=false;
+
+ if( journalUpateFile !=null ) {
+ journalUpateFile.close();
+ journalUpateFile=null;
+ }
+ journalUpdateFileId=0;
+
+ replicationServer.getStore().close();
+ }
}
public void onClusterChange(ClusterState config) {
+ synchronized (transferMutex) {
+ // When the master changes.. we need to re-sync with the new master.
+ if( !master.equals(config.getMaster()) ) {
+ try {
+ doStop();
+ master = config.getMaster();
+ doStart();
+ } catch (Exception e) {
+ LOG.error("Could not restart syncing with new master: "+config.getMaster()+", due to: "+e,e);
+ }
+ }
+ }
}
public void onCommand(Object command) {
@@ -164,29 +239,23 @@
failed(error);
}
- public void failed(Exception error) {
+ public void failed(Throwable error) {
try {
- LOG.warn("Replication session fail to master: "+transport.getRemoteAddress(), error);
- stop();
+ if( started.get() ) {
+ LOG.warn("Replication session fail to master: "+transport.getRemoteAddress(), error);
+ doStop();
+ // Wait a little and try to establish the session again..
+ Thread.sleep(1000);
+ doStart();
+ }
} catch (Exception ignore) {
}
}
public void transportInterupted() {
}
-
public void transportResumed() {
}
-
- private Object transferMutex = new Object();
- private LinkedList<PBFileInfo> transferQueue = new LinkedList<PBFileInfo>();
- private boolean bulkSynchronizing;
- private PBSlaveInitResponse initResponse;
-
- int journalUpdateFileId;
- RandomAccessFile journalUpateFile;
-
- HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, PBFileInfo>();
private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate update) throws IOException {
boolean onlineRecovery=false;
@@ -199,7 +268,7 @@
}
File file;
String name = "journal-"+location.getFileId();
- if( bulkSynchronizing ) {
+ if( !online ) {
file = replicationServer.getTempReplicationFile(name, 0);
if( !bulkFiles.containsKey(name) ) {
bulkFiles.put(name, new PBFileInfo().setName(name));
@@ -211,10 +280,12 @@
}
journalUpateFile = new RandomAccessFile(file, "rw");
journalUpdateFileId = location.getFileId();
- }
+ }
+
+// System.out.println("Writing: "+location.getFileId()+":"+location.getOffset()+" with "+data.length);
journalUpateFile.seek(location.getOffset());
journalUpateFile.write(data);
- if( !bulkSynchronizing ) {
+ if( online ) {
onlineRecovery=true;
}
}
@@ -235,45 +306,52 @@
return rc;
}
- private void commitBulkTransfer() throws IOException {
- synchronized (transferMutex) {
+ private void commitBulkTransfer() {
+ try {
- LOG.info("Slave synhcronization complete, going online...");
+ synchronized (transferMutex) {
+
+ LOG.info("Slave synhcronization complete, going online...");
- if( journalUpateFile!=null ) {
- journalUpateFile.close();
- journalUpateFile=null;
- }
- replicationServer.getStore().close();
-
- // If we got a new snapshot of the database, then we need to
- // delete its assisting files too.
- if( bulkFiles.containsKey("database") ) {
- PageFile pageFile = replicationServer.getStore().getPageFile();
- pageFile.getRecoveryFile().delete();
- pageFile.getFreeFile().delete();
- }
-
- for (PBFileInfo info : bulkFiles.values()) {
- File from = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
- File to = replicationServer.getReplicationFile(info.getName());
- move(from, to);
+ replicationServer.getStore().close();
+
+ if( journalUpateFile!=null ) {
+ journalUpateFile.close();
+ journalUpateFile=null;
+ }
+
+ // If we got a new snapshot of the database, then we need to
+ // delete its assisting files too.
+ if( bulkFiles.containsKey("database") ) {
+ PageFile pageFile = replicationServer.getStore().getPageFile();
+ pageFile.getRecoveryFile().delete();
+ pageFile.getFreeFile().delete();
+ }
+
+ for (PBFileInfo info : bulkFiles.values()) {
+ File from = replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+ File to = replicationServer.getReplicationFile(info.getName());
+ to.getParentFile().mkdirs();
+ move(from, to);
+ }
+
+ delete(initResponse.getDeleteFilesList());
+ online=true;
+
+ replicationServer.getStore().open();
+
+ LOG.info("Slave is now online. We are now eligible to become the master.");
}
- delete(initResponse.getDeleteFilesList());
- bulkSynchronizing=false;
+ // Let the master know we are now online.
+ ReplicationFrame frame = new ReplicationFrame();
+ frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
+ transport.oneway(frame);
- replicationServer.getStore().open();
-
- LOG.info("Slave is now online. We are now eligible to become the master.");
+ } catch (Throwable e) {
+ e.printStackTrace();
+ failed(e);
}
-
- // Let the master know we are now online.
- ReplicationFrame frame = new ReplicationFrame();
- frame.setHeader(new PBHeader().setType(PBType.SLAVE_ONLINE));
- transport.oneway(frame);
-
- replicationServer.getStore().incrementalRecover();
}
private void onSlaveInitResponse(ReplicationFrame frame, PBSlaveInitResponse response) throws Exception {
@@ -320,11 +398,9 @@
}
}
- LinkedList<TransferSession> transferSessions = new LinkedList<TransferSession>();
-
- private void addTransferSession() throws Exception {
+ private void addTransferSession() {
synchronized (transferMutex) {
- while (!transferQueue.isEmpty() && transferSessions.size() < MAX_TRANSFER_SESSIONS) {
+ while (transport!=null && !transferQueue.isEmpty() && transferSessions.size() < MAX_TRANSFER_SESSIONS) {
TransferSession transferSession = new TransferSession();
transferSessions.add(transferSession);
try {
@@ -357,11 +433,11 @@
} finally {
try {
is.close();
- } finally {
+ } catch(Throwable e) {
}
try {
os.close();
- } finally {
+ } catch(Throwable e) {
}
}
from.delete();
@@ -418,22 +494,11 @@
}
info = null;
}
- Thread stopThread = new Thread("Transfer Session Shutdown: " + transport.getRemoteAddress()) {
- @Override
- public void run() {
- try {
- transport.stop();
- synchronized (transferMutex) {
- transferSessions.remove(TransferSession.this);
- addTransferSession();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- stopThread.setDaemon(true);
- stopThread.start();
+ transport.stop();
+ synchronized (transferMutex) {
+ transferSessions.remove(TransferSession.this);
+ addTransferSession();
+ }
}
}
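Most of the ReplicationSlave.java diff restructures the slave around an idempotent start/stop pair: an AtomicBoolean guards the public entry points, the real work moves into doStart()/doStop() under transferMutex, and failed() can now bounce the session (stop, back off a second, start again) without racing an external stop(). A stripped-down sketch of that lifecycle, with the store/transport work elided and hypothetical names throughout:

    import java.util.concurrent.atomic.AtomicBoolean;

    class GuardedLifecycleSketch {
        private final AtomicBoolean started = new AtomicBoolean();
        private final Object mutex = new Object();

        public void start() throws Exception {
            if (started.compareAndSet(false, true)) {
                doStart();
            }
        }

        public void stop() throws Exception {
            if (started.compareAndSet(true, false)) {
                doStop();
            }
        }

        void failed(Throwable error) {
            try {
                if (started.get()) {
                    doStop();
                    Thread.sleep(1000); // back off before reconnecting
                    doStart();
                }
            } catch (Exception ignore) {
            }
        }

        private void doStart() throws Exception {
            synchronized (mutex) {
                // A failure-recovery restart may race an external stop();
                // bail out if we were stopped in the meantime.
                if (!started.get()) {
                    return;
                }
                // ... open the store, connect, send SLAVE_INIT ...
            }
        }

        private void doStop() throws Exception {
            synchronized (mutex) {
                // ... stop transport and transfer sessions, close the store ...
            }
        }
    }

The same guard is why onClusterChange() calls doStop()/doStart() directly: the started flag stays true across a master switch, so a concurrent external stop() still wins.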
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=713419&r1=713418&r2=713419&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Wed Nov 12 08:31:48 2008
@@ -149,16 +149,16 @@
protected boolean syncWrites=true;
int checkpointInterval = 5*1000;
int cleanupInterval = 30*1000;
- boolean opened;
protected AtomicBoolean started = new AtomicBoolean();
+ protected AtomicBoolean opened = new AtomicBoolean();
public MessageDatabase() {
}
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
- load();
+ load();
}
}
@@ -211,7 +211,7 @@
}
public void open() throws IOException {
- if( !opened ) {
+ if( opened.compareAndSet(false, true) ) {
getJournal();
if (failIfJournalIsLocked) {
journal.lock();
@@ -232,74 +232,75 @@
getPageFile();
journal.start();
loadPageFile();
- opened=true;
+
+ checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+ public void run() {
+ try {
+ long lastCleanup = System.currentTimeMillis();
+ long lastCheckpoint = System.currentTimeMillis();
+
+ // Sleep for a short time so we can periodically check
+ // to see if we need to exit this thread.
+ long sleepTime = Math.min(checkpointInterval, 500);
+ while (opened.get()) {
+ Thread.sleep(sleepTime);
+ long now = System.currentTimeMillis();
+ if( now - lastCleanup >= cleanupInterval ) {
+ checkpointCleanup(true);
+ lastCleanup = now;
+ lastCheckpoint = now;
+ } else if( now - lastCheckpoint >= checkpointInterval ) {
+ checkpointCleanup(false);
+ lastCheckpoint = now;
+ }
+ }
+ } catch (InterruptedException e) {
+ // Looks like someone really wants us to exit this thread...
+ }
+ }
+ };
+ checkpointThread.start();
+ recover();
}
}
public void load() throws IOException {
- open();
- if (deleteAllMessages) {
- journal.delete();
-
- pageFile.unload();
- pageFile.delete();
- metadata = new Metadata();
-
- LOG.info("Persistence store purged.");
- deleteAllMessages = false;
-
- loadPageFile();
- }
-
synchronized (indexMutex) {
- recover();
+ open();
+
+ if (deleteAllMessages) {
+ journal.delete();
+
+ pageFile.unload();
+ pageFile.delete();
+ metadata = new Metadata();
+
+ LOG.info("Persistence store purged.");
+ deleteAllMessages = false;
+
+ loadPageFile();
+ }
+ store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
+
}
- checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
- public void run() {
- try {
- long lastCleanup = System.currentTimeMillis();
- long lastCheckpoint = System.currentTimeMillis();
-
- // Sleep for a short time so we can periodically check
- // to see if we need to exit this thread.
- long sleepTime = Math.min(checkpointInterval, 500);
- while (started.get()) {
- Thread.sleep(sleepTime);
- long now = System.currentTimeMillis();
- if( now - lastCleanup >= cleanupInterval ) {
- checkpointCleanup(true);
- lastCleanup = now;
- lastCheckpoint = now;
- } else if( now - lastCheckpoint >= checkpointInterval ) {
- checkpointCleanup(false);
- lastCheckpoint = now;
- }
- }
- } catch (InterruptedException e) {
- // Looks like someone really wants us to exit this thread...
- }
- }
- };
- checkpointThread.start();
}
- public void close() throws IOException {
- synchronized (indexMutex) {
- pageFile.unload();
- metadata = new Metadata();
- }
- journal.close();
- opened=false;
+ public void close() throws IOException, InterruptedException {
+ if( opened.compareAndSet(true, false)) {
+ synchronized (indexMutex) {
+ pageFile.unload();
+ metadata = new Metadata();
+ }
+ journal.close();
+ checkpointThread.join();
+ }
}
public void unload() throws IOException, InterruptedException {
- checkpointThread.join();
-
synchronized (indexMutex) {
-
metadata.state = CLOSED_STATE;
metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
@@ -308,13 +309,8 @@
tx.store(metadata.page, metadataMarshaller, true);
}
});
-
- pageFile.unload();
- metadata = new Metadata();
+ close();
}
- store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new Date()));
- journal.close();
- opened=false;
}
/**
@@ -356,6 +352,7 @@
while (recoveryPosition != null) {
JournalCommand message = load(recoveryPosition);
+ metadata.lastUpdate = recoveryPosition;
process(message, recoveryPosition);
redoCounter++;
recoveryPosition = journal.getNextLocation(recoveryPosition);
@@ -377,6 +374,7 @@
}
while (nextRecoveryPosition != null) {
lastRecoveryPosition = nextRecoveryPosition;
+ metadata.lastUpdate = lastRecoveryPosition;
JournalCommand message = load(lastRecoveryPosition);
process(message, lastRecoveryPosition);
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
@@ -403,15 +401,17 @@
protected void checkpointCleanup(final boolean cleanup) {
try {
synchronized (indexMutex) {
+ if( !opened.get() ) {
+ return;
+ }
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, cleanup);
}
});
- pageFile.flush();
}
- store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
} catch (IOException e) {
+ e.printStackTrace();
}
}
@@ -426,7 +426,6 @@
pageFile.flush();
closure.execute();
}
- store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
}
// /////////////////////////////////////////////////////////////////
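The MessageDatabase.java changes apply the same lifecycle discipline to the store itself: opened becomes an AtomicBoolean, the checkpoint thread is created inside open() and loops on opened.get() rather than started.get(), and close() flips the flag and joins the thread so the worker can never outlive the store or run twice after a reopen. A condensed sketch, assuming simplified names and a stubbed-out checkpoint body:

    import java.util.concurrent.atomic.AtomicBoolean;

    class CheckpointLifecycleSketch {
        private final AtomicBoolean opened = new AtomicBoolean();
        private Thread checkpointThread;

        public void open() {
            if (opened.compareAndSet(false, true)) {
                checkpointThread = new Thread("Checkpoint Worker") {
                    public void run() {
                        try {
                            // Sleep in short intervals so a close() is
                            // noticed promptly.
                            while (opened.get()) {
                                Thread.sleep(500);
                                checkpoint();
                            }
                        } catch (InterruptedException e) {
                            // Someone really wants us to exit...
                        }
                    }
                };
                checkpointThread.start();
            }
        }

        public void close() throws InterruptedException {
            if (opened.compareAndSet(true, false)) {
                // The worker sees opened == false after its next sleep and
                // exits; join so close() returns with no thread left behind.
                checkpointThread.join();
            }
        }

        void checkpoint() {
            // ... flush indexes, clean up old journal files ...
        }
    }

checkpointCleanup() also re-checks opened.get() under indexMutex before touching the page file, which keeps a late-firing checkpoint from racing a concurrent close().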