You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/05/23 17:08:34 UTC

[2/4] git commit: merge from 1.2

merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f620b348
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f620b348
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f620b348

Branch: refs/heads/trunk
Commit: f620b348a064a55749ffcbc4d25e08c3fe71f1be
Parents: 0680372 b706391
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 23 10:08:06 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 23 10:08:06 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    9 ++++-
 .../apache/cassandra/db/HintedHandOffManager.java  |   25 +++++++++++
 src/java/org/apache/cassandra/db/RowMutation.java  |    9 ++++
 src/java/org/apache/cassandra/db/SystemTable.java  |   32 ++++++++++-----
 .../cassandra/db/commitlog/CommitLogReplayer.java  |   10 ++---
 6 files changed, 68 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b765896,66c5f04..d283b72
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,5 +1,59 @@@
 +2.0
 + * Removed on-heap row cache (CASSANDRA-5348)
 + * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
 + * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
 + * Experimental triggers (CASSANDRA-1311)
 + * JEMalloc support for off-heap allocation (CASSANDRA-3997)
 + * Single-pass compaction (CASSANDRA-4180)
 + * Removed token range bisection (CASSANDRA-5518)
 + * Removed compatibility with pre-1.2.5 sstables and network messages
 +   (CASSANDRA-5511)
 + * removed PBSPredictor (CASSANDRA-5455)
 + * CAS support (CASSANDRA-5062, 5441, 5443)
 + * Leveled compaction performs size-tiered compactions in L0 
 +   (CASSANDRA-5371, 5439)
 + * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
 + * Log when a node is down longer than the hint window (CASSANDRA-4554)
 + * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
 + * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
 + * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
 + * Change Message IDs to ints (CASSANDRA-5307)
 + * Move sstable level information into the Stats component, removing the
 +   need for a separate Manifest file (CASSANDRA-4872)
 + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
 + * make index_interval configurable per columnfamily (CASSANDRA-3961)
 + * add default_time_to_live (CASSANDRA-3974)
 + * add memtable_flush_period_in_ms (CASSANDRA-4237)
 + * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
 + * upgrade thrift to 0.9.0 (CASSANDRA-3719)
 + * drop unnecessary keyspace parameter from user-defined compaction API 
 +   (CASSANDRA-5139)
 + * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
 + * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
 + * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
 + * Allow custom configuration loader (CASSANDRA-5045)
 + * Remove memory emergency pressure valve logic (CASSANDRA-3534)
 + * Reduce request latency with eager retry (CASSANDRA-4705)
 + * cqlsh: Remove ASSUME command (CASSANDRA-5331)
 + * Rebuild BF when loading sstables if bloom_filter_fp_chance
 +   has changed since compaction (CASSANDRA-5015)
 + * remove row-level bloom filters (CASSANDRA-4885)
 + * Change Kernel Page Cache skipping into row preheating (disabled by default)
 +   (CASSANDRA-4937)
 + * Improve repair by deciding on a gcBefore before sending
 +   out TreeRequests (CASSANDRA-4932)
 + * Add an official way to disable compactions (CASSANDRA-5074)
 + * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
 + * Add binary protocol versioning (CASSANDRA-5436)
 + * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
 + * Add alias support to SELECT statement (CASSANDRA-5075)
 + * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
 + * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
 + * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
 +
 +
  1.2.6
