You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [18/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * HeartBeat State associated with any given endpoint. 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class HeartBeatState
+{
+    /* Serializer instance returned by serializer(). */
+    private static ICompactSerializer<HeartBeatState> serializer_;
+
+    static
+    {
+        serializer_ = new HeartBeatStateSerializer();
+    }
+
+    /* Incremented by updateGeneration(). */
+    int generation_;
+    /* Heartbeat counter; incremented by updateHeartBeat(). */
+    AtomicInteger heartbeat_;
+    /* Refreshed from VersionGenerator on every generation or heartbeat update. */
+    int version_;
+
+    /**
+     * Creates a state whose generation, heartbeat and version are all 0.
+     * Delegates to the three-argument constructor so that heartbeat_ is never
+     * left null; with a null counter getHeartBeat(), updateHeartBeat() and
+     * HeartBeatStateSerializer.serialize() would throw a NullPointerException.
+     */
+    HeartBeatState()
+    {
+        this(0, 0, 0);
+    }
+
+    /**
+     * Creates a state with the given generation and heartbeat and version 0.
+     *
+     * @param generation initial generation
+     * @param heartbeat initial value of the heartbeat counter
+     */
+    HeartBeatState(int generation, int heartbeat)
+    {
+        this(generation, heartbeat, 0);
+    }
+
+    /**
+     * Creates a fully specified state.
+     *
+     * @param generation initial generation
+     * @param heartbeat initial value of the heartbeat counter
+     * @param version initial version
+     */
+    HeartBeatState(int generation, int heartbeat, int version)
+    {
+        generation_ = generation;
+        heartbeat_ = new AtomicInteger(heartbeat);
+        version_ = version;
+    }
+
+    /**
+     * @return the serializer instance created when this class is initialized.
+     */
+    public static ICompactSerializer<HeartBeatState> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * @return the current generation.
+     */
+    int getGeneration()
+    {
+        return generation_;
+    }
+
+    /**
+     * Increments the generation and takes a fresh version from VersionGenerator.
+     */
+    void updateGeneration()
+    {
+        ++generation_;
+        version_ = VersionGenerator.getNextVersion();
+    }
+
+    /**
+     * @return the current value of the heartbeat counter.
+     */
+    int getHeartBeat()
+    {
+        return heartbeat_.get();
+    }
+
+    /**
+     * Increments the heartbeat counter and takes a fresh version from VersionGenerator.
+     */
+    void updateHeartBeat()
+    {
+        heartbeat_.incrementAndGet();
+        version_ = VersionGenerator.getNextVersion();
+    }
+
+    /**
+     * @return the version recorded by the most recent update, or the value
+     *         given at construction if no update has happened yet.
+     */
+    int getHeartBeatVersion()
+    {
+        return version_;
+    }
+}
+
+class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState>
+{
+    /**
+     * Writes the state as three consecutive ints: generation, heartbeat
+     * counter value, version.
+     */
+    public void serialize(HeartBeatState hbState, DataOutputStream dos) throws IOException
+    {
+        dos.writeInt(hbState.getGeneration());
+        dos.writeInt(hbState.getHeartBeat());
+        dos.writeInt(hbState.getHeartBeatVersion());
+    }
+
+    /**
+     * Reads three ints in the order serialize() writes them and builds a
+     * HeartBeatState from them.
+     */
+    public HeartBeatState deserialize(DataInputStream dis) throws IOException
+    {
+        final int generation = dis.readInt();
+        final int heartbeat = dis.readInt();
+        final int version = dis.readInt();
+        return new HeartBeatState(generation, heartbeat, version);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+/**
+ * This is implemented by the Gossiper module to publish change events to interested parties.
+ * Interested parties register/unregister interest by invoking the methods of this interface.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IEndPointStateChangePublisher
+{
+    /**
+     * Registers interest in endpoint state changes.
+     *
+     * @param subcriber module implementing IEndPointStateChangeSubscriber
+     */
+    void register(IEndPointStateChangeSubscriber subcriber);
+
+    /**
+     * Withdraws interest in endpoint state changes.
+     *
+     * @param subcriber module implementing IEndPointStateChangeSubscriber
+     */
+    void unregister(IEndPointStateChangeSubscriber subcriber);
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * This is called by an instance of the IEndPointStateChangePublisher to notify
+ * interested parties about changes in the state associated with any endpoint.
+ * For instance, if node A figures there is a change in state for an endpoint B,
+ * it notifies all interested parties of this change. It is up to the registered
+ * instance to decide what it does with this change. Not all modules may be interested
+ * in all state changes.
+ *  
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IEndPointStateChangeSubscriber
+{
+    /**
+     * Informs an interested party that the state of the given endpoint changed.
+     *
+     * @param endpoint endpoint for which the state change occurred.
+     * @param epState state that actually changed for the above endpoint.
+     */
+    void onChange(EndPoint endpoint, EndPointState epState);
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Implemented by the Gossiper to either convict/suspect an endpoint
+ * based on the PHI calculated by the Failure Detector on the inter-arrival
+ * times of the heart beats.
+ *  
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFailureDetectionEventListener
+{
+    /**
+     * Convicts the given endpoint.
+     *
+     * @param ep endpoint to be convicted
+     */
+    void convict(EndPoint ep);
+
+    /**
+     * Marks the given endpoint as suspected.
+     *
+     * @param ep endpoint to be suspected.
+     */
+    void suspect(EndPoint ep);
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * An interface that provides an application with the ability
+ * to query liveness information of a node in the cluster. It 
+ * also exposes methods which help an application register callbacks
+ * for notifications of liveness information of nodes.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFailureDetector
+{
+    /**
+     * Reports the Failure Detector's knowledge of whether a node is up or down.
+     *
+     * @param ep endpoint in question.
+     * @return true if UP and false if DOWN.
+     */
+    boolean isAlive(EndPoint ep);
+
+    /**
+     * Invoked by any entity wanting to interrogate the status of an endpoint;
+     * in our case that is the Gossiper. The Failure Detector then calculates
+     * Phi and deems the endpoint suspicious or alive, as explained in the
+     * Hayashibara paper.
+     *
+     * @param ep endpoint for which the inter-arrival times are interpreted.
+     */
+    void intepret(EndPoint ep);
+
+    /**
+     * Invoked by the receiver of a heartbeat; in our case that is the
+     * Gossiper, which informs the Failure Detector on receipt of a heartbeat.
+     * The Failure Detector then samples the arrival time as explained in the
+     * paper.
+     *
+     * @param ep endpoint being reported.
+     */
+    void report(EndPoint ep);
+
+    /**
+     * Registers interest in Failure Detector events.
+     *
+     * @param listener application-provided IFailureDetectionEventListener implementation
+     */
+    void registerFailureDetectionEventListener(IFailureDetectionEventListener listener);
+
+    /**
+     * Withdraws interest in Failure Detector events.
+     *
+     * @param listener application-provided IFailureDetectionEventListener implementation
+     */
+    void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener);
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFailureNotification
+{
+    /*
+     * NOTE(review): no implementation or caller is visible here; the
+     * descriptions below are inferred from the method names only.
+     */
+
+    /**
+     * Presumably signals that the given endpoint is suspected.
+     *
+     * @param ep endpoint concerned
+     */
+    void suspect(EndPoint ep);
+
+    /**
+     * Presumably signals that the given endpoint has been revived.
+     *
+     * @param ep endpoint concerned
+     */
+    void revive(EndPoint ep);
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class JoinMessage
+{
+    /* Serializer instance returned by serializer(). */
+    private static ICompactSerializer<JoinMessage> serializer_ = new JoinMessageSerializer();
+
+    /* Cluster id carried by this message; read directly by JoinMessageSerializer. */
+    String clusterId_;
+
+    /**
+     * @param clusterId cluster id to carry in this message
+     */
+    JoinMessage(String clusterId)
+    {
+        this.clusterId_ = clusterId;
+    }
+
+    /**
+     * @return the serializer instance created when this class is initialized.
+     */
+    static ICompactSerializer<JoinMessage> serializer()
+    {
+        return serializer_;
+    }
+}
+
+class JoinMessageSerializer implements ICompactSerializer<JoinMessage>
+{
+    /**
+     * Writes the message's cluster id with DataOutputStream.writeUTF.
+     */
+    public void serialize(JoinMessage joinMessage, DataOutputStream dos) throws IOException
+    {
+        final String clusterId = joinMessage.clusterId_;
+        dos.writeUTF(clusterId);
+    }
+
+    /**
+     * Reads one UTF string and wraps it in a new JoinMessage.
+     */
+    public JoinMessage deserialize(DataInputStream dis) throws IOException
+    {
+        return new JoinMessage(dis.readUTF());
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.Random;
+
+import org.apache.cassandra.utils.BitSet;
+
+
+
+/**
+ * Implementation of a PureRandomNumber generator. Use this class cautiously. Not
+ * for general purpose use. Currently this is used by the Gossiper to choose a random
+ * endpoint to Gossip to.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class PureRandom extends Random
+{
+    /* Values already handed out for the current upper bound. */
+    private BitSet bs_ = new BitSet();
+    /* Upper bound passed to the previous nextInt(int) call. */
+    private int lastUb_;
+
+    PureRandom()
+    {
+        super();
+    }
+
+    /**
+     * Returns a value in [0, ub) that has not been returned since the set of
+     * used values was last cleared. The set is cleared when ub differs from
+     * the previous call's bound, or when every value below ub has been used.
+     *
+     * @param ub exclusive upper bound; must be positive
+     * @return a not-yet-used value in [0, ub)
+     * @throws IllegalArgumentException if ub is not positive
+     */
+    public int nextInt(int ub)
+    {
+        if (ub <= 0)
+            throw new IllegalArgumentException("ub must be positive");
+
+        /* Short-circuit keeps cardinality() from being consulted when the bound changed. */
+        if (lastUb_ != ub || bs_.cardinality() == ub)
+        {
+            bs_.clear();
+        }
+        lastUb_ = ub;
+
+        /* Draw until a value that is not yet marked as used turns up. */
+        int candidate;
+        do
+        {
+            candidate = super.nextInt(ub);
+        }
+        while (bs_.get(candidate));
+
+        bs_.set(candidate);
+        return candidate;
+    }
+
+    /**
+     * Prints ten draws for each of a fixed list of bounds. The final bound
+     * is 0, for which nextInt throws IllegalArgumentException; that
+     * exception propagates out of main.
+     */
+    public static void main(String[] args) throws Throwable
+    {
+        final Random generator = new PureRandom();
+        final int[] bounds = { 2, 3, 1, 10, 5, 0 };
+
+        for (int bound : bounds)
+        {
+            System.out.println("UB: " + bound);
+            for (int draw = 0; draw < 10; draw++)
+            {
+                System.out.println(generator.nextInt(bound));
+            }
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A unique version number generator for any state that is generated by the 
+ * local node.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class VersionGenerator
+{
+    /* Shared counter; final because the reference is never reassigned. */
+    private static final AtomicInteger version_ = new AtomicInteger(0);
+
+    /**
+     * Atomically increments the shared counter and returns the new value.
+     *
+     * @return the next version number; the first call returns 1.
+     */
+    public static int getNextVersion()
+    {
+        return version_.incrementAndGet();
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,789 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.ContinuationContext;
+import org.apache.cassandra.concurrent.ContinuationsExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IContinuable;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.javaflow.Continuation;
+import org.apache.log4j.Logger;
+
+/**
+ * An <code>AIORandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ * 
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class AIORandomAccessFile extends RandomAccessFile
+{   
+    /* Logger shared by this class and its inner classes. */
+    private final static Logger logger_ = Logger.getLogger(AIORandomAccessFile.class);
+    /* NOTE(review): not referenced in the portion of the class visible here; purpose to be confirmed. */
+    private final static ThreadLocal<Runnable> tls_ = new InheritableThreadLocal<Runnable>();    
+    /* Base-2 logarithm of BuffSz_. */
+    static final int LogBuffSz_ = 16; // 64K buffer
+    /* 1 << 16 = 65536. */
+    public static final int BuffSz_ = (1 << LogBuffSz_);
+    /* All bits set except the low LogBuffSz_ bits; ANDing with it rounds a value down to a multiple of BuffSz_. */
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+    /* Used to lock the creation of the disk thread pool instance */
+    private static Lock createLock_ = new ReentrantLock(); 
+    /* Not initialized at declaration; presumably created under createLock_ (see the comment above) - TODO confirm. */
+    private static ExecutorService diskIOPool_;          
+   
+    /**
+     * Submits a read request to the Kernel and is used
+     * only when running in Continuations mode. The kernel
+     * on read completion will resume the continuation passed
+     * in to complete the read request.
+     *  
+     * @author alakshman
+     *
+     */    
+    class AIOReader implements IContinuable
+    {
+        /* Context passed to the channel as the read's attachment; run() stores the continuation in it. */
+        private ContinuationContext continuationCtx_;
+
+        AIOReader(ContinuationContext continuationCtx)
+        {
+            this.continuationCtx_ = continuationCtx;
+        }
+
+        /**
+         * Records the continuation in the context, then submits an
+         * asynchronous read into the enclosing file's buffer_ at file
+         * position diskPos_, with a new ReadCompletionHandler as handler.
+         */
+        public void run(Continuation c)
+        {
+            continuationCtx_.setContinuation(c);
+
+            final ByteBuffer target = ByteBuffer.wrap(buffer_);
+            final ReadCompletionHandler onCompletion = new ReadCompletionHandler();
+            fileChannel_.read(target, diskPos_, continuationCtx_, onCompletion);
+        }
+    }
+    
+    /**
+     * Read completion handler for AIO framework. The context
+     * that is passed in, is a Continuation that needs to be
+     * resumed on read completion.
+     * 
+     * @author alakshman
+     *
+     * The Integer result passed to completed() is the number of bytes read.
+     */    
+    class ReadCompletionHandler implements CompletionHandler<Integer, ContinuationContext>
+    {
+        /**
+         * Invoked when the read is cancelled; nothing is done in that case.
+         */
+        public void cancelled(ContinuationContext attachment)
+        {
+        }
+
+        /**
+         * Stores the byte count in the context and, when the context carries
+         * a continuation, resumes it and hands the resulting continuation to
+         * ContinuationsExecutor.doPostProcessing.
+         *
+         * @param result number of bytes read
+         * @param attachment context supplied with the read request; ignored when null
+         */
+        public void completed(Integer result, ContinuationContext attachment)
+        {
+            logger_.debug("Bytes read " + result);
+            if ( attachment != null )
+            {
+                Continuation c = attachment.getContinuation();
+                attachment.result(result);
+                if ( c != null )
+                {
+                    c = Continuation.continueWith(c, attachment);
+                    ContinuationsExecutor.doPostProcessing(c);
+                }
+            }
+        }
+
+        /**
+         * Invoked when the read fails. The continuation is not resumed on this
+         * path, so the error is logged to keep the failure from going unnoticed.
+         *
+         * @param th cause of the failure
+         * @param attachment context supplied with the read request
+         */
+        public void failed(Throwable th, ContinuationContext attachment)
+        {
+            logger_.warn("Asynchronous read failed", th);
+        }
+    }
+    
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean closed_; // true iff the file is closed
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in buffer_ (called "buff" in the notes below)
+    private byte[] buffer_ = new byte[0]; // local buffer; starts out as an empty array
+    private long maxHi_; // lo_ + buffer_.length (invariant V6 in the notes below)
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position; AIOReader.run passes it as the file position of the asynchronous read
+    private AsynchronousFileChannel fileChannel_; // asynchronous file channel used for AIO.
+    private boolean bContinuations_; // indicates if used in continuations mode.
+    
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     * 
+     * len(f) the length of the file curr(f) the current position in the file
+     * c(f) the abstract contents of the file disk(f) the contents of f's
+     * backing disk file closed(f) true iff the file is closed
+     * 
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     * 
+     * A file is said to be *valid* if the following conditions hold:
+     * 
+     * V1. The "closed" and "curr" fields are correct:
+     * 
+     * f.closed == closed(f) f.curr == curr(f)
+     * 
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     * 
+     * f.lo <= f.curr <= f.hi
+     * 
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     * 
+     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     * 
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     * 
+     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     * disk(f)[i])
+     * 
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     * 
+     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     * 
+     * V6. this.maxHi == this.lo + this.buff.length
+     * 
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     * 
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     * 
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     * 
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+    
+    /**
+     * Creates an AIORandomAccessFile over <code>file</code>, opened in "rw"
+     * mode, with the default buffer size and continuations disabled.
+     * @param file file to be opened.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(File file) throws IOException
+    {
+        super(file, "rw");
+        init(file, 0, false);
+    }
+    
+    /**
+     * Creates an AIORandomAccessFile over <code>file</code>, opened in "rw"
+     * mode, with the default buffer size.
+     * @param file file to be opened.
+     * @param bContinuations <code>true</code> if reads should be
+     *        performed in continuation style.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(File file, boolean bContinuations) throws IOException
+    {
+        super(file, "rw");
+        init(file, 0, bContinuations);
+    }
+    
+    /**
+     * Creates an AIORandomAccessFile over <code>file</code>, opened in "rw"
+     * mode, with continuations disabled.
+     * @param file file to be opened.
+     * @param size number of bytes to buffer for r/w operations; the
+     *        default buffer size is used when this is not larger than it.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(File file, int size) throws IOException
+    {
+        super(file, "rw");
+        this.init(file, size, false);
+    }
+    
+    /**
+     * Creates an AIORandomAccessFile over <code>file</code>, opened in "rw"
+     * mode.
+     * @param file file to be opened.
+     * @param size number of bytes to buffer for r/w operations; the
+     *        default buffer size is used when this is not larger than it.
+     * @param bContinuations <code>true</code> if reads should be
+     *        performed in continuation style.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(File file, int size, boolean bContinuations) throws IOException
+    {
+        super(file, "rw");
+        this.init(file, size, bContinuations);
+    }
+    
+    /**
+     * Creates an AIORandomAccessFile over the file called <code>name</code>,
+     * opened in "rw" mode, with the default buffer size and continuations
+     * disabled.
+     * @param name name of the file to be opened.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(String name) throws IOException
+    {
+        super(name, "rw");
+        File target = new File(name);
+        init(target, 0, false);
+    }
+    
+    /**
+     * Creates an AIORandomAccessFile over the file called <code>name</code>,
+     * opened in "rw" mode, with the default buffer size.
+     * @param name name of the file to be opened.
+     * @param bContinuations <code>true</code> if reads should be
+     *        performed in continuation style.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(String name, boolean bContinuations) throws IOException
+    {
+        super(name, "rw");
+        File target = new File(name);
+        init(target, 0, bContinuations);
+    }
+    
+    /**
+     * Creates an AIORandomAccessFile over the file called <code>name</code>,
+     * opened in "rw" mode, with continuations disabled.
+     * @param name name of the file to be opened.
+     * @param size number of bytes to buffer for r/w operations; the
+     *        default buffer size is used when this is not larger than it.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(String name, int size) throws IOException
+    {
+        super(name, "rw");
+        File target = new File(name);
+        init(target, size, false);
+    }
+    
+    /**
+     * Creates an AIORandomAccessFile over the file called <code>name</code>,
+     * opened in "rw" mode.
+     * @param name name of the file to be opened.
+     * @param size number of bytes to buffer for r/w operations; the
+     *        default buffer size is used when this is not larger than it.
+     * @param bContinuations <code>true</code> if reads should be
+     *        performed in continuation style.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(String name, int size, boolean bContinuations) throws IOException
+    {
+        super(name, "rw");
+        File target = new File(name);
+        init(target, size, bContinuations);
+    }
+    
+    /**
+     * Resets the buffer bookkeeping, allocates the in-memory buffer, lazily
+     * creates the shared disk I/O pool and opens the asynchronous channel
+     * on <code>file</code> with the CREATE, WRITE and READ options.
+     *
+     * @param file file the asynchronous channel is opened on.
+     * @param size requested buffer size in bytes; a buffer of BuffSz_ bytes
+     *        is used when <code>size</code> is not larger than BuffSz_.
+     * @param bVal <code>true</code> if reads should be performed in
+     *        continuation style.
+     * @throws IOException if the asynchronous channel cannot be opened.
+     */
+    private void init(File file, int size, boolean bVal) throws IOException
+    {
+        bContinuations_ = bVal;
+        this.dirty_ = this.closed_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buffer_ = new byte[Math.max(size, BuffSz_)];
+        /*
+         * Keep maxHi == lo + buffer.length (lo is 0 here) even when the
+         * caller asked for a buffer larger than BuffSz_.
+         */
+        this.maxHi_ = (long) this.buffer_.length;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+        /* set up the asynchronous file channel */
+        /*
+         * NOTE(review): the unlocked null check below is only safe if
+         * diskIOPool_ is declared volatile -- confirm at the declaration.
+         */
+        if ( diskIOPool_ == null )
+        {
+            createLock_.lock();
+            try
+            {
+                /* re-check under the lock: another thread may have created the pool */
+                if ( diskIOPool_ == null )
+                {
+                    int maxThreads = DatabaseDescriptor.getThreadsPerPool();
+                    diskIOPool_ = new ContinuationsExecutor( maxThreads,
+                            maxThreads,
+                            Integer.MAX_VALUE,
+                            TimeUnit.SECONDS,
+                            new LinkedBlockingQueue<Runnable>(),
+                            new ThreadFactoryImpl("DISK-IO-POOL")
+                            );
+                }
+            }
+            finally
+            {
+                createLock_.unlock();
+            }
+        }
+        Set<OpenOption> set = new HashSet<OpenOption>( Arrays.<OpenOption>asList(
+                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ) );
+        fileChannel_ = AsynchronousFileChannel.open(file.toPath(), set, diskIOPool_);
+    }
+    
+    /**
+     * Writes out any dirty buffered bytes (waiting for that write to
+     * finish), then closes the asynchronous channel and the file
+     * descriptor the superclass opened in the constructor.
+     *
+     * @throws IOException if flushing or closing fails.
+     */
+    public void close() throws IOException
+    {
+        try
+        {
+            this.onClose();
+        }
+        finally
+        {
+            /* release both handles even if the final flush failed */
+            this.closed_ = true;
+            try
+            {
+                fileChannel_.close();
+            }
+            finally
+            {
+                /* every constructor opens the file via super(..., "rw") as well */
+                super.close();
+            }
+        }
+    }
+    
+    /**
+     * Submits any bytes in the file's buffer that have not yet been written
+     * to disk. Does nothing when the buffer holds no dirty bytes.
+     *
+     * @throws IOException declared by the underlying buffer flush.
+     */
+    public void flush() throws IOException
+    {
+        flushBuffer();
+    }
+    
+    /**
+     * Flush any dirty bytes in the buffer to disk. The write of the range
+     * [lo, curr) is submitted without waiting for it to complete; the disk
+     * position is then advanced to curr and the buffer is marked clean.
+     *
+     * @throws IOException declared for callers; not raised by the
+     *         statements visible here.
+     */
+    private void flushBuffer() throws IOException
+    {
+        if (this.dirty_)
+        {
+            /* false: do not block on the submitted write */
+            doWrite(this.lo_, false);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+        }
+    }
+    
+    /**
+     * Invoked when close() is invoked and causes the flush
+     * of the last few bytes to block once the write is submitted.
+     * Apart from waiting for the write, this mirrors flushBuffer().
+     *
+     * @throws IOException declared for callers; not raised by the
+     *         statements visible here.
+     */
+    private void onClose() throws IOException
+    {
+        if (this.dirty_)
+        {
+            /* true: wait for the write so the data is out before the channel closes */
+            doWrite(this.lo_, true);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+        }
+    }
+    
+    /**
+     * This method submits an I/O for write where the write happens at
+     * <i>position</i> within the file. The bytes written are the buffer
+     * range that corresponds to [lo, curr).
+     *
+     * @param position to seek to within the file
+     * @param onClose indicates if this method was invoked on a close();
+     *        when <code>true</code> the call blocks until the write is done.
+     */
+    private void doWrite(long position, boolean onClose)
+    {
+        int length = (int) (this.curr_ - this.lo_);
+        /*
+         * Hand the channel a private snapshot of the dirty bytes. The
+         * asynchronous write can then proceed while the caller keeps using
+         * buffer_, and buffer_ keeps the contents that lo/curr/hi still
+         * describe. Swapping in a zeroed array here would make a later
+         * flush of [lo, curr) overwrite already written data with zeros.
+         */
+        ByteBuffer buffer = ByteBuffer.wrap(Arrays.copyOf(buffer_, length));
+        Future<Integer> futurePtr = fileChannel_.write(buffer, position, null, new WriteCompletionHandler<Integer>());
+        if ( onClose )
+        {
+            try
+            {
+                /* this will block but will execute only on a close() */
+                futurePtr.get();
+            }
+            catch (ExecutionException ex)
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+            catch (InterruptedException ex)
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+                /* keep the interrupt visible to the caller */
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+    
+    /**
+     * Read at most "this.buffer_.length" bytes from the current disk
+     * position into a fresh buffer that replaces "this.buffer_", blocking
+     * until the asynchronous read has finished. If fewer bytes than the
+     * buffer holds were read, the file is considered to be at EOF and the
+     * unread tail of the buffer is filled with 0xff.
+     *
+     * @return the number of bytes read (0 when nothing could be read).
+     * @throws IOException if the asynchronous read fails or the wait for it
+     *         is interrupted.
+     */
+    private int fillBuffer() throws IOException
+    {
+        ByteBuffer buffer = ByteBuffer.allocate(buffer_.length);
+        Future<Integer> futurePtr = fileChannel_.read(buffer, this.diskPos_, null, new ReadCompletionHandler());
+        int cnt;
+        try
+        {
+            /* This should block */
+            cnt = futurePtr.get();
+        }
+        catch (ExecutionException ex)
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+            /* a failed read must not be mistaken for an empty one */
+            throw new IOException("Asynchronous read at position " + this.diskPos_ + " failed", ex);
+        }
+        catch (InterruptedException ex)
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while reading at position " + this.diskPos_, ex);
+        }
+
+        /* a negative count means nothing was read */
+        if ( cnt < 0 )
+            cnt = 0;
+        buffer_ = buffer.array();
+        /*
+         * Recompute the EOF flag on every fill so that it is set on a short
+         * read and cleared again when a later fill returns a full buffer.
+         */
+        this.hitEOF_ = (cnt < this.buffer_.length);
+        if ( this.hitEOF_ )
+        {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(buffer_, cnt, this.buffer_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+ 
+    /**
+     * Read as much data as indicated by the size of the buffer.
+     * This method is only invoked in continuation mode: it registers an
+     * AIOReader for the current continuation, suspends, and after being
+     * resumed takes the byte count from the continuation context.
+     * NOTE(review): AIOReader is defined elsewhere; presumably it performs
+     * the read and fills buffer_ before the continuation resumes -- confirm.
+     *
+     * @return the number of bytes read (0 when nothing could be read).
+     */
+    private int fillBuffer2()
+    {
+        ContinuationContext continuationCtx = (ContinuationContext)Continuation.getContext();
+        IContinuable reader = new AIOReader( continuationCtx );
+        ContinuationsExecutor.putInTls(reader);
+        /* suspend the continuation */
+        Continuation.suspend();
+
+        /* execution picks up here once the continuation has been resumed */
+        continuationCtx = (ContinuationContext)Continuation.getContext();
+        int cnt = (Integer)continuationCtx.result();
+
+        /* a negative count means nothing was read */
+        if ( cnt < 0 )
+            cnt = 0;
+        /*
+         * Recompute the EOF flag on every fill so that it is set on a short
+         * read and cleared again when a later fill returns a full buffer.
+         */
+        this.hitEOF_ = (cnt < this.buffer_.length);
+        if ( this.hitEOF_ )
+        {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(buffer_, cnt, this.buffer_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+    
+    /**
+     * Moves the current position to <code>pos</code>.
+     * <p>
+     *
+     * When <code>pos</code> lies outside [lo, hi) the buffer is flushed and
+     * the block starting at the BuffSz boundary at or below
+     * <code>pos</code> is loaded, in continuation style if that was
+     * requested at construction. When <code>pos</code> lies inside the
+     * buffer nothing is read; the buffer is flushed only for a backwards
+     * move.
+     *
+     * @param pos position to move to.
+     * @throws IOException if flushing or refilling the buffer fails.
+     */
+    public void seek(long pos) throws IOException
+    {
+        boolean insideBuffer = (pos >= this.lo_) && (pos < this.hi_);
+        if (insideBuffer)
+        {
+            // no read required; moving backwards must flush to maintain V4
+            if (pos < this.curr_)
+            {
+                flushBuffer();
+            }
+        }
+        else
+        {
+            // leaving the current buffer -- flush it, then load the new block
+            flushBuffer();
+            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+            this.maxHi_ = this.lo_ + (long) this.buffer_.length;
+            this.diskPos_ = this.lo_;
+
+            /* reads happen in continuation style when that was requested */
+            int bytesRead = bContinuations_ ? fillBuffer2() : fillBuffer();
+            this.hi_ = this.lo_ + (long) bytesRead;
+        }
+        this.curr_ = pos;
+    }
+
+    
+    /**
+     * Returns the current position in the file as tracked by this buffer.
+     */
+    public long getFilePointer()
+    {
+        return curr_;
+    }
+    
+    /**
+     * Returns the larger of the current position and the length reported
+     * by the superclass, so bytes still held in the buffer are counted.
+     */
+    public long length() throws IOException
+    {
+        long position = this.curr_;
+        long reported = super.length();
+        return (position >= reported) ? position : reported;
+    }
+    
+    /**
+     * Reads one byte at the current position and advances past it.
+     *
+     * @return the byte as a value in 0..255, or -1 at end of file.
+     */
+    public int read() throws IOException
+    {
+        if (curr_ >= hi_)
+        {
+            // the buffer already holds the last block: nothing more to read
+            if (hitEOF_)
+            {
+                return -1;
+            }
+            // slow path -- load the block that contains curr
+            seek(curr_);
+            if (curr_ == hi_)
+            {
+                return -1;
+            }
+        }
+        int index = (int) (curr_ - lo_);
+        int value = buffer_[index] & 0xFF; // byte -> unsigned int
+        curr_++;
+        return value;
+    }
+    
+    /**
+     * Reads up to <code>b.length</code> bytes into <code>b</code>.
+     *
+     * @return the number of bytes read, or -1 at end of file.
+     */
+    public int read(byte[] b) throws IOException
+    {
+        int wanted = b.length;
+        return read(b, 0, wanted);
+    }
+    
+    /**
+     * Copies up to <code>len</code> bytes from the buffer into
+     * <code>b</code> starting at <code>off</code>. At most the bytes left in
+     * the current buffer are returned, so fewer than <code>len</code> bytes
+     * may be read.
+     *
+     * @return the number of bytes copied, or -1 at end of file.
+     */
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        if (curr_ >= hi_)
+        {
+            // the buffer already holds the last block: nothing more to read
+            if (hitEOF_)
+            {
+                return -1;
+            }
+            // slow path -- load the block that contains curr
+            seek(curr_);
+            if (curr_ == hi_)
+            {
+                return -1;
+            }
+        }
+        int available = (int) (hi_ - curr_);
+        int count = Math.min(len, available);
+        int start = (int) (curr_ - lo_);
+        System.arraycopy(buffer_, start, b, off, count);
+        curr_ += count;
+        return count;
+    }
+    
+    /**
+     * Stores the low byte of <code>b</code> at the current position in the
+     * buffer, advances the position and marks the buffer dirty.
+     */
+    public void write(int b) throws IOException
+    {
+        if (curr_ >= hi_)
+        {
+            // at EOF with room left in the buffer: just extend "hi"
+            boolean extend = hitEOF_ && (hi_ < maxHi_);
+            if (!extend)
+            {
+                // slow path -- write current buffer; read next one
+                seek(curr_);
+                // appending to EOF also extends "hi"
+                extend = (curr_ == hi_);
+            }
+            if (extend)
+            {
+                hi_++;
+            }
+        }
+        int index = (int) (curr_ - lo_);
+        buffer_[index] = (byte) b;
+        curr_++;
+        dirty_ = true;
+    }
+    
+    /**
+     * Writes all of <code>b</code> at the current position.
+     */
+    public void write(byte[] b) throws IOException
+    {
+        int count = b.length;
+        write(b, 0, count);
+    }
+    
+    /**
+     * Writes <code>len</code> bytes of <code>b</code>, starting at
+     * <code>off</code>, at the current position. The data is copied into
+     * the buffer one buffer-sized piece at a time.
+     */
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        while (len > 0)
+        {
+            int n = this.writeAtMost(b, off, len);
+            off += n;
+            len -= n;
+            /*
+             * Mark the buffer dirty as soon as bytes are copied into it.
+             * The next writeAtMost() may seek() to the following block, and
+             * seek() only flushes a dirty buffer; setting the flag after
+             * the loop would drop every piece but the last.
+             */
+            this.dirty_ = true;
+        }
+    }
+    
+    /*
+     * Copy at most "len" bytes from "b", starting at position "off", into
+     * the buffer at the current position, and return the number of bytes
+     * copied. No more than the space left in the current buffer is copied.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException
+    {
+        if (curr_ >= hi_)
+        {
+            // at EOF with room left in the buffer: open up the whole buffer
+            boolean extend = hitEOF_ && (hi_ < maxHi_);
+            if (!extend)
+            {
+                // slow path -- write current buffer; read next one
+                seek(curr_);
+                // appending to EOF also opens up the whole buffer
+                extend = (curr_ == hi_);
+            }
+            if (extend)
+            {
+                hi_ = maxHi_;
+            }
+        }
+        int room = (int) (hi_ - curr_);
+        int count = Math.min(len, room);
+        int start = (int) (curr_ - lo_);
+        System.arraycopy(b, off, buffer_, start, count);
+        curr_ += count;
+        return count;
+    }
+    
+    /**
+     * Manual smoke test: runs one ReadImpl task on a single-threaded
+     * ContinuationsExecutor. ReadImpl reads /var/cassandra/test.dat, the
+     * same path WriteImpl in this file writes to.
+     */
+    public static void main(String[] args) throws Throwable
+    {
+        ExecutorService executor = new ContinuationsExecutor(
+                1,
+                1,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>() );
+        Runnable readTask = new ReadImpl();
+        executor.execute(readTask);
+    }
+}
+
+/**
+ * Test task that opens /var/cassandra/test.dat in continuation mode and
+ * prints an int and a UTF string read from the start of the file and
+ * again from offset 66000.
+ */
+class ReadImpl implements Runnable
+{
+    public void run()
+    {
+        try
+        {
+            System.out.println("About to start the whole thing ...");
+            AIORandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"), true );
+            System.out.println("About to seek ...");
+
+            /* no explicit seek: a new file starts at position 0 */
+            System.out.println( aRaf2.readInt() );
+            System.out.println( aRaf2.readUTF() );
+
+            System.out.println("About to seek a second time ...");
+            aRaf2.seek(66000L);
+            System.out.println( aRaf2.readInt() );
+            System.out.println( aRaf2.readUTF() );
+
+            aRaf2.close();
+        }
+        catch( IOException ex )
+        {
+            ex.printStackTrace();
+        }
+    }
+}
+
+/**
+ * Test task that writes 10000 (int, UTF string) records to
+ * /var/cassandra/test.dat without continuations.
+ */
+class WriteImpl implements Runnable
+{
+    public void run()
+    {
+        AIORandomAccessFile aRaf2 = null;
+        try
+        {
+            aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"));
+            for ( int i = 0; i < 10000; ++i )
+            {
+                /*
+                 * 4 bytes of int + 2 bytes of UTF length + 16 characters
+                 * gives 22 byte records, so the offset 66000 that ReadImpl
+                 * seeks to is a record boundary (3000 * 22).
+                 */
+                aRaf2.writeInt(32);
+                aRaf2.writeUTF("Avinash Lakshman");
+            }
+        }
+        catch( IOException ex )
+        {
+            ex.printStackTrace();
+        }
+        finally
+        {
+            /* close the file even when a write failed */
+            if ( aRaf2 != null )
+            {
+                try
+                {
+                    aRaf2.close();
+                }
+                catch( IOException ex )
+                {
+                    ex.printStackTrace();
+                }
+            }
+        }
+    }
+}
+
+/**
+ * Write completion handler for the AIO framework. The attachment that is
+ * passed in is a Continuation that is resumed on write completion. The
+ * writer in this file passes <code>null</code> as the attachment, so for
+ * now no continuation is resumed.
+ *
+ * @author alakshman
+ *
+ * @param <V> type of the completion result (the number of bytes written).
+ */
+class WriteCompletionHandler<V> implements CompletionHandler<V, Continuation>
+{
+    private final static Logger logger_ = Logger.getLogger(WriteCompletionHandler.class);
+
+    /** Nothing to do when a write is cancelled. */
+    public void cancelled(Continuation attachment)
+    {
+    }
+
+    /**
+     * Logs the result and keeps resuming the attached continuation until
+     * continueWith() returns <code>null</code>.
+     */
+    public void completed(V result, Continuation attachment)
+    {
+        logger_.debug("Bytes written " + result);
+        while ( attachment != null )
+        {
+            attachment = Continuation.continueWith(attachment);
+        }
+    }
+
+    /**
+     * Logs the failure. Writes are normally submitted without waiting for
+     * them, so this log entry is the only trace a failed write leaves.
+     */
+    public void failed(Throwable th, Continuation attachment)
+    {
+        logger_.warn("Asynchronous write failed", th);
+    }
+}
+
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A <code>BufferedRandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ * 
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class BufferedRandomAccessFile extends RandomAccessFile
+{
+    static final int LogBuffSz_ = 16; // 64K buffer
+    public static final int BuffSz_ = (1 << LogBuffSz_);
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean closed_; // true iff the file is closed
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in "buff"
+    private byte[] buff_; // local buffer
+    private long maxHi_; // this.lo + this.buff.length
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position
+
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     *
+     *   len(f)    the length of the file
+     *   curr(f)   the current position in the file
+     *   c(f)      the abstract contents of the file
+     *   disk(f)   the contents of f's backing disk file
+     *   closed(f) true iff the file is closed
+     *
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     *
+     * A file is said to be *valid* if the following conditions hold:
+     *
+     * V1. The "closed" and "curr" fields are correct:
+     *
+     *   f.closed == closed(f)
+     *   f.curr == curr(f)
+     *
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     *
+     *   f.lo <= f.curr <= f.hi
+     *
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     *
+     *   (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     *
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     *
+     *   (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     *   disk(f)[i])
+     *
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     *
+     *   f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     *
+     * V6. this.maxHi == this.lo + this.buff.length
+     *
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     *
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     *
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     *
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
+     * in mode <code>mode</code>, which should be "r" for reading only, or
+     * "rw" for reading and writing. The default buffer size is used.
+     */
+    public BufferedRandomAccessFile(File file, String mode) throws IOException
+    {
+        super(file, mode);
+        this.init(0);
+    }
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
+     * in mode <code>mode</code> with a buffer of <code>size</code> bytes;
+     * the default buffer size is used when <code>size</code> is not larger
+     * than it.
+     */
+    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException
+    {
+        super(file, mode);
+        this.init(size);
+    }
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on the file named
+     * <code>name</code> in mode <code>mode</code>, which should be "r" for
+     * reading only, or "rw" for reading and writing. The default buffer
+     * size is used.
+     */
+    public BufferedRandomAccessFile(String name, String mode) throws IOException
+    {
+        super(name, mode);
+        this.init(0);
+    }
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on the file named
+     * <code>name</code> in mode <code>mode</code> with a buffer of
+     * <code>size</code> bytes; the default buffer size is used when
+     * <code>size</code> is not larger than it.
+     */
+    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
+    {
+        super(name, mode);
+        this.init(size);
+    }
+
+    /* Reset the bookkeeping and allocate the buffer. */
+    private void init(int size)
+    {
+        this.dirty_ = this.closed_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buff_ = new byte[Math.max(size, BuffSz_)];
+        // V6: lo is 0 here, so maxHi is the actual buffer length
+        this.maxHi_ = (long) this.buff_.length;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+    }
+
+    /**
+     * Flush any unwritten bytes and close the file. The underlying file is
+     * closed even if the flush fails.
+     */
+    public void close() throws IOException
+    {
+        try
+        {
+            this.flush();
+        }
+        finally
+        {
+            this.closed_ = true;
+            super.close();
+        }
+    }
+
+    /**
+     * Flush any bytes in the file's buffer that have not yet been written to
+     * disk. This method is a no-op when the buffer holds no dirty bytes, as
+     * is always the case when nothing has been written through this object.
+     */
+    public void flush() throws IOException
+    {
+        this.flushBuffer();
+    }
+
+    /* Flush any dirty bytes in the buffer to disk. */
+    private void flushBuffer() throws IOException
+    {
+        if (this.dirty_)
+        {
+            // position the underlying file at the start of the buffer
+            if (this.diskPos_ != this.lo_)
+                super.seek(this.lo_);
+            int len = (int) (this.curr_ - this.lo_);
+            super.write(this.buff_, 0, len);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+        }
+    }
+
+    /*
+     * Read at most "this.buff.length" bytes into "this.buff", returning the
+     * number of bytes read. If the return result is less than
+     * "this.buff.length", then EOF was read.
+     */
+    private int fillBuffer() throws IOException
+    {
+        int cnt = 0;
+        int rem = this.buff_.length;
+        while (rem > 0)
+        {
+            int n = super.read(this.buff_, cnt, rem);
+            if (n < 0)
+                break;
+            cnt += n;
+            rem -= n;
+        }
+        /*
+         * "cnt" is never negative here, so the EOF flag has to be derived
+         * from a short read alone. It is recomputed on every fill so that
+         * it is cleared again when a later fill returns a full buffer.
+         */
+        this.hitEOF_ = (cnt < this.buff_.length);
+        if (this.hitEOF_)
+        {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+
+    /**
+     * This method positions <code>this.curr</code> at position <code>pos</code>.
+     * If <code>pos</code> does not fall in the current buffer, it flushes the
+     * current buffer and loads the correct one.<p>
+     *
+     * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+     * is at or past the end-of-file.
+     */
+    public void seek(long pos) throws IOException
+    {
+        if (pos >= this.hi_ || pos < this.lo_)
+        {
+            // seeking outside of current buffer -- flush and read
+            this.flushBuffer();
+            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+            this.maxHi_ = this.lo_ + (long) this.buff_.length;
+            if (this.diskPos_ != this.lo_)
+            {
+                super.seek(this.lo_);
+                this.diskPos_ = this.lo_;
+            }
+            int n = this.fillBuffer();
+            this.hi_ = this.lo_ + (long) n;
+        }
+        else
+        {
+            // seeking inside current buffer -- no read required
+            if (pos < this.curr_)
+            {
+                // if seeking backwards, we must flush to maintain V4
+                this.flushBuffer();
+            }
+        }
+        this.curr_ = pos;
+    }
+
+    /** Returns the current position in the file as tracked by this buffer. */
+    public long getFilePointer()
+    {
+        return this.curr_;
+    }
+
+    /**
+     * Returns the larger of the current position and the length of the
+     * file on disk, so bytes still held in the buffer are counted.
+     */
+    public long length() throws IOException
+    {
+        return Math.max(this.curr_, super.length());
+    }
+
+    /**
+     * Reads one byte at the current position and advances past it.
+     *
+     * @return the byte as a value in 0..255, or -1 at end of file.
+     */
+    public int read() throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            // test for EOF
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        byte res = this.buff_[(int) (this.curr_ - this.lo_)];
+        this.curr_++;
+        return ((int) res) & 0xFF; // convert byte -> int
+    }
+
+    /**
+     * Reads up to <code>b.length</code> bytes into <code>b</code>.
+     *
+     * @return the number of bytes read, or -1 at end of file.
+     */
+    public int read(byte[] b) throws IOException
+    {
+        return this.read(b, 0, b.length);
+    }
+
+    /**
+     * Copies up to <code>len</code> bytes from the buffer into
+     * <code>b</code> starting at <code>off</code>. At most the bytes left in
+     * the current buffer are returned, so fewer than <code>len</code> bytes
+     * may be read.
+     *
+     * @return the number of bytes copied, or -1 at end of file.
+     */
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            // test for EOF
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(this.buff_, buffOff, b, off, len);
+        this.curr_ += len;
+        return len;
+    }
+
+    /**
+     * Stores the low byte of <code>b</code> at the current position in the
+     * buffer, advances the position and marks the buffer dirty.
+     */
+    public void write(int b) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_++;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_++;
+                }
+            }
+        }
+        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+        this.curr_++;
+        this.dirty_ = true;
+    }
+
+    /** Writes all of <code>b</code> at the current position. */
+    public void write(byte[] b) throws IOException
+    {
+        this.write(b, 0, b.length);
+    }
+
+    /**
+     * Writes <code>len</code> bytes of <code>b</code>, starting at
+     * <code>off</code>, at the current position. The data is copied into
+     * the buffer one buffer-sized piece at a time.
+     */
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        while (len > 0)
+        {
+            int n = this.writeAtMost(b, off, len);
+            off += n;
+            len -= n;
+            /*
+             * Mark the buffer dirty as soon as bytes are copied into it.
+             * The next writeAtMost() may seek() to the following block, and
+             * seek() only flushes a dirty buffer; setting the flag after
+             * the loop would drop every piece but the last.
+             */
+            this.dirty_ = true;
+        }
+    }
+
+    /*
+     * Copy at most "len" bytes from "b", starting at position "off", into
+     * the buffer at the current position, and return the number of bytes
+     * copied.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_ = this.maxHi_;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_ = this.maxHi_;
+                }
+            }
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(b, off, this.buff_, buffOff, len);
+        this.curr_ += len;
+        return len;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.zip.Adler32;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import bak.pcj.map.AbstractLongKeyLongMap;
+import bak.pcj.map.LongKeyLongChainedHashMap;
+
+/**
+ * This class manages the persistence of checksums and keeps
+ * them in memory. It maintains a mapping of data files on
+ * disk to their corresponding checksum files. It is also
+ * loads the checksums in memory on start up.
+ * 
+ * @author alakshman
+ *
+ */
+class ChecksumManager
+{    
+    private static Logger logger_ = Logger.getLogger(ChecksumManager.class);
+    /* Keeps a mapping of checksum manager instances to data file */
+    private static Map<String, ChecksumManager> chksumMgrs_ = new HashMap<String, ChecksumManager>();
+    private static Lock lock_ = new ReentrantLock();
+    private static final String checksumPrefix_ = "Checksum-";
+    private static final int bufferSize_ = 8*1024*1024;
+    private static final long chunkMask_ = 0x00000000FFFFFFFFL;
+    private static final long fileIdMask_ = 0x7FFFFFFF00000000L;
+    /* Map where checksums are cached. */
+    private static AbstractLongKeyLongMap chksums_ = new LongKeyLongChainedHashMap();
+
+    public static ChecksumManager instance(String dataFile) throws IOException
+    {
+        ChecksumManager chksumMgr = chksumMgrs_.get(dataFile);
+        if ( chksumMgr == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( chksumMgr == null )
+                {
+                    chksumMgr = new ChecksumManager(dataFile);
+                    chksumMgrs_.put(dataFile, chksumMgr);
+                }
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return chksumMgr;
+    }
+    
+    /* TODO: Debug only */
+    public static ChecksumManager instance(String dataFile, String chkSumFile) throws IOException
+    {
+        ChecksumManager chksumMgr = chksumMgrs_.get(dataFile);
+        if ( chksumMgr == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( chksumMgr == null )
+                {
+                    chksumMgr = new ChecksumManager(dataFile, chkSumFile);
+                    chksumMgrs_.put(dataFile, chksumMgr);
+                }
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return chksumMgr;
+    }
+    
+    
+    /**
+     * On start read all the check sum files on disk and
+     * pull them into memory.
+     * @throws IOException
+     */
+    public static void onStart() throws IOException
+    {
+        String[] directories = DatabaseDescriptor.getAllDataFileLocations(); 
+        List<File> allFiles = new ArrayList<File>();
+        for ( String directory : directories )
+        {
+            File file = new File(directory);
+            File[] files = file.listFiles();
+            for ( File f : files )
+            {
+                if ( f.getName().contains(ChecksumManager.checksumPrefix_) )
+                {
+                    allFiles.add(f);
+                }
+            }
+        }
+        
+        for ( File file : allFiles )
+        {                           
+            int fId = SequenceFile.getFileId(file.getName());
+            ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
+                        
+            int chunk = 0;
+            while ( !chksumRdr.isEOF() )
+            {
+                long value = chksumRdr.readLong();
+                long key = ChecksumManager.key(fId, ++chunk);
+                chksums_.put(key, value);
+            }
+        }
+    }
+    
+    /**
+     * On delete of this dataFile remove the checksums associated with
+     * this file from memory, remove the check sum manager instance.
+     * 
+     * @param dataFile data file that is being deleted.
+     * @throws IOException
+     */
+    public static void onFileDelete(String dataFile) throws IOException
+    {
+        File f = new File(dataFile);
+        long size = f.length();
+        int fileId = SequenceFile.getFileId(f.getName());
+        int chunks = (int)(size >> 16L);
+        
+        for ( int i = 0; i < chunks; ++i )
+        {
+            long key = ChecksumManager.key(fileId, i);
+            chksums_.remove(key);
+        }
+        
+        /* remove the check sum manager instance */
+        chksumMgrs_.remove(dataFile);
+        String chksumFile = f.getParent() + System.getProperty("file.separator") + checksumPrefix_ + fileId + ".db";
+        FileUtils.delete(chksumFile);
+    }
+    
+    private static long key(int fileId, int chunkId)
+    {
+        long key = 0;
+        key |= fileId;
+        key <<= 32;
+        key |= chunkId;
+        return key;
+    }
+    
+    private RandomAccessFile raf_;
+    private Adler32 adler_ = new Adler32();
+    
+    ChecksumManager(String dataFile) throws IOException
+    {
+        File file = new File(dataFile);
+        String directory = file.getParent();
+        String f = file.getName();
+        short fId = SequenceFile.getFileId(f);
+        String chkSumFile = directory + System.getProperty("file.separator") + checksumPrefix_ + fId + ".db";
+        raf_ = new RandomAccessFile(chkSumFile, "rw");
+    }
+    
+    /* TODO: Remove later. */
+    ChecksumManager(String dataFile, String chkSumFile) throws IOException
+    {
+        File file = new File(dataFile);
+        String directory = file.getParent();
+        String f = file.getName();
+        short fId = SequenceFile.getFileId(f);        
+        raf_ = new RandomAccessFile(chkSumFile, "rw");
+        
+        file = new File(chkSumFile);        
+        ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
+                    
+        int chunk = 0;
+        while ( !chksumRdr.isEOF() )
+        {
+            long value = chksumRdr.readLong();
+            long key = ChecksumManager.key(fId, ++chunk);
+            chksums_.put(key, value);
+        }
+    }
+    
+    /**
+     * Log the checksum for the the specified file and chunk
+     * within the file.
+     * @param fileId id associated with the file
+     * @param chunkId chunk within the file.
+     * @param buffer for which the checksum needs to be calculated.
+     * @throws IOException
+     */
+    void logChecksum(int fileId, int chunkId, byte[] buffer)
+    {
+        logChecksum(fileId, chunkId, buffer, 0, buffer.length);
+    }
+    
+    /**
+     * Log the checksum for the the specified file and chunk
+     * within the file.
+     * @param fileId id associated with the file
+     * @param chunkId chunk within the file.
+     * @param buffer for which the checksum needs to be calculated.
+     * @param startoffset offset to start within the buffer
+     * @param length size of the checksum buffer.
+     * @throws IOException
+     */
+    void logChecksum(int fileId, int chunkId, byte[] buffer, int startOffset, int length)
+    {
+        try
+        {            
+            adler_.update(buffer, startOffset, length);
+            long chksum = adler_.getValue();
+            adler_.reset();
+            /* log checksums to disk */
+            raf_.writeLong(chksum);
+            /* add the chksum to memory */
+            long key = ChecksumManager.key(fileId, chunkId);
+            chksums_.put(key, chksum);
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+    }
+    
+    /**
+     * Validate checksums for the data in the buffer.
+     * @file name of the file from which data is being
+     *       read.
+     * @chunkId chunkId
+     * @param buffer with data for which checksum needs to be 
+     *        verified.
+     * @throws IOException
+     */
+    void validateChecksum(String file, int chunkId, byte[] buffer) throws IOException
+    {                
+        validateChecksum(file, chunkId, buffer, 0, buffer.length);
+    }
+    
+    /**
+     * Validate checksums for the data in the buffer for the region
+     * that is encapsulated in the section object
+     * @file name of the file from which data is being
+     *       read.
+     * @chunkId chunkId     
+     * @param buffer with data for which checksum needs to be 
+     *        verified.
+     * @param startOffset within the buffer
+     * @param length of the data whose checksum needs to be verified.
+     * @throws IOException
+     */
+    void validateChecksum(String file, int chunkId, byte[] buffer, int startOffset, int length) throws IOException
+    {            
+        int fId = SequenceFile.getFileId(file);
+        long key = ChecksumManager.key(fId, chunkId);
+        adler_.update(buffer, startOffset, length);
+        long currentChksum = adler_.getValue();
+        adler_.reset();
+        long oldChksum = chksums_.get(key);
+        if ( currentChksum != oldChksum )
+        {                                   
+            throw new IOException("Checksums do not match in file " + file + " for chunk " + chunkId + ".");
+        }        
+    }
+    
+    
+    /**
+     * Get the checksum for the specified file's chunk
+     * @param fileId id associated with the file.
+     * @param chunkId chunk within the file.
+     * @return associated checksum for the chunk
+     */
+    long getChecksum(int fileId, int chunkId)
+    {        
+        long key = ChecksumManager.key(fileId, chunkId);
+        return chksums_.get(key);
+    }
+    
+    public static void main(String[] args) throws Throwable
+    {
+        ChecksumReader rdr = new ChecksumReader("C:\\Engagements\\Cassandra\\Checksum-1.db");
+        while ( !rdr.isEOF() )
+        {
+            System.out.println(rdr.readLong());
+        }
+        rdr.close();
+    }
+}
+
+/**
+ * ChecksumReader is used to memory map the checksum files and
+ * load the data into memory. Values are read sequentially from
+ * the mapped region until it is exhausted.
+ *
+ * @author alakshman
+ *
+ */
+class ChecksumReader
+{
+    private static Logger logger_ = Logger.getLogger(ChecksumReader.class);
+    private String filename_;
+    /* Currently mapped region; null once the reader has been closed. */
+    private MappedByteBuffer buffer_;
+
+    /**
+     * Maps the whole file.
+     *
+     * @param filename name of the checksum file.
+     * @throws IOException if the file cannot be opened or mapped.
+     */
+    ChecksumReader(String filename) throws IOException
+    {
+        filename_ = filename;
+        File f = new File(filename);
+        map(0, f.length());
+    }
+
+    /**
+     * Maps the region of the file between the two given offsets.
+     *
+     * @param filename name of the checksum file.
+     * @param start offset of the first byte to map.
+     * @param end offset just past the last byte to map; 0 means
+     *        "up to the end of the file".
+     * @throws IOException if the file cannot be opened or mapped.
+     */
+    ChecksumReader(String filename, long start, long end) throws IOException
+    {
+        filename_ = filename;
+        map(start, end);
+    }
+
+    /**
+     * Maps the whole file read-only and loads it into memory.
+     *
+     * @throws IOException if the file cannot be opened or mapped.
+     */
+    public void map() throws IOException
+    {
+        RandomAccessFile file = new RandomAccessFile(filename_, "rw");
+        try
+        {
+            buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, file.length() );
+            buffer_.load();
+        }
+        finally
+        {
+            file.close();
+        }
+    }
+
+    /**
+     * Maps the region [start, end) of the file read-only and loads it
+     * into memory.
+     *
+     * NOTE(review): the file is opened in "rw" mode although the mapping
+     * is read-only, which creates the file if it does not exist -- kept
+     * as is, confirm whether callers rely on it.
+     *
+     * @param start offset of the first byte to map.
+     * @param end offset just past the last byte to map; 0 means
+     *        "up to the end of the file".
+     * @throws IllegalArgumentException if an offset is negative or
+     *         end lies before start.
+     * @throws IOException if the file cannot be opened or mapped.
+     */
+    public void map(long start, long end) throws IOException
+    {
+        if ( start < 0 || end < 0 || end < start )
+            throw new IllegalArgumentException("Invalid values for start and end.");
+
+        RandomAccessFile file = new RandomAccessFile(filename_, "rw");
+        try
+        {
+            if ( end == 0 )
+                end = file.length();
+            /*
+             * FileChannel.map() expects (position, size) and not (start, end).
+             * Passing "end" as the size maps past the requested region for any
+             * start > 0 and, because the channel is writable, may even grow
+             * the file.
+             */
+            buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, start, end - start);
+            buffer_.load();
+        }
+        finally
+        {
+            file.close();
+        }
+    }
+
+    /**
+     * Releases the given mapped buffer right away by invoking its
+     * cleaner through reflection instead of waiting for the garbage
+     * collector. Any failure is logged at debug level and otherwise
+     * ignored.
+     *
+     * @param buffer mapped buffer to release.
+     */
+    void unmap(final Object buffer)
+    {
+        AccessController.doPrivileged( new PrivilegedAction<MappedByteBuffer>()
+                {
+            public MappedByteBuffer run()
+            {
+                try
+                {
+                    Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
+                    getCleanerMethod.setAccessible(true);
+                    sun.misc.Cleaner cleaner = (sun.misc.Cleaner)getCleanerMethod.invoke(buffer,new Object[0]);
+                    cleaner.clean();
+                }
+                catch(Throwable e)
+                {
+                    logger_.debug( LogUtil.throwableToString(e) );
+                }
+                return null;
+            }
+                });
+    }
+
+    /**
+     * Reads the next 8 bytes of the mapped region as a long.
+     *
+     * @return the value read.
+     * @throws IOException if the reader has already been closed.
+     */
+    public long readLong() throws IOException
+    {
+        if ( buffer_ == null )
+            throw new IOException("Checksum reader for " + filename_ + " has been closed.");
+        return buffer_.getLong();
+    }
+
+    /**
+     * @return true if no bytes are left in the mapped region or the
+     *         reader has been closed.
+     */
+    public boolean isEOF()
+    {
+        return ( buffer_ == null || buffer_.remaining() == 0 );
+    }
+
+
+    /**
+     * Unmaps the file. The buffer reference is dropped afterwards so that
+     * a later read fails cleanly instead of touching unmapped memory;
+     * closing more than once is harmless.
+     *
+     * @throws IOException declared for callers; not thrown here.
+     */
+    public void close() throws IOException
+    {
+        if ( buffer_ != null )
+        {
+            unmap(buffer_);
+            buffer_ = null;
+        }
+    }
+}