Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [10/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Jul 30 15:30:21 2009
@@ -1,768 +1,768 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.*;
-import java.io.IOException;
-import java.io.File;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.collections.IteratorUtils;
-import org.apache.commons.collections.Predicate;
-import org.apache.commons.lang.ArrayUtils;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.BootstrapInitiateMessage;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.FileStruct;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.IStreamComplete;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.db.filter.*;
-
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
-*/
-
-public class Table
-{
- public static final String SYSTEM_TABLE = "system";
-
- private static Logger logger_ = Logger.getLogger(Table.class);
- private static final String SNAPSHOT_SUBDIR_NAME = "snapshots";
-
- /*
- * This class represents the metadata of this Table: the column family
- * names and the ID associated with each column family. The ID is recorded
- * in the Commit Log header so that we can determine when a log file that
- * has been rolled can safely be deleted.
- */
- public static class TableMetadata
- {
- private static HashMap<String,TableMetadata> tableMetadataMap_ = new HashMap<String,TableMetadata>();
- private static Map<Integer, String> idCfMap_ = new HashMap<Integer, String>();
- static
- {
- try
- {
- DatabaseDescriptor.storeMetadata();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public static synchronized Table.TableMetadata instance(String tableName) throws IOException
- {
- if ( tableMetadataMap_.get(tableName) == null )
- {
- tableMetadataMap_.put(tableName, new Table.TableMetadata());
- }
- return tableMetadataMap_.get(tableName);
- }
-
- /* The mapping between column family and the column type. */
- private Map<String, String> cfTypeMap_ = new HashMap<String, String>();
- private Map<String, Integer> cfIdMap_ = new HashMap<String, Integer>();
-
- public void add(String cf, int id)
- {
- add(cf, id, "Standard");
- }
-
- public void add(String cf, int id, String type)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("adding " + cf + " as " + id);
- assert !idCfMap_.containsKey(id);
- cfIdMap_.put(cf, id);
- idCfMap_.put(id, cf);
- cfTypeMap_.put(cf, type);
- }
-
- public boolean isEmpty()
- {
- return cfIdMap_.isEmpty();
- }
-
- int getColumnFamilyId(String columnFamily)
- {
- return cfIdMap_.get(columnFamily);
- }
-
- public static String getColumnFamilyName(int id)
- {
- return idCfMap_.get(id);
- }
-
- String getColumnFamilyType(String cfName)
- {
- return cfTypeMap_.get(cfName);
- }
-
- Set<String> getColumnFamilies()
- {
- return cfIdMap_.keySet();
- }
-
- int size()
- {
- return cfIdMap_.size();
- }
-
- boolean isValidColumnFamily(String cfName)
- {
- return cfIdMap_.containsKey(cfName);
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder("");
- Set<String> cfNames = cfIdMap_.keySet();
-
- for ( String cfName : cfNames )
- {
- sb.append(cfName);
- sb.append("---->");
- sb.append(cfIdMap_.get(cfName));
- sb.append(System.getProperty("line.separator"));
- }
-
- return sb.toString();
- }
-
- public static int getColumnFamilyCount()
- {
- return idCfMap_.size();
- }
- }
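
The two maps above are kept in sync so the lookup works in both directions: the compact integer ID written into a commit log header can later be resolved back to a column family name. A usage sketch, illustrative only (the table name is hypothetical, the package-private accessors assume code in org.apache.cassandra.db inside a method that throws IOException, and the static initializer needs a configured node because it calls DatabaseDescriptor.storeMetadata()):

    // Register a column family and resolve it in both directions.
    Table.TableMetadata metadata = Table.TableMetadata.instance("Keyspace1"); // hypothetical table name
    metadata.add("Standard1", 0);
    assert metadata.getColumnFamilyId("Standard1") == 0;
    assert "Standard1".equals(Table.TableMetadata.getColumnFamilyName(0));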
-
- /**
- * This is the callback handler that is invoked when we have
- * completely been bootstrapped for a single file by a remote host.
- */
- public static class BootstrapCompletionHandler implements IStreamComplete
- {
- public void onStreamCompletion(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
- {
- /* Parse the stream context and add the file to the list of SSTables in the associated column family store. */
- if (streamContext.getTargetFile().contains("-Data.db"))
- {
- File file = new File( streamContext.getTargetFile() );
- String fileName = file.getName();
- /*
- * If the file is a Data File we need to load the indices associated
- * with this file. We also need to cache the file name in the SSTables
- * list of the associated Column Family. Also merge the CBF into the
- * sampler.
- */
- SSTableReader sstable = SSTableReader.open(streamContext.getTargetFile());
- if (logger_.isDebugEnabled())
- logger_.debug("Merging the counting bloom filter in the sampler ...");
- String[] peices = FBUtilities.strip(fileName, "-");
- Table.open(peices[0]).getColumnFamilyStore(peices[1]).addToList(sstable);
- }
-
- EndPoint to = new EndPoint(host, DatabaseDescriptor.getStoragePort());
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + to);
- /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
- StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
- Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
- MessagingService.getMessagingInstance().sendOneWay(message, to);
- }
- }
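
The handler above depends on the SSTable naming convention <Table>-<CF>-<i>-Data.db. A self-contained sketch of that parse, with a hypothetical file name and String.split standing in for FBUtilities.strip (an assumption about strip's behavior):

    public class SSTableNameSketch
    {
        public static void main(String[] args)
        {
            String fileName = "Keyspace1-Standard1-5-Data.db"; // hypothetical streamed file
            String[] parts = fileName.split("-");
            // parts[0] = table, parts[1] = column family, parts[2] = file index
            // The handler then does, in effect:
            //   Table.open(parts[0]).getColumnFamilyStore(parts[1]).addToList(sstable);
            System.out.println(parts[0] + " / " + parts[1] + " / " + parts[2]);
        }
    }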
-
- public static class BootStrapInitiateVerbHandler implements IVerbHandler
- {
- /*
- * Handles the BootstrapInitiateMessage: we receive the array of
- * StreamContexts, take the file names for the column families
- * associated with the files, and replace them with the file names
- * obtained from the column family store on the receiving end.
- */
- public void doVerb(Message message)
- {
- byte[] body = message.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
-
- try
- {
- BootstrapInitiateMessage biMsg = BootstrapInitiateMessage.serializer().deserialize(bufIn);
- StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
-
- Map<String, String> fileNames = getNewNames(streamContexts);
- /*
- * For each stream context in the incoming message, generate the
- * new file name and store it in the StreamContextManager.
- */
- for (StreamContextManager.StreamContext streamContext : streamContexts )
- {
- StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
- File sourceFile = new File( streamContext.getTargetFile() );
- String[] peices = FBUtilities.strip(sourceFile.getName(), "-");
- String newFileName = fileNames.get( peices[1] + "-" + peices[2] );
-
- String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
- if (logger_.isDebugEnabled())
- logger_.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
- streamContext.setTargetFile(file);
- addStreamContext(message.getFrom().getHost(), streamContext, streamStatus);
- }
-
- StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new Table.BootstrapCompletionHandler());
- /* Send a bootstrap initiation done message to execute on default stage. */
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap initiate done message ...");
- Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
- MessagingService.getMessagingInstance().sendOneWay(doneMessage, message.getFrom());
- }
- catch ( IOException ex )
- {
- logger_.info(LogUtil.throwableToString(ex));
- }
- }
-
- private Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
- {
- /*
- * Maintain a mapping from each unique <CF>-<i> pair to a new file name.
- * For example, a file named <Table>-<CF>-<i>-Data.db has a corresponding
- * <Table>-<CF>-<i>-Index.db, and we map <CF>-<i> to a newly generated
- * file name.
- */
- Map<String, String> fileNames = new HashMap<String, String>();
- /* Get the distinct entries from StreamContexts, i.e. one entry per Data/Index file combination */
- Set<String> distinctEntries = new HashSet<String>();
- for ( StreamContextManager.StreamContext streamContext : streamContexts )
- {
- String[] peices = FBUtilities.strip(streamContext.getTargetFile(), "-");
- distinctEntries.add(peices[0] + "-" + peices[1] + "-" + peices[2]);
- }
-
- /* Generate unique file names per entry */
- for ( String distinctEntry : distinctEntries )
- {
- String tableName;
- String[] peices = FBUtilities.strip(distinctEntry, "-");
- tableName = peices[0];
- Table table = Table.open( tableName );
- Map<String, ColumnFamilyStore> columnFamilyStores = table.getColumnFamilyStores();
-
- ColumnFamilyStore cfStore = columnFamilyStores.get(peices[1]);
- if (logger_.isDebugEnabled())
- logger_.debug("Generating file name for " + distinctEntry + " ...");
- fileNames.put(distinctEntry, cfStore.getNextFileName());
- }
-
- return fileNames;
- }
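
A compact, self-contained sketch of the renaming step in getNewNames(): collapse each Data/Index pair into one distinct <CF>-<i> entry, then assign each entry a locally generated name. The file names are hypothetical, a plain counter stands in for cfStore.getNextFileName(), and split again stands in for FBUtilities.strip:

    import java.util.*;

    public class RenameSketch
    {
        public static void main(String[] args)
        {
            String[] targets = { "Keyspace1-Standard1-5-Data.db", "Keyspace1-Standard1-5-Index.db" };
            Set<String> distinct = new HashSet<String>();
            for (String target : targets)
            {
                String[] p = target.split("-");
                distinct.add(p[1] + "-" + p[2]); // one <CF>-<i> entry per Data/Index pair
            }
            Map<String, String> newNames = new HashMap<String, String>();
            int counter = 0;
            for (String entry : distinct)
                newNames.put(entry, "Standard1-" + (++counter)); // stand-in for getNextFileName()
            System.out.println(newNames); // prints {Standard1-5=Standard1-1}
        }
    }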
-
- private void addStreamContext(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
- StreamContextManager.addStreamContext(host, streamContext, streamStatus);
- }
- }
-
- /* Used to lock the factory for creation of Table instance */
- private static Lock createLock_ = new ReentrantLock();
- private static Map<String, Table> instances_ = new HashMap<String, Table>();
- /* Table name. */
- private String table_;
- /* Handle to the Table Metadata */
- private Table.TableMetadata tableMetadata_;
- /* ColumnFamilyStore per column family */
- private Map<String, ColumnFamilyStore> columnFamilyStores_ = new HashMap<String, ColumnFamilyStore>();
- // cache application CFs since Range queries ask for them a _lot_
- private SortedSet<String> applicationColumnFamilies_;
-
- public static Table open(String table) throws IOException
- {
- Table tableInstance = instances_.get(table);
- /*
- * Instantiate the Table for this name on first access and cache it,
- * so that we do not re-read the configuration every time.
- */
- if ( tableInstance == null )
- {
- Table.createLock_.lock();
- try
- {
- /* re-check under the lock; the first read may be stale */
- tableInstance = instances_.get(table);
- if ( tableInstance == null )
- {
- tableInstance = new Table(table);
- instances_.put(table, tableInstance);
- }
- }
- finally
- {
- createLock_.unlock();
- }
- }
- return tableInstance;
- }
-
- public Set<String> getColumnFamilies()
- {
- return tableMetadata_.getColumnFamilies();
- }
-
- Map<String, ColumnFamilyStore> getColumnFamilyStores()
- {
- return columnFamilyStores_;
- }
-
- public ColumnFamilyStore getColumnFamilyStore(String cfName)
- {
- return columnFamilyStores_.get(cfName);
- }
-
- /*
- * This method is called to obtain statistics about
- * the table. It will return statistics about all
- * the column families that make up this table.
- */
- public String tableStats(String newLineSeparator, java.text.DecimalFormat df)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(table_ + " statistics :");
- sb.append(newLineSeparator);
- int oldLength = sb.toString().length();
-
- Set<String> cfNames = columnFamilyStores_.keySet();
- for ( String cfName : cfNames )
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get(cfName);
- sb.append(cfStore.cfStats(newLineSeparator));
- }
- int newLength = sb.toString().length();
-
- /* Don't show anything if there is nothing to show. */
- if ( newLength == oldLength )
- return "";
-
- return sb.toString();
- }
-
- public void onStart() throws IOException
- {
- for (String columnFamily : tableMetadata_.getColumnFamilies())
- {
- columnFamilyStores_.get(columnFamily).onStart();
- }
- }
-
- /**
- * Do a cleanup of keys that do not belong locally.
- */
- public void forceCleanup()
- {
- Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
- for ( String columnFamily : columnFamilies )
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
- if ( cfStore != null )
- cfStore.forceCleanup();
- }
- }
-
-
- /**
- * Take a snapshot of the entire set of column families with a given timestamp.
- *
- * @param clientSuppliedName the tag associated with the name of the snapshot. This
- * value can be null.
- */
- public void snapshot(String clientSuppliedName) throws IOException
- {
- String snapshotName = Long.toString(System.currentTimeMillis());
- if (clientSuppliedName != null && !clientSuppliedName.equals(""))
- {
- snapshotName = snapshotName + "-" + clientSuppliedName;
- }
-
- for (ColumnFamilyStore cfStore : columnFamilyStores_.values())
- {
- cfStore.snapshot(snapshotName);
- }
- }
-
-
- /**
- * Clear all the snapshots for a given table.
- */
- public void clearSnapshot() throws IOException
- {
- for (String dataDirPath : DatabaseDescriptor.getAllDataFileLocations())
- {
- String snapshotPath = dataDirPath + File.separator + table_ + File.separator + SNAPSHOT_SUBDIR_NAME;
- File snapshotDir = new File(snapshotPath);
- if (snapshotDir.exists())
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Removing snapshot directory " + snapshotPath);
- if (!FileUtils.deleteDir(snapshotDir))
- throw new IOException("Could not clear snapshot directory " + snapshotPath);
- }
- }
- }
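
Taken together, snapshot(), clearSnapshot(), and getSnapshotPath() (at the bottom of this class) imply the on-disk layout <dataDir>/<table>/snapshots/<timestamp>[-<clientSuppliedName>]. A runnable sketch with hypothetical paths:

    import java.io.File;

    public class SnapshotPathSketch
    {
        public static void main(String[] args)
        {
            String dataDirPath = "/var/lib/cassandra/data"; // hypothetical data directory
            String tableName = "Keyspace1";                 // hypothetical table
            String snapshotName = System.currentTimeMillis() + "-backup"; // timestamp plus client tag
            String path = dataDirPath + File.separator + tableName + File.separator
                          + "snapshots" + File.separator + snapshotName;
            System.out.println(path); // e.g. /var/lib/cassandra/data/Keyspace1/snapshots/1248968000000-backup
        }
    }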
-
- /*
- * This method is invoked only during a bootstrap process. We do a
- * complete compaction since, based on the ranges, we can determine
- * whether the files need to be split.
- */
- public boolean forceCompaction(List<Range> ranges, EndPoint target, List<String> fileList)
- {
- boolean result = true;
- Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
- for ( String columnFamily : columnFamilies )
- {
- if ( !isApplicationColumnFamily(columnFamily) )
- continue;
-
- ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
- if ( cfStore != null )
- {
- cfStore.forceCompaction(ranges, target, 0, fileList);
- }
- }
- return result;
- }
-
- /*
- * This method is an ADMIN operation to force compaction
- * of all SSTables on disk.
- */
- public void forceCompaction()
- {
- Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
- for ( String columnFamily : columnFamilies )
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
- if ( cfStore != null )
- MinorCompactionManager.instance().submitMajor(cfStore, 0);
- }
- }
-
- /*
- * Get the list of all SSTables on disk. Not safe unless you acquire the CFS read locks!
- */
- public List<SSTableReader> getAllSSTablesOnDisk()
- {
- List<SSTableReader> list = new ArrayList<SSTableReader>();
- Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
- for ( String columnFamily : columnFamilies )
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
- if ( cfStore != null )
- list.addAll(cfStore.getSSTables());
- }
- return list;
- }
-
- private Table(String table) throws IOException
- {
- table_ = table;
- tableMetadata_ = Table.TableMetadata.instance(table);
- for (String columnFamily : tableMetadata_.getColumnFamilies())
- {
- columnFamilyStores_.put(columnFamily, ColumnFamilyStore.getColumnFamilyStore(table, columnFamily));
- }
- }
-
- boolean isApplicationColumnFamily(String columnFamily)
- {
- return DatabaseDescriptor.isApplicationColumnFamily(columnFamily);
- }
-
- int getColumnFamilyId(String columnFamily)
- {
- return tableMetadata_.getColumnFamilyId(columnFamily);
- }
-
- boolean isValidColumnFamily(String columnFamily)
- {
- return tableMetadata_.isValidColumnFamily(columnFamily);
- }
-
- /**
- * Selects the row associated with the given key.
- */
- @Deprecated // CF should be our atom of work, not Row
- public Row get(String key) throws IOException
- {
- Row row = new Row(table_, key);
- for (String columnFamily : getColumnFamilies())
- {
- ColumnFamily cf = get(key, columnFamily);
- if (cf != null)
- {
- row.addColumnFamily(cf);
- }
- }
- return row;
- }
-
-
- /**
- * Selects the specified column family for the specified key.
- */
- @Deprecated // single CFs could be larger than memory
- public ColumnFamily get(String key, String cfName) throws IOException
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get(cfName);
- assert cfStore != null : "Column family " + cfName + " has not been defined";
- return cfStore.getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
- }
-
- /**
- * Selects only the specified column family for the specified key.
- */
- @Deprecated
- public Row getRow(String key, String cfName) throws IOException
- {
- Row row = new Row(table_, key);
- ColumnFamily columnFamily = get(key, cfName);
- if ( columnFamily != null )
- row.addColumnFamily(columnFamily);
- return row;
- }
-
- public Row getRow(QueryFilter filter) throws IOException
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get(filter.getColumnFamilyName());
- Row row = new Row(table_, filter.key);
- ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
- if (columnFamily != null)
- row.addColumnFamily(columnFamily);
- return row;
- }
-
- /**
- * This method adds the row to the Commit Log associated with this table.
- * Once the row is durable in the log, the data for each of its column
- * families is written to the corresponding column family store's memtable.
- */
- void apply(Row row) throws IOException
- {
- CommitLog.CommitLogContext cLogCtx = CommitLog.open().add(row);
-
- for (ColumnFamily columnFamily : row.getColumnFamilies())
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
- cfStore.apply(row.key(), columnFamily, cLogCtx);
- }
- }
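
apply() follows the classic write-ahead ordering: append the row to the commit log first, and only then update the memtables, so a crash between the two steps can be repaired by log replay. A minimal, generic sketch of that ordering (illustrative names, not Cassandra's API):

    import java.util.*;

    public class WriteAheadSketch
    {
        private final List<String> log = new ArrayList<String>();            // stand-in for CommitLog
        private final Map<String, String> memtable = new HashMap<String, String>();

        public void apply(String key, String value)
        {
            log.add(key + "=" + value); // 1. durable record first (CommitLog.open().add(row))
            memtable.put(key, value);   // 2. then the in-memory write (cfStore.apply(...))
        }
    }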
-
- void applyNow(Row row) throws IOException
- {
- String key = row.key();
- for (ColumnFamily columnFamily : row.getColumnFamilies())
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
- cfStore.applyNow( key, columnFamily );
- }
- }
-
- public void flush(boolean fRecovery) throws IOException
- {
- Set<String> cfNames = columnFamilyStores_.keySet();
- for ( String cfName : cfNames )
- {
- if (fRecovery) {
- columnFamilyStores_.get(cfName).flushMemtableOnRecovery();
- } else {
- columnFamilyStores_.get(cfName).forceFlush();
- }
- }
- }
-
- // for binary load path. skips commitlog.
- void load(Row row) throws IOException
- {
- String key = row.key();
-
- for (ColumnFamily columnFamily : row.getColumnFamilies())
- {
- Collection<IColumn> columns = columnFamily.getSortedColumns();
- for(IColumn column : columns)
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name()); // keyed by CF name, not column name
- cfStore.applyBinary(key, column.value());
- }
- }
- row.clear();
- }
-
- public SortedSet<String> getApplicationColumnFamilies()
- {
- if (applicationColumnFamilies_ == null)
- {
- applicationColumnFamilies_ = new TreeSet<String>();
- for (String cfName : getColumnFamilies())
- {
- if (DatabaseDescriptor.isApplicationColumnFamily(cfName))
- {
- applicationColumnFamilies_.add(cfName);
- }
- }
- }
- return applicationColumnFamilies_;
- }
-
- /**
- * @param startWith key to start with, inclusive. empty string = start at beginning.
- * @param stopAt key to stop at, inclusive. empty string = stop only when keys are exhausted.
- * @param maxResults the maximum number of keys to return
- * @return list of keys between startWith and stopAt
- */
- public List<String> getKeyRange(String columnFamily, final String startWith, final String stopAt, int maxResults)
- throws IOException, ExecutionException, InterruptedException
- {
- assert getColumnFamilyStore(columnFamily) != null : columnFamily;
-
- getColumnFamilyStore(columnFamily).getReadLock().lock();
- try
- {
- return getKeyRangeUnsafe(columnFamily, startWith, stopAt, maxResults);
- }
- finally
- {
- getColumnFamilyStore(columnFamily).getReadLock().unlock();
- }
- }
-
- private List<String> getKeyRangeUnsafe(final String cfName, final String startWith, final String stopAt, int maxResults) throws IOException, ExecutionException, InterruptedException
- {
- // (OPP key decoration is a no-op so using the "decorated" comparator against raw keys is fine)
- final Comparator<String> comparator = StorageService.getPartitioner().getDecoratedKeyComparator();
-
- // create a CollatedIterator that will return unique keys from different sources
- // (current memtable, historical memtables, and SSTables) in the correct order.
- List<Iterator<String>> iterators = new ArrayList<Iterator<String>>();
- ColumnFamilyStore cfs = getColumnFamilyStore(cfName);
-
- // we iterate through memtables with a priority queue to avoid more sorting than necessary.
- // this predicate throws out keys that fall outside the requested [startWith, stopAt] range.
- Predicate p = new Predicate()
- {
- public boolean evaluate(Object key)
- {
- String st = (String)key;
- return comparator.compare(startWith, st) <= 0 && (stopAt.isEmpty() || comparator.compare(st, stopAt) <= 0);
- }
- };
-
- // current memtable keys. have to go through the CFS api for locking.
- iterators.add(IteratorUtils.filteredIterator(cfs.memtableKeyIterator(), p));
- // historical memtables
- for (Memtable memtable : ColumnFamilyStore.getUnflushedMemtables(cfName))
- {
- iterators.add(IteratorUtils.filteredIterator(Memtable.getKeyIterator(memtable.getKeys()), p));
- }
-
- // sstables
- for (SSTableReader sstable : cfs.getSSTables())
- {
- FileStruct fs = sstable.getFileStruct();
- fs.seekTo(startWith);
- iterators.add(fs);
- }
-
- Iterator<String> collated = IteratorUtils.collatedIterator(comparator, iterators);
- Iterable<String> reduced = new ReducingIterator<String>(collated) {
- String current;
-
- public void reduce(String current)
- {
- this.current = current;
- }
-
- protected String getReduced()
- {
- return current;
- }
- };
-
- try
- {
- // pull keys out of the CollatedIterator. checking tombstone status is expensive,
- // so we set an arbitrary limit on how many we'll do at once.
- List<String> keys = new ArrayList<String>();
- for (String current : reduced)
- {
- if (!stopAt.isEmpty() && comparator.compare(stopAt, current) < 0)
- {
- break;
- }
- // make sure there is actually non-tombstone content associated w/ this key
- // TODO record the key source(s) somehow and only check that source (e.g., memtable or sstable)
- QueryFilter filter = new SliceQueryFilter(current, new QueryPath(cfName), ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, 1);
- if (cfs.getColumnFamily(filter, Integer.MAX_VALUE) != null)
- {
- keys.add(current);
- }
- if (keys.size() >= maxResults)
- {
- break;
- }
- }
- return keys;
- }
- finally
- {
- for (Iterator iter : iterators)
- {
- if (iter instanceof FileStruct)
- {
- ((FileStruct)iter).close();
- }
- }
- }
- }
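
getKeyRangeUnsafe() collates several already-sorted key sources and then reduces runs of equal keys to a single key. A self-contained sketch of that merge-and-dedupe step; unlike the lazy iterators above, this version is eager, purely for illustration:

    import java.util.*;

    public class CollateSketch
    {
        public static List<String> mergeUnique(List<Iterator<String>> sources, Comparator<String> cmp)
        {
            PriorityQueue<String> pq = new PriorityQueue<String>(11, cmp);
            for (Iterator<String> source : sources) // eager: drain every source up front
                while (source.hasNext())
                    pq.add(source.next());
            List<String> out = new ArrayList<String>();
            String last = null;
            while (!pq.isEmpty())
            {
                String key = pq.poll();
                if (last == null || cmp.compare(last, key) != 0)
                    out.add(key);                    // emit each distinct key once
                last = key;
            }
            return out;
        }
    }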
-
- public static String getSnapshotPath(String dataDirPath, String tableName, String snapshotName)
- {
- return dataDirPath + File.separator + tableName + File.separator + SNAPSHOT_SUBDIR_NAME + File.separator + snapshotName;
- }
-}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Thu Jul 30 15:30:21 2009
@@ -1,100 +1,100 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlElement;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-
-
-/*
- * This message is sent back by the row mutation verb handler
- * and specifies whether the write succeeded for a particular
- * key in a table.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class WriteResponse
-{
- private static WriteResponseSerializer serializer_ = new WriteResponseSerializer();
-
- public static WriteResponseSerializer serializer()
- {
- return serializer_;
- }
-
- public static Message makeWriteResponseMessage(Message original, WriteResponse writeResponseMessage) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- WriteResponse.serializer().serialize(writeResponseMessage, dos);
- return original.getReply(StorageService.getLocalStorageEndPoint(), bos.toByteArray());
- }
-
- private final String table_;
- private final String key_;
- private final boolean status_;
-
- public WriteResponse(String table, String key, boolean bVal) {
- table_ = table;
- key_ = key;
- status_ = bVal;
- }
-
- public String table()
- {
- return table_;
- }
-
- public String key()
- {
- return key_;
- }
-
- public boolean isSuccess()
- {
- return status_;
- }
-
- public static class WriteResponseSerializer implements ICompactSerializer<WriteResponse>
- {
- public void serialize(WriteResponse wm, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(wm.table());
- dos.writeUTF(wm.key());
- dos.writeBoolean(wm.isSuccess());
- }
-
- public WriteResponse deserialize(DataInputStream dis) throws IOException
- {
- String table = dis.readUTF();
- String key = dis.readUTF();
- boolean status = dis.readBoolean();
- return new WriteResponse(table, key, status);
- }
- }
-}
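
A round-trip sketch for the serializer above: write a response into a byte array and read it back. This fragment assumes a surrounding method declared to throw IOException, plus an import of java.io.ByteArrayInputStream; the table and key are hypothetical:

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    WriteResponse.serializer().serialize(new WriteResponse("Keyspace1", "key1", true),
                                         new DataOutputStream(bos));
    WriteResponse echoed = WriteResponse.serializer().deserialize(
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    assert "Keyspace1".equals(echoed.table()) && echoed.isSuccess();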
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Thu Jul 30 15:30:21 2009
@@ -1,134 +1,134 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-
-
-/**
- * This class handles the bootstrapping responsibilities for
- * any new endpoint.
-*/
-public class BootStrapper implements Runnable
-{
- private static Logger logger_ = Logger.getLogger(BootStrapper.class);
- /* endpoints that need to be bootstrapped */
- protected EndPoint[] targets_ = new EndPoint[0];
- /* tokens of the nodes being bootstrapped. */
- protected final Token[] tokens_;
- protected TokenMetadata tokenMetadata_ = null;
- private List<EndPoint> filters_ = new ArrayList<EndPoint>();
-
- public BootStrapper(EndPoint[] target, Token... token)
- {
- targets_ = target;
- tokens_ = token;
- tokenMetadata_ = StorageService.instance().getTokenMetadata();
- }
-
- public BootStrapper(EndPoint[] target, Token[] token, EndPoint[] filters)
- {
- this(target, token);
- Collections.addAll(filters_, filters);
- }
-
- public void run()
- {
- try
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Beginning bootstrap process for " + targets_ + " ...");
- /* copy the token to endpoint map */
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- /* remove the tokens associated with the endpoints being bootstrapped */
- for (Token token : tokens_)
- {
- tokenToEndPointMap.remove(token);
- }
-
- Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
- Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
- if (logger_.isDebugEnabled())
- logger_.debug("Total number of old ranges " + oldRanges.length);
- /*
- * Find the ranges that are split. Maintain a mapping between
- * the range being split and the list of subranges.
- */
- Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens_);
- /* Calculate the list of nodes that handle the old ranges */
- Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
- /* Mapping of split ranges to the list of endpoints responsible for the range */
- Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();
- Set<Range> rangesSplit = splitRanges.keySet();
- for ( Range splitRange : rangesSplit )
- {
- replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
- }
- /* Remove the ranges that are split. */
- for ( Range splitRange : rangesSplit )
- {
- oldRangeToEndPointMap.remove(splitRange);
- }
-
- /* Add the subranges of the split range to the map with the same replica set. */
- for ( Range splitRange : rangesSplit )
- {
- List<Range> subRanges = splitRanges.get(splitRange);
- List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
- for ( Range subRange : subRanges )
- {
- /* Clone the replica list so each subrange gets an independent copy. */
- oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
- }
- }
-
- /* Add the new token and re-calculate the range assignments */
- Collections.addAll( oldTokens, tokens_ );
- Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
-
- if (logger_.isDebugEnabled())
- logger_.debug("Total number of new ranges " + newRanges.length);
- /* Calculate the list of nodes that handle the new ranges */
- Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
- /* Calculate ranges that need to be sent and from whom to where */
- Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
- /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
- LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget, filters_);
- }
- catch ( Throwable th )
- {
- /* Log at error level so bootstrap failures are not silently swallowed. */
- logger_.error( LogUtil.throwableToString(th) );
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+
+
+/**
+ * This class handles the bootstrapping responsibilities for
+ * any new endpoint.
+*/
+public class BootStrapper implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger(BootStrapper.class);
+ /* endpoints that need to be bootstrapped */
+ protected EndPoint[] targets_ = new EndPoint[0];
+ /* tokens of the nodes being bootstrapped. */
+ protected final Token[] tokens_;
+ protected TokenMetadata tokenMetadata_ = null;
+ private List<EndPoint> filters_ = new ArrayList<EndPoint>();
+
+ public BootStrapper(EndPoint[] target, Token... token)
+ {
+ targets_ = target;
+ tokens_ = token;
+ tokenMetadata_ = StorageService.instance().getTokenMetadata();
+ }
+
+ public BootStrapper(EndPoint[] target, Token[] token, EndPoint[] filters)
+ {
+ this(target, token);
+ Collections.addAll(filters_, filters);
+ }
+
+ public void run()
+ {
+ try
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Beginning bootstrap process for " + targets_ + " ...");
+ /* copy the token to endpoint map */
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ /* remove the tokens associated with the endpoints being bootstrapped */
+ for (Token token : tokens_)
+ {
+ tokenToEndPointMap.remove(token);
+ }
+
+ Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
+ Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Total number of old ranges " + oldRanges.length);
+ /*
+ * Find the ranges that are split. Maintain a mapping between
+ * the range being split and the list of subranges.
+ */
+ Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens_);
+ /* Calculate the list of nodes that handle the old ranges */
+ Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
+ /* Mapping of split ranges to the list of endpoints responsible for the range */
+ Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range, List<EndPoint>>();
+ Set<Range> rangesSplit = splitRanges.keySet();
+ for ( Range splitRange : rangesSplit )
+ {
+ replicasForSplitRanges.put( splitRange, oldRangeToEndPointMap.get(splitRange) );
+ }
+ /* Remove the ranges that are split. */
+ for ( Range splitRange : rangesSplit )
+ {
+ oldRangeToEndPointMap.remove(splitRange);
+ }
+
+ /* Add the subranges of the split range to the map with the same replica set. */
+ for ( Range splitRange : rangesSplit )
+ {
+ List<Range> subRanges = splitRanges.get(splitRange);
+ List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+ for ( Range subRange : subRanges )
+ {
+ /* Clone the replica list so each subrange gets an independent copy. */
+ oldRangeToEndPointMap.put(subRange, new ArrayList<EndPoint>(replicas));
+ }
+ }
+
+ /* Add the new token and re-calculate the range assignments */
+ Collections.addAll( oldTokens, tokens_ );
+ Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Total number of new ranges " + newRanges.length);
+ /* Calculate the list of nodes that handle the new ranges */
+ Map<Range, List<EndPoint>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
+ /* Calculate ranges that need to be sent and from whom to where */
+ Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap, newRangeToEndPointMap);
+ /* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
+ LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget, filters_);
+ }
+ catch ( Throwable th )
+ {
+ /* Log at error level so bootstrap failures are not silently swallowed. */
+ logger_.error( LogUtil.throwableToString(th) );
+ }
+ }
+
+}
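
For context, the subrange remapping step in run() above can be shown in isolation. The sketch below is illustrative only and not part of this commit: Range and EndPoint are replaced by generic stand-in types, and only the clone-per-subrange logic is mirrored.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitRangeRemap
{
    /**
     * Replaces each split range in rangeToReplicas with its subranges,
     * giving every subrange its own copy of the parent's replica list,
     * as run() does after computing splitRanges.
     */
    static <R, E> void remapSplitRanges(Map<R, List<E>> rangeToReplicas, Map<R, List<R>> splitRanges)
    {
        for (Map.Entry<R, List<R>> entry : splitRanges.entrySet())
        {
            // Remove the parent range and remember who replicated it.
            List<E> replicas = rangeToReplicas.remove(entry.getKey());
            if (replicas == null)
                continue;
            for (R subRange : entry.getValue())
            {
                // Clone per subrange so later per-range mutations stay independent.
                rangeToReplicas.put(subRange, new ArrayList<E>(replicas));
            }
        }
    }

    public static void main(String[] args)
    {
        Map<String, List<String>> ranges = new HashMap<String, List<String>>();
        ranges.put("(a,c]", new ArrayList<String>(Arrays.asList("node1", "node2")));
        Map<String, List<String>> splits = new HashMap<String, List<String>>();
        splits.put("(a,c]", Arrays.asList("(a,b]", "(b,c]"));
        remapSplitRanges(ranges, splits);
        System.out.println(ranges); // both subranges now map to copies of [node1, node2]
    }
}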