You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [6/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/
src/j...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Thu Jul 30 15:30:21 2009
@@ -1,151 +1,151 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.IndexHelper;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
-
-
-/**
- * Help to create an index for a column family based on size of columns
- */
-
-public class ColumnIndexer
-{
- /**
- * Given a column family this, function creates an in-memory structure that represents the
- * column index for the column family, and subsequently writes it to disk.
- * @param columnFamily Column family to create index for
- * @param dos data output stream
- * @throws IOException
- */
- public static void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
- {
- Collection<IColumn> columns = columnFamily.getSortedColumns();
- BloomFilter bf = createColumnBloomFilter(columns);
- /* Write out the bloom filter. */
- DataOutputBuffer bufOut = new DataOutputBuffer();
- BloomFilter.serializer().serialize(bf, bufOut);
- /* write the length of the serialized bloom filter. */
- dos.writeInt(bufOut.getLength());
- /* write out the serialized bytes. */
- dos.write(bufOut.getData(), 0, bufOut.getLength());
-
- /* Do the indexing */
- doIndexing(columnFamily.getComparator(), columns, dos);
- }
-
- /**
- * Create a bloom filter that contains the subcolumns and the columns that
- * make up this Column Family.
- * @param columns columns of the ColumnFamily
- * @return BloomFilter with the summarized information.
- */
- private static BloomFilter createColumnBloomFilter(Collection<IColumn> columns)
- {
- int columnCount = 0;
- for (IColumn column : columns)
- {
- columnCount += column.getObjectCount();
- }
-
- BloomFilter bf = new BloomFilter(columnCount, 4);
- for (IColumn column : columns)
- {
- bf.add(column.name());
- /* If this is SuperColumn type Column Family we need to get the subColumns too. */
- if (column instanceof SuperColumn)
- {
- Collection<IColumn> subColumns = column.getSubColumns();
- for (IColumn subColumn : subColumns)
- {
- bf.add(subColumn.name());
- }
- }
- }
- return bf;
- }
-
- /**
- * Given the collection of columns in the Column Family,
- * the name index is generated and written into the provided
- * stream
- * @param columns for whom the name index needs to be generated
- * @param dos stream into which the serialized name index needs
- * to be written.
- * @throws IOException
- */
- private static void doIndexing(AbstractType comparator, Collection<IColumn> columns, DataOutputStream dos) throws IOException
- {
- /* we are going to write column indexes */
- int numColumns = 0;
- int position = 0;
- int indexSizeInBytes = 0;
- int sizeSummarized = 0;
-
- /*
- * Maintains a list of KeyPositionInfo objects for the columns in this
- * column family. The key is the column name and the position is the
- * relative offset of that column name from the start of the list.
- * We do this so that we don't read all the columns into memory.
- */
-
- List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
-
- /* column offsets at the right thresholds into the index map. */
- for ( IColumn column : columns )
- {
- /* if we hit the column index size that we have to index after, go ahead and index it */
- if(position - sizeSummarized >= DatabaseDescriptor.getColumnIndexSize())
- {
- /*
- * ColumnSort applies only to columns. So in case of
- * SuperColumn always use the name indexing scheme for
- * the SuperColumns. We will fix this later.
- */
- IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(column.name(), 0, 0, comparator);
- cIndexInfo.position(position);
- cIndexInfo.count(numColumns);
- columnIndexList.add(cIndexInfo);
- /*
- * we will be writing this object as a UTF8 string and two ints,
- * so calculate the size accordingly. Note that we store the string
- * as UTF-8 encoded, so when we calculate the length, it should be
- * converted to UTF-8.
- */
- indexSizeInBytes += cIndexInfo.size();
- sizeSummarized = position;
- numColumns = 0;
- }
- position += column.serializedSize();
- ++numColumns;
- }
- /* write the column index list */
- IndexHelper.serialize(indexSizeInBytes, columnIndexList, dos);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+
+/**
+ * Help to create an index for a column family based on size of columns
+ */
+
+public class ColumnIndexer
+{
+ /**
+ * Given a column family this, function creates an in-memory structure that represents the
+ * column index for the column family, and subsequently writes it to disk.
+ * @param columnFamily Column family to create index for
+ * @param dos data output stream
+ * @throws IOException
+ */
+ public static void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+ {
+ Collection<IColumn> columns = columnFamily.getSortedColumns();
+ BloomFilter bf = createColumnBloomFilter(columns);
+ /* Write out the bloom filter. */
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ BloomFilter.serializer().serialize(bf, bufOut);
+ /* write the length of the serialized bloom filter. */
+ dos.writeInt(bufOut.getLength());
+ /* write out the serialized bytes. */
+ dos.write(bufOut.getData(), 0, bufOut.getLength());
+
+ /* Do the indexing */
+ doIndexing(columnFamily.getComparator(), columns, dos);
+ }
+
+ /**
+ * Create a bloom filter that contains the subcolumns and the columns that
+ * make up this Column Family.
+ * @param columns columns of the ColumnFamily
+ * @return BloomFilter with the summarized information.
+ */
+ private static BloomFilter createColumnBloomFilter(Collection<IColumn> columns)
+ {
+ int columnCount = 0;
+ for (IColumn column : columns)
+ {
+ columnCount += column.getObjectCount();
+ }
+
+ BloomFilter bf = new BloomFilter(columnCount, 4);
+ for (IColumn column : columns)
+ {
+ bf.add(column.name());
+ /* If this is SuperColumn type Column Family we need to get the subColumns too. */
+ if (column instanceof SuperColumn)
+ {
+ Collection<IColumn> subColumns = column.getSubColumns();
+ for (IColumn subColumn : subColumns)
+ {
+ bf.add(subColumn.name());
+ }
+ }
+ }
+ return bf;
+ }
+
+ /**
+ * Given the collection of columns in the Column Family,
+ * the name index is generated and written into the provided
+ * stream
+ * @param columns for whom the name index needs to be generated
+ * @param dos stream into which the serialized name index needs
+ * to be written.
+ * @throws IOException
+ */
+ private static void doIndexing(AbstractType comparator, Collection<IColumn> columns, DataOutputStream dos) throws IOException
+ {
+ /* we are going to write column indexes */
+ int numColumns = 0;
+ int position = 0;
+ int indexSizeInBytes = 0;
+ int sizeSummarized = 0;
+
+ /*
+ * Maintains a list of KeyPositionInfo objects for the columns in this
+ * column family. The key is the column name and the position is the
+ * relative offset of that column name from the start of the list.
+ * We do this so that we don't read all the columns into memory.
+ */
+
+ List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+
+ /* column offsets at the right thresholds into the index map. */
+ for ( IColumn column : columns )
+ {
+ /* if we hit the column index size that we have to index after, go ahead and index it */
+ if(position - sizeSummarized >= DatabaseDescriptor.getColumnIndexSize())
+ {
+ /*
+ * ColumnSort applies only to columns. So in case of
+ * SuperColumn always use the name indexing scheme for
+ * the SuperColumns. We will fix this later.
+ */
+ IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(column.name(), 0, 0, comparator);
+ cIndexInfo.position(position);
+ cIndexInfo.count(numColumns);
+ columnIndexList.add(cIndexInfo);
+ /*
+ * we will be writing this object as a UTF8 string and two ints,
+ * so calculate the size accordingly. Note that we store the string
+ * as UTF-8 encoded, so when we calculate the length, it should be
+ * converted to UTF-8.
+ */
+ indexSizeInBytes += cIndexInfo.size();
+ sizeSummarized = position;
+ numColumns = 0;
+ }
+ position += column.serializedSize();
+ ++numColumns;
+ }
+ /* write the column index list */
+ IndexHelper.serialize(indexSizeInBytes, columnIndexList, dos);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Jul 30 15:30:21 2009
@@ -1,562 +1,562 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.FileUtils;
-
-import org.apache.log4j.Logger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-/*
- * Commit Log tracks every write operation into the system. The aim
- * of the commit log is to be able to successfully recover data that was
- * not stored to disk via the Memtable. Every Commit Log maintains a
- * header represented by the abstraction CommitLogHeader. The header
- * contains a bit array and an array of longs and both the arrays are
- * of size, #column families for the Table, the Commit Log represents.
- *
- * Whenever a ColumnFamily is written to, for the first time its bit flag
- * is set to one in the CommitLogHeader. When it is flushed to disk by the
- * Memtable its corresponding bit in the header is set to zero. This helps
- * track which CommitLogs can be thrown away as a result of Memtable flushes.
- * Additionally, when a ColumnFamily is flushed and written to disk, its
- * entry in the array of longs is updated with the offset in the Commit Log
- * file where it was written. This helps speed up recovery since we can seek
- * to these offsets and start processing the commit log.
- *
- * Every Commit Log is rolled over everytime it reaches its threshold in size;
- * the new log inherits the "dirty" bits from the old.
- *
- * Over time there could be a number of commit logs that would be generated.
- * To allow cleaning up non-active commit logs, whenever we flush a column family and update its bit flag in
- * the active CL, we take the dirty bit array and bitwise & it with the headers of the older logs.
- * If the result is 0, then it is safe to remove the older file. (Since the new CL
- * inherited the old's dirty bitflags, getting a zero for any given bit in the anding
- * means that either the CF was clean in the old CL or it has been flushed since the
- * switch in the new.)
- *
- * The CommitLog class itself is "mostly a singleton." open() always returns one
- * instance, but log replay will bypass that.
- */
-public class CommitLog
-{
- private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
- private static volatile CommitLog instance_;
- private static Lock lock_ = new ReentrantLock();
- private static Logger logger_ = Logger.getLogger(CommitLog.class);
- private static Map<String, CommitLogHeader> clHeaders_ = new HashMap<String, CommitLogHeader>();
-
- private ExecutorService executor;
-
-
- public static final class CommitLogContext
- {
- static CommitLogContext NULL = new CommitLogContext(null, -1L);
- /* Commit Log associated with this operation */
- public final String file;
- /* Offset within the Commit Log where this row as added */
- public final long position;
-
- public CommitLogContext(String file, long position)
- {
- this.file = file;
- this.position = position;
- }
-
- boolean isValidContext()
- {
- return (position != -1L);
- }
- }
-
- public static class CommitLogFileComparator implements Comparator<String>
- {
- public int compare(String f, String f2)
- {
- return (int)(getCreationTime(f) - getCreationTime(f2));
- }
-
- public boolean equals(Object o)
- {
- if ( !(o instanceof CommitLogFileComparator) )
- return false;
- return true;
- }
- }
-
- public static void setSegmentSize(int size)
- {
- SEGMENT_SIZE = size;
- }
-
- static int getSegmentCount()
- {
- return clHeaders_.size();
- }
-
- static long getCreationTime(String file)
- {
- String[] entries = FBUtilities.strip(file, "-.");
- return Long.parseLong(entries[entries.length - 2]);
- }
-
- private static AbstractWriter createWriter(String file) throws IOException
- {
- return SequenceFile.writer(file);
- }
-
- static CommitLog open() throws IOException
- {
- if ( instance_ == null )
- {
- CommitLog.lock_.lock();
- try
- {
-
- if ( instance_ == null )
- {
- instance_ = new CommitLog(false);
- }
- }
- finally
- {
- CommitLog.lock_.unlock();
- }
- }
- return instance_;
- }
-
- /* Current commit log file */
- private String logFile_;
- /* header for current commit log */
- private CommitLogHeader clHeader_;
- private AbstractWriter logWriter_;
-
- /*
- * Generates a file name of the format CommitLog-<table>-<timestamp>.log in the
- * directory specified by the Database Descriptor.
- */
- private void setNextFileName()
- {
- logFile_ = DatabaseDescriptor.getLogFileLocation() + File.separator +
- "CommitLog-" + System.currentTimeMillis() + ".log";
- }
-
- /*
- * param @ table - name of table for which we are maintaining
- * this commit log.
- * param @ recoverymode - is commit log being instantiated in
- * in recovery mode.
- */
- CommitLog(boolean recoveryMode) throws IOException
- {
- if ( !recoveryMode )
- {
- executor = new CommitLogExecutorService();
- setNextFileName();
- logWriter_ = CommitLog.createWriter(logFile_);
- writeCommitLogHeader();
- }
- }
-
- /*
- * This ctor is currently used only for debugging. We
- * are now using it to modify the header so that recovery
- * can be tested in as many scenarios as we could imagine.
- *
- * param @ logFile - logfile which we wish to modify.
- */
- CommitLog(File logFile) throws IOException
- {
- logFile_ = logFile.getAbsolutePath();
- logWriter_ = CommitLog.createWriter(logFile_);
- }
-
- String getLogFile()
- {
- return logFile_;
- }
-
- private CommitLogHeader readCommitLogHeader(IFileReader logReader) throws IOException
- {
- int size = (int)logReader.readLong();
- byte[] bytes = new byte[size];
- logReader.readDirect(bytes);
- ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
- return CommitLogHeader.serializer().deserialize(new DataInputStream(byteStream));
- }
-
- /*
- * Write the serialized commit log header into the specified commit log.
- */
- private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
- {
- AbstractWriter logWriter = CommitLog.createWriter(commitLogFileName);
- writeCommitLogHeader(logWriter, bytes);
- logWriter.close();
- }
-
- /*
- * This is invoked on startup via the ctor. It basically
- * writes a header with all bits set to zero.
- */
- private void writeCommitLogHeader() throws IOException
- {
- int cfSize = Table.TableMetadata.getColumnFamilyCount();
- clHeader_ = new CommitLogHeader(cfSize);
- writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
- }
-
- /** writes header at the beginning of the file, then seeks back to current position */
- private void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
- {
- long currentPos = logWriter_.getCurrentPosition();
- logWriter_.seek(0);
-
- writeCommitLogHeader(logWriter_, bytes);
-
- logWriter_.seek(currentPos);
- }
-
- private static void writeCommitLogHeader(AbstractWriter logWriter, byte[] bytes) throws IOException
- {
- logWriter.writeLong(bytes.length);
- logWriter.writeDirect(bytes);
- }
-
- void recover(File[] clogs) throws IOException
- {
- DataInputBuffer bufIn = new DataInputBuffer();
-
- for (File file : clogs)
- {
- IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
- CommitLogHeader clHeader = readCommitLogHeader(reader);
- /* seek to the lowest position */
- int lowPos = CommitLogHeader.getLowestPosition(clHeader);
- /*
- * If lowPos == 0 then we need to skip the processing of this
- * file.
- */
- if (lowPos == 0)
- break;
- else
- reader.seek(lowPos);
-
- Set<Table> tablesRecovered = new HashSet<Table>();
-
- /* read the logs populate RowMutation and apply */
- while ( !reader.isEOF() )
- {
- byte[] bytes;
- try
- {
- bytes = new byte[(int)reader.readLong()];
- reader.readDirect(bytes);
- }
- catch (EOFException e)
- {
- // last CL entry didn't get completely written. that's ok.
- break;
- }
- bufIn.reset(bytes, bytes.length);
-
- /* read the commit log entry */
- Row row = Row.serializer().deserialize(bufIn);
- Table table = Table.open(row.getTable());
- tablesRecovered.add(table);
- Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
- /* remove column families that have already been flushed */
- for (ColumnFamily columnFamily : columnFamilies)
- {
- /* TODO: Remove this to not process Hints */
- if ( !DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()) )
- {
- row.removeColumnFamily(columnFamily);
- continue;
- }
- int id = table.getColumnFamilyId(columnFamily.name());
- if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
- row.removeColumnFamily(columnFamily);
- }
- if ( !row.isEmpty() )
- {
- table.applyNow(row);
- }
- }
- reader.close();
- /* apply the rows read -- success will result in the CL file being discarded */
- for (Table table : tablesRecovered)
- {
- table.flush(true);
- }
- }
- }
-
- /*
- * Update the header of the commit log if a new column family
- * is encountered for the first time.
- */
- private void maybeUpdateHeader(Row row) throws IOException
- {
- Table table = Table.open(row.getTable());
- for (ColumnFamily columnFamily : row.getColumnFamilies())
- {
- int id = table.getColumnFamilyId(columnFamily.name());
- if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
- {
- clHeader_.turnOn(id, logWriter_.getCurrentPosition());
- seekAndWriteCommitLogHeader(clHeader_.toByteArray());
- }
- }
- }
-
- CommitLogContext getContext() throws IOException
- {
- Callable<CommitLogContext> task = new Callable<CommitLogContext>()
- {
- public CommitLogContext call() throws Exception
- {
- return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
- }
- };
- try
- {
- return executor.submit(task).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /*
- * Adds the specified row to the commit log. This method will reset the
- * file offset to what it is before the start of the operation in case
- * of any problems. This way we can assume that the subsequent commit log
- * entry will override the garbage left over by the previous write.
- */
- CommitLogContext add(final Row row) throws IOException
- {
- Callable<CommitLogContext> task = new LogRecordAdder(row);
-
- try
- {
- return executor.submit(task).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /*
- * This is called on Memtable flush to add to the commit log
- * a token indicating that this column family has been flushed.
- * The bit flag associated with this column family is set in the
- * header and this is used to decide if the log file can be deleted.
- */
- void onMemtableFlush(final String tableName, final String cf, final CommitLog.CommitLogContext cLogCtx) throws IOException
- {
- Callable task = new Callable()
- {
- public Object call() throws IOException
- {
- Table table = Table.open(tableName);
- int id = table.getColumnFamilyId(cf);
- discardCompletedSegments(cLogCtx, id);
- return null;
- }
- };
- try
- {
- executor.submit(task).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /*
- * Delete log segments whose contents have been turned into SSTables.
- *
- * param @ cLogCtx The commitLog context .
- * param @ id id of the columnFamily being flushed to disk.
- *
- */
- private void discardCompletedSegments(CommitLog.CommitLogContext cLogCtx, int id) throws IOException
- {
- /* retrieve the commit log header associated with the file in the context */
- CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file);
- if(commitLogHeader == null )
- {
- if( logFile_.equals(cLogCtx.file) )
- {
- /* this means we are dealing with the current commit log. */
- commitLogHeader = clHeader_;
- clHeaders_.put(cLogCtx.file, clHeader_);
- }
- else
- return;
- }
-
- /*
- * log replay assumes that we only have to look at entries past the last
- * flush position, so verify that this flush happens after the last.
- * (Currently Memtables are flushed on a single thread so this should be fine.)
- */
- assert cLogCtx.position >= commitLogHeader.getPosition(id);
-
- commitLogHeader.turnOff(id);
- /* Sort the commit logs based on creation time */
- List<String> oldFiles = new ArrayList<String>(clHeaders_.keySet());
- Collections.sort(oldFiles, new CommitLogFileComparator());
- List<String> listOfDeletedFiles = new ArrayList<String>();
- /*
- * Loop through all the commit log files in the history. Now process
- * all files that are older than the one in the context. For each of
- * these files the header needs to modified by performing a bitwise &
- * of the header with the header of the file in the context. If we
- * encounter the file in the context in our list of old commit log files
- * then we update the header and write it back to the commit log.
- */
- for(String oldFile : oldFiles)
- {
- if(oldFile.equals(cLogCtx.file))
- {
- /*
- * We need to turn on again. This is because we always keep
- * the bit turned on and the position indicates from where the
- * commit log needs to be read. When a flush occurs we turn off
- * perform & operation and then turn on with the new position.
- */
- commitLogHeader.turnOn(id, cLogCtx.position);
- writeCommitLogHeader(cLogCtx.file, commitLogHeader.toByteArray());
- break;
- }
- else
- {
- CommitLogHeader oldCommitLogHeader = clHeaders_.get(oldFile);
- oldCommitLogHeader.and(commitLogHeader);
- if(oldCommitLogHeader.isSafeToDelete())
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Deleting commit log:"+ oldFile);
- FileUtils.deleteAsync(oldFile);
- listOfDeletedFiles.add(oldFile);
- }
- else
- {
- writeCommitLogHeader(oldFile, oldCommitLogHeader.toByteArray());
- }
- }
- }
-
- for ( String deletedFile : listOfDeletedFiles)
- {
- clHeaders_.remove(deletedFile);
- }
- }
-
- private boolean maybeRollLog() throws IOException
- {
- if (logWriter_.getFileSize() >= SEGMENT_SIZE)
- {
- /* Rolls the current log file over to a new one. */
- setNextFileName();
- String oldLogFile = logWriter_.getFileName();
- logWriter_.close();
-
- /* point reader/writer to a new commit log file. */
- logWriter_ = CommitLog.createWriter(logFile_);
- /* squirrel away the old commit log header */
- clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
- // we leave the old 'dirty' bits alone, so we can test for
- // whether it's safe to remove a given log segment by and-ing its dirty
- // with the current one.
- clHeader_.zeroPositions();
- writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
- return true;
- }
- return false;
- }
-
- void sync() throws IOException
- {
- logWriter_.sync();
- }
-
- class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
- {
- Row row;
-
- LogRecordAdder(Row row)
- {
- this.row = row;
- }
-
- public CommitLog.CommitLogContext call() throws Exception
- {
- long currentPosition = -1L;
- DataOutputBuffer cfBuffer = new DataOutputBuffer();
- try
- {
- /* serialize the row */
- Row.serializer().serialize(row, cfBuffer);
- currentPosition = logWriter_.getCurrentPosition();
- CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
- /* Update the header */
- maybeUpdateHeader(row);
- logWriter_.writeLong(cfBuffer.getLength());
- logWriter_.append(cfBuffer);
- maybeRollLog();
- return cLogCtx;
- }
- catch (IOException e)
- {
- if ( currentPosition != -1 )
- logWriter_.seek(currentPosition);
- throw e;
- }
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FileUtils;
+
+import org.apache.log4j.Logger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/*
+ * Commit Log tracks every write operation into the system. The aim
+ * of the commit log is to be able to successfully recover data that was
+ * not stored to disk via the Memtable. Every Commit Log maintains a
+ * header represented by the abstraction CommitLogHeader. The header
+ * contains a bit array and an array of longs and both the arrays are
+ * of size, #column families for the Table, the Commit Log represents.
+ *
+ * Whenever a ColumnFamily is written to, for the first time its bit flag
+ * is set to one in the CommitLogHeader. When it is flushed to disk by the
+ * Memtable its corresponding bit in the header is set to zero. This helps
+ * track which CommitLogs can be thrown away as a result of Memtable flushes.
+ * Additionally, when a ColumnFamily is flushed and written to disk, its
+ * entry in the array of longs is updated with the offset in the Commit Log
+ * file where it was written. This helps speed up recovery since we can seek
+ * to these offsets and start processing the commit log.
+ *
+ * Every Commit Log is rolled over everytime it reaches its threshold in size;
+ * the new log inherits the "dirty" bits from the old.
+ *
+ * Over time there could be a number of commit logs that would be generated.
+ * To allow cleaning up non-active commit logs, whenever we flush a column family and update its bit flag in
+ * the active CL, we take the dirty bit array and bitwise & it with the headers of the older logs.
+ * If the result is 0, then it is safe to remove the older file. (Since the new CL
+ * inherited the old's dirty bitflags, getting a zero for any given bit in the anding
+ * means that either the CF was clean in the old CL or it has been flushed since the
+ * switch in the new.)
+ *
+ * The CommitLog class itself is "mostly a singleton." open() always returns one
+ * instance, but log replay will bypass that.
+ */
+public class CommitLog
+{
+ private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
+ private static volatile CommitLog instance_;
+ private static Lock lock_ = new ReentrantLock();
+ private static Logger logger_ = Logger.getLogger(CommitLog.class);
+ private static Map<String, CommitLogHeader> clHeaders_ = new HashMap<String, CommitLogHeader>();
+
+ private ExecutorService executor;
+
+
+ public static final class CommitLogContext
+ {
+ static CommitLogContext NULL = new CommitLogContext(null, -1L);
+ /* Commit Log associated with this operation */
+ public final String file;
+ /* Offset within the Commit Log where this row as added */
+ public final long position;
+
+ public CommitLogContext(String file, long position)
+ {
+ this.file = file;
+ this.position = position;
+ }
+
+ boolean isValidContext()
+ {
+ return (position != -1L);
+ }
+ }
+
+ public static class CommitLogFileComparator implements Comparator<String>
+ {
+ public int compare(String f, String f2)
+ {
+ return (int)(getCreationTime(f) - getCreationTime(f2));
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof CommitLogFileComparator) )
+ return false;
+ return true;
+ }
+ }
+
+ public static void setSegmentSize(int size)
+ {
+ SEGMENT_SIZE = size;
+ }
+
+ static int getSegmentCount()
+ {
+ return clHeaders_.size();
+ }
+
+ static long getCreationTime(String file)
+ {
+ String[] entries = FBUtilities.strip(file, "-.");
+ return Long.parseLong(entries[entries.length - 2]);
+ }
+
+ private static AbstractWriter createWriter(String file) throws IOException
+ {
+ return SequenceFile.writer(file);
+ }
+
+ static CommitLog open() throws IOException
+ {
+ if ( instance_ == null )
+ {
+ CommitLog.lock_.lock();
+ try
+ {
+
+ if ( instance_ == null )
+ {
+ instance_ = new CommitLog(false);
+ }
+ }
+ finally
+ {
+ CommitLog.lock_.unlock();
+ }
+ }
+ return instance_;
+ }
+
+ /* Current commit log file */
+ private String logFile_;
+ /* header for current commit log */
+ private CommitLogHeader clHeader_;
+ private AbstractWriter logWriter_;
+
+ /*
+ * Generates a file name of the format CommitLog-<table>-<timestamp>.log in the
+ * directory specified by the Database Descriptor.
+ */
+ private void setNextFileName()
+ {
+ logFile_ = DatabaseDescriptor.getLogFileLocation() + File.separator +
+ "CommitLog-" + System.currentTimeMillis() + ".log";
+ }
+
+ /*
+ * param @ table - name of table for which we are maintaining
+ * this commit log.
+ * param @ recoverymode - is commit log being instantiated in
+ * in recovery mode.
+ */
+ CommitLog(boolean recoveryMode) throws IOException
+ {
+ if ( !recoveryMode )
+ {
+ executor = new CommitLogExecutorService();
+ setNextFileName();
+ logWriter_ = CommitLog.createWriter(logFile_);
+ writeCommitLogHeader();
+ }
+ }
+
+ /*
+ * This ctor is currently used only for debugging. We
+ * are now using it to modify the header so that recovery
+ * can be tested in as many scenarios as we could imagine.
+ *
+ * param @ logFile - logfile which we wish to modify.
+ */
+ CommitLog(File logFile) throws IOException
+ {
+ logFile_ = logFile.getAbsolutePath();
+ logWriter_ = CommitLog.createWriter(logFile_);
+ }
+
+ String getLogFile()
+ {
+ return logFile_;
+ }
+
+ private CommitLogHeader readCommitLogHeader(IFileReader logReader) throws IOException
+ {
+ int size = (int)logReader.readLong();
+ byte[] bytes = new byte[size];
+ logReader.readDirect(bytes);
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+ return CommitLogHeader.serializer().deserialize(new DataInputStream(byteStream));
+ }
+
+ /*
+ * Write the serialized commit log header into the specified commit log.
+ */
+ private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
+ {
+ AbstractWriter logWriter = CommitLog.createWriter(commitLogFileName);
+ writeCommitLogHeader(logWriter, bytes);
+ logWriter.close();
+ }
+
+ /*
+ * This is invoked on startup via the ctor. It basically
+ * writes a header with all bits set to zero.
+ */
+ private void writeCommitLogHeader() throws IOException
+ {
+ int cfSize = Table.TableMetadata.getColumnFamilyCount();
+ clHeader_ = new CommitLogHeader(cfSize);
+ writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
+ }
+
+ /** writes header at the beginning of the file, then seeks back to current position */
+ private void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
+ {
+ long currentPos = logWriter_.getCurrentPosition();
+ logWriter_.seek(0);
+
+ writeCommitLogHeader(logWriter_, bytes);
+
+ logWriter_.seek(currentPos);
+ }
+
+ private static void writeCommitLogHeader(AbstractWriter logWriter, byte[] bytes) throws IOException
+ {
+ logWriter.writeLong(bytes.length);
+ logWriter.writeDirect(bytes);
+ }
+
+ void recover(File[] clogs) throws IOException
+ {
+ DataInputBuffer bufIn = new DataInputBuffer();
+
+ for (File file : clogs)
+ {
+ IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
+ CommitLogHeader clHeader = readCommitLogHeader(reader);
+ /* seek to the lowest position */
+ int lowPos = CommitLogHeader.getLowestPosition(clHeader);
+ /*
+ * If lowPos == 0 then we need to skip the processing of this
+ * file.
+ */
+ if (lowPos == 0)
+ break;
+ else
+ reader.seek(lowPos);
+
+ Set<Table> tablesRecovered = new HashSet<Table>();
+
+ /* read the logs populate RowMutation and apply */
+ while ( !reader.isEOF() )
+ {
+ byte[] bytes;
+ try
+ {
+ bytes = new byte[(int)reader.readLong()];
+ reader.readDirect(bytes);
+ }
+ catch (EOFException e)
+ {
+ // last CL entry didn't get completely written. that's ok.
+ break;
+ }
+ bufIn.reset(bytes, bytes.length);
+
+ /* read the commit log entry */
+ Row row = Row.serializer().deserialize(bufIn);
+ Table table = Table.open(row.getTable());
+ tablesRecovered.add(table);
+ Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
+ /* remove column families that have already been flushed */
+ for (ColumnFamily columnFamily : columnFamilies)
+ {
+ /* TODO: Remove this to not process Hints */
+ if ( !DatabaseDescriptor.isApplicationColumnFamily(columnFamily.name()) )
+ {
+ row.removeColumnFamily(columnFamily);
+ continue;
+ }
+ int id = table.getColumnFamilyId(columnFamily.name());
+ if ( !clHeader.isDirty(id) || reader.getCurrentPosition() < clHeader.getPosition(id) )
+ row.removeColumnFamily(columnFamily);
+ }
+ if ( !row.isEmpty() )
+ {
+ table.applyNow(row);
+ }
+ }
+ reader.close();
+ /* apply the rows read -- success will result in the CL file being discarded */
+ for (Table table : tablesRecovered)
+ {
+ table.flush(true);
+ }
+ }
+ }
+
+ /*
+ * Update the header of the commit log if a new column family
+ * is encountered for the first time.
+ */
+ private void maybeUpdateHeader(Row row) throws IOException
+ {
+ Table table = Table.open(row.getTable());
+ for (ColumnFamily columnFamily : row.getColumnFamilies())
+ {
+ int id = table.getColumnFamilyId(columnFamily.name());
+ if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
+ {
+ clHeader_.turnOn(id, logWriter_.getCurrentPosition());
+ seekAndWriteCommitLogHeader(clHeader_.toByteArray());
+ }
+ }
+ }
+
+ CommitLogContext getContext() throws IOException
+ {
+ Callable<CommitLogContext> task = new Callable<CommitLogContext>()
+ {
+ public CommitLogContext call() throws Exception
+ {
+ return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
+ }
+ };
+ try
+ {
+ return executor.submit(task).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /*
+ * Adds the specified row to the commit log. This method will reset the
+ * file offset to what it is before the start of the operation in case
+ * of any problems. This way we can assume that the subsequent commit log
+ * entry will override the garbage left over by the previous write.
+ */
+ CommitLogContext add(final Row row) throws IOException
+ {
+ Callable<CommitLogContext> task = new LogRecordAdder(row);
+
+ try
+ {
+ return executor.submit(task).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /*
+ * This is called on Memtable flush to add to the commit log
+ * a token indicating that this column family has been flushed.
+ * The bit flag associated with this column family is set in the
+ * header and this is used to decide if the log file can be deleted.
+ */
+ void onMemtableFlush(final String tableName, final String cf, final CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ Callable task = new Callable()
+ {
+ public Object call() throws IOException
+ {
+ Table table = Table.open(tableName);
+ int id = table.getColumnFamilyId(cf);
+ discardCompletedSegments(cLogCtx, id);
+ return null;
+ }
+ };
+ try
+ {
+ executor.submit(task).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /*
+ * Delete log segments whose contents have been turned into SSTables.
+ *
+ * param @ cLogCtx The commitLog context .
+ * param @ id id of the columnFamily being flushed to disk.
+ *
+ */
+ private void discardCompletedSegments(CommitLog.CommitLogContext cLogCtx, int id) throws IOException
+ {
+ /* retrieve the commit log header associated with the file in the context */
+ CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file);
+ if(commitLogHeader == null )
+ {
+ if( logFile_.equals(cLogCtx.file) )
+ {
+ /* this means we are dealing with the current commit log. */
+ commitLogHeader = clHeader_;
+ clHeaders_.put(cLogCtx.file, clHeader_);
+ }
+ else
+ return;
+ }
+
+ /*
+ * log replay assumes that we only have to look at entries past the last
+ * flush position, so verify that this flush happens after the last.
+ * (Currently Memtables are flushed on a single thread so this should be fine.)
+ */
+ assert cLogCtx.position >= commitLogHeader.getPosition(id);
+
+ commitLogHeader.turnOff(id);
+ /* Sort the commit logs based on creation time */
+ List<String> oldFiles = new ArrayList<String>(clHeaders_.keySet());
+ Collections.sort(oldFiles, new CommitLogFileComparator());
+ List<String> listOfDeletedFiles = new ArrayList<String>();
+ /*
+ * Loop through all the commit log files in the history. Now process
+ * all files that are older than the one in the context. For each of
+ * these files the header needs to modified by performing a bitwise &
+ * of the header with the header of the file in the context. If we
+ * encounter the file in the context in our list of old commit log files
+ * then we update the header and write it back to the commit log.
+ */
+ for(String oldFile : oldFiles)
+ {
+ if(oldFile.equals(cLogCtx.file))
+ {
+ /*
+ * We need to turn on again. This is because we always keep
+ * the bit turned on and the position indicates from where the
+ * commit log needs to be read. When a flush occurs we turn off
+ * perform & operation and then turn on with the new position.
+ */
+ commitLogHeader.turnOn(id, cLogCtx.position);
+ writeCommitLogHeader(cLogCtx.file, commitLogHeader.toByteArray());
+ break;
+ }
+ else
+ {
+ CommitLogHeader oldCommitLogHeader = clHeaders_.get(oldFile);
+ oldCommitLogHeader.and(commitLogHeader);
+ if(oldCommitLogHeader.isSafeToDelete())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Deleting commit log:"+ oldFile);
+ FileUtils.deleteAsync(oldFile);
+ listOfDeletedFiles.add(oldFile);
+ }
+ else
+ {
+ writeCommitLogHeader(oldFile, oldCommitLogHeader.toByteArray());
+ }
+ }
+ }
+
+ for ( String deletedFile : listOfDeletedFiles)
+ {
+ clHeaders_.remove(deletedFile);
+ }
+ }
+
+ private boolean maybeRollLog() throws IOException
+ {
+ if (logWriter_.getFileSize() >= SEGMENT_SIZE)
+ {
+ /* Rolls the current log file over to a new one. */
+ setNextFileName();
+ String oldLogFile = logWriter_.getFileName();
+ logWriter_.close();
+
+ /* point reader/writer to a new commit log file. */
+ logWriter_ = CommitLog.createWriter(logFile_);
+ /* squirrel away the old commit log header */
+ clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
+ // we leave the old 'dirty' bits alone, so we can test for
+ // whether it's safe to remove a given log segment by and-ing its dirty
+ // with the current one.
+ clHeader_.zeroPositions();
+ writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
+ return true;
+ }
+ return false;
+ }
+
+ void sync() throws IOException
+ {
+ logWriter_.sync();
+ }
+
+ class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
+ {
+ Row row;
+
+ LogRecordAdder(Row row)
+ {
+ this.row = row;
+ }
+
+ public CommitLog.CommitLogContext call() throws Exception
+ {
+ long currentPosition = -1L;
+ DataOutputBuffer cfBuffer = new DataOutputBuffer();
+ try
+ {
+ /* serialize the row */
+ Row.serializer().serialize(row, cfBuffer);
+ currentPosition = logWriter_.getCurrentPosition();
+ CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
+ /* Update the header */
+ maybeUpdateHeader(row);
+ logWriter_.writeLong(cfBuffer.getLength());
+ logWriter_.append(cfBuffer);
+ maybeRollLog();
+ return cLogCtx;
+ }
+ catch (IOException e)
+ {
+ if ( currentPosition != -1 )
+ logWriter_.seek(currentPosition);
+ throw e;
+ }
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java Thu Jul 30 15:30:21 2009
@@ -1,184 +1,184 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.*;
-import java.util.BitSet;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.utils.BitSetSerializer;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class CommitLogHeader
-{
- private static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
-
- static CommitLogHeaderSerializer serializer()
- {
- return serializer;
- }
-
- public static BitSet and(byte[] bytes1, byte[] bytes2) throws IOException
- {
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes1, 0, bytes1.length);
- CommitLogHeader header1 = serializer.deserialize(bufIn);
- bufIn.reset(bytes2, 0, bytes2.length);
- CommitLogHeader header2 = serializer.deserialize(bufIn);
- header1.and(header2);
- return header1.dirty;
- }
-
- static int getLowestPosition(CommitLogHeader clHeader)
- {
- int minPosition = Integer.MAX_VALUE;
- for ( int position : clHeader.lastFlushedAt)
- {
- if ( position < minPosition && position > 0)
- {
- minPosition = position;
- }
- }
-
- if(minPosition == Integer.MAX_VALUE)
- minPosition = 0;
- return minPosition;
- }
-
- private BitSet dirty; // columnfamilies with un-flushed data in this CommitLog
- private int[] lastFlushedAt; // position at which each CF was last flushed
-
- CommitLogHeader(int size)
- {
- dirty = new BitSet(size);
- lastFlushedAt = new int[size];
- }
-
- /*
- * This ctor is used while deserializing. This ctor
- * also builds an index of position to column family
- * Id.
- */
- CommitLogHeader(BitSet dirty, int[] lastFlushedAt)
- {
- this.dirty = dirty;
- this.lastFlushedAt = lastFlushedAt;
- }
-
- CommitLogHeader(CommitLogHeader clHeader)
- {
- dirty = (BitSet)clHeader.dirty.clone();
- lastFlushedAt = new int[clHeader.lastFlushedAt.length];
- System.arraycopy(clHeader.lastFlushedAt, 0, lastFlushedAt, 0, lastFlushedAt.length);
- }
-
- boolean isDirty(int index)
- {
- return dirty.get(index);
- }
-
- int getPosition(int index)
- {
- return lastFlushedAt[index];
- }
-
- void turnOn(int index, long position)
- {
- dirty.set(index);
- lastFlushedAt[index] = (int) position;
- }
-
- void turnOff(int index)
- {
- dirty.set(index, false);
- lastFlushedAt[index] = 0;
- }
-
- boolean isSafeToDelete() throws IOException
- {
- return dirty.isEmpty();
- }
-
- void zeroPositions()
- {
- int size = lastFlushedAt.length;
- lastFlushedAt = new int[size];
- }
-
- void and(CommitLogHeader commitLogHeader)
- {
- dirty.and(commitLogHeader.dirty);
- }
-
- byte[] toByteArray() throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- CommitLogHeader.serializer().serialize(this, dos);
- return bos.toByteArray();
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder("");
- for ( int i = 0; i < dirty.size(); ++i )
- {
- sb.append((dirty.get(i) ? 0 : 1));
- sb.append(":");
- sb.append(Table.TableMetadata.getColumnFamilyName(i));
- sb.append(" ");
- }
- sb.append(" | " );
- for ( int position : lastFlushedAt)
- {
- sb.append(position);
- sb.append(" ");
- }
- return sb.toString();
- }
-
- static class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
- {
- public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
- {
- BitSetSerializer.serialize(clHeader.dirty, dos);
- dos.writeInt(clHeader.lastFlushedAt.length);
- for (int position : clHeader.lastFlushedAt)
- {
- dos.writeInt(position);
- }
- }
-
- public CommitLogHeader deserialize(DataInputStream dis) throws IOException
- {
- BitSet bitFlags = BitSetSerializer.deserialize(dis);
- int[] position = new int[dis.readInt()];
- for (int i = 0; i < position.length; ++i)
- {
- position[i] = dis.readInt();
- }
- return new CommitLogHeader(bitFlags, position);
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.util.BitSet;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.utils.BitSetSerializer;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class CommitLogHeader
+{
+ private static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
+
+ static CommitLogHeaderSerializer serializer()
+ {
+ return serializer;
+ }
+
+ public static BitSet and(byte[] bytes1, byte[] bytes2) throws IOException
+ {
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bytes1, 0, bytes1.length);
+ CommitLogHeader header1 = serializer.deserialize(bufIn);
+ bufIn.reset(bytes2, 0, bytes2.length);
+ CommitLogHeader header2 = serializer.deserialize(bufIn);
+ header1.and(header2);
+ return header1.dirty;
+ }
+
+ static int getLowestPosition(CommitLogHeader clHeader)
+ {
+ int minPosition = Integer.MAX_VALUE;
+ for ( int position : clHeader.lastFlushedAt)
+ {
+ if ( position < minPosition && position > 0)
+ {
+ minPosition = position;
+ }
+ }
+
+ if(minPosition == Integer.MAX_VALUE)
+ minPosition = 0;
+ return minPosition;
+ }
+
+ private BitSet dirty; // columnfamilies with un-flushed data in this CommitLog
+ private int[] lastFlushedAt; // position at which each CF was last flushed
+
+ CommitLogHeader(int size)
+ {
+ dirty = new BitSet(size);
+ lastFlushedAt = new int[size];
+ }
+
+ /*
+ * This ctor is used while deserializing. This ctor
+ * also builds an index of position to column family
+ * Id.
+ */
+ CommitLogHeader(BitSet dirty, int[] lastFlushedAt)
+ {
+ this.dirty = dirty;
+ this.lastFlushedAt = lastFlushedAt;
+ }
+
+ CommitLogHeader(CommitLogHeader clHeader)
+ {
+ dirty = (BitSet)clHeader.dirty.clone();
+ lastFlushedAt = new int[clHeader.lastFlushedAt.length];
+ System.arraycopy(clHeader.lastFlushedAt, 0, lastFlushedAt, 0, lastFlushedAt.length);
+ }
+
+ boolean isDirty(int index)
+ {
+ return dirty.get(index);
+ }
+
+ int getPosition(int index)
+ {
+ return lastFlushedAt[index];
+ }
+
+ void turnOn(int index, long position)
+ {
+ dirty.set(index);
+ lastFlushedAt[index] = (int) position;
+ }
+
+ void turnOff(int index)
+ {
+ dirty.set(index, false);
+ lastFlushedAt[index] = 0;
+ }
+
+ boolean isSafeToDelete() throws IOException
+ {
+ return dirty.isEmpty();
+ }
+
+ void zeroPositions()
+ {
+ int size = lastFlushedAt.length;
+ lastFlushedAt = new int[size];
+ }
+
+ void and(CommitLogHeader commitLogHeader)
+ {
+ dirty.and(commitLogHeader.dirty);
+ }
+
+ byte[] toByteArray() throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ CommitLogHeader.serializer().serialize(this, dos);
+ return bos.toByteArray();
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder("");
+ for ( int i = 0; i < dirty.size(); ++i )
+ {
+ sb.append((dirty.get(i) ? 0 : 1));
+ sb.append(":");
+ sb.append(Table.TableMetadata.getColumnFamilyName(i));
+ sb.append(" ");
+ }
+ sb.append(" | " );
+ for ( int position : lastFlushedAt)
+ {
+ sb.append(position);
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+
+ static class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
+ {
+ public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
+ {
+ BitSetSerializer.serialize(clHeader.dirty, dos);
+ dos.writeInt(clHeader.lastFlushedAt.length);
+ for (int position : clHeader.lastFlushedAt)
+ {
+ dos.writeInt(position);
+ }
+ }
+
+ public CommitLogHeader deserialize(DataInputStream dis) throws IOException
+ {
+ BitSet bitFlags = BitSetSerializer.deserialize(dis);
+ int[] position = new int[dis.readInt()];
+ for (int i = 0; i < position.length; ++i)
+ {
+ position[i] = dis.readInt();
+ }
+ return new CommitLogHeader(bitFlags, position);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactSerializerInvocationHandler.java Thu Jul 30 15:30:21 2009
@@ -1,54 +1,54 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.ICompactSerializer;
-
-
-/*
- * This is the abstraction that pre-processes calls to implementations
- * of the ICompactSerializer serialize() via dynamic proxies.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class CompactSerializerInvocationHandler<T> implements InvocationHandler
-{
- private ICompactSerializer<T> serializer_;
-
- public CompactSerializerInvocationHandler(ICompactSerializer<T> serializer)
- {
- serializer_ = serializer;
- }
-
- /*
- * This dynamic runtime proxy adds the indexes before the actual columns are serialized.
- */
- public Object invoke(Object proxy, Method m, Object[] args) throws Throwable
- {
- /* Do the pre-processing here. */
- ColumnFamily cf = (ColumnFamily)args[0];
- DataOutputBuffer bufOut = (DataOutputBuffer)args[1];
- ColumnIndexer.serialize(cf, bufOut);
- return m.invoke(serializer_, args);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/*
+ * This is the abstraction that pre-processes calls to implementations
+ * of the ICompactSerializer serialize() via dynamic proxies.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CompactSerializerInvocationHandler<T> implements InvocationHandler
+{
+ private ICompactSerializer<T> serializer_;
+
+ public CompactSerializerInvocationHandler(ICompactSerializer<T> serializer)
+ {
+ serializer_ = serializer;
+ }
+
+ /*
+ * This dynamic runtime proxy adds the indexes before the actual columns are serialized.
+ */
+ public Object invoke(Object proxy, Method m, Object[] args) throws Throwable
+ {
+ /* Do the pre-processing here. */
+ ColumnFamily cf = (ColumnFamily)args[0];
+ DataOutputBuffer bufOut = (DataOutputBuffer)args[1];
+ ColumnIndexer.serialize(cf, bufOut);
+ return m.invoke(serializer_, args);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBConstants.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBConstants.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBConstants.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBConstants.java Thu Jul 30 15:30:21 2009
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.cassandra.db;
-
-final class DBConstants
-{
- public static final int boolSize_ = 1;
- public static final int intSize_ = 4;
- public static final int longSize_ = 8;
- public static final int tsSize_ = 8;
-}
+package org.apache.cassandra.db;
+
+final class DBConstants
+{
+ public static final int boolSize_ = 1;
+ public static final int intSize_ = 4;
+ public static final int longSize_ = 8;
+ public static final int tsSize_ = 8;
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,63 +1,63 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.db;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.io.SSTableReader;
-
-import org.apache.log4j.Logger;
-
-
-public class DataFileVerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger( DataFileVerbHandler.class );
-
- public void doVerb(Message message)
- {
- byte[] bytes = message.getMessageBody();
- String table = new String(bytes);
- logger_.info("**** Received a request from " + message.getFrom());
-
- try
- {
- List<SSTableReader> ssTables = Table.open(table).getAllSSTablesOnDisk();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- dos.writeInt(ssTables.size());
- for (SSTableReader sstable : ssTables)
- {
- dos.writeUTF(sstable.getFilename());
- }
- Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bos.toByteArray());
- MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
- }
- catch (IOException ex)
- {
- logger_.error("Error listing data files", ex);
- }
- }
-}
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.io.SSTableReader;
+
+import org.apache.log4j.Logger;
+
+
+public class DataFileVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger( DataFileVerbHandler.class );
+
+ public void doVerb(Message message)
+ {
+ byte[] bytes = message.getMessageBody();
+ String table = new String(bytes);
+ logger_.info("**** Received a request from " + message.getFrom());
+
+ try
+ {
+ List<SSTableReader> ssTables = Table.open(table).getAllSSTablesOnDisk();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeInt(ssTables.size());
+ for (SSTableReader sstable : ssTables)
+ {
+ dos.writeUTF(sstable.getFilename());
+ }
+ Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bos.toByteArray());
+ MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+ }
+ catch (IOException ex)
+ {
+ logger_.error("Error listing data files", ex);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileNameComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileNameComparator.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileNameComparator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileNameComparator.java Thu Jul 30 15:30:21 2009
@@ -16,27 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.cassandra.db;
-
-import java.util.Comparator;
-
-class FileNameComparator implements Comparator<String>
-{
- private int order_ = 1 ;
-
- public static final int Ascending = 0 ;
- public static final int Descending = 1 ;
-
- FileNameComparator( int order )
- {
- order_ = order;
- }
-
- public int compare(String f, String f2)
- {
- if( order_ == 1 )
- return ColumnFamilyStore.getIndexFromFileName(f2) - ColumnFamilyStore.getIndexFromFileName(f);
- else
- return ColumnFamilyStore.getIndexFromFileName(f) - ColumnFamilyStore.getIndexFromFileName(f2);
- }
+package org.apache.cassandra.db;
+
+import java.util.Comparator;
+
+class FileNameComparator implements Comparator<String>
+{
+ private int order_ = 1 ;
+
+ public static final int Ascending = 0 ;
+ public static final int Descending = 1 ;
+
+ FileNameComparator( int order )
+ {
+ order_ = order;
+ }
+
+ public int compare(String f, String f2)
+ {
+ if( order_ == 1 )
+ return ColumnFamilyStore.getIndexFromFileName(f2) - ColumnFamilyStore.getIndexFromFileName(f);
+ else
+ return ColumnFamilyStore.getIndexFromFileName(f) - ColumnFamilyStore.getIndexFromFileName(f2);
+ }
}
\ No newline at end of file
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStructComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStructComparator.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStructComparator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStructComparator.java Thu Jul 30 15:30:21 2009
@@ -1,31 +1,31 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.db;
-
-import java.util.Comparator;
-
-import org.apache.cassandra.io.FileStruct;
-
-class FileStructComparator implements Comparator<FileStruct>
-{
- public int compare(FileStruct f, FileStruct f2)
- {
- return f.getFileName().compareTo(f2.getFileName());
- }
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.util.Comparator;
+
+import org.apache.cassandra.io.FileStruct;
+
+class FileStructComparator implements Comparator<FileStruct>
+{
+ public int compare(FileStruct f, FileStruct f2)
+ {
+ return f.getFileName().compareTo(f2.getFileName());
+ }
}
\ No newline at end of file