+  * Ignore pre-truncate hints (CASSANDRA-4655)
   * Move System.exit on OOM into a separate thread (CASSANDRA-5273)
   * Write row markers when serializing schema (CASSANDRA-5572)
   * Check only SSTables for the requested range when streaming (CASSANDRA-5569)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 78825cf,429859e..dcd7814
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1830,116 -1793,11 +1831,116 @@@ public class ColumnFamilyStore implemen
              }
          }
  
 -        long truncatedAt = System.currentTimeMillis();
 -        if (DatabaseDescriptor.isAutoSnapshot())
 -            snapshot(Table.getTimestampedSnapshotName(columnFamily));
 +        Runnable truncateRunnable = new Runnable()
 +        {
 +            public void run()
 +            {
 +                logger.debug("Discarding sstable data for truncated CF + indexes");
 +
 +                final long truncatedAt = System.currentTimeMillis();
 +                if (DatabaseDescriptor.isAutoSnapshot())
 +                    snapshot(Table.getTimestampedSnapshotName(name));
 +
 +                ReplayPosition replayAfter = discardSSTables(truncatedAt);
 +
 +                for (SecondaryIndex index : indexManager.getIndexes())
 +                    index.truncateBlocking(truncatedAt);
 +
-                 SystemTable.saveTruncationPosition(ColumnFamilyStore.this, replayAfter);
++                SystemTable.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
 +
 +                logger.debug("cleaning out row cache");
 +                for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
 +                {
 +                    if (key.cfId == metadata.cfId)
 +                        CacheService.instance.rowCache.remove(key);
 +                }
 +            }
 +        };
 +
 +        runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
 +        logger.debug("truncate complete");
 +    }
 +
 +    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
 +    {
 +        // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
 +        // and so we only run one major compaction at a time
 +        synchronized (this)
 +        {
 +            logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
 +
 +            Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
 +            for (ColumnFamilyStore cfs : selfWithIndexes)
 +                cfs.getCompactionStrategy().pause();
 +            try
 +            {
 +                // interrupt in-progress compactions
 +                Function<ColumnFamilyStore, CFMetaData> f = new Function<ColumnFamilyStore, CFMetaData>()
 +                {
 +                    public CFMetaData apply(ColumnFamilyStore cfs)
 +                    {
 +                        return cfs.metadata;
 +                    }
 +                };
 +                Iterable<CFMetaData> allMetadata = Iterables.transform(selfWithIndexes, f);
 +                CompactionManager.instance.interruptCompactionFor(allMetadata, interruptValidation);
 +
 +                // wait for the interruption to be recognized
 +                long start = System.nanoTime();
 +                long delay = TimeUnit.MINUTES.toNanos(1);
 +                while (System.nanoTime() - start < delay)
 +                {
 +                    if (CompactionManager.instance.isCompacting(selfWithIndexes))
 +                        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +                    else
 +                        break;
 +                }
 +
 +                // doublecheck that we finished, instead of timing out
 +                for (ColumnFamilyStore cfs : selfWithIndexes)
 +                {
 +                    if (!cfs.getDataTracker().getCompacting().isEmpty())
 +                    {
 +                        logger.warn("Unable to cancel in-progress compactions for {}.  Probably there is an unusually large row in progress somewhere.  It is also possible that buggy code left some sstables compacting after it was done with them", metadata.cfName);
 +                    }
 +                }
 +                logger.debug("Compactions successfully cancelled");
 +
 +                // run our task
 +                try
 +                {
 +                    return callable.call();
 +                }
 +                catch (Exception e)
 +                {
 +                    throw new RuntimeException(e);
 +                }
 +            }
 +            finally
 +            {
 +                for (ColumnFamilyStore cfs : selfWithIndexes)
 +                    cfs.getCompactionStrategy().resume();
 +            }
 +        }
 +    }
 +
 +    public Iterable<SSTableReader> markAllCompacting()
 +    {
 +        Callable<Iterable<SSTableReader>> callable = new Callable<Iterable<SSTableReader>>()
 +        {
 +            public Iterable<SSTableReader> call() throws Exception
 +            {
 +                assert data.getCompacting().isEmpty() : data.getCompacting();
 +                Iterable<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
 +                if (Iterables.isEmpty(sstables))
 +                    return null;
 +                boolean success = data.markCompacting(sstables);
 +                assert success : "something marked things compacting while compactions are disabled";
 +                return sstables;
 +            }
 +        };
  
 -        return CompactionManager.instance.submitTruncate(this, truncatedAt);
 +        return runWithCompactionsDisabled(callable, false);
      }
  
      public long getBloomFilterFalsePositives()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/RowMutation.java
