Posted to commits@cassandra.apache.org by zz...@apache.org on 2016/09/20 02:32:26 UTC

[20/50] [abbrv] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e4a53f4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e4a53f4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e4a53f4d

Branch: refs/heads/cassandra-3.9
Commit: e4a53f4d3160833af3ea7917a35e7e35ae02786d
Parents: ab98b11 b39d984
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 31 20:24:03 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 31 20:25:24 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java |   5 +-
 .../apache/cassandra/gms/VersionedValue.java    |   6 +
 .../apache/cassandra/locator/TokenMetadata.java |  52 ++++++-
 .../cassandra/service/LoadBroadcaster.java      |   2 +-
 .../cassandra/service/StorageService.java       | 136 +++++++++++++++----
 6 files changed, 173 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4b77e4d,d7e9394..30931d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,50 -1,5 +1,51 @@@
 -2.2.8
 +3.0.9
 + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
 + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
 + * Fix file handle leaks due to simultaneous compaction/repair and
 +   listing snapshots, calculating snapshot sizes, or making schema
 +   changes (CASSANDRA-11594)
 + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
 + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
 + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
 + * Calculate last compacted key on startup (CASSANDRA-6216)
 + * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
 + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 + * Backport CASSANDRA-12002 (CASSANDRA-12177)
 + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 + * Fix potential bad messaging service message for paged range reads
 +   within mixed-version 3.x clusters (CASSANDRA-12249)
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
+  * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
   * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
   * Fail repair on non-existing table (CASSANDRA-12279)
   * cqlsh copy: fix missing counter values (CASSANDRA-12476)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index 97c5f10,b06c9c8..b50db00
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -343,6 -337,43 +352,43 @@@ public class TokenMetadat
          }
      }
  
+     public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
+     {
+         assert replacingTokens != null && !replacingTokens.isEmpty();
+         assert newNode != null && oldNode != null;
+ 
+         lock.writeLock().lock();
+         try
+         {
+             Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode);
+             if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens))
+             {
+                 throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " +
+                                                          "different set of tokens %s.", newNode, oldNode, oldNodeTokens,
+                                                          replacingTokens));
+             }
+ 
+             logger.debug("Replacing {} with {}", newNode, oldNode);
+             replacementToOriginal.put(newNode, oldNode);
+ 
+             addBootstrapTokens(replacingTokens, newNode, oldNode);
+         }
+         finally
+         {
+             lock.writeLock().unlock();
+         }
+     }
+ 
+     public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
+     {
 -        return Optional.fromNullable(replacementToOriginal.inverse().get(endpoint));
++        return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
+     }
+ 
+     public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
+     {
 -        return Optional.fromNullable((replacementToOriginal.get(endpoint)));
++        return Optional.ofNullable((replacementToOriginal.get(endpoint)));
+     }
+ 
      public void removeBootstrapTokens(Collection<Token> tokens)
      {
          assert tokens != null && !tokens.isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 60cb86b,9197ab1..c06bed2
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -454,9 -442,12 +454,12 @@@ public class StorageService extends Not
              VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
                  throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
  
-             SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+             if (isReplacingSameAddress())
+             {
+                 SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
+             }
              Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
              return tokens;
          }
@@@ -988,9 -952,19 +996,19 @@@
          }
      }
  
+     private void finishJoiningRing()
+     {
+         // start participating in the ring.
+         SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+         setTokens(bootstrapTokens);
+ 
+         assert tokenMetadata.sortedTokens().size() > 0;
+         doAuthSetup();
+     }
+ 
      private void doAuthSetup()
      {
 -        maybeAddOrUpdateKeyspace(AuthKeyspace.definition());
 +        maybeAddOrUpdateKeyspace(AuthKeyspace.metadata());
  
          DatabaseDescriptor.getRoleManager().setup();
          DatabaseDescriptor.getAuthenticator().setup();
@@@ -1709,18 -1681,11 +1732,23 @@@
          }
      }
  
+     private static String[] splitValue(VersionedValue value)
+     {
+         return value.value.split(VersionedValue.DELIMITER_STR, -1);
+     }
+ 
 +    private void updateNetVersion(InetAddress endpoint, VersionedValue value)
 +    {
 +        try
 +        {
 +            MessagingService.instance().setVersion(endpoint, Integer.valueOf(value.value));
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new AssertionError("Got invalid value for NET_VERSION application state: " + value.value);
 +        }
 +    }
 +
      public void updateTopology(InetAddress endpoint)
      {
          if (getTokenMetadata().isMember(endpoint))
@@@ -1885,6 -1850,43 +1913,42 @@@
          tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
      }
  
 -
+     private void handleStateBootreplacing(InetAddress newNode, String[] pieces)
+     {
+         InetAddress oldNode;
+         try
+         {
+             oldNode = InetAddress.getByName(pieces[1]);
+         }
+         catch (Exception e)
+         {
+             logger.error("Node {} tried to replace malformed endpoint {}.", newNode, pieces[1], e);
+             return;
+         }
+ 
+         if (FailureDetector.instance.isAlive(oldNode))
+         {
+             throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode));
+         }
+ 
+         Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode);
+         if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode))
+         {
+             throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.",
+                                                      newNode, replacingNode.get(), oldNode));
+         }
+ 
+         Collection<Token> tokens = getTokensFor(newNode);
+ 
+         if (logger.isDebugEnabled())
+             logger.debug("Node {} is replacing {}, tokens {}", newNode, oldNode, tokens);
+ 
+         tokenMetadata.addReplaceTokens(tokens, newNode, oldNode);
+         PendingRangeCalculatorService.instance.update();
+ 
+         tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
+     }
+ 
      /**
       * Handle node move to normal state. That is, node is entering token ring and participating
       * in reads.