You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/09 20:23:55 UTC
svn commit: r1079952 - in /cassandra/trunk: ./ conf/ contrib/
contrib/pig/src/java/org/apache/cassandra/hadoop/pig/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/...
Author: jbellis
Date: Wed Mar 9 19:23:53 2011
New Revision: 1079952
URL: http://svn.apache.org/viewvc?rev=1079952&view=rev
Log:
merge from 0.7
Removed:
cassandra/trunk/src/java/org/apache/cassandra/io/util/PageCacheInformer.java
cassandra/trunk/src/java/org/apache/cassandra/utils/PageCacheMetrics.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java
cassandra/trunk/test/unit/org/apache/cassandra/db/ScrubTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 9 19:23:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7:1026516-1079540
+/cassandra/branches/cassandra-0.7:1026516-1079936
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Mar 9 19:23:53 2011
@@ -14,17 +14,19 @@
0.7.4
* add nodetool join command (CASSANDRA-2160)
* fix secondary indexes on pre-existing or streamed data (CASSANDRA-2244)
- * initialize endpoing in gossiper earlier (CASSANDRA-2228)
+ * initialize endpoint in gossiper earlier (CASSANDRA-2228)
* add ability to write to Cassandra from Pig (CASSANDRA-1828)
* add rpc_[min|max]_threads (CASSANDRA-2176)
* add CL.TWO, CL.THREE (CASSANDRA-2013)
* avoid exporting an un-requested row in sstable2json, when exporting
a key that does not exist (CASSANDRA-2168)
- * track and migrate cached pages during compaction (CASSANDRA-1902)
* add incremental_backups option (CASSANDRA-1872)
* add configurable row limit to Pig loadfunc (CASSANDRA-2276)
* validate column values in batches as well as single-Column inserts
(CASSANDRA-2259)
+ * avoid writing empty rows when scrubbing tombstoned rows (CASSANDRA-2296)
+ * fix assertion error in range and index scans for CL < ALL
+ (CASSANDRA-2282)
0.7.3
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Wed Mar 9 19:23:53 2011
@@ -1,4 +1,3 @@
-<<<<<<< .working
Whatever
========
@@ -25,7 +24,19 @@ JMX
- By default, JMX now listens on port 7199.
-=======
+0.7.4
+=====
+
+Upgrading
+---------
+ - Nothing specific to 0.7.4, but see 0.7.3 Upgrading if upgrading
+ from earlier than 0.7.1.
+
+Features
+--------
+ - Output to Pig is now supported as well as input
+
+
0.7.3
=====
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Wed Mar 9 19:23:53 2011
@@ -256,10 +256,6 @@ in_memory_compaction_limit_in_mb: 64
# key caches.
compaction_preheat_key_cache: true
-# When set to true, this setting lets cassandra inform the OS to pre-cache
-# popular data in newly compacted files. This requires Linux and JNA be installed.
-enable_page_cache_migration: false
-
# Time to wait for a reply from other nodes before failing the command
rpc_timeout_in_ms: 10000
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 9 19:23:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1079540
+/cassandra/branches/cassandra-0.7/contrib:1026516-1079936
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Mar 9 19:23:53 2011
@@ -71,7 +71,7 @@ public class CassandraStorage extends Lo
}
/**
- * @param limit: number of rows to fetch at a time
+ * @param limit: number of columns to fetch in a slice
*/
public CassandraStorage(int limit)
{
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 9 19:23:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1079540
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1079936
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 9 19:23:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1079540
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1079936
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 9 19:23:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1079540
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1079936
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 9 19:23:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1079540
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1079936
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 9 19:23:53 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1079540
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1079936
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Mar 9 19:23:53 2011
@@ -114,10 +114,6 @@ public class Config
public int hinted_handoff_throttle_delay_in_ms = 0;
public boolean compaction_preheat_key_cache = true;
- // make this configurable as its being releases in a maintenance release
- // TODO: remove in 0.8
- public Boolean enable_page_cache_migration = false;
-
public boolean incremental_backups = false;
public static enum CommitLogSync {
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Mar 9 19:23:53 2011
@@ -1236,11 +1236,6 @@ public class DatabaseDescriptor
throw new ConfigurationException("memtable_flush_after_mins must be greater than 0.");
}
- public static boolean isPageCaheMigrationEnabled()
- {
- return conf.enable_page_cache_migration;
- }
-
public static boolean incrementalBackupsEnabled()
{
return conf.incremental_backups;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed Mar 9 19:23:53 2011
@@ -46,7 +46,6 @@ public class Column implements IColumn
return new ColumnSerializer();
}
- protected boolean isInPageCache;
protected final ByteBuffer name;
protected final ByteBuffer value;
protected final long timestamp;
@@ -69,7 +68,6 @@ public class Column implements IColumn
this.name = name;
this.value = value;
this.timestamp = timestamp;
- isInPageCache = false;
}
public ByteBuffer name()
@@ -239,15 +237,5 @@ public class Column implements IColumn
{
return !isMarkedForDelete();
}
-
- public boolean isInPageCache()
- {
- return isInPageCache;
- }
-
- public void setIsInPageCache(boolean isInPageCache)
- {
- this.isInPageCache = isInPageCache;
- }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Wed Mar 9 19:23:53 2011
@@ -224,13 +224,6 @@ public class ColumnFamily implements ICo
IColumn oldColumn;
while ((oldColumn = columns.putIfAbsent(name, column)) != null)
{
- // migrate any page cache info (prefer cached)
- if (oldColumn.isInPageCache() || column.isInPageCache())
- {
- oldColumn.setIsInPageCache(true);
- column.setIsInPageCache(true);
- }
-
if (oldColumn instanceof SuperColumn)
{
((SuperColumn) oldColumn).putColumn(column);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Mar 9 19:23:53 2011
@@ -21,7 +21,9 @@ package org.apache.cassandra.db;
*/
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.Collection;
import org.slf4j.Logger;
@@ -29,12 +31,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.io.ICompactSerializer2;
-import org.apache.cassandra.io.util.PageCacheInformer;
-import org.apache.cassandra.utils.PageCacheMetrics;
-
-import org.apache.cassandra.io.util.PageCacheInformer;
-import org.apache.cassandra.utils.PageCacheMetrics;
-
public class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
{
@@ -74,16 +70,11 @@ public class ColumnFamilySerializer impl
{
throw new RuntimeException(e);
}
-
serializeForSSTable(columnFamily, dos);
}
public int serializeForSSTable(ColumnFamily columnFamily, DataOutput dos)
{
- PageCacheInformer pci = dos instanceof PageCacheInformer
- ? (PageCacheInformer) dos
- : null;
-
try
{
serializeCFInfo(columnFamily, dos);
@@ -93,16 +84,8 @@ public class ColumnFamilySerializer impl
dos.writeInt(count);
for (IColumn column : columns)
{
- long startAt = pci != null ? pci.getCurrentPosition() : -1;
-
columnFamily.getColumnSerializer().serialize(column, dos);
-
- //Track the section of serialized data that should
- //be included in the page cache (compaction)
- if (column.isInPageCache() && pci != null)
- pci.keepCacheWindow(startAt);
}
-
return count;
}
catch (IOException e)
@@ -139,50 +122,19 @@ public class ColumnFamilySerializer impl
throw new UnserializableColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
ColumnFamily cf = ColumnFamily.create(cfId);
deserializeFromSSTableNoColumns(cf, dis);
- deserializeColumns(dis, cf, intern, null);
-
+ deserializeColumns(dis, cf, intern);
return cf;
}
-
- public boolean deserializeColumns(DataInput dis, ColumnFamily cf, boolean intern, PageCacheMetrics pageCacheMetrics) throws IOException
+ public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean intern) throws IOException
{
-
int size = dis.readInt();
-
ColumnFamilyStore interner = intern ? Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
- boolean hasColumnsInPageCache = false;
-
- if (pageCacheMetrics != null && dis instanceof RandomAccessFile)
- {
- RandomAccessFile raf = (RandomAccessFile) dis;
-
- for (int i = 0; i < size; ++i)
- {
- long startAt = raf.getFilePointer();
-
- IColumn column = cf.getColumnSerializer().deserialize(dis, interner);
-
- long endAt = raf.getFilePointer();
-
- column.setIsInPageCache(pageCacheMetrics.isRangeInCache(startAt, endAt));
-
- if(!hasColumnsInPageCache)
- hasColumnsInPageCache = column.isInPageCache();
-
- cf.addColumn(column);
- }
+ for (int i = 0; i < size; ++i)
+ {
+ IColumn column = cf.getColumnSerializer().deserialize(dis, interner);
+ cf.addColumn(column);
}
- else
- {
- for (int i = 0; i < size; ++i)
- {
- IColumn column = cf.getColumnSerializer().deserialize(dis, interner);
- cf.addColumn(column);
- }
- }
-
- return hasColumnsInPageCache;
}
public ColumnFamily deserializeFromSSTableNoColumns(ColumnFamily cf, DataInput input) throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Mar 9 19:23:53 2011
@@ -2240,11 +2240,11 @@ public class ColumnFamilyStore implement
public SSTableWriter createFlushWriter(long estimatedRows) throws IOException
{
- return new SSTableWriter(getFlushPath(), estimatedRows, metadata, partitioner, false);
+ return new SSTableWriter(getFlushPath(), estimatedRows, metadata, partitioner);
}
- public SSTableWriter createCompactionWriter(long estimatedRows, String location, boolean migratePageCache) throws IOException
+ public SSTableWriter createCompactionWriter(long estimatedRows, String location) throws IOException
{
- return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner, migratePageCache);
+ return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Mar 9 19:23:53 2011
@@ -46,7 +46,6 @@ import org.apache.cassandra.io.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.PageCacheInformer;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
@@ -391,7 +390,6 @@ public class CompactionManager implement
assert sstables != null;
Table table = cfs.table;
-
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
table.snapshot("compact-" + cfs.columnFamily);
@@ -401,11 +399,9 @@ public class CompactionManager implement
assert sstable.descriptor.cfname.equals(cfs.columnFamily);
String compactionFileLocation = table.getDataFileLocation(cfs.getExpectedCompactedFileSize(sstables));
-
// If the compaction file path is null that means we have no space left for this compaction.
// try again w/o the largest one.
List<SSTableReader> smallerSSTables = new ArrayList<SSTableReader>(sstables);
-
while (compactionFileLocation == null && smallerSSTables.size() > 1)
{
logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
@@ -417,7 +413,6 @@ public class CompactionManager implement
logger.error("insufficient space to compact even the two smallest files, aborting");
return 0;
}
-
sstables = smallerSSTables;
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
@@ -429,9 +424,9 @@ public class CompactionManager implement
long totalkeysWritten = 0;
// TODO the int cast here is potentially buggy
- int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int) SSTableReader.getApproximateKeyCount(sstables));
+ int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
if (logger.isDebugEnabled())
- logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+ logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
SSTableWriter writer;
CompactionIterator ci = new CompactionIterator(cfs, sstables, gcBefore, major); // retain a handle so we can call close()
@@ -451,13 +446,11 @@ public class CompactionManager implement
return 0;
}
- writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, ci.hasRowsInPageCache());
+ writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation);
while (nni.hasNext())
{
AbstractCompactedRow row = nni.next();
-
long position = writer.append(row);
-
totalkeysWritten++;
if (DatabaseDescriptor.getPreheatKeyCache())
@@ -487,7 +480,7 @@ public class CompactionManager implement
long dTime = System.currentTimeMillis() - startTime;
long startsize = SSTable.getTotalBytes(sstables);
long endsize = ssTable.length();
- double ratio = (double) endsize / (double) startsize;
+ double ratio = (double)endsize / (double)startsize;
logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.",
writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
return sstables.size();
@@ -540,9 +533,9 @@ public class CompactionManager implement
assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
}
- SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null, false);
+ SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null);
executor.beginCompaction(cfs.columnFamily, new ScrubInfo(dataFile, sstable));
- int goodRows = 0, badRows = 0;
+ int goodRows = 0, badRows = 0, emptyRows = 0;
while (!dataFile.isEOF())
{
@@ -595,9 +588,17 @@ public class CompactionManager implement
throw new IOError(new IOException("Unable to read row key from data file"));
if (dataSize > dataFile.length())
throw new IOError(new IOException("Impossible row size " + dataSize));
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, null, true);
- writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
- goodRows++;
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+ AbstractCompactedRow compactedRow = getCompactedRow(row, cfs, sstable.descriptor, true);
+ if (compactedRow.isEmpty())
+ {
+ emptyRows++;
+ }
+ else
+ {
+ writer.append(compactedRow);
+ goodRows++;
+ }
if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
logger.warn("Row scrubbed successfully but index file contains a different key or row size; consider rebuilding the index as described in http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
}
@@ -615,9 +616,17 @@ public class CompactionManager implement
key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
try
{
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, null, true);
- writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
- goodRows++;
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+ AbstractCompactedRow compactedRow = getCompactedRow(row, cfs, sstable.descriptor, true);
+ if (compactedRow.isEmpty())
+ {
+ emptyRows++;
+ }
+ else
+ {
+ writer.append(compactedRow);
+ goodRows++;
+ }
}
catch (Throwable th2)
{
@@ -630,7 +639,7 @@ public class CompactionManager implement
}
else
{
- logger.warn("Row is unreadable; skipping to next");
+ logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
if (currentIndexKey != null)
dataFile.seek(nextRowPositionFromIndex);
badRows++;
@@ -642,14 +651,17 @@ public class CompactionManager implement
{
SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
- logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable");
+ logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
if (badRows > 0)
logger.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any");
}
else
{
cfs.markCompacted(Arrays.asList(sstable));
- logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
+ if (badRows > 0)
+ logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
+ else
+ logger.info("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
}
}
}
@@ -700,7 +712,7 @@ public class CompactionManager implement
SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
if (Range.isTokenInRanges(row.getKey().token, ranges))
{
- writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, row.hasRowsInPageCache());
+ writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
writer.append(getCompactedRow(row, cfs, sstable.descriptor, false));
totalkeysWritten++;
}
@@ -769,13 +781,13 @@ public class CompactionManager implement
: new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize);
}
- private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer, boolean migrateCachedPages)
+ private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
throws IOException
{
if (writer == null)
{
FileUtils.createDirectory(compactionFileLocation);
- writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, migrateCachedPages);
+ writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation);
}
return writer;
}
@@ -1121,8 +1133,9 @@ public class CompactionManager implement
this.row = row;
}
- public void write(PageCacheInformer out) throws IOException
+ public void write(DataOutput out) throws IOException
{
+ assert row.dataSize > 0;
out.writeLong(row.dataSize);
row.echoData(out);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Wed Mar 9 19:23:53 2011
@@ -57,12 +57,4 @@ public interface IColumn
* supercolumn deleted-at time.
*/
boolean isLive();
-
- /**
- * Used to identify columns during compaction that are in the os page cache
- * so that they can be re-cached in new SSTables
- * @return
- */
- boolean isInPageCache();
- void setIsInPageCache(boolean isInPageCache);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Wed Mar 9 19:23:53 2011
@@ -48,7 +48,6 @@ public class SuperColumn implements ICol
private ConcurrentSkipListMap<ByteBuffer, IColumn> columns_;
private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
- private boolean isInPageCache = false;
public SuperColumn(ByteBuffer name, AbstractType comparator)
{
@@ -322,16 +321,6 @@ public class SuperColumn implements ICol
{
throw new UnsupportedOperationException("Super columns don't have a serialization mask");
}
-
- public boolean isInPageCache()
- {
- return isInPageCache;
- }
-
- public void setIsInPageCache(boolean isInPageCache)
- {
- this.isInPageCache = isInPageCache;
- }
}
class SuperColumnSerializer implements IColumnSerializer
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Mar 9 19:23:53 2011
@@ -174,7 +174,7 @@ public class CommitLog
for (File file : clogs)
{
int bufferSize = (int) Math.min(Math.max(file.length(), 1), 32 * 1024 * 1024);
- BufferedRandomAccessFile reader = new BufferedRandomAccessFile(new File(file.getAbsolutePath()), "r", bufferSize, true, false);
+ BufferedRandomAccessFile reader = new BufferedRandomAccessFile(new File(file.getAbsolutePath()), "r", bufferSize, true);
try
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Wed Mar 9 19:23:53 2011
@@ -73,7 +73,7 @@ public class CommitLogSegment
private static BufferedRandomAccessFile createWriter(String file) throws IOException
{
- return new BufferedRandomAccessFile(new File(file), "rw", 128 * 1024, true, false);
+ return new BufferedRandomAccessFile(new File(file), "rw", 128 * 1024, true);
}
public CommitLogSegment.CommitLogContext write(RowMutation rowMutation) throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/AbstractCompactedRow.java Wed Mar 9 19:23:53 2011
@@ -21,11 +21,11 @@ package org.apache.cassandra.io;
*/
+import java.io.DataOutput;
import java.io.IOException;
import java.security.MessageDigest;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.util.PageCacheInformer;
/**
* a CompactedRow is an object that takes a bunch of rows (keys + columnfamilies)
@@ -35,7 +35,6 @@ import org.apache.cassandra.io.util.Page
public abstract class AbstractCompactedRow
{
public final DecoratedKey key;
- protected boolean hasColumnsInPageCache = false;
public AbstractCompactedRow(DecoratedKey key)
{
@@ -45,7 +44,7 @@ public abstract class AbstractCompactedR
/**
* write the row (size + column index + filter + column data, but NOT row key) to @param out
*/
- public abstract void write(PageCacheInformer out) throws IOException;
+ public abstract void write(DataOutput out) throws IOException;
/**
* update @param digest with the data bytes of the row (not including row key or row size)
@@ -61,18 +60,4 @@ public abstract class AbstractCompactedR
* @return the number of columns in the row
*/
public abstract int columnCount();
-
- /**
- * @return if any columns in this row are in the OS Page Cache
- */
- public boolean hasColumnsInPageCache()
- {
- return hasColumnsInPageCache;
- }
-
- public void setHasColumnsInPageCache(boolean hasColumnsInPageCache)
- {
- this.hasColumnsInPageCache = hasColumnsInPageCache;
- }
-
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Mar 9 19:23:53 2011
@@ -48,7 +48,6 @@ implements Closeable, ICompactionInfo
public static final int FILE_BUFFER_SIZE = 1024 * 1024;
protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
-
private final ColumnFamilyStore cfs;
private final int gcBefore;
private final boolean major;
@@ -56,7 +55,6 @@ implements Closeable, ICompactionInfo
private long totalBytes;
private long bytesRead;
private long row;
- private boolean hasRowsInPageCache;
public CompactionIterator(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
{
@@ -99,9 +97,6 @@ implements Closeable, ICompactionInfo
public void reduce(SSTableIdentityIterator current)
{
rows.add(current);
-
- if(current.hasRowsInPageCache())
- hasRowsInPageCache = true;
}
protected AbstractCompactedRow getReduced()
@@ -130,7 +125,6 @@ implements Closeable, ICompactionInfo
protected AbstractCompactedRow getCompactedRow()
{
long rowSize = 0;
-
for (SSTableIdentityIterator row : rows)
{
rowSize += row.dataSize;
@@ -168,11 +162,6 @@ implements Closeable, ICompactionInfo
return bytesRead;
}
- public boolean hasRowsInPageCache()
- {
- return hasRowsInPageCache;
- }
-
public String getTaskType()
{
return major ? "Major" : "Minor";
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Wed Mar 9 19:23:53 2011
@@ -38,7 +38,8 @@ import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.IIterableColumns;
import org.apache.cassandra.utils.ReducingIterator;
/**
@@ -93,11 +94,12 @@ public class LazilyCompactedRow extends
iter = null;
}
- public void write(PageCacheInformer out) throws IOException
+ public void write(DataOutput out) throws IOException
{
if (rows.size() == 1 && !shouldPurge && rows.get(0).sstable.descriptor.isLatestVersion && !forceDeserialize)
{
SSTableIdentityIterator row = rows.get(0);
+ assert row.dataSize > 0;
out.writeLong(row.dataSize);
row.echoData(out);
return;
@@ -106,7 +108,9 @@ public class LazilyCompactedRow extends
DataOutputBuffer clockOut = new DataOutputBuffer();
ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
- out.writeLong(headerBuffer.getLength() + clockOut.getLength() + columnSerializedSize);
+ long dataSize = headerBuffer.getLength() + clockOut.getLength() + columnSerializedSize;
+ assert dataSize > 0;
+ out.writeLong(dataSize);
out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
out.write(clockOut.getData(), 0, clockOut.getLength());
out.writeInt(columnCount);
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Wed Mar 9 19:23:53 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.io;
*/
+import java.io.DataOutput;
import java.io.IOError;
import java.io.IOException;
import java.security.MessageDigest;
@@ -38,8 +39,6 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.PageCacheInformer;
-import org.apache.cassandra.utils.Pair;
/**
* PrecompactedRow merges its rows in its constructor in memory.
@@ -95,11 +94,7 @@ public class PrecompactedRow extends Abs
{
cf.addAll(thisCF);
}
-
- if (row.hasColumnsInPageCache())
- this.hasColumnsInPageCache = true;
}
-
ColumnFamily cfPurged = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, gcBefore) : cf;
if (cfPurged == null)
return;
@@ -122,47 +117,13 @@ public class PrecompactedRow extends Abs
}
}
- public void write(PageCacheInformer out) throws IOException
+ public void write(DataOutput out) throws IOException
{
- out.writeLong(headerBuffer.getLength() + buffer.getLength());
+ long dataSize = headerBuffer.getLength() + buffer.getLength();
+ assert dataSize > 0;
+ out.writeLong(dataSize);
out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
-
- List<Pair<Integer, Integer>> pageCacheMarkers = buffer.getPageCacheMarkers();
-
- if(pageCacheMarkers == null)
- {
- out.write(buffer.getData(), 0, buffer.getLength());
- return;
- }
-
- // Step through each page cache window and inform the
- // output writer to respect these...
- long startingPosition = out.getCurrentPosition();
- int bufferOffset = 0;
- for (Pair<Integer,Integer> window : pageCacheMarkers)
- {
- // write out any data before the window
- if (window.left > (out.getCurrentPosition() - startingPosition))
- {
- out.write(buffer.getData(), bufferOffset, window.left - bufferOffset);
- bufferOffset = window.left;
- }
-
- long startingAt = out.getCurrentPosition();
-
- assert (bufferOffset + window.right) <= buffer.getLength() : ""+(bufferOffset + window.right)+" > "+buffer.getLength();
-
- out.write(buffer.getData(), bufferOffset, window.right);
- out.keepCacheWindow(startingAt);
-
- bufferOffset += window.right;
- }
-
- // Write everything else
- if (bufferOffset < buffer.getLength())
- {
- out.write(buffer.getData(), bufferOffset, buffer.getLength() - bufferOffset);
- }
+ out.write(buffer.getData(), 0, buffer.getLength());
}
public void update(MessageDigest digest)
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java Wed Mar 9 19:23:53 2011
@@ -75,7 +75,7 @@ public class CacheWriter<K, V> implement
logger.debug("Saving {}", path);
File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile());
- BufferedRandomAccessFile out = new BufferedRandomAccessFile(tmpFile, "rw", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true, false);
+ BufferedRandomAccessFile out = new BufferedRandomAccessFile(tmpFile, "rw", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
try
{
for (K key : keys)
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Wed Mar 9 19:23:53 2011
@@ -110,7 +110,7 @@ public class IndexHelper
{
int size = file.readInt();
if (size > maxSize || size <= 0)
- throw new EOFException("bloom filter claims to be longer than entire row size");
+ throw new EOFException("bloom filter claims to be " + size + " bytes, longer than entire row size " + maxSize);
ByteBuffer bytes = file.readBytes(size);
DataInputStream stream = new DataInputStream(ByteBufferUtil.inputStream(bytes));
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java Wed Mar 9 19:23:53 2011
@@ -47,8 +47,7 @@ public class KeyIterator extends Abstrac
in = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)),
"r",
BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
- true,
- false);
+ true);
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Wed Mar 9 19:23:53 2011
@@ -174,7 +174,7 @@ public abstract class SSTable
}
catch (Exception e)
{
- if (!"snapshots".equals(name))
+ if (!"snapshots".equals(name) && !"backups".equals(name))
logger.warn("Invalid file '{}' in data directory {}.", name, dir);
return null;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Mar 9 19:23:53 2011
@@ -35,8 +35,7 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.PageCacheInformer;
-import org.apache.cassandra.utils.PageCacheMetrics;
+import org.apache.cassandra.utils.Filter;
public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
{
@@ -48,9 +47,6 @@ public class SSTableIdentityIterator imp
public final SSTableReader sstable;
private final long dataStart;
public final long dataSize;
- public final PageCacheMetrics pageCacheMetrics;
- private boolean hasRowsInPageCache;
- private boolean hasColumnsInPageCache;
private final ColumnFamily columnFamily;
public final int columnCount;
@@ -65,13 +61,13 @@ public class SSTableIdentityIterator imp
* @param dataSize length of row data
* @throws IOException
*/
- public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, PageCacheMetrics pageCacheMetrics)
+ public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize)
throws IOException
{
- this(sstable, file, key, dataStart, dataSize, pageCacheMetrics, false);
+ this(sstable, file, key, dataStart, dataSize, false);
}
- public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, PageCacheMetrics pageCacheMetrics, boolean deserializeRowHeader)
+ public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean deserializeRowHeader)
throws IOException
{
this.sstable = sstable;
@@ -80,10 +76,6 @@ public class SSTableIdentityIterator imp
this.dataStart = dataStart;
this.dataSize = dataSize;
finishedAt = dataStart + dataSize;
- this.pageCacheMetrics = pageCacheMetrics;
-
- //Mark if any rows are in the pageCache
- hasRowsInPageCache = (pageCacheMetrics != null) && pageCacheMetrics.isRangeInCache(dataStart, finishedAt);
try
{
@@ -145,19 +137,7 @@ public class SSTableIdentityIterator imp
{
try
{
- long columnStartAt = file.getFilePointer();
-
- IColumn col = sstable.getColumnSerializer().deserialize(file);
-
- long columnEndAt = file.getFilePointer();
-
- if (pageCacheMetrics != null)
- {
- col.setIsInPageCache(pageCacheMetrics.isRangeInCache(columnStartAt, columnEndAt));
- }
-
- return col;
-
+ return sstable.getColumnSerializer().deserialize(file);
}
catch (IOException e)
{
@@ -180,56 +160,12 @@ public class SSTableIdentityIterator imp
return file.getPath();
}
- public void echoData(PageCacheInformer out) throws IOException
+ public void echoData(DataOutput out) throws IOException
{
file.seek(dataStart);
-
- if (pageCacheMetrics == null)
+ while (file.getFilePointer() < finishedAt)
{
- while (file.getFilePointer() < finishedAt)
- {
- out.write(file.readByte());
- }
- }
- else
- {
- // Since this is just a big opaque block of data we
- // Split into chunks >= pageSize
- int chunkSize = (int) (finishedAt - dataStart) / 128;
-
- chunkSize = chunkSize >= pageCacheMetrics.pageSize ? chunkSize : pageCacheMetrics.pageSize;
-
- long chunkStart = 0;
- long chunkEnd = 0;
- boolean isChunkInPageCache = false;
-
- while (file.getFilePointer() < finishedAt)
- {
-
- // Mark chunks that have cached pages
- // So we can migrate them
- if (file.getFilePointer() >= chunkEnd)
- {
- if (isChunkInPageCache)
- {
- out.keepCacheWindow(out.getCurrentPosition() - chunkSize);
- }
-
- chunkStart = file.getFilePointer();
- chunkEnd = chunkStart + chunkSize;
-
- if(chunkEnd > finishedAt)
- chunkEnd = finishedAt;
-
- isChunkInPageCache = pageCacheMetrics.isRangeInCache(chunkStart, chunkEnd);
- }
-
-
- out.write(file.readByte());
- }
-
- if (isChunkInPageCache)
- out.keepCacheWindow(out.getCurrentPosition() - (file.getFilePointer() - chunkStart));
+ out.write(file.readByte());
}
}
@@ -237,9 +173,7 @@ public class SSTableIdentityIterator imp
{
file.seek(columnPosition - 4); // seek to before column count int
ColumnFamily cf = columnFamily.cloneMeShallow();
-
- hasColumnsInPageCache = ColumnFamily.serializer().deserializeColumns(file, cf, false,pageCacheMetrics);
-
+ ColumnFamily.serializer().deserializeColumns(file, cf, false);
return cf;
}
@@ -259,15 +193,4 @@ public class SSTableIdentityIterator imp
throw new IOError(e);
}
}
-
- public boolean hasRowsInPageCache()
- {
- return hasRowsInPageCache;
- }
-
- public boolean hasColumnsInPageCache()
- {
- return hasColumnsInPageCache;
- }
-
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Mar 9 19:23:53 2011
@@ -276,9 +276,7 @@ public class SSTableReader extends SSTab
BufferedRandomAccessFile input = new BufferedRandomAccessFile(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
"r",
BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
- true,
- false);
-
+ true);
try
{
if (keyCache != null && keyCache.getCapacity() - keyCache.getSize() < keysToLoadInCache.size())
@@ -297,11 +295,9 @@ public class SSTableReader extends SSTab
break;
boolean shouldAddEntry = indexSummary.shouldAddEntry();
-
ByteBuffer key = (shouldAddEntry || cacheLoading || recreatebloom)
- ? ByteBufferUtil.readWithShortLength(input)
- : ByteBufferUtil.skipShortLength(input);
-
+ ? ByteBufferUtil.readWithShortLength(input)
+ : ByteBufferUtil.skipShortLength(input);
long dataPosition = input.readLong();
if (key != null)
{
@@ -553,7 +549,7 @@ public class SSTableReader extends SSTab
}
/**
- * SSTableScanner that avoids polluting the page cache
+ * Direct I/O SSTableScanner
* @param bufferSize Buffer size in bytes for this Scanner.
* @return A Scanner for seeking over the rows of the SSTable.
*/
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Wed Mar 9 19:23:53 2011
@@ -19,7 +19,10 @@
package org.apache.cassandra.io.sstable;
-import java.io.*;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
@@ -31,8 +34,6 @@ import org.apache.cassandra.db.columnite
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.PageCacheMetrics;
public class SSTableScanner implements Iterator<IColumnIterator>, Closeable
@@ -41,9 +42,6 @@ public class SSTableScanner implements I
private final BufferedRandomAccessFile file;
private final SSTableReader sstable;
-
- private final PageCacheMetrics pageCacheMetrics;
-
private IColumnIterator row;
private boolean exhausted = false;
private Iterator<IColumnIterator> iterator;
@@ -54,25 +52,15 @@ public class SSTableScanner implements I
*/
SSTableScanner(SSTableReader sstable, int bufferSize, boolean skipCache)
{
- PageCacheMetrics pageCacheMetrics = null;
-
try
{
- File sstableFile = new File(sstable.getFilename());
- file = new BufferedRandomAccessFile(sstableFile, "r", bufferSize, skipCache, false);
-
- if (skipCache)
- {
- pageCacheMetrics = CLibrary.getCachedPages(file);
- }
+ this.file = new BufferedRandomAccessFile(new File(sstable.getFilename()), "r", bufferSize, skipCache);
}
catch (IOException e)
{
throw new IOError(e);
}
-
this.sstable = sstable;
- this.pageCacheMetrics = pageCacheMetrics;
}
/**
@@ -91,7 +79,6 @@ public class SSTableScanner implements I
}
this.sstable = sstable;
this.filter = filter;
- this.pageCacheMetrics = null;
}
public void close() throws IOException
@@ -189,7 +176,8 @@ public class SSTableScanner implements I
if (filter == null)
{
- return row = new SSTableIdentityIterator(sstable, file, key, dataStart, dataSize, pageCacheMetrics);
+ row = new SSTableIdentityIterator(sstable, file, key, dataStart, dataSize);
+ return row;
}
else
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Wed Mar 9 19:23:53 2011
@@ -58,16 +58,16 @@ public class SSTableWriter extends SSTab
private IndexWriter iwriter;
private SegmentedFile.Builder dbuilder;
- public final BufferedRandomAccessFile dataFile;
+ private final BufferedRandomAccessFile dataFile;
private DecoratedKey lastWrittenKey;
private FileMark dataMark;
public SSTableWriter(String filename, long keyCount) throws IOException
{
- this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner(), false);
+ this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner());
}
- public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner, boolean migratePageCache) throws IOException
+ public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner) throws IOException
{
super(Descriptor.fromFilename(filename),
new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS)),
@@ -75,11 +75,9 @@ public class SSTableWriter extends SSTab
partitioner,
SSTable.defaultRowHistogram(),
SSTable.defaultColumnHistogram());
-
- iwriter = new IndexWriter(descriptor, partitioner, keyCount, !migratePageCache); //when we migrate pages we cache the index
+ iwriter = new IndexWriter(descriptor, partitioner, keyCount);
dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-
- dataFile = new BufferedRandomAccessFile(new File(getFilename()), "rw", DatabaseDescriptor.getInMemoryCompactionLimit(), true, migratePageCache);
+ dataFile = new BufferedRandomAccessFile(new File(getFilename()), "rw", DatabaseDescriptor.getInMemoryCompactionLimit(), true);
}
public void mark()
@@ -150,7 +148,9 @@ public class SSTableWriter extends SSTab
// seek back and write the row size (not including the size Long itself)
long endPosition = dataFile.getFilePointer();
dataFile.seek(sizePosition);
- dataFile.writeLong(endPosition - (sizePosition + 8));
+ long dataSize = endPosition - (sizePosition + 8);
+ assert dataSize > 0;
+ dataFile.writeLong(dataSize);
// finally, reset for next row
dataFile.seek(endPosition);
afterAppend(decoratedKey, startPosition);
@@ -317,7 +317,7 @@ public class SSTableWriter extends SSTab
RowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
{
- this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true, false), metadata);
+ this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata);
}
protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, CFMetaData metadata) throws IOException
@@ -333,7 +333,7 @@ public class SSTableWriter extends SSTab
try
{
estimatedRows = SSTable.estimateRowsFromData(desc, dfile);
- iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows, true);
+ iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
return estimatedRows;
}
catch(IOException e)
@@ -399,7 +399,7 @@ public class SSTableWriter extends SSTab
{
AESCommutativeRowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
{
- super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true, false), metadata);
+ super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true), metadata);
}
@Override
@@ -427,7 +427,7 @@ public class SSTableWriter extends SSTab
// deserialize CF
ColumnFamily cf = ColumnFamily.create(desc.ksname, desc.cfname);
ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
- ColumnFamily.serializer().deserializeColumns(dfile, cf, false, null);
+ ColumnFamily.serializer().deserializeColumns(dfile, cf, false);
rowSizes.add(dataSize);
columnCounts.add(cf.getEstimatedColumnCount());
@@ -486,11 +486,11 @@ public class SSTableWriter extends SSTab
public final BloomFilter bf;
private FileMark mark;
- IndexWriter(Descriptor desc, IPartitioner part, long keyCount, boolean skipCache) throws IOException
+ IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws IOException
{
this.desc = desc;
this.partitioner = part;
- indexFile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, skipCache, false);
+ indexFile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, true);
builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
summary = new IndexSummary(keyCount);
bf = BloomFilter.getFilter(keyCount, 15);
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Wed Mar 9 19:23:53 2011
@@ -26,7 +26,6 @@ import java.nio.channels.ClosedChannelEx
import java.nio.channels.FileChannel;
import org.apache.cassandra.utils.CLibrary;
-import org.apache.log4j.Logger;
/**
* A <code>BufferedRandomAccessFile</code> is like a
@@ -39,9 +38,9 @@ import org.apache.log4j.Logger;
* overridden here relies on the implementation of those methods in the
* superclass.
*/
-public class BufferedRandomAccessFile extends RandomAccessFile implements FileDataInput, PageCacheInformer
+public class BufferedRandomAccessFile extends RandomAccessFile implements FileDataInput
{
- private static final Logger logger = Logger.getLogger(BufferedRandomAccessFile.class);
+ private static final long MAX_BYTES_IN_PAGE_CACHE = (long) Math.pow(2, 27); // 128mb
// absolute filesystem path to the file
private final String filePath;
@@ -73,23 +72,12 @@ public class BufferedRandomAccessFile ex
// file descriptor
private int fd;
- // keep *at most* this much data in the page cache from this file at a time
- private static final long MAX_BYTES_IN_PAGE_CACHE = (long) Math.pow(2, 27); // 128mb
-
// skip cache - used for commit log and sstable writing w/ posix_fadvise
private final boolean skipCache;
- // used for page cache migration in compaction
- private final boolean pageCacheMigrate;
-
- // tracks the number of bytes read or written since last page cache flush
private long bytesSinceCacheFlush = 0;
-
- // tracks the lowest seen file position since the last page cache flush
- // this is needed because the posix_fadvise call takes a offset and length
private long minBufferOffset = Long.MAX_VALUE;
-
/*
* Open a new <code>BufferedRandomAccessFile</code> on the file named
* <code>name</code> in mode <code>mode</code>, which should be "r" for
@@ -117,15 +105,14 @@ public class BufferedRandomAccessFile ex
public BufferedRandomAccessFile(File file, String mode, int bufferSize) throws IOException
{
- this(file, mode, bufferSize, false, false);
+ this(file, mode, bufferSize, false);
}
- public BufferedRandomAccessFile(File file, String mode, int bufferSize, boolean skipCache, boolean pageCacheMigrate) throws IOException
+ public BufferedRandomAccessFile(File file, String mode, int bufferSize, boolean skipCache) throws IOException
{
super(file, mode);
this.skipCache = skipCache;
- this.pageCacheMigrate = pageCacheMigrate;
channel = super.getChannel();
filePath = file.getAbsolutePath();
@@ -150,15 +137,10 @@ public class BufferedRandomAccessFile ex
channel.force(true); // true, because file length counts as
// "meta-data"
- if (skipCache && bytesSinceCacheFlush > 0)
+ if (skipCache)
{
- // clear remaining file from page cache
- long startAt = 0;
-
- if (pageCacheMigrate)
- startAt = minBufferOffset;
-
- CLibrary.trySkipCache(this.fd, startAt, 0);
+ // clear entire file from page cache
+ CLibrary.trySkipCache(this.fd, 0, 0);
minBufferOffset = Long.MAX_VALUE;
bytesSinceCacheFlush = 0;
@@ -168,34 +150,6 @@ public class BufferedRandomAccessFile ex
}
}
- /** {@InheritDoc} */
- public long getCurrentPosition()
- {
- return getFilePointer();
- }
-
- /** @{InheritDoc} */
- public void keepCacheWindow(long startingOffset)
- {
-
- if (!pageCacheMigrate)
- return;
-
- if (minBufferOffset < startingOffset)
- {
- //Flush anything before start offset
- CLibrary.trySkipCache(this.fd, minBufferOffset, startingOffset - 1);
-
- //jump ahead to position() so it doesn't get flushed in the range
- minBufferOffset = bufferOffset;
- bytesSinceCacheFlush = 0;
- }
-
- if(logger.isDebugEnabled())
- logger.debug("Kept " + (getFilePointer() - startingOffset) + " bytes in page cache");
- }
-
-
public void flush() throws IOException
{
if (isDirty)
@@ -208,8 +162,10 @@ public class BufferedRandomAccessFile ex
if (skipCache)
{
- // we don't know when the data reaches disk since we aren't calling flush
- // so we continue to clear pages we don't need from the first offset we see
+ // we don't know when the data reaches disk since we aren't
+ // calling flush
+ // so we continue to clear pages we don't need from the first
+ // offset we see
// periodically we update this starting offset
bytesSinceCacheFlush += validBufferBytes;
@@ -218,7 +174,7 @@ public class BufferedRandomAccessFile ex
if (bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
{
- CLibrary.trySkipCache(this.fd, minBufferOffset, 0);
+ CLibrary.trySkipCache(this.fd, (int) minBufferOffset, 0);
minBufferOffset = bufferOffset;
bytesSinceCacheFlush = 0;
}
@@ -257,7 +213,7 @@ public class BufferedRandomAccessFile ex
bytesSinceCacheFlush += read;
if (skipCache && bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
{
- CLibrary.trySkipCache(this.fd, minBufferOffset, 0);
+ CLibrary.trySkipCache(this.fd, (int) minBufferOffset, 0);
bytesSinceCacheFlush = 0;
minBufferOffset = Long.MAX_VALUE;
}
@@ -441,12 +397,7 @@ public class BufferedRandomAccessFile ex
if (skipCache && bytesSinceCacheFlush > 0)
{
- long startAt = 0;
-
- if(pageCacheMigrate)
- startAt = minBufferOffset;
-
- CLibrary.trySkipCache(this.fd, startAt, 0);
+ CLibrary.trySkipCache(this.fd, 0, 0);
}
super.close();
@@ -492,7 +443,7 @@ public class BufferedRandomAccessFile ex
public static BufferedRandomAccessFile getUncachingReader(String filename) throws IOException
{
- return new BufferedRandomAccessFile(new File(filename), "r", 8 * 1024 * 1024, true, false);
+ return new BufferedRandomAccessFile(new File(filename), "r", 8 * 1024 * 1024, true);
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java Wed Mar 9 19:23:53 2011
@@ -19,20 +19,14 @@
package org.apache.cassandra.io.util;
import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.utils.Pair;
/**
* An implementation of the DataOutputStream interface. This class is completely thread
* unsafe.
*/
-public final class DataOutputBuffer extends DataOutputStream implements PageCacheInformer
+public final class DataOutputBuffer extends DataOutputStream
{
- private List<Pair<Integer, Integer>> pageCacheMarkers;
-
public DataOutputBuffer()
{
this(128);
@@ -76,32 +70,6 @@ public final class DataOutputBuffer exte
{
this.written = 0;
buffer().reset();
- pageCacheMarkers = null;
-
return this;
}
-
- /** {@InheritDoc} */
- public void keepCacheWindow(long startAt)
- {
- if (pageCacheMarkers == null)
- pageCacheMarkers = new ArrayList<Pair<Integer,Integer>>();
-
- long endAt = getCurrentPosition();
-
- assert startAt <= endAt;
-
- pageCacheMarkers.add(new Pair<Integer,Integer>((int) startAt, (int) (endAt - startAt)));
- }
-
- /** {@InheritDoc} */
- public long getCurrentPosition()
- {
- return getLength();
- }
-
- public final List<Pair<Integer,Integer>> getPageCacheMarkers()
- {
- return pageCacheMarkers;
- }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java Wed Mar 9 19:23:53 2011
@@ -18,14 +18,10 @@
*/
package org.apache.cassandra.utils;
-import java.io.*;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.IOException;
import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel.MapMode;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,10 +29,6 @@ import org.slf4j.LoggerFactory;
import com.sun.jna.LastErrorException;
import com.sun.jna.Native;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.FileUtils;
-
public final class CLibrary
{
private static Logger logger = LoggerFactory.getLogger(CLibrary.class);
@@ -58,28 +50,11 @@ public final class CLibrary
private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
private static final int POSIX_FADV_NOREUSE = 5; /* fadvise.h */
- private static native int mlockall(int flags) throws LastErrorException;
- private static native int munlockall() throws LastErrorException;
-
- private static native int link(String from, String to) throws LastErrorException;
-
- // fcntl - manipulate file descriptor, `man 2 fcntl`
- public static native int fcntl(int fd, int command, long flags) throws LastErrorException;
-
- // fadvice
- public static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException;
-
- public static native int mincore(ByteBuffer buf, int length, char[] vec) throws LastErrorException;
-
- private static native int getpagesize() throws LastErrorException;
- public static Integer pageSize = null;
-
static
{
try
{
Native.register("c");
- pageSize = CLibrary.getPageSize();
}
catch (NoClassDefFoundError e)
{
@@ -93,38 +68,19 @@ public final class CLibrary
{
logger.warn("Obsolete version of JNA present; unable to register C library. Upgrade to JNA 3.2.7 or later");
}
-
}
- private static Integer getPageSize()
- {
- int ps = -1;
-
- if (!DatabaseDescriptor.isPageCaheMigrationEnabled())
- return null;
-
- try
- {
- ps = CLibrary.getpagesize();
- assert ps >= 0; // on error a value of -1 is returned and errno is set to indicate the error.
-
- logger.info("PageSize = " + ps);
- }
- catch (RuntimeException e)
- {
- if (!(e instanceof LastErrorException))
- throw e;
+ private static native int mlockall(int flags) throws LastErrorException;
+ private static native int munlockall() throws LastErrorException;
- logger.warn(String.format("getpagesize failed, errno (%d).", CLibrary.errno(e)));
- }
- catch (UnsatisfiedLinkError e)
- {
- // this will have already been logged by CLibrary, no need to repeat it
- }
+ private static native int link(String from, String to) throws LastErrorException;
- return ps > 0 ? ps : null;
- }
+ // fcntl - manipulate file descriptor, `man 2 fcntl`
+ public static native int fcntl(int fd, int command, long flags) throws LastErrorException;
+ // fadvice
+ public static native int posix_fadvise(int fd, int offset, int len, int flag) throws LastErrorException;
+
private static int errno(RuntimeException e)
{
assert e instanceof LastErrorException;
@@ -233,9 +189,9 @@ public final class CLibrary
}
}
- public static void trySkipCache(int fd, long offset, long len)
+ public static void trySkipCache(int fd, int offset, int len)
{
- if (fd < 0 || !DatabaseDescriptor.isPageCaheMigrationEnabled())
+ if (fd < 0)
return;
try
@@ -256,63 +212,6 @@ public final class CLibrary
}
}
- public static PageCacheMetrics getCachedPages(BufferedRandomAccessFile braf) throws IOException
- {
- if (pageSize == null || pageSize == 0 || !DatabaseDescriptor.isPageCaheMigrationEnabled())
- return null;
-
- long length = braf.length();
-
- PageCacheMetrics pageCacheMetrics = new PageCacheMetrics(pageSize, length);
-
- try
- {
- char[] pages = null;
-
- // MMap 2G chunks
- for (long offset = 0; offset < length; offset += Integer.MAX_VALUE)
- {
- long limit = (offset + Integer.MAX_VALUE) > length ? (length - offset) : Integer.MAX_VALUE;
-
- ByteBuffer buf = braf.getChannel().map(MapMode.READ_ONLY, offset, limit);
-
- int numPages = (int) ((limit + pageSize - 1) / pageSize);
-
- if (pages == null || pages.length < numPages)
- pages = new char[numPages];
-
- int rc = mincore(buf, (int) limit, pages);
-
- if (rc != 0)
- {
- logger.warn(String.format("mincore failed, rc (%d).", rc));
- break;
- }
-
- for (long i = 0, position = offset; position < limit; position += pageSize, i++)
- {
- if ((pages[(int) i] & 1) == 1)
- {
- pageCacheMetrics.setPage(position);
- }
- }
- }
- }
- catch (RuntimeException e)
- {
- if (!(e instanceof LastErrorException))
- throw e;
-
- logger.warn(String.format("mincore failed, errno (%d).", CLibrary.errno(e)));
- }
- catch (UnsatisfiedLinkError e)
- {
- // this will have already been logged by CLibrary, no need to repeat it
- }
-
- return pageCacheMetrics;
- }
-
public static int tryFcntl(int fd, int command, int flags)
{
int result = -1;
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ScrubTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ScrubTest.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ScrubTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ScrubTest.java Wed Mar 9 19:23:53 2011
@@ -46,6 +46,7 @@ public class ScrubTest extends CleanupHe
public String TABLE = "Keyspace1";
public String CF = "Standard1";
public String CF2 = "Super5";
+ public String CF3 = "Standard2";
public String corruptSSTableName;
@@ -129,6 +130,25 @@ public class ScrubTest extends CleanupHe
}
@Test
+ public void testScrubDeletedRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ Table table = Table.open(TABLE);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CF3);
+
+ RowMutation rm;
+ rm = new RowMutation(TABLE, ByteBufferUtil.bytes(1));
+ ColumnFamily cf = ColumnFamily.create(TABLE, CF3);
+ cf.delete(0, 1); // expired tombstone
+ rm.add(cf);
+ rm.applyUnsafe();
+ cfs.forceBlockingFlush();
+
+ CompactionManager.instance.performScrub(cfs);
+ assert cfs.getSSTables().isEmpty();
+ }
+
+ @Test
public void testScrubMultiRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
{
CompactionManager.instance.disableAutoCompaction();
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=1079952&r1=1079951&r2=1079952&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Wed Mar 9 19:23:53 2011
@@ -138,11 +138,11 @@ public class AntiEntropyServiceTest exte
// add a row with the minimum token
validator.add(new PrecompactedRow(new DecoratedKey(min, ByteBufferUtil.bytes("nonsense!")),
- new DataOutputBuffer()));
+ new DataOutputBuffer()));
// and a row after it
validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")),
- new DataOutputBuffer()));
+ new DataOutputBuffer()));
validator.complete();
// confirm that the tree was validated