You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/02/03 21:43:20 UTC
svn commit: r618082 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async:
AsyncDataManager.java DataFile.java ReadOnlyAsyncDataManager.java
ReadOnlyDataFile.java
Author: chirino
Date: Sun Feb 3 12:43:17 2008
New Revision: 618082
URL: http://svn.apache.org/viewvc?rev=618082&view=rev
Log:
Adding a ReadOnlyAsyncDataManager so that you can access a set of data files in a read only way.
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=618082&r1=618081&r2=618082&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Sun Feb 3 12:43:17 2008
@@ -50,7 +50,7 @@
*
* @version $Revision: 1.1.1.1 $
*/
-public final class AsyncDataManager {
+public class AsyncDataManager {
public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
public static final int ITEM_HEAD_RESERVED_SPACE = 21;
@@ -75,28 +75,28 @@
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
- File directory = new File(DEFAULT_DIRECTORY);
- File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
- String filePrefix = DEFAULT_FILE_PREFIX;
- ControlFile controlFile;
- boolean started;
- boolean useNio = true;
-
- private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
- private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512;
-
- private DataFileAppender appender;
- private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
-
- private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
- private Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
- private DataFile currentWriteFile;
-
- private Location mark;
- private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
- private Runnable cleanupTask;
- private final AtomicLong storeSize;
- private boolean archiveDataLogs;
+ protected File directory = new File(DEFAULT_DIRECTORY);
+ protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
+ protected String filePrefix = DEFAULT_FILE_PREFIX;
+ protected ControlFile controlFile;
+ protected boolean started;
+ protected boolean useNio = true;
+
+ protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
+ protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512;
+
+ protected DataFileAppender appender;
+ protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+
+ protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+ protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
+ protected DataFile currentWriteFile;
+
+ protected Location mark;
+ protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
+ protected Runnable cleanupTask;
+ protected final AtomicLong storeSize;
+ protected boolean archiveDataLogs;
public AsyncDataManager(AtomicLong storeSize) {
this.storeSize=storeSize;
@@ -194,7 +194,7 @@
Scheduler.executePeriodically(cleanupTask, 1000 * 30);
}
- private Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
+ protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
if (location == null) {
location = new Location();
location.setDataFileId(dataFile.getDataFileId());
@@ -213,7 +213,7 @@
return location;
}
- private void unmarshallState(ByteSequence sequence) throws IOException {
+ protected void unmarshallState(ByteSequence sequence) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
DataInputStream dis = new DataInputStream(bais);
if (dis.readBoolean()) {
@@ -596,7 +596,7 @@
storeState(sync);
}
- private synchronized void storeState(boolean sync) throws IOException {
+ protected synchronized void storeState(boolean sync) throws IOException {
ByteSequence state = marshallState();
appender.storeItem(state, Location.MARK_TYPE, sync);
controlFile.store(state, sync);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?rev=618082&r1=618081&r2=618082&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Sun Feb 3 12:43:17 2008
@@ -28,14 +28,14 @@
*
* @version $Revision: 1.1.1.1 $
*/
-class DataFile extends LinkedNode implements Comparable<DataFile> {
+public class DataFile extends LinkedNode implements Comparable<DataFile> {
- private final File file;
- private final Integer dataFileId;
- private final int preferedSize;
+ protected final File file;
+ protected final Integer dataFileId;
+ protected final int preferedSize;
- private int length;
- private int referenceCount;
+ protected int length;
+ protected int referenceCount;
DataFile(File file, int number, int preferedSize) {
this.file = file;
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java?rev=618082&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java Sun Feb 3 12:43:17 2008
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An AsyncDataManager that works in read only mode against multiple data directories.
+ * Useful for reading back archived data files.
+ */
+public class ReadOnlyAsyncDataManager extends AsyncDataManager {
+
+ private static final Log LOG = LogFactory.getLog(ReadOnlyAsyncDataManager.class);
+ private final ArrayList<File> dirs;
+
+ public ReadOnlyAsyncDataManager(final ArrayList<File> dirs) {
+ this.dirs = dirs;
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void start() throws IOException {
+ if (started) {
+ return;
+ }
+
+ started = true;
+
+ ArrayList<File> files = new ArrayList<File>();
+ for (File directory : dirs) {
+ final File d = directory;
+ File[] f = d.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String n) {
+ return dir.equals(d) && n.startsWith(filePrefix);
+ }
+ });
+ for (int i = 0; i < f.length; i++) {
+ files.add(f[i]);
+ }
+ }
+
+ for (File file : files) {
+ try {
+ String n = file.getName();
+ String numStr = n.substring(filePrefix.length(), n.length());
+ int num = Integer.parseInt(numStr);
+ DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
+ fileMap.put(dataFile.getDataFileId(), dataFile);
+ storeSize.addAndGet(dataFile.getLength());
+ } catch (NumberFormatException e) {
+ // Ignore file that do not match the pattern.
+ }
+ }
+
+ // Sort the list so that we can link the DataFiles together in the
+ // right order.
+ List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
+ Collections.sort(dataFiles);
+ currentWriteFile = null;
+ for (DataFile df : dataFiles) {
+ if (currentWriteFile != null) {
+ currentWriteFile.linkAfter(df);
+ }
+ currentWriteFile = df;
+ fileByFileMap.put(df.getFile(), df);
+ }
+
+ // Need to check the current Write File to see if there was a partial
+ // write to it.
+ if (currentWriteFile != null) {
+
+ // See if the lastSyncedLocation is valid..
+ Location l = lastAppendLocation.get();
+ if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+ l = null;
+ }
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (!started) {
+ return;
+ }
+ accessorPool.close();
+ fileMap.clear();
+ fileByFileMap.clear();
+ started = false;
+ }
+
+
+ public Location getFirstLocation() throws IllegalStateException, IOException {
+ if( currentWriteFile == null ) {
+ return null;
+ }
+
+ DataFile first = (DataFile)currentWriteFile.getHeadNode();
+ Location cur = new Location();
+ cur.setDataFileId(first.getDataFileId());
+ cur.setOffset(0);
+ cur.setSize(0);
+ return getNextLocation(cur);
+ }
+
+ @Override
+ public synchronized boolean delete() throws IOException {
+ throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager");
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java?rev=618082&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java Sun Feb 3 12:43:17 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * Allows you to open a data file in read only mode. Useful when working with
+ * archived data files.
+ */
+public class ReadOnlyDataFile extends DataFile {
+
+ ReadOnlyDataFile(File file, int number, int preferedSize) {
+ super(file, number, preferedSize);
+ }
+
+
+ public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+ RandomAccessFile rc = new RandomAccessFile(file, "r");
+ // When we start to write files size them up so that the OS has a chance
+ // to allocate the file contigously.
+ if (appender) {
+ if (length < preferedSize) {
+ rc.setLength(preferedSize);
+ }
+ }
+ return rc;
+ }
+
+ public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+ file.close();
+ }
+
+ public synchronized boolean delete() throws IOException {
+ throw new RuntimeException("Not valid on a read only file.");
+ }
+
+ public synchronized void move(File targetDirectory) throws IOException{
+ throw new RuntimeException("Not valid on a read only file.");
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java
------------------------------------------------------------------------------
svn:keywords = Rev Date