You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/02/03 21:43:20 UTC

svn commit: r618082 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async: AsyncDataManager.java DataFile.java ReadOnlyAsyncDataManager.java ReadOnlyDataFile.java

Author: chirino
Date: Sun Feb  3 12:43:17 2008
New Revision: 618082

URL: http://svn.apache.org/viewvc?rev=618082&view=rev
Log:
Adding a ReadOnlyAsyncDataManager so that you can access a set of data files in a read only way.

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=618082&r1=618081&r2=618082&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Sun Feb  3 12:43:17 2008
@@ -50,7 +50,7 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public final class AsyncDataManager {
+public class AsyncDataManager {
 
     public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
     public static final int ITEM_HEAD_RESERVED_SPACE = 21;
@@ -75,28 +75,28 @@
 
     protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
 
-    File directory = new File(DEFAULT_DIRECTORY);
-    File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
-    String filePrefix = DEFAULT_FILE_PREFIX;
-    ControlFile controlFile;
-    boolean started;
-    boolean useNio = true;
-
-    private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
-    private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512;
-
-    private DataFileAppender appender;
-    private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
-
-    private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
-    private Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
-    private DataFile currentWriteFile;
-
-    private Location mark;
-    private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
-    private Runnable cleanupTask;
-    private final AtomicLong storeSize;
-    private boolean archiveDataLogs;
+    protected File directory = new File(DEFAULT_DIRECTORY);
+    protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
+    protected String filePrefix = DEFAULT_FILE_PREFIX;
+    protected ControlFile controlFile;
+    protected boolean started;
+    protected boolean useNio = true;
+
+    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
+    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 1024 * 512;
+
+    protected DataFileAppender appender;
+    protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+
+    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
+    protected DataFile currentWriteFile;
+
+    protected Location mark;
+    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
+    protected Runnable cleanupTask;
+    protected final AtomicLong storeSize;
+    protected boolean archiveDataLogs;
     
     public AsyncDataManager(AtomicLong storeSize) {
         this.storeSize=storeSize;
@@ -194,7 +194,7 @@
         Scheduler.executePeriodically(cleanupTask, 1000 * 30);
     }
 
-    private Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
+    protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
         if (location == null) {
             location = new Location();
             location.setDataFileId(dataFile.getDataFileId());
@@ -213,7 +213,7 @@
         return location;
     }
 
-    private void unmarshallState(ByteSequence sequence) throws IOException {
+    protected void unmarshallState(ByteSequence sequence) throws IOException {
         ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
         DataInputStream dis = new DataInputStream(bais);
         if (dis.readBoolean()) {
@@ -596,7 +596,7 @@
         storeState(sync);
     }
 
-    private synchronized void storeState(boolean sync) throws IOException {
+    protected synchronized void storeState(boolean sync) throws IOException {
         ByteSequence state = marshallState();
         appender.storeItem(state, Location.MARK_TYPE, sync);
         controlFile.store(state, sync);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?rev=618082&r1=618081&r2=618082&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Sun Feb  3 12:43:17 2008
@@ -28,14 +28,14 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-class DataFile extends LinkedNode implements Comparable<DataFile> {
+public class DataFile extends LinkedNode implements Comparable<DataFile> {
 
-    private final File file;
-    private final Integer dataFileId;
-    private final int preferedSize;
+    protected final File file;
+    protected final Integer dataFileId;
+    protected final int preferedSize;
 
-    private int length;
-    private int referenceCount;
+    protected int length;
+    protected int referenceCount;
 
     DataFile(File file, int number, int preferedSize) {
         this.file = file;

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java?rev=618082&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java Sun Feb  3 12:43:17 2008
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An AsyncDataManager that works in read only mode against multiple data directories.
+ * Useful for reading back archived data files.
+ */
+public class ReadOnlyAsyncDataManager extends AsyncDataManager {
+    
+    private static final Log LOG = LogFactory.getLog(ReadOnlyAsyncDataManager.class);
+    private final ArrayList<File> dirs;
+
+    public ReadOnlyAsyncDataManager(final ArrayList<File> dirs) {
+        this.dirs = dirs;
+    }
+
+    @SuppressWarnings("unchecked")
+    public synchronized void start() throws IOException {
+        if (started) {
+            return;
+        }
+
+        started = true;
+                
+        ArrayList<File> files = new ArrayList<File>();
+        for (File directory : dirs) {
+            final File d = directory;
+            File[] f = d.listFiles(new FilenameFilter() {
+                public boolean accept(File dir, String n) {
+                    return dir.equals(d) && n.startsWith(filePrefix);
+                }
+            });
+            for (int i = 0; i < f.length; i++) {
+                files.add(f[i]);
+            }
+        }
+       
+        for (File file : files) {
+            try {
+                String n = file.getName();
+                String numStr = n.substring(filePrefix.length(), n.length());
+                int num = Integer.parseInt(numStr);
+                DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
+                fileMap.put(dataFile.getDataFileId(), dataFile);
+                storeSize.addAndGet(dataFile.getLength());
+            } catch (NumberFormatException e) {
+                // Ignore file that do not match the pattern.
+            }
+        }
+
+        // Sort the list so that we can link the DataFiles together in the
+        // right order.
+        List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
+        Collections.sort(dataFiles);
+        currentWriteFile = null;
+        for (DataFile df : dataFiles) {
+            if (currentWriteFile != null) {
+                currentWriteFile.linkAfter(df);
+            }
+            currentWriteFile = df;
+            fileByFileMap.put(df.getFile(), df);
+        }
+        
+        // Need to check the current Write File to see if there was a partial
+        // write to it.
+        if (currentWriteFile != null) {
+
+            // See if the lastSyncedLocation is valid..
+            Location l = lastAppendLocation.get();
+            if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+                l = null;
+            }
+        }
+    }
+    
+    public synchronized void close() throws IOException {
+        if (!started) {
+            return;
+        }
+        accessorPool.close();
+        fileMap.clear();
+        fileByFileMap.clear();
+        started = false;
+    }
+
+    
+    public Location getFirstLocation() throws IllegalStateException, IOException {
+        if( currentWriteFile == null ) {
+            return null;
+        }
+        
+        DataFile first = (DataFile)currentWriteFile.getHeadNode();
+        Location cur = new Location();
+        cur.setDataFileId(first.getDataFileId());
+        cur.setOffset(0);
+        cur.setSize(0);
+        return getNextLocation(cur);
+    }
+    
+    @Override
+    public synchronized boolean delete() throws IOException {
+        throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager");
+    }    
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyAsyncDataManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java?rev=618082&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java Sun Feb  3 12:43:17 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * Allows you to open a data file in read only mode.  Useful when working with 
+ * archived data files.
+ */
+public class ReadOnlyDataFile extends DataFile {
+
+    ReadOnlyDataFile(File file, int number, int preferedSize) {
+        super(file, number, preferedSize);
+    }
+    
+    
+    public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+        RandomAccessFile rc = new RandomAccessFile(file, "r");
+        // When we start to write files size them up so that the OS has a chance
+        // to allocate the file contigously.
+        if (appender) {
+            if (length < preferedSize) {
+                rc.setLength(preferedSize);
+            }
+        }
+        return rc;
+    }
+
+    public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+        file.close();
+    }
+
+    public synchronized boolean delete() throws IOException {
+        throw new RuntimeException("Not valid on a read only file.");
+    }
+    
+    public synchronized void move(File targetDirectory) throws IOException{
+        throw new RuntimeException("Not valid on a read only file.");
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ReadOnlyDataFile.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date