You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/09/16 22:04:48 UTC
[7/7] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e63dacf7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e63dacf7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e63dacf7
Branch: refs/heads/cassandra-2.2
Commit: e63dacf793fedc8a9eed9c7fc635cde5f9fd68f3
Parents: 8b2dc1f e889ee4
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Sep 16 22:00:25 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Sep 16 22:00:25 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 193 ++++++++------
.../org/apache/cassandra/cache/CacheKey.java | 14 +-
.../apache/cassandra/cache/CounterCacheKey.java | 26 +-
.../org/apache/cassandra/cache/KeyCacheKey.java | 19 +-
.../org/apache/cassandra/cache/OHCProvider.java | 17 +-
.../org/apache/cassandra/cache/RowCacheKey.java | 34 +--
.../org/apache/cassandra/config/CFMetaData.java | 9 +
.../cassandra/config/DatabaseDescriptor.java | 19 +-
.../org/apache/cassandra/config/Schema.java | 56 +++-
.../apache/cassandra/db/ColumnFamilyStore.java | 75 ++----
src/java/org/apache/cassandra/db/Keyspace.java | 4 -
.../org/apache/cassandra/db/RowIndexEntry.java | 2 +-
.../db/index/SecondaryIndexManager.java | 30 +--
.../io/sstable/format/SSTableReader.java | 10 +-
.../io/sstable/format/big/BigTableReader.java | 2 +-
.../apache/cassandra/service/CacheService.java | 58 ++--
.../cassandra/service/CassandraDaemon.java | 41 ++-
.../cassandra/service/StorageService.java | 31 ++-
.../org/apache/cassandra/utils/FBUtilities.java | 16 ++
.../cassandra/cache/AutoSavingCacheTest.java | 5 +-
.../cassandra/cache/CacheProviderTest.java | 17 +-
.../apache/cassandra/cql3/KeyCacheCqlTest.java | 266 +++++++++++++++++++
.../apache/cassandra/db/CounterCacheTest.java | 70 ++++-
.../org/apache/cassandra/db/KeyCacheTest.java | 2 +-
.../org/apache/cassandra/db/RowCacheTest.java | 41 ++-
26 files changed, 760 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7deebcf,207f16a..96ec0fa
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,14 -1,5 +1,15 @@@
-2.1.10
+2.2.2
+ * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
+ * Cancel transaction for sstables we wont redistribute index summary
+ for (CASSANDRA-10270)
+ * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209)
+ * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
+ * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
+ * Fix repair hang when snapshot failed (CASSANDRA-10057)
+ * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
+ (CASSANDRA-10199)
+Merged from 2.1:
+ * Fix cache handling of 2i and base tables (CASSANDRA-10155)
* Fix NPE in nodetool compactionhistory (CASSANDRA-9758)
* (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
* BATCH statement is broken in cqlsh (CASSANDRA-10272)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java
index f0f4e8a,3ebbc76..3ec9d4e
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@@ -61,8 -65,16 +67,16 @@@ public class AutoSavingCache<K extends
protected volatile ScheduledFuture<?> saveTask;
protected final CacheService.CacheType cacheType;
- private CacheSerializer<K, V> cacheLoader;
+ private final CacheSerializer<K, V> cacheLoader;
- private static final String CURRENT_VERSION = "c";
+
+ /*
+ * CASSANDRA-10155 required a format change to fix 2i indexes and caching.
+ * 2.2 is already at version "c" and 3.0 is at "d".
+ *
+ * Since cache versions match exactly and there is no partial fallback just add
+ * a minor version letter.
+ */
- private static final String CURRENT_VERSION = "ba";
++ private static final String CURRENT_VERSION = "ca";
private static volatile IStreamFactory streamFactory = new IStreamFactory()
{
@@@ -90,16 -102,9 +104,14 @@@
this.cacheLoader = cacheloader;
}
- public File getCacheDataPath(UUID cfId, String version)
- public File getCachePath(String version)
++ public File getCacheDataPath(String version)
{
- Pair<String, String> names = Schema.instance.getCF(cfId);
- return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "db");
- return DatabaseDescriptor.getSerializedCachePath(cacheType, version);
++ return DatabaseDescriptor.getSerializedCachePath( cacheType, version, "db");
+ }
+
- public File getCacheCrcPath(UUID cfId, String version)
++ public File getCacheCrcPath(String version)
+ {
- Pair<String, String> names = Schema.instance.getCF(cfId);
- return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "crc");
++ return DatabaseDescriptor.getSerializedCachePath( cacheType, version, "crc");
}
public Writer getWriter(int keysToSave)
@@@ -136,42 -170,65 +177,70 @@@
long start = System.nanoTime();
// modern format, allows both key and value (so key cache load can be purely sequential)
- File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION);
- File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION);
- File path = getCachePath(CURRENT_VERSION);
- if (path.exists())
++ File dataPath = getCacheDataPath(CURRENT_VERSION);
++ File crcPath = getCacheCrcPath(CURRENT_VERSION);
+ if (dataPath.exists() && crcPath.exists())
{
DataInputStream in = null;
try
{
- logger.info(String.format("reading saved cache %s", path));
- in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
+ logger.info(String.format("reading saved cache %s", dataPath));
+ in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
- List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
+ ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>();
-
while (in.available() > 0)
{
- Future<Pair<K, V>> entry = cacheLoader.deserialize(in, cfs);
+ //ksname and cfname are serialized by the serializers in CacheService
+ //That is delegated there because there are serializer specific conditions
+ //where a cache key is skipped and not written
+ String ksname = in.readUTF();
+ String cfname = in.readUTF();
+
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname));
+
+ Future<Pair<K, V>> entryFuture = cacheLoader.deserialize(in, cfs);
// Key cache entry can return null, if the SSTable doesn't exist.
- if (entry == null)
+ if (entryFuture == null)
continue;
- futures.add(entry);
+
+ futures.offer(entryFuture);
count++;
+
+ /*
+ * Kind of unwise to accrue an unbounded number of pending futures
+ * So now there is this loop to keep a bounded number pending.
+ */
+ do
+ {
+ while (futures.peek() != null && futures.peek().isDone())
+ {
+ Future<Pair<K, V>> future = futures.poll();
+ Pair<K, V> entry = future.get();
+ if (entry != null && entry.right != null)
+ put(entry.left, entry.right);
+ }
+
+ if (futures.size() > 1000)
+ Thread.yield();
+ } while(futures.size() > 1000);
}
- for (Future<Pair<K, V>> future : futures)
+ Future<Pair<K, V>> future = null;
+ while ((future = futures.poll()) != null)
{
Pair<K, V> entry = future.get();
if (entry != null && entry.right != null)
put(entry.left, entry.right);
}
}
+ catch (CorruptFileException e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ logger.warn(String.format("Non-fatal checksum error reading saved cache %s", dataPath.getAbsolutePath()), e);
+ }
- catch (Exception e)
+ catch (Throwable t)
{
- JVMStabilityInspector.inspectThrowable(e);
- logger.debug(String.format("harmless error reading saved cache %s", dataPath.getAbsolutePath()), e);
+ JVMStabilityInspector.inspectThrowable(t);
- logger.info(String.format("Harmless error reading saved cache %s", path.getAbsolutePath()), t);
++ logger.info(String.format("Harmless error reading saved cache %s", dataPath.getAbsolutePath()), t);
}
finally
{
@@@ -236,11 -284,9 +305,10 @@@
public CompactionInfo getCompactionInfo()
{
// keyset can change in size, thus total can too
- return info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
+ // TODO need to check for this one... was: info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
+ return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
}
- @SuppressWarnings("resource")
public void saveCache()
{
logger.debug("Deleting old {} files.", cacheType);
@@@ -254,37 -300,25 +322,26 @@@
long start = System.nanoTime();
- HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
- HashMap<UUID, OutputStream> streams = new HashMap<>();
- HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
-
- DataOutputStreamPlus writer = null;
- File tempCacheFile = tempCacheFile();
++ WrappedDataOutputStreamPlus writer = null;
++ Pair<File, File> cacheFilePaths = tempCacheFiles();
try
{
+ try
+ {
- writer = new DataOutputStreamPlus(streamFactory.getOutputStream(tempCacheFile));
++ writer = new WrappedDataOutputStreamPlus(streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right));
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+
- for (K key : keys)
+ while (keyIterator.hasNext())
{
+ K key = keyIterator.next();
- UUID cfId = key.getCFId();
- if (!Schema.instance.hasCF(key.getCFId()))
- continue; // the table has been dropped.
- DataOutputPlus writer = writers.get(cfId);
- if (writer == null)
- {
- Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
- OutputStream stream;
- try
- {
- stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
- writer = new WrappedDataOutputStreamPlus(stream);
- }
- catch (FileNotFoundException e)
- {
- throw new RuntimeException(e);
- }
- paths.put(cfId, cacheFilePaths);
- streams.put(cfId, stream);
- writers.put(cfId, writer);
- }
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(key.ksAndCFName);
+ if (cfs == null)
+ continue; // the table or 2i has been dropped.
try
{
@@@ -292,7 -326,7 +349,7 @@@
}
catch (IOException e)
{
- throw new FSWriteError(e, paths.get(cfId).left);
- throw new FSWriteError(e, tempCacheFile);
++ throw new FSWriteError(e, cacheFilePaths.left);
}
keysWritten++;
@@@ -302,49 -334,24 +359,31 @@@
}
finally
{
- if (keyIterator instanceof Closeable)
- try
- {
- ((Closeable)keyIterator).close();
- }
- catch (IOException ignored)
- {
- // not thrown (by OHC)
- }
-
- for (OutputStream writer : streams.values())
- {
+ if (writer != null)
FileUtils.closeQuietly(writer);
- }
}
- for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
- {
- UUID cfId = entry.getKey();
- File cacheFile = getCachePath(CURRENT_VERSION);
++ File cacheFile = getCacheDataPath(CURRENT_VERSION);
++ File crcFile = getCacheCrcPath(CURRENT_VERSION);
- Pair<File, File> tmpFiles = paths.get(cfId);
- File cacheFile = getCacheDataPath(cfId, CURRENT_VERSION);
- File crcFile = getCacheCrcPath(cfId, CURRENT_VERSION);
+ cacheFile.delete(); // ignore error if it didn't exist
++ crcFile.delete();
+
- cacheFile.delete(); // ignore error if it didn't exist
- crcFile.delete();
++ if (!cacheFilePaths.left.renameTo(cacheFile))
++ logger.error("Unable to rename {} to {}", cacheFilePaths.left, cacheFile);
- if (!tmpFiles.left.renameTo(cacheFile))
- logger.error("Unable to rename {} to {}", tmpFiles.left, cacheFile);
-
- if (!tmpFiles.right.renameTo(crcFile))
- logger.error("Unable to rename {} to {}", tmpFiles.right, crcFile);
- }
- if (!tempCacheFile.renameTo(cacheFile))
- logger.error("Unable to rename {} to {}", tempCacheFile, cacheFile);
++ if (!cacheFilePaths.right.renameTo(crcFile))
++ logger.error("Unable to rename {} to {}", cacheFilePaths.right, crcFile);
- logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ logger.info("Saved {} ({} items) in {} ms", cacheType, keysWritten, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
- private Pair<File, File> tempCacheFiles(UUID cfId)
- private File tempCacheFile()
++ private Pair<File, File> tempCacheFiles()
{
- File dataPath = getCacheDataPath(cfId, CURRENT_VERSION);
- File crcPath = getCacheCrcPath(cfId, CURRENT_VERSION);
- File path = getCachePath(CURRENT_VERSION);
- return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
++ File dataPath = getCacheDataPath(CURRENT_VERSION);
++ File crcPath = getCacheCrcPath(CURRENT_VERSION);
+ return Pair.create(FileUtils.createTempFile(dataPath.getName(), null, dataPath.getParentFile()),
+ FileUtils.createTempFile(crcPath.getName(), null, crcPath.getParentFile()));
}
private void deleteOldCacheFiles()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/OHCProvider.java
index e4cfb69,0000000..9b1c8cf
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@@ -1,282 -1,0 +1,285 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Iterator;
- import java.util.UUID;
+
+import com.google.common.base.Function;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.utils.Pair;
+import org.caffinitas.ohc.OHCache;
+import org.caffinitas.ohc.OHCacheBuilder;
+
+public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
+{
+ public ICache<RowCacheKey, IRowCacheEntry> create()
+ {
+ OHCacheBuilder<RowCacheKey, IRowCacheEntry> builder = OHCacheBuilder.newBuilder();
+ builder.capacity(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024)
+ .keySerializer(new KeySerializer())
+ .valueSerializer(new ValueSerializer())
+ .throwOOME(true);
+
+ return new OHCacheAdapter(builder.build());
+ }
+
+ private static class OHCacheAdapter implements ICache<RowCacheKey, IRowCacheEntry>
+ {
+ private final OHCache<RowCacheKey, IRowCacheEntry> ohCache;
+
+ public OHCacheAdapter(OHCache<RowCacheKey, IRowCacheEntry> ohCache)
+ {
+ this.ohCache = ohCache;
+ }
+
+ public long capacity()
+ {
+ return ohCache.capacity();
+ }
+
+ public void setCapacity(long capacity)
+ {
+ ohCache.setCapacity(capacity);
+ }
+
+ public void put(RowCacheKey key, IRowCacheEntry value)
+ {
+ ohCache.put(key, value);
+ }
+
+ public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value)
+ {
+ return ohCache.putIfAbsent(key, value);
+ }
+
+ public boolean replace(RowCacheKey key, IRowCacheEntry old, IRowCacheEntry value)
+ {
+ return ohCache.addOrReplace(key, old, value);
+ }
+
+ public IRowCacheEntry get(RowCacheKey key)
+ {
+ return ohCache.get(key);
+ }
+
+ public void remove(RowCacheKey key)
+ {
+ ohCache.remove(key);
+ }
+
+ public int size()
+ {
+ return (int) ohCache.size();
+ }
+
+ public long weightedSize()
+ {
+ return ohCache.size();
+ }
+
+ public void clear()
+ {
+ ohCache.clear();
+ }
+
+ public Iterator<RowCacheKey> hotKeyIterator(int n)
+ {
+ return ohCache.hotKeyIterator(n);
+ }
+
+ public Iterator<RowCacheKey> keyIterator()
+ {
+ return ohCache.keyIterator();
+ }
+
+ public boolean containsKey(RowCacheKey key)
+ {
+ return ohCache.containsKey(key);
+ }
+ }
+
+ private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey>
+ {
+ public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException
+ {
- dataOutput.writeLong(rowCacheKey.cfId.getMostSignificantBits());
- dataOutput.writeLong(rowCacheKey.cfId.getLeastSignificantBits());
++ dataOutput.writeUTF(rowCacheKey.ksAndCFName.left);
++ dataOutput.writeUTF(rowCacheKey.ksAndCFName.right);
+ dataOutput.writeInt(rowCacheKey.key.length);
+ dataOutput.write(rowCacheKey.key);
+ }
+
+ public RowCacheKey deserialize(DataInput dataInput) throws IOException
+ {
- long msb = dataInput.readLong();
- long lsb = dataInput.readLong();
++ String ksName = dataInput.readUTF();
++ String cfName = dataInput.readUTF();
+ byte[] key = new byte[dataInput.readInt()];
+ dataInput.readFully(key);
- return new RowCacheKey(new UUID(msb, lsb), key);
++ return new RowCacheKey(Pair.create(ksName, cfName), key);
+ }
+
+ public int serializedSize(RowCacheKey rowCacheKey)
+ {
- return 20 + rowCacheKey.key.length;
++ return TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.left)
++ + TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.right)
++ + 4
++ + rowCacheKey.key.length;
+ }
+ }
+
+ private static class ValueSerializer implements org.caffinitas.ohc.CacheSerializer<IRowCacheEntry>
+ {
+ public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException
+ {
+ assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
+ boolean isSentinel = entry instanceof RowCacheSentinel;
+ out.writeBoolean(isSentinel);
+ if (isSentinel)
+ out.writeLong(((RowCacheSentinel) entry).sentinelId);
+ else
+ ColumnFamily.serializer.serialize((ColumnFamily) entry, new DataOutputPlusAdapter(out), MessagingService.current_version);
+ }
+
+ public IRowCacheEntry deserialize(DataInput in) throws IOException
+ {
+ boolean isSentinel = in.readBoolean();
+ if (isSentinel)
+ return new RowCacheSentinel(in.readLong());
+ return ColumnFamily.serializer.deserialize(in, MessagingService.current_version);
+ }
+
+ public int serializedSize(IRowCacheEntry entry)
+ {
+ TypeSizes typeSizes = TypeSizes.NATIVE;
+ int size = typeSizes.sizeof(true);
+ if (entry instanceof RowCacheSentinel)
+ size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId);
+ else
+ size += ColumnFamily.serializer.serializedSize((ColumnFamily) entry, typeSizes, MessagingService.current_version);
+ return size;
+ }
+ }
+
+ static class DataOutputPlusAdapter implements DataOutputPlus
+ {
+ private final DataOutput out;
+
+ public void write(byte[] b) throws IOException
+ {
+ out.write(b);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ out.write(b, off, len);
+ }
+
+ public void write(int b) throws IOException
+ {
+ out.write(b);
+ }
+
+ public void writeBoolean(boolean v) throws IOException
+ {
+ out.writeBoolean(v);
+ }
+
+ public void writeByte(int v) throws IOException
+ {
+ out.writeByte(v);
+ }
+
+ public void writeBytes(String s) throws IOException
+ {
+ out.writeBytes(s);
+ }
+
+ public void writeChar(int v) throws IOException
+ {
+ out.writeChar(v);
+ }
+
+ public void writeChars(String s) throws IOException
+ {
+ out.writeChars(s);
+ }
+
+ public void writeDouble(double v) throws IOException
+ {
+ out.writeDouble(v);
+ }
+
+ public void writeFloat(float v) throws IOException
+ {
+ out.writeFloat(v);
+ }
+
+ public void writeInt(int v) throws IOException
+ {
+ out.writeInt(v);
+ }
+
+ public void writeLong(long v) throws IOException
+ {
+ out.writeLong(v);
+ }
+
+ public void writeShort(int v) throws IOException
+ {
+ out.writeShort(v);
+ }
+
+ public void writeUTF(String s) throws IOException
+ {
+ out.writeUTF(s);
+ }
+
+ public DataOutputPlusAdapter(DataOutput out)
+ {
+ this.out = out;
+ }
+
+ public void write(ByteBuffer buffer) throws IOException
+ {
+ if (buffer.hasArray())
+ out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+ else
+ throw new UnsupportedOperationException("IMPLEMENT ME");
+ }
+
+ public void write(Memory memory, long offset, long length) throws IOException
+ {
+ throw new UnsupportedOperationException("IMPLEMENT ME");
+ }
+
+ public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
+ {
+ throw new UnsupportedOperationException("IMPLEMENT ME");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/RowCacheKey.java
index ccb85d8,c959fd1..e02db42
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@@ -33,20 -31,14 +31,20 @@@ public final class RowCacheKey extends
private static final long EMPTY_SIZE = ObjectSizes.measure(new RowCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
- public RowCacheKey(UUID cfId, byte[] key)
++ public RowCacheKey(Pair<String, String> ksAndCFName, byte[] key)
+ {
- this.cfId = cfId;
++ super(ksAndCFName);
+ this.key = key;
+ }
+
- public RowCacheKey(UUID cfId, DecoratedKey key)
+ public RowCacheKey(Pair<String, String> ksAndCFName, DecoratedKey key)
{
- this(cfId, key.getKey());
+ this(ksAndCFName, key.getKey());
}
- public RowCacheKey(UUID cfId, ByteBuffer key)
+ public RowCacheKey(Pair<String, String> ksAndCFName, ByteBuffer key)
{
- this.cfId = cfId;
+ super(ksAndCFName);
this.key = ByteBufferUtil.getArray(key);
assert this.key != null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 6468973,2939f09..348eb89
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -47,11 -48,15 +47,12 @@@ import org.apache.cassandra.db.marshal.
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.LZ4Compressor;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.thrift.CfDef;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlRow;
-import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.github.jamm.Unmetered;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 545ad05,84381a0..c459b5d
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -1480,20 -1431,17 +1480,11 @@@ public class DatabaseDescripto
return conf.max_hint_window_in_ms;
}
- public static File getSerializedCachePath(String ksName,
- String cfName,
- UUID cfId,
- CacheService.CacheType cacheType,
- String version,
- String extension)
- @Deprecated
- public static Integer getIndexInterval()
-- {
- StringBuilder builder = new StringBuilder();
- builder.append(ksName).append('-');
- builder.append(cfName).append('-');
- builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-');
- builder.append(cacheType);
- builder.append((version == null ? "" : "-" + version + "." + extension));
- return new File(conf.saved_caches_directory, builder.toString());
- return conf.index_interval;
- }
-
- public static File getSerializedCachePath(CacheService.CacheType cacheType, String version)
++ public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension)
+ {
+ String name = cacheType.toString()
- + (version == null ? "" : "-" + version + ".db");
++ + (version == null ? "" : "-" + version + "." + extension);
+ return new File(conf.saved_caches_directory, name);
}
public static int getDynamicUpdateInterval()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Schema.java
index 548341e,fada670..00c9358
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@@ -26,18 -28,14 +26,19 @@@ import com.google.common.collect.Sets
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.UserType;
+ import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.utils.ConcurrentBiMap;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 343ecee,ffaa276..a8a8910
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1665,9 -1655,9 +1626,9 @@@ public class ColumnFamilyStore implemen
private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter)
{
assert isRowCacheEnabled()
- : String.format("Row cache is not enabled on column family [" + name + "]");
+ : String.format("Row cache is not enabled on table [" + name + "]");
- RowCacheKey key = new RowCacheKey(cfId, filter.key);
+ RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key);
// attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our
// (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
@@@ -2075,23 -2026,19 +2036,23 @@@
{
Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
- for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+ for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator();
+ keyIter.hasNext(); )
{
+ RowCacheKey key = keyIter.next();
DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
- if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
+ if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
invalidateCachedRow(dk);
}
if (metadata.isCounter())
{
- for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
+ for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator();
+ keyIter.hasNext(); )
{
+ CounterCacheKey key = keyIter.next();
DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
- if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
+ if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
CacheService.instance.counterCache.remove(key);
}
}
@@@ -2965,13 -2955,21 +2926,13 @@@
}
}
- /**
- * Returns the creation time of the oldest memtable not fully flushed yet.
- */
- public long oldestUnflushedMemtable()
- {
- return data.getView().getOldestMemtable().creationTime();
- }
-
public boolean isEmpty()
{
- DataTracker.View view = data.getView();
- return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable();
+ View view = data.getView();
+ return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0;
}
- private boolean isRowCacheEnabled()
+ public boolean isRowCacheEnabled()
{
return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------