You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/07 20:37:24 UTC

svn commit: r1406770 - in /activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data: ./ DataFile.java DataItem.java DataManagerImpl.java Item.java RedoListener.java SyncDataFileReader.java SyncDataFileWriter.java

Author: chirino
Date: Wed Nov  7 19:37:24 2012
New Revision: 1406770

URL: http://svn.apache.org/viewvc?rev=1406770&view=rev
Log:
Add files that were part of my .gitignore.

Added:
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java   (with props)
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java   (with props)
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java

Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java Wed Nov  7 19:37:24 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * DataFile
+ * 
+ * 
+ */
+class DataFile {
+
+    private final File file;
+    private final Integer number;
+    private int referenceCount;
+    private RandomAccessFile randomAcessFile;
+    private Object writerData;
+    private long length;
+    private boolean dirty;
+
+    DataFile(File file, int number) {
+        this.file = file;
+        this.number = Integer.valueOf(number);
+        length = file.exists() ? file.length() : 0;
+    }
+
+    Integer getNumber() {
+        return number;
+    }
+
+    synchronized RandomAccessFile getRandomAccessFile() throws FileNotFoundException {
+        if (randomAcessFile == null) {
+            randomAcessFile = new RandomAccessFile(file, "rw");
+        }
+        return randomAcessFile;
+    }
+
+    synchronized long getLength() {
+        return length;
+    }
+
+    synchronized void incrementLength(int size) {
+        length += size;
+    }
+
+    synchronized void purge() throws IOException {
+        if (randomAcessFile != null) {
+            randomAcessFile.close();
+            randomAcessFile = null;
+        }
+    }
+
+    synchronized boolean delete() throws IOException {
+        purge();
+        return file.delete();
+    }
+
+    synchronized void close() throws IOException {
+        if (randomAcessFile != null) {
+            randomAcessFile.close();
+        }
+    }
+
+    synchronized int increment() {
+        return ++referenceCount;
+    }
+
+    synchronized int decrement() {
+        return --referenceCount;
+    }
+
+    synchronized boolean isUnused() {
+        return referenceCount <= 0;
+    }
+
+    public String toString() {
+        String result = file.getName() + " number = " + number + " , length = " + length + " refCount = " + referenceCount;
+        return result;
+    }
+
+    /**
+     * @return Opaque data that a DataFileWriter may want to associate with the
+     *         DataFile.
+     */
+    public synchronized Object getWriterData() {
+        return writerData;
+    }
+
+    /**
+     * @param writerData - Opaque data that a DataFileWriter may want to
+     *                associate with the DataFile.
+     */
+    public synchronized void setWriterData(Object writerData) {
+        this.writerData = writerData;
+        dirty = true;
+    }
+
+    public synchronized boolean isDirty() {
+        return dirty;
+    }
+
+    public synchronized void setDirty(boolean value) {
+        this.dirty = value;
+    }
+
+}

Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java Wed Nov  7 19:37:24 2012
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import org.apache.activemq.kaha.StoreLocation;
+
+/**
+ * A a wrapper for a data in the store
+ * 
+ * 
+ */
+public final class DataItem implements Item, StoreLocation {
+
+    private int file = (int)POSITION_NOT_SET;
+    private long offset = POSITION_NOT_SET;
+    private int size;
+
+    public DataItem() {
+    }
+
+    DataItem(DataItem item) {
+        this.file = item.file;
+        this.offset = item.offset;
+        this.size = item.size;
+    }
+
+    boolean isValid() {
+        return file != POSITION_NOT_SET;
+    }
+
+    /**
+     * @return
+     * @see org.apache.activemq.kaha.StoreLocation#getSize()
+     */
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * @param size The size to set.
+     */
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    /**
+     * @return
+     * @see org.apache.activemq.kaha.StoreLocation#getOffset()
+     */
+    public long getOffset() {
+        return offset;
+    }
+
+    /**
+     * @param offset The offset to set.
+     */
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * @return
+     * @see org.apache.activemq.kaha.StoreLocation#getFile()
+     */
+    public int getFile() {
+        return file;
+    }
+
+    /**
+     * @param file The file to set.
+     */
+    public void setFile(int file) {
+        this.file = file;
+    }
+
+    /**
+     * @return a pretty print
+     */
+    public String toString() {
+        String result = "offset = " + offset + ", file = " + file + ", size = " + size;
+        return result;
+    }
+
+    public DataItem copy() {
+        return new DataItem(this);
+    }
+}

