You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [14/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Common contract for the columns held by a ColumnFamily. Implementations
+ * include {@link SuperColumn}, which groups further columns (sub-columns)
+ * under a single name.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IColumn
+{
+    /*
+     * Interface fields are implicitly public, static and final.
+     * NOTE(review): presumably the byte width of the UTF length prefix used
+     * when sizing serialized names -- confirm against the serializers.
+     */
+    short UtfPrefix_ = 2;
+
+    /* ---- name, timestamp and deletion state ---- */
+    String name();
+    long timestamp();
+    long timestamp(String key);
+    boolean isMarkedForDelete();
+    void delete();
+
+    /* ---- size accounting and hashing ---- */
+    int size();
+    int serializedSize();
+    int getObjectCount();
+    byte[] digest();
+
+    /* ---- value access ---- */
+    byte[] value();
+    byte[] value(String key);
+
+    /* ---- sub-columns (meaningful for SuperColumn implementations) ---- */
+    Collection<IColumn> getSubColumns();
+    IColumn getSubColumn(String columnName);
+    void addColumn(String name, IColumn column);
+    boolean putColumn(IColumn column);
+
+    /* ---- operations against another column; semantics are implementation-defined ---- */
+    void repair(IColumn column);
+    IColumn diff(IColumn column);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ICompactSerializer2.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ICompactSerializer2.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ICompactSerializer2.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ICompactSerializer2.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.BloomFilter;
+
+
+/**
+ * Extension of {@link ICompactSerializer} that allows partial deserialization
+ * of a type: callers pass an {@link IFilter} to restrict what is materialized,
+ * or skip over a serialized instance altogether.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface ICompactSerializer2<T> extends ICompactSerializer<T>
+{
+    /**
+     * Deserializes an instance of {@code T} from the stream, keeping only the
+     * columns that {@code filter} selects.
+     *
+     * @param dis    stream positioned at a serialized instance
+     * @param filter selects which columns are retained
+     * @return the (possibly partial) deserialized instance
+     * @throws IOException if reading from {@code dis} fails
+     */
+    public T deserialize(DataInputStream dis, IFilter filter) throws IOException;
+
+    /**
+     * Deserializes just the field identified by {@code name} from the stream,
+     * further restricted by {@code filter}.
+     *
+     * @param dis    stream positioned at a serialized instance
+     * @param name   name of the desired field
+     * @param filter selects which parts of that field are retained
+     * @return the deserialized instance containing only the requested data
+     * @throws IOException if reading from {@code dis} fails
+     */
+    public T deserialize(DataInputStream dis, String name, IFilter filter) throws IOException;
+
+    /**
+     * Advances {@code dis} past one serialized instance without returning it.
+     * NOTE(review): contract inferred from the method name -- confirm against
+     * the implementations.
+     *
+     * @param dis stream positioned at a serialized instance
+     * @throws IOException if reading from {@code dis} fails
+     */
+    public void skip(DataInputStream dis) throws IOException;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/IFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/IFilter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/IFilter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/IFilter.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,16 @@
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTable;
+
+
+/**
+ * Strategy applied while reading column family data. It decides what is kept,
+ * and reports through {@link #isDone()} when a lookup has found enough.
+ */
+public interface IFilter
+{
+    /**
+     * @return true once this filter considers the lookup satisfied; callers that
+     *         consult several sources (e.g. MemtableManager) stop early.
+     */
+    boolean isDone();
+
+    /**
+     * Filters a whole column family.
+     *
+     * @param cfName requested path; Memtable.getLocalCopy passes the same string it
+     *               splits with RowMutation.getColumnAndColumnFamily
+     * @param cf     data to filter; may be null
+     * @return the filtered column family (IdentityFilter returns {@code cf} unchanged)
+     */
+    ColumnFamily filter(String cfName, ColumnFamily cf);
+
+    /**
+     * Filters a single column.
+     * NOTE(review): the role of {@code dis} is implementation-specific;
+     * IdentityFilter ignores it.
+     */
+    IColumn filter(IColumn column, DataInputStream dis) throws IOException;
+
+    /**
+     * Fetches the next buffer for {@code key} and {@code cf} from {@code ssTable}.
+     * IdentityFilter delegates directly to SSTable.next(key, cf).
+     */
+    DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/IScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/IScanner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/IScanner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/IScanner.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,11 @@
+package org.apache.cassandra.db;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Closeable, iterator-style source of {@code T} values.
+ * NOTE(review): presumably {@link #fetch} must be called before iterating --
+ * confirm against implementations.
+ */
+public interface IScanner<T> extends Closeable
+{
+    /**
+     * Prepares the scanner for the given key and column family.
+     *
+     * @throws ColumnFamilyNotDefinedException if {@code cf} is not a defined column family
+     */
+    void fetch(String key, String cf) throws IOException, ColumnFamilyNotDefinedException;
+
+    /** @return true if another value is available from {@link #next()}. */
+    boolean hasNext() throws IOException;
+
+    /** @return the next value. */
+    T next() throws IOException;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/IdentityFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/IdentityFilter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/IdentityFilter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/IdentityFilter.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,72 @@
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTable;
+
+
+/**
+ * An {@link IFilter} that passes data through unchanged. Its only job is to
+ * record, via {@link #isDone()}, that the requested data has already been
+ * found, so callers consulting several sources can stop early.
+ */
+public class IdentityFilter implements IFilter
+{
+    /* Column type name that DatabaseDescriptor reports for super column families. */
+    private static final String SUPER_COLUMN_TYPE = "Super";
+
+    private boolean isDone_ = false;
+
+    public boolean isDone()
+    {
+        return isDone_;
+    }
+
+    /**
+     * Returns {@code columnFamily} unchanged. As a side effect, marks this filter
+     * done when the data already answers the request:
+     * - a two-part request on a non-super column family and at least one column
+     *   is present, or
+     * - a three-part request on a super column family and at least one super
+     *   column holds a sub-column.
+     *
+     * @param cfString     requested path, split by RowMutation.getColumnAndColumnFamily
+     * @param columnFamily data found so far; may be null
+     * @return {@code columnFamily}, unmodified (null if it was null)
+     */
+    public ColumnFamily filter(String cfString, ColumnFamily columnFamily)
+    {
+        if (columnFamily == null)
+            return null;
+
+        String[] values = RowMutation.getColumnAndColumnFamily(cfString);
+        // Constant-first comparison: no NPE if no column type is configured for this family.
+        boolean isSuper = SUPER_COLUMN_TYPE.equals(DatabaseDescriptor.getColumnType(columnFamily.name()));
+
+        if (values.length == 2 && !isSuper)
+        {
+            if (!columnFamily.getAllColumns().isEmpty())
+                isDone_ = true;
+        }
+        else if (values.length == 3 && isSuper)
+        {
+            for (IColumn column : columnFamily.getAllColumns())
+            {
+                Collection<IColumn> subColumns = ((SuperColumn) column).getSubColumns();
+                if (!subColumns.isEmpty())
+                {
+                    isDone_ = true;
+                    break; // one non-empty super column is enough
+                }
+            }
+        }
+        return columnFamily;
+    }
+
+    /**
+     * Returns {@code column} unchanged; {@code dis} is not read.
+     */
+    public IColumn filter(IColumn column, DataInputStream dis) throws IOException
+    {
+        return column;
+    }
+
+    /**
+     * Delegates directly to {@code ssTable.next(key, cf)}.
+     */
+    public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+    {
+        return ssTable.next(key, cf);
+    }
+
+    /** Forces {@link #isDone()} to report true from now on. */
+    public void setDone()
+    {
+        isDone_ = true;
+    }
+
+    /**
+     * Intentionally empty; kept only so the public signature is unchanged.
+     * NOTE(review): candidate for removal.
+     */
+    public static void main(String[] args)
+    {
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/LoadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/LoadVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/LoadVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/LoadVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Verb handler for load messages. It takes the RowMutationMessage carried in
+ * the message body and sends it, as a one-way mutation message, to every
+ * storage endpoint returned for the row's key.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class LoadVerbHandler implements IVerbHandler
+{
+    private static final Logger logger_ = Logger.getLogger(LoadVerbHandler.class);
+
+    /**
+     * Forwards the row mutation in {@code message} to its storage endpoints.
+     * Expects {@code message.getMessageBody()[0]} to be a RowMutationMessage.
+     * Any exception is logged at debug level and not rethrown.
+     */
+    public void doVerb(Message message)
+    {
+        try
+        {
+            Object[] body = message.getMessageBody();
+            RowMutationMessage rmMsg = (RowMutationMessage) body[0];
+            RowMutation rm = rmMsg.getRowMutation();
+
+            EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
+
+            /* A single message instance is built once and sent to each endpoint. */
+            Message messageInternal = new Message(StorageService.getLocalStorageEndPoint(),
+                                                  StorageService.mutationStage_,
+                                                  StorageService.mutationVerbHandler_,
+                                                  new Object[]{ rmMsg });
+
+            StringBuilder sb = new StringBuilder();
+            boolean first = true;
+            for (EndPoint endPoint : endpoints)
+            {
+                // Separate the endpoints so the log line below stays readable.
+                if (!first)
+                    sb.append(", ");
+                first = false;
+                sb.append(endPoint);
+                MessagingService.getMessagingInstance().sendOneWay(messageInternal, endPoint);
+            }
+            logger_.debug("Sent data to " + sb.toString());
+        }
+        catch (Exception e)
+        {
+            // NOTE(review): failures are only visible at debug level.
+            logger_.debug(LogUtil.throwableToString(e));
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Memtable implements MemtableMBean, Comparable<Memtable>
+{
+ private static Logger logger_ = Logger.getLogger( Memtable.class );
+ private static Map<String, ExecutorService> apartments_ = new HashMap<String, ExecutorService>();
+ public static final String flushKey_ = "FlushKey";
+ public static void shutdown()
+ {
+ Set<String> names = apartments_.keySet();
+ for (String name : names)
+ {
+ apartments_.get(name).shutdownNow();
+ }
+ }
+
+ private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
+ private int thresholdCount_ = DatabaseDescriptor.getMemtableObjectCount()*1024*1024;
+ private AtomicInteger currentSize_ = new AtomicInteger(0);
+ private AtomicInteger currentObjectCount_ = new AtomicInteger(0);
+
+ /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
+ private String table_;
+ private String cfName_;
+ /* Creation time of this Memtable */
+ private long creationTime_;
+ private boolean isFrozen_ = false;
+ private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
+ /* Lock and Condition for notifying new clients about Memtable switches */
+ Lock lock_ = new ReentrantLock();
+ Condition condition_;
+
+ Memtable(String table, String cfName) throws IOException
+ {
+ if ( apartments_.get(cfName) == null )
+ {
+ apartments_.put(cfName, new DebuggableThreadPoolExecutor( 1,
+ 1,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("FAST-MEMTABLE-POOL")
+ ));
+ }
+
+ condition_ = lock_.newCondition();
+ table_ = table;
+ cfName_ = cfName;
+ creationTime_ = System.currentTimeMillis();
+ }
+
+ class Putter implements Runnable
+ {
+ private String key_;
+ private ColumnFamily columnFamily_;
+
+ Putter(String key, ColumnFamily cf)
+ {
+ key_ = key;
+ columnFamily_ = cf;
+ }
+
+ public void run()
+ {
+ resolve(key_, columnFamily_);
+ }
+ }
+
+ class Getter implements Callable<ColumnFamily>
+ {
+ private String key_;
+ private String columnFamilyName_;
+ private IFilter filter_;
+
+ Getter(String key, String cfName)
+ {
+ key_ = key;
+ columnFamilyName_ = cfName;
+ }
+
+ Getter(String key, String cfName, IFilter filter)
+ {
+ this(key, cfName);
+ filter_ = filter;
+ }
+
+ public ColumnFamily call()
+ {
+ ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
+ return cf;
+ }
+ }
+
+ class Remover implements Runnable
+ {
+ private String key_;
+ private ColumnFamily columnFamily_;
+
+ Remover(String key, ColumnFamily columnFamily)
+ {
+ key_ = key;
+ columnFamily_ = columnFamily;
+ }
+
+ public void run()
+ {
+ columnFamily_.delete();
+ columnFamilies_.put(key_, columnFamily_);
+ }
+ }
+
+ /**
+ * Compares two Memtable based on creation time.
+ * @param rhs
+ * @return
+ */
+ public int compareTo(Memtable rhs)
+ {
+ long diff = creationTime_ - rhs.creationTime_;
+ if ( diff > 0 )
+ return 1;
+ else if ( diff < 0 )
+ return -1;
+ else
+ return 0;
+ }
+
+ public int getMemtableThreshold()
+ {
+ return currentSize_.get();
+ }
+
+ void resolveSize(int oldSize, int newSize)
+ {
+ currentSize_.addAndGet(newSize - oldSize);
+ }
+
+ void resolveCount(int oldCount, int newCount)
+ {
+ currentObjectCount_.addAndGet(newCount - oldCount);
+ }
+
+ private boolean isLifetimeViolated()
+ {
+ /* Memtable lifetime in terms of milliseconds */
+ long lifetimeInMillis = DatabaseDescriptor.getMemtableLifetime() * 3600 * 1000;
+ return ( ( System.currentTimeMillis() - creationTime_ ) >= lifetimeInMillis );
+ }
+
+ boolean isThresholdViolated(String key)
+ {
+ boolean bVal = false;//isLifetimeViolated();
+ if (currentSize_.get() >= threshold_ || currentObjectCount_.get() >= thresholdCount_ || bVal || key.equals(flushKey_))
+ {
+ if ( bVal )
+ logger_.info("Memtable's lifetime for " + cfName_ + " has been violated.");
+ return true;
+ }
+ return false;
+ }
+
+ String getColumnFamily()
+ {
+ return cfName_;
+ }
+
+ void printExecutorStats()
+ {
+ DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)apartments_.get(cfName_);
+ long taskCount = (es.getTaskCount() - es.getCompletedTaskCount());
+ logger_.debug("MEMTABLE TASKS : " + taskCount);
+ }
+
+ /*
+ * This version is used by the external clients to put data into
+ * the memtable. This version will respect the threshold and flush
+ * the memtable to disk when the size exceeds the threshold.
+ */
+ void put(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ if (isThresholdViolated(key) )
+ {
+ lock_.lock();
+ try
+ {
+ ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+ if (!isFrozen_)
+ {
+ isFrozen_ = true;
+ MemtableManager.instance().submit(cfStore.getColumnFamilyName(), this, cLogCtx);
+ cfStore.switchMemtable(key, columnFamily, cLogCtx);
+ }
+ else
+ {
+ cfStore.apply(key, columnFamily, cLogCtx);
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ else
+ {
+ printExecutorStats();
+ Runnable putter = new Putter(key, columnFamily);
+ apartments_.get(cfName_).submit(putter);
+ }
+ }
+
+ /*
+ * This version is used to switch memtable and force flush.
+ */
+ void forceflush(ColumnFamilyStore cfStore, boolean fRecovery) throws IOException
+ {
+ if(!fRecovery)
+ {
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
+ try
+ {
+ rm.add(cfStore.columnFamily_ + ":Column","0".getBytes());
+ rm.apply();
+ }
+ catch(ColumnFamilyNotDefinedException ex)
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ }
+ else
+ {
+ flush(CommitLog.CommitLogContext.NULL);
+ }
+ }
+
+
+
+ private void resolve(String key, ColumnFamily columnFamily)
+ {
+ ColumnFamily oldCf = columnFamilies_.get(key);
+ if ( oldCf != null )
+ {
+ int oldSize = oldCf.size();
+ int oldObjectCount = oldCf.getColumnCount();
+ oldCf.addColumns(columnFamily);
+ int newSize = oldCf.size();
+ int newObjectCount = oldCf.getColumnCount();
+ resolveSize(oldSize, newSize);
+ resolveCount(oldObjectCount, newObjectCount);
+ }
+ else
+ {
+ columnFamilies_.put(key, columnFamily);
+ currentSize_.addAndGet(columnFamily.size() + key.length());
+ currentObjectCount_.addAndGet(columnFamily.getColumnCount());
+ }
+ }
+
+ /*
+ * This version is called on commit log recovery. The threshold
+ * is not respected and a forceFlush() needs to be invoked to flush
+ * the contents to disk.
+ */
+ void putOnRecovery(String key, ColumnFamily columnFamily) throws IOException
+ {
+ if(!key.equals(Memtable.flushKey_))
+ resolve(key, columnFamily);
+ }
+
+
+ ColumnFamily getLocalCopy(String key, String cfName, IFilter filter)
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(cfName);
+ ColumnFamily columnFamily = null;
+ if(values.length == 1 )
+ {
+ columnFamily = columnFamilies_.get(key);
+ }
+ else
+ {
+ ColumnFamily cFamily = columnFamilies_.get(key);
+ if(cFamily == null)
+ return null;
+ IColumn column = null;
+ if(values.length == 2)
+ {
+ column = cFamily.getColumn(values[1]);
+ if(column != null )
+ {
+ columnFamily = new ColumnFamily(cfName_);
+ columnFamily.addColumn(column.name(), column);
+ }
+ }
+ else
+ {
+ column = cFamily.getColumn(values[1]);
+ if(column != null )
+ {
+
+ IColumn subColumn = ((SuperColumn)column).getSubColumn(values[2]);
+ if(subColumn != null)
+ {
+ columnFamily = new ColumnFamily(cfName_);
+ columnFamily.createColumn(values[1] + ":" + values[2], subColumn.value(), subColumn.timestamp());
+ }
+ }
+ }
+ }
+ /* Filter unnecessary data from the column based on the provided filter */
+ return filter.filter(cfName, columnFamily);
+ }
+
+ ColumnFamily get(String key, String cfName)
+ {
+ printExecutorStats();
+ Callable<ColumnFamily> call = new Getter(key, cfName);
+ ColumnFamily cf = null;
+ try
+ {
+ cf = apartments_.get(cfName_).submit(call).get();
+ }
+ catch ( ExecutionException ex )
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ catch ( InterruptedException ex2 )
+ {
+ logger_.debug(LogUtil.throwableToString(ex2));
+ }
+ return cf;
+ }
+
+ ColumnFamily get(String key, String cfName, IFilter filter)
+ {
+ printExecutorStats();
+ Callable<ColumnFamily> call = new Getter(key, cfName, filter);
+ ColumnFamily cf = null;
+ try
+ {
+ cf = apartments_.get(cfName_).submit(call).get();
+ }
+ catch ( ExecutionException ex )
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ catch ( InterruptedException ex2 )
+ {
+ logger_.debug(LogUtil.throwableToString(ex2));
+ }
+ return cf;
+ }
+
+ /*
+ * Although the method is named remove() we cannot remove the key
+ * from memtable. We add it to the memtable but mark it as deleted.
+ * The reason for this because we do not want a successive get()
+ * for the same key to scan the ColumnFamilyStore files for this key.
+ */
+ void remove(String key, ColumnFamily columnFamily) throws IOException
+ {
+ printExecutorStats();
+ Runnable deleter = new Remover(key, columnFamily);
+ apartments_.get(cfName_).submit(deleter);
+ }
+
+ /*
+ * param recoveryMode - indicates if this was invoked during
+ * recovery.
+ */
+ void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+ if ( columnFamilies_.size() == 0 )
+ {
+ // This should be called even if size is 0
+ // This is because we should try to delete the useless commitlogs
+ // even though there is nothing to flush in memtables for a given family like Hints etc.
+ cfStore.onMemtableFlush(cLogCtx);
+ return;
+ }
+
+ PartitionerType pType = StorageService.getPartitionerType();
+ String directory = DatabaseDescriptor.getDataFileLocation();
+ String filename = cfStore.getNextFileName();
+ SSTable ssTable = new SSTable(directory, filename, pType);
+ switch (pType)
+ {
+ case OPHF:
+ flushForOrderPreservingPartitioner(ssTable, cfStore, cLogCtx);
+ break;
+
+ default:
+ flushForRandomPartitioner(ssTable, cfStore, cLogCtx);
+ break;
+ }
+
+ columnFamilies_.clear();
+ }
+
+ private void flushForRandomPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ /* List of primary keys in sorted order */
+ List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies_.keySet() );
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ /* Use this BloomFilter to decide if a key exists in a SSTable */
+ BloomFilter bf = new BloomFilter(pKeys.size(), 15);
+ for ( PrimaryKey pKey : pKeys )
+ {
+ buffer.reset();
+ ColumnFamily columnFamily = columnFamilies_.get(pKey.key());
+ if ( columnFamily != null )
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializer2().serialize( columnFamily, buffer );
+ /* Now write the key and value to disk */
+ ssTable.append(pKey.key(), pKey.hash(), buffer);
+ bf.fill(pKey.key());
+ columnFamily.clear();
+ }
+ }
+ ssTable.close(bf);
+ cfStore.onMemtableFlush(cLogCtx);
+ cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
+ buffer.close();
+ }
+
+ private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
+ Collections.sort(keys);
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ /* Use this BloomFilter to decide if a key exists in a SSTable */
+ BloomFilter bf = new BloomFilter(keys.size(), 15);
+ for ( String key : keys )
+ {
+ buffer.reset();
+ ColumnFamily columnFamily = columnFamilies_.get(key);
+ if ( columnFamily != null )
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializer2().serialize( columnFamily, buffer );
+ /* Now write the key and value to disk */
+ ssTable.append(key, buffer);
+ bf.fill(key);
+ columnFamily.clear();
+ }
+ }
+ ssTable.close(bf);
+ cfStore.onMemtableFlush(cLogCtx);
+ cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
+ buffer.close();
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableMBean.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableMBean.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableMBean.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+
+/**
+ * Management interface implemented by {@link Memtable}.
+ * NOTE(review): presumably registered with JMX, per the *MBean naming convention.
+ */
+public interface MemtableMBean
+{
+    /**
+     * Despite the name, Memtable's implementation returns the memtable's current
+     * accumulated size, not the configured flush threshold.
+     */
+    int getMemtableThreshold();
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Keeps, per column family name, the memtables that have been submitted for
+ * flushing, and flushes them on a single background thread. Until a
+ * memtable's flush task finishes, reads can still find its data through
+ * {@link #getColumnFamily}.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class MemtableManager
+{
+    /* volatile is required for the double-checked locking in instance() to be safe. */
+    private static volatile MemtableManager instance_;
+    private static Lock lock_ = new ReentrantLock();
+    private static Logger logger_ = Logger.getLogger(MemtableManager.class);
+    /* Guards history_: getColumnFamily takes the read lock; submit and flush completion take the write lock. */
+    private ReentrantReadWriteLock rwLock_ = new ReentrantReadWriteLock(true);
+
+    /** Returns the lazily created singleton. */
+    static MemtableManager instance()
+    {
+        MemtableManager result = instance_;
+        if (result == null)
+        {
+            lock_.lock();
+            try
+            {
+                result = instance_;
+                if (result == null)
+                {
+                    result = new MemtableManager();
+                    instance_ = result;
+                }
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return result;
+    }
+
+    /* Flushes one memtable, then removes it from history_, even if the flush failed. */
+    class MemtableFlusher implements Runnable
+    {
+        private Memtable memtable_;
+        private CommitLog.CommitLogContext cLogCtx_;
+
+        MemtableFlusher(Memtable memtable, CommitLog.CommitLogContext cLogCtx)
+        {
+            memtable_ = memtable;
+            cLogCtx_ = cLogCtx;
+        }
+
+        public void run()
+        {
+            try
+            {
+                memtable_.flush(cLogCtx_);
+            }
+            catch (IOException e)
+            {
+                logger_.debug(LogUtil.throwableToString(e));
+            }
+            rwLock_.writeLock().lock();
+            try
+            {
+                List<Memtable> memtables = history_.get(memtable_.getColumnFamily());
+                memtables.remove(memtable_);
+            }
+            finally
+            {
+                rwLock_.writeLock().unlock();
+            }
+        }
+    }
+
+    /* Column family name -> memtables submitted for flush and not yet removed. */
+    private Map<String, List<Memtable>> history_ = new HashMap<String, List<Memtable>>();
+    /* Single-threaded: flushes run one at a time, in submission order. */
+    private ExecutorService flusher_ = new DebuggableThreadPoolExecutor(1,
+                                                                        1,
+                                                                        Integer.MAX_VALUE,
+                                                                        TimeUnit.SECONDS,
+                                                                        new LinkedBlockingQueue<Runnable>(),
+                                                                        new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL"));
+
+    /* Submit memtables to be flushed to disk */
+    void submit(String cfName, Memtable memtbl, CommitLog.CommitLogContext cLogCtx)
+    {
+        rwLock_.writeLock().lock();
+        try
+        {
+            List<Memtable> memtables = history_.get(cfName);
+            if (memtables == null)
+            {
+                memtables = new ArrayList<Memtable>();
+                history_.put(cfName, memtables);
+            }
+            memtables.add(memtbl);
+            flusher_.submit(new MemtableFlusher(memtbl, cLogCtx));
+        }
+        finally
+        {
+            rwLock_.writeLock().unlock();
+        }
+    }
+
+    /*
+     * Retrieve column family from the list of Memtables that have been
+     * submitted for flush but have not yet been flushed.
+     * Memtables are visited newest first. Each non-null result is appended to
+     * columnFamilies, and the search stops once the filter reports it is done.
+     * It also filters out unnecessary columns based on the passed in filter.
+     */
+    void getColumnFamily(String key, String cfName, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
+    {
+        rwLock_.readLock().lock();
+        try
+        {
+            /* Get all memtables associated with this column family */
+            List<Memtable> memtables = history_.get(cfName);
+            if (memtables == null)
+                return;
+
+            /* Sort a private copy: several readers can hold the read lock at once,
+               so the shared list must not be mutated here. */
+            List<Memtable> sorted = new ArrayList<Memtable>(memtables);
+            Collections.sort(sorted);
+            for (int i = sorted.size() - 1; i >= 0; --i)
+            {
+                ColumnFamily columnFamily = sorted.get(i).getLocalCopy(key, cf, filter);
+                if (columnFamily != null)
+                {
+                    columnFamilies.add(columnFamily);
+                    if (filter != null && filter.isDone())
+                        break;
+                }
+            }
+        }
+        finally
+        {
+            rwLock_.readLock().unlock();
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.db.HintedHandOffManager.HintedHandOff;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MinorCompactionManager implements IComponentShutdown
+{
+ private static MinorCompactionManager instance_;
+ private static Lock lock_ = new ReentrantLock();
+ private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
+ final static long intervalInMins_ = 5;
+
+ public static MinorCompactionManager instance()
+ {
+ if ( instance_ == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( instance_ == null )
+ instance_ = new MinorCompactionManager();
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return instance_;
+ }
+
+ class FileCompactor implements Runnable
+ {
+ private ColumnFamilyStore columnFamilyStore_;
+
+ FileCompactor(ColumnFamilyStore columnFamilyStore)
+ {
+ columnFamilyStore_ = columnFamilyStore;
+ }
+
+ public void run()
+ {
+ try
+ {
+ logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
+ columnFamilyStore_.doCompaction();
+ logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+ }
+ catch (IOException e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ catch (Throwable th)
+ {
+ logger_.error( LogUtil.throwableToString(th) );
+ }
+ }
+ }
+
+ class FileCompactor2 implements Callable<Boolean>
+ {
+ private ColumnFamilyStore columnFamilyStore_;
+ private List<Range> ranges_;
+ private EndPoint target_;
+ private List<String> fileList_;
+
+ FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges)
+ {
+ columnFamilyStore_ = columnFamilyStore;
+ ranges_ = ranges;
+ }
+
+ FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target,List<String> fileList)
+ {
+ columnFamilyStore_ = columnFamilyStore;
+ ranges_ = ranges;
+ target_ = target;
+ fileList_ = fileList;
+ }
+
+ public Boolean call()
+ {
+ boolean result = true;
+ try
+ {
+ logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
+ result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+ logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+ }
+ catch (IOException e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ return result;
+ }
+ }
+
+ class OnDemandCompactor implements Runnable
+ {
+ private ColumnFamilyStore columnFamilyStore_;
+ private long skip_ = 0L;
+
+ OnDemandCompactor(ColumnFamilyStore columnFamilyStore, long skip)
+ {
+ columnFamilyStore_ = columnFamilyStore;
+ skip_ = skip;
+ }
+
+ public void run()
+ {
+ try
+ {
+ logger_.debug("Started Major compaction ..."+columnFamilyStore_.columnFamily_);
+ columnFamilyStore_.doMajorCompaction(skip_);
+ logger_.debug("Finished Major compaction ..."+columnFamilyStore_.columnFamily_);
+ }
+ catch (IOException e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ return ;
+ }
+ }
+
+ class CleanupCompactor implements Runnable
+ {
+ private ColumnFamilyStore columnFamilyStore_;
+
+ CleanupCompactor(ColumnFamilyStore columnFamilyStore)
+ {
+ columnFamilyStore_ = columnFamilyStore;
+ }
+
+ public void run()
+ {
+ try
+ {
+ logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
+ columnFamilyStore_.doCleanupCompaction();
+ logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+ }
+ catch (IOException e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ catch (Throwable th)
+ {
+ logger_.error( LogUtil.throwableToString(th) );
+ }
+ }
+ }
+
+
+ private ScheduledExecutorService compactor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("MINOR-COMPACTION-POOL"));
+
+ public MinorCompactionManager()
+ {
+ StorageService.instance().registerComponentForShutdown(this);
+ }
+
+ public void shutdown()
+ {
+ compactor_.shutdownNow();
+ }
+
+ public void submitPeriodicCompaction(ColumnFamilyStore columnFamilyStore)
+ {
+ compactor_.scheduleWithFixedDelay(new FileCompactor(columnFamilyStore), MinorCompactionManager.intervalInMins_,
+ MinorCompactionManager.intervalInMins_, TimeUnit.MINUTES);
+ }
+
+ public void submit(ColumnFamilyStore columnFamilyStore)
+ {
+ compactor_.submit(new FileCompactor(columnFamilyStore));
+ }
+
+ public void submitCleanup(ColumnFamilyStore columnFamilyStore)
+ {
+ compactor_.submit(new CleanupCompactor(columnFamilyStore));
+ }
+
+ public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target, List<String> fileList)
+ {
+ return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target, fileList) );
+ }
+
+ public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges)
+ {
+ return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges) );
+ }
+
+ public void submitMajor(ColumnFamilyStore columnFamilyStore, List<Range> ranges, long skip)
+ {
+ compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTable;
+
+
+
+public class NamesFilter implements IFilter
+{
+    /*
+     * Column names that have not been matched yet. Matched names are removed,
+     * which is how isDone() knows every requested column has been found.
+     */
+    private List<String> names_;
+
+    /**
+     * @param names the column names to select. The list is copied: the filter
+     *              consumes its own copy as columns are matched, so the
+     *              caller's list is left untouched (and may be unmodifiable).
+     */
+    NamesFilter(List<String> names)
+    {
+        names_ = new ArrayList<String>(names);
+    }
+
+    /**
+     * Copies the columns of columnFamily whose names are still wanted into a
+     * new ColumnFamily with the same name, removing each matched name.
+     *
+     * If RowMutation.getColumnAndColumnFamily(cf) yields one part, top-level
+     * columns are matched. If it yields two parts and the column family's type
+     * is "Super", the sub-columns of every super column are matched instead.
+     * In that case, every super column is added to the result, even one with
+     * no matching sub-columns.
+     *
+     * @return null when columnFamily is null, otherwise the filtered copy
+     *         (returned early once every name has been matched)
+     * @throws UnsupportedOperationException for any other shape of cf
+     */
+    public ColumnFamily filter(String cf, ColumnFamily columnFamily)
+    {
+        if ( columnFamily == null )
+        {
+            return columnFamily;
+        }
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+        String cfName = columnFamily.name();
+        ColumnFamily filteredCf = new ColumnFamily(cfName);
+        if( values.length == 1 )
+        {
+            Collection<IColumn> columns = columnFamily.getAllColumns();
+            for(IColumn column : columns)
+            {
+                if ( names_.contains(column.name()) )
+                {
+                    names_.remove(column.name());
+                    filteredCf.addColumn(column.name(), column);
+                }
+                if( isDone() )
+                {
+                    return filteredCf;
+                }
+            }
+        }
+        else if ( values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super") )
+        {
+            Collection<IColumn> columns = columnFamily.getAllColumns();
+            for(IColumn column : columns)
+            {
+                SuperColumn superColumn = (SuperColumn)column;
+                SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
+                filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
+                Collection<IColumn> subColumns = superColumn.getSubColumns();
+                for(IColumn subColumn : subColumns)
+                {
+                    if ( names_.contains(subColumn.name()) )
+                    {
+                        names_.remove(subColumn.name());
+                        filteredSuperColumn.addColumn(subColumn.name(), subColumn);
+                    }
+                    if( isDone() )
+                    {
+                        return filteredCf;
+                    }
+                }
+            }
+        }
+        else
+        {
+            throw new UnsupportedOperationException();
+        }
+        return filteredCf;
+    }
+
+    /**
+     * Returns column if its name is still wanted (and marks that name as
+     * matched), otherwise null. The stream argument is not used.
+     */
+    public IColumn filter(IColumn column, DataInputStream dis) throws IOException
+    {
+        String columnName = column.name();
+        if ( names_.contains(columnName) )
+        {
+            names_.remove(columnName);
+        }
+        else
+        {
+            column = null;
+        }
+
+        return column;
+    }
+
+    /** True once every requested name has been matched. */
+    public boolean isDone()
+    {
+        return names_.isEmpty();
+    }
+
+    /** Reads the next row for key/cf from ssTable, restricted to the names not yet matched. */
+    public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+    {
+        return ssTable.next(key, cf, names_);
+    }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/PrimaryKey.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/PrimaryKey.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/PrimaryKey.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/PrimaryKey.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+public class PrimaryKey implements Comparable<PrimaryKey>
+{
+    /**
+     * Wraps every key in a PrimaryKey and returns the list sorted by compareTo.
+     */
+    public static List<PrimaryKey> create(Set<String> keys)
+    {
+        List<PrimaryKey> list = new ArrayList<PrimaryKey>();
+        for ( String key : keys )
+        {
+            list.add( new PrimaryKey(key) );
+        }
+        Collections.sort(list);
+        return list;
+    }
+
+    /* MD5 hash of the key_; left null when built from a key under the OPHF partitioner. */
+    private BigInteger hash_;
+    /* Key used by the application */
+    private String key_;
+
+    /**
+     * Builds a key and computes its hash unless the partitioner is OPHF,
+     * where comparisons use the key itself.
+     */
+    PrimaryKey(String key)
+    {
+        PartitionerType pType = StorageService.getPartitionerType();
+        switch (pType)
+        {
+            case OPHF:
+                /* Order-preserving: no hash is needed. */
+                break;
+
+            case RANDOM:
+            default:
+                hash_ = FBUtilities.hash(key);
+                break;
+        }
+        key_ = key;
+    }
+
+    PrimaryKey(String key, BigInteger hash)
+    {
+        hash_ = hash;
+        key_ = key;
+    }
+
+    public String key()
+    {
+        return key_;
+    }
+
+    public BigInteger hash()
+    {
+        return hash_;
+    }
+
+    /**
+     * This performs semantic comparison of Primary Keys.
+     * If the partition algorithm chosen is "Random" then
+     * the hash of the key is used for comparison. If it
+     * is an OPHF then the key is used.
+     *
+     * @param rhs primary key against which we wish to compare.
+     * @return a negative number, zero or a positive number as this key
+     *         sorts before, equal to or after rhs
+     */
+    public int compareTo(PrimaryKey rhs)
+    {
+        PartitionerType pType = StorageService.getPartitionerType();
+        switch (pType)
+        {
+            case OPHF:
+                return key_.compareTo(rhs.key_);
+
+            case RANDOM:
+            default:
+                return hash_.compareTo(rhs.hash_);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return (hash_ != null) ? (key_ + ":" + hash_) : key_;
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ReadMessage implements Serializable
+{
+    /* Shared serializer for the wire form of a ReadMessage. */
+    private static ICompactSerializer<ReadMessage> serializer_ = new ReadMessageSerializer();
+
+    static ICompactSerializer<ReadMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Serializes the request and wraps the bytes in a Message from the local
+     * endpoint, addressed to StorageService's read stage and read verb handler.
+     */
+    public static Message makeReadMessage(ReadMessage readMessage) throws IOException
+    {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        serializer().serialize(readMessage, new DataOutputStream(buffer));
+        return new Message(StorageService.getLocalStorageEndPoint(),
+                           StorageService.readStage_,
+                           StorageService.readVerbHandler_,
+                           new Object[]{ buffer.toByteArray() });
+    }
+
+    @XmlElement(name="Table")
+    private String table_;
+
+    @XmlElement(name="Key")
+    private String key_;
+
+    @XmlElement(name="ColumnFamily")
+    private String columnFamily_column_ = null;
+
+    @XmlElement(name="start")
+    private int start_ = -1;
+
+    @XmlElement(name="count")
+    private int count_ = -1 ;
+
+    @XmlElement(name="sinceTimestamp")
+    private long sinceTimestamp_ = -1 ;
+
+    @XmlElement(name="columnNames")
+    private List<String> columns_ = new ArrayList<String>();
+
+    @XmlElement(name="isDigestQuery")
+    private boolean isDigestQuery_ = false;
+
+    private ReadMessage()
+    {
+    }
+
+    /** Request for a table/key with no column family selected. */
+    public ReadMessage(String table, String key)
+    {
+        table_ = table;
+        key_ = key;
+    }
+
+    /** Request for a table/key restricted to a column family (or column). */
+    public ReadMessage(String table, String key, String columnFamily_column)
+    {
+        this(table, key);
+        columnFamily_column_ = columnFamily_column;
+    }
+
+    /** Request for the named columns of a column family; the list is stored as given. */
+    public ReadMessage(String table, String key, String columnFamily, List<String> columns)
+    {
+        this(table, key, columnFamily);
+        columns_ = columns;
+    }
+
+    /** Request for count columns starting at index start. */
+    public ReadMessage(String table, String key, String columnFamily_column, int start, int count)
+    {
+        this(table, key, columnFamily_column);
+        start_ = start;
+        count_ = count;
+    }
+
+    /** Request carrying a since-timestamp. */
+    public ReadMessage(String table, String key, String columnFamily_column, long sinceTimestamp)
+    {
+        this(table, key, columnFamily_column);
+        sinceTimestamp_ = sinceTimestamp;
+    }
+
+    String table()
+    {
+        return table_;
+    }
+
+    String key()
+    {
+        return key_;
+    }
+
+    String columnFamily_column()
+    {
+        return columnFamily_column_;
+    }
+
+    int start()
+    {
+        return start_;
+    }
+
+    int count()
+    {
+        return count_;
+    }
+
+    long sinceTimestamp()
+    {
+        return sinceTimestamp_;
+    }
+
+    public boolean isDigestQuery()
+    {
+        return isDigestQuery_;
+    }
+
+    public void setIsDigestQuery(boolean isDigestQuery)
+    {
+        isDigestQuery_ = isDigestQuery;
+    }
+
+    public List<String> getColumnNames()
+    {
+        return columns_;
+    }
+}
+
+class ReadMessageSerializer implements ICompactSerializer<ReadMessage>
+{
+    /*
+     * DataOutputStream.writeUTF cannot encode null, yet a ReadMessage built with
+     * ReadMessage(table, key) has a null column family. Such a message is sent
+     * with the empty string and turned back into null when it is read.
+     */
+    private static final String NO_COLUMN_FAMILY = "";
+    /* Column names are encoded explicitly so every node decodes them identically. */
+    private static final String CHARSET = "UTF-8";
+
+    /**
+     * Writes table, key, column family, start, count, since-timestamp, digest
+     * flag, then the column names as a count followed by length-prefixed bytes.
+     */
+    public void serialize(ReadMessage rm, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(rm.table());
+        dos.writeUTF(rm.key());
+        String columnFamily_column = rm.columnFamily_column();
+        dos.writeUTF(columnFamily_column == null ? NO_COLUMN_FAMILY : columnFamily_column);
+        dos.writeInt(rm.start());
+        dos.writeInt(rm.count());
+        dos.writeLong(rm.sinceTimestamp());
+        dos.writeBoolean(rm.isDigestQuery());
+        List<String> columns = rm.getColumnNames();
+        dos.writeInt(columns.size());
+        for ( String column : columns )
+        {
+            byte[] bytes = column.getBytes(CHARSET);
+            dos.writeInt(bytes.length);
+            dos.write(bytes);
+        }
+    }
+
+    /**
+     * Reads what serialize() wrote and rebuilds the request with the matching
+     * constructor.
+     */
+    public ReadMessage deserialize(DataInputStream dis) throws IOException
+    {
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        String columnFamily_column = dis.readUTF();
+        if ( NO_COLUMN_FAMILY.equals(columnFamily_column) )
+            columnFamily_column = null;
+        int start = dis.readInt();
+        int count = dis.readInt();
+        long sinceTimestamp = dis.readLong();
+        boolean isDigest = dis.readBoolean();
+
+        int size = dis.readInt();
+        List<String> columns = new ArrayList<String>();
+        for ( int i = 0; i < size; ++i )
+        {
+            byte[] bytes = new byte[dis.readInt()];
+            dis.readFully(bytes);
+            columns.add( new String(bytes, CHARSET) );
+        }
+
+        /*
+         * Exactly one constructor applies. Column names take precedence, then a
+         * since-timestamp, otherwise a start/count request. The else-chain
+         * matters: without it, a column-names request was overwritten by the
+         * start/count constructor and its names were lost.
+         */
+        ReadMessage rm;
+        if ( columns.size() > 0 )
+        {
+            rm = new ReadMessage(table, key, columnFamily_column, columns);
+        }
+        else if ( sinceTimestamp > 0 )
+        {
+            rm = new ReadMessage(table, key, columnFamily_column, sinceTimestamp);
+        }
+        else
+        {
+            rm = new ReadMessage(table, key, columnFamily_column, start, count);
+        }
+        rm.setIsDigestQuery(isDigest);
+        return rm;
+    }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadRepairVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadRepairVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ReadRepairVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(ReadRepairVerbHandler.class);
+
+    /**
+     * Decodes the RowMutationMessage carried in the first element of the
+     * message body and applies its RowMutation locally. A missing column
+     * family or an I/O failure is logged at debug level and otherwise ignored.
+     */
+    public void doVerb(Message message)
+    {
+        Object[] payload = message.getMessageBody();
+        byte[] serialized = (byte[]) payload[0];
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(serialized, serialized.length);
+
+        try
+        {
+            RowMutation mutation = RowMutationMessage.serializer().deserialize(in).getRowMutation();
+            mutation.apply();
+        }
+        catch ( ColumnFamilyNotDefinedException cfnde )
+        {
+            logger_.debug(LogUtil.throwableToString(cfnde));
+        }
+        catch ( IOException ioe )
+        {
+            logger_.debug(LogUtil.throwableToString(ioe));
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+
+/*
+ * The read response message is sent by the server when reading data
+ * It encapsulates the table name and the row that has been read.
+ * The table name is needed so that we can use it to create repairs.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadResponseMessage implements Serializable
+{
+    /* Shared serializer for the wire form of a ReadResponseMessage. */
+    private static ICompactSerializer<ReadResponseMessage> serializer_ = new ReadResponseMessageSerializer();
+
+    public static ICompactSerializer<ReadResponseMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Serializes the response and wraps the bytes in a Message from the local
+     * endpoint, addressed to MessagingService's response stage and handler.
+     */
+    public static Message makeReadResponseMessage(ReadResponseMessage readResponseMessage) throws IOException
+    {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        serializer().serialize(readResponseMessage, new DataOutputStream(buffer));
+        return new Message(StorageService.getLocalStorageEndPoint(),
+                           MessagingService.responseStage_,
+                           MessagingService.responseVerbHandler_,
+                           new Object[]{ buffer.toByteArray() });
+    }
+
+    @XmlElement(name = "Table")
+    private String table_;
+
+    @XmlElement(name = "Row")
+    private Row row_;
+
+    @XmlElement(name = "Digest")
+    private byte[] digest_ = new byte[0];
+
+    @XmlElement(name="isDigestQuery")
+    private boolean isDigestQuery_ = false;
+
+    private ReadResponseMessage()
+    {
+    }
+
+    /** Response carrying only a digest; row() stays null. */
+    public ReadResponseMessage(String table, byte[] digest)
+    {
+        table_ = table;
+        digest_ = digest;
+    }
+
+    /** Response carrying a row; digest() stays an empty array. */
+    public ReadResponseMessage(String table, Row row)
+    {
+        table_ = table;
+        row_ = row;
+    }
+
+    public String table()
+    {
+        return table_;
+    }
+
+    public Row row()
+    {
+        return row_;
+    }
+
+    public byte[] digest()
+    {
+        return digest_;
+    }
+
+    public boolean isDigestQuery()
+    {
+        return isDigestQuery_;
+    }
+
+    public void setIsDigestQuery(boolean isDigestQuery)
+    {
+        isDigestQuery_ = isDigestQuery;
+    }
+}
+
+
+class ReadResponseMessageSerializer implements ICompactSerializer<ReadResponseMessage>
+{
+    /**
+     * Writes the table name, the length-prefixed digest, the digest-query flag
+     * and, for a non-digest response that has a row, the row itself.
+     */
+    public void serialize(ReadResponseMessage rm, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(rm.table());
+        dos.writeInt(rm.digest().length);
+        dos.write(rm.digest());
+        dos.writeBoolean(rm.isDigestQuery());
+
+        /*
+         * NOTE(review): deserialize() always reads a row when the flag is false,
+         * so a non-digest response with a null row yields a stream the reader
+         * cannot decode. Presumably callers never send one; confirm.
+         */
+        if( !rm.isDigestQuery() && rm.row() != null )
+        {
+            Row.serializer().serialize(rm.row(), dos);
+        }
+    }
+
+    /**
+     * Reads what serialize() wrote. Digest responses keep the digest; other
+     * responses keep the row (the transmitted digest is not retained).
+     */
+    public ReadResponseMessage deserialize(DataInputStream dis) throws IOException
+    {
+        String table = dis.readUTF();
+        int digestSize = dis.readInt();
+        byte[] digest = new byte[digestSize];
+        /* read() may return fewer bytes than requested; readFully() cannot. */
+        dis.readFully(digest);
+        boolean isDigest = dis.readBoolean();
+
+        ReadResponseMessage rmsg;
+        if ( isDigest )
+        {
+            rmsg = new ReadResponseMessage(table, digest);
+        }
+        else
+        {
+            Row row = Row.serializer().deserialize(dis);
+            rmsg = new ReadResponseMessage(table, row);
+        }
+        rmsg.setIsDigestQuery(isDigest);
+        return rmsg;
+    }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ReadVerbHandler implements IVerbHandler
+{
+    /* Reusable buffers for decoding one request and encoding its response. */
+    private static class ReadContext
+    {
+        protected DataInputBuffer bufIn_ = new DataInputBuffer();
+        protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
+    }
+
+    private static Logger logger_ = Logger.getLogger( ReadVerbHandler.class );
+    /*
+     * One ReadContext per thread so its buffers can be reused across requests.
+     * This is deliberately a plain ThreadLocal. An InheritableThreadLocal would
+     * give every thread created by a handler thread the very same ReadContext
+     * object, and those threads would then race on the shared buffers.
+     */
+    private static ThreadLocal<ReadContext> tls_ = new ThreadLocal<ReadContext>()
+    {
+        @Override
+        protected ReadContext initialValue()
+        {
+            return new ReadContext();
+        }
+    };
+
+    /**
+     * Decodes a ReadMessage and performs the requested read: the whole row when
+     * no column family is given; otherwise a start/count slice, the named
+     * columns, or the column family. It then replies to the sender with either
+     * the row or only its digest. I/O and undefined-column-family errors are
+     * logged at info level.
+     */
+    public void doVerb(Message message)
+    {
+        byte[] body = (byte[])message.getMessageBody()[0];
+        /* Obtain this thread's Read Context */
+        ReadContext readCtx = tls_.get();
+        readCtx.bufIn_.reset(body, body.length);
+
+        try
+        {
+            ReadMessage readMessage = ReadMessage.serializer().deserialize(readCtx.bufIn_);
+            Table table = Table.open(readMessage.table());
+            Row row = null;
+            long start = System.currentTimeMillis();
+            if( readMessage.columnFamily_column() == null )
+                row = table.get(readMessage.key());
+            else
+            {
+                if(readMessage.getColumnNames().size() == 0)
+                {
+                    if(readMessage.count() > 0 && readMessage.start() >= 0)
+                        row = table.getRow(readMessage.key(), readMessage.columnFamily_column(), readMessage.start(), readMessage.count());
+                    else
+                        row = table.getRow(readMessage.key(), readMessage.columnFamily_column());
+                }
+                else
+                {
+                    row = table.getRow(readMessage.key(), readMessage.columnFamily_column(), readMessage.getColumnNames());
+                }
+            }
+            logger_.info("getRow() TIME: " + (System.currentTimeMillis() - start) + " ms.");
+            ReadResponseMessage readResponseMessage = null;
+            if(readMessage.isDigestQuery())
+            {
+                /* NOTE(review): assumes the lookup above never yields a null row; confirm. */
+                readResponseMessage = new ReadResponseMessage(table.getTableName(), row.digest());
+            }
+            else
+            {
+                readResponseMessage = new ReadResponseMessage(table.getTableName(), row);
+            }
+            readResponseMessage.setIsDigestQuery(readMessage.isDigestQuery());
+            /* serialize the ReadResponseMessage. */
+            readCtx.bufOut_.reset();
+
+            start = System.currentTimeMillis();
+            ReadResponseMessage.serializer().serialize(readResponseMessage, readCtx.bufOut_);
+            logger_.info("serialize TIME: " + (System.currentTimeMillis() - start) + " ms.");
+
+            /* Copy out only the bytes written; the buffer is reused by this thread. */
+            byte[] bytes = new byte[readCtx.bufOut_.getLength()];
+            start = System.currentTimeMillis();
+            System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
+            logger_.info("copy TIME: " + (System.currentTimeMillis() - start) + " ms.");
+
+            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{bytes} );
+            MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+            /* Measures the copy plus building and sending the reply. */
+            logger_.info("ReadVerbHandler TIME 2: " + (System.currentTimeMillis() - start)
+                    + " ms.");
+        }
+        catch ( IOException ex)
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
+        catch ( ColumnFamilyNotDefinedException ex)
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RecoveryManager
+{
+    private static RecoveryManager instance_;
+    private static Logger logger_ = Logger.getLogger(RecoveryManager.class);
+
+    /**
+     * Returns the shared RecoveryManager, creating it on first call.
+     * The method is synchronized so concurrent first callers get the same instance.
+     */
+    synchronized static RecoveryManager instance() throws IOException
+    {
+        if ( instance_ == null )
+            instance_ = new RecoveryManager();
+        return instance_;
+    }
+
+    /**
+     * Lists every entry of the commit log directory configured via
+     * {@link DatabaseDescriptor#getLogFileLocation()}.
+     *
+     * @return the directory's entries; never null. An empty array is returned
+     *         (and a warning logged) when the directory cannot be listed.
+     */
+    public static File[] getListofCommitLogs()
+    {
+        String directory = DatabaseDescriptor.getLogFileLocation();
+        File[] files = new File(directory).listFiles();
+        /* File.listFiles() reports a missing, non-directory or unreadable path as null. */
+        if ( files == null )
+        {
+            logger_.warn("Could not list commit log directory " + directory);
+            return new File[0];
+        }
+        return files;
+    }
+
+    /**
+     * Groups the files of the commit log directory by table, using
+     * {@link CommitLog#getTableName(String)} on each file name.
+     *
+     * @return table name -> that table's commit log files (not sorted)
+     */
+    public static Map<String, List<File>> getListOFCommitLogsPerTable()
+    {
+        return groupByTable(getListofCommitLogs());
+    }
+
+    /** Builds the table name -> commit log files mapping for the given files. */
+    private static Map<String, List<File>> groupByTable(File[] files)
+    {
+        Map<String, List<File>> tableToCommitLogs = new HashMap<String, List<File>>();
+        for ( File f : files )
+        {
+            String table = CommitLog.getTableName(f.getName());
+            List<File> clogs = tableToCommitLogs.get(table);
+            if ( clogs == null )
+            {
+                clogs = new ArrayList<File>();
+                tableToCommitLogs.put(table, clogs);
+            }
+            clogs.add(f);
+        }
+        return tableToCommitLogs;
+    }
+
+    /**
+     * Replays all commit logs table by table, then deletes them.
+     * The directory is listed exactly once, so the files that get replayed are
+     * exactly the files that get deleted. If replay throws, nothing is deleted.
+     */
+    public void doRecovery() throws IOException
+    {
+        File[] files = getListofCommitLogs();
+        recoverEachTable(groupByTable(files));
+        FileUtils.delete(files);
+    }
+
+    /**
+     * For each table, sorts its commit logs with {@link FileUtils.FileComparator}
+     * and hands them to a CommitLog for that table to recover.
+     */
+    private void recoverEachTable(Map<String, List<File>> tableToCommitLogs) throws IOException
+    {
+        Comparator<File> fCmp = new FileUtils.FileComparator();
+        for ( Map.Entry<String, List<File>> entry : tableToCommitLogs.entrySet() )
+        {
+            List<File> clogs = entry.getValue();
+            Collections.sort(clogs, fCmp);
+            /* NOTE(review): meaning of the boolean flag is defined by CommitLog; presumably recovery mode - confirm. */
+            CommitLog clog = new CommitLog(entry.getKey(), true);
+            clog.recover(clogs);
+        }
+    }
+
+    /** Standalone entry point: loads the configuration and runs a full recovery. */
+    public static void main(String[] args) throws Throwable
+    {
+        DatabaseDescriptor.init();
+        long start = System.currentTimeMillis();
+        RecoveryManager rm = RecoveryManager.instance();
+        rm.doRecovery();
+        logger_.debug( "Time taken : " + (System.currentTimeMillis() - start) + " ms.");
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class Row implements Serializable
+{
+    private static ICompactSerializer<Row> serializer_;
+    private static Logger logger_ = Logger.getLogger(Row.class);
+
+    static
+    {
+        serializer_ = new RowSerializer();
+    }
+
+    /** Returns the stream serializer used to read and write rows. */
+    static ICompactSerializer<Row> serializer()
+    {
+        return serializer_;
+    }
+
+    private String key_;
+    /* Column family name -> column family. */
+    private Map<String, ColumnFamily> columnFamilies_ = new Hashtable<String, ColumnFamily>();
+    /*
+     * Running total of ColumnFamily.size() over the column families in columnFamilies_.
+     * Every method of this class that changes the map keeps it in step; changes made
+     * through getColumnFamilies() bypass it.
+     * NOTE(review): transient, so it is null after default java.io deserialization.
+     */
+    private transient AtomicInteger size_ = new AtomicInteger(0);
+
+    /* Ctor for JAXB */
+    protected Row()
+    {
+    }
+
+    public Row(String key)
+    {
+        key_ = key;
+    }
+
+    public String key()
+    {
+        return key_;
+    }
+
+    void key(String key)
+    {
+        key_ = key;
+    }
+
+    /** Returns the column family with the given name, or null if this row has none. */
+    public ColumnFamily getColumnFamily(String cfName)
+    {
+        return columnFamilies_.get(cfName);
+    }
+
+    /**
+     * Returns the live backing map (not a copy). Modifications made through it
+     * are not reflected in {@link #size()}.
+     */
+    public Map<String, ColumnFamily> getColumnFamilies()
+    {
+        return columnFamilies_;
+    }
+
+    /** Adds a column family keyed by its name, replacing any existing one of that name. */
+    void addColumnFamily(ColumnFamily columnFamily)
+    {
+        putColumnFamily(columnFamily.name(), columnFamily);
+    }
+
+    /**
+     * Removes the column family whose name matches the argument's. The tracked
+     * size shrinks by the size of the column family actually removed, and is
+     * left unchanged when no column family of that name was present.
+     */
+    void removeColumnFamily(ColumnFamily columnFamily)
+    {
+        ColumnFamily removed = columnFamilies_.remove(columnFamily.name());
+        if ( removed != null )
+        {
+            size_.addAndGet(-removed.size());
+        }
+    }
+
+    /** Returns the tracked total size of this row's column families. */
+    public int size()
+    {
+        return size_.get();
+    }
+
+    public boolean isEmpty()
+    {
+        return columnFamilies_.isEmpty();
+    }
+
+    /**
+     * Used as oldRow.merge(newRow): merges the given (newer) row into this one.
+     * Column families missing here are adopted as-is (the same instance, not a
+     * copy); column families present in both are combined with ColumnFamily.merge.
+     */
+    void merge(Row row)
+    {
+        for ( Map.Entry<String, ColumnFamily> entry : row.getColumnFamilies().entrySet() )
+        {
+            String cfName = entry.getKey();
+            ColumnFamily incoming = entry.getValue();
+            ColumnFamily cf = columnFamilies_.get(cfName);
+            if ( cf == null )
+            {
+                putColumnFamily(cfName, incoming);
+            }
+            else
+            {
+                int before = cf.size();
+                cf.merge(incoming);
+                size_.addAndGet(cf.size() - before);
+            }
+        }
+    }
+
+    /**
+     * Repairs this row from the given row: for every column family in the input,
+     * the matching column family here (created empty if absent) is updated with
+     * ColumnFamily.repair, so that this row takes the latest changes.
+     */
+    public void repair(Row row)
+    {
+        for ( Map.Entry<String, ColumnFamily> entry : row.getColumnFamilies().entrySet() )
+        {
+            String cfName = entry.getKey();
+            ColumnFamily cf = columnFamilies_.get(cfName);
+            if ( cf == null )
+            {
+                cf = new ColumnFamily(cfName);
+                putColumnFamily(cfName, cf);
+            }
+            int before = cf.size();
+            cf.repair(entry.getValue());
+            size_.addAndGet(cf.size() - before);
+        }
+    }
+
+    /**
+     * Computes what the given row has that this row lacks. The input is assumed
+     * to be a superset of this row, so only additions are reported; nothing is
+     * computed for data that would have to be deleted from this row to make the
+     * two rows identical.
+     *
+     * @return a row (with this row's key) holding the differences, or null when
+     *         there are none
+     */
+    public Row diff(Row row)
+    {
+        Row rowDiff = new Row(key_);
+        for ( Map.Entry<String, ColumnFamily> entry : row.getColumnFamilies().entrySet() )
+        {
+            String cfName = entry.getKey();
+            ColumnFamily cf = columnFamilies_.get(cfName);
+            if ( cf == null )
+            {
+                rowDiff.putColumnFamily(cfName, entry.getValue());
+            }
+            else
+            {
+                ColumnFamily cfDiff = cf.diff(entry.getValue());
+                if ( cfDiff != null )
+                {
+                    rowDiff.putColumnFamily(cfName, cfDiff);
+                }
+            }
+        }
+        return rowDiff.isEmpty() ? null : rowDiff;
+    }
+
+    /**
+     * Returns a shallow copy: a new map over the same ColumnFamily instances,
+     * with the same key and the same tracked size.
+     */
+    public Row cloneMe()
+    {
+        Row row = new Row(key_);
+        /* Hashtable, matching the field initializer, so the clone keeps the same map type. */
+        row.columnFamilies_ = new Hashtable<String, ColumnFamily>(columnFamilies_);
+        row.size_.set(size_.get());
+        return row;
+    }
+
+    /**
+     * Combines the digests of all column families with FBUtilities.xor.
+     * Returns an empty array when the row has no column families.
+     */
+    public byte[] digest()
+    {
+        long start = System.currentTimeMillis();
+        byte[] xorHash = new byte[0];
+        for ( ColumnFamily cf : columnFamilies_.values() )
+        {
+            byte[] cfHash = cf.digest();
+            /* An empty accumulator means nothing combined yet: take the first digest as-is. */
+            xorHash = ( xorHash.length == 0 ) ? cfHash : FBUtilities.xor(xorHash, cfHash);
+        }
+        logger_.info("DIGEST TIME: " + (System.currentTimeMillis() - start)
+                + " ms.");
+        return xorHash;
+    }
+
+    /** Removes all column families and resets the tracked size to zero. */
+    void clear()
+    {
+        columnFamilies_.clear();
+        size_.set(0);
+    }
+
+    /**
+     * Stores {@code cf} under {@code name} and adjusts the tracked size by the
+     * difference between {@code cf} and any column family it replaces.
+     */
+    private void putColumnFamily(String name, ColumnFamily cf)
+    {
+        ColumnFamily previous = columnFamilies_.put(name, cf);
+        int delta = cf.size() - ( previous == null ? 0 : previous.size() );
+        size_.addAndGet(delta);
+    }
+}
+
+class RowSerializer implements ICompactSerializer<Row>
+{
+    /**
+     * Writes a row as: its key (writeUTF), the number of column families
+     * (writeInt), then each column family through ColumnFamily's serializer.
+     */
+    public void serialize(Row row, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(row.key());
+        Map<String, ColumnFamily> families = row.getColumnFamilies();
+        dos.writeInt(families.size());
+        for ( ColumnFamily family : families.values() )
+        {
+            ColumnFamily.serializer().serialize(family, dos);
+        }
+    }
+
+    /** Reads a row back in the layout produced by {@link #serialize}. */
+    public Row deserialize(DataInputStream dis) throws IOException
+    {
+        Row row = new Row(dis.readUTF());
+        for ( int remaining = dis.readInt(); remaining > 0; --remaining )
+        {
+            row.addColumnFamily(ColumnFamily.serializer().deserialize(dis));
+        }
+        return row;
+    }
+}