index b08055f,b85cfcd..d78247b
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@@ -238,14 -311,65 +238,23 @@@ public class RowMutation implements IMu
          return buff.append("])").toString();
      }
  
 -    public void addColumnOrSuperColumn(String cfName, ColumnOrSuperColumn cosc)
 -    {
 -        if (cosc.super_column != null)
 -        {
 -            for (org.apache.cassandra.thrift.Column column : cosc.super_column.columns)
 -            {
 -                add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl);
 -            }
 -        }
 -        else if (cosc.column != null)
 -        {
 -            add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
 -        }
 -        else if (cosc.counter_super_column != null)
 -        {
 -            for (org.apache.cassandra.thrift.CounterColumn column : cosc.counter_super_column.columns)
 -            {
 -                addCounter(new QueryPath(cfName, cosc.counter_super_column.name, column.name), column.value);
 -            }
 -        }
 -        else // cosc.counter_column != null
 -        {
 -            addCounter(new QueryPath(cfName, null, cosc.counter_column.name), cosc.counter_column.value);
 -        }
 -    }
 -
 -    public void deleteColumnOrSuperColumn(String cfName, Deletion del)
 -    {
 -        if (del.predicate != null && del.predicate.column_names != null)
 -        {
 -            for(ByteBuffer c : del.predicate.column_names)
 -            {
 -                if (del.super_column == null && Schema.instance.getColumnFamilyType(table, cfName) == ColumnFamilyType.Super)
 -                    delete(new QueryPath(cfName, c), del.timestamp);
 -                else
 -                    delete(new QueryPath(cfName, del.super_column, c), del.timestamp);
 -            }
 -        }
 -        else
 -        {
 -            delete(new QueryPath(cfName, del.super_column), del.timestamp);
 -        }
 -    }
 -