Propchange: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java Wed Nov  7 19:37:24 2012
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.DataManager;
+import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages DataFiles
+ * 
+ * 
+ */
+public final class DataManagerImpl implements DataManager {
+
+    public static final int ITEM_HEAD_SIZE = 5; // type + length
+    public static final byte DATA_ITEM_TYPE = 1;
+    public static final byte REDO_ITEM_TYPE = 2;
+    public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32;
+    
+    private static final Logger LOG = LoggerFactory.getLogger(DataManagerImpl.class);
+    private static final String NAME_PREFIX = "data-";
+    
+    private final File directory;
+    private final String name;
+    private SyncDataFileReader reader;
+    private SyncDataFileWriter writer;
+    private DataFile currentWriteFile;
+    private long maxFileLength = MAX_FILE_LENGTH;
+    private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+    private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
+    private String dataFilePrefix;
+    private final AtomicLong storeSize;
+
+    public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
+        this.directory = dir;
+        this.name = name;
+        this.storeSize=storeSize;
+
+        dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-");
+        // build up list of current dataFiles
+        File[] files = dir.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String n) {
+                return dir.equals(directory) && n.startsWith(dataFilePrefix);
+            }
+        });
+        if (files != null) {
+            for (int i = 0; i < files.length; i++) {
+                File file = files[i];
+                String n = file.getName();
+                String numStr = n.substring(dataFilePrefix.length(), n.length());
+                int num = Integer.parseInt(numStr);
+                DataFile dataFile = new DataFile(file, num);
+                storeSize.addAndGet(dataFile.getLength());
+                fileMap.put(dataFile.getNumber(), dataFile);
+                if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
+                    currentWriteFile = dataFile;
+                }
+            }
+        }
+    }
+
+    private DataFile createAndAddDataFile(int num) {
+        String fileName = dataFilePrefix + num;
+        File file = new File(directory, fileName);
+        DataFile result = new DataFile(file, num);
+        fileMap.put(result.getNumber(), result);
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
+     */
+    public String getName() {
+        return name;
+    }
+
+    synchronized DataFile findSpaceForData(DataItem item) throws IOException {
+        if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) {
+            int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1;
+            if (currentWriteFile != null && currentWriteFile.isUnused()) {
+                removeDataFile(currentWriteFile);
+            }
+            currentWriteFile = createAndAddDataFile(nextNum);
+        }
+        item.setOffset(currentWriteFile.getLength());
+        item.setFile(currentWriteFile.getNumber().intValue());
+        int len = item.getSize() + ITEM_HEAD_SIZE;
+        currentWriteFile.incrementLength(len);
+        storeSize.addAndGet(len);
+        return currentWriteFile;
+    }
+
+    DataFile getDataFile(StoreLocation item) throws IOException {
+        Integer key = Integer.valueOf(item.getFile());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile());
+        }
+        return dataFile;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller,
+     *      org.apache.activemq.kaha.StoreLocation)
+     */
+    public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
+        return getReader().readItem(marshaller, item);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller,
+     *      java.lang.Object)
+     */
+    public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
+        return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
+     */
+    public synchronized StoreLocation storeRedoItem(Object payload) throws IOException {
+        return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation,
+     *      org.apache.activemq.kaha.Marshaller, java.lang.Object)
+     */
+    public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload)
+        throws IOException {
+        getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
+     */
+    public synchronized void recoverRedoItems(RedoListener listener) throws IOException {
+
+        // Nothing to recover if there is no current file.
+        if (currentWriteFile == null) {
+            return;
+        }
+
+        DataItem item = new DataItem();
+        item.setFile(currentWriteFile.getNumber().intValue());
+        item.setOffset(0);
+        while (true) {
+            byte type;
+            try {
+                type = getReader().readDataItemSize(item);
+            } catch (IOException ignore) {
+                LOG.trace("End of data file reached at (header was invalid): " + item);
+                return;
+            }
+            if (type == REDO_ITEM_TYPE) {
+                // Un-marshal the redo item
+                Object object;
+                try {
+                    object = readItem(redoMarshaller, item);
+                } catch (IOException e1) {
+                    LOG.trace("End of data file reached at (payload was invalid): " + item);
+                    return;
+                }
+                try {
+
+                    listener.onRedoItem(item, object);
+                    // in case the listener is holding on to item references,
+                    // copy it
+                    // so we don't change it behind the listener's back.
+                    item = item.copy();
+
+                } catch (Exception e) {
+                    throw IOExceptionSupport.create("Recovery handler failed: " + e, e);
+                }
+            }
+            // Move to the next item.
+            item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize());
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#close()
+     */
+    public synchronized void close() throws IOException {
+        getWriter().close();
+        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
+            DataFile dataFile = i.next();
+            getWriter().force(dataFile);
+            dataFile.close();
+        }
+        fileMap.clear();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#force()
+     */
+    public synchronized void force() throws IOException {
+        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
+            DataFile dataFile = i.next();
+            getWriter().force(dataFile);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#delete()
+     */
+    public synchronized boolean delete() throws IOException {
+        boolean result = true;
+        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
+            DataFile dataFile = i.next();
+            storeSize.addAndGet(-dataFile.getLength());
+            result &= dataFile.delete();
+        }
+        fileMap.clear();
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int)
+     */
+    public synchronized void addInterestInFile(int file) throws IOException {
+        if (file >= 0) {
+            Integer key = Integer.valueOf(file);
+            DataFile dataFile = fileMap.get(key);
+            if (dataFile == null) {
+                dataFile = createAndAddDataFile(file);
+            }
+            addInterestInFile(dataFile);
+        }
+    }
+
+    synchronized void addInterestInFile(DataFile dataFile) {
+        if (dataFile != null) {
+            dataFile.increment();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int)
+     */
+    public synchronized void removeInterestInFile(int file) throws IOException {
+        if (file >= 0) {
+            Integer key = Integer.valueOf(file);
+            DataFile dataFile = fileMap.get(key);
+            removeInterestInFile(dataFile);
+        }
+    }
+
+    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
+        if (dataFile != null) {
+           
+            if (dataFile.decrement() <= 0) {
+                if (dataFile != currentWriteFile) {
+                    removeDataFile(dataFile);
+                }
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles()
+     */
+    public synchronized void consolidateDataFiles() throws IOException {
+        List<DataFile> purgeList = new ArrayList<DataFile>();
+        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
+            DataFile dataFile = i.next();
+            if (dataFile.isUnused() && dataFile != currentWriteFile) {
+                purgeList.add(dataFile);
+            }
+        }
+        for (int i = 0; i < purgeList.size(); i++) {
+            DataFile dataFile = purgeList.get(i);
+            removeDataFile(dataFile);
+        }
+    }
+
+    private void removeDataFile(DataFile dataFile) throws IOException {
+        fileMap.remove(dataFile.getNumber());
+        if (writer != null) {
+            writer.force(dataFile);
+        }
+        storeSize.addAndGet(-dataFile.getLength());
+        boolean result = dataFile.delete();
+        LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
+     */
+    public Marshaller getRedoMarshaller() {
+        return redoMarshaller;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
+     */
+    public void setRedoMarshaller(Marshaller redoMarshaller) {
+        this.redoMarshaller = redoMarshaller;
+    }
+
+    /**
+     * @return the maxFileLength
+     */
+    public long getMaxFileLength() {
+        return maxFileLength;
+    }
+
+    /**
+     * @param maxFileLength the maxFileLength to set
+     */
+    public void setMaxFileLength(long maxFileLength) {
+        this.maxFileLength = maxFileLength;
+    }
+
+    public String toString() {
+        return "DataManager:(" + NAME_PREFIX + name + ")";
+    }
+
+    public synchronized SyncDataFileReader getReader() {
+        if (reader == null) {
+            reader = createReader();
+        }
+        return reader;
+    }
+
+    protected synchronized SyncDataFileReader createReader() {
+        return new SyncDataFileReader(this);
+    }
+
+    public synchronized void setReader(SyncDataFileReader reader) {
+        this.reader = reader;
+    }
+
+    public synchronized SyncDataFileWriter getWriter() {
+        if (writer == null) {
+            writer = createWriter();
+        }
+        return writer;
+    }
+
+    private SyncDataFileWriter createWriter() {
+        return new SyncDataFileWriter(this);
+    }
+
+    public synchronized void setWriter(SyncDataFileWriter writer) {
+        this.writer = writer;
+    }
+
+}

Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java Wed Nov  7 19:37:24 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+/**
+ * A a wrapper for a data in the store
+ * 
+ * 
+ */
+public interface Item {
+    long POSITION_NOT_SET = -1;
+    short MAGIC = 31317;
+    int ACTIVE = 22;
+    int FREE = 33;
+    int LOCATION_SIZE = 24;
+}

Propchange: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java Wed Nov  7 19:37:24 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import org.apache.activemq.kaha.StoreLocation;
+
+
+public interface RedoListener {
+
+    void onRedoItem(StoreLocation item, Object object) throws Exception;
+
+}

Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java Wed Nov  7 19:37:24 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.util.DataByteArrayInputStream;
+
+/**
+ * Optimized Store reader
+ * 
+ * 
+ */
+public final class SyncDataFileReader {
+
+    private DataManagerImpl dataManager;
+    private DataByteArrayInputStream dataIn;
+
+    /**
+     * Construct a Store reader
+     * 
+     * @param fileId
+     */
+    SyncDataFileReader(DataManagerImpl fileManager) {
+        this.dataManager = fileManager;
+        this.dataIn = new DataByteArrayInputStream();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
+     */
+    public synchronized byte readDataItemSize(DataItem item) throws IOException {
+        RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
+        file.seek(item.getOffset()); // jump to the size field
+        byte rc = file.readByte();
+        item.setSize(file.readInt());
+        return rc;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller,
+     *      org.apache.activemq.kaha.StoreLocation)
+     */
+    public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
+        RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
+
+        // TODO: we could reuse the buffer in dataIn if it's big enough to avoid
+        // allocating byte[] arrays on every readItem.
+        byte[] data = new byte[item.getSize()];
+        file.seek(item.getOffset() + DataManagerImpl.ITEM_HEAD_SIZE);
+        file.readFully(data);
+        dataIn.restart(data);
+        return marshaller.readPayload(dataIn);
+    }
+}

Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java Wed Nov  7 19:37:24 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+
+/**
+ * Optimized Store writer. Synchronously marshalls and writes to the data file.
+ * Simple but may introduce a bit of contention when put under load.
+ * 
+ * 
+ */
+public final class SyncDataFileWriter {
+
+    private DataByteArrayOutputStream buffer;
+    private DataManagerImpl dataManager;
+
+    /**
+     * Construct a Store writer
+     * 
+     * @param fileId
+     */
+    SyncDataFileWriter(DataManagerImpl fileManager) {
+        this.dataManager = fileManager;
+        this.buffer = new DataByteArrayOutputStream();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller,
+     *      java.lang.Object, byte)
+     */
+    public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
+        throws IOException {
+
+        // Write the packet our internal buffer.
+        buffer.reset();
+        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
+        marshaller.writePayload(payload, buffer);
+        int size = buffer.size();
+        int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
+        buffer.reset();
+        buffer.writeByte(type);
+        buffer.writeInt(payloadSize);
+
+        // Find the position where this item will land at.
+        DataItem item = new DataItem();
+        item.setSize(payloadSize);
+        DataFile dataFile = dataManager.findSpaceForData(item);
+
+        // Now splat the buffer to the file.
+        dataFile.getRandomAccessFile().seek(item.getOffset());
+        dataFile.getRandomAccessFile().write(buffer.getData(), 0, size);
+        dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
+
+        dataManager.addInterestInFile(dataFile);
+        return item;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation,
+     *      org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
+     */
+    public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
+        throws IOException {
+        // Write the packet our internal buffer.
+        buffer.reset();
+        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
+        marshaller.writePayload(payload, buffer);
+        int size = buffer.size();
+        int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
+        buffer.reset();
+        buffer.writeByte(type);
+        buffer.writeInt(payloadSize);
+        item.setSize(payloadSize);
+        DataFile dataFile = dataManager.getDataFile(item);
+        RandomAccessFile file = dataFile.getRandomAccessFile();
+        file.seek(item.getOffset());
+        file.write(buffer.getData(), 0, size);
+        dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
+    }
+
+    public synchronized void force(DataFile dataFile) throws IOException {
+        // If our dirty marker was set.. then we need to sync
+        if (dataFile.getWriterData() != null && dataFile.isDirty()) {
+            dataFile.getRandomAccessFile().getFD().sync();
+            dataFile.setWriterData(null);
+            dataFile.setDirty(false);
+        }
+    }
+
+    public void close() throws IOException {
+    }
+}