You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/05 00:46:44 UTC
svn commit: r692288 [2/3] - in /activemq/sandbox/kahadb/src:
main/java/org/apache/kahadb/ main/java/org/apache/kahadb/impl/
main/java/org/apache/kahadb/impl/async/
main/java/org/apache/kahadb/impl/container/
main/java/org/apache/kahadb/impl/data/ main/...
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+
+import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
+import org.apache.kahadb.journal.DataFileAppender.WriteKey;
+import org.apache.kahadb.util.ByteSequence;
+
+/**
+ * Optimized Store reader and updater. Single threaded and synchronous. Use in
+ * conjunction with the DataFileAccessorPool of concurrent use.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+final class DataFileAccessor {
+
+ private final DataFile dataFile;
+ private final Map<WriteKey, WriteCommand> inflightWrites;
+ private final RandomAccessFile file;
+ private boolean disposed;
+
+ /**
+ * Construct a Store reader
+ *
+ * @param fileId
+ * @throws IOException
+ */
+ public DataFileAccessor(Journal dataManager, DataFile dataFile) throws IOException {
+ this.dataFile = dataFile;
+ this.inflightWrites = dataManager.getInflightWrites();
+ this.file = dataFile.openRandomAccessFile(false);
+ }
+
+ public DataFile getDataFile() {
+ return dataFile;
+ }
+
+ public void dispose() {
+ if (disposed) {
+ return;
+ }
+ disposed = true;
+ try {
+ dataFile.closeRandomAccessFile(file);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public ByteSequence readRecord(Location location) throws IOException {
+
+ if (!location.isValid()) {
+ throw new IOException("Invalid location: " + location);
+ }
+
+ WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+ if (asyncWrite != null) {
+ return asyncWrite.data;
+ }
+
+ try {
+
+ if (location.getSize() == Location.NOT_SET) {
+ file.seek(location.getOffset());
+ location.setSize(file.readInt());
+ file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+ } else {
+ file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+ }
+
+ byte[] data = new byte[location.getSize() - Journal.ITEM_HEAD_FOOT_SPACE];
+ file.readFully(data);
+ return new ByteSequence(data, 0, data.length);
+
+ } catch (RuntimeException e) {
+ throw new IOException("Invalid location: " + location + ", : " + e);
+ }
+ }
+
+ public void readLocationDetails(Location location) throws IOException {
+ WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+ if (asyncWrite != null) {
+ location.setSize(asyncWrite.location.getSize());
+ location.setType(asyncWrite.location.getType());
+ } else {
+ file.seek(location.getOffset());
+ location.setSize(file.readInt());
+ location.setType(file.readByte());
+ }
+ }
+
+ public boolean readLocationDetailsAndValidate(Location location) {
+ try {
+ WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+ if (asyncWrite != null) {
+ location.setSize(asyncWrite.location.getSize());
+ location.setType(asyncWrite.location.getType());
+ } else {
+ file.seek(location.getOffset());
+ location.setSize(file.readInt());
+ location.setType(file.readByte());
+
+ byte data[] = new byte[3];
+ file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
+ file.readFully(data);
+ if (data[0] != Journal.ITEM_HEAD_SOR[0]
+ || data[1] != Journal.ITEM_HEAD_SOR[1]
+ || data[2] != Journal.ITEM_HEAD_SOR[2]) {
+ return false;
+ }
+ file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
+ file.readFully(data);
+ if (data[0] != Journal.ITEM_HEAD_EOR[0]
+ || data[1] != Journal.ITEM_HEAD_EOR[1]
+ || data[2] != Journal.ITEM_HEAD_EOR[2]) {
+ return false;
+ }
+ }
+ } catch (IOException e) {
+ return false;
+ }
+ return true;
+ }
+
+ public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
+
+ file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+ int size = Math.min(data.getLength(), location.getSize());
+ file.write(data.getData(), data.getOffset(), size);
+ if (sync) {
+ file.getFD().sync();
+ }
+
+ }
+
+}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used to pool DataFileAccessors.
+ *
+ * @author chirino
+ */
+public class DataFileAccessorPool {
+
+ private final Journal dataManager;
+ private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
+ private boolean closed;
+ private int maxOpenReadersPerFile = 5;
+
+ class Pool {
+
+ private final DataFile file;
+ private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
+ private boolean used;
+ private int openCounter;
+ private boolean disposed;
+
+ public Pool(DataFile file) {
+ this.file = file;
+ }
+
+ public DataFileAccessor openDataFileReader() throws IOException {
+ DataFileAccessor rc = null;
+ if (pool.isEmpty()) {
+ rc = new DataFileAccessor(dataManager, file);
+ } else {
+ rc = (DataFileAccessor)pool.remove(pool.size() - 1);
+ }
+ used = true;
+ openCounter++;
+ return rc;
+ }
+
+ public synchronized void closeDataFileReader(DataFileAccessor reader) {
+ openCounter--;
+ if (pool.size() >= maxOpenReadersPerFile || disposed) {
+ reader.dispose();
+ } else {
+ pool.add(reader);
+ }
+ }
+
+ public synchronized void clearUsedMark() {
+ used = false;
+ }
+
+ public synchronized boolean isUsed() {
+ return used;
+ }
+
+ public synchronized void dispose() {
+ for (DataFileAccessor reader : pool) {
+ reader.dispose();
+ }
+ pool.clear();
+ disposed = true;
+ }
+
+ public synchronized int getOpenCounter() {
+ return openCounter;
+ }
+
+ }
+
+ public DataFileAccessorPool(Journal dataManager) {
+ this.dataManager = dataManager;
+ }
+
+ synchronized void clearUsedMark() {
+ for (Iterator iter = pools.values().iterator(); iter.hasNext();) {
+ Pool pool = (Pool)iter.next();
+ pool.clearUsedMark();
+ }
+ }
+
+ synchronized void disposeUnused() {
+ for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+ Pool pool = iter.next();
+ if (!pool.isUsed()) {
+ pool.dispose();
+ iter.remove();
+ }
+ }
+ }
+
+ synchronized void disposeDataFileAccessors(DataFile dataFile) {
+ if (closed) {
+ throw new IllegalStateException("Closed.");
+ }
+ Pool pool = pools.get(dataFile.getDataFileId());
+ if (pool != null) {
+ if (pool.getOpenCounter() == 0) {
+ pool.dispose();
+ pools.remove(dataFile.getDataFileId());
+ } else {
+ throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
+ }
+ }
+ }
+
+ synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
+ if (closed) {
+ throw new IOException("Closed.");
+ }
+
+ Pool pool = pools.get(dataFile.getDataFileId());
+ if (pool == null) {
+ pool = new Pool(dataFile);
+ pools.put(dataFile.getDataFileId(), pool);
+ }
+ return pool.openDataFileReader();
+ }
+
+ synchronized void closeDataFileAccessor(DataFileAccessor reader) {
+ Pool pool = pools.get(reader.getDataFile().getDataFileId());
+ if (pool == null || closed) {
+ reader.dispose();
+ } else {
+ pool.closeDataFileReader(reader);
+ }
+ }
+
+ public synchronized void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+ Pool pool = iter.next();
+ pool.dispose();
+ }
+ pools.clear();
+ }
+
+}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ * An optimized writer to do batch appends to a data file. This object is thread
+ * safe and gains throughput as you increase the number of concurrent writes it
+ * does.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFileAppender {
+
+ protected static final byte[] RESERVED_SPACE = new byte[Journal.ITEM_HEAD_RESERVED_SPACE];
+ protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
+
+ protected final Journal dataManager;
+ protected final Map<WriteKey, WriteCommand> inflightWrites;
+ protected final Object enqueueMutex = new Object(){};
+ protected WriteBatch nextWriteBatch;
+
+ protected boolean shutdown;
+ protected IOException firstAsyncException;
+ protected final CountDownLatch shutdownDone = new CountDownLatch(1);
+ protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
+
+ private boolean running;
+ private Thread thread;
+
+ public static class WriteKey {
+ private final int file;
+ private final long offset;
+ private final int hash;
+
+ public WriteKey(Location item) {
+ file = item.getDataFileId();
+ offset = item.getOffset();
+ // TODO: see if we can build a better hash
+ hash = (int)(file ^ offset);
+ }
+
+ public int hashCode() {
+ return hash;
+ }
+
+ public boolean equals(Object obj) {
+ if (obj instanceof WriteKey) {
+ WriteKey di = (WriteKey)obj;
+ return di.file == file && di.offset == offset;
+ }
+ return false;
+ }
+ }
+
+ public class WriteBatch {
+
+ public final DataFile dataFile;
+ public final WriteCommand first;
+ public final CountDownLatch latch = new CountDownLatch(1);
+ public int size;
+
+ public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
+ this.dataFile = dataFile;
+ this.first = write;
+ size += write.location.getSize();
+ }
+
+ public boolean canAppend(DataFile dataFile, WriteCommand write) {
+ if (dataFile != this.dataFile) {
+ return false;
+ }
+ if (size + write.location.getSize() >= maxWriteBatchSize) {
+ return false;
+ }
+ return true;
+ }
+
+ public void append(WriteCommand write) throws IOException {
+ this.first.getTailNode().linkAfter(write);
+ size += write.location.getSize();
+ }
+ }
+
+ public static class WriteCommand extends LinkedNode {
+ public final Location location;
+ public final ByteSequence data;
+ final boolean sync;
+ public final Runnable onComplete;
+
+ public WriteCommand(Location location, ByteSequence data, boolean sync) {
+ this.location = location;
+ this.data = data;
+ this.sync = sync;
+ this.onComplete=null;
+ }
+
+ public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
+ this.location = location;
+ this.data = data;
+ this.onComplete = onComplete;
+ this.sync = false;
+ }
+ }
+
+
+ /**
+ * Construct a Store writer
+ *
+ * @param fileId
+ */
+ public DataFileAppender(Journal dataManager) {
+ this.dataManager = dataManager;
+ this.inflightWrites = this.dataManager.getInflightWrites();
+ }
+
+ /**
+ * @param type
+ * @param marshaller
+ * @param payload
+ * @param type
+ * @param sync
+ * @return
+ * @throws IOException
+ * @throws
+ * @throws
+ */
+ public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
+
+ // Write the packet our internal buffer.
+ int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
+
+ final Location location = new Location();
+ location.setSize(size);
+ location.setType(type);
+
+ WriteBatch batch;
+ WriteCommand write = new WriteCommand(location, data, sync);
+
+ // Locate datafile and enqueue into the executor in sychronized block so
+ // that writes get equeued onto the executor in order that they were assigned
+ // by the data manager (which is basically just appending)
+
+ synchronized (this) {
+ // Find the position where this item will land at.
+ DataFile dataFile = dataManager.allocateLocation(location);
+ if( !sync ) {
+ inflightWrites.put(new WriteKey(location), write);
+ }
+ batch = enqueue(dataFile, write);
+ }
+ location.setLatch(batch.latch);
+ if (sync) {
+ try {
+ batch.latch.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+
+ return location;
+ }
+
+ public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
+ // Write the packet our internal buffer.
+ int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
+
+ final Location location = new Location();
+ location.setSize(size);
+ location.setType(type);
+
+ WriteBatch batch;
+ WriteCommand write = new WriteCommand(location, data, onComplete);
+
+ // Locate datafile and enqueue into the executor in sychronized block so
+ // that writes get equeued onto the executor in order that they were assigned
+ // by the data manager (which is basically just appending)
+
+ synchronized (this) {
+ // Find the position where this item will land at.
+ DataFile dataFile = dataManager.allocateLocation(location);
+ inflightWrites.put(new WriteKey(location), write);
+ batch = enqueue(dataFile, write);
+ }
+ location.setLatch(batch.latch);
+
+ return location;
+ }
+
+ private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
+ synchronized (enqueueMutex) {
+ WriteBatch rc = null;
+ if (shutdown) {
+ throw new IOException("Async Writter Thread Shutdown");
+ }
+ if (firstAsyncException != null) {
+ throw firstAsyncException;
+ }
+
+ if (!running) {
+ running = true;
+ thread = new Thread() {
+ public void run() {
+ processQueue();
+ }
+ };
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.setDaemon(true);
+ thread.setName("ActiveMQ Data File Writer");
+ thread.start();
+ }
+
+ if (nextWriteBatch == null) {
+ nextWriteBatch = new WriteBatch(dataFile, write);
+ rc = nextWriteBatch;
+ enqueueMutex.notify();
+ } else {
+ // Append to current batch if possible..
+ if (nextWriteBatch.canAppend(dataFile, write)) {
+ nextWriteBatch.append(write);
+ rc = nextWriteBatch;
+ } else {
+ // Otherwise wait for the queuedCommand to be null
+ try {
+ while (nextWriteBatch != null) {
+ enqueueMutex.wait();
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ if (shutdown) {
+ throw new IOException("Async Writter Thread Shutdown");
+ }
+
+ // Start a new batch.
+ nextWriteBatch = new WriteBatch(dataFile, write);
+ rc = nextWriteBatch;
+ enqueueMutex.notify();
+ }
+ }
+ return rc;
+ }
+ }
+
+ public void close() throws IOException {
+ synchronized (enqueueMutex) {
+ if (!shutdown) {
+ shutdown = true;
+ if (running) {
+ enqueueMutex.notifyAll();
+ } else {
+ shutdownDone.countDown();
+ }
+ }
+ }
+
+ try {
+ shutdownDone.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+
+ }
+
+ /**
+ * The async processing loop that writes to the data files and does the
+ * force calls.
+ *
+ * Since the file sync() call is the slowest of all the operations, this
+ * algorithm tries to 'batch' or group together several file sync() requests
+ * into a single file sync() call. The batching is accomplished attaching
+ * the same CountDownLatch instance to every force request in a group.
+ *
+ */
+ protected void processQueue() {
+ DataFile dataFile = null;
+ RandomAccessFile file = null;
+ try {
+
+ DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
+ while (true) {
+
+ Object o = null;
+
+ // Block till we get a command.
+ synchronized (enqueueMutex) {
+ while (true) {
+ if (nextWriteBatch != null) {
+ o = nextWriteBatch;
+ nextWriteBatch = null;
+ break;
+ }
+ if (shutdown) {
+ return;
+ }
+ enqueueMutex.wait();
+ }
+ enqueueMutex.notify();
+ }
+
+ WriteBatch wb = (WriteBatch)o;
+ if (dataFile != wb.dataFile) {
+ if (file != null) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ dataFile = wb.dataFile;
+ file = dataFile.openRandomAccessFile(true);
+ }
+
+ WriteCommand write = wb.first;
+
+ // Write all the data.
+ // Only need to seek to first location.. all others
+ // are in sequence.
+ file.seek(write.location.getOffset());
+
+
+ boolean forceToDisk=false;
+
+ //
+ // is it just 1 big write?
+ if (wb.size == write.location.getSize()) {
+ forceToDisk = write.sync | write.onComplete!=null;
+
+ // Just write it directly..
+ file.writeInt(write.location.getSize());
+ file.writeByte(write.location.getType());
+ file.write(RESERVED_SPACE);
+ file.write(Journal.ITEM_HEAD_SOR);
+ file.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+ file.write(Journal.ITEM_HEAD_EOR);
+
+ } else {
+
+ // Combine the smaller writes into 1 big buffer
+ while (write != null) {
+ forceToDisk |= write.sync | write.onComplete!=null;
+
+ buff.writeInt(write.location.getSize());
+ buff.writeByte(write.location.getType());
+ buff.write(RESERVED_SPACE);
+ buff.write(Journal.ITEM_HEAD_SOR);
+ buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+ buff.write(Journal.ITEM_HEAD_EOR);
+
+ write = (WriteCommand)write.getNext();
+ }
+
+ // Now do the 1 big write.
+ ByteSequence sequence = buff.toByteSequence();
+ file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ buff.reset();
+ }
+
+ if( forceToDisk ) {
+ file.getFD().sync();
+ }
+
+ WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+ dataManager.setLastAppendLocation(lastWrite.location);
+
+ // Now that the data is on disk, remove the writes from the in
+ // flight
+ // cache.
+ write = wb.first;
+ while (write != null) {
+ if (!write.sync) {
+ inflightWrites.remove(new WriteKey(write.location));
+ }
+ if( write.onComplete !=null ) {
+ try {
+ write.onComplete.run();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ write = (WriteCommand)write.getNext();
+ }
+
+ // Signal any waiting threads that the write is on disk.
+ wb.latch.countDown();
+ }
+ } catch (IOException e) {
+ synchronized (enqueueMutex) {
+ firstAsyncException = e;
+ }
+ } catch (InterruptedException e) {
+ } finally {
+ try {
+ if (file != null) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ } catch (Throwable ignore) {
+ }
+ shutdownDone.countDown();
+ }
+ }
+
+
+}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,753 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
+import org.apache.kahadb.journal.DataFileAppender.WriteKey;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.Scheduler;
+
+
+
+/**
+ * Manages DataFiles
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class Journal {
+
+ public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
+ public static final int ITEM_HEAD_RESERVED_SPACE = 21;
+ // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
+ public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
+ public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
+ public static final int ITEM_FOOT_SPACE = 3; // EOR
+
+ public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
+
+ public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'}; //
+ public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'}; //
+
+ public static final byte DATA_ITEM_TYPE = 1;
+ public static final byte REDO_ITEM_TYPE = 2;
+ public static final String DEFAULT_DIRECTORY = "data";
+ public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
+ public static final String DEFAULT_FILE_PREFIX = "data-";
+ public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
+ public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
+ public static final int PREFERED_DIFF = 1024 * 512;
+
+ private static final Log LOG = LogFactory.getLog(Journal.class);
+
+ protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
+
+ protected File directory = new File(DEFAULT_DIRECTORY);
+ protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
+ protected String filePrefix = DEFAULT_FILE_PREFIX;
+ protected ControlFile controlFile;
+ protected boolean started;
+ protected boolean useNio = true;
+
+ protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
+ protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
+
+ protected DataFileAppender appender;
+ protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+
+ protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+ protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
+ protected DataFile currentWriteFile;
+
+ protected Location mark;
+ protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
+ protected Runnable cleanupTask;
+ protected final AtomicLong storeSize;
+ protected boolean archiveDataLogs;
+
+ public Journal(AtomicLong storeSize) {
+ this.storeSize=storeSize;
+ }
+
+ public Journal() {
+ this(new AtomicLong());
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void start() throws IOException {
+ if (started) {
+ return;
+ }
+
+ started = true;
+ preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
+ lock();
+
+ ByteSequence sequence = controlFile.load();
+ if (sequence != null && sequence.getLength() > 0) {
+ unmarshallState(sequence);
+ }
+ if (useNio) {
+ appender = new NIODataFileAppender(this);
+ } else {
+ appender = new DataFileAppender(this);
+ }
+
+ File[] files = directory.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String n) {
+ return dir.equals(directory) && n.startsWith(filePrefix);
+ }
+ });
+
+ if (files != null) {
+ for (int i = 0; i < files.length; i++) {
+ try {
+ File file = files[i];
+ String n = file.getName();
+ String numStr = n.substring(filePrefix.length(), n.length());
+ int num = Integer.parseInt(numStr);
+ DataFile dataFile = new DataFile(file, num, preferedFileLength);
+ fileMap.put(dataFile.getDataFileId(), dataFile);
+ storeSize.addAndGet(dataFile.getLength());
+ } catch (NumberFormatException e) {
+ // Ignore file that do not match the pattern.
+ }
+ }
+
+ // Sort the list so that we can link the DataFiles together in the
+ // right order.
+ List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
+ Collections.sort(l);
+ currentWriteFile = null;
+ for (DataFile df : l) {
+ if (currentWriteFile != null) {
+ currentWriteFile.linkAfter(df);
+ }
+ currentWriteFile = df;
+ fileByFileMap.put(df.getFile(), df);
+ }
+ }
+
+ // Need to check the current Write File to see if there was a partial
+ // write to it.
+ if (currentWriteFile != null) {
+
+ // See if the lastSyncedLocation is valid..
+ Location l = lastAppendLocation.get();
+ if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+ l = null;
+ }
+
+ // If we know the last location that was ok.. then we can skip lots
+ // of checking
+ try{
+ l = recoveryCheck(currentWriteFile, l);
+ lastAppendLocation.set(l);
+ }catch(IOException e){
+ LOG.warn("recovery check failed", e);
+ }
+ }
+
+ storeState(false);
+
+ cleanupTask = new Runnable() {
+ public void run() {
+ cleanup();
+ }
+ };
+ Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
+ }
+
+ public void lock() throws IOException {
+ synchronized (this) {
+ if (controlFile == null) {
+ IOHelper.mkdirs(directory);
+ controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
+ }
+ controlFile.lock();
+ }
+ }
+
+ protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
+ if (location == null) {
+ location = new Location();
+ location.setDataFileId(dataFile.getDataFileId());
+ location.setOffset(0);
+ }
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ reader.readLocationDetails(location);
+ while (reader.readLocationDetailsAndValidate(location)) {
+ location.setOffset(location.getOffset() + location.getSize());
+ }
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+ dataFile.setLength(location.getOffset());
+ return location;
+ }
+
+ protected void unmarshallState(ByteSequence sequence) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ DataInputStream dis = new DataInputStream(bais);
+ if (dis.readBoolean()) {
+ mark = new Location();
+ mark.readExternal(dis);
+ } else {
+ mark = null;
+ }
+ if (dis.readBoolean()) {
+ Location l = new Location();
+ l.readExternal(dis);
+ lastAppendLocation.set(l);
+ } else {
+ lastAppendLocation.set(null);
+ }
+ }
+
+ private synchronized ByteSequence marshallState() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+
+ if (mark != null) {
+ dos.writeBoolean(true);
+ mark.writeExternal(dos);
+ } else {
+ dos.writeBoolean(false);
+ }
+ Location l = lastAppendLocation.get();
+ if (l != null) {
+ dos.writeBoolean(true);
+ l.writeExternal(dos);
+ } else {
+ dos.writeBoolean(false);
+ }
+
+ byte[] bs = baos.toByteArray();
+ return new ByteSequence(bs, 0, bs.length);
+ }
+
+ synchronized DataFile allocateLocation(Location location) throws IOException {
+ if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
+ int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
+
+ String fileName = filePrefix + nextNum;
+ File file = new File(directory, fileName);
+ DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
+ //actually allocate the disk space
+ nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
+ fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+ fileByFileMap.put(file, nextWriteFile);
+ if (currentWriteFile != null) {
+ currentWriteFile.linkAfter(nextWriteFile);
+ if (currentWriteFile.isUnused()) {
+ removeDataFile(currentWriteFile);
+ }
+ }
+ currentWriteFile = nextWriteFile;
+
+ }
+ location.setOffset(currentWriteFile.getLength());
+ location.setDataFileId(currentWriteFile.getDataFileId().intValue());
+ int size = location.getSize();
+ currentWriteFile.incrementLength(size);
+ currentWriteFile.increment();
+ storeSize.addAndGet(size);
+ return currentWriteFile;
+ }
+
+ public synchronized void removeLocation(Location location) throws IOException{
+
+ DataFile dataFile = getDataFile(location);
+ dataFile.decrement();
+ }
+
+ synchronized DataFile getDataFile(Location item) throws IOException {
+ Integer key = Integer.valueOf(item.getDataFileId());
+ DataFile dataFile = fileMap.get(key);
+ if (dataFile == null) {
+ LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+ throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
+ }
+ return dataFile;
+ }
+
+ synchronized File getFile(Location item) throws IOException {
+ Integer key = Integer.valueOf(item.getDataFileId());
+ DataFile dataFile = fileMap.get(key);
+ if (dataFile == null) {
+ LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+ throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
+ }
+ return dataFile.getFile();
+ }
+
+ private DataFile getNextDataFile(DataFile dataFile) {
+ return (DataFile)dataFile.getNext();
+ }
+
+ public synchronized void close() throws IOException {
+ if (!started) {
+ return;
+ }
+ Scheduler.cancel(cleanupTask);
+ accessorPool.close();
+ storeState(false);
+ appender.close();
+ fileMap.clear();
+ fileByFileMap.clear();
+ controlFile.unlock();
+ controlFile.dispose();
+ started = false;
+ }
+
+ synchronized void cleanup() {
+ if (accessorPool != null) {
+ accessorPool.disposeUnused();
+ }
+ }
+
+ public synchronized boolean delete() throws IOException {
+
+ // Close all open file handles...
+ appender.close();
+ accessorPool.close();
+
+ boolean result = true;
+ for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
+ DataFile dataFile = (DataFile)i.next();
+ storeSize.addAndGet(-dataFile.getLength());
+ result &= dataFile.delete();
+ }
+ fileMap.clear();
+ fileByFileMap.clear();
+ lastAppendLocation.set(null);
+ mark = null;
+ currentWriteFile = null;
+
+ // reopen open file handles...
+ accessorPool = new DataFileAccessorPool(this);
+ if (useNio) {
+ appender = new NIODataFileAppender(this);
+ } else {
+ appender = new DataFileAppender(this);
+ }
+ return result;
+ }
+
+ public synchronized void addInterestInFile(int file) throws IOException {
+ if (file >= 0) {
+ Integer key = Integer.valueOf(file);
+ DataFile dataFile = (DataFile)fileMap.get(key);
+ if (dataFile == null) {
+ throw new IOException("That data file does not exist");
+ }
+ addInterestInFile(dataFile);
+ }
+ }
+
+ synchronized void addInterestInFile(DataFile dataFile) {
+ if (dataFile != null) {
+ dataFile.increment();
+ }
+ }
+
+ public synchronized void removeInterestInFile(int file) throws IOException {
+ if (file >= 0) {
+ Integer key = Integer.valueOf(file);
+ DataFile dataFile = (DataFile)fileMap.get(key);
+ removeInterestInFile(dataFile);
+ }
+
+ }
+
+ synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
+ if (dataFile != null) {
+ if (dataFile.decrement() <= 0) {
+ removeDataFile(dataFile);
+ }
+ }
+ }
+
+ public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException {
+ Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
+ unUsed.removeAll(inUse);
+ unUsed.removeAll(inProgress);
+
+ List<DataFile> purgeList = new ArrayList<DataFile>();
+ for (Integer key : unUsed) {
+ DataFile dataFile = (DataFile)fileMap.get(key);
+ purgeList.add(dataFile);
+ }
+ for (DataFile dataFile : purgeList) {
+ if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
+ forceRemoveDataFile(dataFile);
+ }
+ }
+ }
+
+ public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
+ Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
+ unUsed.removeAll(inUse);
+
+ List<DataFile> purgeList = new ArrayList<DataFile>();
+ for (Integer key : unUsed) {
+ // Only add files less than the lastFile..
+ if( key.intValue() < lastFile.intValue() ) {
+ DataFile dataFile = (DataFile)fileMap.get(key);
+ purgeList.add(dataFile);
+ }
+ }
+ for (DataFile dataFile : purgeList) {
+ forceRemoveDataFile(dataFile);
+ }
+ }
+
+ public synchronized void consolidateDataFiles() throws IOException {
+ List<DataFile> purgeList = new ArrayList<DataFile>();
+ for (DataFile dataFile : fileMap.values()) {
+ if (dataFile.isUnused()) {
+ purgeList.add(dataFile);
+ }
+ }
+ for (DataFile dataFile : purgeList) {
+ removeDataFile(dataFile);
+ }
+ }
+
+ private synchronized void removeDataFile(DataFile dataFile) throws IOException {
+
+ // Make sure we don't delete too much data.
+ if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
+ LOG.debug("Won't remove DataFile" + dataFile);
+ return;
+ }
+ forceRemoveDataFile(dataFile);
+ }
+
+ private synchronized void forceRemoveDataFile(DataFile dataFile)
+ throws IOException {
+ accessorPool.disposeDataFileAccessors(dataFile);
+ fileByFileMap.remove(dataFile.getFile());
+ DataFile removed = fileMap.remove(dataFile.getDataFileId());
+ storeSize.addAndGet(-dataFile.getLength());
+ dataFile.unlink();
+ if (archiveDataLogs) {
+ dataFile.move(getDirectoryArchive());
+ LOG.info("moved data file " + dataFile + " to "
+ + getDirectoryArchive());
+ } else {
+ boolean result = dataFile.delete();
+ LOG.info("discarding data file " + dataFile
+ + (result ? "successful " : "failed"));
+ }
+ }
+
+ /**
+ * @return the maxFileLength
+ */
+ public int getMaxFileLength() {
+ return maxFileLength;
+ }
+
+ /**
+ * @param maxFileLength the maxFileLength to set
+ */
+ public void setMaxFileLength(int maxFileLength) {
+ this.maxFileLength = maxFileLength;
+ }
+
+ public String toString() {
+ return "DataManager:(" + filePrefix + ")";
+ }
+
+ public synchronized Location getMark() throws IllegalStateException {
+ return mark;
+ }
+
+ public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
+
+ Location cur = null;
+ while (true) {
+ if (cur == null) {
+ if (location == null) {
+ DataFile head = (DataFile)currentWriteFile.getHeadNode();
+ cur = new Location();
+ cur.setDataFileId(head.getDataFileId());
+ cur.setOffset(0);
+ } else {
+ // Set to the next offset..
+ if( location.getSize() == -1 ) {
+ cur = new Location(location);
+ } else {
+ cur = new Location(location);
+ cur.setOffset(location.getOffset()+location.getSize());
+ }
+ }
+ } else {
+ cur.setOffset(cur.getOffset() + cur.getSize());
+ }
+
+ DataFile dataFile = getDataFile(cur);
+
+ // Did it go into the next file??
+ if (dataFile.getLength() <= cur.getOffset()) {
+ dataFile = getNextDataFile(dataFile);
+ if (dataFile == null) {
+ return null;
+ } else {
+ cur.setDataFileId(dataFile.getDataFileId().intValue());
+ cur.setOffset(0);
+ }
+ }
+
+ // Load in location size and type.
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ reader.readLocationDetails(cur);
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+
+ if (cur.getType() == 0) {
+ return null;
+ } else if (cur.getType() > 0) {
+ // Only return user records.
+ return cur;
+ }
+ }
+ }
+
+ public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
+ DataFile df = fileByFileMap.get(file);
+ return getNextLocation(df, lastLocation,thisFileOnly);
+ }
+
+ public synchronized Location getNextLocation(DataFile dataFile,
+ Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
+
+ Location cur = null;
+ while (true) {
+ if (cur == null) {
+ if (lastLocation == null) {
+ DataFile head = (DataFile)dataFile.getHeadNode();
+ cur = new Location();
+ cur.setDataFileId(head.getDataFileId());
+ cur.setOffset(0);
+ } else {
+ // Set to the next offset..
+ cur = new Location(lastLocation);
+ cur.setOffset(cur.getOffset() + cur.getSize());
+ }
+ } else {
+ cur.setOffset(cur.getOffset() + cur.getSize());
+ }
+
+
+ // Did it go into the next file??
+ if (dataFile.getLength() <= cur.getOffset()) {
+ if (thisFileOnly) {
+ return null;
+ }else {
+ dataFile = getNextDataFile(dataFile);
+ if (dataFile == null) {
+ return null;
+ } else {
+ cur.setDataFileId(dataFile.getDataFileId().intValue());
+ cur.setOffset(0);
+ }
+ }
+ }
+
+ // Load in location size and type.
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ reader.readLocationDetails(cur);
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+
+ if (cur.getType() == 0) {
+ return null;
+ } else if (cur.getType() > 0) {
+ // Only return user records.
+ return cur;
+ }
+ }
+ }
+
+ public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
+ DataFile dataFile = getDataFile(location);
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ ByteSequence rc = null;
+ try {
+ rc = reader.readRecord(location);
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+ return rc;
+ }
+
+ public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
+ synchronized (this) {
+ mark = location;
+ }
+ storeState(sync);
+ }
+
+ protected synchronized void storeState(boolean sync) throws IOException {
+ ByteSequence state = marshallState();
+ appender.storeItem(state, Location.MARK_TYPE, sync);
+ controlFile.store(state, sync);
+ }
+
+ public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
+ Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
+ return loc;
+ }
+
+ public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
+ Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
+ return loc;
+ }
+
+ public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
+ return appender.storeItem(data, type, sync);
+ }
+
+ public void update(Location location, ByteSequence data, boolean sync) throws IOException {
+ DataFile dataFile = getDataFile(location);
+ DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ updater.updateRecord(location, data, sync);
+ } finally {
+ accessorPool.closeDataFileAccessor(updater);
+ }
+ }
+
+ public File getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ public String getFilePrefix() {
+ return filePrefix;
+ }
+
+ public void setFilePrefix(String filePrefix) {
+ this.filePrefix = filePrefix;
+ }
+
+ public Map<WriteKey, WriteCommand> getInflightWrites() {
+ return inflightWrites;
+ }
+
+ public Location getLastAppendLocation() {
+ return lastAppendLocation.get();
+ }
+
+ public void setLastAppendLocation(Location lastSyncedLocation) {
+ this.lastAppendLocation.set(lastSyncedLocation);
+ }
+
+ public boolean isUseNio() {
+ return useNio;
+ }
+
+ public void setUseNio(boolean useNio) {
+ this.useNio = useNio;
+ }
+
+ public File getDirectoryArchive() {
+ return directoryArchive;
+ }
+
+ public void setDirectoryArchive(File directoryArchive) {
+ this.directoryArchive = directoryArchive;
+ }
+
+ public boolean isArchiveDataLogs() {
+ return archiveDataLogs;
+ }
+
+ public void setArchiveDataLogs(boolean archiveDataLogs) {
+ this.archiveDataLogs = archiveDataLogs;
+ }
+
+ synchronized public Integer getCurrentDataFileId() {
+ if( currentWriteFile==null )
+ return null;
+ return currentWriteFile.getDataFileId();
+ }
+
+ /**
+ * Get a set of files - only valid after start()
+ * @return files currently being used
+ */
+ public Set<File> getFiles(){
+ return fileByFileMap.keySet();
+ }
+
+ synchronized public long getDiskSize() {
+ long rc=0;
+ DataFile cur = (DataFile)currentWriteFile.getHeadNode();
+ while( cur !=null ) {
+ rc += cur.getLength();
+ cur = (DataFile) cur.getNext();
+ }
+ return rc;
+ }
+
+ synchronized public long getDiskSizeUntil(Location startPosition) {
+ long rc=0;
+ DataFile cur = (DataFile)currentWriteFile.getHeadNode();
+ while( cur !=null ) {
+ if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) {
+ return rc + startPosition.getOffset();
+ }
+ rc += cur.getLength();
+ cur = (DataFile) cur.getNext();
+ }
+ return rc;
+ }
+
+}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Used as a location in the data store.
+ *
+ * @version $Revision: 1.2 $
+ */
+public final class Location implements Comparable<Location> {
+
+ public static final byte MARK_TYPE = -1;
+ public static final byte USER_TYPE = 1;
+ public static final byte NOT_SET_TYPE = 0;
+ public static final int NOT_SET = -1;
+
+ private int dataFileId = NOT_SET;
+ private int offset = NOT_SET;
+ private int size = NOT_SET;
+ private byte type = NOT_SET_TYPE;
+ private CountDownLatch latch;
+
+ public Location() {
+ }
+
+ Location(Location item) {
+ this.dataFileId = item.dataFileId;
+ this.offset = item.offset;
+ this.size = item.size;
+ this.type = item.type;
+ }
+
+ boolean isValid() {
+ return dataFileId != NOT_SET;
+ }
+
+ /**
+ * @return the size of the data record including the header.
+ */
+ public int getSize() {
+ return size;
+ }
+
+ /**
+ * @param size the size of the data record including the header.
+ */
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ /**
+ * @return the size of the payload of the record.
+ */
+ public int getPaylodSize() {
+ return size - Journal.ITEM_HEAD_FOOT_SPACE;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
+
+ public int getDataFileId() {
+ return dataFileId;
+ }
+
+ public void setDataFileId(int file) {
+ this.dataFileId = file;
+ }
+
+ public byte getType() {
+ return type;
+ }
+
+ public void setType(byte type) {
+ this.type = type;
+ }
+
+ public String toString() {
+ String result = "offset = " + offset + ", file = " + dataFileId + ", size = " + size + ", type = "
+ + type;
+ return result;
+ }
+
+ public void writeExternal(DataOutput dos) throws IOException {
+ dos.writeInt(dataFileId);
+ dos.writeInt(offset);
+ dos.writeInt(size);
+ dos.writeByte(type);
+ }
+
+ public void readExternal(DataInput dis) throws IOException {
+ dataFileId = dis.readInt();
+ offset = dis.readInt();
+ size = dis.readInt();
+ type = dis.readByte();
+ }
+
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+
+ public void setLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public int compareTo(Location o) {
+ Location l = (Location)o;
+ if (dataFileId == l.dataFileId) {
+ int rc = offset - l.offset;
+ return rc;
+ }
+ return dataFileId - l.dataFileId;
+ }
+
+ public boolean equals(Object o) {
+ boolean result = false;
+ if (o instanceof Location) {
+ result = compareTo((Location)o) == 0;
+ }
+ return result;
+ }
+
+ public int hashCode() {
+ return dataFileId ^ offset;
+ }
+
+}
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * An AsyncDataFileAppender that uses NIO ByteBuffers and File chanels to more
+ * efficently copy data to files.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class NIODataFileAppender extends DataFileAppender {
+
+ public NIODataFileAppender(Journal fileManager) {
+ super(fileManager);
+ }
+
+ /**
+ * The async processing loop that writes to the data files and does the
+ * force calls.
+ *
+ * Since the file sync() call is the slowest of all the operations, this
+ * algorithm tries to 'batch' or group together several file sync() requests
+ * into a single file sync() call. The batching is accomplished attaching
+ * the same CountDownLatch instance to every force request in a group.
+ *
+ */
+ protected void processQueue() {
+ DataFile dataFile = null;
+ RandomAccessFile file = null;
+ FileChannel channel = null;
+
+ try {
+
+ ByteBuffer header = ByteBuffer.allocateDirect(Journal.ITEM_HEAD_SPACE);
+ ByteBuffer footer = ByteBuffer.allocateDirect(Journal.ITEM_FOOT_SPACE);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(maxWriteBatchSize);
+
+ // Populate the static parts of the headers and footers..
+ header.putInt(0); // size
+ header.put((byte)0); // type
+ header.put(RESERVED_SPACE); // reserved
+ header.put(Journal.ITEM_HEAD_SOR);
+ footer.put(Journal.ITEM_HEAD_EOR);
+
+ while (true) {
+
+ Object o = null;
+
+ // Block till we get a command.
+ synchronized (enqueueMutex) {
+ while (true) {
+ if (nextWriteBatch != null) {
+ o = nextWriteBatch;
+ nextWriteBatch = null;
+ break;
+ }
+ if (shutdown) {
+ return;
+ }
+ enqueueMutex.wait();
+ }
+ enqueueMutex.notify();
+ }
+
+ WriteBatch wb = (WriteBatch)o;
+ if (dataFile != wb.dataFile) {
+ if (file != null) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ dataFile = wb.dataFile;
+ file = dataFile.openRandomAccessFile(true);
+ channel = file.getChannel();
+ }
+
+ WriteCommand write = wb.first;
+
+ // Write all the data.
+ // Only need to seek to first location.. all others
+ // are in sequence.
+ file.seek(write.location.getOffset());
+
+
+ boolean forceToDisk=false;
+
+ //
+ // is it just 1 big write?
+ if (wb.size == write.location.getSize()) {
+ forceToDisk = write.sync | write.onComplete!=null;
+
+ header.clear();
+ header.putInt(write.location.getSize());
+ header.put(write.location.getType());
+ header.clear();
+ transfer(header, channel);
+ ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+ write.data.getLength());
+ transfer(source, channel);
+ footer.clear();
+ transfer(footer, channel);
+
+ } else {
+
+ // Combine the smaller writes into 1 big buffer
+ while (write != null) {
+ forceToDisk |= write.sync | write.onComplete!=null;
+
+ header.clear();
+ header.putInt(write.location.getSize());
+ header.put(write.location.getType());
+ header.clear();
+ copy(header, buffer);
+ assert !header.hasRemaining();
+
+ ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+ write.data.getLength());
+ copy(source, buffer);
+ assert !source.hasRemaining();
+
+ footer.clear();
+ copy(footer, buffer);
+ assert !footer.hasRemaining();
+
+ write = (WriteCommand)write.getNext();
+ }
+
+ // Fully write out the buffer..
+ buffer.flip();
+ transfer(buffer, channel);
+ buffer.clear();
+ }
+
+ if( forceToDisk ) {
+ file.getChannel().force(false);
+ }
+
+ WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+ dataManager.setLastAppendLocation(lastWrite.location);
+
+ // Now that the data is on disk, remove the writes from the in
+ // flight
+ // cache.
+ write = wb.first;
+ while (write != null) {
+ if (!write.sync) {
+ inflightWrites.remove(new WriteKey(write.location));
+ }
+ if (write.onComplete != null) {
+ try {
+ write.onComplete.run();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ write = (WriteCommand)write.getNext();
+ }
+
+ // Signal any waiting threads that the write is on disk.
+ wb.latch.countDown();
+ }
+
+ } catch (IOException e) {
+ synchronized (enqueueMutex) {
+ firstAsyncException = e;
+ }
+ } catch (InterruptedException e) {
+ } finally {
+ try {
+ if (file != null) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ } catch (IOException e) {
+ }
+ shutdownDone.countDown();
+ }
+ }
+
+ /**
+ * Copy the bytes in header to the channel.
+ *
+ * @param header - source of data
+ * @param channel - destination where the data will be written.
+ * @throws IOException
+ */
+ private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
+ while (header.hasRemaining()) {
+ channel.write(header);
+ }
+ }
+
+ private int copy(ByteBuffer src, ByteBuffer dest) {
+ int rc = Math.min(dest.remaining(), src.remaining());
+ if (rc > 0) {
+ // Adjust our limit so that we don't overflow the dest buffer.
+ int limit = src.limit();
+ src.limit(src.position() + rc);
+ dest.put(src);
+ // restore the limit.
+ src.limit(limit);
+ }
+ return rc;
+ }
+
+}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * Allows you to open a data file in read only mode. Useful when working with
+ * archived data files.
+ */
+public class ReadOnlyDataFile extends DataFile {
+
+ ReadOnlyDataFile(File file, int number, int preferedSize) {
+ super(file, number, preferedSize);
+ }
+
+
+ public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+ RandomAccessFile rc = new RandomAccessFile(file, "r");
+ // When we start to write files size them up so that the OS has a chance
+ // to allocate the file contigously.
+ if (appender) {
+ if (length < preferedSize) {
+ rc.setLength(preferedSize);
+ }
+ }
+ return rc;
+ }
+
+ public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+ file.close();
+ }
+
+ public synchronized boolean delete() throws IOException {
+ throw new RuntimeException("Not valid on a read only file.");
+ }
+
+ public synchronized void move(File targetDirectory) throws IOException{
+ throw new RuntimeException("Not valid on a read only file.");
+ }
+
+}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.Scheduler;
+
+/**
+ * An AsyncDataManager that works in read only mode against multiple data directories.
+ * Useful for reading back archived data files.
+ */
+public class ReadOnlyJournal extends Journal {
+
+ private static final Log LOG = LogFactory.getLog(ReadOnlyJournal.class);
+ private final ArrayList<File> dirs;
+
+ public ReadOnlyJournal(final ArrayList<File> dirs) {
+ this.dirs = dirs;
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void start() throws IOException {
+ if (started) {
+ return;
+ }
+
+ started = true;
+
+ ArrayList<File> files = new ArrayList<File>();
+ for (File directory : dirs) {
+ final File d = directory;
+ File[] f = d.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String n) {
+ return dir.equals(d) && n.startsWith(filePrefix);
+ }
+ });
+ for (int i = 0; i < f.length; i++) {
+ files.add(f[i]);
+ }
+ }
+
+ for (File file : files) {
+ try {
+ String n = file.getName();
+ String numStr = n.substring(filePrefix.length(), n.length());
+ int num = Integer.parseInt(numStr);
+ DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
+ fileMap.put(dataFile.getDataFileId(), dataFile);
+ storeSize.addAndGet(dataFile.getLength());
+ } catch (NumberFormatException e) {
+ // Ignore file that do not match the pattern.
+ }
+ }
+
+ // Sort the list so that we can link the DataFiles together in the
+ // right order.
+ List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
+ Collections.sort(dataFiles);
+ currentWriteFile = null;
+ for (DataFile df : dataFiles) {
+ if (currentWriteFile != null) {
+ currentWriteFile.linkAfter(df);
+ }
+ currentWriteFile = df;
+ fileByFileMap.put(df.getFile(), df);
+ }
+
+ // Need to check the current Write File to see if there was a partial
+ // write to it.
+ if (currentWriteFile != null) {
+
+ // See if the lastSyncedLocation is valid..
+ Location l = lastAppendLocation.get();
+ if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+ l = null;
+ }
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (!started) {
+ return;
+ }
+ accessorPool.close();
+ fileMap.clear();
+ fileByFileMap.clear();
+ started = false;
+ }
+
+
+ public Location getFirstLocation() throws IllegalStateException, IOException {
+ if( currentWriteFile == null ) {
+ return null;
+ }
+
+ DataFile first = (DataFile)currentWriteFile.getHeadNode();
+ Location cur = new Location();
+ cur.setDataFileId(first.getDataFileId());
+ cur.setOffset(0);
+ cur.setSize(0);
+ return getNextLocation(cur);
+ }
+
+ @Override
+ public synchronized boolean delete() throws IOException {
+ throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager");
+ }
+}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html Thu Sep 4 15:46:42 2008
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+ journal based data storage - scalable and fast
+
+</body>
+</html>
Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=692288&r1=692287&r2=692288&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java Thu Sep 4 15:46:42 2008
@@ -52,7 +52,7 @@
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
-import org.apache.kahadb.impl.async.Location;
+import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.store.MessageDatabase.StoredDestination;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692288&r1=692287&r2=692288&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Thu Sep 4 15:46:42 2008
@@ -33,9 +33,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.Marshaller;
import org.apache.kahadb.StringMarshaller;
-import org.apache.kahadb.impl.async.AsyncDataManager;
-import org.apache.kahadb.impl.async.Location;
-import org.apache.kahadb.page.BTreeIndex;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Journal;
+import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
@@ -98,7 +98,7 @@
protected PageFile pageFile;
- protected AsyncDataManager asyncDataManager;
+ protected Journal asyncDataManager;
protected Metadata metadata = new Metadata();
protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@@ -577,8 +577,8 @@
return pf;
}
- private AsyncDataManager createAsyncDataManager() {
- AsyncDataManager manager = new AsyncDataManager();
+ private Journal createAsyncDataManager() {
+ Journal manager = new Journal();
manager.setDirectory(new File(directory, "journal"));
manager.setMaxFileLength(1024 * 1024 * 20);
manager.setUseNio(false);
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.kahadb.LongMarshaller;
+import org.apache.kahadb.StringMarshaller;
+import org.apache.kahadb.page.Transaction;
+
+public class BTreeIndexBenchMark extends IndexBenchmark {
+
+ private NumberFormat nf;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ nf = NumberFormat.getIntegerInstance();
+ nf.setMinimumIntegerDigits(10);
+ nf.setGroupingUsed(false);
+ }
+
+ @Override
+ protected Index<String, Long> createIndex() throws Exception {
+
+ Transaction tx = pf.tx();
+ long id = tx.allocate().getPageId();
+ tx.commit();
+
+ BTreeIndex<String, Long> index = new BTreeIndex<String, Long>(pf, id);
+ index.setKeyMarshaller(StringMarshaller.INSTANCE);
+ index.setValueMarshaller(LongMarshaller.INSTANCE);
+
+ return index;
+ }
+
+ @Override
+ protected void dumpIndex(Index<String, Long> index) throws IOException {
+ Transaction tx = pf.tx();
+ ((BTreeIndex)index).printStructure(tx, System.out);
+ }
+
+ /**
+ * Overriding so that this generates keys that are the worst case for the BTree. Keys that
+ * always insert to the end of the BTree.
+ */
+ @Override
+ protected String key(long i) {
+ return "a-long-message-id-like-key:"+nf.format(i);
+ }
+
+}
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Thu Sep 4 15:46:42 2008
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+import java.io.PrintWriter;
+import java.text.NumberFormat;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.kahadb.LongMarshaller;
+import org.apache.kahadb.StringMarshaller;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.index.Index;
+
+public class BTreeIndexTest extends IndexTestSupport {
+
+ private NumberFormat nf;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ nf = NumberFormat.getIntegerInstance();
+ nf.setMinimumIntegerDigits(6);
+ nf.setGroupingUsed(false);
+ }
+
+ @Override
+ protected Index<String, Long> createIndex() throws Exception {
+
+ long id = tx.allocate().getPageId();
+ tx.commit();
+
+ BTreeIndex<String, Long> index = new BTreeIndex<String,Long>(pf, id);
+ index.setKeyMarshaller(StringMarshaller.INSTANCE);
+ index.setValueMarshaller(LongMarshaller.INSTANCE);
+
+ return index;
+ }
+
+ /**
+ * Yeah, the current implementation does NOT try to balance the tree. Here is
+ * a test case showing that it gets out of balance.
+ *
+ * @throws Exception
+ */
+ public void disabled_testTreeBalancing() throws Exception {
+ createPageFileAndIndex(100);
+
+ BTreeIndex index = ((BTreeIndex)this.index);
+ this.index.load();
+
+ doInsert(50);
+
+ int minLeafDepth = index.getMinLeafDepth(tx);
+ int maxLeafDepth = index.getMaxLeafDepth(tx);
+ assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
+
+ // Remove some of the data
+ doRemove(16);
+ minLeafDepth = index.getMinLeafDepth(tx);
+ maxLeafDepth = index.getMaxLeafDepth(tx);
+
+ System.out.println( "min:"+minLeafDepth );
+ System.out.println( "max:"+maxLeafDepth );
+ index.printStructure(tx, new PrintWriter(System.out));
+
+ assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
+
+ this.index.unload();
+ }
+
+ public void testPruning() throws Exception {
+ createPageFileAndIndex(100);
+
+ BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+
+ this.index.load();
+
+ int minLeafDepth = index.getMinLeafDepth(tx);
+ int maxLeafDepth = index.getMaxLeafDepth(tx);
+ assertEquals(1, minLeafDepth);
+ assertEquals(1, maxLeafDepth);
+
+ doInsert(1000);
+
+ minLeafDepth = index.getMinLeafDepth(tx);
+ maxLeafDepth = index.getMaxLeafDepth(tx);
+ assertTrue("Depth of tree grew", minLeafDepth > 1);
+ assertTrue("Depth of tree grew", maxLeafDepth > 1);
+
+ // Remove the data.
+ doRemove(1000);
+ minLeafDepth = index.getMinLeafDepth(tx);
+ maxLeafDepth = index.getMaxLeafDepth(tx);
+
+ assertEquals(1, minLeafDepth);
+ assertEquals(1, maxLeafDepth);
+
+ this.index.unload();
+ }
+
+ public void testIteration() throws Exception {
+ createPageFileAndIndex(100);
+ BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+ this.index.load();
+
+ // Insert in reverse order..
+ doInsertReverse(1000);
+
+ this.index.unload();
+ this.index.load();
+
+ // BTree should iterate it in sorted order.
+ int counter=0;
+ for (Iterator<Map.Entry<String,Long>> i = index.iterator(tx); i.hasNext();) {
+ Map.Entry<String,Long> entry = (Map.Entry<String,Long>)i.next();
+ assertEquals(key(counter),entry.getKey());
+ assertEquals(counter,(long)entry.getValue());
+ counter++;
+ }
+
+ this.index.unload();
+ }
+
+ void doInsertReverse(int count) throws Exception {
+ for (int i = count-1; i >= 0; i--) {
+ index.put(tx, key(i), (long)i);
+ tx.commit();
+ }
+ }
+ /**
+ * Overriding so that this generates keys that are the worst case for the BTree. Keys that
+ * always insert to the end of the BTree.
+ */
+ @Override
+ protected String key(int i) {
+ return "key:"+nf.format(i);
+ }
+}