+     public RowMutation without(UUID cfId)
+     {
+         RowMutation rm = new RowMutation(table, key);
+         for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
+             if (!entry.getKey().equals(cfId))
+                 rm.add(entry.getValue());
+         return rm;
+     }
+ 
      public static class RowMutationSerializer implements IVersionedSerializer<RowMutation>
      {
 -        public void serialize(RowMutation rm, DataOutput dos, int version) throws IOException
 +        public void serialize(RowMutation rm, DataOutput out, int version) throws IOException
          {
 -            dos.writeUTF(rm.getTable());
 -            ByteBufferUtil.writeWithShortLength(rm.key(), dos);
 +            if (version < MessagingService.VERSION_20)
 +                out.writeUTF(rm.getTable());
 +
 +            ByteBufferUtil.writeWithShortLength(rm.key(), out);
  
              /* serialize the modifications in the mutation */
              int size = rm.modifications.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemTable.java
index dd818b2,327f01b..9662661
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@@ -45,14 -43,14 +45,14 @@@ import org.apache.cassandra.db.marshal.
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.io.sstable.SSTableReader;
+ import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.locator.IEndpointSnitch;
  import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.thrift.Constants;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.utils.CounterId;
 -import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.service.paxos.Commit;
 +import org.apache.cassandra.service.paxos.PaxosState;
 +import org.apache.cassandra.thrift.cassandraConstants;
 +import org.apache.cassandra.utils.*;
  
  import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
  
@@@ -126,71 -140,61 +126,81 @@@ public class SystemTabl
                                           DatabaseDescriptor.getPartitioner().getClass().getName()));
      }
  
 -    /** if system data becomes incompatible across versions of cassandra, that logic (and associated purging) is managed here */
 -    private static void upgradeSystemData() throws ExecutionException, InterruptedException
 +    /**
 +     * Write compaction log, except for column families under the system keyspace.
 +     *
 +     * @param cfs
 +     * @param toCompact sstables to compact
 +     * @return compaction task id or null if cfs is under system keyspace
 +     */
 +    public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
      {
 -        Table table = Table.open(Table.SYSTEM_KS);
 -        ColumnFamilyStore oldStatusCfs = table.getColumnFamilyStore(OLD_STATUS_CF);
 -        if (oldStatusCfs.getSSTables().size() > 0)
 +        if (Table.SYSTEM_KS.equals(cfs.table.getName()))
 +            return null;
 +
 +        UUID compactionId = UUIDGen.getTimeUUID();
 +        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
 +        Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
          {
 -            SortedSet<ByteBuffer> cols = new TreeSet<ByteBuffer>(BytesType.instance);
 -            cols.add(ByteBufferUtil.bytes("ClusterName"));
 -            cols.add(ByteBufferUtil.bytes("Token"));
 -            QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes("L")), new QueryPath(OLD_STATUS_CF), cols);
 -            ColumnFamily oldCf = oldStatusCfs.getColumnFamily(filter);
 -            Iterator<IColumn> oldColumns = oldCf.columns.iterator();
 -
 -            String clusterName = null;
 -            try
 -            {
 -                clusterName = ByteBufferUtil.string(oldColumns.next().value());
 -            }
 -            catch (CharacterCodingException e)
 +            public Integer apply(SSTableReader sstable)
              {
 -                throw new RuntimeException(e);
 +                return sstable.descriptor.generation;
              }
 -            // serialize the old token as a collection of (one )tokens.
 -            Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(oldColumns.next().value());
 -            String tokenBytes = tokensAsSet(Collections.singleton(token));
 -            // (assume that any node getting upgraded was bootstrapped, since that was stored in a separate row for no particular reason)
 -            String req = "INSERT INTO system.%s (key, cluster_name, tokens, bootstrapped) VALUES ('%s', '%s', %s, '%s')";
 -            processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes, BootstrapState.COMPLETED.name()));
 -
 -            oldStatusCfs.truncate();
 -        }
 +        });
 +        processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.table.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
 +        forceBlockingFlush(COMPACTION_LOG);
 +        return compactionId;
 +    }
 +
 +    public static void finishCompaction(UUID taskId)
 +    {
 +        assert taskId != null;
  
 -        ColumnFamilyStore oldHintsCfs = table.getColumnFamilyStore(OLD_HINTS_CF);
 -        if (oldHintsCfs.getSSTables().size() > 0)
 +        String req = "DELETE FROM system.%s WHERE id = %s";
 +        processInternal(String.format(req, COMPACTION_LOG, taskId));
 +        forceBlockingFlush(COMPACTION_LOG);
 +    }
 +
 +    /**
 +     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
 +     */
 +    public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
 +    {
 +        String req = "SELECT * FROM system.%s";
 +        UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
 +
 +        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
 +        for (UntypedResultSet.Row row : resultSet)
          {
 -            logger.info("Possible old-format hints found. Truncating");
 -            oldHintsCfs.truncate();
 +            String keyspace = row.getString("keyspace_name");
 +            String columnfamily = row.getString("columnfamily_name");
 +            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
 +
 +            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
          }
 +        return unfinishedCompactions;
 +    }
 +
 +    public static void discardCompactionsInProgress()
 +    {
 +        ColumnFamilyStore compactionLog = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
 +        compactionLog.truncateBlocking();
      }
  
-     public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position)
+     public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
      {
          String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
-         processInternal(String.format(req, LOCAL_CF, positionAsMapEntry(cfs, position), LOCAL_KEY));
+         processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
+         forceBlockingFlush(LOCAL_CF);
+     }
+ 
+     /**
+      * Removes the given column family's entry from the truncated_at map that saveTruncationRecord writes to.
+      */
+     public static void removeTruncationRecord(UUID cfId)
+     {
+         String req = "DELETE truncated_at['%s'] from system.%s WHERE key = '%s'";
+         processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
          forceBlockingFlush(LOCAL_CF);
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------