Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/22 03:16:06 UTC
svn commit: r687919 - in /activemq/sandbox/kahadb/src:
main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/util/
test/java/org/apache/kahadb/page/
Author: chirino
Date: Thu Aug 21 18:16:06 2008
New Revision: 687919
URL: http://svn.apache.org/viewvc?rev=687919&view=rev
Log:
Added a HashIndex implementation that uses the PageFile layer.
Added:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
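
For orientation, here is a minimal sketch of how the new index is driven, condensed from the HashTest added at the end of this commit (the directory, file names, and marshaller choice are illustrative, not part of the commit):

    import java.io.File;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.kahadb.Store;
    import org.apache.kahadb.impl.index.IndexItem;
    import org.apache.kahadb.impl.index.IndexManager;
    import org.apache.kahadb.page.HashIndex;
    import org.apache.kahadb.page.PageFile;

    public class HashIndexSketch {
        public static void main(String[] args) throws Exception {
            File dir = new File("target/test-data");
            PageFile pageFile = new PageFile(dir, "hash-sketch");
            pageFile.load();
            IndexManager indexManager = new IndexManager(dir, "hash-sketch", "rw", null, new AtomicLong());

            // The index keeps its metadata in a page we allocate for it.
            HashIndex index = new HashIndex(indexManager, pageFile, pageFile.allocate().getPageId());
            index.setKeyMarshaller(Store.STRING_MARSHALLER);
            index.load();

            IndexItem value = indexManager.createNewIndex();
            indexManager.storeIndex(value);
            index.store("key:1", value);   // may grow the bin array once the load factor is hit
            System.out.println(index.get("key:1"));

            index.unload();
            pageFile.unload();
        }
    }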
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kahadb.page;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.util.ByteArrayOutputStream;
+import org.apache.kahadb.util.ByteSequence;
+
+/**
+ * Represents a Chunk of data in a chunk stream. Use the PageInputStream and PageOutputStream classes to access
+ * a linked set of chunks on a PageFile.
+ *
+ */
+public class Chunk extends ByteSequence {
+
+ static final int HEADER_MAX_SIZE=9;
+
+ boolean last;
+ long next;
+
+ public Chunk() {
+ super();
+ }
+
+ public Chunk(byte[] data, int offset, int length) {
+ super(data, offset, length);
+ }
+
+ public Chunk(byte[] data) {
+ super(data);
+ }
+
+ @Override
+ public String toString() {
+ return "Chunk{length: "+length+", last; "+last+", next:"+next+"}";
+ }
+
+ public static class ChunkMarshaller implements Marshaller<Chunk> {
+ private final int chunkSize;
+
+ public ChunkMarshaller(int chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+
+ public Class<Chunk> getType() {
+ return Chunk.class;
+ }
+
+ public void writePayload(Chunk chunk, DataOutput out) throws IOException {
+ if( chunk.last ) {
+ out.writeBoolean(true);
+ out.writeInt(chunk.length);
+ out.write(chunk.data, chunk.offset, chunk.length);
+ } else {
+ out.writeBoolean(false);
+ out.writeLong(chunk.next);
+ out.write(chunk.data, chunk.offset, chunk.length);
+ }
+ }
+
+ public Chunk readPayload(DataInput in) throws IOException {
+ Chunk chunk = new Chunk();
+ if( in.readBoolean() ) {
+ chunk.last=true;
+ chunk.length = in.readInt();
+ chunk.data = new byte[chunk.length];
+ chunk.next=0;
+ in.readFully(chunk.data);
+ } else {
+ chunk.last=false;
+ chunk.next = in.readLong();
+ chunk.length = chunkSize;
+ chunk.data = new byte[chunkSize];
+ in.readFully(chunk.data);
+ }
+ return chunk;
+ }
+
+ public int getChunkSize() {
+ return chunkSize;
+ }
+
+ }
+
+ public static class PageInputStream extends InputStream {
+
+ private PageFile file;
+ private Chunk chunk;
+ private int pos;
+ private int pageCount;
+
+ private int markPos;
+ private Chunk markChunk;
+ private int markPageCount;
+ private ChunkMarshaller marshaller;
+
+ public PageInputStream(PageFile file, long pageId) throws IOException {
+ this.file = file;
+ this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE);
+
+ Page page = file.load(pageId, marshaller);
+ if( page.getType() != Page.CHUNK_TYPE ) {
+ throw new EOFException("Chunk stream does not exist at page: "+pageId);
+ }
+ chunk = (Chunk)page.getData();
+ pageCount++;
+
+ }
+
+ public int read() throws IOException {
+ if (!atEOF()) {
+ return chunk.data[chunk.offset+pos++] & 0xff;
+ } else {
+ return -1;
+ }
+ }
+
+ private boolean atEOF() throws IOException {
+ if( pos < chunk.length ) {
+ return false;
+ }
+ if( chunk.last ) {
+ return true;
+ }
+ fill();
+ return pos >= chunk.length;
+ }
+
+ private void fill() throws IOException {
+ Page page = file.load(chunk.next, marshaller);
+ if( page.getType() == Page.INVALID_TYPE ) {
+ throw new IOException("Invalid page: "+chunk.next);
+ }
+ chunk = (Chunk)page.getData();
+ pageCount++;
+ pos = 0;
+ }
+
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ public int read(byte b[], int off, int len) throws IOException {
+ if (!atEOF()) {
+ int rc=0;
+ while(!atEOF() && rc < len) {
+ // Copy what remains in the current chunk, up to the caller's limit.
+ int count = Math.min(len - rc, chunk.length - pos);
+ if (count > 0) {
+ System.arraycopy(chunk.data, chunk.offset+pos, b, off+rc, count);
+ pos += count;
+ }
+ rc+=count;
+ }
+ return rc;
+ } else {
+ return -1;
+ }
+ }
+
+ public long skip(long len) throws IOException {
+ if (!atEOF()) {
+ long rc=0;
+ while(!atEOF() && rc < len) {
+ // Skip what remains in the current chunk, up to the requested count.
+ long count = Math.min(len - rc, chunk.length - pos);
+ if (count > 0) {
+ pos += (int)count;
+ }
+ rc+=count;
+ }
+ return rc;
+ } else {
+ return -1;
+ }
+ }
+
+ public int available() {
+ return chunk.length - pos;
+ }
+
+ public boolean markSupported() {
+ return true;
+ }
+
+ public void mark(int markpos) {
+ markPos = pos;
+ markChunk = chunk;
+ markPageCount = pageCount;
+ }
+
+ public void reset() {
+ pos = markPos;
+ chunk = markChunk;
+ pageCount = markPageCount;
+ }
+
+ public int getPageCount() {
+ return pageCount;
+ }
+
+ }
+
+ public static class PageOutputStream extends ByteArrayOutputStream {
+
+ private PageFile file;
+ private long pageId;
+ private ChunkMarshaller marshaller;
+ private int pageCount;
+ private ArrayList<Page> pages;
+
+ public PageOutputStream(PageFile file, long pageId) {
+ this.file = file;
+ this.pageId = pageId;
+ this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE);
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+
+ ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+ ByteSequence bs = toByteSequence();
+
+ int pos = 0;
+ while( pos < bs.length ) {
+ int len = Math.min(marshaller.getChunkSize(), bs.length - pos);
+ Chunk c = new Chunk(bs.data, pos, len);
+ chunks.add(c);
+ pos+=len;
+ }
+ if( chunks.isEmpty() ) {
+ Chunk c = new Chunk(new byte[]{});
+ chunks.add(c);
+ }
+ chunks.get(chunks.size()-1).last = true;
+
+ // Load the old pages..
+ pages = new ArrayList<Page>();
+ long p = pageId;
+ while( p >= 0 ) {
+ Page page = file.load(p, marshaller);
+ Chunk c = (Chunk)page.getData();
+ if( c!=null && !c.last ) {
+ p = c.next;
+ } else {
+ p = -1;
+ }
+ pages.add(page);
+ }
+
+ // Add more if needed.
+ while( pages.size() < chunks.size() ) {
+ pages.add(file.allocate());
+ }
+
+ // Update the page data.
+ for(int i=0; i < chunks.size(); i++) {
+ Chunk chunk = chunks.get(i);
+ Page page = pages.get(i);
+ page.setType(Page.CHUNK_TYPE);
+ page.setData(chunk);
+ if( !chunk.last ) {
+ chunk.next = pages.get(i+1).getPageId();
+ }
+ }
+
+ // If there were extra pages.. Free them up.
+ for(int i=chunks.size(); i < pages.size(); i++) {
+ Page page = pages.get(i);
+ page.setData(null);
+ page.setType(Page.FREE_TYPE);
+ }
+
+ file.write(pages, marshaller);
+
+ pageCount=chunks.size();
+ }
+
+ public int getPageCount() {
+ return pageCount;
+ }
+
+ }
+
+}
\ No newline at end of file
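A quick worked check of the chunk header math above, using the standard DataOutput encodings (nothing here is new API): a non-terminal chunk is marshalled as a boolean flag plus a long next pointer, the last chunk as a flag plus an int length, so HEADER_MAX_SIZE = 9 covers the worst case and a payload of getPageContentSize() - 9 bytes always fits in one page.

    public class ChunkHeaderMath {
        public static void main(String[] args) {
            int lastChunkHeader  = 1 /* boolean last */ + 4 /* int length */; // = 5
            int otherChunkHeader = 1 /* boolean last */ + 8 /* long next */;  // = 9
            // The worst case is what HEADER_MAX_SIZE must cover.
            System.out.println(Math.max(lastChunkHeader, otherChunkHeader));  // prints 9
        }
    }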
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.kahadb.page.Chunk.PageInputStream;
+import org.apache.kahadb.page.Chunk.PageOutputStream;
+
+/**
+ * Bin in a HashIndex
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashBin {
+
+ private final HashIndex index;
+ private final long pageId;
+ private TreeMap<Comparable, Long> data;
+ private int pageCount;
+
+ /**
+ * Constructor
+ *
+ * @param hashIndex
+ * @param pageId
+ * @throws IOException
+ */
+ HashBin(HashIndex hashIndex, long pageId) throws IOException {
+ this.index = hashIndex;
+ this.pageId = pageId;
+ }
+
+ private void load() throws IOException {
+
+ data = new TreeMap<Comparable, Long>();
+
+ // Using page streams to store the data makes it easy to marshall the HashBin data,
+ // but it does not give us very good page-based caching: even if the pages are cached,
+ // we still need to de-marshall the entries from the stream.
+
+ // I think it will be better to make the bin a btree root.
+
+ PageInputStream pis = new PageInputStream(index.getPageFile(), pageId);
+ DataInputStream is = new DataInputStream(pis);
+ int size = is.readInt();
+ for(int i=0; i < size; i++) {
+ Comparable key = (Comparable)index.getKeyMarshaller().readPayload(is);
+ long value = is.readLong();
+ data.put(key, value);
+ }
+ is.close();
+ pageCount = pis.getPageCount();
+ }
+
+ public void store() throws IOException {
+ PageOutputStream pos = new PageOutputStream(index.getPageFile(), pageId);
+ DataOutputStream os = new DataOutputStream(pos);
+ if( data == null ) {
+ os.writeInt(0);
+ } else {
+ os.writeInt(data.size());
+ for (Map.Entry<Comparable, Long> entry : data.entrySet()) {
+ index.getKeyMarshaller().writePayload(entry.getKey(), os);
+ os.writeLong(entry.getValue());
+ }
+ }
+ os.close();
+ pageCount = pos.getPageCount();
+ }
+
+ public int size() throws IOException {
+ if( data!=null ) {
+ return data.size();
+ } else {
+
+ // Peek at the page to see how many items it contains.
+ PageInputStream pis = new PageInputStream(index.getPageFile(), pageId);
+ DataInputStream is = new DataInputStream(pis);
+ int size = is.readInt();
+ is.close();
+
+ return size;
+ }
+ }
+
+ public Long put(Comparable key, Long value) throws IOException {
+ if( data==null ) {
+ load();
+ }
+ Long rc = data.put(key, value);
+ if( !value.equals(rc) ) {
+ store();
+ }
+ return rc;
+ }
+
+ public Long find(Comparable key) throws IOException {
+ if( data==null ) {
+ load();
+ }
+ return data.get(key);
+ }
+
+ public Map<Comparable, Long> getAll() throws IOException {
+ if( data==null ) {
+ load();
+ }
+ return data;
+ }
+
+ public Long remove(Comparable key) throws IOException {
+ if( data==null ) {
+ load();
+ }
+ Long rc = data.remove(key);
+ if( rc!=null ) {
+ store();
+ }
+ return rc;
+ }
+
+ public long getPageId() {
+ return pageId;
+ }
+
+}
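For reference, the serialized form a bin writes through its page stream, per store() above, is simply a count followed by repeated key/value records. A hand-rolled reader equivalent to HashBin.load(), shown only to make the layout concrete (the pageFile, keyMarshaller, and pageId parameters stand for whatever the index was configured with):

    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.kahadb.Marshaller;
    import org.apache.kahadb.page.Chunk.PageInputStream;
    import org.apache.kahadb.page.PageFile;

    public class BinDump {
        // Bin layout per HashBin.store(): [int size] ([marshalled key] [long value]) * size
        static void dump(PageFile pageFile, Marshaller keyMarshaller, long pageId) throws IOException {
            DataInputStream is = new DataInputStream(new PageInputStream(pageFile, pageId));
            int size = is.readInt();
            for (int i = 0; i < size; i++) {
                Comparable key = (Comparable) keyMarshaller.readPayload(is);
                long value = is.readLong();
                System.out.println(key + " -> " + value);
            }
            is.close();
        }
    }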
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StoreEntry;
+import org.apache.kahadb.impl.index.Index;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.page.Chunk.PageInputStream;
+import org.apache.kahadb.page.Chunk.PageOutputStream;
+
+/**
+ * Hash index implementation
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class HashIndex implements Index {
+
+ public static final int CLOSED_STATE = 1;
+ public static final int OPEN_STATE = 2;
+ public static final int INITIALIZING_STATE = 3;
+
+ public static final int RESIZING_PHASE1_STATE = 4;
+ public static final int RESIZING_PHASE2_STATE = 5;
+
+ private static final Log LOG = LogFactory.getLog(HashIndex.class);
+
+ public static final int DEFAULT_BIN_CAPACITY;
+ public static final int DEFAULT_MAXIMUM_BIN_CAPACITY;
+ public static final int DEFAULT_MINIMUM_BIN_CAPACITY;
+ public static final int DEFAULT_LOAD_FACTOR;
+
+ static {
+ DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
+ DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
+ DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("minimumCapacity", "16"));
+ DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75"));
+ }
+
+ private IndexManager indexManager;
+
+ private Marshaller keyMarshaller;
+ private AtomicBoolean loaded = new AtomicBoolean();
+
+ private int size;
+
+ private int increaseThreshold;
+ private int decreaseThreshold;
+
+ // Where the bin page array starts.
+ private long binPageId;
+ private int binCapacity = DEFAULT_BIN_CAPACITY;
+ private int binsActive;
+ private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY;
+ private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY;
+
+ // While resizing, the following contains the new resize data.
+ private int resizeCapacity;
+ private long resizePageId;
+
+ // While the index is initializing or resizing, its state changes so that
+ // on failure it can be properly recovered.
+ private int state;
+
+ // Once binsActive/binCapacity reaches the loadFactor, we need to
+ // increase the capacity.
+ private int loadFactor = DEFAULT_LOAD_FACTOR;
+
+ private PageFile pageFile;
+ // This page holds the index metadata.
+ private long pageId;
+
+ /**
+ * Constructor
+ *
+ * @param indexManager
+ * @param pageFile
+ * @param pageId the page that holds the index metadata
+ * @throws IOException
+ */
+ public HashIndex(IndexManager indexManager, PageFile pageFile, long pageId) throws IOException {
+ this.pageFile = pageFile;
+ this.indexManager = indexManager;
+ this.pageId = pageId;
+ }
+
+ public synchronized void load() {
+ if (loaded.compareAndSet(false, true)) {
+ try {
+ Page page = pageFile.load(pageId, null);
+
+ // Is this a brand new index?
+ if (page.getType() == Page.FREE_TYPE) {
+
+ // We need to create the pages for the bins
+ Page binPage = pageFile.allocate(binCapacity);
+ binPageId = binPage.getPageId();
+ state = INITIALIZING_STATE;
+ storeMetadata();
+ pageFile.checkpoint();
+
+ // If failure happens now we can continue initializing the
+ // the hash bins...
+
+ } else {
+ // Let's load its data
+ loadMetadata();
+
+ // If we did not have a clean shutdown...
+ if (state == OPEN_STATE || state == RESIZING_PHASE1_STATE) {
+ // Figure out the size and the # of bins that are
+ // active. Yes, this loads the first page of every bin. :(
+ // We might want to put this in the metadata page, but
+ // then that page would be getting updated on every write.
+ size = 0;
+ for (int i = 0; i < binCapacity; i++) {
+ HashBin hashBin = new HashBin(this, binPageId + i);
+ int t = hashBin.size();
+ if (t > 0) {
+ binsActive++;
+ }
+ size += t;
+ }
+ }
+ }
+
+ if (state == INITIALIZING_STATE) {
+ // TODO:
+ // If a failure occurs midway through initializing the
+ // bins, will the page file still think we have the rest
+ // of them previously allocated to us?
+
+ for (int i = 0; i < binCapacity; i++) {
+ HashBin hashBin = new HashBin(this, binPageId + i);
+ hashBin.store();
+ }
+ size = 0;
+ binsActive = 0;
+ }
+
+ if (state == RESIZING_PHASE1_STATE) {
+ // continue resize phase 1
+ resizePhase1();
+ }
+ if (state == RESIZING_PHASE2_STATE) {
+ // continue resize phase 2
+ resizePhase2();
+ }
+
+ calcThresholds();
+
+ state = OPEN_STATE;
+ storeMetadata();
+ pageFile.checkpoint();
+
+ LOG.debug("HashIndex loaded. Using "+binCapacity+" bins starting at page "+binPageId);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public synchronized void unload() throws IOException {
+ if (loaded.compareAndSet(true, false)) {
+ state = CLOSED_STATE;
+ storeMetadata();
+ }
+ }
+
+ public synchronized StoreEntry get(Object key) throws IOException {
+ load();
+ Long result = getBin(key).find((Comparable)key);
+ return result != null ? indexManager.getIndex(result) : null;
+ }
+
+ public synchronized void store(Object key, StoreEntry value) throws IOException {
+ load();
+ HashBin bin = getBin(key);
+ if (bin.put((Comparable)key, value.getOffset()) == null) {
+ this.size++;
+ if (bin.size() == 1) {
+ binsActive++;
+ }
+ }
+ if (this.binsActive >= this.increaseThreshold) {
+ int newSize = Math.min(maximumBinCapacity, binCapacity*2);
+ if(binCapacity!=newSize) {
+ resize(newSize);
+ }
+ }
+ }
+
+ public synchronized StoreEntry remove(Object key) throws IOException {
+ load();
+ StoreEntry result = null;
+
+ HashBin bin = getBin(key);
+ Long offset = bin.remove((Comparable)key);
+ if (offset != null) {
+ this.size--;
+ if (bin.size() == 0) {
+ binsActive--;
+ }
+ result = this.indexManager.getIndex(offset);
+ }
+
+ if (this.binsActive <= this.decreaseThreshold) {
+ int newSize = Math.max(minimumBinCapacity, binCapacity/2);
+ if(binCapacity!=newSize) {
+ resize(newSize);
+
+ }
+ }
+ return result;
+ }
+
+ public synchronized boolean containsKey(Object key) throws IOException {
+ return get(key) != null;
+ }
+
+ public synchronized void clear() throws IOException {
+ load();
+ for (int i = 0; i < binCapacity; i++) {
+ HashBin hashBin = new HashBin(this, binPageId + i);
+ hashBin.store(); // A store before a load.. clears the data out.
+ }
+ size = 0;
+ binsActive = 0;
+ }
+
+ public String toString() {
+ String str = "HashIndex" + System.identityHashCode(this) + ": " + pageFile;
+ return str;
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Implementation Methods
+ // /////////////////////////////////////////////////////////////////
+
+ private void loadMetadata() throws IOException {
+ PageInputStream pis = new PageInputStream(pageFile, pageId);
+ DataInputStream is = new DataInputStream(pis);
+ state = is.readInt();
+ binPageId = is.readLong();
+ binCapacity = is.readInt();
+ size = is.readInt();
+ binsActive = is.readInt();
+ resizePageId = is.readLong();
+ resizeCapacity = is.readInt();
+ is.close();
+ }
+
+ private void storeMetadata() throws IOException {
+ PageOutputStream pos = new PageOutputStream(pageFile, pageId);
+ DataOutputStream os = new DataOutputStream(pos);
+ os.writeInt(state);
+ os.writeLong(binPageId);
+ os.writeInt(binCapacity);
+ os.writeInt(size);
+ os.writeInt(binsActive);
+ os.writeLong(resizePageId);
+ os.writeInt(resizeCapacity);
+ os.close();
+ }
+
+ private void resize(int newSize) throws IOException {
+
+ LOG.debug("Resizing to: "+newSize);
+
+ state = RESIZING_PHASE1_STATE;
+ resizeCapacity = newSize;
+ resizePageId = pageFile.allocate(resizeCapacity).getPageId();
+ storeMetadata();
+ pageFile.checkpoint();
+ resizePhase1();
+ resizePhase2();
+ }
+
+ private void resizePhase1() throws IOException {
+ // In Phase 1 we copy the data to the new bins..
+
+ // Initialize the bins..
+ for (int i = 0; i < resizeCapacity; i++) {
+ HashBin bin = new HashBin(this, resizePageId + i);
+ bin.store();
+ }
+
+ binsActive = 0;
+ // Copy the data from the old bins to the new bins.
+ for (int i = 0; i < binCapacity; i++) {
+ HashBin bin = new HashBin(this, binPageId + i);
+ for (Map.Entry<Comparable, Long> entry : bin.getAll().entrySet()) {
+ HashBin resizeBin = getResizeBin(entry.getKey());
+ resizeBin.put(entry.getKey(), entry.getValue());
+ if( resizeBin.size() == 1) {
+ binsActive++;
+ }
+ }
+ }
+
+ // Now we can release the old data.
+ state = RESIZING_PHASE2_STATE;
+ storeMetadata();
+ pageFile.checkpoint();
+ }
+
+ private void resizePhase2() throws IOException {
+ for (int i = 0; i < binCapacity; i++) {
+ HashBin hashBin = new HashBin(this, binPageId + i);
+ hashBin.store(); // A store before a load.. clears the data out.
+ }
+ pageFile.free(binPageId, binCapacity);
+
+ binCapacity = resizeCapacity;
+ binPageId = resizePageId;
+ resizeCapacity=0;
+ resizePageId=0;
+ state = OPEN_STATE;
+ storeMetadata();
+ pageFile.checkpoint();
+ calcThresholds();
+
+ LOG.debug("Resizing done. New bins start at: "+binPageId);
+
+ }
+
+ private void calcThresholds() {
+ increaseThreshold = (binCapacity * loadFactor)/100;
+ decreaseThreshold = (binCapacity * loadFactor * loadFactor ) / 20000;
+ }
+
+ private HashBin getResizeBin(Object key) throws IOException {
+ int i = indexFor(key, resizeCapacity);
+ return new HashBin(this, resizePageId + i);
+ }
+
+ private HashBin getBin(Object key) throws IOException {
+ int i = indexFor(key, binCapacity);
+ return new HashBin(this, binPageId + i);
+ }
+
+ static int indexFor(Object x, int length) {
+ return Math.abs(x.hashCode()%length);
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Property Accessors
+ // /////////////////////////////////////////////////////////////////
+
+ public Marshaller getKeyMarshaller() {
+ return keyMarshaller;
+ }
+
+ /**
+ * Set the marshaller for key objects
+ *
+ * @param marshaller
+ */
+ public synchronized void setKeyMarshaller(Marshaller marshaller) {
+ this.keyMarshaller = marshaller;
+ }
+
+ /**
+ * @return number of bins in the index
+ */
+ public int getBinCapacity() {
+ return this.binCapacity;
+ }
+
+ /**
+ * @param binCapacity
+ */
+ public void setBinCapacity(int binCapacity) {
+ if (loaded.get() && binCapacity != this.binCapacity) {
+ throw new RuntimeException("Pages already loaded - can't reset bin capacity");
+ }
+ this.binCapacity = binCapacity;
+ }
+
+ public boolean isTransient() {
+ return false;
+ }
+
+ /**
+ * @return the loadFactor
+ */
+ public int getLoadFactor() {
+ return loadFactor;
+ }
+
+ /**
+ * @param loadFactor the loadFactor to set
+ */
+ public void setLoadFactor(int loadFactor) {
+ this.loadFactor = loadFactor;
+ }
+
+ /**
+ * @return the maximumCapacity
+ */
+ public int getMaximumBinCapacity() {
+ return maximumBinCapacity;
+ }
+
+ /**
+ * @param maximumCapacity the maximumCapacity to set
+ */
+ public void setMaximumBinCapacity(int maximumCapacity) {
+ this.maximumBinCapacity = maximumCapacity;
+ }
+
+ public synchronized int getSize() {
+ return size;
+ }
+
+ public synchronized int getActiveBins() {
+ return binsActive;
+ }
+
+ public long getBinPageId() {
+ return binPageId;
+ }
+
+ public PageFile getPageFile() {
+ return pageFile;
+ }
+
+ public int getBinsActive() {
+ return binsActive;
+ }
+
+}
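To make the resize triggers above concrete: with the defaults (binCapacity = 1024, loadFactor = 75), calcThresholds() produces an increaseThreshold of 768 and a decreaseThreshold of 1024*75*75/20000 = 288. The squared load factor appears chosen so that shrinking does not immediately re-trigger growth: after halving to 512 bins, the grow trigger becomes 384, comfortably above 288. A worked sketch of the arithmetic:

    public class ThresholdMath {
        public static void main(String[] args) {
            int binCapacity = 1024; // DEFAULT_BIN_CAPACITY
            int loadFactor = 75;    // DEFAULT_LOAD_FACTOR, a percentage
            int increaseThreshold = (binCapacity * loadFactor) / 100;                // 768
            int decreaseThreshold = (binCapacity * loadFactor * loadFactor) / 20000; // 288
            System.out.println("grow when binsActive >= " + increaseThreshold);
            System.out.println("shrink when binsActive <= " + decreaseThreshold);
        }
    }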
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java Thu Aug 21 18:16:06 2008
@@ -39,6 +39,7 @@
public static final short INVALID_TYPE = -1;
public static final short FREE_TYPE = 0;
+ public static final short CHUNK_TYPE = 1;
private long pageId;
@@ -54,12 +55,18 @@
this.type = other.type;
this.data = other.data;
}
+
+ Page copy() {
+ Page rc = new Page();
+ rc.copy(this);
+ return rc;
+ }
void write(DataOutput os, Marshaller marshaller) throws IOException {
os.writeShort(type);
os.writeLong(txId);
- if( marshaller!=null ) {
+ if( marshaller!=null && type!=FREE_TYPE ) {
marshaller.writePayload(data, os);
}
}
@@ -67,8 +74,10 @@
void read(DataInput is, Marshaller marshaller) throws IOException {
type = is.readShort();
txId = is.readLong();
- if( marshaller!=null ) {
+ if( marshaller!=null && type!=FREE_TYPE ) {
data = marshaller.readPayload(is);
+ } else {
+ data = null;
}
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Aug 21 18:16:06 2008
@@ -16,8 +16,6 @@
*/
package org.apache.kahadb.page;
-import com.sun.tools.javac.tree.Tree.TopLevel;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -30,12 +28,13 @@
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
+import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,10 +43,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.Marshaller;
-import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.page.Chunk.ChunkMarshaller;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
@@ -93,12 +91,14 @@
private AtomicBoolean loaded = new AtomicBoolean();
private LRUCache<Long, Page> pageCache;
- private boolean enableRecoveryBuffer=true;
- private boolean enableSyncedWrites=true;
- private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
+ private boolean enableRecoveryBuffer=false;
+ private boolean enableSyncedWrites=false;
+ private boolean enablePageCaching=true;
+ private boolean enableAsyncWrites=false;
+
private int pageCacheSize = 10;
- private LinkedHashMap<Long, PageWrite> writes=new LinkedHashMap<Long, PageWrite>();
+ private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
private Thread writerThread;
AtomicBoolean stopWriter = new AtomicBoolean();
private CountDownLatch checkpointLatch;
@@ -168,16 +168,16 @@
* Internally used by the double write buffer implementation used in this class.
*/
private class PageWrite {
- final Page page;
+ Page page;
byte[] current;
byte[] diskBound;
public PageWrite(Page page, byte[] data) {
- this.page = page;
- this.current = data;
+ setCurrent(page, data);
}
- public void setCurrent(byte[] data) {
+ public void setCurrent(Page page, byte[] data) {
+ this.page=page;
current=data;
}
@@ -193,6 +193,11 @@
diskBound=null;
return current == null;
}
+
+ @Override
+ public String toString() {
+ return "PageWrite{pageId="+page.getPageId()+"}";
+ }
}
@@ -387,22 +392,23 @@
if( !loaded.get() ) {
throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
}
-
+
Page page = null;
- if(!freeList.isEmpty()) {
- long pageId = freeList.removeFirst();
- page = new Page();
- page.setPageId(pageId);
- page.setType(Page.FREE_TYPE);
- } else {
- // allocate one
+
+ // We may need to create a new free page...
+ if(freeList.isEmpty()) {
page = new Page();
page.setPageId(nextFreePageId);
page.setType(Page.FREE_TYPE);
nextFreePageId ++;
+// LOG.debug("allocated: "+page.getPageId());
write(page, null);
}
- addToCache(page);
+
+ long pageId = freeList.removeFirst();
+ page = new Page();
+ page.setPageId(pageId);
+ page.setType(Page.FREE_TYPE);
return page;
}
@@ -423,28 +429,30 @@
Page page = null;
Sequence seq = freeList.removeFirstSequence(count);
- if(seq!=null) {
- page = new Page();
- page.setPageId(seq.getFirst());
- page.setType(Page.FREE_TYPE);
- } else {
+ if(seq==null) {
- // allocate the pages..
- Page t = new Page();
- while( count > 0 ) {
- t.setPageId(nextFreePageId);
- t.setType(Page.FREE_TYPE);
+ // We may need to create a new free page...
+ page = new Page();
+ int c=count;
+ while( c > 0 ) {
+ page.setPageId(nextFreePageId);
+ page.setType(Page.FREE_TYPE);
nextFreePageId ++;
- write(t, null);
- count--;
-
+// LOG.debug("allocate writing: "+page.getPageId());
+ write(page, null);
+ c--;
- if( page == null ) {
- page = t;
- }
}
+ seq = freeList.removeFirstSequence(count);
}
- addToCache(page);
+
+ page = new Page();
+ page.setPageId(seq.getFirst());
+ page.setType(Page.FREE_TYPE);
+// LOG.debug("allocated: "+page.getPageId());
return page;
}
@@ -458,13 +466,63 @@
* if the PageFile is not loaded
*/
public void free(Page page) throws IOException {
+ page.setType(Page.FREE_TYPE);
+ free(page.getPageId(), 1);
+ }
+
+ /**
+ * Frees up a previously allocated page so that it can be re-allocated again.
+ *
+ * @param pageId the id of the page to free up
+ * @throws IOException
+ * If a disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void free(long pageId) throws IOException {
+ free(pageId, 1);
+ }
+
+ /**
+ * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
+ *
+ * @param page the initial page of the sequence that will be getting freed
+ * @param count the number of pages in the sequence
+ *
+ * @throws IOException
+ * If a disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void free(Page page, int count) throws IOException {
+ page.setType(Page.FREE_TYPE);
+ free(page.getPageId(), count);
+ }
+
+ /**
+ * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
+ *
+ * @param pageId the id of the initial page of the sequence that will be getting freed
+ * @param count the number of pages in the sequence
+ *
+ * @throws IOException
+ * If a disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void free(long pageId, int count) throws IOException {
if( !loaded.get() ) {
throw new IllegalStateException("Cannot free a page when the page file is not loaded");
}
- removeFromCache(page);
- page.setType(Page.FREE_TYPE);
- write(page, null);
- freeList.add(page.getPageId());
+
+ Page page = new Page();
+ long initialId = pageId;
+ for (int i = 0; i < count; i++) {
+ page.setPageId(initialId+i);
+ page.setType(Page.FREE_TYPE);
+// LOG.debug("free: "+page.getPageId());
+ write(page, null);
+ }
}
/**
@@ -506,7 +564,7 @@
// Can't load invalid offsets...
if (page.getPageId() < 0) {
- page.setTxId(Page.INVALID_TYPE);
+ page.setType(Page.INVALID_TYPE);
return;
}
@@ -523,6 +581,7 @@
dataIn.restart(readBuffer);
// Unmarshall it.
+// LOG.debug("load: "+page.getPageId());
page.read(dataIn, marshaller);
// Cache it.
@@ -620,6 +679,26 @@
};
}
+ /**
+ * Updates multiple pages in a single unit of work.
+ *
+ * @param pages
+ * the pages to write. Each Page must be fully populated with a valid pageId, type, and data.
+ * @param marshaller
+ * the marshaller to use to write the data portion of each Page; may be null if you do not wish to write the data.
+ * @throws IOException
+ * If a disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void write(Collection<Page> pages, ChunkMarshaller marshaller) throws IOException {
+ // TODO: Need to update double buffer impl so that it handles a collection of writes. As is right now,
+ // the pages in this write may be split across multiple write batches which means that they
+ // will not get applied as a unit of work.
+ for (Page page : pages) {
+ write(page, marshaller);
+ }
+ }
/**
*
@@ -637,7 +716,7 @@
if( !loaded.get() ) {
throw new IllegalStateException("Cannot wriate a page when the page file is not loaded");
}
-
+
DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize);
page.setTxId(nextTxid.get());
page.write(dataOut, marshaller);
@@ -645,9 +724,12 @@
throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
}
+ page = page.copy();
Long key = page.getPageId();
+ addToCache(page);
- LOG.debug("Page write request for offset: "+page.getPageId());
+// LOG.debug("write: "+page.getPageId());
+
synchronized( writes ) {
// If it's not in the write cache...
PageWrite write = writes.get(key);
@@ -655,14 +737,26 @@
write = new PageWrite(page, dataOut.getData());
writes.put(key, write);
} else {
- write.setCurrent(dataOut.getData());
+ write.setCurrent(page, dataOut.getData());
}
// Once we start approaching capacity, notify the writer to start writing
if( canStartWriteBatch() ) {
- writes.notify();
+ if( enableAsyncWrites ) {
+ writes.notify();
+ } else {
+ while( canStartWriteBatch() ) {
+ writeBatch(-1, TimeUnit.MILLISECONDS);
+ }
+ }
}
}
+
+ if( page.getType() == Page.FREE_TYPE ) {
+ removeFromCache(page);
+ freeList.add(page.getPageId());
+ }
+
}
/**
@@ -675,7 +769,7 @@
*/
public long checkpoint() throws IOException {
- if( stopWriter.get() ) {
+ if( enableAsyncWrites && stopWriter.get() ) {
throw new IOException("Page file already stopped: checkpointing is not allowed");
}
@@ -689,7 +783,13 @@
this.checkpointLatch = new CountDownLatch(1);
}
checkpointLatch = this.checkpointLatch;
- writes.notify();
+ if( enableAsyncWrites ) {
+ writes.notify();
+ } else {
+ while( !writes.isEmpty() ) {
+ writeBatch(-1, TimeUnit.MILLISECONDS);
+ }
+ }
}
try {
checkpointLatch.await();
@@ -711,11 +811,14 @@
private boolean canStartWriteBatch() {
int capacityUsed = ((writes.size() * 100)/MAX_PAGES_IN_RECOVERY_BUFFER);
- // The constant 10 here controls how soon write batches start going to disk..
- // would be nice to figure out how to auto tune that value. Make to small and
- // we reduce through put because we are locking the write mutex too offen doing writes
-
- return capacityUsed >= 10 || checkpointLatch!=null;
+ if( enableAsyncWrites ) {
+ // The constant 10 here controls how soon write batches start going to disk..
+ // would be nice to figure out how to auto tune that value. Make it too small and
+ // we reduce throughput because we are locking the write mutex too often doing writes
+ return capacityUsed >= 10 || checkpointLatch!=null;
+ } else {
+ return capacityUsed >= 80 || checkpointLatch!=null;
+ }
}
@@ -727,14 +830,14 @@
* @throws InterruptedException
* @throws IOException
*/
- private boolean doWrites(long timeout, TimeUnit unit) throws IOException {
+ private boolean writeBatch(long timeout, TimeUnit unit) throws IOException {
int batchLength=8+4; // Account for the: lastTxid + recovery record counter
ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
synchronized( writes ) {
// If there is not enough to write, wait for a notification...
- if( !canStartWriteBatch() ) {
+ if( !canStartWriteBatch() && timeout>=0 ) {
releaseCheckpointWaiter();
try {
writes.wait(unit.toMillis(timeout));
@@ -800,9 +903,9 @@
// Sync again
if( enableSyncedWrites ) {
writeFile.getFD().sync();
- LOG.debug("Page write complete tx: "+txId+", pages: "+pageOffsets);
}
+// LOG.debug("write done: "+txId+", pages: "+pageOffsets);
nextTxid.incrementAndGet();
synchronized( writes ) {
@@ -897,27 +1000,35 @@
}
private void startWriter() {
- stopWriter.set(false);
- writerThread = new Thread("Page Writer") {
- @Override
- public void run() {
- try {
- while( !stopWriter.get() ) {
- doWrites(1000, TimeUnit.MILLISECONDS);
+ synchronized( writes ) {
+ if( enableAsyncWrites ) {
+ stopWriter.set(false);
+ writerThread = new Thread("Page Writer") {
+ @Override
+ public void run() {
+ try {
+ while( !stopWriter.get() ) {
+ writeBatch(1000, TimeUnit.MILLISECONDS);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ } finally {
+ releaseCheckpointWaiter();
+ }
}
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- releaseCheckpointWaiter();
- }
+ };
+ writerThread.start();
}
- };
- writerThread.start();
+ }
}
private void stopWriter() throws InterruptedException {
- stopWriter.set(true);
- writerThread.join();
+ synchronized( writes ) {
+ if( enableAsyncWrites ) {
+ stopWriter.set(true);
+ writerThread.join();
+ }
+ }
}
///////////////////////////////////////////////////////////////////
@@ -942,21 +1053,17 @@
///////////////////////////////////////////////////////////////////
private Page getFromCache(long pageId) {
+ synchronized(writes) {
+ PageWrite pageWrite = writes.get(pageId);
+ if( pageWrite != null ) {
+ return pageWrite.page;
+ }
+ }
+
Page result = null;
if (enablePageCaching) {
result = pageCache.get(pageId);
}
- if( result == null ) {
- synchronized(writes) {
- PageWrite pageWrite = writes.get(pageId);
- if( pageWrite != null ) {
- result = pageWrite.page;
- }
- }
- if (enablePageCaching) {
- pageCache.put(pageId, result);
- }
- }
return result;
}
@@ -1109,6 +1216,13 @@
}
/**
+ * @return the amount of content data that a page can hold.
+ */
+ public int getPageContentSize() {
+ return this.pageSize-Page.PAGE_HEADER_SIZE;
+ }
+
+ /**
* Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk,
* subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting
* can no longer be changed.
@@ -1151,5 +1265,13 @@
public void setPageCacheSize(int pageCacheSize) {
this.pageCacheSize = pageCacheSize;
}
+
+ public boolean isEnableAsyncWrites() {
+ return enableAsyncWrites;
+ }
+
+ public void setEnableAsyncWrites(boolean enableAsyncWrites) {
+ this.enableAsyncWrites = enableAsyncWrites;
+ }
}
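The PageFile changes above make synchronous writes the default (enableAsyncWrites is false), route frees through write() so freed pages land back on the free list, and add bulk free()/allocate() plus getPageContentSize(). A minimal usage sketch, with the constructor signature taken from the tests in this commit:

    import java.io.File;

    import org.apache.kahadb.page.Page;
    import org.apache.kahadb.page.PageFile;

    public class PageFileSketch {
        public static void main(String[] args) throws Exception {
            PageFile pageFile = new PageFile(new File("target/test-data"), "pf-sketch");
            pageFile.load();
            System.out.println(pageFile.getPageContentSize() + " payload bytes per page");

            Page page = pageFile.allocate();     // reuses the free list, or grows the file
            page.setType(Page.CHUNK_TYPE);       // a FREE_TYPE write would re-free the page
            pageFile.write(page, null);          // null marshaller: header only, no data payload
            pageFile.checkpoint();               // drains pending writes inline when async is off

            pageFile.free(page.getPageId(), 1);  // new free(long, int) overload from this commit
            pageFile.unload();
        }
    }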
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java Thu Aug 21 18:16:06 2008
@@ -23,6 +23,9 @@
public int offset;
public int length;
+ public ByteSequence() {
+ }
+
public ByteSequence(byte data[]) {
this.data = data;
this.offset = 0;
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java Thu Aug 21 18:16:06 2008
@@ -100,4 +100,22 @@
return size;
}
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ boolean first=true;
+ T cur = getHead();
+ while( cur!=null ) {
+ if( !first ) {
+ sb.append(", ");
+ }
+ sb.append(cur);
+ first=false;
+ cur = cur.getNext();
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java Thu Aug 21 18:16:06 2008
@@ -254,4 +254,9 @@
public void clear() {
sequences = new LinkedNodeList<Sequence>();
}
+
+ @Override
+ public String toString() {
+ return sequences.toString();
+ }
}
\ No newline at end of file
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.apache.kahadb.page.Chunk.PageInputStream;
+import org.apache.kahadb.page.Chunk.PageOutputStream;
+
+public class ChunkTest extends TestCase {
+
+ static final short TEST_TYPE = 65;
+
+ public void testChunkStreams() throws IOException {
+
+ PageFile pf = new PageFile(new File("target/test-data"), getName());
+ pf.delete();
+ pf.load();
+
+ long id = pf.allocate().getPageId();
+
+ PageOutputStream pos = new Chunk.PageOutputStream(pf, id);
+ DataOutputStream os = new DataOutputStream(pos);
+ for( int i=0; i < 10000; i++) {
+ os.writeUTF("Test string:"+i);
+ }
+
+ os.close();
+ System.out.println("Chuncks used: "+pos.getPageCount());
+
+ // Reload the page file.
+ pf.unload();
+ pf.load();
+
+ PageInputStream pis = new PageInputStream(pf, id);
+ DataInputStream is = new DataInputStream(pis);
+ for( int i=0; i < 10000; i++) {
+ assertEquals("Test string:"+i, is.readUTF());
+ }
+ assertEquals(-1, is.read());
+ is.close();
+
+ System.out.println("Chuncks used: "+pis.getPageCount());
+
+ pf.unload();
+ }
+
+}
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.File;
+
+import org.apache.kahadb.Store;
+import org.apache.kahadb.impl.index.Index;
+import org.apache.kahadb.impl.index.IndexBenchmark;
+
+public class HashIndexBenchMark extends IndexBenchmark {
+
+ @Override
+ protected Index createIndex(File root, String name) throws Exception {
+
+ PageFile pf = new PageFile(root, name);
+ pf.load();
+ HashIndex index = new HashIndex(indexManager, pf, pf.allocate().getPageId());
+ index.setKeyMarshaller(Store.STRING_MARSHALLER);
+
+// index.setEnableRecoveryBuffer(false);
+// index.setEnableSyncedWrites(false);
+ return index;
+ }
+
+}
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import junit.framework.TestCase;
+import org.apache.kahadb.Store;
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * Test a HashIndex
+ */
+public class HashTest extends TestCase {
+
+ private static final int COUNT = 10000;
+
+ private HashIndex hashIndex;
+ private File directory;
+ private IndexManager indexManager;
+ private PageFile pf;
+
+ /**
+ * @throws java.lang.Exception
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ super.setUp();
+ directory = new File(IOHelper.getDefaultDataDirectory());
+ IOHelper.mkdirs(directory);
+ IOHelper.deleteChildren(directory);
+
+ pf = new PageFile(directory, "im-hash-test");
+ pf.load();
+ indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong());
+
+ this.hashIndex = new HashIndex(indexManager, pf, pf.allocate().getPageId());
+ this.hashIndex.setBinCapacity(12);
+ this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER);
+ }
+
+ public void testHashIndex() throws Exception {
+ String keyRoot = "key:";
+ this.hashIndex.load();
+ doInsert(keyRoot);
+ this.hashIndex.unload();
+ this.hashIndex.load();
+ checkRetrieve(keyRoot);
+ doRemove(keyRoot);
+ this.hashIndex.unload();
+ this.hashIndex.load();
+ doInsert(keyRoot);
+ doRemoveHalf(keyRoot);
+ doInsertHalf(keyRoot);
+ this.hashIndex.unload();
+ this.hashIndex.load();
+ checkRetrieve(keyRoot);
+ this.hashIndex.unload();
+ }
+
+ void doInsert(String keyRoot) throws Exception {
+ for (int i = 0; i < COUNT; i++) {
+ IndexItem value = indexManager.createNewIndex();
+ indexManager.storeIndex(value);
+ hashIndex.store(keyRoot + i, value);
+ }
+ }
+
+ void checkRetrieve(String keyRoot) throws IOException {
+ for (int i = 0; i < COUNT; i++) {
+ IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
+ assertNotNull("Key missing: "+keyRoot + i, item);
+ }
+ }
+
+ void doRemoveHalf(String keyRoot) throws Exception {
+ for (int i = 0; i < COUNT; i++) {
+ if (i % 2 == 0) {
+ hashIndex.remove(keyRoot + i);
+ }
+
+ }
+ }
+
+ void doInsertHalf(String keyRoot) throws Exception {
+ for (int i = 0; i < COUNT; i++) {
+ if (i % 2 == 0) {
+ IndexItem value = indexManager.createNewIndex();
+ indexManager.storeIndex(value);
+ hashIndex.store(keyRoot + i, value);
+ }
+ }
+ }
+
+ void doRemove(String keyRoot) throws Exception {
+ for (int i = 0; i < COUNT; i++) {
+ hashIndex.remove(keyRoot + i);
+ }
+ for (int i = 0; i < COUNT; i++) {
+ IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
+ assertNull(item);
+ }
+ }
+
+ void doRemoveBackwards(String keyRoot) throws Exception {
+ for (int i = COUNT - 1; i >= 0; i--) {
+ hashIndex.remove(keyRoot + i);
+ }
+ for (int i = 0; i < COUNT; i++) {
+ IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
+ assertNull(item);
+ }
+ }
+
+ /**
+ * @throws java.lang.Exception
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ pf.unload();
+ }
+}