You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/11/25 07:00:57 UTC
svn commit: r479089 [1/2] - in /incubator/activemq/trunk: ./ activemq-core/
activemq-core/src/main/java/org/apache/activemq/kaha/impl/
activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/
activemq-core/src/main/java/org/apache/activemq/kaha...
Author: chirino
Date: Fri Nov 24 22:00:56 2006
New Revision: 479089
URL: http://svn.apache.org/viewvc?view=rev&rev=479089
Log:
Added a new org.apache.activemq.kaha.impl.async package that holds a data manager/journal implementing both the Kaha DataManager and ActiveIO Journal interfaces.
- Initial benchmarks show it to be as fast as, or faster than, the default ActiveIO Journal.
- The biggest differentiator is that this implementation of the journal was built to also provide fast reads.
- The DataManager interface was extracted, and the KahaStore can now switch between the original DataManager implementation and the new implementation in the kaha.impl.async package.
- Simplified the original implementation by removing the AsyncDataWriter support, since that is largely what the new package is based on.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
- copied, changed from r477680, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java
Removed:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileReader.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
Modified:
incubator/activemq/trunk/activemq-core/pom.xml
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java
incubator/activemq/trunk/pom.xml
Modified: incubator/activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/pom.xml (original)
+++ incubator/activemq/trunk/activemq-core/pom.xml Fri Nov 24 22:00:56 2006
@@ -48,6 +48,12 @@
<artifactId>activeio-core</artifactId>
<optional>false</optional>
</dependency>
+ <dependency>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>activeio-core</artifactId>
+ <optional>false</optional>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,42 @@
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.data.RedoListener;
+
+public interface DataManager {
+
+ String getName();
+
+ Object readItem(Marshaller marshaller, StoreLocation item)
+ throws IOException;
+
+ StoreLocation storeDataItem(Marshaller marshaller, Object payload)
+ throws IOException;
+
+ StoreLocation storeRedoItem(Object payload) throws IOException;
+
+ void updateItem(StoreLocation location, Marshaller marshaller,
+ Object payload) throws IOException;
+
+ void recoverRedoItems(RedoListener listener) throws IOException;
+
+ void close() throws IOException;
+
+ void force() throws IOException;
+
+ boolean delete() throws IOException;
+
+ void addInterestInFile(int file) throws IOException;
+
+ void removeInterestInFile(int file) throws IOException;
+
+ void consolidateDataFiles() throws IOException;
+
+ Marshaller getRedoMarshaller();
+
+ void setRedoMarshaller(Marshaller redoMarshaller);
+
+}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java Fri Nov 24 22:00:56 2006
@@ -27,7 +27,6 @@
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.container.ContainerId;
-import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Fri Nov 24 22:00:56 2006
@@ -33,11 +33,13 @@
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.DataManagerFacade;
import org.apache.activemq.kaha.impl.container.BaseContainerImpl;
import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.data.DataManagerImpl;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.data.RedoListener;
import org.apache.activemq.kaha.impl.index.IndexItem;
@@ -73,8 +75,8 @@
private String mode;
private boolean initialized;
private boolean logIndexChanges=false;
- private boolean useAsyncWriter=false;
- private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
+ private boolean useAsyncDataManager=false;
+ private long maxDataFileLength=1024*1024*32;
private FileLock lock;
private String indexType=IndexTypes.DISK_INDEX;
@@ -319,10 +321,21 @@
public synchronized DataManager getDataManager(String name) throws IOException{
DataManager dm=(DataManager)dataManagers.get(name);
if(dm==null){
- dm=new DataManager(directory,name);
- dm.setMaxFileLength(maxDataFileLength);
- dm.setUseAsyncWriter(isUseAsyncWriter());
- recover(dm);
+ if( isUseAsyncDataManager() ) {
+ AsyncDataManager t=new AsyncDataManager();
+ t.setDirectory(directory);
+ t.setFilePrefix("data-"+name+"-");
+ t.setMaxFileLength((int) maxDataFileLength);
+ t.start();
+ dm=new DataManagerFacade(t, name);
+ } else {
+ DataManagerImpl t=new DataManagerImpl(directory,name);
+ t.setMaxFileLength(maxDataFileLength);
+ dm=t;
+ }
+ if( logIndexChanges ) {
+ recover(dm);
+ }
dataManagers.put(name,dm);
}
return dm;
@@ -339,7 +352,6 @@
private void recover(final DataManager dm) throws IOException{
dm.recoverRedoItems(new RedoListener(){
-
public void onRedoItem(StoreLocation item,Object o) throws Exception{
RedoStoreIndexItem redo=(RedoStoreIndexItem)o;
// IndexManager im = getIndexManager(dm, redo.getIndexName());
@@ -531,12 +543,12 @@
}
}
- public synchronized boolean isUseAsyncWriter() {
- return useAsyncWriter;
+ public synchronized boolean isUseAsyncDataManager() {
+ return useAsyncDataManager;
}
- public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
- this.useAsyncWriter = useAsyncWriter;
+ public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
+ this.useAsyncDataManager = useAsyncWriter;
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,481 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
+import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Manages DataFiles
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class AsyncDataManager {
+
+ private static final Log log=LogFactory.getLog(AsyncDataManager.class);
+
+ public static int CONTROL_RECORD_MAX_LENGTH=1024;
+
+ public static final int ITEM_HEAD_RESERVED_SPACE=21;
+ // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
+ public static final int ITEM_HEAD_SPACE=4+1+ITEM_HEAD_RESERVED_SPACE+3;
+ public static final int ITEM_HEAD_OFFSET_TO_SOR=ITEM_HEAD_SPACE-3;
+ public static final int ITEM_FOOT_SPACE=3; // EOR
+
+ public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE;
+
+ public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; //
+ public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; //
+
+ public static final byte DATA_ITEM_TYPE=1;
+ public static final byte REDO_ITEM_TYPE=2;
+
+ public static String DEFAULT_DIRECTORY="data";
+ public static String DEFAULT_FILE_PREFIX="data-";
+ public static int DEFAULT_MAX_FILE_LENGTH=1024*1024*32;
+
+ private File directory = new File(DEFAULT_DIRECTORY);
+ private String filePrefix=DEFAULT_FILE_PREFIX;
+ private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
+ private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512;
+
+ private DataFileAppender appender;
+ private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+
+ private Map<Integer,DataFile> fileMap=new HashMap<Integer,DataFile>();
+ private DataFile currentWriteFile;
+ ControlFile controlFile;
+
+ private Location mark;
+ private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
+ boolean started = false;
+ boolean useNio = true;
+
+ protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
+
+ @SuppressWarnings("unchecked")
+ public synchronized void start() throws IOException {
+ if( started ) {
+ return;
+
+ }
+
+ started=true;
+ directory.mkdirs();
+ controlFile = new ControlFile(new File(directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH);
+ controlFile.lock();
+
+ ByteSequence sequence = controlFile.load();
+ if( sequence != null && sequence.getLength()>0 ) {
+ unmarshallState(sequence);
+ }
+ if( useNio) {
+ appender = new NIODataFileAppender(this);
+ } else {
+ appender = new DataFileAppender(this);
+ }
+
+ File[] files=directory.listFiles(new FilenameFilter(){
+ public boolean accept(File dir,String n){
+ return dir.equals(dir)&&n.startsWith(filePrefix);
+ }
+ });
+
+ if(files!=null){
+ for(int i=0;i<files.length;i++){
+ try {
+ File file=files[i];
+ String n=file.getName();
+ String numStr=n.substring(filePrefix.length(),n.length());
+ int num=Integer.parseInt(numStr);
+ DataFile dataFile=new DataFile(file,num, preferedFileLength);
+ fileMap.put(dataFile.getDataFileId(),dataFile);
+ } catch (NumberFormatException e) {
+ // Ignore file that do not match the patern.
+ }
+ }
+
+ // Sort the list so that we can link the DataFiles together in the right order.
+ ArrayList<DataFile> l = new ArrayList<DataFile>(fileMap.values());
+ Collections.sort(l);
+ currentWriteFile=null;
+ for (DataFile df : l) {
+ if( currentWriteFile!=null ) {
+ currentWriteFile.linkAfter(df);
+ }
+ currentWriteFile=df;
+ }
+ }
+
+ // Need to check the current Write File to see if there was a partial write to it.
+ if( currentWriteFile!=null ) {
+
+ // See if the lastSyncedLocation is valid..
+ Location l = lastAppendLocation.get();
+ if( l!=null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue() ) {
+ l=null;
+ }
+
+ // If we know the last location that was ok.. then we can skip lots of checking
+ l = recoveryCheck(currentWriteFile, l);
+ lastAppendLocation.set(l);
+ }
+
+ storeState(false);
+ }
+
+ private Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
+ if( location == null ) {
+ location = new Location();
+ location.setDataFileId(dataFile.getDataFileId());
+ location.setOffset(0);
+ }
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ reader.readLocationDetails(location);
+ while( reader.readLocationDetailsAndValidate(location) ) {
+ location.setOffset(location.getOffset()+location.getSize());
+ }
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+ dataFile.setLength(location.getOffset());
+ return location;
+ }
+
+ private void unmarshallState(ByteSequence sequence) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ DataInputStream dis = new DataInputStream(bais);
+ if( dis.readBoolean() ) {
+ mark = new Location();
+ mark.readExternal(dis);
+ } else {
+ mark = null;
+ }
+ if( dis.readBoolean() ) {
+ Location l = new Location();
+ l.readExternal(dis);
+ lastAppendLocation.set(l);
+ } else {
+ lastAppendLocation.set(null);
+ }
+ }
+
+ private ByteSequence marshallState() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+
+ if( mark!=null ) {
+ dos.writeBoolean(true);
+ mark.writeExternal(dos);
+ } else {
+ dos.writeBoolean(false);
+ }
+ Location l = lastAppendLocation.get();
+ if( l!=null ) {
+ dos.writeBoolean(true);
+ l.writeExternal(dos);
+ } else {
+ dos.writeBoolean(false);
+ }
+
+ byte[] bs = baos.toByteArray();
+ return new ByteSequence(bs,0,bs.length);
+ }
+
+ synchronized DataFile allocateLocation(Location location) throws IOException{
+ if(currentWriteFile==null||((currentWriteFile.getLength()+location.getSize())>maxFileLength)){
+ int nextNum=currentWriteFile!=null?currentWriteFile.getDataFileId().intValue()+1:1;
+
+ String fileName=filePrefix+nextNum;
+ DataFile nextWriteFile=new DataFile(new File(directory,fileName),nextNum, preferedFileLength);
+ fileMap.put(nextWriteFile.getDataFileId(),nextWriteFile);
+ if( currentWriteFile!=null ) {
+ currentWriteFile.linkAfter(nextWriteFile);
+ if(currentWriteFile.isUnused()){
+ removeDataFile(currentWriteFile);
+ }
+ }
+ currentWriteFile=nextWriteFile;
+
+ }
+ location.setOffset(currentWriteFile.getLength());
+ location.setDataFileId(currentWriteFile.getDataFileId().intValue());
+ currentWriteFile.incrementLength(location.getSize());
+ currentWriteFile.increment();
+ return currentWriteFile;
+ }
+
+ DataFile getDataFile(Location item) throws IOException{
+ Integer key=new Integer(item.getDataFileId());
+ DataFile dataFile=(DataFile) fileMap.get(key);
+ if(dataFile==null){
+ log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+ throw new IOException("Could not locate data file "+filePrefix+"-"+item.getDataFileId());
+ }
+ return dataFile;
+ }
+
+ private DataFile getNextDataFile(DataFile dataFile) {
+ return (DataFile) dataFile.getNext();
+ }
+
+ public synchronized void close() throws IOException{
+ accessorPool.close();
+ storeState(false);
+ appender.close();
+ fileMap.clear();
+ controlFile.unlock();
+ controlFile.dispose();
+ }
+
+ public synchronized boolean delete() throws IOException{
+ boolean result=true;
+ for(Iterator i=fileMap.values().iterator();i.hasNext();){
+ DataFile dataFile=(DataFile) i.next();
+ result&=dataFile.delete();
+ }
+ fileMap.clear();
+ return result;
+ }
+
+ public synchronized void addInterestInFile(int file) throws IOException{
+ if(file>=0){
+ Integer key=new Integer(file);
+ DataFile dataFile=(DataFile) fileMap.get(key);
+ if(dataFile==null){
+ throw new IOException("That data file does not exist");
+ }
+ addInterestInFile(dataFile);
+ }
+ }
+
+ synchronized void addInterestInFile(DataFile dataFile){
+ if(dataFile!=null){
+ dataFile.increment();
+ }
+ }
+
+ public synchronized void removeInterestInFile(int file) throws IOException{
+ if(file>=0){
+ Integer key=new Integer(file);
+ DataFile dataFile=(DataFile) fileMap.get(key);
+ removeInterestInFile(dataFile);
+ }
+ }
+
+ synchronized void removeInterestInFile(DataFile dataFile) throws IOException{
+ if(dataFile!=null){
+ if(dataFile.decrement()<=0){
+ if(dataFile!=currentWriteFile){
+ removeDataFile(dataFile);
+ }
+ }
+ }
+ }
+
+ public synchronized void consolidateDataFiles() throws IOException{
+ List<DataFile> purgeList=new ArrayList<DataFile>();
+ for (DataFile dataFile : fileMap.values()) {
+ if(dataFile.isUnused() && dataFile != currentWriteFile){
+ purgeList.add(dataFile);
+ }
+ }
+ for (DataFile dataFile : purgeList) {
+ removeDataFile(dataFile);
+ }
+ }
+
+ private void removeDataFile(DataFile dataFile) throws IOException{
+ fileMap.remove(dataFile.getDataFileId());
+ dataFile.unlink();
+ boolean result=dataFile.delete();
+ log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
+ }
+
+ /**
+ * @return the maxFileLength
+ */
+ public int getMaxFileLength(){
+ return maxFileLength;
+ }
+
+ /**
+ * @param maxFileLength the maxFileLength to set
+ */
+ public void setMaxFileLength(int maxFileLength){
+ this.maxFileLength=maxFileLength;
+ }
+
+ public String toString(){
+ return "DataManager:("+filePrefix+")";
+ }
+
+ public synchronized Location getMark() throws IllegalStateException {
+ return mark;
+ }
+
+ public Location getNextLocation(Location location) throws IOException, IllegalStateException {
+
+
+ Location cur = null;
+ while( true ) {
+ if( cur == null ) {
+ if( location == null ) {
+ DataFile head = (DataFile) currentWriteFile.getHeadNode();
+ cur = new Location();
+ cur.setDataFileId(head.getDataFileId());
+ cur.setOffset(0);
+
+// DataFileAccessor reader = accessorPool.openDataFileAccessor(head);
+// try {
+// if( !reader.readLocationDetailsAndValidate(cur) ) {
+// return null;
+// }
+// } finally {
+// accessorPool.closeDataFileAccessor(reader);
+// }
+ } else {
+ // Set to the next offset..
+ cur = new Location(location);
+ cur.setOffset(cur.getOffset()+cur.getSize());
+ }
+ } else {
+ cur.setOffset(cur.getOffset()+cur.getSize());
+ }
+
+ DataFile dataFile = getDataFile(cur);
+
+ // Did it go into the next file??
+ if( dataFile.getLength() <= cur.getOffset() ) {
+ dataFile = getNextDataFile(dataFile);
+ if( dataFile == null ) {
+ return null;
+ } else {
+ cur.setDataFileId(dataFile.getDataFileId().intValue());
+ cur.setOffset(0);
+ }
+ }
+
+ // Load in location size and type.
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ reader.readLocationDetails(cur);
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+
+ if( cur.getType() == 0 ) {
+ return null;
+ } else if( cur.getType() > 0 ) {
+ // Only return user records.
+ return cur;
+ }
+ }
+ }
+
+ public ByteSequence read(Location location) throws IOException, IllegalStateException {
+ DataFile dataFile = getDataFile(location);
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ ByteSequence rc=null;
+ try {
+ rc = reader.readRecord(location);
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+ return rc;
+ }
+
+ public synchronized void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
+ mark = location;
+ storeState(sync);
+ }
+
+ private void storeState(boolean sync) throws IOException {
+ ByteSequence state = marshallState();
+ appender.storeItem(state, Location.MARK_TYPE, sync);
+ controlFile.store(state, sync);
+ }
+
+ public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
+ return appender.storeItem(data, Location.USER_TYPE, sync);
+ }
+
+ public Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
+ return appender.storeItem(data, type, sync);
+ }
+
+ public void update(Location location, ByteSequence data, boolean sync) throws IOException {
+ DataFile dataFile = getDataFile(location);
+ DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ updater.updateRecord(location, data, sync);
+ } finally {
+ accessorPool.closeDataFileAccessor(updater);
+ }
+ }
+
+ public File getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ public String getFilePrefix() {
+ return filePrefix;
+ }
+
+ public void setFilePrefix(String filePrefix) {
+ this.filePrefix = filePrefix;
+ }
+
+ public ConcurrentHashMap<WriteKey, WriteCommand> getInflightWrites() {
+ return inflightWrites;
+ }
+
+ public Location getLastAppendLocation() {
+ return lastAppendLocation.get();
+ }
+
+ public void setLastAppendLocation(Location lastSyncedLocation) {
+ this.lastAppendLocation.set(lastSyncedLocation);
+ }
+
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,161 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+
+import org.apache.activemq.util.ByteSequence;
+
+/**
+ * Use to reliably store fixed sized state data. It stores the state in
+ * record that is versioned and repeated twice in the file so that a failure in the
+ * middle of the write of the first or second record do not not result in an unknown
+ * state.
+ *
+ * @version $Revision: 1.1 $
+ */
+final public class ControlFile {
+
+ private final static boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
+ private final File file;
+
+ /** The File that holds the control data. */
+ private final RandomAccessFile randomAccessFile;
+ private final int maxRecordSize;
+
+ private long version=0;
+ private FileLock lock;
+ private boolean disposed;
+
+
+ public ControlFile(File file, int recordSize) throws IOException {
+ this.file = file;
+ this.maxRecordSize = recordSize+4;
+ randomAccessFile = new RandomAccessFile(file, "rw");
+ }
+
+ /**
+ * Locks the control file.
+ * @throws IOException
+ */
+ public void lock() throws IOException {
+ if( DISABLE_FILE_LOCK )
+ return;
+
+ if( lock == null ) {
+ lock = randomAccessFile.getChannel().tryLock();
+ if (lock == null) {
+ throw new IOException("Control file '"+file+"' could not be locked.");
+ }
+ }
+ }
+
+ /**
+ * Un locks the control file.
+ *
+ * @throws IOException
+ */
+ public void unlock() throws IOException {
+ if( DISABLE_FILE_LOCK )
+ return;
+
+ if (lock != null) {
+ lock.release();
+ lock = null;
+ }
+ }
+
+ public void dispose() {
+ if( disposed )
+ return;
+ disposed=true;
+ try {
+ unlock();
+ } catch (IOException e) {
+ }
+ try {
+ randomAccessFile.close();
+ } catch (IOException e) {
+ }
+ }
+
+ synchronized public ByteSequence load() throws IOException {
+ long l = randomAccessFile.length();
+ if( l < maxRecordSize ) {
+ return null;
+ }
+
+ randomAccessFile.seek(0);
+ long v1 = randomAccessFile.readLong();
+ randomAccessFile.seek(maxRecordSize+8);
+ long v1check = randomAccessFile.readLong();
+
+ randomAccessFile.seek(maxRecordSize+16);
+ long v2 = randomAccessFile.readLong();
+ randomAccessFile.seek((maxRecordSize*2)+24);
+ long v2check = randomAccessFile.readLong();
+
+ byte[] data=null;
+ if( v2 == v2check ) {
+ version = v2;
+ randomAccessFile.seek(maxRecordSize+24);
+ int size = randomAccessFile.readInt();
+ data = new byte[size];
+ randomAccessFile.readFully(data);
+ } else if ( v1 == v1check ){
+ version = v1;
+ randomAccessFile.seek(maxRecordSize+8);
+ int size = randomAccessFile.readInt();
+ data = new byte[size];
+ randomAccessFile.readFully(data);
+ } else {
+ // Bummer.. Both checks are screwed. we don't know
+ // if any of the two buffer are ok. This should
+ // only happen is data got corrupted.
+ throw new IOException("Control data corrupted.");
+ }
+ return new ByteSequence(data,0,data.length);
+ }
+
+ public void store(ByteSequence data, boolean sync) throws IOException {
+
+ version++;
+ randomAccessFile.setLength((maxRecordSize*2)+32);
+ randomAccessFile.seek(0);
+
+ // Write the first copy of the control data.
+ randomAccessFile.writeLong(version);
+ randomAccessFile.writeInt(data.getLength());
+ randomAccessFile.write(data.getData());
+ randomAccessFile.writeLong(version);
+
+ // Write the second copy of the control data.
+ randomAccessFile.writeLong(version);
+ randomAccessFile.writeInt(data.getLength());
+ randomAccessFile.write(data.getData());
+ randomAccessFile.writeLong(version);
+
+ if( sync ) {
+ randomAccessFile.getFD().sync();
+ }
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.activemq.util.LinkedNode;
+/**
+ * DataFile
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFile extends LinkedNode implements Comparable {
+
+ private final File file;
+ private final Integer dataFileId;
+ // Size the file is pre-allocated to when opened for appending.
+ private final int preferedSize;
+
+ // Logical length of the valid data; the physical file may be larger
+ // while it is pre-allocated for appending.
+ int length=0;
+ private int referenceCount;
+
+ DataFile(File file, int number, int preferedSize){
+ this.file=file;
+ this.preferedSize = preferedSize;
+ this.dataFileId=new Integer(number);
+ length=(int)(file.exists()?file.length():0);
+ }
+
+ public Integer getDataFileId(){
+ return dataFileId;
+ }
+
+ public synchronized int getLength(){
+ return length;
+ }
+ // Now synchronized for consistency with getLength()/incrementLength().
+ public synchronized void setLength(int length) {
+ this.length=length;
+ }
+ public synchronized void incrementLength(int size){
+ length+=size;
+ }
+
+ public synchronized int increment(){
+ return ++referenceCount;
+ }
+
+ public synchronized int decrement(){
+ return --referenceCount;
+ }
+
+ public synchronized boolean isUnused(){
+ return referenceCount<=0;
+ }
+
+ public synchronized String toString(){
+ return file.getName() + " number = " + dataFileId + " , length = " + length + " refCount = " + referenceCount;
+ }
+
+ /**
+ * Opens the underlying file. When we start to write files size them up
+ * so that the OS has a chance to allocate the file contiguously.
+ */
+ public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+ RandomAccessFile rc=new RandomAccessFile(file,"rw");
+ if( appender && length < preferedSize ) {
+ rc.setLength(preferedSize);
+ }
+ return rc;
+ }
+
+ /**
+ * On close, truncate the file back to its real size and close it.
+ * Fixed: the original only closed the handle inside the truncation
+ * branch, leaking the file descriptor whenever the sizes already matched.
+ */
+ public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+ if( length != file.length() ) {
+ file.setLength(getLength());
+ }
+ file.close();
+ }
+
+ public synchronized boolean delete() throws IOException{
+ return file.delete();
+ }
+
+ public int compareTo(Object o) {
+ DataFile df = (DataFile) o;
+ // Integer.compareTo avoids the overflow-prone subtraction idiom.
+ return dataFileId.compareTo(df.dataFileId);
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
+import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
+import org.apache.activemq.util.ByteSequence;
+/**
+ * Optimized Store reader and updater. Single threaded and synchronous. Use in conjunction
+ * with the DataFileAccessorPool for concurrent use.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+final class DataFileAccessor {
+
+ private final DataFile dataFile;
+ // Writes queued but not yet on disk; consulted so reads see pending data.
+ private final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
+ private final RandomAccessFile file;
+ private boolean disposed;
+
+ /**
+ * Construct a Store reader
+ *
+ * @param dataManager source of the shared in-flight write map
+ * @param dataFile the data file to read and update
+ * @throws IOException if the file cannot be opened
+ */
+ public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException{
+ this.dataFile = dataFile;
+ this.inflightWrites = dataManager.getInflightWrites();
+ this.file = dataFile.openRandomAccessFile(false);
+ }
+
+ public DataFile getDataFile() {
+ return dataFile;
+ }
+
+ /** Closes the underlying file handle; safe to call more than once. */
+ public void dispose() {
+ if( disposed )
+ return;
+ disposed=true;
+ try {
+ dataFile.closeRandomAccessFile(file);
+ } catch (IOException e) {
+ // NOTE(review): consider logging instead of printing to stderr.
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Reads the payload of the record at the given location, serving it from
+ * the in-flight write map when the record is not yet on disk.
+ *
+ * @param location a fully-specified (valid, sized) record location
+ * @return the record payload without the header/footer bytes
+ * @throws IOException if the location is invalid or unreadable
+ */
+ public ByteSequence readRecord(Location location) throws IOException {
+
+ if( !location.isValid() || location.getSize()==Location.NOT_SET )
+ throw new IOException("Invalid location: "+location);
+
+ WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
+ if( asyncWrite!= null ) {
+ return asyncWrite.data;
+ }
+
+ try {
+ byte[] data=new byte[location.getSize()-AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
+ file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
+ file.readFully(data);
+ return new ByteSequence(data, 0, data.length);
+ } catch (RuntimeException e) {
+ // e.g. a corrupted negative size causing NegativeArraySizeException.
+ throw new IOException("Invalid location: "+location+", : "+e);
+ }
+ }
+
+ /** Fills in the size and type fields of a location from the record header. */
+ public void readLocationDetails(Location location) throws IOException {
+ WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
+ if( asyncWrite!= null ) {
+ location.setSize(asyncWrite.location.getSize());
+ location.setType(asyncWrite.location.getType());
+ } else {
+ file.seek(location.getOffset());
+ location.setSize(file.readInt());
+ location.setType(file.readByte());
+ }
+ }
+
+ /**
+ * Like readLocationDetails(), but also verifies the record's start/end
+ * markers on disk; returns false instead of throwing on any failure.
+ */
+ public boolean readLocationDetailsAndValidate(Location location) {
+ try {
+ WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location));
+ if( asyncWrite!= null ) {
+ location.setSize(asyncWrite.location.getSize());
+ location.setType(asyncWrite.location.getType());
+ } else {
+ file.seek(location.getOffset());
+ location.setSize(file.readInt());
+ location.setType(file.readByte());
+
+ // Check the start-of-record marker.
+ byte data[] = new byte[3];
+ file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
+ file.readFully(data);
+ if( data[0] != AsyncDataManager.ITEM_HEAD_SOR[0] ||
+ data[1] != AsyncDataManager.ITEM_HEAD_SOR[1] ||
+ data[2] != AsyncDataManager.ITEM_HEAD_SOR[2] ) {
+ return false;
+ }
+ // Check the end-of-record marker.
+ file.seek(location.getOffset()+location.getSize()-AsyncDataManager.ITEM_FOOT_SPACE);
+ file.readFully(data);
+ if( data[0] != AsyncDataManager.ITEM_HEAD_EOR[0] ||
+ data[1] != AsyncDataManager.ITEM_HEAD_EOR[1] ||
+ data[2] != AsyncDataManager.ITEM_HEAD_EOR[2] ) {
+ return false;
+ }
+ }
+ } catch (IOException e) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Overwrites a record's payload in place, truncating the new data to the
+ * record's existing size.
+ */
+ public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
+
+ file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
+ int size = Math.min(data.getLength(), location.getSize());
+ file.write(data.getData(), data.getOffset(), size);
+ if( sync ) {
+ file.getFD().sync();
+ }
+
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * Used to pool DataFileAccessors.
+ *
+ * @author chirino
+ */
+public class DataFileAccessorPool {
+
+ private final AsyncDataManager dataManager;
+ private final HashMap<Integer, Pool> pools = new HashMap<Integer, Pool>();
+ private boolean closed=false;
+
+ int MAX_OPEN_READERS_PER_FILE=5;
+
+ class Pool {
+ private final DataFile file;
+ private final ArrayList<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
+ private boolean used;
+
+ public Pool(DataFile file) {
+ this.file = file;
+ }
+
+ public DataFileAccessor openDataFileReader() throws IOException {
+ DataFileAccessor rc=null;
+ if( pool.isEmpty() ) {
+ rc = new DataFileAccessor(dataManager, file);
+ } else {
+ rc = (DataFileAccessor) pool.remove(pool.size()-1);
+ }
+ used=true;
+ return rc;
+ }
+
+ public void closeDataFileReader(DataFileAccessor reader) {
+ used=true;
+ if(pool.size() >= MAX_OPEN_READERS_PER_FILE ) {
+ reader.dispose();
+ } else {
+ pool.add(reader);
+ }
+ }
+
+ public void clearUsedMark() {
+ used=false;
+ }
+
+ public boolean isUsed() {
+ return used;
+ }
+
+ public void dispose() {
+ for (DataFileAccessor reader : pool) {
+ reader.dispose();
+ }
+ pool.clear();
+ }
+
+ }
+
+ public DataFileAccessorPool(AsyncDataManager dataManager){
+ this.dataManager=dataManager;
+ }
+
+ synchronized void clearUsedMark() {
+ for (Iterator iter = pools.values().iterator(); iter.hasNext();) {
+ Pool pool = (Pool) iter.next();
+ pool.clearUsedMark();
+ }
+ }
+
+ synchronized void disposeUnused() {
+ for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+ Pool pool = iter.next();
+ if( !pool.isUsed() ) {
+ pool.dispose();
+ iter.remove();
+ }
+ }
+ }
+
+ synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
+ if( closed ) {
+ throw new IOException("Closed.");
+ }
+
+ Pool pool = pools.get(dataFile.getDataFileId());
+ if( pool == null ) {
+ pool = new Pool(dataFile);
+ pools.put(dataFile.getDataFileId(), pool);
+ }
+ return pool.openDataFileReader();
+ }
+
+ synchronized void closeDataFileAccessor(DataFileAccessor reader) {
+ Pool pool = pools.get(reader.getDataFile().getDataFileId());
+ if( pool == null || closed ) {
+ reader.dispose();
+ } else {
+ pool.closeDataFileReader(reader);
+ }
+ }
+
+ synchronized public void close() {
+ if(closed)
+ return;
+ closed=true;
+ for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+ Pool pool = iter.next();
+ pool.dispose();
+ }
+ pools.clear();
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,380 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.LinkedNode;
+
+/**
+ * An optimized writer to do batch appends to a data file. This object is thread safe
+ * and gains throughput as you increase the number of concurrent writes it does.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFileAppender {
+
+ // Placeholder bytes written into each record's reserved header area.
+ protected static final byte []RESERVED_SPACE= new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
+ // Sentinel posted to the writer thread to make it exit its loop.
+ protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";
+ // Maximum number of bytes batched into a single write/sync pass.
+ int MAX_WRITE_BATCH_SIZE = 1024*1024*4;
+
+ /**
+ * Hash key identifying an in-flight write by (data file id, offset).
+ */
+ static public class WriteKey {
+ private final int file;
+ private final long offset;
+ private final int hash;
+
+ public WriteKey(Location item){
+ file = item.getDataFileId();
+ offset = item.getOffset();
+ // TODO: see if we can build a better hash
+ hash = (int) (file ^ offset);
+ }
+
+ public int hashCode() {
+ return hash;
+ }
+
+ // NOTE(review): assumes obj is a non-null WriteKey; holds in practice
+ // because instances are only ever compared as ConcurrentHashMap keys.
+ public boolean equals(Object obj) {
+ WriteKey di = (WriteKey)obj;
+ return di.file == file && di.offset == offset;
+ }
+ }
+
+ /**
+ * A linked list of WriteCommands destined for the same data file, written
+ * and synced to disk as one unit. The latch is created only when at least
+ * one member requested a synchronous write.
+ */
+ public class WriteBatch {
+
+ public final DataFile dataFile;
+ public final WriteCommand first;
+ public CountDownLatch latch;
+ public int size;
+
+ public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
+ this.dataFile=dataFile;
+ this.first=write;
+ size+=write.location.getSize();
+ if( write.sync ) {
+ latch = new CountDownLatch(1);
+ }
+ }
+
+ // A write can join this batch only if it targets the same file and
+ // does not push the batch past MAX_WRITE_BATCH_SIZE.
+ public boolean canAppend(DataFile dataFile, WriteCommand write) {
+ if( dataFile != this.dataFile )
+ return false;
+ if( size+write.location.getSize() >= MAX_WRITE_BATCH_SIZE )
+ return false;
+ return true;
+ }
+
+ public void append(WriteCommand write) throws IOException {
+ this.first.getTailNode().linkAfter(write);
+ size+=write.location.getSize();
+ if( write.sync && latch==null ) {
+ latch = new CountDownLatch(1);
+ }
+ }
+ }
+
+ /** A single record to append: its target location, payload and sync flag. */
+ public static class WriteCommand extends LinkedNode {
+ public final Location location;
+ public final ByteSequence data;
+ final boolean sync;
+
+ public WriteCommand(Location location, ByteSequence data, boolean sync) {
+ this.location = location;
+ this.data = data;
+ this.sync = sync;
+ }
+ }
+
+ protected final AsyncDataManager dataManager;
+
+ // Async (non-sync) writes not yet on disk; shared with readers so they can
+ // serve data that is still queued.
+ protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
+
+ protected final Object enqueueMutex = new Object();
+ // The batch currently accepting writes; null once claimed by the writer thread.
+ protected WriteBatch nextWriteBatch;
+
+ private boolean running;
+ protected boolean shutdown;
+ // First IOException seen by the writer thread; rethrown to future callers.
+ protected IOException firstAsyncException;
+ protected final CountDownLatch shutdownDone = new CountDownLatch(1);
+ private Thread thread;
+
+ /**
+ * Construct a Store writer
+ *
+ * @param dataManager the manager that allocates locations and owns the
+ * shared in-flight write map
+ */
+ public DataFileAppender(AsyncDataManager dataManager){
+ this.dataManager=dataManager;
+ this.inflightWrites = this.dataManager.getInflightWrites();
+ }
+
+ /**
+ * Appends a record to the journal.
+ *
+ * @param data the payload to store
+ * @param type the record type byte
+ * @param sync true to block until the record has been forced to disk
+ * @return the location assigned to the record
+ * @throws IOException if the appender is shut down or a previous async
+ * write failed
+ */
+ public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
+
+ // Total record size including header/footer overhead.
+ int size = data.getLength()+AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+
+ final Location location=new Location();
+ location.setSize(size);
+ location.setType(type);
+
+ WriteBatch batch;
+ WriteCommand write = new WriteCommand(location, data, sync);
+
+ // Locate datafile and enqueue into the executor in synchronized block so that
+ // writes get enqueued onto the executor in order that they were assigned by
+ // the data manager (which is basically just appending)
+
+ synchronized(this) {
+ // Find the position where this item will land at.
+ DataFile dataFile=dataManager.allocateLocation(location);
+ batch = enqueue(dataFile, write);
+ }
+
+ if( sync ) {
+ try {
+ batch.latch.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ } else {
+ // NOTE(review): this put happens after enqueue(); if the writer
+ // thread finishes the batch first, its remove() in processQueue()
+ // could run before this put, leaving a stale entry — confirm.
+ inflightWrites.put(new WriteKey(location), write);
+ }
+
+ return location;
+ }
+
+ /**
+ * Adds the write to the current batch if it fits, otherwise waits for the
+ * writer thread to claim the current batch and starts a new one. Also
+ * lazily starts the single writer thread on first use.
+ */
+ private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
+ synchronized(enqueueMutex) {
+ WriteBatch rc=null;
+ if( shutdown ) {
+ throw new IOException("Async Writter Thread Shutdown");
+ }
+ if( firstAsyncException !=null )
+ throw firstAsyncException;
+
+ if( !running ) {
+ running=true;
+ thread = new Thread() {
+ public void run() {
+ processQueue();
+ }
+ };
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.setDaemon(true);
+ thread.setName("ActiveMQ Data File Writer");
+ thread.start();
+ }
+
+ if( nextWriteBatch == null ) {
+ nextWriteBatch = new WriteBatch(dataFile,write);
+ rc = nextWriteBatch;
+ enqueueMutex.notify();
+ } else {
+ // Append to current batch if possible..
+ if( nextWriteBatch.canAppend(dataFile, write) ) {
+ nextWriteBatch.append(write);
+ rc = nextWriteBatch;
+ } else {
+ // Otherwise wait for the queuedCommand to be null
+ try {
+ while( nextWriteBatch!=null ) {
+ enqueueMutex.wait();
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ if( shutdown ) {
+ throw new IOException("Async Writter Thread Shutdown");
+ }
+
+ // Start a new batch.
+ nextWriteBatch = new WriteBatch(dataFile,write);
+ rc = nextWriteBatch;
+ enqueueMutex.notify();
+ }
+ }
+ return rc;
+ }
+ }
+
+ /**
+ * Signals the writer thread to stop and blocks until it has finished.
+ */
+ public void close() throws IOException {
+ synchronized( enqueueMutex ) {
+ if( shutdown == false ) {
+ shutdown = true;
+ if( running ) {
+ enqueueMutex.notifyAll();
+ } else {
+ // Writer thread never started; nothing will count this down.
+ shutdownDone.countDown();
+ }
+ }
+ }
+
+ try {
+ shutdownDone.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+
+ }
+
+ /**
+ * The async processing loop that writes to the data files and
+ * does the force calls.
+ *
+ * Since the file sync() call is the slowest of all the operations,
+ * this algorithm tries to 'batch' or group together several file sync() requests
+ * into a single file sync() call. The batching is accomplished by attaching the
+ * same CountDownLatch instance to every force request in a group.
+ *
+ */
+ protected void processQueue() {
+ DataFile dataFile=null;
+ RandomAccessFile file=null;
+ try {
+
+ DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE);
+ while( true ) {
+
+ Object o = null;
+
+ // Block till we get a command.
+ synchronized(enqueueMutex) {
+ while( true ) {
+ if( shutdown ) {
+ o = SHUTDOWN_COMMAND;
+ break;
+ }
+ if( nextWriteBatch!=null ) {
+ o = nextWriteBatch;
+ nextWriteBatch=null;
+ break;
+ }
+ enqueueMutex.wait();
+ }
+ // Wake any producer waiting for nextWriteBatch to become null.
+ enqueueMutex.notify();
+ }
+
+
+ if( o == SHUTDOWN_COMMAND ) {
+ break;
+ }
+
+ WriteBatch wb = (WriteBatch) o;
+ // Switch target files when this batch is for a different data file.
+ if( dataFile != wb.dataFile ) {
+ if( file!=null ) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ dataFile = wb.dataFile;
+ file = dataFile.openRandomAccessFile(true);
+ }
+
+ WriteCommand write = wb.first;
+
+ // Write all the data.
+ // Only need to seek to first location.. all others
+ // are in sequence.
+ file.seek(write.location.getOffset());
+
+ //
+ // is it just 1 big write?
+ if( wb.size == write.location.getSize() ) {
+
+ // Just write it directly..
+ file.writeInt(write.location.getSize());
+ file.writeByte(write.location.getType());
+ file.write(RESERVED_SPACE);
+ file.write(AsyncDataManager.ITEM_HEAD_SOR);
+ file.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
+ file.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+ } else {
+
+ // Combine the smaller writes into 1 big buffer
+ while( write!=null ) {
+
+ buff.writeInt(write.location.getSize());
+ buff.writeByte(write.location.getType());
+ buff.write(RESERVED_SPACE);
+ buff.write(AsyncDataManager.ITEM_HEAD_SOR);
+ buff.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
+ buff.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+ write = (WriteCommand) write.getNext();
+ }
+
+ // Now do the 1 big write.
+ ByteSequence sequence = buff.toByteSequence();
+ file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ buff.reset();
+ }
+
+ file.getFD().sync();
+
+ WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
+ dataManager.setLastAppendLocation( lastWrite.location );
+
+ // Signal any waiting threads that the write is on disk.
+ if( wb.latch!=null ) {
+ wb.latch.countDown();
+ }
+
+ // Now that the data is on disk, remove the writes from the in flight
+ // cache.
+ write = wb.first;
+ while( write!=null ) {
+ if( !write.sync ) {
+ inflightWrites.remove(new WriteKey(write.location));
+ }
+ write = (WriteCommand) write.getNext();
+ }
+ }
+
+ } catch (IOException e) {
+ // Remember the first failure; subsequent enqueue() calls rethrow it.
+ synchronized( enqueueMutex ) {
+ firstAsyncException = e;
+ }
+ } catch (InterruptedException e) {
+ } finally {
+ try {
+ if( file!=null ) {
+ dataFile.closeRandomAccessFile(file);
+ }
+ } catch (IOException e) {
+ }
+ shutdownDone.countDown();
+ }
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,157 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.data.RedoListener;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+
+/**
+ * Provides a Kaha DataManager Facade to the DataManager.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class DataManagerFacade implements org.apache.activemq.kaha.impl.DataManager {
+
+ /** Adapts an async Location to the Kaha StoreLocation interface. */
+ private static class StoreLocationFacade implements StoreLocation {
+ private final Location location;
+
+ public StoreLocationFacade(Location location) {
+ this.location = location;
+ }
+
+ public int getFile() {
+ return location.getDataFileId();
+ }
+
+ public long getOffset() {
+ return location.getOffset();
+ }
+
+ public int getSize() {
+ return location.getSize();
+ }
+
+ public Location getLocation() {
+ return location;
+ }
+ }
+
+ private static StoreLocation convertToStoreLocation(Location location) {
+ if(location==null)
+ return null;
+ return new StoreLocationFacade(location);
+ }
+
+ private static Location convertFromStoreLocation(StoreLocation location) {
+
+ if(location==null)
+ return null;
+
+ // Unwrap directly when the location is already one of our facades.
+ if( location.getClass()== StoreLocationFacade.class )
+ return ((StoreLocationFacade)location).getLocation();
+
+ Location l = new Location();
+ l.setOffset((int) location.getOffset());
+ l.setSize(location.getSize());
+ l.setDataFileId(location.getFile());
+ return l;
+ }
+
+ // Marker payload appended by force() so a sync write flushes the journal.
+ private static final ByteSequence FORCE_COMMAND = new ByteSequence(new byte[]{'F', 'O', 'R', 'C', 'E'});
+
+ AsyncDataManager dataManager;
+ private final String name;
+ private Marshaller redoMarshaller;
+
+ public DataManagerFacade(AsyncDataManager dataManager, String name) {
+ this.dataManager=dataManager;
+ this.name = name;
+ }
+
+ /** Reads and unmarshals the item stored at the given location. */
+ public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException {
+ ByteSequence sequence = dataManager.read(convertFromStoreLocation(location));
+ DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence);
+ return marshaller.readPayload(dataIn);
+ }
+
+ /** Marshals the payload and appends it to the store asynchronously. */
+ public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
+ final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+ marshaller.writePayload(payload,buffer);
+ ByteSequence data = buffer.toByteSequence();
+ return convertToStoreLocation(dataManager.write(data, (byte)1, false));
+ }
+
+ /** Forces prior writes to disk by issuing a synchronous marker write. */
+ public void force() throws IOException {
+ dataManager.write(FORCE_COMMAND, (byte)2, true);
+ }
+
+ /** Marshals the payload and overwrites the record at the given location. */
+ public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException {
+ final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+ marshaller.writePayload(payload,buffer);
+ ByteSequence data = buffer.toByteSequence();
+ dataManager.update(convertFromStoreLocation(location), data, false);
+ }
+
+ public void close() throws IOException {
+ dataManager.close();
+ }
+
+ public void consolidateDataFiles() throws IOException {
+ dataManager.consolidateDataFiles();
+ }
+
+ public boolean delete() throws IOException {
+ return dataManager.delete();
+ }
+
+ public void addInterestInFile(int file) throws IOException {
+ dataManager.addInterestInFile(file);
+ }
+
+ public void removeInterestInFile(int file) throws IOException {
+ dataManager.removeInterestInFile(file);
+ }
+
+ // Redo logging is not supported by the async store implementation.
+ // UnsupportedOperationException is a RuntimeException subclass, so
+ // existing callers catching RuntimeException are unaffected.
+ public void recoverRedoItems(RedoListener listener) throws IOException {
+ throw new UnsupportedOperationException("Not Implemented..");
+ }
+
+ public StoreLocation storeRedoItem(Object payload) throws IOException {
+ throw new UnsupportedOperationException("Not Implemented..");
+ }
+
+ public Marshaller getRedoMarshaller() {
+ return redoMarshaller;
+ }
+
+ public void setRedoMarshaller(Marshaller redoMarshaller) {
+ this.redoMarshaller = redoMarshaller;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+
+import org.apache.activeio.journal.InvalidRecordLocationException;
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalEventListener;
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activemq.util.ByteSequence;
+
+/**
+ * Provides a Journal Facade to the DataManager.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class JournalFacade implements Journal {
+
+ /** Wraps an async journal Location as an ActiveIO RecordLocation. */
+ public static class RecordLocationFacade implements RecordLocation {
+ private final Location location;
+
+ public RecordLocationFacade(Location location) {
+ this.location = location;
+ }
+
+ public Location getLocation() {
+ return location;
+ }
+
+ public int compareTo(Object o) {
+ return location.compareTo(((RecordLocationFacade)o).location);
+ }
+ }
+
+ // Null-safe wrap of a Location into the ActiveIO view.
+ static private RecordLocation convertToRecordLocation(Location location) {
+ return location==null ? null : new RecordLocationFacade(location);
+ }
+
+ // Null-safe unwrap back to the native Location.
+ static private Location convertFromRecordLocation(RecordLocation location) {
+ return location==null ? null : ((RecordLocationFacade)location).getLocation();
+ }
+
+ AsyncDataManager dataManager;
+
+ public JournalFacade(AsyncDataManager dataManager) {
+ this.dataManager = dataManager;
+ }
+
+ public void close() throws IOException {
+ dataManager.close();
+ }
+
+ public RecordLocation getMark() throws IllegalStateException {
+ return convertToRecordLocation(dataManager.getMark());
+ }
+
+ public RecordLocation getNextRecordLocation(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
+ Location next = dataManager.getNextLocation(convertFromRecordLocation(location));
+ return convertToRecordLocation(next);
+ }
+
+ /** Reads a record's payload, or returns null when nothing is stored there. */
+ public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
+ ByteSequence sequence = dataManager.read(convertFromRecordLocation(location));
+ return sequence==null ? null : new ByteArrayPacket(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ }
+
+ // Journal events are not produced by this implementation.
+ public void setJournalEventListener(JournalEventListener listener) throws IllegalStateException {
+ }
+
+ public void setMark(RecordLocation location, boolean sync) throws InvalidRecordLocationException, IOException, IllegalStateException {
+ dataManager.setMark(convertFromRecordLocation(location), sync);
+ }
+
+ /** Appends a packet to the journal, converting between the ByteSequence types. */
+ public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException {
+ org.apache.activeio.packet.ByteSequence raw = packet.asByteSequence();
+ ByteSequence converted = new ByteSequence(raw.getData(), raw.getOffset(), raw.getLength());
+ return convertToRecordLocation(dataManager.write(converted, sync));
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Used as a location in the data store.
+ *
+ * @version $Revision: 1.2 $
+ */
+public final class Location {
+
+ public static final byte MARK_TYPE=-1;
+ public static final byte USER_TYPE=1;
+ public static final byte NOT_SET_TYPE=0;
+ public static final int NOT_SET=-1;
+
+ private int dataFileId=NOT_SET;
+ private int offset=NOT_SET;
+ private int size=NOT_SET;
+ private byte type=NOT_SET_TYPE;
+
+ public Location(){}
+
+ Location(Location item) {
+ this.dataFileId = item.dataFileId;
+ this.offset = item.offset;
+ this.size = item.size;
+ this.type = item.type;
+ }
+
+ boolean isValid(){
+ return dataFileId != NOT_SET;
+ }
+
+ /**
+ * @return the size of the data record including the header.
+ */
+ public int getSize(){
+ return size;
+ }
+
+ /**
+ * @param size the size of the data record including the header.
+ */
+ public void setSize(int size){
+ this.size=size;
+ }
+
+ /**
+ * @return the size of the payload of the record.
+ */
+ public int getPaylodSize() {
+ return size-AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+ }
+
+ public int getOffset(){
+ return offset;
+ }
+ public void setOffset(int offset){
+ this.offset=offset;
+ }
+
+ public int getDataFileId(){
+ return dataFileId;
+ }
+
+ public void setDataFileId(int file){
+ this.dataFileId=file;
+ }
+
+ public byte getType() {
+ return type;
+ }
+
+ public void setType(byte type) {
+ this.type = type;
+ }
+
+ public String toString(){
+ String result="offset = "+offset+", file = " + dataFileId + ", size = "+size + ", type = "+type;
+ return result;
+ }
+
+ public int compareTo(Object o) {
+ Location l = (Location)o;
+ if( dataFileId == l.dataFileId ) {
+ int rc = offset-l.offset;
+ return rc;
+ }
+ return dataFileId - l.dataFileId;
+ }
+
+ public void writeExternal(DataOutput dos) throws IOException {
+ dos.writeInt(dataFileId);
+ dos.writeInt(offset);
+ dos.writeInt(size);
+ dos.writeByte(type);
+ }
+
+ public void readExternal(DataInput dis) throws IOException {
+ dataFileId = dis.readInt();
+ offset = dis.readInt();
+ size = dis.readInt();
+ type = dis.readByte();
+ }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,213 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
/**
 * An AsyncDataFileAppender that uses NIO ByteBuffers and file channels to more
 * efficiently copy data to files.
 *
 * @version $Revision: 1.1.1.1 $
 */
class NIODataFileAppender extends DataFileAppender {

    public NIODataFileAppender(AsyncDataManager fileManager) {
        super(fileManager);
    }

    /**
     * The async processing loop that writes to the data files and
     * does the force calls.
     *
     * Since the file sync() call is the slowest of all the operations,
     * this algorithm tries to 'batch' or group together several file sync() requests
     * into a single file sync() call. The batching is accomplished attaching the
     * same CountDownLatch instance to every force request in a group.
     *
     * Fields such as enqueueMutex, nextWriteBatch, shutdown, inflightWrites,
     * firstAsyncException and shutdownDone are inherited from DataFileAppender
     * (not visible here) -- presumably enqueueMutex guards nextWriteBatch;
     * verify against the base class.
     */
    protected void processQueue() {
        DataFile dataFile = null;
        RandomAccessFile file = null;
        FileChannel channel = null;

        try {

            // Direct buffers so the JVM can hand them to the channel without
            // an extra copy; 'buffer' is used to coalesce small writes.
            ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
            ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
            ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE);

            // Populate the static parts of the headers and footers..
            header.putInt(0); // size
            header.put((byte) 0); // type
            header.put(RESERVED_SPACE); // reserved
            header.put(AsyncDataManager.ITEM_HEAD_SOR);
            footer.put(AsyncDataManager.ITEM_HEAD_EOR);

            while( true ) {

                Object o = null;

                // Block till we get a command: either the next write batch or
                // the shutdown sentinel. The notify() wakes any producer that
                // is waiting for the batch slot to drain.
                synchronized(enqueueMutex) {
                    while( true ) {
                        if( shutdown ) {
                            o = SHUTDOWN_COMMAND;
                            break;
                        }
                        if( nextWriteBatch!=null ) {
                            o = nextWriteBatch;
                            nextWriteBatch=null;
                            break;
                        }
                        enqueueMutex.wait();
                    }
                    enqueueMutex.notify();
                }


                if( o == SHUTDOWN_COMMAND ) {
                    break;
                }

                // Switch files if this batch targets a different data file;
                // only one file is kept open by this loop at a time.
                WriteBatch wb = (WriteBatch) o;
                if( dataFile != wb.dataFile ) {
                    if( file!=null ) {
                        dataFile.closeRandomAccessFile(file);
                    }
                    dataFile = wb.dataFile;
                    file = dataFile.openRandomAccessFile(true);
                    channel = file.getChannel();
                }

                WriteCommand write = wb.first;

                // Write all the data.
                // Only need to seek to first location.. all others
                // are in sequence.
                file.seek(write.location.getOffset());

                //
                // is it just 1 big write?
                if( wb.size == write.location.getSize() ) {

                    // Overwrite the dynamic prefix (size + type); the second
                    // clear() rewinds position/limit so transfer() writes the
                    // WHOLE pre-populated header, including the static
                    // reserved bytes and the start-of-record marker.
                    header.clear();
                    header.putInt(write.location.getSize());
                    header.put(write.location.getType());
                    header.clear();
                    transfer(header, channel);
                    ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
                    transfer(source, channel);
                    footer.clear();
                    transfer(footer, channel);

                } else {

                    // Combine the smaller writes into 1 big buffer
                    // (the asserts rely on the whole batch fitting in
                    // MAX_WRITE_BATCH_SIZE -- presumably guaranteed by the
                    // batching logic in DataFileAppender; verify there).
                    while( write!=null ) {

                        header.clear();
                        header.putInt(write.location.getSize());
                        header.put(write.location.getType());
                        header.clear();
                        copy(header, buffer);
                        assert !header.hasRemaining();

                        ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
                        copy(source, buffer);
                        assert !source.hasRemaining();

                        footer.clear();
                        copy(footer, buffer);
                        assert !footer.hasRemaining();

                        write = (WriteCommand) write.getNext();
                    }

                    // Fully write out the buffer..
                    buffer.flip();
                    transfer(buffer, channel);
                    buffer.clear();
                }

                // One force() per batch -- this is the "group sync" that the
                // class comment describes.
                file.getChannel().force(false);

                WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
                dataManager.setLastAppendLocation( lastWrite.location );

                // Signal any waiting threads that the write is on disk.
                if( wb.latch!=null ) {
                    wb.latch.countDown();
                }

                // Now that the data is on disk, remove the writes from the in flight
                // cache.
                write = wb.first;
                while( write!=null ) {
                    if( !write.sync ) {
                        inflightWrites.remove(new WriteKey(write.location));
                    }
                    write = (WriteCommand) write.getNext();
                }
            }

        } catch (IOException e) {
            // NOTE(review): unconditionally overwrites firstAsyncException;
            // if the base class intends to keep only the FIRST failure, a
            // null check is missing here -- confirm against DataFileAppender.
            synchronized( enqueueMutex ) {
                firstAsyncException = e;
            }
        } catch (InterruptedException e) {
            // NOTE(review): interrupt is swallowed and the thread's interrupt
            // status is not restored; treated as a shutdown signal here.
        } finally {
            try {
                if( file!=null ) {
                    dataFile.closeRandomAccessFile(file);
                }
            } catch (IOException e) {
                // Best-effort close during shutdown; nothing useful to do.
            }
            // Always release threads blocked in the appender's shutdown path.
            shutdownDone.countDown();
        }
    }

    /**
     * Copy the bytes in header to the channel, looping because a single
     * FileChannel.write() call may write fewer bytes than remain.
     * @param header - source of data
     * @param channel - destination where the data will be written.
     * @throws IOException
     */
    private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
        while (header.hasRemaining()) {
            channel.write(header);
        }
    }

    /**
     * Copies as many bytes as fit from src into dest without overflowing
     * dest, temporarily narrowing src's limit to cap the bulk put.
     * @return the number of bytes copied (0 when either buffer is exhausted).
     */
    private int copy(ByteBuffer src, ByteBuffer dest) {
        int rc = Math.min(dest.remaining(), src.remaining());
        if( rc > 0 ) {
            // Adjust our limit so that we don't overflow the dest buffer.
            int limit = src.limit();
            src.limit(src.position()+rc);
            dest.put(src);
            // restore the limit.
            src.limit(limit);
        }
        return rc;
    }

}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java Fri Nov 24 22:00:56 2006
@@ -24,7 +24,7 @@
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.DiskIndexLinkedList;
import org.apache.activemq.kaha.impl.index.IndexItem;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Fri Nov 24 22:00:56 2006
@@ -27,7 +27,7 @@
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Fri Nov 24 22:00:56 2006
@@ -32,7 +32,7 @@
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexLinkedList;