You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/09/18 19:45:20 UTC

[09/10] git commit: merge from 1.2

merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2648047a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2648047a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2648047a

Branch: refs/heads/cassandra-2.0
Commit: 2648047a443306c8f2fc921c353cdbd54d964c5f
Parents: e93578b 7161aec
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Sep 18 12:44:31 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Sep 18 12:44:31 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 doc/cql3/CQL.textile                            |  2 +-
 .../org/apache/cassandra/db/Directories.java    | 62 +++++---------------
 .../db/compaction/CompactionManager.java        |  2 +-
 .../cassandra/db/compaction/Scrubber.java       |  2 +-
 .../cassandra/io/util/DiskAwareRunnable.java    |  2 +-
 .../cassandra/streaming/StreamReader.java       |  2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |  7 +--
 .../apache/cassandra/db/DirectoriesTest.java    |  2 +-
 .../compaction/LegacyLeveledManifestTest.java   |  2 +-
 .../io/sstable/SSTableSimpleWriterTest.java     |  2 +-
 11 files changed, 28 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 98ea03f,fb9915e..fd8d852
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,5 +1,31 @@@
 -1.2.10
 +2.0.1
 + * add file_cache_size_in_mb setting (CASSANDRA-5661)
 + * Improve error message when yaml contains invalid properties (CASSANDRA-5958)
 + * Improve leveled compaction's ability to find non-overlapping L0 compactions
 +   to work on concurrently (CASSANDRA-5921)
 + * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
 + * Log Merkle tree stats (CASSANDRA-2698)
 + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
 + * Improve offheap memcpy performance (CASSANDRA-5884)
 + * Use a range aware scanner for cleanup (CASSANDRA-2524)
 + * Cleanup doesn't need to inspect sstables that contain only local data 
 +   (CASSANDRA-5722)
 + * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
 + * Improve native protocol serialization (CASSANDRA-5664)
 + * Upgrade Thrift to 0.9.1 (CASSANDRA-5923)
 + * Require superuser status for adding triggers (CASSANDRA-5963)
 + * Make standalone scrubber handle old and new style leveled manifest
 +   (CASSANDRA-6005)
 + * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
 + * Fix paged ranges with multiple replicas (CASSANDRA-6004)
 + * Fix potential AssertionError during tracing (CASSANDRA-6041)
 + * Fix NPE in sstablesplit (CASSANDRA-6027)
 + * Migrate pre-2.0 key/value/column aliases to system.schema_columns
 +   (CASSANDRA-6009)
 + * Paging filter empty rows too agressively (CASSANDRA-6040)
 + * Support variadic parameters for IN clauses (CASSANDRA-4210)
 +Merged from 1.2:
+  * Avoid second-guessing out-of-space state (CASSANDRA-5605)
   * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
   * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
   * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/doc/cql3/CQL.textile
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Directories.java
index 52f9699,351c0c0..18fc639
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -19,9 -19,9 +19,10 @@@ package org.apache.cassandra.db
  
  import java.io.File;
  import java.io.FileFilter;
+ import java.io.IOError;
  import java.io.IOException;
  import java.util.*;
 +import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
  
@@@ -149,8 -147,15 +150,8 @@@ public class Directorie
              // retry after GCing has forced unmap of compacted SSTables so they can be deleted
              // Note: GCInspector will do this already, but only sun JVM supports GCInspector so far
              SSTableDeletingTask.rescheduleFailedTasks();
 -            try
 -            {
 -                Thread.sleep(10000);
 -            }
 -            catch (InterruptedException e)
 -            {
 -                throw new AssertionError(e);
 -            }
 +            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
-             path = getLocationWithMaximumAvailableSpace(estimatedSize);
+             path = getWriteableLocationAsFile();
          }
  
          return path;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 862f5a2,0000000..d72cb5e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -1,137 -1,0 +1,137 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming;
 +
 +import java.io.DataInput;
 +import java.io.DataInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.nio.channels.Channels;
 +import java.nio.channels.ReadableByteChannel;
 +import java.util.Collection;
 +import java.util.UUID;
 +
 +import com.google.common.base.Throwables;
 +import com.ning.compress.lzf.LZFInputStream;
 +
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.io.sstable.SSTableWriter;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.streaming.messages.FileMessageHeader;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.BytesReadTracker;
 +import org.apache.cassandra.utils.Pair;
 +
 +/**
 + * StreamReader reads from stream and writes to SSTable.
 + */
 +public class StreamReader
 +{
 +    protected final UUID cfId;
 +    protected final long estimatedKeys;
 +    protected final Collection<Pair<Long, Long>> sections;
 +    protected final StreamSession session;
 +    protected final Descriptor.Version inputVersion;
 +
 +    protected Descriptor desc;
 +
 +    public StreamReader(FileMessageHeader header, StreamSession session)
 +    {
 +        this.session = session;
 +        this.cfId = header.cfId;
 +        this.estimatedKeys = header.estimatedKeys;
 +        this.sections = header.sections;
 +        this.inputVersion = new Descriptor.Version(header.version);
 +    }
 +
 +    /**
 +     * @param channel where this reads data from
 +     * @return SSTable transferred
 +     * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
 +     */
 +    public SSTableReader read(ReadableByteChannel channel) throws IOException
 +    {
 +        long totalSize = totalSize();
 +
 +        Pair<String, String> kscf = Schema.instance.getCF(cfId);
 +        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +
 +        SSTableWriter writer = createWriter(cfs, totalSize);
 +        DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
 +        BytesReadTracker in = new BytesReadTracker(dis);
 +        try
 +        {
 +            while (in.getBytesRead() < totalSize)
 +            {
 +                writeRow(writer, in, cfs);
 +                // TODO move this to BytesReadTracker
 +                session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
 +            }
 +            return writer.closeAndOpenReader();
 +        }
 +        catch (Throwable e)
 +        {
 +            writer.abort();
 +            drain(dis, in.getBytesRead());
 +            if (e instanceof IOException)
 +                throw (IOException) e;
 +            else
 +                throw Throwables.propagate(e);
 +        }
 +    }
 +
 +    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize) throws IOException
 +    {
-         Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize);
++        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
 +        if (localDir == null)
 +            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
 +        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
 +
 +        return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
 +    }
 +
 +    protected void drain(InputStream dis, long bytesRead) throws IOException
 +    {
 +        long toSkip = totalSize() - bytesRead;
 +        toSkip = toSkip - dis.skip(toSkip);
 +        while (toSkip > 0)
 +            toSkip = toSkip - dis.skip(toSkip);
 +    }
 +
 +    protected long totalSize()
 +    {
 +        long size = 0;
 +        for (Pair<Long, Long> section : sections)
 +            size += section.right - section.left;
 +        return size;
 +    }
 +
 +    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
 +    {
 +        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
 +        writer.appendFromStream(key, cfs.metadata, in, inputVersion);
 +        cfs.invalidateCachedRow(key);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index f21b60e,abe3f05..0a78b2a
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@@ -1334,66 -1240,12 +1334,65 @@@ public class ColumnFamilyStoreTest exte
          testMultiRangeSlicesBehavior(prepareMultiRangeSlicesTest(10, false));
      }
  
 +    @Test
 +    public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
 +    {
 +        String ks = "Keyspace1";
 +        String cf = "Standard3"; // should be empty
 +
 +        final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
 +        Directories dir = Directories.create(ks, cf);
 +        ByteBuffer key = bytes("key");
 +
 +        // 1st sstable
-         SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100),
-                                                              cfmeta, StorageService.getPartitioner());
++        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), cfmeta, StorageService.getPartitioner());
 +        writer.newRow(key);
 +        writer.addColumn(bytes("col"), bytes("val"), 1);
 +        writer.close();
 +
 +        Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
 +        assert sstables.size() == 1;
 +
 +        Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
 +        final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
 +
 +        // simulate incomplete compaction
-         writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100),
++        writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
 +                                                             cfmeta, StorageService.getPartitioner())
 +        {
 +            protected SSTableWriter getWriter()
 +            {
 +                SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
 +                collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
 +                return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName),
 +                                         0,
 +                                         metadata,
 +                                         StorageService.getPartitioner(),
 +                                         collector);
 +            }
 +        };
 +        writer.newRow(key);
 +        writer.addColumn(bytes("col"), bytes("val"), 1);
 +        writer.close();
 +
 +        // should have 2 sstables now
 +        sstables = dir.sstableLister().list();
 +        assert sstables.size() == 2;
 +
 +        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation));
 +
 +        // 2nd sstable should be removed (only 1st sstable exists in set of size 1)
 +        sstables = dir.sstableLister().list();
 +        assert sstables.size() == 1;
 +        assert sstables.containsKey(sstable1.descriptor);
 +    }
 +
      private ColumnFamilyStore prepareMultiRangeSlicesTest(int valueSize, boolean flush) throws Throwable
      {
 -        String tableName = "Keyspace1";
 +        String keyspaceName = "Keyspace1";
          String cfName = "Standard1";
 -        Table table = Table.open(tableName);
 -        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
 +        Keyspace keyspace = Keyspace.open(keyspaceName);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
          cfs.clearUnsafe();
  
          String[] letters = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l" };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
index d8e8af0,0000000..7fd6c10
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
@@@ -1,117 -1,0 +1,117 @@@
 +package org.apache.cassandra.db.compaction;
 +/*
 + * 
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * 
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + * 
 + */
 +
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Set;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableMetadata;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.utils.Pair;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +import static org.apache.cassandra.db.compaction.LegacyLeveledManifestTestHelper.*;
 +
 +public class LegacyLeveledManifestTest
 +{
 +    private final static String LEGACY_VERSION = "ic";
 +
 +    private File destDir;
 +    @Before
 +    public void setup()
 +    {
-         destDir = Directories.create(KS, CF).getDirectoryForNewSSTables(0);
++        destDir = Directories.create(KS, CF).getDirectoryForNewSSTables();
 +        FileUtils.createDirectory(destDir);
 +        for (File srcFile : getLegacySSTableDir(LEGACY_VERSION).listFiles())
 +        {
 +            File destFile = new File(destDir, srcFile.getName());
 +            FileUtils.createHardLink(srcFile,destFile);
 +            assert destFile.exists() : destFile.getAbsoluteFile();
 +        }
 +    }
 +    @After
 +    public void tearDown()
 +    {
 +        FileUtils.deleteRecursive(destDir);
 +    }
 +
 +    @Test
 +    public void migrateTest() throws IOException
 +    {
 +        assertTrue(LegacyLeveledManifest.manifestNeedsMigration(KS, CF));
 +    }
 +
 +    @Test
 +    public void doMigrationTest() throws IOException, InterruptedException
 +    {
 +        LegacyLeveledManifest.migrateManifests(KS, CF);
 +
 +        for (int i = 0; i <= 2; i++)
 +        {
 +            Descriptor descriptor = Descriptor.fromFilename(destDir+File.separator+KS+"-"+CF+"-"+LEGACY_VERSION+"-"+i+"-Statistics.db");
 +            SSTableMetadata metadata = SSTableMetadata.serializer.deserialize(descriptor).left;
 +            assertEquals(metadata.sstableLevel, i);
 +        }
 +    }
 +
 +    /**
 +     * Validate that the rewritten stats file is the same as the original one.
 +     * @throws IOException
 +     */
 +    @Test
 +    public void validateSSTableMetadataTest() throws IOException
 +    {
 +        Map<Descriptor, Pair<SSTableMetadata, Set<Integer>>> beforeMigration = new HashMap<>();
 +        for (int i = 0; i <= 2; i++)
 +        {
 +            Descriptor descriptor = Descriptor.fromFilename(destDir+File.separator+KS+"-"+CF+"-"+LEGACY_VERSION+"-"+i+"-Statistics.db");
 +            beforeMigration.put(descriptor, SSTableMetadata.serializer.deserialize(descriptor, false));
 +        }
 +
 +        LegacyLeveledManifest.migrateManifests(KS, CF);
 +
 +        for (Map.Entry<Descriptor, Pair<SSTableMetadata, Set<Integer>>> entry : beforeMigration.entrySet())
 +        {
 +            Pair<SSTableMetadata, Set<Integer>> newMetaPair = SSTableMetadata.serializer.deserialize(entry.getKey());
 +            SSTableMetadata newMetadata = newMetaPair.left;
 +            SSTableMetadata oldMetadata = entry.getValue().left;
 +            assertEquals(newMetadata.estimatedRowSize, oldMetadata.estimatedRowSize);
 +            assertEquals(newMetadata.estimatedColumnCount, oldMetadata.estimatedColumnCount);
 +            assertEquals(newMetadata.replayPosition, oldMetadata.replayPosition);
 +            assertEquals(newMetadata.minTimestamp, oldMetadata.minTimestamp);
 +            assertEquals(newMetadata.maxTimestamp, oldMetadata.maxTimestamp);
 +            assertEquals(newMetadata.compressionRatio, oldMetadata.compressionRatio, 0.01);
 +            assertEquals(newMetadata.partitioner, oldMetadata.partitioner);
 +            assertEquals(newMetadata.estimatedTombstoneDropTime, oldMetadata.estimatedTombstoneDropTime);
 +            assertEquals(entry.getValue().right, newMetaPair.right);
 +        }
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
index e80d2bb,ce569b9..9e7aa16
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
@@@ -39,11 -39,11 +39,11 @@@ public class SSTableSimpleWriterTest ex
          final int INC = 5;
          final int NBCOL = 10;
  
 -        String tablename = "Keyspace1";
 +        String keyspaceName = "Keyspace1";
          String cfname = "StandardInteger1";
  
 -        Table t = Table.open(tablename); // make sure we create the directory
 -        File dir = Directories.create(tablename, cfname).getDirectoryForNewSSTables();
 +        Keyspace t = Keyspace.open(keyspaceName); // make sure we create the directory
-         File dir = Directories.create(keyspaceName, cfname).getDirectoryForNewSSTables(0);
++        File dir = Directories.create(keyspaceName, cfname).getDirectoryForNewSSTables();
          assert dir.exists();
  
          IPartitioner partitioner = StorageService.getPartitioner();