You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/09/18 19:45:20 UTC
[09/10] git commit: merge from 1.2
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2648047a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2648047a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2648047a
Branch: refs/heads/cassandra-2.0
Commit: 2648047a443306c8f2fc921c353cdbd54d964c5f
Parents: e93578b 7161aec
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Sep 18 12:44:31 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Sep 18 12:44:31 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
doc/cql3/CQL.textile | 2 +-
.../org/apache/cassandra/db/Directories.java | 62 +++++---------------
.../db/compaction/CompactionManager.java | 2 +-
.../cassandra/db/compaction/Scrubber.java | 2 +-
.../cassandra/io/util/DiskAwareRunnable.java | 2 +-
.../cassandra/streaming/StreamReader.java | 2 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 7 +--
.../apache/cassandra/db/DirectoriesTest.java | 2 +-
.../compaction/LegacyLeveledManifestTest.java | 2 +-
.../io/sstable/SSTableSimpleWriterTest.java | 2 +-
11 files changed, 28 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 98ea03f,fb9915e..fd8d852
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,5 +1,31 @@@
-1.2.10
+2.0.1
+ * add file_cache_size_in_mb setting (CASSANDRA-5661)
+ * Improve error message when yaml contains invalid properties (CASSANDRA-5958)
+ * Improve leveled compaction's ability to find non-overlapping L0 compactions
+ to work on concurrently (CASSANDRA-5921)
+ * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
+ * Log Merkle tree stats (CASSANDRA-2698)
+ * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
+ * Improve offheap memcpy performance (CASSANDRA-5884)
+ * Use a range aware scanner for cleanup (CASSANDRA-2524)
+ * Cleanup doesn't need to inspect sstables that contain only local data
+ (CASSANDRA-5722)
+ * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
+ * Improve native protocol serialization (CASSANDRA-5664)
+ * Upgrade Thrift to 0.9.1 (CASSANDRA-5923)
+ * Require superuser status for adding triggers (CASSANDRA-5963)
+ * Make standalone scrubber handle old and new style leveled manifest
+ (CASSANDRA-6005)
+ * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
+ * Fix paged ranges with multiple replicas (CASSANDRA-6004)
+ * Fix potential AssertionError during tracing (CASSANDRA-6041)
+ * Fix NPE in sstablesplit (CASSANDRA-6027)
+ * Migrate pre-2.0 key/value/column aliases to system.schema_columns
+ (CASSANDRA-6009)
+ * Paging filter empty rows too agressively (CASSANDRA-6040)
+ * Support variadic parameters for IN clauses (CASSANDRA-4210)
+Merged from 1.2:
+ * Avoid second-guessing out-of-space state (CASSANDRA-5605)
* Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
* (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
* Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/doc/cql3/CQL.textile
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Directories.java
index 52f9699,351c0c0..18fc639
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -19,9 -19,9 +19,10 @@@ package org.apache.cassandra.db
import java.io.File;
import java.io.FileFilter;
+ import java.io.IOError;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@@ -149,8 -147,15 +150,8 @@@ public class Directorie
// retry after GCing has forced unmap of compacted SSTables so they can be deleted
// Note: GCInspector will do this already, but only sun JVM supports GCInspector so far
SSTableDeletingTask.rescheduleFailedTasks();
- try
- {
- Thread.sleep(10000);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
- path = getLocationWithMaximumAvailableSpace(estimatedSize);
+ path = getWriteableLocationAsFile();
}
return path;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 862f5a2,0000000..d72cb5e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -1,137 -1,0 +1,137 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collection;
+import java.util.UUID;
+
+import com.google.common.base.Throwables;
+import com.ning.compress.lzf.LZFInputStream;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamReader reads from stream and writes to SSTable.
+ */
+public class StreamReader
+{
+ protected final UUID cfId;
+ protected final long estimatedKeys;
+ protected final Collection<Pair<Long, Long>> sections;
+ protected final StreamSession session;
+ protected final Descriptor.Version inputVersion;
+
+ protected Descriptor desc;
+
+ public StreamReader(FileMessageHeader header, StreamSession session)
+ {
+ this.session = session;
+ this.cfId = header.cfId;
+ this.estimatedKeys = header.estimatedKeys;
+ this.sections = header.sections;
+ this.inputVersion = new Descriptor.Version(header.version);
+ }
+
+ /**
+ * @param channel where this reads data from
+ * @return SSTable transferred
+ * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+ */
+ public SSTableReader read(ReadableByteChannel channel) throws IOException
+ {
+ long totalSize = totalSize();
+
+ Pair<String, String> kscf = Schema.instance.getCF(cfId);
+ ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+ SSTableWriter writer = createWriter(cfs, totalSize);
+ DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
+ BytesReadTracker in = new BytesReadTracker(dis);
+ try
+ {
+ while (in.getBytesRead() < totalSize)
+ {
+ writeRow(writer, in, cfs);
+ // TODO move this to BytesReadTracker
+ session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+ }
+ return writer.closeAndOpenReader();
+ }
+ catch (Throwable e)
+ {
+ writer.abort();
+ drain(dis, in.getBytesRead());
+ if (e instanceof IOException)
+ throw (IOException) e;
+ else
+ throw Throwables.propagate(e);
+ }
+ }
+
+ protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize) throws IOException
+ {
- Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize);
++ Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
+ if (localDir == null)
+ throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+ desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
+
+ return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+ }
+
+ protected void drain(InputStream dis, long bytesRead) throws IOException
+ {
+ long toSkip = totalSize() - bytesRead;
+ toSkip = toSkip - dis.skip(toSkip);
+ while (toSkip > 0)
+ toSkip = toSkip - dis.skip(toSkip);
+ }
+
+ protected long totalSize()
+ {
+ long size = 0;
+ for (Pair<Long, Long> section : sections)
+ size += section.right - section.left;
+ return size;
+ }
+
+ protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+ {
+ DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+ writer.appendFromStream(key, cfs.metadata, in, inputVersion);
+ cfs.invalidateCachedRow(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index f21b60e,abe3f05..0a78b2a
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@@ -1334,66 -1240,12 +1334,65 @@@ public class ColumnFamilyStoreTest exte
testMultiRangeSlicesBehavior(prepareMultiRangeSlicesTest(10, false));
}
+ @Test
+ public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
+ {
+ String ks = "Keyspace1";
+ String cf = "Standard3"; // should be empty
+
+ final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+ Directories dir = Directories.create(ks, cf);
+ ByteBuffer key = bytes("key");
+
+ // 1st sstable
- SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100),
- cfmeta, StorageService.getPartitioner());
++ SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), cfmeta, StorageService.getPartitioner());
+ writer.newRow(key);
+ writer.addColumn(bytes("col"), bytes("val"), 1);
+ writer.close();
+
+ Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+ assert sstables.size() == 1;
+
+ Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
+ final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
+
+ // simulate incomplete compaction
- writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100),
++ writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+ cfmeta, StorageService.getPartitioner())
+ {
+ protected SSTableWriter getWriter()
+ {
+ SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
+ collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
+ return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName),
+ 0,
+ metadata,
+ StorageService.getPartitioner(),
+ collector);
+ }
+ };
+ writer.newRow(key);
+ writer.addColumn(bytes("col"), bytes("val"), 1);
+ writer.close();
+
+ // should have 2 sstables now
+ sstables = dir.sstableLister().list();
+ assert sstables.size() == 2;
+
+ ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation));
+
+ // 2nd sstable should be removed (only 1st sstable exists in set of size 1)
+ sstables = dir.sstableLister().list();
+ assert sstables.size() == 1;
+ assert sstables.containsKey(sstable1.descriptor);
+ }
+
private ColumnFamilyStore prepareMultiRangeSlicesTest(int valueSize, boolean flush) throws Throwable
{
- String tableName = "Keyspace1";
+ String keyspaceName = "Keyspace1";
String cfName = "Standard1";
- Table table = Table.open(tableName);
- ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+ Keyspace keyspace = Keyspace.open(keyspaceName);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
cfs.clearUnsafe();
String[] letters = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l" };
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
index d8e8af0,0000000..7fd6c10
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
@@@ -1,117 -1,0 +1,117 @@@
+package org.apache.cassandra.db.compaction;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Pair;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.apache.cassandra.db.compaction.LegacyLeveledManifestTestHelper.*;
+
+public class LegacyLeveledManifestTest
+{
+ private final static String LEGACY_VERSION = "ic";
+
+ private File destDir;
+ @Before
+ public void setup()
+ {
- destDir = Directories.create(KS, CF).getDirectoryForNewSSTables(0);
++ destDir = Directories.create(KS, CF).getDirectoryForNewSSTables();
+ FileUtils.createDirectory(destDir);
+ for (File srcFile : getLegacySSTableDir(LEGACY_VERSION).listFiles())
+ {
+ File destFile = new File(destDir, srcFile.getName());
+ FileUtils.createHardLink(srcFile,destFile);
+ assert destFile.exists() : destFile.getAbsoluteFile();
+ }
+ }
+ @After
+ public void tearDown()
+ {
+ FileUtils.deleteRecursive(destDir);
+ }
+
+ @Test
+ public void migrateTest() throws IOException
+ {
+ assertTrue(LegacyLeveledManifest.manifestNeedsMigration(KS, CF));
+ }
+
+ @Test
+ public void doMigrationTest() throws IOException, InterruptedException
+ {
+ LegacyLeveledManifest.migrateManifests(KS, CF);
+
+ for (int i = 0; i <= 2; i++)
+ {
+ Descriptor descriptor = Descriptor.fromFilename(destDir+File.separator+KS+"-"+CF+"-"+LEGACY_VERSION+"-"+i+"-Statistics.db");
+ SSTableMetadata metadata = SSTableMetadata.serializer.deserialize(descriptor).left;
+ assertEquals(metadata.sstableLevel, i);
+ }
+ }
+
+ /**
+ * Validate that the rewritten stats file is the same as the original one.
+ * @throws IOException
+ */
+ @Test
+ public void validateSSTableMetadataTest() throws IOException
+ {
+ Map<Descriptor, Pair<SSTableMetadata, Set<Integer>>> beforeMigration = new HashMap<>();
+ for (int i = 0; i <= 2; i++)
+ {
+ Descriptor descriptor = Descriptor.fromFilename(destDir+File.separator+KS+"-"+CF+"-"+LEGACY_VERSION+"-"+i+"-Statistics.db");
+ beforeMigration.put(descriptor, SSTableMetadata.serializer.deserialize(descriptor, false));
+ }
+
+ LegacyLeveledManifest.migrateManifests(KS, CF);
+
+ for (Map.Entry<Descriptor, Pair<SSTableMetadata, Set<Integer>>> entry : beforeMigration.entrySet())
+ {
+ Pair<SSTableMetadata, Set<Integer>> newMetaPair = SSTableMetadata.serializer.deserialize(entry.getKey());
+ SSTableMetadata newMetadata = newMetaPair.left;
+ SSTableMetadata oldMetadata = entry.getValue().left;
+ assertEquals(newMetadata.estimatedRowSize, oldMetadata.estimatedRowSize);
+ assertEquals(newMetadata.estimatedColumnCount, oldMetadata.estimatedColumnCount);
+ assertEquals(newMetadata.replayPosition, oldMetadata.replayPosition);
+ assertEquals(newMetadata.minTimestamp, oldMetadata.minTimestamp);
+ assertEquals(newMetadata.maxTimestamp, oldMetadata.maxTimestamp);
+ assertEquals(newMetadata.compressionRatio, oldMetadata.compressionRatio, 0.01);
+ assertEquals(newMetadata.partitioner, oldMetadata.partitioner);
+ assertEquals(newMetadata.estimatedTombstoneDropTime, oldMetadata.estimatedTombstoneDropTime);
+ assertEquals(entry.getValue().right, newMetaPair.right);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
index e80d2bb,ce569b9..9e7aa16
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
@@@ -39,11 -39,11 +39,11 @@@ public class SSTableSimpleWriterTest ex
final int INC = 5;
final int NBCOL = 10;
- String tablename = "Keyspace1";
+ String keyspaceName = "Keyspace1";
String cfname = "StandardInteger1";
- Table t = Table.open(tablename); // make sure we create the directory
- File dir = Directories.create(tablename, cfname).getDirectoryForNewSSTables();
+ Keyspace t = Keyspace.open(keyspaceName); // make sure we create the directory
- File dir = Directories.create(keyspaceName, cfname).getDirectoryForNewSSTables(0);
++ File dir = Directories.create(keyspaceName, cfname).getDirectoryForNewSSTables();
assert dir.exists();
IPartitioner partitioner = StorageService.getPartitioner();