You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/07 20:37:24 UTC
svn commit: r1406770 - in
/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data:
./ DataFile.java DataItem.java DataManagerImpl.java Item.java
RedoListener.java SyncDataFileReader.java SyncDataFileWriter.java
Author: chirino
Date: Wed Nov 7 19:37:24 2012
New Revision: 1406770
URL: http://svn.apache.org/viewvc?rev=1406770&view=rev
Log:
Add files that were part of my .gitignore.
Added:
activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/
activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java (with props)
activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java (with props)
activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java
activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java Wed Nov 7 19:37:24 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * A single numbered data file on disk together with the bookkeeping kept for
+ * it: a lazily opened {@link RandomAccessFile}, the logical length written so
+ * far, a reference count, and an opaque writer marker plus dirty flag.
+ * <p>
+ * All state-reading and state-changing methods are synchronized on the
+ * instance.
+ */
+class DataFile {
+
+    private final File file;
+    private final Integer number;
+    private int referenceCount;
+    private RandomAccessFile randomAccessFile;
+    private Object writerData;
+    private long length;
+    private boolean dirty;
+
+    /**
+     * @param file the file on disk; it does not have to exist yet
+     * @param number the number identifying this data file
+     */
+    DataFile(File file, int number) {
+        this.file = file;
+        this.number = Integer.valueOf(number);
+        // Start from what is already on disk so new items are appended.
+        length = file.exists() ? file.length() : 0;
+    }
+
+    /**
+     * @return the number identifying this data file
+     */
+    Integer getNumber() {
+        return number;
+    }
+
+    /**
+     * Returns the handle for this file, opening it in "rw" mode on first use
+     * or after the previous handle was released by {@link #purge()} or
+     * {@link #close()}.
+     *
+     * @return the open random access file
+     * @throws FileNotFoundException if the file cannot be opened
+     */
+    synchronized RandomAccessFile getRandomAccessFile() throws FileNotFoundException {
+        if (randomAccessFile == null) {
+            randomAccessFile = new RandomAccessFile(file, "rw");
+        }
+        return randomAccessFile;
+    }
+
+    /**
+     * @return the tracked length: the on-disk length seen at construction
+     *         plus everything added through {@link #incrementLength(int)}
+     */
+    synchronized long getLength() {
+        return length;
+    }
+
+    /**
+     * @param size number of bytes to add to the tracked length
+     */
+    synchronized void incrementLength(int size) {
+        length += size;
+    }
+
+    /**
+     * Closes the open handle, if any, and forgets it so that the next
+     * {@link #getRandomAccessFile()} call reopens the file.
+     *
+     * @throws IOException if closing the handle fails
+     */
+    synchronized void purge() throws IOException {
+        if (randomAccessFile != null) {
+            randomAccessFile.close();
+            randomAccessFile = null;
+        }
+    }
+
+    /**
+     * Releases the handle and deletes the file from disk.
+     *
+     * @return true if the file was deleted
+     * @throws IOException if closing the handle fails
+     */
+    synchronized boolean delete() throws IOException {
+        purge();
+        return file.delete();
+    }
+
+    /**
+     * Closes the open handle, if any.
+     *
+     * @throws IOException if closing the handle fails
+     */
+    synchronized void close() throws IOException {
+        // Delegate to purge() so the closed handle is dropped as well;
+        // keeping it would make getRandomAccessFile() hand out a dead file.
+        purge();
+    }
+
+    /**
+     * @return the reference count after adding one
+     */
+    synchronized int increment() {
+        return ++referenceCount;
+    }
+
+    /**
+     * @return the reference count after subtracting one
+     */
+    synchronized int decrement() {
+        return --referenceCount;
+    }
+
+    /**
+     * @return true if the reference count is zero or negative
+     */
+    synchronized boolean isUnused() {
+        return referenceCount <= 0;
+    }
+
+    /**
+     * @return the file name, number, tracked length and reference count
+     */
+    public synchronized String toString() {
+        // Synchronized like the other accessors so length and referenceCount
+        // are read under the same lock that guards their updates.
+        return file.getName() + " number = " + number + " , length = " + length + " refCount = " + referenceCount;
+    }
+
+    /**
+     * @return Opaque data that a DataFileWriter may want to associate with the
+     *         DataFile.
+     */
+    public synchronized Object getWriterData() {
+        return writerData;
+    }
+
+    /**
+     * Stores the writer's opaque data and marks this file dirty, whatever
+     * value is passed in.
+     *
+     * @param writerData - Opaque data that a DataFileWriter may want to
+     *                associate with the DataFile.
+     */
+    public synchronized void setWriterData(Object writerData) {
+        this.writerData = writerData;
+        dirty = true;
+    }
+
+    /**
+     * @return the current dirty flag
+     */
+    public synchronized boolean isDirty() {
+        return dirty;
+    }
+
+    /**
+     * @param value the new dirty flag
+     */
+    public synchronized void setDirty(boolean value) {
+        this.dirty = value;
+    }
+
+}
Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java Wed Nov 7 19:37:24 2012
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import org.apache.activemq.kaha.StoreLocation;
+
+/**
+ * Location of a piece of data in the store: the number of the data file that
+ * holds it, its offset within that file and its size.
+ */
+public final class DataItem implements Item, StoreLocation {
+
+    private int file = (int)POSITION_NOT_SET;
+    private long offset = POSITION_NOT_SET;
+    private int size;
+
+    /**
+     * Creates an item whose file and offset are both
+     * {@code POSITION_NOT_SET} and whose size is zero.
+     */
+    public DataItem() {
+    }
+
+    /**
+     * Creates an item with the same file, offset and size as another one.
+     *
+     * @param item the item to duplicate
+     */
+    DataItem(DataItem item) {
+        setFile(item.getFile());
+        setOffset(item.getOffset());
+        setSize(item.getSize());
+    }
+
+    /**
+     * @return true once the file number differs from
+     *         {@code POSITION_NOT_SET}
+     */
+    boolean isValid() {
+        return POSITION_NOT_SET != file;
+    }
+
+    /**
+     * @return the data file number
+     * @see org.apache.activemq.kaha.StoreLocation#getFile()
+     */
+    public int getFile() {
+        return file;
+    }
+
+    /**
+     * @param file The file to set.
+     */
+    public void setFile(int file) {
+        this.file = file;
+    }
+
+    /**
+     * @return the offset within the data file
+     * @see org.apache.activemq.kaha.StoreLocation#getOffset()
+     */
+    public long getOffset() {
+        return offset;
+    }
+
+    /**
+     * @param offset The offset to set.
+     */
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * @return the size of the item
+     * @see org.apache.activemq.kaha.StoreLocation#getSize()
+     */
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * @param size The size to set.
+     */
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    /**
+     * @return a pretty print listing offset, file and size
+     */
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append("offset = ").append(offset);
+        text.append(", file = ").append(file);
+        text.append(", size = ").append(size);
+        return text.toString();
+    }
+
+    /**
+     * @return a new, independent item with the same file, offset and size
+     */
+    public DataItem copy() {
+        DataItem duplicate = new DataItem(this);
+        return duplicate;
+    }
+}
Propchange: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java Wed Nov 7 19:37:24 2012
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.DataManager;
+import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the numbered data files of one named store inside a directory.
+ * <p>
+ * Files are named with a file-system-safe form of {@code data-<name>-}
+ * followed by the file number. Items are appended to the current write file
+ * until it would exceed the maximum file length, at which point the next
+ * numbered file is started. Each file carries a reference count; files whose
+ * count drops to zero and that are not the current write file are deleted.
+ * The combined length of all managed files is tracked in the
+ * {@link AtomicLong} passed to the constructor.
+ */
+public final class DataManagerImpl implements DataManager {
+
+    /** Bytes written ahead of every payload: a one-byte type and a four-byte length. */
+    public static final int ITEM_HEAD_SIZE = 5; // type + length
+    /** Type byte written for items stored through {@link #storeDataItem}. */
+    public static final byte DATA_ITEM_TYPE = 1;
+    /** Type byte written for items stored through {@link #storeRedoItem}. */
+    public static final byte REDO_ITEM_TYPE = 2;
+    /** Default maximum length of one data file (32 * 1024 * 1024 bytes). */
+    public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32;
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataManagerImpl.class);
+    private static final String NAME_PREFIX = "data-";
+
+    private final File directory;
+    private final String name;
+    private SyncDataFileReader reader;
+    private SyncDataFileWriter writer;
+    private DataFile currentWriteFile;
+    private long maxFileLength = MAX_FILE_LENGTH;
+    private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+    private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
+    private String dataFilePrefix;
+    private final AtomicLong storeSize;
+
+    /**
+     * Registers every existing data file of this store found in the directory
+     * and selects the highest-numbered one as the current write file.
+     *
+     * @param dir directory holding the data files
+     * @param name store name, used to build the data file prefix
+     * @param storeSize counter that receives the length of every file found
+     *                and is kept up to date as files grow or are removed
+     */
+    public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
+        this.directory = dir;
+        this.name = name;
+        this.storeSize=storeSize;
+
+        dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-");
+        // build up list of current dataFiles
+        File[] files = dir.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String n) {
+                return dir.equals(directory) && n.startsWith(dataFilePrefix);
+            }
+        });
+        if (files != null) {
+            for (File file : files) {
+                String n = file.getName();
+                String numStr = n.substring(dataFilePrefix.length(), n.length());
+                int num;
+                try {
+                    num = Integer.parseInt(numStr);
+                } catch (NumberFormatException e) {
+                    // A stray file that merely shares the prefix (a backup
+                    // copy, for example) must not prevent the store from opening.
+                    LOG.warn("Ignoring file {} in {}: suffix is not a data file number", n, dir);
+                    continue;
+                }
+                DataFile dataFile = new DataFile(file, num);
+                storeSize.addAndGet(dataFile.getLength());
+                fileMap.put(dataFile.getNumber(), dataFile);
+                if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
+                    currentWriteFile = dataFile;
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the bookkeeping entry for data file {@code num} and registers
+     * it in the file map. The file on disk is not touched here.
+     */
+    private DataFile createAndAddDataFile(int num) {
+        String fileName = dataFilePrefix + num;
+        File file = new File(directory, fileName);
+        DataFile result = new DataFile(file, num);
+        fileMap.put(result.getNumber(), result);
+        return result;
+    }
+
+    /**
+     * @return the store name given to the constructor
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Reserves room for an item: fills in the item's file number and offset
+     * and grows the chosen file's tracked length and the store size by the
+     * item size plus {@link #ITEM_HEAD_SIZE}.
+     * <p>
+     * A new file is started when there is no current write file or when the
+     * current file's length plus the item size (header not counted) exceeds
+     * the maximum file length. A current file that is unused at that moment
+     * is removed before the new one is created.
+     *
+     * @param item the item to place; its size must already be set
+     * @return the data file the item was assigned to
+     * @throws IOException if removing the previous write file fails
+     */
+    synchronized DataFile findSpaceForData(DataItem item) throws IOException {
+        if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) {
+            int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1;
+            if (currentWriteFile != null && currentWriteFile.isUnused()) {
+                removeDataFile(currentWriteFile);
+            }
+            currentWriteFile = createAndAddDataFile(nextNum);
+        }
+        item.setOffset(currentWriteFile.getLength());
+        item.setFile(currentWriteFile.getNumber().intValue());
+        int len = item.getSize() + ITEM_HEAD_SIZE;
+        currentWriteFile.incrementLength(len);
+        storeSize.addAndGet(len);
+        return currentWriteFile;
+    }
+
+    /**
+     * Looks up the data file a location refers to.
+     *
+     * @param item location whose file number is looked up
+     * @return the registered data file
+     * @throws IOException if no data file with that number is registered
+     */
+    DataFile getDataFile(StoreLocation item) throws IOException {
+        Integer key = Integer.valueOf(item.getFile());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key {} but not found in fileMap: {}", key, fileMap);
+            throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile());
+        }
+        return dataFile;
+    }
+
+    /**
+     * Reads and unmarshals the payload stored at a location.
+     *
+     * @param marshaller used to decode the payload
+     * @param item location of the item
+     * @return the unmarshalled payload
+     * @throws IOException if the data file is unknown or reading fails
+     */
+    public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
+        return getReader().readItem(marshaller, item);
+    }
+
+    /**
+     * Appends a payload as an item of type {@link #DATA_ITEM_TYPE}.
+     *
+     * @param marshaller used to encode the payload
+     * @param payload the object to store
+     * @return the location the item was written to
+     * @throws IOException if writing fails
+     */
+    public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
+        return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE);
+    }
+
+    /**
+     * Appends a payload as an item of type {@link #REDO_ITEM_TYPE}, encoded
+     * with the redo marshaller.
+     *
+     * @param payload the object to store
+     * @return the location the item was written to
+     * @throws IOException if writing fails
+     */
+    public synchronized StoreLocation storeRedoItem(Object payload) throws IOException {
+        return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
+    }
+
+    /**
+     * Rewrites the item at an existing location with a new payload.
+     *
+     * @param location where to write; must be a {@link DataItem}
+     * @param marshaller used to encode the payload
+     * @param payload the replacement object
+     * @throws IOException if the data file is unknown or writing fails
+     */
+    public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload)
+        throws IOException {
+        getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE);
+    }
+
+    /**
+     * Scans the current write file from offset zero and hands every item of
+     * type {@link #REDO_ITEM_TYPE} to the listener. The scan stops quietly at
+     * the first header or payload that cannot be read.
+     *
+     * @param listener receives each redo item and its location
+     * @throws IOException if the listener throws; the original exception is
+     *                 the cause
+     */
+    public synchronized void recoverRedoItems(RedoListener listener) throws IOException {
+
+        // Nothing to recover if there is no current file.
+        if (currentWriteFile == null) {
+            return;
+        }
+
+        DataItem item = new DataItem();
+        item.setFile(currentWriteFile.getNumber().intValue());
+        item.setOffset(0);
+        while (true) {
+            byte type;
+            try {
+                type = getReader().readDataItemSize(item);
+            } catch (IOException ignore) {
+                LOG.trace("End of data file reached at (header was invalid): {}", item);
+                return;
+            }
+            if (item.getSize() < 0) {
+                // A negative length can only come from a corrupt header. Going
+                // on would either fail to allocate the payload buffer or move
+                // the offset backwards and never terminate.
+                LOG.trace("End of data file reached at (header was invalid): {}", item);
+                return;
+            }
+            if (type == REDO_ITEM_TYPE) {
+                // Un-marshal the redo item
+                Object object;
+                try {
+                    object = readItem(redoMarshaller, item);
+                } catch (IOException e1) {
+                    LOG.trace("End of data file reached at (payload was invalid): {}", item);
+                    return;
+                }
+                try {
+
+                    listener.onRedoItem(item, object);
+                    // in case the listener is holding on to item references,
+                    // copy it
+                    // so we don't change it behind the listener's back.
+                    item = item.copy();
+
+                } catch (Exception e) {
+                    throw IOExceptionSupport.create("Recovery handler failed: " + e, e);
+                }
+            }
+            // Move to the next item.
+            item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize());
+        }
+    }
+
+    /**
+     * Closes the writer, then forces and closes every registered data file
+     * and empties the file map.
+     *
+     * @throws IOException if forcing or closing a file fails
+     */
+    public synchronized void close() throws IOException {
+        SyncDataFileWriter activeWriter = getWriter();
+        activeWriter.close();
+        for (DataFile dataFile : fileMap.values()) {
+            activeWriter.force(dataFile);
+            dataFile.close();
+        }
+        fileMap.clear();
+    }
+
+    /**
+     * Asks the writer to force every registered data file.
+     *
+     * @throws IOException if forcing a file fails
+     */
+    public synchronized void force() throws IOException {
+        SyncDataFileWriter activeWriter = getWriter();
+        for (DataFile dataFile : fileMap.values()) {
+            activeWriter.force(dataFile);
+        }
+    }
+
+    /**
+     * Deletes every registered data file, subtracts their lengths from the
+     * store size and empties the file map.
+     *
+     * @return true only if every file was deleted
+     * @throws IOException if releasing a file handle fails
+     */
+    public synchronized boolean delete() throws IOException {
+        boolean result = true;
+        for (DataFile dataFile : fileMap.values()) {
+            storeSize.addAndGet(-dataFile.getLength());
+            result &= dataFile.delete();
+        }
+        fileMap.clear();
+        // The current write file was deleted with the rest and is no longer
+        // registered. Forget it so a later write starts a fresh, registered
+        // file instead of writing at a stale offset into a file that
+        // getDataFile() cannot find.
+        currentWriteFile = null;
+        return result;
+    }
+
+    /**
+     * Adds one reference to the data file with the given number, registering
+     * the file first if it is not known yet. Negative numbers are ignored.
+     *
+     * @param file the data file number
+     * @throws IOException declared by the interface; not thrown by this code
+     */
+    public synchronized void addInterestInFile(int file) throws IOException {
+        if (file >= 0) {
+            Integer key = Integer.valueOf(file);
+            DataFile dataFile = fileMap.get(key);
+            if (dataFile == null) {
+                dataFile = createAndAddDataFile(file);
+            }
+            addInterestInFile(dataFile);
+        }
+    }
+
+    /**
+     * Adds one reference to the given data file; null is ignored.
+     */
+    synchronized void addInterestInFile(DataFile dataFile) {
+        if (dataFile != null) {
+            dataFile.increment();
+        }
+    }
+
+    /**
+     * Drops one reference from the data file with the given number. Negative
+     * and unknown numbers are ignored.
+     *
+     * @param file the data file number
+     * @throws IOException if removing a file that became unused fails
+     */
+    public synchronized void removeInterestInFile(int file) throws IOException {
+        if (file >= 0) {
+            Integer key = Integer.valueOf(file);
+            DataFile dataFile = fileMap.get(key);
+            removeInterestInFile(dataFile);
+        }
+    }
+
+    /**
+     * Drops one reference from the given data file and removes the file when
+     * the count reaches zero, unless it is the current write file. Null is
+     * ignored.
+     */
+    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
+        if (dataFile != null) {
+
+            if (dataFile.decrement() <= 0) {
+                if (dataFile != currentWriteFile) {
+                    removeDataFile(dataFile);
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes every unused data file except the current write file.
+     *
+     * @throws IOException if removing a file fails
+     */
+    public synchronized void consolidateDataFiles() throws IOException {
+        // Collect first: removeDataFile() modifies fileMap, which must not
+        // happen while iterating over it.
+        List<DataFile> purgeList = new ArrayList<DataFile>();
+        for (DataFile dataFile : fileMap.values()) {
+            if (dataFile.isUnused() && dataFile != currentWriteFile) {
+                purgeList.add(dataFile);
+            }
+        }
+        for (DataFile dataFile : purgeList) {
+            removeDataFile(dataFile);
+        }
+    }
+
+    /**
+     * Unregisters a data file, forces it through the writer if one exists,
+     * subtracts its length from the store size and deletes it from disk.
+     */
+    private void removeDataFile(DataFile dataFile) throws IOException {
+        fileMap.remove(dataFile.getNumber());
+        if (writer != null) {
+            writer.force(dataFile);
+        }
+        storeSize.addAndGet(-dataFile.getLength());
+        boolean result = dataFile.delete();
+        LOG.debug("discarding data file {} {}", dataFile, result ? "successful" : "failed");
+    }
+
+    /**
+     * @return the marshaller used for redo items
+     */
+    public Marshaller getRedoMarshaller() {
+        return redoMarshaller;
+    }
+
+    /**
+     * @param redoMarshaller the marshaller to use for redo items
+     */
+    public void setRedoMarshaller(Marshaller redoMarshaller) {
+        this.redoMarshaller = redoMarshaller;
+    }
+
+    /**
+     * @return the maxFileLength
+     */
+    public long getMaxFileLength() {
+        return maxFileLength;
+    }
+
+    /**
+     * @param maxFileLength the maxFileLength to set
+     */
+    public void setMaxFileLength(long maxFileLength) {
+        this.maxFileLength = maxFileLength;
+    }
+
+    public String toString() {
+        return "DataManager:(" + NAME_PREFIX + name + ")";
+    }
+
+    /**
+     * @return the reader, created on first use
+     */
+    public synchronized SyncDataFileReader getReader() {
+        if (reader == null) {
+            reader = createReader();
+        }
+        return reader;
+    }
+
+    /**
+     * @return a new reader bound to this manager
+     */
+    protected synchronized SyncDataFileReader createReader() {
+        return new SyncDataFileReader(this);
+    }
+
+    /**
+     * @param reader the reader to use from now on
+     */
+    public synchronized void setReader(SyncDataFileReader reader) {
+        this.reader = reader;
+    }
+
+    /**
+     * @return the writer, created on first use
+     */
+    public synchronized SyncDataFileWriter getWriter() {
+        if (writer == null) {
+            writer = createWriter();
+        }
+        return writer;
+    }
+
+    /**
+     * @return a new writer bound to this manager
+     */
+    private SyncDataFileWriter createWriter() {
+        return new SyncDataFileWriter(this);
+    }
+
+    /**
+     * @param writer the writer to use from now on
+     */
+    public synchronized void setWriter(SyncDataFileWriter writer) {
+        this.writer = writer;
+    }
+
+}
Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java Wed Nov 7 19:37:24 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+/**
+ * Constants shared by item types in the store.
+ */
+public interface Item {
+
+    /** Value of a position (such as a file number or an offset) that has not been assigned. */
+    long POSITION_NOT_SET = -1L;
+
+    // NOTE(review): the four constants below are not referenced by the code
+    // shown alongside this interface. The names suggest a header marker, two
+    // item states and the marshalled size of a location - confirm against
+    // the classes that use them.
+    short MAGIC = 31317;
+    int ACTIVE = 22;
+    int FREE = 33;
+    int LOCATION_SIZE = 24;
+}
Propchange: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/Item.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java Wed Nov 7 19:37:24 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import org.apache.activemq.kaha.StoreLocation;
+
+
+/**
+ * Callback used during redo recovery. DataManagerImpl.recoverRedoItems calls
+ * it once for every redo-type item it reads from the current write file.
+ */
+public interface RedoListener {
+
+ /**
+ * Handles one recovered redo item.
+ *
+ * @param item location of the redo item in the data file
+ * @param object the payload as unmarshalled by the redo marshaller
+ * @throws Exception on any failure; DataManagerImpl.recoverRedoItems
+ *                 rethrows it wrapped in an IOException
+ */
+ void onRedoItem(StoreLocation item, Object object) throws Exception;
+
+}
Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java Wed Nov 7 19:37:24 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.util.DataByteArrayInputStream;
+
+/**
+ * Reads item headers and payloads from the data files owned by a
+ * {@link DataManagerImpl}, seeking directly to each item's offset. Both read
+ * methods are synchronized on the reader.
+ */
+public final class SyncDataFileReader {
+
+    private final DataManagerImpl dataManager;
+    private final DataByteArrayInputStream dataIn;
+
+    /**
+     * Construct a Store reader
+     *
+     * @param fileManager the manager used to resolve item locations to data
+     *                files
+     */
+    SyncDataFileReader(DataManagerImpl fileManager) {
+        this.dataManager = fileManager;
+        this.dataIn = new DataByteArrayInputStream();
+    }
+
+    /**
+     * Reads the header stored at the item's offset and records the payload
+     * size on the item.
+     *
+     * @param item location to read from; its size is updated on success
+     * @return the type byte of the header
+     * @throws IOException if the data file is unknown, the header cannot be
+     *                 read in full, or the stored size is negative
+     */
+    public synchronized byte readDataItemSize(DataItem item) throws IOException {
+        RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
+        file.seek(item.getOffset()); // jump to the item header
+        byte rc = file.readByte();
+        int size = file.readInt();
+        if (size < 0) {
+            // Only a corrupt or partially written header yields a negative
+            // length; report it like any other unreadable header.
+            throw new IOException("Invalid item size " + size + " in header at " + item);
+        }
+        item.setSize(size);
+        return rc;
+    }
+
+    /**
+     * Reads the payload that follows the header at the item's offset and
+     * unmarshals it.
+     *
+     * @param marshaller used to decode the payload
+     * @param item location and size of the payload
+     * @return the unmarshalled payload
+     * @throws IOException if the data file is unknown, the item size is
+     *                 negative, or the payload cannot be read in full
+     */
+    public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
+        int size = item.getSize();
+        if (size < 0) {
+            // Fail as an I/O problem rather than letting the array
+            // allocation below throw NegativeArraySizeException.
+            throw new IOException("Invalid item size " + size + " for " + item);
+        }
+        RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
+
+        // TODO: we could reuse the buffer in dataIn if it's big enough to avoid
+        // allocating byte[] arrays on every readItem.
+        byte[] data = new byte[size];
+        file.seek(item.getOffset() + DataManagerImpl.ITEM_HEAD_SIZE);
+        file.readFully(data);
+        dataIn.restart(data);
+        return marshaller.readPayload(dataIn);
+    }
+}
Added: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?rev=1406770&view=auto
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java (added)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java Wed Nov 7 19:37:24 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.data;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+
+/**
+ * Store writer that marshals each payload into an internal buffer and writes
+ * it to the data file within the calling thread. Simple, but callers may
+ * contend with each other under load.
+ */
+public final class SyncDataFileWriter {
+
+    private final DataByteArrayOutputStream buffer;
+    private final DataManagerImpl dataManager;
+
+    /**
+     * Construct a Store writer
+     *
+     * @param fileManager the manager that assigns and resolves data files
+     */
+    SyncDataFileWriter(DataManagerImpl fileManager) {
+        this.dataManager = fileManager;
+        this.buffer = new DataByteArrayOutputStream();
+    }
+
+    /**
+     * Marshals a payload and appends it to the data file chosen by the
+     * manager, then adds one reference to that file.
+     *
+     * @param marshaller used to encode the payload
+     * @param payload the object to store
+     * @param type the type byte written into the item header
+     * @return the location and payload size of the stored item
+     * @throws IOException if marshalling or writing fails
+     */
+    public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
+        throws IOException {
+
+        int totalSize = fillBuffer(marshaller, payload, type);
+
+        // Ask the manager where this item is going to live.
+        DataItem location = new DataItem();
+        location.setSize(totalSize - DataManagerImpl.ITEM_HEAD_SIZE);
+        DataFile target = dataManager.findSpaceForData(location);
+
+        // Copy header and payload to the reserved position.
+        RandomAccessFile out = target.getRandomAccessFile();
+        out.seek(location.getOffset());
+        out.write(buffer.getData(), 0, totalSize);
+        target.setWriterData(Boolean.TRUE); // Use as dirty marker..
+
+        dataManager.addInterestInFile(target);
+        return location;
+    }
+
+    /**
+     * Marshals a payload and writes it, header included, at the offset the
+     * item already has. The item's size is replaced by the new payload size.
+     *
+     * @param item the existing location to overwrite
+     * @param marshaller used to encode the payload
+     * @param payload the replacement object
+     * @param type the type byte written into the item header
+     * @throws IOException if the data file is unknown or writing fails
+     */
+    public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
+        throws IOException {
+        int totalSize = fillBuffer(marshaller, payload, type);
+        item.setSize(totalSize - DataManagerImpl.ITEM_HEAD_SIZE);
+
+        DataFile target = dataManager.getDataFile(item);
+        RandomAccessFile out = target.getRandomAccessFile();
+        out.seek(item.getOffset());
+        out.write(buffer.getData(), 0, totalSize);
+        target.setWriterData(Boolean.TRUE); // Use as dirty marker..
+    }
+
+    /**
+     * Syncs the data file's descriptor if the file carries the writer's
+     * marker and is flagged dirty, then clears both.
+     *
+     * @param dataFile the file to sync
+     * @throws IOException if the sync fails
+     */
+    public synchronized void force(DataFile dataFile) throws IOException {
+        boolean pending = dataFile.getWriterData() != null && dataFile.isDirty();
+        if (!pending) {
+            return;
+        }
+        dataFile.getRandomAccessFile().getFD().sync();
+        dataFile.setWriterData(null);
+        dataFile.setDirty(false);
+    }
+
+    /**
+     * Does nothing; this writer holds nothing that needs releasing.
+     */
+    public void close() throws IOException {
+    }
+
+    /**
+     * Fills the internal buffer with the item header followed by the
+     * marshalled payload and returns the total number of bytes to write.
+     */
+    private int fillBuffer(Marshaller marshaller, Object payload, byte type) throws IOException {
+        // Payload first, leaving room for the header in front of it...
+        buffer.reset();
+        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
+        marshaller.writePayload(payload, buffer);
+        int totalSize = buffer.size();
+        // ...then rewind and write the header now that the length is known.
+        buffer.reset();
+        buffer.writeByte(type);
+        buffer.writeInt(totalSize - DataManagerImpl.ITEM_HEAD_SIZE);
+        return totalSize;
+    }
+}