You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/02/23 17:57:53 UTC
svn commit: r915428 [1/3] - in
/incubator/cassandra/tags/cassandra-0.6.0-beta1: ./ contrib/word_count/src/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/cache/
src/java/org/apache/cassandra/db/commitlog/ src/java/...
Author: eevans
Date: Tue Feb 23 16:57:51 2010
New Revision: 915428
URL: http://svn.apache.org/viewvc?rev=915428&view=rev
Log:
tagged 0.6.0-beta1
Added:
incubator/cassandra/tags/cassandra-0.6.0-beta1/
- copied from r915421, incubator/cassandra/branches/cassandra-0.6/
incubator/cassandra/tags/cassandra-0.6.0-beta1/build.xml
- copied unchanged from r915427, incubator/cassandra/branches/cassandra-0.6/build.xml
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/AuthenticationException.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/AuthenticationException.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/AuthenticationRequest.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/AuthenticationRequest.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/AuthorizationException.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/AuthorizationException.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/ColumnOrSuperColumn.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/ColumnOrSuperColumn.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/ColumnParent.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/ColumnParent.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/ColumnPath.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/ColumnPath.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/ConsistencyLevel.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/Deletion.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Deletion.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/KeyRange.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/KeyRange.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/KeySlice.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/KeySlice.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/Mutation.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Mutation.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/SlicePredicate.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SlicePredicate.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/SliceRange.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SliceRange.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/TokenRange.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/TokenRange.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/interface/thrift/gen-java/org/apache/cassandra/thrift/UnavailableException.java
- copied unchanged from r915426, incubator/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/UnavailableException.java
Modified:
incubator/cassandra/tags/cassandra-0.6.0-beta1/contrib/word_count/src/WordCountSetup.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/AbstractCache.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/IAggregatableCacheProvider.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/InstrumentedCache.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCache.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCacheMBean.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCache.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/AbstractBounds.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/Bounds.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/DeletionService.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/SSTableDeletingReference.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/util/FileDataInput.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/test/unit/org/apache/cassandra/dht/BoundsTest.java
incubator/cassandra/tags/cassandra-0.6.0-beta1/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/contrib/word_count/src/WordCountSetup.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/contrib/word_count/src/WordCountSetup.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/contrib/word_count/src/WordCountSetup.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/contrib/word_count/src/WordCountSetup.java Tue Feb 23 16:57:51 2010
@@ -1,61 +1,61 @@
-import java.util.Arrays;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-
-public class WordCountSetup
-{
- private static final Logger logger = Logger.getLogger(WordCountSetup.class);
-
- public static final int TEST_COUNT = 4;
-
- public static void main(String[] args) throws Exception
- {
- StorageService.instance.initClient();
- logger.info("Sleeping " + WordCount.RING_DELAY);
- Thread.sleep(WordCount.RING_DELAY);
- assert !StorageService.instance.getLiveNodes().isEmpty();
-
- RowMutation rm;
- ColumnFamily cf;
- byte[] columnName;
-
- // text0: no rows
-
- // text1: 1 row, 1 word
- columnName = "text1".getBytes();
- rm = new RowMutation(WordCount.KEYSPACE, "Key0");
- cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
- cf.addColumn(new Column(columnName, "word1".getBytes(), 0));
- rm.add(cf);
- StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE);
- logger.info("added text1");
-
- // text2: 1 row, 2 words
- columnName = "text2".getBytes();
- rm = new RowMutation(WordCount.KEYSPACE, "Key0");
- cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
- cf.addColumn(new Column(columnName, "word1 word2".getBytes(), 0));
- rm.add(cf);
- StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE);
- logger.info("added text2");
-
- // text3: 1000 rows, 1 word
- columnName = "text3".getBytes();
- for (int i = 0; i < 1000; i++)
- {
- rm = new RowMutation(WordCount.KEYSPACE, "Key" + i);
- cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
- cf.addColumn(new Column(columnName, "word1".getBytes(), 0));
- rm.add(cf);
- StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE);
- }
- logger.info("added text3");
-
- System.exit(0);
- }
-}
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+
+public class WordCountSetup
+{
+ private static final Logger logger = Logger.getLogger(WordCountSetup.class);
+
+ public static final int TEST_COUNT = 4;
+
+ public static void main(String[] args) throws Exception
+ {
+ StorageService.instance.initClient();
+ logger.info("Sleeping " + WordCount.RING_DELAY);
+ Thread.sleep(WordCount.RING_DELAY);
+ assert !StorageService.instance.getLiveNodes().isEmpty();
+
+ RowMutation rm;
+ ColumnFamily cf;
+ byte[] columnName;
+
+ // text0: no rows
+
+ // text1: 1 row, 1 word
+ columnName = "text1".getBytes();
+ rm = new RowMutation(WordCount.KEYSPACE, "Key0");
+ cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
+ cf.addColumn(new Column(columnName, "word1".getBytes(), 0));
+ rm.add(cf);
+ StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE);
+ logger.info("added text1");
+
+ // text2: 1 row, 2 words
+ columnName = "text2".getBytes();
+ rm = new RowMutation(WordCount.KEYSPACE, "Key0");
+ cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
+ cf.addColumn(new Column(columnName, "word1 word2".getBytes(), 0));
+ rm.add(cf);
+ StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE);
+ logger.info("added text2");
+
+ // text3: 1000 rows, 1 word
+ columnName = "text3".getBytes();
+ for (int i = 0; i < 1000; i++)
+ {
+ rm = new RowMutation(WordCount.KEYSPACE, "Key" + i);
+ cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
+ cf.addColumn(new Column(columnName, "word1".getBytes(), 0));
+ rm.add(cf);
+ StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE);
+ }
+ logger.info("added text3");
+
+ System.exit(0);
+ }
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/AbstractCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/AbstractCache.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/AbstractCache.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/AbstractCache.java Tue Feb 23 16:57:51 2010
@@ -1,22 +1,22 @@
-package org.apache.cassandra.cache;
-
-import java.lang.management.ManagementFactory;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-public class AbstractCache
-{
- static void registerMBean(Object cache, String table, String name)
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- String mbeanName = "org.apache.cassandra.db:type=Caches,keyspace=" + table + ",cache=" + name;
- mbs.registerMBean(cache, new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-}
+package org.apache.cassandra.cache;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+public class AbstractCache
+{
+ static void registerMBean(Object cache, String table, String name)
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ String mbeanName = "org.apache.cassandra.db:type=Caches,keyspace=" + table + ",cache=" + name;
+ mbs.registerMBean(cache, new ObjectName(mbeanName));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/IAggregatableCacheProvider.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/IAggregatableCacheProvider.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/IAggregatableCacheProvider.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/IAggregatableCacheProvider.java Tue Feb 23 16:57:51 2010
@@ -1,7 +1,7 @@
-package org.apache.cassandra.cache;
-
-public interface IAggregatableCacheProvider<K, V>
-{
- public InstrumentedCache<K, V> getCache();
- public long getObjectCount();
-}
+package org.apache.cassandra.cache;
+
+public interface IAggregatableCacheProvider<K, V>
+{
+ public InstrumentedCache<K, V> getCache();
+ public long getObjectCount();
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/InstrumentedCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/InstrumentedCache.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/InstrumentedCache.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/InstrumentedCache.java Tue Feb 23 16:57:51 2010
@@ -1,85 +1,85 @@
-package org.apache.cassandra.cache;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
-
-public class InstrumentedCache<K, V>
-{
- private int capacity;
- private final ConcurrentLinkedHashMap<K, V> map;
- private final AtomicLong requests = new AtomicLong(0);
- private final AtomicLong hits = new AtomicLong(0);
- long lastRequests, lastHits;
-
- public InstrumentedCache(int capacity)
- {
- this.capacity = capacity;
- map = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, capacity);
- }
-
- public void put(K key, V value)
- {
- map.put(key, value);
- }
-
- public V get(K key)
- {
- V v = map.get(key);
- requests.incrementAndGet();
- if (v != null)
- hits.incrementAndGet();
- return v;
- }
-
- public V getInternal(K key)
- {
- return map.get(key);
- }
-
- public void remove(K key)
- {
- map.remove(key);
- }
-
- public int getCapacity()
- {
- return capacity;
- }
-
- public void setCapacity(int capacity)
- {
- map.setCapacity(capacity);
- this.capacity = capacity;
- }
-
- public int getSize()
- {
- return map.size();
- }
-
- public long getHits()
- {
- return hits.get();
- }
-
- public long getRequests()
- {
- return requests.get();
- }
-
- public double getRecentHitRate()
- {
- long r = requests.get();
- long h = hits.get();
- try
- {
- return ((double)(h - lastHits)) / (r - lastRequests);
- }
- finally
- {
- lastRequests = r;
- lastHits = h;
- }
- }
-}
+package org.apache.cassandra.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
+
+public class InstrumentedCache<K, V>
+{
+ private int capacity;
+ private final ConcurrentLinkedHashMap<K, V> map;
+ private final AtomicLong requests = new AtomicLong(0);
+ private final AtomicLong hits = new AtomicLong(0);
+ long lastRequests, lastHits;
+
+ public InstrumentedCache(int capacity)
+ {
+ this.capacity = capacity;
+ map = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, capacity);
+ }
+
+ public void put(K key, V value)
+ {
+ map.put(key, value);
+ }
+
+ public V get(K key)
+ {
+ V v = map.get(key);
+ requests.incrementAndGet();
+ if (v != null)
+ hits.incrementAndGet();
+ return v;
+ }
+
+ public V getInternal(K key)
+ {
+ return map.get(key);
+ }
+
+ public void remove(K key)
+ {
+ map.remove(key);
+ }
+
+ public int getCapacity()
+ {
+ return capacity;
+ }
+
+ public void setCapacity(int capacity)
+ {
+ map.setCapacity(capacity);
+ this.capacity = capacity;
+ }
+
+ public int getSize()
+ {
+ return map.size();
+ }
+
+ public long getHits()
+ {
+ return hits.get();
+ }
+
+ public long getRequests()
+ {
+ return requests.get();
+ }
+
+ public double getRecentHitRate()
+ {
+ long r = requests.get();
+ long h = hits.get();
+ try
+ {
+ return ((double)(h - lastHits)) / (r - lastRequests);
+ }
+ finally
+ {
+ lastRequests = r;
+ lastHits = h;
+ }
+ }
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCache.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCache.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCache.java Tue Feb 23 16:57:51 2010
@@ -1,78 +1,78 @@
-package org.apache.cassandra.cache;
-
-public class JMXAggregatingCache implements JMXAggregatingCacheMBean
-{
- private final Iterable<IAggregatableCacheProvider> cacheProviders;
-
- public JMXAggregatingCache(Iterable<IAggregatableCacheProvider> caches, String table, String name)
- {
- this.cacheProviders = caches;
- AbstractCache.registerMBean(this, table, name);
- }
-
- public int getCapacity()
- {
- int capacity = 0;
- for (IAggregatableCacheProvider cacheProvider : cacheProviders)
- {
- capacity += cacheProvider.getCache().getCapacity();
- }
- return capacity;
- }
-
- public void setCapacity(int capacity)
- {
- long totalObjects = 0;
- for (IAggregatableCacheProvider cacheProvider : cacheProviders)
- {
- totalObjects += cacheProvider.getObjectCount();
- }
- for (IAggregatableCacheProvider cacheProvider : cacheProviders)
- {
- double ratio = ((double)cacheProvider.getObjectCount()) / totalObjects;
- cacheProvider.getCache().setCapacity((int)(capacity * ratio));
- }
- }
-
- public int getSize()
- {
- int size = 0;
- for (IAggregatableCacheProvider cacheProvider : cacheProviders)
- {
- size += cacheProvider.getCache().getSize();
- }
- return size;
- }
-
- public long getRequests()
- {
- long requests = 0;
- for (IAggregatableCacheProvider cacheProvider : cacheProviders)
- {
- requests += cacheProvider.getCache().getRequests();
- }
- return requests;
- }
-
- public long getHits()
- {
- long hits = 0;
- for (IAggregatableCacheProvider cacheProvider : cacheProviders)
- {
- hits += cacheProvider.getCache().getHits();
- }
- return hits;
- }
-
- public double getRecentHitRate()
- {
- int n = 0;
- double rate = 0;
- for (IAggregatableCacheProvider cacheProvider : cacheProviders)
- {
- rate += cacheProvider.getCache().getRecentHitRate();
- n++;
- }
- return rate / n;
- }
-}
+package org.apache.cassandra.cache;
+
+public class JMXAggregatingCache implements JMXAggregatingCacheMBean
+{
+ private final Iterable<IAggregatableCacheProvider> cacheProviders;
+
+ public JMXAggregatingCache(Iterable<IAggregatableCacheProvider> caches, String table, String name)
+ {
+ this.cacheProviders = caches;
+ AbstractCache.registerMBean(this, table, name);
+ }
+
+ public int getCapacity()
+ {
+ int capacity = 0;
+ for (IAggregatableCacheProvider cacheProvider : cacheProviders)
+ {
+ capacity += cacheProvider.getCache().getCapacity();
+ }
+ return capacity;
+ }
+
+ public void setCapacity(int capacity)
+ {
+ long totalObjects = 0;
+ for (IAggregatableCacheProvider cacheProvider : cacheProviders)
+ {
+ totalObjects += cacheProvider.getObjectCount();
+ }
+ for (IAggregatableCacheProvider cacheProvider : cacheProviders)
+ {
+ double ratio = ((double)cacheProvider.getObjectCount()) / totalObjects;
+ cacheProvider.getCache().setCapacity((int)(capacity * ratio));
+ }
+ }
+
+ public int getSize()
+ {
+ int size = 0;
+ for (IAggregatableCacheProvider cacheProvider : cacheProviders)
+ {
+ size += cacheProvider.getCache().getSize();
+ }
+ return size;
+ }
+
+ public long getRequests()
+ {
+ long requests = 0;
+ for (IAggregatableCacheProvider cacheProvider : cacheProviders)
+ {
+ requests += cacheProvider.getCache().getRequests();
+ }
+ return requests;
+ }
+
+ public long getHits()
+ {
+ long hits = 0;
+ for (IAggregatableCacheProvider cacheProvider : cacheProviders)
+ {
+ hits += cacheProvider.getCache().getHits();
+ }
+ return hits;
+ }
+
+ public double getRecentHitRate()
+ {
+ int n = 0;
+ double rate = 0;
+ for (IAggregatableCacheProvider cacheProvider : cacheProviders)
+ {
+ rate += cacheProvider.getCache().getRecentHitRate();
+ n++;
+ }
+ return rate / n;
+ }
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCacheMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCacheMBean.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCacheMBean.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXAggregatingCacheMBean.java Tue Feb 23 16:57:51 2010
@@ -1,12 +1,12 @@
-package org.apache.cassandra.cache;
-
-public interface JMXAggregatingCacheMBean
-{
- public int getCapacity();
- public void setCapacity(int capacity);
- public int getSize();
-
- public long getRequests();
- public long getHits();
- public double getRecentHitRate();
+package org.apache.cassandra.cache;
+
+public interface JMXAggregatingCacheMBean
+{
+ public int getCapacity();
+ public void setCapacity(int capacity);
+ public int getSize();
+
+ public long getRequests();
+ public long getHits();
+ public double getRecentHitRate();
}
\ No newline at end of file
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCache.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCache.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCache.java Tue Feb 23 16:57:51 2010
@@ -1,10 +1,10 @@
-package org.apache.cassandra.cache;
-
-public class JMXInstrumentedCache<K, V> extends InstrumentedCache<K, V> implements JMXInstrumentedCacheMBean
-{
- public JMXInstrumentedCache(String table, String name, int capacity)
- {
- super(capacity);
- AbstractCache.registerMBean(this, table, name);
- }
+package org.apache.cassandra.cache;
+
+public class JMXInstrumentedCache<K, V> extends InstrumentedCache<K, V> implements JMXInstrumentedCacheMBean
+{
+ public JMXInstrumentedCache(String table, String name, int capacity)
+ {
+ super(capacity);
+ AbstractCache.registerMBean(this, table, name);
+ }
}
\ No newline at end of file
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/cache/JMXInstrumentedCacheMBean.java Tue Feb 23 16:57:51 2010
@@ -1,21 +1,21 @@
-package org.apache.cassandra.cache;
-
-public interface JMXInstrumentedCacheMBean
-{
- public int getCapacity();
- public void setCapacity(int capacity);
- public int getSize();
-
- /** total request count since cache creation */
- public long getRequests();
-
- /** total cache hit count since cache creation */
- public long getHits();
-
- /**
- * hits / requests since the last time getHitRate was called. serious telemetry apps should not use this,
- * and should instead track the deltas from getHits / getRequests themselves, since those will not be
- * affected by multiple users calling it. Provided for convenience only.
- */
- public double getRecentHitRate();
-}
+package org.apache.cassandra.cache;
+
+public interface JMXInstrumentedCacheMBean
+{
+ public int getCapacity();
+ public void setCapacity(int capacity);
+ public int getSize();
+
+ /** total request count since cache creation */
+ public long getRequests();
+
+ /** total cache hit count since cache creation */
+ public long getHits();
+
+ /**
+ * hits / requests since the last time getHitRate was called. serious telemetry apps should not use this,
+ * and should instead track the deltas from getHits / getRequests themselves, since those will not be
+ * affected by multiple users calling it. Provided for convenience only.
+ */
+ public double getRecentHitRate();
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java Tue Feb 23 16:57:51 2010
@@ -1,213 +1,213 @@
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-class CommitLogExecutorService extends AbstractExecutorService implements CommitLogExecutorServiceMBean
-{
- private final BlockingQueue<CheaterFutureTask> queue;
-
- private volatile long completedTaskCount = 0;
-
- public CommitLogExecutorService()
- {
- this(DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.batch
- ? DatabaseDescriptor.getConcurrentWriters()
- : 1024 * Runtime.getRuntime().availableProcessors());
- }
-
- public CommitLogExecutorService(int queueSize)
- {
- queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws Exception
- {
- if (DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.batch)
- {
- while (true)
- {
- processWithSyncBatch();
- completedTaskCount++;
- }
- }
- else
- {
- while (true)
- {
- process();
- completedTaskCount++;
- }
- }
- }
- };
- new Thread(runnable, "COMMIT-LOG-WRITER").start();
-
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
-
- /**
- * Get the current number of running tasks
- */
- public int getActiveCount()
- {
- return 1;
- }
-
- /**
- * Get the number of completed tasks
- */
- public long getCompletedTasks()
- {
- return completedTaskCount;
- }
-
- /**
- * Get the number of tasks waiting to be executed
- */
- public long getPendingTasks()
- {
- return queue.size();
- }
-
- private void process() throws InterruptedException
- {
- queue.take().run();
- }
-
- private final ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
- private final ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
- private void processWithSyncBatch() throws Exception
- {
- CheaterFutureTask firstTask = queue.take();
- if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
- {
- firstTask.run();
- return;
- }
-
- // attempt to do a bunch of LogRecordAdder ops before syncing
- // (this is a little clunky since there is no blocking peek method,
- // so we have to break it into firstTask / extra tasks)
- incompleteTasks.clear();
- taskValues.clear();
- long end = System.nanoTime() + (long)(1000000 * DatabaseDescriptor.getCommitLogSyncBatchWindow());
-
- // it doesn't seem worth bothering future-izing the exception
- // since if a commitlog op throws, we're probably screwed anyway
- incompleteTasks.add(firstTask);
- taskValues.add(firstTask.getRawCallable().call());
- while (!queue.isEmpty()
- && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
- && System.nanoTime() < end)
- {
- CheaterFutureTask task = queue.remove();
- incompleteTasks.add(task);
- taskValues.add(task.getRawCallable().call());
- }
-
- // now sync and set the tasks' values (which allows thread calling get() to proceed)
- try
- {
- CommitLog.instance().sync();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- for (int i = 0; i < incompleteTasks.size(); i++)
- {
- incompleteTasks.get(i).set(taskValues.get(i));
- }
- }
-
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
- {
- return newTaskFor(Executors.callable(runnable, value));
- }
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
- {
- return new CheaterFutureTask(callable);
- }
-
- public void execute(Runnable command)
- {
- try
- {
- queue.put((CheaterFutureTask)command);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public boolean isShutdown()
- {
- return false;
- }
-
- public boolean isTerminated()
- {
- return false;
- }
-
- // cassandra is crash-only so there's no need to implement the shutdown methods
- public void shutdown()
- {
- throw new UnsupportedOperationException();
- }
-
- public List<Runnable> shutdownNow()
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
- {
- throw new UnsupportedOperationException();
- }
-
- private static class CheaterFutureTask<V> extends FutureTask<V>
- {
- private final Callable rawCallable;
-
- public CheaterFutureTask(Callable<V> callable)
- {
- super(callable);
- rawCallable = callable;
- }
-
- public Callable getRawCallable()
- {
- return rawCallable;
- }
-
- @Override
- public void set(V v)
- {
- super.set(v);
- }
- }
-}
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+class CommitLogExecutorService extends AbstractExecutorService implements CommitLogExecutorServiceMBean
+{
+ private final BlockingQueue<CheaterFutureTask> queue;
+
+ private volatile long completedTaskCount = 0;
+
+ public CommitLogExecutorService()
+ {
+ this(DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.batch
+ ? DatabaseDescriptor.getConcurrentWriters()
+ : 1024 * Runtime.getRuntime().availableProcessors());
+ }
+
+ public CommitLogExecutorService(int queueSize)
+ {
+ queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ if (DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.batch)
+ {
+ while (true)
+ {
+ processWithSyncBatch();
+ completedTaskCount++;
+ }
+ }
+ else
+ {
+ while (true)
+ {
+ process();
+ completedTaskCount++;
+ }
+ }
+ }
+ };
+ new Thread(runnable, "COMMIT-LOG-WRITER").start();
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ /**
+ * Get the current number of running tasks
+ */
+ public int getActiveCount()
+ {
+ return 1;
+ }
+
+ /**
+ * Get the number of completed tasks
+ */
+ public long getCompletedTasks()
+ {
+ return completedTaskCount;
+ }
+
+ /**
+ * Get the number of tasks waiting to be executed
+ */
+ public long getPendingTasks()
+ {
+ return queue.size();
+ }
+
+ private void process() throws InterruptedException
+ {
+ queue.take().run();
+ }
+
+ private final ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
+ private final ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
+ private void processWithSyncBatch() throws Exception
+ {
+ CheaterFutureTask firstTask = queue.take();
+ if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
+ {
+ firstTask.run();
+ return;
+ }
+
+ // attempt to do a bunch of LogRecordAdder ops before syncing
+ // (this is a little clunky since there is no blocking peek method,
+ // so we have to break it into firstTask / extra tasks)
+ incompleteTasks.clear();
+ taskValues.clear();
+ long end = System.nanoTime() + (long)(1000000 * DatabaseDescriptor.getCommitLogSyncBatchWindow());
+
+ // it doesn't seem worth bothering future-izing the exception
+ // since if a commitlog op throws, we're probably screwed anyway
+ incompleteTasks.add(firstTask);
+ taskValues.add(firstTask.getRawCallable().call());
+ while (!queue.isEmpty()
+ && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
+ && System.nanoTime() < end)
+ {
+ CheaterFutureTask task = queue.remove();
+ incompleteTasks.add(task);
+ taskValues.add(task.getRawCallable().call());
+ }
+
+ // now sync and set the tasks' values (which allows thread calling get() to proceed)
+ try
+ {
+ CommitLog.instance().sync();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ for (int i = 0; i < incompleteTasks.size(); i++)
+ {
+ incompleteTasks.get(i).set(taskValues.get(i));
+ }
+ }
+
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
+ {
+ return newTaskFor(Executors.callable(runnable, value));
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
+ {
+ return new CheaterFutureTask(callable);
+ }
+
+ public void execute(Runnable command)
+ {
+ try
+ {
+ queue.put((CheaterFutureTask)command);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public boolean isShutdown()
+ {
+ return false;
+ }
+
+ public boolean isTerminated()
+ {
+ return false;
+ }
+
+ // cassandra is crash-only so there's no need to implement the shutdown methods
+ public void shutdown()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public List<Runnable> shutdownNow()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private static class CheaterFutureTask<V> extends FutureTask<V>
+ {
+ private final Callable rawCallable;
+
+ public CheaterFutureTask(Callable<V> callable)
+ {
+ super(callable);
+ rawCallable = callable;
+ }
+
+ public Callable getRawCallable()
+ {
+ return rawCallable;
+ }
+
+ @Override
+ public void set(V v)
+ {
+ super.set(v);
+ }
+ }
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Tue Feb 23 16:57:51 2010
@@ -1,193 +1,193 @@
-package org.apache.cassandra.db.commitlog;
-
-import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
-import java.util.zip.CRC32;
-import java.util.zip.Checksum;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-
-public class CommitLogSegment
-{
- private static final Logger logger = Logger.getLogger(CommitLogSegment.class);
-
- private final BufferedRandomAccessFile logWriter;
- private final CommitLogHeader header;
-
- public CommitLogSegment(int cfCount)
- {
- this.header = new CommitLogHeader(cfCount);
- String logFile = DatabaseDescriptor.getLogFileLocation() + File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
- logger.info("Creating new commitlog segment " + logFile);
-
- try
- {
- logWriter = createWriter(logFile);
- writeCommitLogHeader(header.toByteArray());
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- public void writeHeader() throws IOException
- {
- seekAndWriteCommitLogHeader(header.toByteArray());
- }
-
- /** writes header at the beginning of the file, then seeks back to current position */
- void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
- {
- long currentPos = logWriter.getFilePointer();
- logWriter.seek(0);
-
- writeCommitLogHeader(bytes);
-
- logWriter.seek(currentPos);
- }
-
- private void writeCommitLogHeader(byte[] bytes) throws IOException
- {
- logWriter.writeLong(bytes.length);
- logWriter.write(bytes);
- logWriter.sync();
- }
-
- private static BufferedRandomAccessFile createWriter(String file) throws IOException
- {
- return new BufferedRandomAccessFile(file, "rw", 128 * 1024);
- }
-
- public CommitLogSegment.CommitLogContext write(RowMutation rowMutation, Object serializedRow) throws IOException
- {
- long currentPosition = -1L;
- try
- {
- currentPosition = logWriter.getFilePointer();
- CommitLogSegment.CommitLogContext cLogCtx = new CommitLogSegment.CommitLogContext(currentPosition);
- Table table = Table.open(rowMutation.getTable());
-
- // update header
- for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
- {
- int id = table.getColumnFamilyId(columnFamily.name());
- if (!header.isDirty(id))
- {
- header.turnOn(id, logWriter.getFilePointer());
- seekAndWriteCommitLogHeader(header.toByteArray());
- }
- }
-
- // write mutation, w/ checksum
- Checksum checkum = new CRC32();
- if (serializedRow instanceof DataOutputBuffer)
- {
- DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
- logWriter.writeLong(buffer.getLength());
- logWriter.write(buffer.getData(), 0, buffer.getLength());
- checkum.update(buffer.getData(), 0, buffer.getLength());
- }
- else
- {
- assert serializedRow instanceof byte[];
- byte[] bytes = (byte[]) serializedRow;
- logWriter.writeLong(bytes.length);
- logWriter.write(bytes);
- checkum.update(bytes, 0, bytes.length);
- }
- logWriter.writeLong(checkum.getValue());
-
- return cLogCtx;
- }
- catch (IOException e)
- {
- if (currentPosition != -1)
- logWriter.seek(currentPosition);
- throw e;
- }
- }
-
- public void sync() throws IOException
- {
- logWriter.sync();
- }
-
- public CommitLogContext getContext()
- {
- return new CommitLogContext(logWriter.getFilePointer());
- }
-
- public CommitLogHeader getHeader()
- {
- return header;
- }
-
- public String getPath()
- {
- return logWriter.getPath();
- }
-
- public long length()
- {
- try
- {
- return logWriter.length();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- public void close()
- {
- try
- {
- logWriter.close();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- @Override
- public String toString()
- {
- return "CommitLogSegment(" + logWriter.getPath() + ')';
- }
-
- public class CommitLogContext
- {
- public final long position;
-
- public CommitLogContext(long position)
- {
- assert position >= 0;
- this.position = position;
- }
-
- public CommitLogSegment getSegment()
- {
- return CommitLogSegment.this;
- }
-
- @Override
- public String toString()
- {
- return "CommitLogContext(" +
- "file='" + logWriter.getPath() + '\'' +
- ", position=" + position +
- ')';
- }
- }
-}
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+
+public class CommitLogSegment
+{
+ private static final Logger logger = Logger.getLogger(CommitLogSegment.class);
+
+ private final BufferedRandomAccessFile logWriter;
+ private final CommitLogHeader header;
+
+ public CommitLogSegment(int cfCount)
+ {
+ this.header = new CommitLogHeader(cfCount);
+ String logFile = DatabaseDescriptor.getLogFileLocation() + File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
+ logger.info("Creating new commitlog segment " + logFile);
+
+ try
+ {
+ logWriter = createWriter(logFile);
+ writeCommitLogHeader(header.toByteArray());
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public void writeHeader() throws IOException
+ {
+ seekAndWriteCommitLogHeader(header.toByteArray());
+ }
+
+ /** writes header at the beginning of the file, then seeks back to current position */
+ void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
+ {
+ long currentPos = logWriter.getFilePointer();
+ logWriter.seek(0);
+
+ writeCommitLogHeader(bytes);
+
+ logWriter.seek(currentPos);
+ }
+
+ private void writeCommitLogHeader(byte[] bytes) throws IOException
+ {
+ logWriter.writeLong(bytes.length);
+ logWriter.write(bytes);
+ logWriter.sync();
+ }
+
+ private static BufferedRandomAccessFile createWriter(String file) throws IOException
+ {
+ return new BufferedRandomAccessFile(file, "rw", 128 * 1024);
+ }
+
+ public CommitLogSegment.CommitLogContext write(RowMutation rowMutation, Object serializedRow) throws IOException
+ {
+ long currentPosition = -1L;
+ try
+ {
+ currentPosition = logWriter.getFilePointer();
+ CommitLogSegment.CommitLogContext cLogCtx = new CommitLogSegment.CommitLogContext(currentPosition);
+ Table table = Table.open(rowMutation.getTable());
+
+ // update header
+ for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+ {
+ int id = table.getColumnFamilyId(columnFamily.name());
+ if (!header.isDirty(id))
+ {
+ header.turnOn(id, logWriter.getFilePointer());
+ seekAndWriteCommitLogHeader(header.toByteArray());
+ }
+ }
+
+ // write mutation, w/ checksum
+ Checksum checkum = new CRC32();
+ if (serializedRow instanceof DataOutputBuffer)
+ {
+ DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
+ logWriter.writeLong(buffer.getLength());
+ logWriter.write(buffer.getData(), 0, buffer.getLength());
+ checkum.update(buffer.getData(), 0, buffer.getLength());
+ }
+ else
+ {
+ assert serializedRow instanceof byte[];
+ byte[] bytes = (byte[]) serializedRow;
+ logWriter.writeLong(bytes.length);
+ logWriter.write(bytes);
+ checkum.update(bytes, 0, bytes.length);
+ }
+ logWriter.writeLong(checkum.getValue());
+
+ return cLogCtx;
+ }
+ catch (IOException e)
+ {
+ if (currentPosition != -1)
+ logWriter.seek(currentPosition);
+ throw e;
+ }
+ }
+
+ public void sync() throws IOException
+ {
+ logWriter.sync();
+ }
+
+ public CommitLogContext getContext()
+ {
+ return new CommitLogContext(logWriter.getFilePointer());
+ }
+
+ public CommitLogHeader getHeader()
+ {
+ return header;
+ }
+
+ public String getPath()
+ {
+ return logWriter.getPath();
+ }
+
+ public long length()
+ {
+ try
+ {
+ return logWriter.length();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public void close()
+ {
+ try
+ {
+ logWriter.close();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CommitLogSegment(" + logWriter.getPath() + ')';
+ }
+
+ public class CommitLogContext
+ {
+ public final long position;
+
+ public CommitLogContext(long position)
+ {
+ assert position >= 0;
+ this.position = position;
+ }
+
+ public CommitLogSegment getSegment()
+ {
+ return CommitLogSegment.this;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CommitLogContext(" +
+ "file='" + logWriter.getPath() + '\'' +
+ ", position=" + position +
+ ')';
+ }
+ }
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/AbstractBounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/AbstractBounds.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/AbstractBounds.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/AbstractBounds.java Tue Feb 23 16:57:51 2010
@@ -1,71 +1,71 @@
-package org.apache.cassandra.dht;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.cassandra.io.ICompactSerializer2;
-
-public abstract class AbstractBounds implements Serializable
-{
- private static AbstractBoundsSerializer serializer = new AbstractBoundsSerializer();
-
- public static ICompactSerializer2<AbstractBounds> serializer()
- {
- return serializer;
- }
-
- private enum Type
- {
- RANGE,
- BOUNDS
- }
-
- public final Token left;
- public final Token right;
-
- protected transient final IPartitioner partitioner;
-
- public AbstractBounds(Token left, Token right, IPartitioner partitioner)
- {
- this.left = left;
- this.right = right;
- this.partitioner = partitioner;
- }
-
- @Override
- public int hashCode()
- {
- return toString().hashCode();
- }
-
- @Override
- public abstract boolean equals(Object obj);
-
- public abstract boolean contains(Token start);
-
- public abstract Set<AbstractBounds> restrictTo(Range range);
-
- public abstract List<AbstractBounds> unwrap();
-
- private static class AbstractBoundsSerializer implements ICompactSerializer2<AbstractBounds>
- {
- public void serialize(AbstractBounds range, DataOutput out) throws IOException
- {
- out.writeInt(range instanceof Range ? Type.RANGE.ordinal() : Type.BOUNDS.ordinal());
- Token.serializer().serialize(range.left, out);
- Token.serializer().serialize(range.right, out);
- }
-
- public AbstractBounds deserialize(DataInput in) throws IOException
- {
- if (in.readInt() == Type.RANGE.ordinal())
- return new Range(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
- return new Bounds(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
- }
- }
-}
-
+package org.apache.cassandra.dht;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.io.ICompactSerializer2;
+
+public abstract class AbstractBounds implements Serializable
+{
+ private static AbstractBoundsSerializer serializer = new AbstractBoundsSerializer();
+
+ public static ICompactSerializer2<AbstractBounds> serializer()
+ {
+ return serializer;
+ }
+
+ private enum Type
+ {
+ RANGE,
+ BOUNDS
+ }
+
+ public final Token left;
+ public final Token right;
+
+ protected transient final IPartitioner partitioner;
+
+ public AbstractBounds(Token left, Token right, IPartitioner partitioner)
+ {
+ this.left = left;
+ this.right = right;
+ this.partitioner = partitioner;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ @Override
+ public abstract boolean equals(Object obj);
+
+ public abstract boolean contains(Token start);
+
+ public abstract Set<AbstractBounds> restrictTo(Range range);
+
+ public abstract List<AbstractBounds> unwrap();
+
+ private static class AbstractBoundsSerializer implements ICompactSerializer2<AbstractBounds>
+ {
+ public void serialize(AbstractBounds range, DataOutput out) throws IOException
+ {
+ out.writeInt(range instanceof Range ? Type.RANGE.ordinal() : Type.BOUNDS.ordinal());
+ Token.serializer().serialize(range.left, out);
+ Token.serializer().serialize(range.right, out);
+ }
+
+ public AbstractBounds deserialize(DataInput in) throws IOException
+ {
+ if (in.readInt() == Type.RANGE.ordinal())
+ return new Range(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
+ return new Bounds(Token.serializer().deserialize(in), Token.serializer().deserialize(in));
+ }
+ }
+}
+
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/Bounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/Bounds.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/Bounds.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/dht/Bounds.java Tue Feb 23 16:57:51 2010
@@ -1,73 +1,73 @@
-package org.apache.cassandra.dht;
-
-import java.util.*;
-
-import org.apache.cassandra.service.StorageService;
-
-public class Bounds extends AbstractBounds
-{
- public Bounds(Token left, Token right)
- {
- this(left, right, StorageService.getPartitioner());
- }
-
- Bounds(Token left, Token right, IPartitioner partitioner)
- {
- super(left, right, partitioner);
- // unlike a Range, a Bounds may not wrap
- assert left.compareTo(right) <= 0 || right.equals(partitioner.getMinimumToken()) : "[" + left + "," + right + "]";
- }
-
- @Override
- public boolean contains(Token token)
- {
- return Range.contains(left, right, token) || left.equals(token);
- }
-
- public Set<AbstractBounds> restrictTo(Range range)
- {
- Token min = partitioner.getMinimumToken();
-
- // special case Bounds where left=right (single Token)
- if (this.left.equals(this.right) && !this.right.equals(min))
- return range.contains(this.left)
- ? Collections.unmodifiableSet(new HashSet<AbstractBounds>(Arrays.asList(this)))
- : Collections.<AbstractBounds>emptySet();
-
- // get the intersection of a Range w/ same left & right
- Set<Range> ranges = range.intersectionWith(new Range(this.left, this.right));
- // if range doesn't contain left token anyway, that's the correct answer
- if (!range.contains(this.left))
- return (Set) ranges;
- // otherwise, add back in the left token
- Set<AbstractBounds> S = new HashSet<AbstractBounds>(ranges.size());
- for (Range restricted : ranges)
- {
- if (restricted.left.equals(this.left))
- S.add(new Bounds(restricted.left, restricted.right));
- else
- S.add(restricted);
- }
- return Collections.unmodifiableSet(S);
- }
-
- public List<AbstractBounds> unwrap()
- {
- // Bounds objects never wrap
- return (List)Arrays.asList(this);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (!(o instanceof Bounds))
- return false;
- Bounds rhs = (Bounds)o;
- return left.equals(rhs.left) && right.equals(rhs.right);
- }
-
- public String toString()
- {
- return "[" + left + "," + right + "]";
- }
-}
+package org.apache.cassandra.dht;
+
+import java.util.*;
+
+import org.apache.cassandra.service.StorageService;
+
+public class Bounds extends AbstractBounds
+{
+ public Bounds(Token left, Token right)
+ {
+ this(left, right, StorageService.getPartitioner());
+ }
+
+ Bounds(Token left, Token right, IPartitioner partitioner)
+ {
+ super(left, right, partitioner);
+ // unlike a Range, a Bounds may not wrap
+ assert left.compareTo(right) <= 0 || right.equals(partitioner.getMinimumToken()) : "[" + left + "," + right + "]";
+ }
+
+ @Override
+ public boolean contains(Token token)
+ {
+ return Range.contains(left, right, token) || left.equals(token);
+ }
+
+ public Set<AbstractBounds> restrictTo(Range range)
+ {
+ Token min = partitioner.getMinimumToken();
+
+ // special case Bounds where left=right (single Token)
+ if (this.left.equals(this.right) && !this.right.equals(min))
+ return range.contains(this.left)
+ ? Collections.unmodifiableSet(new HashSet<AbstractBounds>(Arrays.asList(this)))
+ : Collections.<AbstractBounds>emptySet();
+
+ // get the intersection of a Range w/ same left & right
+ Set<Range> ranges = range.intersectionWith(new Range(this.left, this.right));
+ // if range doesn't contain left token anyway, that's the correct answer
+ if (!range.contains(this.left))
+ return (Set) ranges;
+ // otherwise, add back in the left token
+ Set<AbstractBounds> S = new HashSet<AbstractBounds>(ranges.size());
+ for (Range restricted : ranges)
+ {
+ if (restricted.left.equals(this.left))
+ S.add(new Bounds(restricted.left, restricted.right));
+ else
+ S.add(restricted);
+ }
+ return Collections.unmodifiableSet(S);
+ }
+
+ public List<AbstractBounds> unwrap()
+ {
+ // Bounds objects never wrap
+ return (List)Arrays.asList(this);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof Bounds))
+ return false;
+ Bounds rhs = (Bounds)o;
+ return left.equals(rhs.left) && right.equals(rhs.right);
+ }
+
+ public String toString()
+ {
+ return "[" + left + "," + right + "]";
+ }
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/DeletionService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/DeletionService.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/DeletionService.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/DeletionService.java Tue Feb 23 16:57:51 2010
@@ -1,67 +1,67 @@
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class DeletionService
-{
- public static final int MAX_RETRIES = 10;
-
- public static final ExecutorService executor = new JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
-
- public static void submitDelete(final String file)
- {
- Runnable deleter = new WrappedRunnable()
- {
- @Override
- protected void runMayThrow() throws IOException
- {
- FileUtils.deleteWithConfirm(new File(file));
- }
- };
- executor.submit(deleter);
- }
-
- public static void submitDeleteWithRetry(String file)
- {
- submitDeleteWithRetry(file, 0);
- }
-
- private static void submitDeleteWithRetry(final String file, final int retryCount)
- {
- Runnable deleter = new WrappedRunnable()
- {
- @Override
- protected void runMayThrow() throws IOException
- {
- if (!new File(file).delete())
- {
- if (retryCount > MAX_RETRIES)
- throw new IOException("Unable to delete " + file + " after " + MAX_RETRIES + " tries");
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- Thread.sleep(10000);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- submitDeleteWithRetry(file, retryCount + 1);
- }
- }, "Delete submission: " + file).start();
- }
- }
- };
- executor.submit(deleter);
- }
-}
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class DeletionService
+{
+ public static final int MAX_RETRIES = 10;
+
+ public static final ExecutorService executor = new JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
+
+ public static void submitDelete(final String file)
+ {
+ Runnable deleter = new WrappedRunnable()
+ {
+ @Override
+ protected void runMayThrow() throws IOException
+ {
+ FileUtils.deleteWithConfirm(new File(file));
+ }
+ };
+ executor.submit(deleter);
+ }
+
+ public static void submitDeleteWithRetry(String file)
+ {
+ submitDeleteWithRetry(file, 0);
+ }
+
+ private static void submitDeleteWithRetry(final String file, final int retryCount)
+ {
+ Runnable deleter = new WrappedRunnable()
+ {
+ @Override
+ protected void runMayThrow() throws IOException
+ {
+ if (!new File(file).delete())
+ {
+ if (retryCount > MAX_RETRIES)
+ throw new IOException("Unable to delete " + file + " after " + MAX_RETRIES + " tries");
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(10000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ submitDeleteWithRetry(file, retryCount + 1);
+ }
+ }, "Delete submission: " + file).start();
+ }
+ }
+ };
+ executor.submit(deleter);
+ }
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/SSTableDeletingReference.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/SSTableDeletingReference.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/SSTableDeletingReference.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/SSTableDeletingReference.java Tue Feb 23 16:57:51 2010
@@ -1,86 +1,86 @@
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
-import java.lang.ref.PhantomReference;
-import java.lang.ref.ReferenceQueue;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.io.util.FileUtils;
-
-public class SSTableDeletingReference extends PhantomReference<SSTableReader>
-{
- private static final Logger logger = Logger.getLogger(SSTableDeletingReference.class);
-
- private static final Timer timer = new Timer("SSTABLE-CLEANUP-TIMER");
- public static final int RETRY_DELAY = 10000;
-
- private final SSTableTracker tracker;
- public final String path;
- private final long size;
- private boolean deleteOnCleanup;
-
- SSTableDeletingReference(SSTableTracker tracker, SSTableReader referent, ReferenceQueue<? super SSTableReader> q)
- {
- super(referent, q);
- this.tracker = tracker;
- this.path = referent.path;
- this.size = referent.bytesOnDisk();
- }
-
- public void deleteOnCleanup()
- {
- deleteOnCleanup = true;
- }
-
- public void cleanup() throws IOException
- {
- if (deleteOnCleanup)
- {
- // this is tricky because the mmapping might not have been finalized yet,
- // and delete will fail until it is. additionally, we need to make sure to
- // delete the data file first, so on restart the others will be recognized as GCable
- // even if the compaction marker gets deleted next.
- timer.schedule(new CleanupTask(), RETRY_DELAY);
- }
- }
-
- private class CleanupTask extends TimerTask
- {
- int attempts = 0;
-
- @Override
- public void run()
- {
- File datafile = new File(path);
- if (!datafile.delete())
- {
- if (attempts++ < DeletionService.MAX_RETRIES)
- {
- timer.schedule(this, RETRY_DELAY);
- return;
- }
- else
- {
- throw new RuntimeException("Unable to delete " + path);
- }
- }
- try
- {
- FileUtils.deleteWithConfirm(new File(SSTable.indexFilename(path)));
- FileUtils.deleteWithConfirm(new File(SSTable.filterFilename(path)));
- FileUtils.deleteWithConfirm(new File(SSTable.compactedFilename(path)));
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- tracker.spaceReclaimed(size);
- logger.info("Deleted " + path);
- }
- }
-}
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.io.util.FileUtils;
+
+public class SSTableDeletingReference extends PhantomReference<SSTableReader>
+{
+ private static final Logger logger = Logger.getLogger(SSTableDeletingReference.class);
+
+ private static final Timer timer = new Timer("SSTABLE-CLEANUP-TIMER");
+ public static final int RETRY_DELAY = 10000;
+
+ private final SSTableTracker tracker;
+ public final String path;
+ private final long size;
+ private boolean deleteOnCleanup;
+
+ SSTableDeletingReference(SSTableTracker tracker, SSTableReader referent, ReferenceQueue<? super SSTableReader> q)
+ {
+ super(referent, q);
+ this.tracker = tracker;
+ this.path = referent.path;
+ this.size = referent.bytesOnDisk();
+ }
+
+ public void deleteOnCleanup()
+ {
+ deleteOnCleanup = true;
+ }
+
+ public void cleanup() throws IOException
+ {
+ if (deleteOnCleanup)
+ {
+ // this is tricky because the mmapping might not have been finalized yet,
+ // and delete will fail until it is. additionally, we need to make sure to
+ // delete the data file first, so on restart the others will be recognized as GCable
+ // even if the compaction marker gets deleted next.
+ timer.schedule(new CleanupTask(), RETRY_DELAY);
+ }
+ }
+
+ private class CleanupTask extends TimerTask
+ {
+ int attempts = 0;
+
+ @Override
+ public void run()
+ {
+ File datafile = new File(path);
+ if (!datafile.delete())
+ {
+ if (attempts++ < DeletionService.MAX_RETRIES)
+ {
+ timer.schedule(this, RETRY_DELAY);
+ return;
+ }
+ else
+ {
+ throw new RuntimeException("Unable to delete " + path);
+ }
+ }
+ try
+ {
+ FileUtils.deleteWithConfirm(new File(SSTable.indexFilename(path)));
+ FileUtils.deleteWithConfirm(new File(SSTable.filterFilename(path)));
+ FileUtils.deleteWithConfirm(new File(SSTable.compactedFilename(path)));
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ tracker.spaceReclaimed(size);
+ logger.info("Deleted " + path);
+ }
+ }
+}
Modified: incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=915428&r1=915421&r2=915428&view=diff
==============================================================================
--- incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ incubator/cassandra/tags/cassandra-0.6.0-beta1/src/java/org/apache/cassandra/io/util/FileDataInput.java Tue Feb 23 16:57:51 2010
@@ -1,18 +1,18 @@
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.Closeable;
-
-public interface FileDataInput extends DataInput, Closeable
-{
- public String getPath();
-
- public boolean isEOF() throws IOException;
-
- public void mark();
-
- public void reset() throws IOException;
-
- public int bytesPastMark();
-}
+package org.apache.cassandra.io.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.Closeable;
+
+public interface FileDataInput extends DataInput, Closeable
+{
+ public String getPath();
+
+ public boolean isEOF() throws IOException;
+
+ public void mark();
+
+ public void reset() throws IOException;
+
+ public int bytesPastMark();
+}