You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/11 21:38:17 UTC
svn commit: r684888 - in
/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash:
HashBin.java HashIndex.java HashPage.java HashPageInfo.java
Author: chirino
Date: Mon Aug 11 12:38:16 2008
New Revision: 684888
URL: http://svn.apache.org/viewvc?rev=684888&view=rev
Log:
Added a little better logging and fixed the HashTest
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java?rev=684888&r1=684887&r2=684888&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java Mon Aug 11 12:38:16 2008
@@ -326,7 +326,7 @@
while (page != null) {
page.begin();
str +=page.dump();
- page.end();
+ page.end(id);
page = (HashPageInfo) page.getNext();
}
return str;
@@ -334,7 +334,7 @@
private void end() throws IOException {
HashPageInfo page = root;
while (page != null) {
- page.end();
+ page.end(id);
page = (HashPageInfo) page.getNext();
}
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java?rev=684888&r1=684887&r2=684888&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java Mon Aug 11 12:38:16 2008
@@ -424,13 +424,14 @@
return page;
}
- LOG.debug("Reading page: "+id);
readIndexFile.seek(id);
readIndexFile.readFully(readBuffer, 0, pageSize);
dataIn.restart(readBuffer);
HashPage page = new HashPage(this);
page.setOffset(id);
page.read(keyMarshaller, dataIn);
+
+ LOG.debug("Read in page: "+id+", contains keys: "+page.getKeys());
return page;
}
@@ -516,6 +517,7 @@
}
private void doCompress() throws IOException {
+ LOG.debug("Compress to new index file: start");
String backFileName = name + "-COMPRESS";
HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
backIndex.setKeyMarshaller(keyMarshaller);
@@ -538,13 +540,16 @@
offset += pageSize;
}
backIndex.unload();
-
+ LOG.debug("Compress to new index file: end");
+
+ LOG.debug("Compress loading new index file: start");
unload();
IOHelper.deleteFile(file);
IOHelper.copyFile(backFile, file);
IOHelper.deleteFile(backFile);
openIndexFile();
doLoad();
+ LOG.debug("Compress loading new index file: end");
}
private void resize(int newCapacity) throws IOException {
@@ -562,6 +567,7 @@
if (newCapacity != numberOfBins) {
LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
+ LOG.debug("Resize create new index file: start");
String backFileName = name + "-REISZE";
HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
backIndex.setKeyMarshaller(keyMarshaller);
@@ -584,7 +590,9 @@
offset += pageSize;
}
backIndex.unload();
+ LOG.debug("Resize create new index file: end");
+ LOG.debug("Resize reloading new index file: start");
unload();
IOHelper.deleteFile(file);
IOHelper.copyFile(backFile, file);
@@ -594,6 +602,7 @@
threshold = calculateThreashold();
openIndexFile();
doLoad();
+ LOG.debug("Resize reloading new index file: end");
}
}
}else {
@@ -684,11 +693,15 @@
throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
}
+ if( !page.isActive() && !page.getEntries().isEmpty() ) {
+ throw new IOException("Page is not active but is also not empty");
+ }
+
// ByteSequence data = new ByteSequence(dataOut.getData(), 0, dataOut.size());
ByteSequence data = new ByteSequence(dataOut.getData(), 0, pageSize);
Long key = page.getOffset();
- LOG.debug("Page write request at: "+page.getOffset()+", keys: ");
+ LOG.debug("Page write request for offset: "+page.getOffset()+", contains keys: "+page.getKeys());
synchronized( writes ) {
// If it's not in the write cache...
PageWrite write = writes.get(key);
@@ -711,7 +724,7 @@
* @throws InterruptedException
* @throws IOException
*/
- public boolean pollWrites(long timeout, TimeUnit unit) throws IOException {
+ public boolean doWrites(long timeout, TimeUnit unit) throws IOException {
int batchLength=8+4; // Account for the: lastTxid + recovery record counter
ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
@@ -752,14 +765,13 @@
}
long txId = nextTxid.get();
- LOG.debug("Starting write batch transaction id: "+txId);
+ StringBuilder pageOffsets = new StringBuilder();
// Now the batch array has all the writes, write the batch to the recovery buffer.
writeIndexFile.seek(0);
writeIndexFile.writeLong(txId); // write txid of the batch
writeIndexFile.writeInt(batch.size()); // write the recovery record counter.
for (PageWrite w : batch) {
- LOG.debug("Journaling write at offset: "+w.page.getOffset());
writeIndexFile.writeLong(w.page.getOffset());
writeIndexFile.writeInt(w.diskBound.length);
writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
@@ -770,14 +782,18 @@
// Now update the actual index locations
for (PageWrite w : batch) {
- LOG.debug("Doing write at offset: "+w.page.getOffset());
+ if( pageOffsets.length()!=0 ) {
+ pageOffsets.append(", ");
+ }
+ pageOffsets.append(w.page.getOffset());
writeIndexFile.seek(w.page.getOffset());
writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
}
// Sync again
writeIndexFile.getFD().sync();
-
+ LOG.debug("Index write complete tx: "+txId+", pages: "+pageOffsets);
+
nextTxid.incrementAndGet();
synchronized( writes ) {
for (PageWrite w : batch) {
@@ -947,7 +963,7 @@
public void run() {
try {
while( !stopWriter.get() ) {
- pollWrites(500, TimeUnit.MILLISECONDS);
+ doWrites(500, TimeUnit.MILLISECONDS);
}
} catch (IOException e) {
e.printStackTrace();
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java?rev=684888&r1=684887&r2=684888&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java Mon Aug 11 12:38:16 2008
@@ -210,4 +210,15 @@
public void setTxId(long txId) {
this.txId = txId;
}
+
+ public String getKeys() {
+ StringBuilder sb = new StringBuilder();
+ for (HashEntry e : hashIndexEntries) {
+ if( sb.length() != 0 ) {
+ sb.append(", ");
+ }
+ sb.append(e.getKey());
+ }
+ return sb.toString();
+ }
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java?rev=684888&r1=684887&r2=684888&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java Mon Aug 11 12:38:16 2008
@@ -98,9 +98,10 @@
}
}
- void end() throws IOException {
+ void end(int id) throws IOException {
if (page != null) {
if (dirty) {
+ page.setBinId(id);
hashIndex.writeFullPage(page);
}
}