You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [18/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * HeartBeat State associated with any given endpoint: a generation number,
+ * a heartbeat counter and the version assigned by the most recent update.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class HeartBeatState
+{
+    /* single serializer instance handed out by serializer() */
+    private static final ICompactSerializer<HeartBeatState> serializer_ = new HeartBeatStateSerializer();
+
+    int generation_;
+    AtomicInteger heartbeat_;
+    int version_;
+
+    /**
+     * Creates a state whose generation, heartbeat and version are all zero.
+     * Delegating to the full constructor guarantees that heartbeat_ is never
+     * null, so getHeartBeat(), updateHeartBeat() and the serializer are safe
+     * to use on an instance built this way.
+     */
+    HeartBeatState()
+    {
+        this(0, 0, 0);
+    }
+
+    /**
+     * Creates a state with the given generation and heartbeat and version 0.
+     * @param generation initial generation number
+     * @param heartbeat initial heartbeat counter value
+     */
+    HeartBeatState(int generation, int heartbeat)
+    {
+        this(generation, heartbeat, 0);
+    }
+
+    /**
+     * Creates a fully specified state.
+     * @param generation initial generation number
+     * @param heartbeat initial heartbeat counter value
+     * @param version initial version
+     */
+    HeartBeatState(int generation, int heartbeat, int version)
+    {
+        generation_ = generation;
+        heartbeat_ = new AtomicInteger(heartbeat);
+        version_ = version;
+    }
+
+    /**
+     * @return the serializer used to write and read HeartBeatState instances.
+     */
+    public static ICompactSerializer<HeartBeatState> serializer()
+    {
+        return serializer_;
+    }
+
+    int getGeneration()
+    {
+        return generation_;
+    }
+
+    /**
+     * Bumps the generation by one and stamps the state with a fresh version
+     * obtained from the VersionGenerator.
+     */
+    void updateGeneration()
+    {
+        ++generation_;
+        version_ = VersionGenerator.getNextVersion();
+    }
+
+    int getHeartBeat()
+    {
+        return heartbeat_.get();
+    }
+
+    /**
+     * Increments the heartbeat counter and stamps the state with a fresh
+     * version obtained from the VersionGenerator.
+     */
+    void updateHeartBeat()
+    {
+        heartbeat_.incrementAndGet();
+        version_ = VersionGenerator.getNextVersion();
+    }
+
+    /**
+     * @return the version assigned by the last update (or at construction).
+     */
+    int getHeartBeatVersion()
+    {
+        return version_;
+    }
+}
+
+/**
+ * Serializer for HeartBeatState. The wire form is three consecutive ints:
+ * generation, heartbeat counter, version.
+ */
+class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState>
+{
+    /**
+     * Writes the three fields of the state in wire order.
+     * @param hbState state to be written
+     * @param dos stream the state is written to
+     */
+    public void serialize(HeartBeatState hbState, DataOutputStream dos) throws IOException
+    {
+        /* generation first */
+        dos.writeInt(hbState.generation_);
+        /* then the current value of the heartbeat counter */
+        dos.writeInt(hbState.heartbeat_.get());
+        /* and finally the version */
+        dos.writeInt(hbState.version_);
+    }
+
+    /**
+     * Reads the three ints back in the order serialize() wrote them.
+     * @param dis stream positioned at a serialized state
+     * @return the reconstructed state
+     */
+    public HeartBeatState deserialize(DataInputStream dis) throws IOException
+    {
+        final int generation = dis.readInt();
+        final int heartbeat = dis.readInt();
+        final int version = dis.readInt();
+        return new HeartBeatState(generation, heartbeat, version);
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+/**
+ * Contract through which endpoint state change events are published.
+ * It is implemented by the Gossiper module; parties interested in the events
+ * subscribe and unsubscribe through the two methods below.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IEndPointStateChangePublisher
+{
+    /**
+     * Adds a subscriber for state changes.
+     * @param subcriber module which implements the IEndPointStateChangeSubscriber
+     */
+    void register(IEndPointStateChangeSubscriber subcriber);
+
+    /**
+     * Removes a subscriber for state changes.
+     * @param subcriber module which implements the IEndPointStateChangeSubscriber
+     */
+    void unregister(IEndPointStateChangeSubscriber subcriber);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Callback invoked by an IEndPointStateChangePublisher to tell interested
+ * parties that the state associated with an endpoint has changed.
+ * For instance, if node A figures out that the state of endpoint B changed,
+ * it notifies every registered party of that change. It is up to the
+ * registered instance to decide what to do with the notification; not all
+ * modules may be interested in all state changes.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IEndPointStateChangeSubscriber
+{
+    /**
+     * Informs the subscriber about a change in the state of the specified
+     * endpoint.
+     *
+     * @param endpoint endpoint for which the state change occurred.
+     * @param epState state that actually changed for the above endpoint.
+     */
+    void onChange(EndPoint endpoint, EndPointState epState);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Listener for Failure Detector verdicts. It is implemented by the Gossiper,
+ * which convicts or suspects an endpoint based on the PHI that the Failure
+ * Detector calculates from the inter-arrival times of the heart beats.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IFailureDetectionEventListener
+{
+    /**
+     * Convicts the given endpoint.
+     * @param ep endpoint to be convicted
+     */
+    void convict(EndPoint ep);
+
+    /**
+     * Marks the given endpoint as suspect.
+     * @param ep endpoint to be suspected.
+     */
+    void suspect(EndPoint ep);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Gives an application the ability to query liveness information of a node
+ * in the cluster, and to register callbacks that are notified about the
+ * liveness of nodes.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IFailureDetector
+{
+    /**
+     * Reports the Failure Detector's current knowledge of whether a node is
+     * up or down.
+     *
+     * @param ep endpoint in question.
+     * @return true if UP and false if DOWN.
+     */
+    boolean isAlive(EndPoint ep);
+
+    /**
+     * Invoked by any entity wanting to interrogate the status of an endpoint;
+     * in our case that is the Gossiper. The Failure Detector then calculates
+     * Phi and deems the endpoint suspicious or alive as explained in the
+     * Hayashibara paper. (The method name is spelled "intepret" in the
+     * interface and is kept as is for implementers and callers.)
+     *
+     * @param ep endpoint for which we interpret the inter arrival times.
+     */
+    void intepret(EndPoint ep);
+
+    /**
+     * Invoked by the receiver of a heartbeat; in our case that is the
+     * Gossiper, which informs the Failure Detector on receipt of a heartbeat.
+     * The Failure Detector then samples the arrival time as explained in the
+     * paper.
+     *
+     * @param ep endpoint being reported.
+     */
+    void report(EndPoint ep);
+
+    /**
+     * Registers interest in Failure Detector events.
+     * @param listener implementation of an application provided IFailureDetectionEventListener
+     */
+    void registerFailureDetectionEventListener(IFailureDetectionEventListener listener);
+
+    /**
+     * Un-registers interest in Failure Detector events.
+     * @param listener implementation of an application provided IFailureDetectionEventListener
+     */
+    void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Pair of callbacks taking the endpoint concerned.
+ * NOTE(review): judging by the method names this reports an endpoint being
+ * suspected and later revived -- confirm against the implementers, which are
+ * not part of this file.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IFailureNotification
+{
+    /**
+     * @param ep endpoint the notification is about
+     */
+    void suspect(EndPoint ep);
+
+    /**
+     * @param ep endpoint the notification is about
+     */
+    void revive(EndPoint ep);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * Message carrying a cluster identifier.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class JoinMessage
+{
+    /* single serializer instance handed out by serializer() */
+    private static ICompactSerializer<JoinMessage> serializer_ = new JoinMessageSerializer();
+
+    /* identifier of the cluster; written as a UTF string by the serializer */
+    String clusterId_;
+
+    /**
+     * @param clusterId identifier of the cluster carried by this message
+     */
+    JoinMessage(String clusterId)
+    {
+        this.clusterId_ = clusterId;
+    }
+
+    /**
+     * @return the serializer used to write and read JoinMessage instances.
+     */
+    static ICompactSerializer<JoinMessage> serializer()
+    {
+        return serializer_;
+    }
+}
+
+/**
+ * Serializer for JoinMessage. The wire form is the cluster identifier as a
+ * single UTF string.
+ */
+class JoinMessageSerializer implements ICompactSerializer<JoinMessage>
+{
+    /**
+     * Writes the cluster identifier of the message.
+     * @param joinMessage message to be written
+     * @param dos stream the message is written to
+     */
+    public void serialize(JoinMessage joinMessage, DataOutputStream dos) throws IOException
+    {
+        final String clusterId = joinMessage.clusterId_;
+        dos.writeUTF(clusterId);
+    }
+
+    /**
+     * Reads one UTF string and wraps it in a new JoinMessage.
+     * @param dis stream positioned at a serialized message
+     * @return the reconstructed message
+     */
+    public JoinMessage deserialize(DataInputStream dis) throws IOException
+    {
+        return new JoinMessage(dis.readUTF());
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.Random;
+
+import org.apache.cassandra.utils.BitSet;
+
+
+
+/**
+ * Implementation of a PureRandomNumber generator. Use this class cautiously. Not
+ * for general purpose use. Currently this is used by the Gossiper to choose a random
+ * endpoint to Gossip to.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class PureRandom extends Random
+{
+ private BitSet bs_ = new BitSet();
+ private int lastUb_;
+
+ PureRandom()
+ {
+ super();
+ }
+
+ public int nextInt(int ub)
+ {
+ if (ub <= 0)
+ throw new IllegalArgumentException("ub must be positive");
+
+ if ( lastUb_ != ub )
+ {
+ bs_.clear();
+ lastUb_ = ub;
+ }
+ else if(bs_.cardinality() == ub)
+ {
+ bs_.clear();
+ }
+
+ int value = super.nextInt(ub);
+ while ( bs_.get(value) )
+ {
+ value = super.nextInt(ub);
+ }
+ bs_.set(value);
+ return value;
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ Random pr = new PureRandom();
+ int ubs[] = new int[] { 2, 3, 1, 10, 5, 0};
+
+ for (int ub : ubs)
+ {
+ System.out.println("UB: " + String.valueOf(ub));
+ for (int j = 0; j < 10; j++)
+ {
+ int junk = pr.nextInt(ub);
+ // Do something with junk so JVM doesn't optimize away
+ System.out.println(junk);
+ }
+ }
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Hands out version numbers for any state that is generated by the local
+ * node. All callers share one process-wide counter, so each call receives a
+ * distinct value (until the int counter wraps).
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class VersionGenerator
+{
+    /* process-wide counter; starts at 0, so the first version handed out is 1 */
+    private static AtomicInteger version_ = new AtomicInteger(0);
+
+    /**
+     * @return the counter value after atomically advancing it by one.
+     */
+    public static int getNextVersion()
+    {
+        return version_.addAndGet(1);
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,789 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.ContinuationContext;
+import org.apache.cassandra.concurrent.ContinuationsExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IContinuable;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.javaflow.Continuation;
+import org.apache.log4j.Logger;
+
+/**
+ * An <code>AIORandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ *
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class AIORandomAccessFile extends RandomAccessFile
+{
+ private final static Logger logger_ = Logger.getLogger(AIORandomAccessFile.class);
+ private final static ThreadLocal<Runnable> tls_ = new InheritableThreadLocal<Runnable>();
+ static final int LogBuffSz_ = 16; // 64K buffer
+ public static final int BuffSz_ = (1 << LogBuffSz_);
+ static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+ /* Used to lock the creation of the disk thread pool instance */
+ private static Lock createLock_ = new ReentrantLock();
+ private static ExecutorService diskIOPool_;
+
+ /**
+  * Hands a read request to the kernel. It is only used when running in
+  * Continuations mode: on read completion the continuation passed in is
+  * resumed to complete the read request.
+  *
+  * @author alakshman
+  */
+ class AIOReader implements IContinuable
+ {
+     /* context holding the continuation to be resumed once the read completes */
+     private ContinuationContext continuationCtx_;
+
+     AIOReader(ContinuationContext continuationCtx)
+     {
+         this.continuationCtx_ = continuationCtx;
+     }
+
+     /**
+      * Records the continuation in the context and issues an asynchronous
+      * read into the enclosing file's buffer, starting at its current disk
+      * position. The context travels with the request as the attachment.
+      *
+      * @param c continuation to resume when the read has completed
+      */
+     public void run(Continuation c)
+     {
+         continuationCtx_.setContinuation(c);
+         ByteBuffer target = ByteBuffer.wrap( buffer_ );
+         ReadCompletionHandler onCompletion = new ReadCompletionHandler();
+         fileChannel_.read(target, diskPos_, continuationCtx_, onCompletion);
+     }
+ }
+
+ /**
+ * Read completion handler for AIO framework. The context
+ * that is passed in, is a Continuation that needs to be
+ * resumed on read completion.
+ *
+ * @author alakshman
+ *
+ * @param <V> number of bytes read.
+ */
+ class ReadCompletionHandler implements CompletionHandler<Integer, ContinuationContext>
+ {
+ public void cancelled(ContinuationContext attachment)
+ {
+ }
+
+ public void completed(Integer result, ContinuationContext attachment)
+ {
+ logger_.debug("Bytes read " + result);
+ if ( attachment != null )
+ {
+ Continuation c = attachment.getContinuation();
+ attachment.result(result);
+ if ( c != null )
+ {
+ c = Continuation.continueWith(c, attachment);
+ ContinuationsExecutor.doPostProcessing(c);
+ }
+ }
+ }
+
+ public void failed(Throwable th, ContinuationContext attachment)
+ {
+ }
+ }
+
+ /*
+ * This implementation is based on the buffer implementation in Modula-3's
+ * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+ */
+ private boolean dirty_; // true iff unflushed bytes exist
+ private boolean closed_; // true iff the file is closed
+ private long curr_; // current position in file
+ private long lo_, hi_; // bounds on characters in "buff"
+ private byte[] buffer_ = new byte[0]; // local buffer
+ private long maxHi_; // this.lo + this.buff.length
+ private boolean hitEOF_; // buffer contains last file block?
+ private long diskPos_; // disk position
+ private AsynchronousFileChannel fileChannel_; // asynchronous file channel used for AIO.
+ private boolean bContinuations_; // indicates if used in continuations mode.
+
+ /*
+ * To describe the above fields, we introduce the following abstractions for
+ * the file "f":
+ *
+ *   len(f)     the length of the file
+ *   curr(f)    the current position in the file
+ *   c(f)       the abstract contents of the file
+ *   disk(f)    the contents of f's backing disk file
+ *   closed(f)  true iff the file is closed
+ *
+ * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+ * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+ * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+ * operation has the effect of making "disk(f)" identical to "c(f)".
+ *
+ * A file is said to be *valid* if the following conditions hold:
+ *
+ * V1. The "closed" and "curr" fields are correct:
+ *
+ *   f.closed == closed(f)
+ *   f.curr == curr(f)
+ *
+ * V2. The current position is either contained in the buffer, or just past
+ * the buffer:
+ *
+ * f.lo <= f.curr <= f.hi
+ *
+ * V3. Any (possibly) unflushed characters are stored in "f.buff":
+ *
+ * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+ *
+ * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+ *
+ * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+ * disk(f)[i])
+ *
+ * V5. "f.dirty" is true iff the buffer contains bytes that should be
+ * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+ *
+ * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+ *
+ * V6. this.maxHi == this.lo + this.buff.length
+ *
+ * Note that "f.buff" can be "null" in a valid file, since the range of
+ * characters in V3 is empty when "f.lo == f.curr".
+ *
+ * A file is said to be *ready* if the buffer contains the current position,
+ * i.e., when:
+ *
+ * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+ *
+ * When a file is ready, reading or writing a single byte can be performed
+ * by reading or writing the in-memory buffer without performing a disk
+ * operation.
+ */
+
+ /**
+  * Opens the given file for r/w operations with the default buffer size
+  * and without continuations support.
+  * @param file file to be opened.
+  * @throws IOException if the file cannot be opened
+  */
+ public AIORandomAccessFile(File file) throws IOException
+ {
+     this(file, 0, false);
+ }
+
+ /**
+  * Opens the given file for r/w operations with the default buffer size.
+  * @param file file to be opened.
+  * @param bContinuations specify if continuations
+  *                       support is required.
+  * @throws IOException if the file cannot be opened
+  */
+ public AIORandomAccessFile(File file, boolean bContinuations) throws IOException
+ {
+     this(file, 0, bContinuations);
+ }
+
+ /**
+  * Opens the given file for r/w operations without continuations support.
+  * @param file file to be opened
+  * @param size amount of data to be buffered as part
+  *             of r/w operations
+  * @throws IOException if the file cannot be opened
+  */
+ public AIORandomAccessFile(File file, int size) throws IOException
+ {
+     this(file, size, false);
+ }
+
+ /**
+  * Opens the given file for r/w operations. This is the most general of the
+  * File based constructors.
+  * @param file file to be opened
+  * @param size amount of data to be buffered as part
+  *             of r/w operations
+  * @param bContinuations specify if continuations
+  *                       support is required.
+  * @throws IOException if the file cannot be opened
+  */
+ public AIORandomAccessFile(File file, int size, boolean bContinuations) throws IOException
+ {
+     super(file, "rw");
+     /* set up the buffer state and the asynchronous channel */
+     this.init(file, size, bContinuations);
+ }
+
+ /**
+  * Opens the named file for r/w operations with the default buffer size
+  * and without continuations support.
+  * @param name name of the file to be opened.
+  * @throws IOException if the file cannot be opened
+  */
+ public AIORandomAccessFile(String name) throws IOException
+ {
+     this(name, 0, false);
+ }
+
+ /**
+  * Opens the named file for r/w operations with the default buffer size.
+  * @param name name of the file to be opened.
+  * @param bContinuations specify if continuations
+  *                       support is required.
+  * @throws IOException if the file cannot be opened
+  */
+ public AIORandomAccessFile(String name, boolean bContinuations) throws IOException
+ {
+     this(name, 0, bContinuations);
+ }
+
+ /**
+  * Opens the named file for r/w operations without continuations support.
+  * @param name name of the file to be opened.
+  * @param size buffering size to be used.
+  * @throws IOException if the file cannot be opened
+  */
+ public AIORandomAccessFile(String name, int size) throws IOException
+ {
+     this(name, size, false);
+ }
+
+ /**
+ * Open a AIORandomAccessFile for r/w operations.
+ * @param name of file to be opened.
+ * @param size buffering size to be used.
+ * @param bContinuations specify if continuations
+ * support is required.
+ */
+ public AIORandomAccessFile(String name, int size, boolean bContinuations) throws IOException
+ {
+     super(name, "rw");
+     // init() works on a File, so wrap the name before handing it over.
+     File target = new File(name);
+     init(target, size, bContinuations);
+ }
+
+ /**
+  * Shared constructor tail: resets the buffer bookkeeping, creates the
+  * shared disk I/O executor on first use and opens the asynchronous
+  * channel on <code>file</code>.
+  *
+  * @param file file to open through the asynchronous channel
+  * @param size requested buffer size; BuffSz_ is used unless this is larger
+  * @param bVal stored in bContinuations_; seek() uses it to pick the read path
+  * @throws IOException if the asynchronous channel cannot be opened
+  */
+ private void init(File file, int size, boolean bVal) throws IOException
+ {
+     bContinuations_ = bVal;
+     this.dirty_ = this.closed_ = false;
+     this.lo_ = this.curr_ = this.hi_ = 0;
+     this.buffer_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
+     /* lo_ is 0 here, so this keeps maxHi_ == lo_ + buffer_.length even for a larger buffer */
+     this.maxHi_ = (long) this.buffer_.length;
+     this.hitEOF_ = false;
+     this.diskPos_ = 0L;
+     /* set up the executor shared by all asynchronous file channels */
+     if ( diskIOPool_ == null )
+     {
+         createLock_.lock();
+         try
+         {
+             /* check again under the lock so only one pool is ever created */
+             if ( diskIOPool_ == null )
+             {
+                 int maxThreads = DatabaseDescriptor.getThreadsPerPool();
+                 diskIOPool_ = new ContinuationsExecutor( maxThreads,
+                                                          maxThreads,
+                                                          Integer.MAX_VALUE,
+                                                          TimeUnit.SECONDS,
+                                                          new LinkedBlockingQueue<Runnable>(),
+                                                          new ThreadFactoryImpl("DISK-IO-POOL")
+                                                        );
+             }
+         }
+         finally
+         {
+             createLock_.unlock();
+         }
+     }
+     OpenOption[] openOptions = new OpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ};
+     Set<OpenOption> set = new HashSet<OpenOption>( Arrays.asList(openOptions) );
+     try
+     {
+         fileChannel_ = AsynchronousFileChannel.open(file.toPath(), set, diskIOPool_);
+     }
+     catch (IOException ex)
+     {
+         /* the superclass constructor has already opened the file; do not leak that handle */
+         super.close();
+         throw ex;
+     }
+ }
+
+ /**
+  * Writes out any dirty buffered bytes, waiting for that last write to
+  * finish, then closes the asynchronous channel and the file handle that
+  * the superclass constructor opened.
+  *
+  * @throws IOException if the final flush or either close fails
+  */
+ public void close() throws IOException
+ {
+     try
+     {
+         this.onClose();
+     }
+     finally
+     {
+         /* release both handles even when the final flush failed */
+         this.closed_ = true;
+         try
+         {
+             fileChannel_.close();
+         }
+         finally
+         {
+             /* every constructor also opens the file through RandomAccessFile */
+             super.close();
+         }
+     }
+ }
+
+ /**
+ * Flush any bytes in the file's buffer that have not yet been written to
+ * disk. If the file was created read-only, this method is a no-op.
+ */
+ public void flush() throws IOException
+ {
+     // Public entry point; the private helper only submits a write when the buffer is dirty.
+     flushBuffer();
+ }
+
+ /**
+ * Flush any dirty bytes in the buffer to disk.
+ */
+ private void flushBuffer() throws IOException
+ {
+     if (!this.dirty_)
+     {
+         /* nothing buffered that has not been submitted yet */
+         return;
+     }
+     /* submit the buffered bytes at file offset lo_ without waiting for the write to complete */
+     doWrite(this.lo_, false);
+     this.diskPos_ = this.curr_;
+     this.dirty_ = false;
+ }
+
+ /**
+ * Invoked when close() is invoked and causes the flush
+ * of the last few bytes to block when the write is submitted.
+ * @throws IOException
+ */
+ private void onClose() throws IOException
+ {
+     if (!this.dirty_)
+     {
+         /* nothing left to write before closing */
+         return;
+     }
+     /* same as flushBuffer(), except that doWrite() waits for this write to finish */
+     doWrite(this.lo_, true);
+     this.diskPos_ = this.curr_;
+     this.dirty_ = false;
+ }
+
+ /**
+ * This method submits an I/O for write where the write happens at
+ * <i>position</i> within the file.
+ *
+ * @param position to seek to within the file
+ * @param onClose indicates if this method was invoked on a close().
+ */
+ private void doWrite(long position, boolean onClose)
+ {
+     /* expose only the first (curr_ - lo_) bytes of the buffer to the channel */
+     ByteBuffer buffer = ByteBuffer.wrap(buffer_);
+     int length = (int) (this.curr_ - this.lo_);
+     buffer.limit(length);
+     Future<Integer> futurePtr = fileChannel_.write(buffer, position, null, new WriteCompletionHandler<Integer>());
+     if ( onClose )
+     {
+         try
+         {
+             /* this will block but will execute only on a close() */
+             futurePtr.get();
+         }
+         catch (ExecutionException ex)
+         {
+             logger_.warn(LogUtil.throwableToString(ex));
+         }
+         catch (InterruptedException ex)
+         {
+             logger_.warn(LogUtil.throwableToString(ex));
+             /* restore the interrupt status so callers can still observe it */
+             Thread.currentThread().interrupt();
+         }
+     }
+     /* the submitted ByteBuffer wraps the old array, so continue with a fresh one */
+     buffer_ = new byte[buffer_.length];
+ }
+
+ /**
+ * Read at most "this.buff.length" bytes into "this.buff", returning the
+ * number of bytes read. If the return result is less than
+ * "this.buff.length", then EOF was read.
+ */
+ private int fillBuffer() throws IOException
+ {
+     int cnt = 0;
+     ByteBuffer buffer = ByteBuffer.allocate(buffer_.length);
+     Future<Integer> futurePtr = fileChannel_.read(buffer, this.diskPos_, null, new ReadCompletionHandler());
+
+     try
+     {
+         /* blocks until the asynchronous read has completed */
+         cnt = futurePtr.get();
+     }
+     catch (ExecutionException ex)
+     {
+         /* a failed read must not be reported as "0 bytes read", which callers treat as EOF */
+         logger_.warn(LogUtil.throwableToString(ex));
+         throw new IOException("Read at position " + this.diskPos_ + " failed", ex);
+     }
+     catch (InterruptedException ex)
+     {
+         logger_.warn(LogUtil.throwableToString(ex));
+         /* restore the interrupt status before reporting the aborted read */
+         Thread.currentThread().interrupt();
+         throw new IOException("Interrupted while reading at position " + this.diskPos_, ex);
+     }
+
+     if ( cnt < 0 )
+     {
+         /* a negative count means no bytes were available: flag EOF and fill the buffer with -1 */
+         this.hitEOF_ = true;
+         cnt = 0;
+         Arrays.fill(buffer_, cnt, this.buffer_.length, (byte) 0xff);
+     }
+     else
+     {
+         /* adopt the array backing the freshly read ByteBuffer as the new buffer */
+         buffer_ = buffer.array();
+     }
+     this.diskPos_ += cnt;
+     return cnt;
+ }
+
+ /**
+ * Read as much data as indicated by the size of the buffer.
+ * This method is only invoked in continuation mode.
+ */
+ private int fillBuffer2()
+ {
+     ContinuationContext ctx = (ContinuationContext) Continuation.getContext();
+     /* NOTE(review): the reader is presumably picked up by the executor once we suspend -- confirm */
+     IContinuable reader = new AIOReader( ctx );
+     ContinuationsExecutor.putInTls(reader);
+     /* suspend the continuation */
+     Continuation.suspend();
+
+     /* after resumption the context is fetched again and carries the read result */
+     ctx = (ContinuationContext) Continuation.getContext();
+     int bytesRead = (Integer) ctx.result();
+
+     if ( bytesRead < 0 )
+     {
+         /* negative result: flag EOF and fill the whole buffer with -1 */
+         this.hitEOF_ = true;
+         bytesRead = 0;
+         Arrays.fill(buffer_, 0, this.buffer_.length, (byte) 0xff);
+     }
+     this.diskPos_ += bytesRead;
+     return bytesRead;
+ }
+
+ /**
+ * This method positions <code>this.curr</code> at position
+ * <code>pos</code>. If <code>pos</code> does not fall in the current
+ * buffer, it flushes the current buffer and loads the correct one.
+ * <p>
+ *
+ * On exit from this routine <code>this.curr == this.hi</code> iff
+ * <code>pos</code> is at or past the end-of-file, which can only happen
+ * if the file was opened in read-only mode.
+ */
+ public void seek(long pos) throws IOException
+ {
+     boolean insideBuffer = (pos >= this.lo_) && (pos < this.hi_);
+     if (insideBuffer)
+     {
+         // no read required; moving backwards still needs a flush to maintain V4
+         if (pos < this.curr_)
+         {
+             this.flushBuffer();
+         }
+     }
+     else
+     {
+         // target lies outside the current buffer -- flush it, then load the right one
+         this.flushBuffer();
+         this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+         this.maxHi_ = this.lo_ + (long) this.buffer_.length;
+         this.diskPos_ = this.lo_;
+
+         /* continuation mode reads through fillBuffer2(), otherwise block in fillBuffer() */
+         int n = bContinuations_ ? fillBuffer2() : fillBuffer();
+         this.hi_ = this.lo_ + (long) n;
+     }
+     this.curr_ = pos;
+ }
+
+
+ public long getFilePointer()
+ {
+     // The position is the one tracked in curr_, not queried from the underlying file.
+     final long position = this.curr_;
+     return position;
+ }
+
+ public long length() throws IOException
+ {
+ return Math.max(this.curr_, super.length());
+ }
+
+ public int read() throws IOException
+ {
+     boolean bufferExhausted = this.curr_ >= this.hi_;
+     if (bufferExhausted)
+     {
+         // EOF was already seen for this buffer
+         if (this.hitEOF_)
+         {
+             return -1;
+         }
+         // slow path -- load the buffer that contains curr_
+         this.seek(this.curr_);
+         if (this.curr_ == this.hi_)
+         {
+             return -1;
+         }
+     }
+     int slot = (int) (this.curr_ - this.lo_);
+     int value = this.buffer_[slot] & 0xFF; // byte -> unsigned int
+     this.curr_++;
+     return value;
+ }
+
+ public int read(byte[] b) throws IOException
+ {
+     // Whole-array variant of read(byte[], int, int).
+     final int capacity = b.length;
+     return this.read(b, 0, capacity);
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+     if (this.curr_ >= this.hi_)
+     {
+         // EOF was already seen for this buffer
+         if (this.hitEOF_)
+         {
+             return -1;
+         }
+         // slow path -- load the buffer that contains curr_
+         this.seek(this.curr_);
+         if (this.curr_ == this.hi_)
+         {
+             return -1;
+         }
+     }
+     // serve at most what is left in the current buffer; callers loop for more
+     int available = (int) (this.hi_ - this.curr_);
+     int count = Math.min(len, available);
+     int start = (int) (this.curr_ - this.lo_);
+     System.arraycopy(this.buffer_, start, b, off, count);
+     this.curr_ += count;
+     return count;
+ }
+
+ public void write(int b) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ if (this.hitEOF_ && this.hi_ < this.maxHi_)
+ {
+ // at EOF -- bump "hi"
+ this.hi_++;
+ }
+ else
+ {
+ // slow path -- write current buffer; read next one
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ {
+ // appending to EOF -- bump "hi"
+ this.hi_++;
+ }
+ }
+ }
+ this.buffer_[(int) (this.curr_ - this.lo_)] = (byte) b;
+ this.curr_++;
+ this.dirty_ = true;
+ }
+
+ public void write(byte[] b) throws IOException
+ {
+     // Whole-array variant of write(byte[], int, int).
+     final int count = b.length;
+     this.write(b, 0, count);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+     // Copies the range into the buffer piecewise; writeAtMost() takes at most
+     // what fits before the end of the current buffer.
+     while (len > 0)
+     {
+         int n = this.writeAtMost(b, off, len);
+         off += n;
+         len -= n;
+         // Mark the buffer dirty after every piece: the next writeAtMost() may
+         // seek(), and seek() only flushes a dirty buffer. Setting the flag
+         // after the loop lost the first buffer of a write that started clean.
+         this.dirty_ = true;
+     }
+ }
+
+ /*
+ * Write at most "len" bytes to "b" starting at position "off", and return
+ * the number of bytes written.
+ */
+ private int writeAtMost(byte[] b, int off, int len) throws IOException
+ {
+     if (this.curr_ >= this.hi_)
+     {
+         // at EOF with room left in the buffer: open the buffer up to maxHi
+         boolean extend = this.hitEOF_ && this.hi_ < this.maxHi_;
+         if (!extend)
+         {
+             // slow path -- write current buffer; read next one
+             this.seek(this.curr_);
+             // still at the end after the reload means we are appending
+             extend = (this.curr_ == this.hi_);
+         }
+         if (extend)
+         {
+             this.hi_ = this.maxHi_;
+         }
+     }
+     // copy no more than what fits between curr_ and hi_
+     int room = (int) (this.hi_ - this.curr_);
+     int count = Math.min(len, room);
+     int start = (int) (this.curr_ - this.lo_);
+     System.arraycopy(b, off, this.buffer_, start, count);
+     this.curr_ += count;
+     return count;
+ }
+
+ /**
+  * Ad-hoc driver: submits a ReadImpl task to a ContinuationsExecutor.
+  * ReadImpl reads int/UTF records from /var/cassandra/test.dat; WriteImpl
+  * in this file writes records of that shape to the same path.
+  */
+ public static void main(String[] args) throws Throwable
+ {
+     ExecutorService es = new ContinuationsExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
+     Runnable task = new ReadImpl();
+     es.execute(task);
+ }
+}
+
+/**
+ * Test task: opens /var/cassandra/test.dat in continuation mode, reads one
+ * int/UTF record from the start and another after seeking to offset 66000,
+ * printing both.
+ */
+class ReadImpl implements Runnable
+{
+    public void run()
+    {
+        try
+        {
+            System.out.println("About to start the whole thing ...");
+            AIORandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"), true );
+            try
+            {
+                System.out.println("About to seek ...");
+
+                System.out.println( aRaf2.readInt() );
+                System.out.println( aRaf2.readUTF() );
+
+                System.out.println("About to seek a second time ...");
+                aRaf2.seek(66000L);
+                System.out.println( aRaf2.readInt() );
+                System.out.println( aRaf2.readUTF() );
+            }
+            finally
+            {
+                /* release the file even when one of the reads fails */
+                aRaf2.close();
+            }
+        }
+        catch( IOException ex )
+        {
+            ex.printStackTrace();
+        }
+    }
+}
+
+/**
+ * Test task: writes 10000 int/UTF records to /var/cassandra/test.dat.
+ */
+class WriteImpl implements Runnable
+{
+    public void run()
+    {
+        try
+        {
+            AIORandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"));
+            try
+            {
+                for ( int i = 0; i < 10000; ++i )
+                {
+                    aRaf2.writeInt(32);
+                    aRaf2.writeUTF("Avinash Lakshman thinks John McCain is an idiot");
+                }
+            }
+            finally
+            {
+                /* close (and thereby flush) the file even when a write fails */
+                aRaf2.close();
+            }
+        }
+        catch( IOException ex )
+        {
+            ex.printStackTrace();
+        }
+    }
+}
+
+/**
+ * Write completion handler for AIO framework. The context
+ * that is passed in, is a Continuation that needs to be
+ * resumed on write completion. For now the continuation is
+ * not used at all.
+ *
+ * @author alakshman
+ *
+ * @param <V> number of bytes written.
+ */
+class WriteCompletionHandler<V> implements CompletionHandler<V, Continuation>
+{
+    private final static Logger logger_ = Logger.getLogger(WriteCompletionHandler.class);
+
+    /** Nothing to do when a write is cancelled. */
+    public void cancelled(Continuation attachment)
+    {
+    }
+
+    /**
+     * Logs the number of bytes written and, when a continuation was attached,
+     * keeps resuming it until continueWith() returns null.
+     */
+    public void completed(V result, Continuation attachment)
+    {
+        logger_.debug("Bytes written " + result);
+        while ( attachment != null )
+        {
+            attachment = Continuation.continueWith(attachment);
+        }
+    }
+
+    /**
+     * Logs the failure. Without this, an asynchronous write whose Future is
+     * never inspected would fail without leaving any trace.
+     */
+    public void failed(Throwable th, Continuation attachment)
+    {
+        logger_.warn("Asynchronous write failed", th);
+    }
+}
+
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A <code>BufferedRandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ *
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class BufferedRandomAccessFile extends RandomAccessFile
+{
+    private static final Logger logger_ = Logger.getLogger(BufferedRandomAccessFile.class);
+    static final int LogBuffSz_ = 16; // 64K buffer
+    public static final int BuffSz_ = (1 << LogBuffSz_);
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean closed_; // true iff the file is closed
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in "buff"
+    private byte[] buff_; // local buffer
+    private long maxHi_; // this.lo + this.buff.length
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position
+
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     *
+     *   len(f)     the length of the file
+     *   curr(f)    the current position in the file
+     *   c(f)       the abstract contents of the file
+     *   disk(f)    the contents of f's backing disk file
+     *   closed(f)  true iff the file is closed
+     *
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     *
+     * A file is said to be *valid* if the following conditions hold:
+     *
+     * V1. The "closed" and "curr" fields are correct:
+     *
+     *   f.closed == closed(f)
+     *   f.curr == curr(f)
+     *
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     *
+     *   f.lo <= f.curr <= f.hi
+     *
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     *
+     *   (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     *
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     *
+     *   (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     *   disk(f)[i])
+     *
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     *
+     *   f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     *
+     * V6. this.maxHi == this.lo + this.buff.length
+     *
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     *
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     *
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     *
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
+     * in mode <code>mode</code>, which should be "r" for reading only, or
+     * "rw" for reading and writing.
+     */
+    public BufferedRandomAccessFile(File file, String mode) throws IOException
+    {
+        super(file, mode);
+        this.init(0);
+    }
+
+    /**
+     * Same as {@link #BufferedRandomAccessFile(File, String)}, with a buffer of
+     * <code>size</code> bytes when that is larger than the default BuffSz_.
+     */
+    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException
+    {
+        super(file, mode);
+        this.init(size);
+    }
+
+    /**
+     * Open a new <code>BufferedRandomAccessFile</code> on the file named
+     * <code>name</code> in mode <code>mode</code>, which should be "r" for
+     * reading only, or "rw" for reading and writing.
+     */
+    public BufferedRandomAccessFile(String name, String mode) throws IOException
+    {
+        super(name, mode);
+        this.init(0);
+    }
+
+    /**
+     * Same as {@link #BufferedRandomAccessFile(String, String)}, with a buffer
+     * of <code>size</code> bytes when that is larger than the default BuffSz_.
+     */
+    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
+    {
+        super(name, mode);
+        this.init(size);
+    }
+
+    /* Reset all bookkeeping and allocate the buffer (never smaller than BuffSz_). */
+    private void init(int size)
+    {
+        this.dirty_ = this.closed_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
+        // lo_ is 0 here, so this establishes V6 even when a larger buffer was requested
+        this.maxHi_ = (long) this.buff_.length;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+    }
+
+    /**
+     * Flushes buffered bytes and closes the underlying file. The file is
+     * closed even when the flush fails, so the descriptor is not leaked.
+     */
+    public void close() throws IOException
+    {
+        try
+        {
+            this.flush();
+        }
+        finally
+        {
+            this.closed_ = true;
+            super.close();
+        }
+    }
+
+    /**
+     * Flush any bytes in the file's buffer that have not yet been written to
+     * disk. If the file was created read-only, this method is a no-op.
+     */
+    public void flush() throws IOException
+    {
+        this.flushBuffer();
+    }
+
+    /* Flush any dirty bytes in the buffer to disk. */
+    private void flushBuffer() throws IOException
+    {
+        if (this.dirty_)
+        {
+            // position the underlying file at the start of the buffered range if needed
+            if (this.diskPos_ != this.lo_)
+                super.seek(this.lo_);
+            int len = (int) (this.curr_ - this.lo_);
+            super.write(this.buff_, 0, len);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+        }
+    }
+
+    /*
+     * Read at most "this.buff.length" bytes into "this.buff", returning the
+     * number of bytes read. If the return result is less than
+     * "this.buff.length", then EOF was read.
+     */
+    private int fillBuffer() throws IOException
+    {
+        int cnt = 0;
+        int rem = this.buff_.length;
+        while (rem > 0)
+        {
+            int n = super.read(this.buff_, cnt, rem);
+            if (n < 0)
+                break;
+            cnt += n;
+            rem -= n;
+        }
+        // NOTE(review): cnt starts at 0 and only grows in the loop above, so
+        // (cnt < 0) is never true here; hitEOF_ is therefore never set and the
+        // fill below never runs. Left unchanged because read()/write() fall
+        // back to seek() and re-read the file in that case -- confirm intent.
+        if ( (cnt < 0) && (this.hitEOF_ = (cnt < this.buff_.length)) )
+        {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+
+    /*
+     * This method positions <code>this.curr</code> at position <code>pos</code>.
+     * If <code>pos</code> does not fall in the current buffer, it flushes the
+     * current buffer and loads the correct one.<p>
+     *
+     * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+     * is at or past the end-of-file, which can only happen if the file was
+     * opened in read-only mode.
+     */
+    public void seek(long pos) throws IOException
+    {
+        if (pos >= this.hi_ || pos < this.lo_)
+        {
+            // seeking outside of current buffer -- flush and read
+            this.flushBuffer();
+            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+            this.maxHi_ = this.lo_ + (long) this.buff_.length;
+            if (this.diskPos_ != this.lo_)
+            {
+                super.seek(this.lo_);
+                this.diskPos_ = this.lo_;
+            }
+            int n = this.fillBuffer();
+            this.hi_ = this.lo_ + (long) n;
+        }
+        else
+        {
+            // seeking inside current buffer -- no read required
+            if (pos < this.curr_)
+            {
+                // if seeking backwards, we must flush to maintain V4
+                this.flushBuffer();
+            }
+        }
+        this.curr_ = pos;
+    }
+
+    /* Returns the buffered position curr_, not the underlying file pointer. */
+    public long getFilePointer()
+    {
+        return this.curr_;
+    }
+
+    /* Unflushed bytes past the on-disk end count towards the length. */
+    public long length() throws IOException
+    {
+        return Math.max(this.curr_, super.length());
+    }
+
+    public int read() throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            // test for EOF
+            // if (this.hi < this.maxHi) return -1;
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        byte res = this.buff_[(int) (this.curr_ - this.lo_)];
+        this.curr_++;
+        return ((int) res) & 0xFF; // convert byte -> int
+    }
+
+    public int read(byte[] b) throws IOException
+    {
+        return this.read(b, 0, b.length);
+    }
+
+    /*
+     * Copies at most what is left in the current buffer, so fewer than "len"
+     * bytes may be returned even before EOF.
+     */
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            // test for EOF
+            // if (this.hi < this.maxHi) return -1;
+            if (this.hitEOF_)
+                return -1;
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(this.buff_, buffOff, b, off, len);
+        this.curr_ += len;
+        return len;
+    }
+
+    public void write(int b) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_++;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_++;
+                }
+            }
+        }
+        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+        this.curr_++;
+        this.dirty_ = true;
+    }
+
+    public void write(byte[] b) throws IOException
+    {
+        this.write(b, 0, b.length);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        while (len > 0)
+        {
+            int n = this.writeAtMost(b, off, len);
+            off += n;
+            len -= n;
+            // Mark the buffer dirty after every piece: the next writeAtMost()
+            // may seek(), and seek() only flushes a dirty buffer. Setting the
+            // flag after the loop lost the first buffer of a write that
+            // started with a clean buffer and spanned more than one buffer.
+            this.dirty_ = true;
+        }
+    }
+
+    /*
+     * Write at most "len" bytes to "b" starting at position "off", and return
+     * the number of bytes written.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_ = this.maxHi_;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_ = this.maxHi_;
+                }
+            }
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(b, off, this.buff_, buffOff, len);
+        this.curr_ += len;
+        return len;
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.zip.Adler32;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import bak.pcj.map.AbstractLongKeyLongMap;
+import bak.pcj.map.LongKeyLongChainedHashMap;
+
+/**
+ * This class manages the persistence of checksums and keeps
+ * them in memory. It maintains a mapping of data files on
+ * disk to their corresponding checksum files. It also
+ * loads the checksums into memory on start up.
+ *
+ * @author alakshman
+ *
+ */
+class ChecksumManager
+{
+ private static Logger logger_ = Logger.getLogger(ChecksumManager.class);
+ /* Keeps a mapping of checksum manager instances to data file */
+ private static Map<String, ChecksumManager> chksumMgrs_ = new HashMap<String, ChecksumManager>();
+ private static Lock lock_ = new ReentrantLock();
+ private static final String checksumPrefix_ = "Checksum-";
+ private static final int bufferSize_ = 8*1024*1024;
+ private static final long chunkMask_ = 0x00000000FFFFFFFFL;
+ private static final long fileIdMask_ = 0x7FFFFFFF00000000L;
+ /* Map where checksums are cached. */
+ private static AbstractLongKeyLongMap chksums_ = new LongKeyLongChainedHashMap();
+
+ public static ChecksumManager instance(String dataFile) throws IOException
+ {
+ ChecksumManager chksumMgr = chksumMgrs_.get(dataFile);
+ if ( chksumMgr == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( chksumMgr == null )
+ {
+ chksumMgr = new ChecksumManager(dataFile);
+ chksumMgrs_.put(dataFile, chksumMgr);
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return chksumMgr;
+ }
+
+ /* TODO: Debug only */
+ public static ChecksumManager instance(String dataFile, String chkSumFile) throws IOException
+ {
+ ChecksumManager chksumMgr = chksumMgrs_.get(dataFile);
+ if ( chksumMgr == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( chksumMgr == null )
+ {
+ chksumMgr = new ChecksumManager(dataFile, chkSumFile);
+ chksumMgrs_.put(dataFile, chksumMgr);
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return chksumMgr;
+ }
+
+
+ /**
+ * On start read all the check sum files on disk and
+ * pull them into memory.
+ * @throws IOException
+ */
+ public static void onStart() throws IOException
+ {
+ String[] directories = DatabaseDescriptor.getAllDataFileLocations();
+ List<File> allFiles = new ArrayList<File>();
+ for ( String directory : directories )
+ {
+ File file = new File(directory);
+ File[] files = file.listFiles();
+ for ( File f : files )
+ {
+ if ( f.getName().contains(ChecksumManager.checksumPrefix_) )
+ {
+ allFiles.add(f);
+ }
+ }
+ }
+
+ for ( File file : allFiles )
+ {
+ int fId = SequenceFile.getFileId(file.getName());
+ ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
+
+ int chunk = 0;
+ while ( !chksumRdr.isEOF() )
+ {
+ long value = chksumRdr.readLong();
+ long key = ChecksumManager.key(fId, ++chunk);
+ chksums_.put(key, value);
+ }
+ }
+ }
+
+ /**
+ * On delete of this dataFile remove the checksums associated with
+ * this file from memory, remove the check sum manager instance.
+ *
+ * @param dataFile data file that is being deleted.
+ * @throws IOException
+ */
+ public static void onFileDelete(String dataFile) throws IOException
+ {
+ File f = new File(dataFile);
+ long size = f.length();
+ int fileId = SequenceFile.getFileId(f.getName());
+ int chunks = (int)(size >> 16L);
+
+ for ( int i = 0; i < chunks; ++i )
+ {
+ long key = ChecksumManager.key(fileId, i);
+ chksums_.remove(key);
+ }
+
+ /* remove the check sum manager instance */
+ chksumMgrs_.remove(dataFile);
+ String chksumFile = f.getParent() + System.getProperty("file.separator") + checksumPrefix_ + fileId + ".db";
+ FileUtils.delete(chksumFile);
+ }
+
+ private static long key(int fileId, int chunkId)
+ {
+ long key = 0;
+ key |= fileId;
+ key <<= 32;
+ key |= chunkId;
+ return key;
+ }
+
+ private RandomAccessFile raf_;
+ private Adler32 adler_ = new Adler32();
+
+ ChecksumManager(String dataFile) throws IOException
+ {
+ File file = new File(dataFile);
+ String directory = file.getParent();
+ String f = file.getName();
+ short fId = SequenceFile.getFileId(f);
+ String chkSumFile = directory + System.getProperty("file.separator") + checksumPrefix_ + fId + ".db";
+ raf_ = new RandomAccessFile(chkSumFile, "rw");
+ }
+
+ /* TODO: Remove later. */
+ ChecksumManager(String dataFile, String chkSumFile) throws IOException
+ {
+ File file = new File(dataFile);
+ String directory = file.getParent();
+ String f = file.getName();
+ short fId = SequenceFile.getFileId(f);
+ raf_ = new RandomAccessFile(chkSumFile, "rw");
+
+ file = new File(chkSumFile);
+ ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
+
+ int chunk = 0;
+ while ( !chksumRdr.isEOF() )
+ {
+ long value = chksumRdr.readLong();
+ long key = ChecksumManager.key(fId, ++chunk);
+ chksums_.put(key, value);
+ }
+ }
+
+ /**
+ * Log the checksum for the the specified file and chunk
+ * within the file.
+ * @param fileId id associated with the file
+ * @param chunkId chunk within the file.
+ * @param buffer for which the checksum needs to be calculated.
+ * @throws IOException
+ */
+ void logChecksum(int fileId, int chunkId, byte[] buffer)
+ {
+ logChecksum(fileId, chunkId, buffer, 0, buffer.length);
+ }
+
+ /**
+ * Log the checksum for the the specified file and chunk
+ * within the file.
+ * @param fileId id associated with the file
+ * @param chunkId chunk within the file.
+ * @param buffer for which the checksum needs to be calculated.
+ * @param startoffset offset to start within the buffer
+ * @param length size of the checksum buffer.
+ * @throws IOException
+ */
+ void logChecksum(int fileId, int chunkId, byte[] buffer, int startOffset, int length)
+ {
+ try
+ {
+ adler_.update(buffer, startOffset, length);
+ long chksum = adler_.getValue();
+ adler_.reset();
+ /* log checksums to disk */
+ raf_.writeLong(chksum);
+ /* add the chksum to memory */
+ long key = ChecksumManager.key(fileId, chunkId);
+ chksums_.put(key, chksum);
+ }
+ catch ( IOException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ }
+
+ /**
+ * Validate checksums for the data in the buffer.
+ * @file name of the file from which data is being
+ * read.
+ * @chunkId chunkId
+ * @param buffer with data for which checksum needs to be
+ * verified.
+ * @throws IOException
+ */
+ void validateChecksum(String file, int chunkId, byte[] buffer) throws IOException
+ {
+ validateChecksum(file, chunkId, buffer, 0, buffer.length);
+ }
+
+    /**
+     * Validates the checksum of a region of a buffer against the checksum
+     * held in memory for the given chunk of the given file.
+     *
+     * @param file name of the file from which the data was read; its file id
+     *             is obtained via SequenceFile.getFileId
+     * @param chunkId chunk within the file
+     * @param buffer data whose checksum needs to be verified
+     * @param startOffset offset within the buffer at which the region starts
+     * @param length number of bytes whose checksum needs to be verified
+     * @throws IOException if no checksum is held for the chunk, or if the
+     *                     computed checksum differs from the stored one
+     */
+    void validateChecksum(String file, int chunkId, byte[] buffer, int startOffset, int length) throws IOException
+    {
+        int fId = SequenceFile.getFileId(file);
+        long key = ChecksumManager.key(fId, chunkId);
+
+        adler_.update(buffer, startOffset, length);
+        long currentChksum = adler_.getValue();
+        adler_.reset();
+
+        /*
+         * Fetch as a boxed value: assigning a missing entry straight to a
+         * primitive long would fail with a NullPointerException during
+         * unboxing instead of reporting the problem as an IOException.
+         * (Assumes chksums_ holds boxed Long values; its declaration is
+         * outside this section -- TODO confirm.)
+         */
+        Long oldChksum = chksums_.get(key);
+        if ( oldChksum == null )
+        {
+            throw new IOException("No checksum recorded in file " + file + " for chunk " + chunkId + ".");
+        }
+        if ( currentChksum != oldChksum.longValue() )
+        {
+            throw new IOException("Checksums do not match in file " + file + " for chunk " + chunkId + ".");
+        }
+    }
+
+
+ /**
+ * Get the checksum for the specified file's chunk
+ * @param fileId id associated with the file.
+ * @param chunkId chunk within the file.
+ * @return associated checksum for the chunk
+ */
+ long getChecksum(int fileId, int chunkId)
+ {
+ long key = ChecksumManager.key(fileId, chunkId);
+ return chksums_.get(key);
+ }
+
+    /**
+     * Debugging aid: prints every long stored in a checksum file to standard
+     * output, one value per line.
+     *
+     * @param args optional; args[0] is the path of the checksum file to dump.
+     *             Without arguments the original hard-coded path is used.
+     * @throws Throwable if the file cannot be mapped or read
+     */
+    public static void main(String[] args) throws Throwable
+    {
+        String path = ( args != null && args.length > 0 )
+                    ? args[0]
+                    : "C:\\Engagements\\Cassandra\\Checksum-1.db";
+        ChecksumReader rdr = new ChecksumReader(path);
+        try
+        {
+            while ( !rdr.isEOF() )
+            {
+                System.out.println(rdr.readLong());
+            }
+        }
+        finally
+        {
+            /* release the mapping even if reading fails part-way through */
+            rdr.close();
+        }
+    }
+}
+
+/**
+ * ChecksumReader is used to memory map the checksum files and
+ * load the data into memory.
+ *
+ * @author alakshman
+ *
+ */
+class ChecksumReader
+{
+ private static Logger logger_ = Logger.getLogger(ChecksumReader.class);
+ private String filename_;
+ private MappedByteBuffer buffer_;
+
+ ChecksumReader(String filename) throws IOException
+ {
+ filename_ = filename;
+ File f = new File(filename);
+ map(0, f.length());
+ }
+
+ ChecksumReader(String filename, long start, long end) throws IOException
+ {
+ filename_ = filename;
+ map(start, end);
+ }
+
+ public void map() throws IOException
+ {
+ RandomAccessFile file = new RandomAccessFile(filename_, "rw");
+ try
+ {
+ buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, file.length() );
+ buffer_.load();
+ }
+ finally
+ {
+ file.close();
+ }
+ }
+
+ public void map(long start, long end) throws IOException
+ {
+ if ( start < 0 || end < 0 || end < start )
+ throw new IllegalArgumentException("Invalid values for start and end.");
+
+ RandomAccessFile file = new RandomAccessFile(filename_, "rw");
+ try
+ {
+ if ( end == 0 )
+ end = file.length();
+ buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, start, end);
+ buffer_.load();
+ }
+ finally
+ {
+ file.close();
+ }
+ }
+
+ void unmap(final Object buffer)
+ {
+ AccessController.doPrivileged( new PrivilegedAction<MappedByteBuffer>()
+ {
+ public MappedByteBuffer run()
+ {
+ try
+ {
+ Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
+ getCleanerMethod.setAccessible(true);
+ sun.misc.Cleaner cleaner = (sun.misc.Cleaner)getCleanerMethod.invoke(buffer,new Object[0]);
+ cleaner.clean();
+ }
+ catch(Throwable e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ return null;
+ }
+ });
+ }
+
+ public long readLong() throws IOException
+ {
+ return buffer_.getLong();
+ }
+
+ public boolean isEOF()
+ {
+ return ( buffer_.remaining() == 0 );
+ }
+
+
+ public void close() throws IOException
+ {
+ unmap(buffer_);
+ }
+}