Posted to pr@cassandra.apache.org by jasobrown <gi...@git.apache.org> on 2018/01/02 14:04:34 UTC

[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

GitHub user jasobrown opened a pull request:

    https://github.com/apache/cassandra/pull/184

    Cassandra 7544 rebased2

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aweisberg/cassandra cassandra-7544-rebased2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/cassandra/pull/184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #184
    
----
commit f612c604a509356543dd2e02d4b168c63c5dcbef
Author: Ariel Weisberg <aw...@...>
Date:   2017-11-09T16:33:48Z

    checkpoint to roll back

commit 9a6a9494555956f355cba511934f08b8b3c3b204
Author: Ariel Weisberg <aw...@...>
Date:   2017-11-09T17:25:55Z

    Fix last broken unit test

commit 0be7dc2fd173650a5a26e11e1208b22771748f26
Author: Ariel Weisberg <aw...@...>
Date:   2017-11-27T19:00:17Z

    Fix cqlsh.py bug and rename python driver so it picks up the current one.

commit 1e017b10457c1a639d3b2a0a43ad7a44668cf9b2
Author: Ariel Weisberg <aw...@...>
Date:   2017-11-28T18:56:26Z

    Update python driver to rebased version

commit aa2417223d9a2bd5c058738d28b22858ca023cfb
Author: Ariel Weisberg <aw...@...>
Date:   2017-11-28T19:16:36Z

    Update circle.yml to run dtests.

commit 2045f9f9754ef81513540ed48aa0b8d8789a5768
Author: Ariel Weisberg <aw...@...>
Date:   2017-11-29T19:07:58Z

    Update java driver

commit 729bb53dd3311d471c412e6fb6fc4470147a4432
Author: Ariel Weisberg <aw...@...>
Date:   2017-11-29T23:34:13Z

    Fix serialization bug and enable test for the bug.

commit aef0fdc96b4510f7ce5b431e3c50b55576e836eb
Author: Ariel Weisberg <aw...@...>
Date:   2017-11-30T20:59:25Z

    Don't randomly change the types of system table columns.

commit a3ea6dbff53c7ab5ef9b752477c1348fe4258df8
Author: Ariel Weisberg <aw...@...>
Date:   2017-11-30T21:46:37Z

    Ended up with python driver that didn't have the goods in it for some reason.

commit c83925cc65c481cabedea3039e36f0458b3f8d0e
Author: Ariel Weisberg <aw...@...>
Date:   2017-12-01T21:20:56Z

    Don't modify the behavior of a public interface in DynamicEndpointSnitchMBean.

----


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163063411
  
    --- Diff: src/java/org/apache/cassandra/db/SystemKeyspace.java ---
    @@ -607,39 +672,65 @@ public static long getTruncatedAt(TableId id)
         /**
          * Record tokens being used by another node
          */
    -    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
    +    public static synchronized void updateTokens(InetAddressAndPort ep, Collection<Token> tokens)
         {
    -        if (ep.equals(FBUtilities.getBroadcastAddress()))
    +        if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
                 return;
     
             String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
    -        executeInternal(format(req, PEERS), ep, tokensAsSet(tokens));
    +        executeInternal(String.format(req, LEGACY_PEERS), ep.address, tokensAsSet(tokens));
    +        req = "INSERT INTO system.%s (peer, peer_port, tokens) VALUES (?, ?, ?)";
    +        executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, tokensAsSet(tokens));
         }
     
    -    public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
    +    public static synchronized void updatePreferredIP(InetAddressAndPort ep, InetAddressAndPort preferred_ip)
         {
             if (getPreferredIP(ep) == preferred_ip)
                 return;
     
             String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
    -        executeInternal(format(req, PEERS), ep, preferred_ip);
    -        forceBlockingFlush(PEERS);
    +        executeInternal(String.format(req, LEGACY_PEERS), ep.address, preferred_ip.address);
    +        forceBlockingFlush(LEGACY_PEERS);
    --- End diff --
    
    Hah wow, that came out buggy and more passive-aggressive than I intended. Anyway, I added a commit with this. We don't call forceBlockingFlush a lot, so I think it's fine.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160535682
  
    --- Diff: src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---
    @@ -96,19 +98,11 @@ private void handleFailure(Throwable t)
             if (message.doCallbackOnFailure())
             {
                 MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
    -                                                .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
    +                                                .withParameter(ParameterType.FAILURE_RESPONSE, MessagingService.ONE_BYTE);
     
                 if (t instanceof TombstoneOverwhelmingException)
                 {
    -                try (DataOutputBuffer out = new DataOutputBuffer())
    -                {
    -                    out.writeShort(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code);
    -                    response = response.withParameter(MessagingService.FAILURE_REASON_PARAM, out.getData());
    -                }
    -                catch (IOException ex)
    -                {
    -                    throw new RuntimeException(ex);
    -                }
    +                response = response.withParameter(ParameterType.FAILURE_REASON.FAILURE_REASON, Shorts.checkedCast(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
    --- End diff --
    
    should `ParameterType.FAILURE_REASON.FAILURE_REASON` be `ParameterType.FAILURE_REASON`? This looks weird with two "`FAILURE_REASON`"s, unless there's some java syntax nugget I'm unfamiliar with. 
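    
    fwiw, the only nugget I can think of that would make this compile is that Java allows static member access through an instance expression, so `ParameterType.FAILURE_REASON.FAILURE_REASON` would presumably just resolve to the same constant as `ParameterType.FAILURE_REASON` (redundant rather than meaningful). A quick sketch with a stand-in enum:
    
        enum Color { RED, GREEN }
    
        class Demo
        {
            public static void main(String[] args)
            {
                // Static member access through an instance expression: compiles
                // (with a lint warning) and yields the same object as Color.GREEN.
                Color c = Color.RED.GREEN;
                System.out.println(c); // prints GREEN
            }
        }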


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163084763
  
    --- Diff: src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java ---
    @@ -21,28 +21,142 @@
     import java.net.Inet4Address;
     import java.net.Inet6Address;
     import java.net.InetAddress;
    +import java.nio.ByteBuffer;
     
    -public class CompactEndpointSerializationHelper
    +import org.apache.cassandra.io.IVersionedSerializer;
    +import org.apache.cassandra.io.util.DataInputBuffer;
    +import org.apache.cassandra.io.util.DataInputPlus;
    +import org.apache.cassandra.io.util.DataOutputPlus;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.messages.StreamMessage;
    +
    +/*
    + * As of version 4.0 the endpoint description includes a port number as an unsigned short
    + */
    +public class CompactEndpointSerializationHelper implements IVersionedSerializer<InetAddressAndPort>
     {
    -    public static void serialize(InetAddress endpoint, DataOutput out) throws IOException
    +    public static final IVersionedSerializer<InetAddressAndPort> instance = new CompactEndpointSerializationHelper();
    +
    +    /**
    +     * Streaming uses its own version numbering, so we need to map those versions to the versions used by regular messaging.
    +     * There are only two variants of the serialization currently so a simple mapping around pre vs post 4.0 is fine.
    +     */
    +    public static final IVersionedSerializer<InetAddressAndPort> streamingInstance = new IVersionedSerializer<InetAddressAndPort>()
         {
    -        byte[] buf = endpoint.getAddress();
    -        out.writeByte(buf.length);
    -        out.write(buf);
    +        public void serialize(InetAddressAndPort inetAddressAndPort, DataOutputPlus out, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public long serializedSize(InetAddressAndPort inetAddressAndPort, int version)
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_40);
    +            }
    +        }
    +    };
    +
    +    private CompactEndpointSerializationHelper() {}
    +
    +    public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
    +    {
    +        if (version >= MessagingService.VERSION_40)
    +        {
    +            byte[] buf = endpoint.address.getAddress();
    +            out.writeByte(buf.length + 2);
    +            out.write(buf);
    +            out.writeShort(endpoint.port);
    +        }
    +        else
    +        {
    +            byte[] buf = endpoint.address.getAddress();
    +            out.writeByte(buf.length);
    +            out.write(buf);
    +        }
         }
     
    -    public static InetAddress deserialize(DataInput in) throws IOException
    +    public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
         {
    -        byte[] bytes = new byte[in.readByte()];
    -        in.readFully(bytes, 0, bytes.length);
    -        return InetAddress.getByAddress(bytes);
    +        int size = in.readByte() & 0xFF;
    +        switch(size)
    +        {
    +            //The original pre-4.0 serialization of just an address
    +            case 4:
    +            case 16:
    +            {
    +                byte[] bytes = new byte[size];
    --- End diff --
    
    This allocation is unavoidable. You can't construct an InetAddress without it.
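    
    For reference, java.net.InetAddress only exposes byte[]-based factories, e.g. `InetAddress.getByAddress(byte[] addr)` and `InetAddress.getByAddress(String host, byte[] addr)`, so the deserializer has to materialize the raw bytes before it can build the address:
    
        byte[] bytes = new byte[size];            // 4 bytes for IPv4, 16 for IPv6
        in.readFully(bytes, 0, bytes.length);
        InetAddress resolved = InetAddress.getByAddress(bytes);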


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160495552
  
    --- Diff: src/java/org/apache/cassandra/gms/Gossiper.java ---
    @@ -1207,12 +1239,27 @@ private void applyNewStates(InetAddress addr, EndpointState localState, Endpoint
             assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
             localState.addApplicationStates(remoteStates);
     
    -        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates)
    +        //Filter out pre-4.0 versions of data for more complete 4.0 versions
    +        Set<Entry<ApplicationState, VersionedValue>> filtered = remoteStates.stream().filter(entry -> {
    --- End diff --
    
    I'm not sure this is wise. 
    a) by returning the alternate `WITH_PORT` variant, we will execute `doOnChangeNotifications` twice for it. I'm not confident that the `IEndpointStateChangeSubscriber` implementations (read: `StorageService`) will be intelligently idempotent. If some block of logic happens to execute twice, tracking down the effects could be miserable.
    b) there may be `IEndpointStateChangeSubscriber` implementations (read: DSE) that might not have been updated and still expect the old AppState to work. tbqh, I'm not really worried about this case.



---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163073904
  
    --- Diff: src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---
    @@ -349,6 +350,16 @@ public static String getOutputInitialAddress(Configuration conf)
             return conf.get(OUTPUT_INITIAL_ADDRESS);
         }
     
    +    public static void setOutputInitialPort(Configuration conf, Integer port)
    +    {
    +        conf.set(OUTPUT_INITIAL_PORT, port.toString());
    +    }
    +
    +    public static Integer getOutputInitialPort(Configuration conf)
    +    {
    +        return Integer.valueOf(conf.get(OUTPUT_INITIAL_PORT, "7000"));
    --- End diff --
    
    This is the configuration entry where the default value would be replaced. So if you don't configure it, yes, you get a default of 7000.
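    
    For illustration, Hadoop's `Configuration.get(name, defaultValue)` returns the supplied default whenever the key is absent (the real OUTPUT_INITIAL_PORT key string is internal to ConfigHelper, so a made-up key stands in for it below):
    
        import org.apache.hadoop.conf.Configuration;
    
        public class PortDefaultDemo
        {
            public static void main(String[] args)
            {
                String key = "cassandra.output.initial.port"; // hypothetical stand-in key
                Configuration conf = new Configuration();
                System.out.println(Integer.valueOf(conf.get(key, "7000"))); // 7000 when unset
                conf.set(key, "9042");
                System.out.println(Integer.valueOf(conf.get(key, "7000"))); // 9042 once configured
            }
        }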


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160283145
  
    --- Diff: src/java/org/apache/cassandra/db/LegacySystemKeyspaceMigrator.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.db;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.schema.SchemaConstants;
    +import org.apache.cassandra.cql3.QueryProcessor;
    +import org.apache.cassandra.cql3.UntypedResultSet;
    +import org.apache.cassandra.db.marshal.BytesType;
    +import org.apache.cassandra.db.marshal.Int32Type;
    +import org.apache.cassandra.db.marshal.UTF8Type;
    +import org.apache.cassandra.db.marshal.UUIDType;
    +
    +/**
    + * Migrate 3.0 versions of some tables to 4.0. In this case it's just extra columns and some keys
    + * that are changed.
    + *
    + * Can't just add the additional columns because they are primary key columns and C* doesn't support changing
    + * key columns even if it's just clustering columns.
    + */
    +public class LegacySystemKeyspaceMigrator
    +{
    +    static final String legacyPeersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEERS);
    +    static final String peersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEERS_V2);
    +    static final String legacyPeerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEER_EVENTS);
    +    static final String peerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2);
    +    static final String legacyTransferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES);
    +    static final String transferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2);
    +
    +    private static final Logger logger = LoggerFactory.getLogger(LegacySystemKeyspaceMigrator.class);
    +
    +    private LegacySystemKeyspaceMigrator() {}
    +
    +    public static void migrate()
    +    {
    +        migratePeers();
    +        migratePeerEvents();
    +        migrateLegacyTransferredRanges();
    +    }
    +
    +    private static void migratePeers()
    +    {
    +        ColumnFamilyStore newPeers = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEERS_V2);
    +
    +        if (!newPeers.isEmpty())
    +             return;
    +
    +        logger.info("{} table was empty, migrating legacy {}, if this fails you should fix the issue and then truncate {} to have it try again.",
    +                                  peersName, legacyPeersName, peersName);
    +
    +        String query = String.format("SELECT * FROM %s",
    +                                     legacyPeersName);
    +
    +        String insert = String.format("INSERT INTO %s ( "
    +                                      + "peer, "
    +                                      + "peer_port, "
    +                                      + "data_center, "
    +                                      + "host_id, "
    +                                      + "preferred_ip, "
    +                                      + "preferred_port, "
    +                                      + "rack, "
    +                                      + "release_version, "
    +                                      + "native_address, "
    +                                      + "native_port, "
    +                                      + "schema_version, "
    +                                      + "tokens) "
    +                                      + " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?, ?, ?, ?)",
    +                                      peersName);
    +
    +        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
    +        int transferred = 0;
    +        for (UntypedResultSet.Row row : rows)
    +        {
    +            logger.debug("Transferring row {}", transferred);
    --- End diff --
    
    trivial nit: all three `migrate*` methods have this same log line. can you add the table name or something more unique?
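    
    e.g. something like this (just a sketch, reusing the legacy table name constants already defined at the top of the class):
    
        logger.debug("Transferring row {} of legacy table {}", transferred, legacyPeersName);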


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163062356
  
    --- Diff: src/java/org/apache/cassandra/db/SystemKeyspace.java ---
    @@ -607,39 +672,65 @@ public static long getTruncatedAt(TableId id)
         /**
          * Record tokens being used by another node
          */
    -    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
    +    public static synchronized void updateTokens(InetAddressAndPort ep, Collection<Token> tokens)
         {
    -        if (ep.equals(FBUtilities.getBroadcastAddress()))
    +        if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
                 return;
     
             String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
    -        executeInternal(format(req, PEERS), ep, tokensAsSet(tokens));
    +        executeInternal(String.format(req, LEGACY_PEERS), ep.address, tokensAsSet(tokens));
    +        req = "INSERT INTO system.%s (peer, peer_port, tokens) VALUES (?, ?, ?)";
    +        executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, tokensAsSet(tokens));
         }
     
    -    public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
    +    public static synchronized void updatePreferredIP(InetAddressAndPort ep, InetAddressAndPort preferred_ip)
         {
             if (getPreferredIP(ep) == preferred_ip)
                 return;
     
             String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
    -        executeInternal(format(req, PEERS), ep, preferred_ip);
    -        forceBlockingFlush(PEERS);
    +        executeInternal(String.format(req, LEGACY_PEERS), ep.address, preferred_ip.address);
    +        forceBlockingFlush(LEGACY_PEERS);
    --- End diff --
    
    Well... we do this in a lot of places. Some things are called indirectly. I think we can add a bit of complexity to solve a problem we aren't sure we have.
    
    Also, is this really an issue with this patch, which admittedly increases the number of places we do this, or is it out of scope?
    
    I've already implemented it, but going through and seeing the number of places where I wouldn't be applying this due to indirection was disappointing.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160537959
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -89,37 +88,39 @@ private MessageIn(InetAddress from,
     
         public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime) throws IOException
         {
    -        InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
    +        InetAddressAndPort from = CompactEndpointSerializationHelper.instance.deserialize(in, version);
     
             MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
    -        Map<String, byte[]> parameters = readParameters(in);
    +        Map<ParameterType, Object> parameters = readParameters(in, version);
             int payloadSize = in.readInt();
             return read(in, version, id, constructionTime, from, payloadSize, verb, parameters);
         }
     
    -    public static Map<String, byte[]> readParameters(DataInputPlus in) throws IOException
    +    public static Map<ParameterType, Object> readParameters(DataInputPlus in, int version) throws IOException
         {
             int parameterCount = in.readInt();
    +        Map<ParameterType, Object> parameters;
             if (parameterCount == 0)
             {
                 return Collections.emptyMap();
             }
             else
             {
    -            ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
    +            ImmutableMap.Builder<ParameterType, Object> builder = ImmutableMap.builder();
                 for (int i = 0; i < parameterCount; i++)
                 {
                     String key = in.readUTF();
    +                ParameterType type = ParameterType.byName.get(key);
    --- End diff --
    
    Is it better to be defensive here and add a null check? If a `ParameterType` is added in the next version, we could at least ignore it (and skip over the `value`'s bytes, as well).
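    
    Something along these lines, perhaps (only a sketch; it assumes the value stays length-prefixed on the wire the way it was pre-4.0):
    
        String key = in.readUTF();
        ParameterType type = ParameterType.byName.get(key);
        int valueLength = in.readInt();          // assumed length prefix
        if (type == null)
        {
            // Unknown parameter from a newer version: consume the value and move on.
            in.readFully(new byte[valueLength]);
            continue;
        }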


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160491216
  
    --- Diff: src/java/org/apache/cassandra/gms/Gossiper.java ---
    @@ -1112,21 +1131,34 @@ public boolean isSilentShutdownState(EndpointState epState)
     
         private static String getGossipStatus(EndpointState epState)
         {
    -        if (epState == null || epState.getApplicationState(ApplicationState.STATUS) == null)
    +        if (epState == null)
    +        {
    +            return "";
    +        }
    +
    +        VersionedValue versionedValue = epState.getApplicationState(ApplicationState.STATUS_WITH_PORT);
    +        if (versionedValue == null)
    +        {
    +            versionedValue = epState.getApplicationState(ApplicationState.STATUS);
    +        }
    +
    +        if (versionedValue == null)
    --- End diff --
    
    maybe move this null check into the block a few lines earlier
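    
    e.g. (sketch based on the hunk above, assuming the second check just returns the empty string like the first):
    
        VersionedValue versionedValue = epState.getApplicationState(ApplicationState.STATUS_WITH_PORT);
        if (versionedValue == null)
        {
            versionedValue = epState.getApplicationState(ApplicationState.STATUS);
            if (versionedValue == null)
                return "";
        }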


---



[GitHub] cassandra issue #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on the issue:

    https://github.com/apache/cassandra/pull/184
  
    I can't close this, but we should close it in favor of https://github.com/apache/cassandra/pull/188, which is this PR rebased yet again.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163059371
  
    --- Diff: src/java/org/apache/cassandra/db/SystemKeyspace.java ---
    @@ -91,47 +92,52 @@ private SystemKeyspace()
         public static final String PAXOS = "paxos";
         public static final String BUILT_INDEXES = "IndexInfo";
         public static final String LOCAL = "local";
    -    public static final String PEERS = "peers";
    -    public static final String PEER_EVENTS = "peer_events";
    +    public static final String PEERS_V2 = "peers_v2";
    +    public static final String PEER_EVENTS_V2 = "peer_events_v2";
         public static final String RANGE_XFERS = "range_xfers";
         public static final String COMPACTION_HISTORY = "compaction_history";
         public static final String SSTABLE_ACTIVITY = "sstable_activity";
         public static final String SIZE_ESTIMATES = "size_estimates";
         public static final String AVAILABLE_RANGES = "available_ranges";
    -    public static final String TRANSFERRED_RANGES = "transferred_ranges";
    +    public static final String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2";
         public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
         public static final String BUILT_VIEWS = "built_views";
         public static final String PREPARED_STATEMENTS = "prepared_statements";
         public static final String REPAIRS = "repairs";
     
    +    @Deprecated public static final String LEGACY_PEERS = "peers";
    +    @Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events";
    +    @Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges";
    +
         public static final TableMetadata Batches =
             parse(BATCHES,
    -              "batches awaiting replay",
    -              "CREATE TABLE %s ("
    -              + "id timeuuid,"
    -              + "mutations list<blob>,"
    -              + "version int,"
    -              + "PRIMARY KEY ((id)))")
    -              .partitioner(new LocalPartitioner(TimeUUIDType.instance))
    -              .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
    -              .build();
    +                "batches awaiting replay",
    +                "CREATE TABLE %s ("
    +                + "id timeuuid,"
    +                + "mutations list<blob>,"
    +                + "version int,"
    +                + "PRIMARY KEY ((id)))")
    +                .partitioner(new LocalPartitioner(TimeUUIDType.instance))
    +                .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
    +                .gcGraceSeconds(0)
    --- End diff --
    
    I haven't rebased onto trunk in a while. Aleksey landed https://github.com/apache/cassandra/commit/af3fe39dcabd9ef77a00309ce6741268423206df#diff-ce3f6856b405c96859d9a50d9977e0b9L115 which is when it was removed on trunk. I'll get it when I rebase. And probably a lot of other pain as well. Looks like it's going to be 100% conflicts.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163086795
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -89,37 +88,39 @@ private MessageIn(InetAddress from,
     
         public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime) throws IOException
         {
    -        InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
    +        InetAddressAndPort from = CompactEndpointSerializationHelper.instance.deserialize(in, version);
     
             MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
    -        Map<String, byte[]> parameters = readParameters(in);
    +        Map<ParameterType, Object> parameters = readParameters(in, version);
             int payloadSize = in.readInt();
             return read(in, version, id, constructionTime, from, payloadSize, verb, parameters);
         }
     
    -    public static Map<String, byte[]> readParameters(DataInputPlus in) throws IOException
    +    public static Map<ParameterType, Object> readParameters(DataInputPlus in, int version) throws IOException
         {
             int parameterCount = in.readInt();
    +        Map<ParameterType, Object> parameters;
             if (parameterCount == 0)
             {
                 return Collections.emptyMap();
             }
             else
             {
    -            ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
    +            ImmutableMap.Builder<ParameterType, Object> builder = ImmutableMap.builder();
                 for (int i = 0; i < parameterCount; i++)
                 {
                     String key = in.readUTF();
    +                ParameterType type = ParameterType.byName.get(key);
    --- End diff --
    
    OK, I think no one does do that, and it could be added back even in a minor version, so I will have it silently skip unknown parameters.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163095381
  
    --- Diff: src/java/org/apache/cassandra/transport/Server.java ---
    @@ -454,51 +455,32 @@ public static LatestEvent forTopologyChange(Event.TopologyChange.Change change,
     
             // We keep track of the latest status change events we have sent to avoid sending duplicates
             // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156)
    -        private final Map<InetAddress, LatestEvent> latestEvents = new ConcurrentHashMap<>();
    +        private final Map<InetAddressAndPort, LatestEvent> latestEvents = new ConcurrentHashMap<>();
             // We also want to delay delivering a NEW_NODE notification until the new node has set its RPC ready
             // state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients
    -        private final Set<InetAddress> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet();
    -
    -
    -        private static final InetAddress bindAll;
    --- End diff --
    
    I looked up the JIRA and it's not necessary anymore. The comment says 
    
                    // Note that after all nodes are running a version that includes CASSANDRA-5899, rpcAddress should
                    // never be 0.0.0.0, so this can eventually be removed.
    
    So we can never hit that path anyway. I think it was useful when introduced, but it's not anymore.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160496090
  
    --- Diff: src/java/org/apache/cassandra/gms/VersionedValue.java ---
    @@ -138,6 +139,11 @@ public VersionedValue bootReplacing(InetAddress oldNode)
                 return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress()));
             }
     
    +        public VersionedValue bootReplacingWithPort(InetAddressAndPort oldNode)
    --- End diff --
    
    perhaps annotate `bootReplacing()` as `@Deprecated`?
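    
    i.e. something like (sketch based on the existing method shown in the hunk above):
    
        @Deprecated
        public VersionedValue bootReplacing(InetAddress oldNode)
        {
            return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress()));
        }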


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163398496
  
    --- Diff: src/java/org/apache/cassandra/tools/nodetool/Ring.java ---
    @@ -51,72 +51,116 @@
         @Override
         public void execute(NodeProbe probe)
         {
    -        Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap();
    -        LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create();
    -        boolean haveVnodes = false;
    -        for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
    -        {
    -            haveVnodes |= endpointsToTokens.containsKey(entry.getValue());
    -            endpointsToTokens.put(entry.getValue(), entry.getKey());
    -        }
    -
    -        int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>()
    +        try
             {
    -            @Override
    -            public int compare(String first, String second)
    +            Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap(withPort);
    +            LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create();
    +            boolean haveVnodes = false;
    +            for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
                 {
    -            	return Integer.compare(first.length(), second.length());
    +                haveVnodes |= endpointsToTokens.containsKey(entry.getValue());
    +                endpointsToTokens.put(entry.getValue(), entry.getKey());
                 }
    -        }).length();
     
    -        String formatPlaceholder = "%%-%ds  %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
    -        String format = format(formatPlaceholder, maxAddressLength);
    +            int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>()
    +            {
    +                @Override
    +                public int compare(String first, String second)
    +                {
    +                    return Integer.compare(first.length(), second.length());
    +                }
    +            }).length();
     
    -        StringBuilder errors = new StringBuilder();
    -        boolean showEffectiveOwnership = true;
    -        // Calculate per-token ownership of the ring
    -        Map<InetAddress, Float> ownerships;
    -        try
    -        {
    -            ownerships = probe.effectiveOwnership(keyspace);
    -        }
    -        catch (IllegalStateException ex)
    -        {
    -            ownerships = probe.getOwnership();
    -            errors.append("Note: ").append(ex.getMessage()).append("%n");
    -            showEffectiveOwnership = false;
    -        }
    -        catch (IllegalArgumentException ex)
    -        {
    -            System.out.printf("%nError: %s%n", ex.getMessage());
    -            return;
    -        }
    +            String formatPlaceholder = "%%-%ds  %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
    +            String format = format(formatPlaceholder, maxAddressLength);
     
    +            StringBuilder errors = new StringBuilder();
    +            boolean showEffectiveOwnership = true;
     
    -        System.out.println();
    -        for (Entry<String, SetHostStat> entry : NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships).entrySet())
    -            printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(),showEffectiveOwnership);
    +            if (withPort)
    +            {
    +                // Calculate per-token ownership of the ring
    +                Map<String, Float> ownerships;
    +                try
    +                {
    +                    ownerships = probe.effectiveOwnershipWithPort(keyspace);
    +                }
    +                catch (IllegalStateException ex)
    +                {
    +                    ownerships = probe.getOwnershipWithPort();
    +                    errors.append("Note: ").append(ex.getMessage()).append("%n");
    +                    showEffectiveOwnership = false;
    +                }
    +                catch (IllegalArgumentException ex)
    +                {
    +                    System.out.printf("%nError: %s%n", ex.getMessage());
    +                    return;
    +                }
    +
    +
    +                System.out.println();
    +                for (Entry<String, SetHostStatWithPort> entry : NodeTool.getOwnershipByDcWithPort(probe, resolveIp, tokensToEndpoints, ownerships).entrySet())
    +                    printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(), showEffectiveOwnership);
    +
    +                if (haveVnodes)
    +                {
    +                    System.out.println("  Warning: \"nodetool ring\" is used to output all the tokens of a node.");
    +                    System.out.println("  To view status related info of a node use \"nodetool status\" instead.\n");
    +                }
    +
    +                System.out.printf("%n  " + errors.toString());
    +            }
    +            else
    +            {
    +                // Calculate per-token ownership of the ring
    +                Map<InetAddress, Float> ownerships;
    +                try
    +                {
    +                    ownerships = probe.effectiveOwnership(keyspace);
    +                }
    +                catch (IllegalStateException ex)
    +                {
    +                    ownerships = probe.getOwnership();
    +                    errors.append("Note: ").append(ex.getMessage()).append("%n");
    +                    showEffectiveOwnership = false;
    +                }
    +                catch (IllegalArgumentException ex)
    +                {
    +                    System.out.printf("%nError: %s%n", ex.getMessage());
    +                    return;
    +                }
     
    -        if (haveVnodes)
    +
    +                System.out.println();
    +                for (Entry<String, SetHostStat> entry : NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships).entrySet())
    +                    printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(), showEffectiveOwnership);
    +
    +                if (haveVnodes)
    +                {
    +                    System.out.println("  Warning: \"nodetool ring\" is used to output all the tokens of a node.");
    +                    System.out.println("  To view status related info of a node use \"nodetool status\" instead.\n");
    +                }
    +
    +                System.out.printf("%n  " + errors.toString());
    +            }
    +        } catch (Exception e)
             {
    -            System.out.println("  Warning: \"nodetool ring\" is used to output all the tokens of a node.");
    -            System.out.println("  To view status related info of a node use \"nodetool status\" instead.\n");
    +            e.printStackTrace();
    +            throw e;
             }
    -
    -        System.out.printf("%n  " + errors.toString());
         }
     
         private void printDc(NodeProbe probe, String format,
                              String dc,
                              LinkedHashMultimap<String, String> endpointsToTokens,
                              SetHostStat hoststats,boolean showEffectiveOwnership)
         {
    -        Collection<String> liveNodes = probe.getLiveNodes();
    -        Collection<String> deadNodes = probe.getUnreachableNodes();
    -        Collection<String> joiningNodes = probe.getJoiningNodes();
    -        Collection<String> leavingNodes = probe.getLeavingNodes();
    -        Collection<String> movingNodes = probe.getMovingNodes();
    -        Map<String, String> loadMap = probe.getLoadMap();
    +        Collection<String> liveNodes = probe.getLiveNodes(false);
    --- End diff --
    
    +1


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163306691
  
    --- Diff: src/java/org/apache/cassandra/gms/ApplicationState.java ---
    @@ -19,24 +19,25 @@
     
     public enum ApplicationState
     {
    -    STATUS,
    +    @Deprecated STATUS, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0
         LOAD,
         SCHEMA,
         DC,
         RACK,
         RELEASE_VERSION,
         REMOVAL_COORDINATOR,
    -    INTERNAL_IP,
    -    RPC_ADDRESS,
    +    @Deprecated INTERNAL_IP, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0
    +    @Deprecated RPC_ADDRESS, // ^ Same
         X_11_PADDING, // padding specifically for 1.1
         SEVERITY,
         NET_VERSION,
         HOST_ID,
         TOKENS,
         RPC_READY,
         // pad to allow adding new states to existing cluster
    -    X1,
    -    X2,
    +    INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports
    +    NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
    --- End diff --
    
    Not sure what the optimal thing to do here is. I didn't do this out of the blue and I didn't really want to, but I felt guilty about continuing to call it RPC. It would be a lot of code changes to stop using "native" now, because there are many places where I made the naming consistent.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163092630
  
    --- Diff: src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---
    @@ -74,48 +74,50 @@ private SystemDistributedKeyspace()
     
         private static final TableMetadata RepairHistory =
    --- End diff --
    
    Hmm... well, primarily because it was extra work and we can drop the columns later. I don't see a reason not to use the built-in schema change capabilities.
    
    Other cases that have a migration and a new table do that because the key changed, and we don't support changing key columns.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r161926116
  
    --- Diff: conf/cassandra.yaml ---
    @@ -960,6 +967,7 @@ server_encryption_options:
         # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA]
         # require_client_auth: false
         # require_endpoint_verification: false
    +    # outgoing_encrypted_port_source: yaml
    --- End diff --
    
    This field is not referenced anywhere in the code (not in `ServerEncryptionOptions`). Is this just incomplete?


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163094078
  
    --- Diff: src/java/org/apache/cassandra/tools/nodetool/Ring.java ---
    @@ -51,72 +51,116 @@
         @Override
         public void execute(NodeProbe probe)
         {
    -        Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap();
    -        LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create();
    -        boolean haveVnodes = false;
    -        for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
    -        {
    -            haveVnodes |= endpointsToTokens.containsKey(entry.getValue());
    -            endpointsToTokens.put(entry.getValue(), entry.getKey());
    -        }
    -
    -        int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>()
    +        try
             {
    -            @Override
    -            public int compare(String first, String second)
    +            Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap(withPort);
    +            LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create();
    +            boolean haveVnodes = false;
    +            for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
                 {
    -            	return Integer.compare(first.length(), second.length());
    +                haveVnodes |= endpointsToTokens.containsKey(entry.getValue());
    +                endpointsToTokens.put(entry.getValue(), entry.getKey());
                 }
    -        }).length();
     
    -        String formatPlaceholder = "%%-%ds  %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
    -        String format = format(formatPlaceholder, maxAddressLength);
    +            int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>()
    +            {
    +                @Override
    +                public int compare(String first, String second)
    +                {
    +                    return Integer.compare(first.length(), second.length());
    +                }
    +            }).length();
     
    -        StringBuilder errors = new StringBuilder();
    -        boolean showEffectiveOwnership = true;
    -        // Calculate per-token ownership of the ring
    -        Map<InetAddress, Float> ownerships;
    -        try
    -        {
    -            ownerships = probe.effectiveOwnership(keyspace);
    -        }
    -        catch (IllegalStateException ex)
    -        {
    -            ownerships = probe.getOwnership();
    -            errors.append("Note: ").append(ex.getMessage()).append("%n");
    -            showEffectiveOwnership = false;
    -        }
    -        catch (IllegalArgumentException ex)
    -        {
    -            System.out.printf("%nError: %s%n", ex.getMessage());
    -            return;
    -        }
    +            String formatPlaceholder = "%%-%ds  %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
    +            String format = format(formatPlaceholder, maxAddressLength);
     
    +            StringBuilder errors = new StringBuilder();
    +            boolean showEffectiveOwnership = true;
     
    -        System.out.println();
    -        for (Entry<String, SetHostStat> entry : NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships).entrySet())
    -            printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(),showEffectiveOwnership);
    +            if (withPort)
    +            {
    +                // Calculate per-token ownership of the ring
    +                Map<String, Float> ownerships;
    +                try
    +                {
    +                    ownerships = probe.effectiveOwnershipWithPort(keyspace);
    +                }
    +                catch (IllegalStateException ex)
    +                {
    +                    ownerships = probe.getOwnershipWithPort();
    +                    errors.append("Note: ").append(ex.getMessage()).append("%n");
    +                    showEffectiveOwnership = false;
    +                }
    +                catch (IllegalArgumentException ex)
    +                {
    +                    System.out.printf("%nError: %s%n", ex.getMessage());
    +                    return;
    +                }
    +
    +
    +                System.out.println();
    +                for (Entry<String, SetHostStatWithPort> entry : NodeTool.getOwnershipByDcWithPort(probe, resolveIp, tokensToEndpoints, ownerships).entrySet())
    +                    printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(), showEffectiveOwnership);
    +
    +                if (haveVnodes)
    +                {
    +                    System.out.println("  Warning: \"nodetool ring\" is used to output all the tokens of a node.");
    +                    System.out.println("  To view status related info of a node use \"nodetool status\" instead.\n");
    +                }
    +
    +                System.out.printf("%n  " + errors.toString());
    +            }
    +            else
    +            {
    +                // Calculate per-token ownership of the ring
    +                Map<InetAddress, Float> ownerships;
    +                try
    +                {
    +                    ownerships = probe.effectiveOwnership(keyspace);
    +                }
    +                catch (IllegalStateException ex)
    +                {
    +                    ownerships = probe.getOwnership();
    +                    errors.append("Note: ").append(ex.getMessage()).append("%n");
    +                    showEffectiveOwnership = false;
    +                }
    +                catch (IllegalArgumentException ex)
    +                {
    +                    System.out.printf("%nError: %s%n", ex.getMessage());
    +                    return;
    +                }
     
    -        if (haveVnodes)
    +
    +                System.out.println();
    +                for (Entry<String, SetHostStat> entry : NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships).entrySet())
    +                    printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(), showEffectiveOwnership);
    +
    +                if (haveVnodes)
    +                {
    +                    System.out.println("  Warning: \"nodetool ring\" is used to output all the tokens of a node.");
    +                    System.out.println("  To view status related info of a node use \"nodetool status\" instead.\n");
    +                }
    +
    +                System.out.printf("%n  " + errors.toString());
    +            }
    +        } catch (Exception e)
             {
    -            System.out.println("  Warning: \"nodetool ring\" is used to output all the tokens of a node.");
    -            System.out.println("  To view status related info of a node use \"nodetool status\" instead.\n");
    +            e.printStackTrace();
    +            throw e;
             }
    -
    -        System.out.printf("%n  " + errors.toString());
         }
     
         private void printDc(NodeProbe probe, String format,
                              String dc,
                              LinkedHashMultimap<String, String> endpointsToTokens,
                              SetHostStat hoststats,boolean showEffectiveOwnership)
         {
    -        Collection<String> liveNodes = probe.getLiveNodes();
    -        Collection<String> deadNodes = probe.getUnreachableNodes();
    -        Collection<String> joiningNodes = probe.getJoiningNodes();
    -        Collection<String> leavingNodes = probe.getLeavingNodes();
    -        Collection<String> movingNodes = probe.getMovingNodes();
    -        Map<String, String> loadMap = probe.getLoadMap();
    +        Collection<String> liveNodes = probe.getLiveNodes(false);
    --- End diff --
    
    I know it's ugly, but eventually we can just get rid of the old way. It's going to converge on what we want soon enough.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163397667
  
    --- Diff: src/java/org/apache/cassandra/net/MessagingService.java ---
    @@ -766,15 +766,16 @@ private void listen(InetAddress localEp, ServerEncryptionOptions serverEncryptio
                 ServerEncryptionOptions legacyEncOptions = new ServerEncryptionOptions(serverEncryptionOptions);
                 legacyEncOptions.optional = false;
     
    -            InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getSSLStoragePort());
    +            InetAddressAndPort localAddr = InetAddressAndPort.getByAddressOverrideDefaults(localEp.address, DatabaseDescriptor.getSSLStoragePort());
                 ChannelGroup channelGroup = new DefaultChannelGroup("LegacyEncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups());
                 InboundInitializer initializer = new InboundInitializer(authenticator, legacyEncOptions, channelGroup);
                 Channel encryptedChannel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize);
                 serverChannels.add(new ServerChannel(encryptedChannel, channelGroup, localAddr, ServerChannel.SecurityLevel.REQUIRED));
             }
     
             // this is for the socket that can be plain, only ssl, or optional plain/ssl
    -        InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
    +        assert localEp.port == DatabaseDescriptor.getStoragePort() : String.format("Local endpoint port %d doesn't match YAML configured port %d%n", localEp.port, DatabaseDescriptor.getStoragePort());
    --- End diff --
    
    sgtm


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160538757
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -133,11 +139,13 @@ public MessageOut(InetAddress from, MessagingService.Verb verb, T payload, IVers
             this.parameters = parameters;
         }
     
    -    public MessageOut<T> withParameter(String key, byte[] value)
    +    public <VT> MessageOut<T> withParameter(ParameterType type, VT value)
         {
    -        ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
    -        builder.putAll(parameters).put(key, value);
    -        return new MessageOut<T>(verb, payload, serializer, builder.build());
    +        List<Object> newParameters = new ArrayList<>(parameters.size() + 3);
    --- End diff --
    
    why `+ 3` instead of `+ 2`?
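
    A minimal sketch of the sizing this question implies, assuming the new parameters field is a flat List<Object> of alternating ParameterType/value entries and that MessageOut has a matching constructor (both assumptions, not the committed code): appending one parameter needs exactly two extra slots, one for the type and one for the value.

        // Illustrative only: one added parameter = one type slot + one value slot.
        List<Object> newParameters = new ArrayList<>(parameters.size() + 2);
        newParameters.addAll(parameters);
        newParameters.add(type);
        newParameters.add(value);
        return new MessageOut<>(verb, payload, serializer, newParameters);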


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160518717
  
    --- Diff: src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java ---
    @@ -21,28 +21,142 @@
     import java.net.Inet4Address;
     import java.net.Inet6Address;
     import java.net.InetAddress;
    +import java.nio.ByteBuffer;
     
    -public class CompactEndpointSerializationHelper
    +import org.apache.cassandra.io.IVersionedSerializer;
    +import org.apache.cassandra.io.util.DataInputBuffer;
    +import org.apache.cassandra.io.util.DataInputPlus;
    +import org.apache.cassandra.io.util.DataOutputPlus;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.messages.StreamMessage;
    +
    +/*
    + * As of version 4.0 the endpoint description includes a port number as an unsigned short
    + */
    +public class CompactEndpointSerializationHelper implements IVersionedSerializer<InetAddressAndPort>
     {
    -    public static void serialize(InetAddress endpoint, DataOutput out) throws IOException
    +    public static final IVersionedSerializer<InetAddressAndPort> instance = new CompactEndpointSerializationHelper();
    +
    +    /**
    +     * Streaming uses its own version numbering so we need to map those versions to the versions used by regular messaging.
    +     * There are only two variants of the serialization currently so a simple mapping around pre vs post 4.0 is fine.
    +     */
    +    public static final IVersionedSerializer<InetAddressAndPort> streamingInstance = new IVersionedSerializer<InetAddressAndPort>()
         {
    -        byte[] buf = endpoint.getAddress();
    -        out.writeByte(buf.length);
    -        out.write(buf);
    +        public void serialize(InetAddressAndPort inetAddressAndPort, DataOutputPlus out, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public long serializedSize(InetAddressAndPort inetAddressAndPort, int version)
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_40);
    +            }
    +        }
    +    };
    +
    +    private CompactEndpointSerializationHelper() {}
    +
    +    public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
    +    {
    +        if (version >= MessagingService.VERSION_40)
    +        {
    +            byte[] buf = endpoint.address.getAddress();
    +            out.writeByte(buf.length + 2);
    +            out.write(buf);
    +            out.writeShort(endpoint.port);
    +        }
    +        else
    +        {
    +            byte[] buf = endpoint.address.getAddress();
    +            out.writeByte(buf.length);
    +            out.write(buf);
    +        }
         }
     
    -    public static InetAddress deserialize(DataInput in) throws IOException
    +    public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
         {
    -        byte[] bytes = new byte[in.readByte()];
    -        in.readFully(bytes, 0, bytes.length);
    -        return InetAddress.getByAddress(bytes);
    +        int size = in.readByte() & 0xFF;
    +        switch(size)
    +        {
    +            //The original pre-4.0 serialization of just an address
    +            case 4:
    +            case 16:
    +            {
    +                byte[] bytes = new byte[size];
    +                in.readFully(bytes, 0, bytes.length);
    +                return InetAddressAndPort.getByAddress(bytes);
    +            }
    +            //Address and one port
    +            case 6:
    +            case 18:
    +            {
    +                byte[] bytes = new byte[size - 2];
    +                in.readFully(bytes);
    +
    +                int port = in.readShort() & 0xFFFF;
    +                return InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(bytes), port);
    +            }
    +            default:
    +                throw new AssertionError("Unexpected size " + size);
    +
    +        }
         }
     
    -    public static int serializedSize(InetAddress from)
    +    public long serializedSize(InetAddressAndPort from, int version)
         {
    -        if (from instanceof Inet4Address)
    -            return 1 + 4;
    -        assert from instanceof Inet6Address;
    -        return 1 + 16;
    +        //4.0 includes a port number
    +        if (version >= MessagingService.VERSION_40)
    +        {
    +            if (from.address instanceof Inet4Address)
    +                return 1 + 4 + 2;
    +            assert from.address instanceof Inet6Address;
    +            return 1 + 16 + 2;
    +        }
    +        else
    +        {
    +            if (from.address instanceof Inet4Address)
    +                return 1 + 4;
    +            assert from.address instanceof Inet6Address;
    +            return 1 + 16;
    +        }
         }
    +
    +    public static InetAddressAndPort fromBytes(ByteBuffer buffer, int version)
    --- End diff --
    
    this method doesn't appear to be used anywhere. maybe remove if unused?


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163399770
  
    --- Diff: src/java/org/apache/cassandra/transport/Server.java ---
    @@ -454,51 +455,32 @@ public static LatestEvent forTopologyChange(Event.TopologyChange.Change change,
     
             // We keep track of the latest status change events we have sent to avoid sending duplicates
             // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156)
    -        private final Map<InetAddress, LatestEvent> latestEvents = new ConcurrentHashMap<>();
    +        private final Map<InetAddressAndPort, LatestEvent> latestEvents = new ConcurrentHashMap<>();
             // We also want to delay delivering a NEW_NODE notification until the new node has set its RPC ready
             // state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients
    -        private final Set<InetAddress> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet();
    -
    -
    -        private static final InetAddress bindAll;
    --- End diff --
    
    hmm, looks like I didn't read that comment. But yes, you are correct, so +1 here


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163085031
  
    --- Diff: src/java/org/apache/cassandra/net/ForwardToContainer.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.net;
    +
    +import java.util.Collection;
    +
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +
    +/**
    + * Contains forward to information until it can be serialized as part of a message using a version
    + * specific serialization
    + */
    +public class ForwardToContainer
    +{
    +    public final Collection<InetAddressAndPort> targets;
    +    public final int[] messageIds;
    +
    +    public ForwardToContainer(Collection<InetAddressAndPort> targets,
    +                              int[] messageIds)
    +    {
    +        this.targets = targets;
    --- End diff --
    
    Good idea for it to fail on construction.
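
    A minimal sketch of failing on construction (an assumption, not the committed diff), using Guava's Preconditions, which the patch already imports elsewhere:

        // requires: import com.google.common.base.Preconditions;
        public ForwardToContainer(Collection<InetAddressAndPort> targets, int[] messageIds)
        {
            // Fail fast: every forwarded target needs exactly one message id.
            Preconditions.checkArgument(targets.size() == messageIds.length,
                                        "Mismatched targets (%s) and messageIds (%s)",
                                        targets.size(), messageIds.length);
            this.targets = targets;
            this.messageIds = messageIds;
        }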


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163071575
  
    --- Diff: src/java/org/apache/cassandra/gms/Gossiper.java ---
    @@ -1207,12 +1239,27 @@ private void applyNewStates(InetAddress addr, EndpointState localState, Endpoint
             assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
             localState.addApplicationStates(remoteStates);
     
    -        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates)
    +        //Filter out pre-4.0 versions of data in favor of the more complete 4.0 versions
    +        Set<Entry<ApplicationState, VersionedValue>> filtered = remoteStates.stream().filter(entry -> {
    --- End diff --
    
    Ugh, never mind, I am incorrect here. This lambda is for the filter function, not a "possibly replace old value with new value" function.
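
    A hedged sketch of what such a filter predicate can look like, using only state names that appear elsewhere in this review (STATUS_WITH_PORT, INTERNAL_ADDRESS_AND_PORT, NATIVE_ADDRESS_AND_PORT); the committed lambda may differ in detail. The idea is to drop a deprecated pre-4.0 entry whenever the remote node also sent its port-aware 4.0 replacement.

        // Illustrative only. Collect the keys the remote node sent, then keep each entry
        // unless it is a deprecated state whose 4.0 replacement is also present.
        Set<ApplicationState> remoteKeys = remoteStates.stream()
                                                       .map(Entry::getKey)
                                                       .collect(Collectors.toCollection(() -> EnumSet.noneOf(ApplicationState.class)));
        Set<Entry<ApplicationState, VersionedValue>> filtered = remoteStates.stream().filter(entry -> {
            switch (entry.getKey())
            {
                case STATUS:      return !remoteKeys.contains(ApplicationState.STATUS_WITH_PORT);
                case INTERNAL_IP: return !remoteKeys.contains(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
                case RPC_ADDRESS: return !remoteKeys.contains(ApplicationState.NATIVE_ADDRESS_AND_PORT);
                default:          return true;
            }
        }).collect(Collectors.toSet());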


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163097955
  
    --- Diff: test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java ---
    @@ -531,7 +530,7 @@ private void testPrepareWithLWT(ProtocolVersion version) throws Throwable
         @Test
         public void testPrepareWithBatchLWT() throws Throwable
         {
    -        testPrepareWithBatchLWT(ProtocolVersion.V4);
    +//        testPrepareWithBatchLWT(ProtocolVersion.V4);
    --- End diff --
    
    No, I think I was debugging and commented it out.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r162180384
  
    --- Diff: src/java/org/apache/cassandra/gms/ApplicationState.java ---
    @@ -19,24 +19,25 @@
     
     public enum ApplicationState
     {
    -    STATUS,
    +    @Deprecated STATUS, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0
         LOAD,
         SCHEMA,
         DC,
         RACK,
         RELEASE_VERSION,
         REMOVAL_COORDINATOR,
    -    INTERNAL_IP,
    -    RPC_ADDRESS,
    +    @Deprecated INTERNAL_IP, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0
    +    @Deprecated RPC_ADDRESS, // ^ Same
         X_11_PADDING, // padding specifically for 1.1
         SEVERITY,
         NET_VERSION,
         HOST_ID,
         TOKENS,
         RPC_READY,
         // pad to allow adding new states to existing cluster
    -    X1,
    -    X2,
    +    INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports
    +    NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
    --- End diff --
    
    It's unclear to me why you renamed `RPC_ADDRESS` to `NATIVE_ADDRESS`. That makes it really easy to confuse "the addr/port to be used between peers" with "the addr/port open for client apps/drivers". Perhaps a better name is `INTERNODE_ADDRESS_AND_PORT`? (or INTERNAL_ or PEER_ or ...)


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160281918
  
    --- Diff: src/java/org/apache/cassandra/db/LegacySystemKeyspaceMigrator.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.db;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.schema.SchemaConstants;
    +import org.apache.cassandra.cql3.QueryProcessor;
    +import org.apache.cassandra.cql3.UntypedResultSet;
    +import org.apache.cassandra.db.marshal.BytesType;
    +import org.apache.cassandra.db.marshal.Int32Type;
    +import org.apache.cassandra.db.marshal.UTF8Type;
    +import org.apache.cassandra.db.marshal.UUIDType;
    +
    +/**
    + * Migrate 3.0 versions of some tables to 4.0. In this case it's just extra columns and some keys
    + * that are changed.
    + *
    + * Can't just add the additional columns because they are primary key columns and C* doesn't support changing
    + * key columns even if it's just clustering columns.
    + */
    +public class LegacySystemKeyspaceMigrator
    +{
    +    static final String legacyPeersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEERS);
    +    static final String peersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEERS_V2);
    +    static final String legacyPeerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEER_EVENTS);
    +    static final String peerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2);
    +    static final String legacyTransferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES);
    +    static final String transferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2);
    +
    +    private static final Logger logger = LoggerFactory.getLogger(LegacySystemKeyspaceMigrator.class);
    +
    +    private LegacySystemKeyspaceMigrator() {}
    +
    +    public static void migrate()
    +    {
    +        migratePeers();
    +        migratePeerEvents();
    +        migrateLegacyTransferredRanges();
    +    }
    +
    +    private static void migratePeers()
    +    {
    +        ColumnFamilyStore newPeers = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEERS_V2);
    +
    +        if (!newPeers.isEmpty())
    +             return;
    +
    +        logger.info("{} table was empty, migrating legacy {}, if this fails you should fix the issue and then truncate {} to have it try again.",
    +                                  peersName, legacyPeersName, peersName);
    +
    +        String query = String.format("SELECT * FROM %s",
    +                                     legacyPeersName);
    +
    +        String insert = String.format("INSERT INTO %s ( "
    +                                      + "peer, "
    +                                      + "peer_port, "
    +                                      + "data_center, "
    +                                      + "host_id, "
    +                                      + "preferred_ip, "
    +                                      + "preferred_port, "
    +                                      + "rack, "
    +                                      + "release_version, "
    +                                      + "native_address, "
    +                                      + "native_port, "
    +                                      + "schema_version, "
    +                                      + "tokens) "
    +                                      + " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?, ?, ?, ?)",
    +                                      peersName);
    +
    +        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
    +        int transferred = 0;
    +        for (UntypedResultSet.Row row : rows)
    +        {
    +            logger.debug("Transferring row {}", transferred);
    +            QueryProcessor.executeInternal(insert,
    +                                           row.has("peer") ? row.getInetAddress("peer") : null,
    +                                           DatabaseDescriptor.getStoragePort(),
    +                                           row.has("data_center") ? row.getString("data_center") : null,
    +                                           row.has("host_id") ? row.getUUID("host_id") : null,
    +                                           row.has("preferred_ip") ? row.getInetAddress("preferred_ip") : null,
    +                                           DatabaseDescriptor.getStoragePort(),
    +                                           row.has("rack") ? row.getString("rack") : null,
    +                                           row.has("release_version") ? row.getString("release_version") : null,
    +                                           row.has("rpc_address") ? row.getInetAddress("rpc_address") : null,
    +                                           DatabaseDescriptor.getNativeTransportPort(),
    +                                           row.has("schema_version") ? row.getUUID("schema_version") : null,
    +                                           row.has("tokens") ? row.getSet("tokens", UTF8Type.instance) : null);
    +            transferred++;
    +        }
    +        logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeersName, peersName);
    +    }
    +
    +    private static void migratePeerEvents()
    +    {
    +        ColumnFamilyStore newPeerEvents = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEER_EVENTS_V2);
    +
    +        if (!newPeerEvents.isEmpty())
    +            return;
    +
    +        logger.info("{} table was empty, migrating legacy {}", peerEventsName, legacyPeerEventsName);
    +
    +        String query = String.format("SELECT * FROM %s",
    +                                     legacyPeerEventsName);
    +
    +        String insert = String.format("INSERT INTO %s ( "
    +                                      + "peer, "
    +                                      + "peer_port, "
    +                                      + "hints_dropped) "
    +                                      + " values ( ?, ?, ? )",
    +                                      peerEventsName);
    +
    +        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
    +        int transferred = 0;
    +        for (UntypedResultSet.Row row : rows)
    +        {
    +            logger.debug("Transferring row {}", transferred);
    +            QueryProcessor.executeInternal(insert,
    +                                           row.has("peer") ? row.getInetAddress("peer") : null,
    +                                           DatabaseDescriptor.getStoragePort(),
    +                                           row.has("hints_dropped") ? row.getMap("hints_dropped", UUIDType.instance, Int32Type.instance) : null);
    +            transferred++;
    +        }
    +        logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeerEventsName, peerEventsName);
    +    }
    +
    +    static void migrateLegacyTransferredRanges()
    --- End diff --
    
    pretty naming nit: remove 'legacy' from method name as you didn't do that on the other methods.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160284046
  
    --- Diff: src/java/org/apache/cassandra/db/SystemKeyspace.java ---
    @@ -91,47 +92,52 @@ private SystemKeyspace()
         public static final String PAXOS = "paxos";
         public static final String BUILT_INDEXES = "IndexInfo";
         public static final String LOCAL = "local";
    -    public static final String PEERS = "peers";
    -    public static final String PEER_EVENTS = "peer_events";
    +    public static final String PEERS_V2 = "peers_v2";
    +    public static final String PEER_EVENTS_V2 = "peer_events_v2";
         public static final String RANGE_XFERS = "range_xfers";
         public static final String COMPACTION_HISTORY = "compaction_history";
         public static final String SSTABLE_ACTIVITY = "sstable_activity";
         public static final String SIZE_ESTIMATES = "size_estimates";
         public static final String AVAILABLE_RANGES = "available_ranges";
    -    public static final String TRANSFERRED_RANGES = "transferred_ranges";
    +    public static final String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2";
         public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
         public static final String BUILT_VIEWS = "built_views";
         public static final String PREPARED_STATEMENTS = "prepared_statements";
         public static final String REPAIRS = "repairs";
     
    +    @Deprecated public static final String LEGACY_PEERS = "peers";
    +    @Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events";
    +    @Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges";
    +
         public static final TableMetadata Batches =
             parse(BATCHES,
    -              "batches awaiting replay",
    -              "CREATE TABLE %s ("
    -              + "id timeuuid,"
    -              + "mutations list<blob>,"
    -              + "version int,"
    -              + "PRIMARY KEY ((id)))")
    -              .partitioner(new LocalPartitioner(TimeUUIDType.instance))
    -              .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
    -              .build();
    +                "batches awaiting replay",
    +                "CREATE TABLE %s ("
    +                + "id timeuuid,"
    +                + "mutations list<blob>,"
    +                + "version int,"
    +                + "PRIMARY KEY ((id)))")
    +                .partitioner(new LocalPartitioner(TimeUUIDType.instance))
    +                .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
    +                .gcGraceSeconds(0)
    --- End diff --
    
    Is this supposed to be here? It's not on trunk.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163306795
  
    --- Diff: conf/cassandra.yaml ---
    @@ -960,6 +967,7 @@ server_encryption_options:
         # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA]
         # require_client_auth: false
         # require_endpoint_verification: false
    +    # outgoing_encrypted_port_source: yaml
    --- End diff --
    
    It's out of date. I implemented this functionality, but then you implemented and merged competing functionality. I'll remove it.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163304964
  
    --- Diff: src/java/org/apache/cassandra/gms/ApplicationState.java ---
    @@ -19,24 +19,25 @@
     
     public enum ApplicationState
     {
    -    STATUS,
    +    @Deprecated STATUS, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0
         LOAD,
         SCHEMA,
         DC,
         RACK,
         RELEASE_VERSION,
         REMOVAL_COORDINATOR,
    -    INTERNAL_IP,
    -    RPC_ADDRESS,
    +    @Deprecated INTERNAL_IP, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0
    +    @Deprecated RPC_ADDRESS, // ^ Same
         X_11_PADDING, // padding specifically for 1.1
         SEVERITY,
         NET_VERSION,
         HOST_ID,
         TOKENS,
         RPC_READY,
         // pad to allow adding new states to existing cluster
    -    X1,
    -    X2,
    +    INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports
    +    NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
    --- End diff --
    
    It's native address now because Thrift RPC is gone. What I remember is that I did this because I thought we had stopped using rpc_address in the yaml and renamed it to native_address, but now I'm confused because that doesn't seem to be the case. I was trying to consistently use native_address instead of rpc_address now that Thrift is gone.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163092194
  
    --- Diff: src/java/org/apache/cassandra/net/MessagingService.java ---
    @@ -766,15 +766,16 @@ private void listen(InetAddress localEp, ServerEncryptionOptions serverEncryptio
                 ServerEncryptionOptions legacyEncOptions = new ServerEncryptionOptions(serverEncryptionOptions);
                 legacyEncOptions.optional = false;
     
    -            InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getSSLStoragePort());
    +            InetAddressAndPort localAddr = InetAddressAndPort.getByAddressOverrideDefaults(localEp.address, DatabaseDescriptor.getSSLStoragePort());
                 ChannelGroup channelGroup = new DefaultChannelGroup("LegacyEncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups());
                 InboundInitializer initializer = new InboundInitializer(authenticator, legacyEncOptions, channelGroup);
                 Channel encryptedChannel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize);
                 serverChannels.add(new ServerChannel(encryptedChannel, channelGroup, localAddr, ServerChannel.SecurityLevel.REQUIRED));
             }
     
             // this is for the socket that can be plain, only ssl, or optional plain/ssl
    -        InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
    +        assert localEp.port == DatabaseDescriptor.getStoragePort() : String.format("Local endpoint port %d doesn't match YAML configured port %d%n", localEp.port, DatabaseDescriptor.getStoragePort());
    --- End diff --
    
    I probably encountered an incorrectly sourced port value at some point. If we send it out with the wrong port we'll never get the response :-)


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160490807
  
    --- Diff: src/java/org/apache/cassandra/gms/Gossiper.java ---
    @@ -623,18 +641,19 @@ else if (newState.getHeartBeatState().getHeartBeatVersion() != heartbeat)
             }
     
             // do not pass go, do not collect 200 dollars, just gtfo
    +        epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.left(tokens, computeExpireTime()));
    --- End diff --
    
    is it better to call `computeExpireTime()` once and pass the same value into each of the `valueFactory` calls?
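
    That is, something along these lines (a sketch of the suggestion, names as in the surrounding diff; it assumes the legacy STATUS value is published right next to STATUS_WITH_PORT, which the plural "calls" implies):

        // Evaluate once so both application states carry exactly the same expiry timestamp.
        long expireTime = computeExpireTime();
        epState.addApplicationState(ApplicationState.STATUS_WITH_PORT,
                                    StorageService.instance.valueFactory.left(tokens, expireTime));
        epState.addApplicationState(ApplicationState.STATUS,
                                    StorageService.instance.valueFactory.left(tokens, expireTime));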


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163081197
  
    --- Diff: src/java/org/apache/cassandra/locator/InetAddressAndPort.java ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.locator;
    +
    +import java.io.Serializable;
    +import java.net.Inet6Address;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.nio.ByteBuffer;
    +import java.util.regex.Pattern;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.net.HostAndPort;
    +
    +import org.apache.cassandra.config.Config;
    +
    +/**
    + * A class to replace the usage of InetAddress to identify hosts in the cluster.
    + * Opting for a full replacement class so that in the future if we change the nature
    + * of the identifier the refactor will be easier in that we don't have to change the type
    + * just the methods.
    + *
    + * Because a single IP might host multiple C* instances, the identification must be done
    + * using the IP + port. InetSocketAddress is undesirable for a couple of reasons. It's not comparable,
    + * its toString() method doesn't correctly bracket IPv6, it doesn't handle optional default values,
    + * and a couple of other minor behaviors that are slightly less troublesome like handling the
    + * need to sometimes return a port and sometimes not.
    + *
    + */
    +public final class InetAddressAndPort implements Comparable<InetAddressAndPort>, Serializable
    +{
    +    private static final long serialVersionUID = 0;
    +
    +    //Store these here to avoid requiring DatabaseDescriptor to be loaded. DatabaseDescriptor will set
    +    //these when it loads the config. A lot of unit tests won't end up loading DatabaseDescriptor.
    +    //Tools that might use this class also might not load database descriptor. Those tools are expected
    +    //to always override the defaults.
    +    static volatile int defaultPort = 7000;
    +
    +    public final InetAddress address;
    +    public final int port;
    +
    +    private InetAddressAndPort(InetAddress address, int port)
    +    {
    +        Preconditions.checkNotNull(address);
    +        validatePortRange(port);
    +        this.address = address;
    +        this.port = port;
    +    }
    +
    +    private static void validatePortRange(int port)
    +    {
    +        if (port < 0 || port > 65535)
    +        {
    +            throw new IllegalArgumentException("Port " + port + " is not a valid port number in the range 0-65535");
    +        }
    +    }
    +
    +    @Override
    +    public boolean equals(Object o)
    +    {
    +        if (this == o) return true;
    +        if (o == null || getClass() != o.getClass()) return false;
    +
    +        InetAddressAndPort that = (InetAddressAndPort) o;
    +
    +        if (port != that.port) return false;
    +        return address.equals(that.address);
    +    }
    +
    +    @Override
    +    public int hashCode()
    +    {
    +        int result = address.hashCode();
    +        result = 31 * result + port;
    +        return result;
    +    }
    +
    +    @Override
    +    public int compareTo(InetAddressAndPort o)
    +    {
    +        int retval = ByteBuffer.wrap(address.getAddress()).compareTo(ByteBuffer.wrap(o.address.getAddress()));
    --- End diff --
    
    Well, it's not, for whatever reason. They are defensive copies.
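
    A tiny standalone check of that claim, for anyone who wants to verify it (observed JDK behaviour, not part of the patch): InetAddress.getAddress() returns a fresh array on every call, so wrapping it in a ByteBuffer never exposes shared state.

        import java.net.InetAddress;

        public class DefensiveCopyCheck
        {
            public static void main(String[] args) throws Exception
            {
                InetAddress addr = InetAddress.getByName("127.0.0.1");
                // Two calls, two distinct arrays: the byte[] is a defensive copy.
                System.out.println(addr.getAddress() == addr.getAddress()); // false
            }
        }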


---



[GitHub] cassandra issue #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on the issue:

    https://github.com/apache/cassandra/pull/184
  
    closing as 7544 has been committed.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163301627
  
    --- Diff: src/java/org/apache/cassandra/db/SystemKeyspace.java ---
    @@ -145,168 +152,218 @@ private SystemKeyspace()
     
         private static final TableMetadata Local =
             parse(LOCAL,
    -              "information about the local node",
    -              "CREATE TABLE %s ("
    -              + "key text,"
    -              + "bootstrapped text,"
    -              + "broadcast_address inet,"
    -              + "cluster_name text,"
    -              + "cql_version text,"
    -              + "data_center text,"
    -              + "gossip_generation int,"
    -              + "host_id uuid,"
    -              + "listen_address inet,"
    -              + "native_protocol_version text,"
    -              + "partitioner text,"
    -              + "rack text,"
    -              + "release_version text,"
    -              + "rpc_address inet,"
    -              + "schema_version uuid,"
    -              + "tokens set<varchar>,"
    -              + "truncated_at map<uuid, blob>,"
    -              + "PRIMARY KEY ((key)))")
    -              .recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance)
    -              .build();
    -
    -    private static final TableMetadata Peers =
    -        parse(PEERS,
    -              "information about known peers in the cluster",
    -              "CREATE TABLE %s ("
    -              + "peer inet,"
    -              + "data_center text,"
    -              + "host_id uuid,"
    -              + "preferred_ip inet,"
    -              + "rack text,"
    -              + "release_version text,"
    -              + "rpc_address inet,"
    -              + "schema_version uuid,"
    -              + "tokens set<varchar>,"
    -              + "PRIMARY KEY ((peer)))")
    -              .build();
    -
    -    private static final TableMetadata PeerEvents =
    -        parse(PEER_EVENTS,
    -              "events related to peers",
    -              "CREATE TABLE %s ("
    -              + "peer inet,"
    -              + "hints_dropped map<uuid, int>,"
    -              + "PRIMARY KEY ((peer)))")
    -              .build();
    +                "information about the local node",
    +                "CREATE TABLE %s ("
    +                + "key text,"
    +                + "bootstrapped text,"
    +                + "broadcast_address inet,"
    +                + "broadcast_port int,"
    +                + "cluster_name text,"
    +                + "cql_version text,"
    +                + "data_center text,"
    +                + "gossip_generation int,"
    +                + "host_id uuid,"
    +                + "listen_address inet,"
    +                + "listen_port int,"
    +                + "native_protocol_version text,"
    +                + "partitioner text,"
    +                + "rack text,"
    +                + "release_version text,"
    +                + "rpc_address inet,"
    +                + "rpc_port int,"
    +                + "schema_version uuid,"
    +                + "tokens set<varchar>,"
    +                + "truncated_at map<uuid, blob>,"
    +                + "PRIMARY KEY ((key)))"
    +                ).recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance)
    +                .build();
    +
    +    private static final TableMetadata PeersV2 =
    +        parse(PEERS_V2,
    +                "information about known peers in the cluster",
    +                "CREATE TABLE %s ("
    +                + "peer inet,"
    +                + "peer_port int,"
    +                + "data_center text,"
    +                + "host_id uuid,"
    +                + "preferred_ip inet,"
    +                + "preferred_port int,"
    +                + "rack text,"
    +                + "release_version text,"
    +                + "native_address inet,"
    +                + "native_port int,"
    +                + "schema_version uuid,"
    +                + "tokens set<varchar>,"
    +                + "PRIMARY KEY ((peer), peer_port))")
    +                .build();
    +
    +    private static final TableMetadata PeerEventsV2 =
    +        parse(PEER_EVENTS_V2,
    +                "events related to peers",
    +                "CREATE TABLE %s ("
    +                + "peer inet,"
    +                + "peer_port int,"
    +                + "hints_dropped map<uuid, int>,"
    +                + "PRIMARY KEY ((peer), peer_port))")
    +                .build();
     
         private static final TableMetadata RangeXfers =
             parse(RANGE_XFERS,
    -              "ranges requested for transfer",
    -              "CREATE TABLE %s ("
    -              + "token_bytes blob,"
    -              + "requested_at timestamp,"
    -              + "PRIMARY KEY ((token_bytes)))")
    -              .build();
    +                "ranges requested for transfer",
    +                "CREATE TABLE %s ("
    +                + "token_bytes blob,"
    +                + "requested_at timestamp,"
    +                + "PRIMARY KEY ((token_bytes)))")
    +                .build();
     
         private static final TableMetadata CompactionHistory =
             parse(COMPACTION_HISTORY,
    -              "week-long compaction history",
    -              "CREATE TABLE %s ("
    -              + "id uuid,"
    -              + "bytes_in bigint,"
    -              + "bytes_out bigint,"
    -              + "columnfamily_name text,"
    -              + "compacted_at timestamp,"
    -              + "keyspace_name text,"
    -              + "rows_merged map<int, bigint>,"
    -              + "PRIMARY KEY ((id)))")
    -              .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7))
    -              .build();
    +                "week-long compaction history",
    +                "CREATE TABLE %s ("
    +                + "id uuid,"
    +                + "bytes_in bigint,"
    +                + "bytes_out bigint,"
    +                + "columnfamily_name text,"
    +                + "compacted_at timestamp,"
    +                + "keyspace_name text,"
    +                + "rows_merged map<int, bigint>,"
    +                + "PRIMARY KEY ((id)))")
    +                .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7))
    +                .build();
     
         private static final TableMetadata SSTableActivity =
             parse(SSTABLE_ACTIVITY,
    -              "historic sstable read rates",
    -              "CREATE TABLE %s ("
    -              + "keyspace_name text,"
    -              + "columnfamily_name text,"
    -              + "generation int,"
    -              + "rate_120m double,"
    -              + "rate_15m double,"
    -              + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))")
    -              .build();
    +                "historic sstable read rates",
    +                "CREATE TABLE %s ("
    +                + "keyspace_name text,"
    +                + "columnfamily_name text,"
    +                + "generation int,"
    +                + "rate_120m double,"
    +                + "rate_15m double,"
    +                + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))")
    +                .build();
     
         private static final TableMetadata SizeEstimates =
             parse(SIZE_ESTIMATES,
    -              "per-table primary range size estimates",
    -              "CREATE TABLE %s ("
    -              + "keyspace_name text,"
    -              + "table_name text,"
    -              + "range_start text,"
    -              + "range_end text,"
    -              + "mean_partition_size bigint,"
    -              + "partitions_count bigint,"
    -              + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
    -              .build();
    +                "per-table primary range size estimates",
    +                "CREATE TABLE %s ("
    +                + "keyspace_name text,"
    +                + "table_name text,"
    +                + "range_start text,"
    +                + "range_end text,"
    +                + "mean_partition_size bigint,"
    +                + "partitions_count bigint,"
    +                + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
    +                .gcGraceSeconds(0)
    --- End diff --
    
    This is also not supposed to be here anymore.


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160520881
  
    --- Diff: src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java ---
    @@ -21,28 +21,142 @@
     import java.net.Inet4Address;
     import java.net.Inet6Address;
     import java.net.InetAddress;
    +import java.nio.ByteBuffer;
     
    -public class CompactEndpointSerializationHelper
    +import org.apache.cassandra.io.IVersionedSerializer;
    +import org.apache.cassandra.io.util.DataInputBuffer;
    +import org.apache.cassandra.io.util.DataInputPlus;
    +import org.apache.cassandra.io.util.DataOutputPlus;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.messages.StreamMessage;
    +
    +/*
    + * As of version 4.0 the endpoint description includes a port number as an unsigned short
    + */
    +public class CompactEndpointSerializationHelper implements IVersionedSerializer<InetAddressAndPort>
     {
    -    public static void serialize(InetAddress endpoint, DataOutput out) throws IOException
    +    public static final IVersionedSerializer<InetAddressAndPort> instance = new CompactEndpointSerializationHelper();
    +
    +    /**
    +     * Streaming uses its own version numbering so we need to map those versions to the versions used by regular messaging.
    +     * There are only two variants of the serialization currently so a simple mapping around pre vs post 4.0 is fine.
    +     */
    +    public static final IVersionedSerializer<InetAddressAndPort> streamingInstance = new IVersionedSerializer<InetAddressAndPort>()
         {
    -        byte[] buf = endpoint.getAddress();
    -        out.writeByte(buf.length);
    -        out.write(buf);
    +        public void serialize(InetAddressAndPort inetAddressAndPort, DataOutputPlus out, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public long serializedSize(InetAddressAndPort inetAddressAndPort, int version)
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_40);
    +            }
    +        }
    +    };
    +
    +    private CompactEndpointSerializationHelper() {}
    +
    +    public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
    +    {
    +        if (version >= MessagingService.VERSION_40)
    +        {
    +            byte[] buf = endpoint.address.getAddress();
    --- End diff --
    
    Since serializing/deserializing IP addrs is on the hot path (literally used everywhere for internode messaging), I've long wondered if we can avoid the byte array allocation and instead write to an int (for IPv4) or two longs (for IPv6) as those would be stack allocated, and thus no extra garbage. We could choose to pass the `DataOutputPlus` to a new function on  `InetAddressAndPort`, as well.
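
    A rough sketch of that idea (an assumption, not part of this patch): write the address through fixed-width primitives instead of write(byte[]). The on-wire bytes stay identical because writeInt/writeLong are big-endian, the same order getAddress() returns; to remove the allocation entirely, InetAddressAndPort would also have to cache the address as primitives, since InetAddress.getAddress() hands back a fresh copy on every call.

        // Illustrative only; produces the same 4.0 layout as the serialize() branch above.
        public static void serializeAsPrimitives(InetAddressAndPort endpoint, DataOutputPlus out) throws IOException
        {
            byte[] raw = endpoint.address.getAddress();   // 4 bytes (IPv4) or 16 bytes (IPv6)
            ByteBuffer bb = ByteBuffer.wrap(raw);
            out.writeByte(raw.length + 2);
            if (raw.length == 4)
                out.writeInt(bb.getInt());                // one int for the 32-bit address
            else
            {
                out.writeLong(bb.getLong());              // two longs for the 128-bit address
                out.writeLong(bb.getLong());
            }
            out.writeShort(endpoint.port);
        }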


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160285588
  
    --- Diff: src/java/org/apache/cassandra/db/SystemKeyspace.java ---
    @@ -607,39 +672,65 @@ public static long getTruncatedAt(TableId id)
         /**
          * Record tokens being used by another node
          */
    -    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
    +    public static synchronized void updateTokens(InetAddressAndPort ep, Collection<Token> tokens)
         {
    -        if (ep.equals(FBUtilities.getBroadcastAddress()))
    +        if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
                 return;
     
             String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
    -        executeInternal(format(req, PEERS), ep, tokensAsSet(tokens));
    +        executeInternal(String.format(req, LEGACY_PEERS), ep.address, tokensAsSet(tokens));
    +        req = "INSERT INTO system.%s (peer, peer_port, tokens) VALUES (?, ?, ?)";
    +        executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, tokensAsSet(tokens));
         }
     
    -    public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
    +    public static synchronized void updatePreferredIP(InetAddressAndPort ep, InetAddressAndPort preferred_ip)
         {
             if (getPreferredIP(ep) == preferred_ip)
                 return;
     
             String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
    -        executeInternal(format(req, PEERS), ep, preferred_ip);
    -        forceBlockingFlush(PEERS);
    +        executeInternal(String.format(req, LEGACY_PEERS), ep.address, preferred_ip.address);
    +        forceBlockingFlush(LEGACY_PEERS);
    --- End diff --
    
    There are several methods on this class which call `forceBlockingFlush` for two tables. Perhaps to reduce the time between each flush, we can perform both `executeInternal` calls, then overload `forceBlockingFlush` to block for multiple flushes. wdyt?
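
    Something along these lines could work (a hypothetical overload sketching the suggestion, not necessarily what was committed; the unsafe-system guard from the existing single-table helper is omitted for brevity): issue all the flushes first, then block, so the tables flush concurrently instead of back to back.

        public static void forceBlockingFlush(String... cfnames)
        {
            List<Future<?>> futures = new ArrayList<>();
            for (String cfname : cfnames)
                futures.add(Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME)
                                    .getColumnFamilyStore(cfname)
                                    .forceFlush());
            // Only now wait, so the flushes overlap.
            for (Future<?> future : futures)
                FBUtilities.waitOnFuture(future);
        }

    A call site like updatePreferredIP would then run both executeInternal statements and finish with a single forceBlockingFlush(LEGACY_PEERS, PEERS_V2).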


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163085375
  
    --- Diff: src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---
    @@ -96,19 +98,11 @@ private void handleFailure(Throwable t)
             if (message.doCallbackOnFailure())
             {
                 MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
    -                                                .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
    +                                                .withParameter(ParameterType.FAILURE_RESPONSE, MessagingService.ONE_BYTE);
     
                 if (t instanceof TombstoneOverwhelmingException)
                 {
    -                try (DataOutputBuffer out = new DataOutputBuffer())
    -                {
    -                    out.writeShort(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code);
    -                    response = response.withParameter(MessagingService.FAILURE_REASON_PARAM, out.getData());
    -                }
    -                catch (IOException ex)
    -                {
    -                    throw new RuntimeException(ex);
    -                }
    +                response = response.withParameter(ParameterType.FAILURE_REASON.FAILURE_REASON, Shorts.checkedCast(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
    --- End diff --
    
    It's just a bug in the code that happens to compile and work (with a warning). I'll remove the extra one.
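    
    For reference, the call with the redundant qualifier removed would presumably look like:
    
    ```java
    // Sketch of the intended fix: reference ParameterType.FAILURE_REASON directly
    // instead of through another enum constant.
    response = response.withParameter(ParameterType.FAILURE_REASON,
                                      Shorts.checkedCast(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
    ```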


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160276907
  
    --- Diff: src/java/org/apache/cassandra/db/LegacySystemKeyspaceMigrator.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.db;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.schema.SchemaConstants;
    +import org.apache.cassandra.cql3.QueryProcessor;
    +import org.apache.cassandra.cql3.UntypedResultSet;
    +import org.apache.cassandra.db.marshal.BytesType;
    +import org.apache.cassandra.db.marshal.Int32Type;
    +import org.apache.cassandra.db.marshal.UTF8Type;
    +import org.apache.cassandra.db.marshal.UUIDType;
    +
    +/**
    + * Migrate 3.0 versions of some tables to 4.0. In this case it's just extra columns and some keys
    + * that are changed.
    + *
    + * Can't just add the additional columns because they are primary key columns and C* doesn't support changing
    + * key columns even if it's just clustering columns.
    + */
    +public class LegacySystemKeyspaceMigrator
    --- End diff --
    
    trivial nit: maybe rename to `SystemKeyspaceMigrator40` to indicate this is for 3.0/3.x -> 4.0 only, and that we drop after 4.0. I guess it's 'legacy' that's a little vague to me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163106555
  
    --- Diff: src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---
    @@ -135,11 +136,13 @@ public StreamCoordinator getCoordinator()
             return coordinator;
         }
     
    -    private void attachConnection(InetAddress from, int sessionIndex, Channel channel)
    +    private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel)
         {
             SocketAddress addr = channel.remoteAddress();
    -        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from);
    -        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting);
    +        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from.address);
    +        //Need to turn connecting into a InetAddressAndPort with the correct port. I think getting the port from "from"
    +        //Will work since we don't actually have ports diverge across network interfaces
    +        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, InetAddressAndPort.getByAddressOverrideDefaults(connecting, from.port));
    --- End diff --
    
    This doesn't look right? You're using the ephemeral port from SocketChannel.remoteAddress()? That's not a useful port number for anything. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160490410
  
    --- Diff: src/java/org/apache/cassandra/gms/Gossiper.java ---
    @@ -319,14 +323,25 @@ public long getEndpointDowntime(InetAddress ep)
                 return 0L;
         }
     
    -    private boolean isShutdown(InetAddress endpoint)
    +    private boolean isShutdown(InetAddressAndPort endpoint)
         {
             EndpointState epState = endpointStateMap.get(endpoint);
             if (epState == null)
    +        {
                 return false;
    -        if (epState.getApplicationState(ApplicationState.STATUS) == null)
    +        }
    +
    +        VersionedValue versionedValue = epState.getApplicationState(ApplicationState.STATUS_WITH_PORT);
    +        if (versionedValue == null)
    +        {
    +            versionedValue = epState.getApplicationState(ApplicationState.STATUS);
    +        }
    +        if (versionedValue == null)
    --- End diff --
    
    this null check can be moved into the block right above, that way it's a bit easier to read/reason about.
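    
    Roughly, based only on the hunk above (a sketch, not the exact patch):
    
    ```java
    VersionedValue versionedValue = epState.getApplicationState(ApplicationState.STATUS_WITH_PORT);
    if (versionedValue == null)
    {
        // fall back to the legacy STATUS state; if that is missing too, give up here
        versionedValue = epState.getApplicationState(ApplicationState.STATUS);
        if (versionedValue == null)
            return false;
    }
    ```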


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163087311
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -133,11 +139,13 @@ public MessageOut(InetAddress from, MessagingService.Verb verb, T payload, IVers
             this.parameters = parameters;
         }
     
    -    public MessageOut<T> withParameter(String key, byte[] value)
    +    public <VT> MessageOut<T> withParameter(ParameterType type, VT value)
         {
    -        ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
    -        builder.putAll(parameters).put(key, value);
    -        return new MessageOut<T>(verb, payload, serializer, builder.build());
    +        List<Object> newParameters = new ArrayList<>(parameters.size() + 3);
    --- End diff --
    
    I think tuple size used to be 3. I'll fix it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163086498
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -89,37 +88,39 @@ private MessageIn(InetAddress from,
     
         public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime) throws IOException
         {
    -        InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
    +        InetAddressAndPort from = CompactEndpointSerializationHelper.instance.deserialize(in, version);
     
             MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
    -        Map<String, byte[]> parameters = readParameters(in);
    +        Map<ParameterType, Object> parameters = readParameters(in, version);
             int payloadSize = in.readInt();
             return read(in, version, id, constructionTime, from, payloadSize, verb, parameters);
         }
     
    -    public static Map<String, byte[]> readParameters(DataInputPlus in) throws IOException
    +    public static Map<ParameterType, Object> readParameters(DataInputPlus in, int version) throws IOException
         {
             int parameterCount = in.readInt();
    +        Map<ParameterType, Object> parameters;
             if (parameterCount == 0)
             {
                 return Collections.emptyMap();
             }
             else
             {
    -            ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
    +            ImmutableMap.Builder<ParameterType, Object> builder = ImmutableMap.builder();
                 for (int i = 0; i < parameterCount; i++)
                 {
                     String key = in.readUTF();
    +                ParameterType type = ParameterType.byName.get(key);
    --- End diff --
    
    The only place this would matter is if we blindly forward message parameters somewhere.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160540192
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -80,12 +81,17 @@
     public class MessageOut<T>
     {
         private static final int SERIALIZED_SIZE_VERSION_UNDEFINED = -1;
    +    public static final int PARAMETER_TUPLE_SIZE = 2;
    +    public static final int PARAMETER_TUPLE_TYPE_OFFSET = 0;
    +    public static final int PARAMETER_TUPLE_PARAMETER_OFFSET = 1;
     
    -    public final InetAddress from;
    +    public final InetAddressAndPort from;
         public final MessagingService.Verb verb;
         public final T payload;
         public final IVersionedSerializer<T> serializer;
    -    public final Map<String, byte[]> parameters;
    +    //A list of pairs, first object is the ParameterType enum,
    --- End diff --
    
    the wording "list of pairs" threw me off as I started thinking "list of Pair<>s". Not sure I have a better wording off the top of my head, but maybe modify this?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160523743
  
    --- Diff: src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java ---
    @@ -21,28 +21,142 @@
     import java.net.Inet4Address;
     import java.net.Inet6Address;
     import java.net.InetAddress;
    +import java.nio.ByteBuffer;
     
    -public class CompactEndpointSerializationHelper
    +import org.apache.cassandra.io.IVersionedSerializer;
    +import org.apache.cassandra.io.util.DataInputBuffer;
    +import org.apache.cassandra.io.util.DataInputPlus;
    +import org.apache.cassandra.io.util.DataOutputPlus;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.messages.StreamMessage;
    +
    +/*
    + * As of version 4.0 the endpoint description includes a port number as an unsigned short
    + */
    +public class CompactEndpointSerializationHelper implements IVersionedSerializer<InetAddressAndPort>
     {
    -    public static void serialize(InetAddress endpoint, DataOutput out) throws IOException
    +    public static final IVersionedSerializer<InetAddressAndPort> instance = new CompactEndpointSerializationHelper();
    +
    +    /**
    +     * Streaming uses its own version numbering so we need to map those versions to the versions used by regular messaging.
    +     * There are only two variants of the serialization currently so a simple mapping around pre vs post 4.0 is fine.
    +     */
    +    public static final IVersionedSerializer<InetAddressAndPort> streamingInstance = new IVersionedSerializer<InetAddressAndPort>()
         {
    -        byte[] buf = endpoint.getAddress();
    -        out.writeByte(buf.length);
    -        out.write(buf);
    +        public void serialize(InetAddressAndPort inetAddressAndPort, DataOutputPlus out, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public long serializedSize(InetAddressAndPort inetAddressAndPort, int version)
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_40);
    +            }
    +        }
    +    };
    +
    +    private CompactEndpointSerializationHelper() {}
    +
    +    public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
    +    {
    +        if (version >= MessagingService.VERSION_40)
    +        {
    +            byte[] buf = endpoint.address.getAddress();
    +            out.writeByte(buf.length + 2);
    +            out.write(buf);
    +            out.writeShort(endpoint.port);
    +        }
    +        else
    +        {
    +            byte[] buf = endpoint.address.getAddress();
    +            out.writeByte(buf.length);
    +            out.write(buf);
    +        }
         }
     
    -    public static InetAddress deserialize(DataInput in) throws IOException
    +    public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
         {
    -        byte[] bytes = new byte[in.readByte()];
    -        in.readFully(bytes, 0, bytes.length);
    -        return InetAddress.getByAddress(bytes);
    +        int size = in.readByte() & 0xFF;
    +        switch(size)
    +        {
    +            //The original pre-4.0 serialization of just an address
    +            case 4:
    +            case 16:
    +            {
    +                byte[] bytes = new byte[size];
    --- End diff --
    
    Similar to my comment in the `serialize()` method, it would be cool if we could avoid allocating the byte array (on the heap). 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163421217
  
    --- Diff: src/java/org/apache/cassandra/gms/ApplicationState.java ---
    @@ -19,24 +19,25 @@
     
     public enum ApplicationState
     {
    -    STATUS,
    +    @Deprecated STATUS, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0
         LOAD,
         SCHEMA,
         DC,
         RACK,
         RELEASE_VERSION,
         REMOVAL_COORDINATOR,
    -    INTERNAL_IP,
    -    RPC_ADDRESS,
    +    @Deprecated INTERNAL_IP, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0
    +    @Deprecated RPC_ADDRESS, // ^ Same
         X_11_PADDING, // padding specifically for 1.1
         SEVERITY,
         NET_VERSION,
         HOST_ID,
         TOKENS,
         RPC_READY,
         // pad to allow adding new states to existing cluster
    -    X1,
    -    X2,
    +    INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports
    +    NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
    --- End diff --
    
    ughh, so I was completely wrong wrt the RPC_ADDRESS/NATIVE_ADDRESS thing. I misread this line:
    ```java
        appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
    ```
    as this:
    ```java
        appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastAddress()));
    ```
    
    (`getBroadcastRpcAddress` vs `getBroadcastAddress`) when we do the assignment in StorageService.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163084819
  
    --- Diff: src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java ---
    @@ -21,28 +21,142 @@
     import java.net.Inet4Address;
     import java.net.Inet6Address;
     import java.net.InetAddress;
    +import java.nio.ByteBuffer;
     
    -public class CompactEndpointSerializationHelper
    +import org.apache.cassandra.io.IVersionedSerializer;
    +import org.apache.cassandra.io.util.DataInputBuffer;
    +import org.apache.cassandra.io.util.DataInputPlus;
    +import org.apache.cassandra.io.util.DataOutputPlus;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.messages.StreamMessage;
    +
    +/*
    + * As of version 4.0 the endpoint description includes a port number as an unsigned short
    + */
    +public class CompactEndpointSerializationHelper implements IVersionedSerializer<InetAddressAndPort>
     {
    -    public static void serialize(InetAddress endpoint, DataOutput out) throws IOException
    +    public static final IVersionedSerializer<InetAddressAndPort> instance = new CompactEndpointSerializationHelper();
    +
    +    /**
    +     * Streaming uses its own version numbering so we need to map those versions to the versions used by regular messaging.
    +     * There are only two variants of the serialization currently so a simple mapping around pre vs post 4.0 is fine.
    +     */
    +    public static final IVersionedSerializer<InetAddressAndPort> streamingInstance = new IVersionedSerializer<InetAddressAndPort>()
         {
    -        byte[] buf = endpoint.getAddress();
    -        out.writeByte(buf.length);
    -        out.write(buf);
    +        public void serialize(InetAddressAndPort inetAddressAndPort, DataOutputPlus out, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                instance.serialize(inetAddressAndPort, out, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.deserialize(in, MessagingService.VERSION_40);
    +            }
    +        }
    +
    +        public long serializedSize(InetAddressAndPort inetAddressAndPort, int version)
    +        {
    +            if (version < StreamMessage.VERSION_40)
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_30);
    +            }
    +            else
    +            {
    +                return instance.serializedSize(inetAddressAndPort, MessagingService.VERSION_40);
    +            }
    +        }
    +    };
    +
    +    private CompactEndpointSerializationHelper() {}
    +
    +    public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
    +    {
    +        if (version >= MessagingService.VERSION_40)
    +        {
    +            byte[] buf = endpoint.address.getAddress();
    +            out.writeByte(buf.length + 2);
    +            out.write(buf);
    +            out.writeShort(endpoint.port);
    +        }
    +        else
    +        {
    +            byte[] buf = endpoint.address.getAddress();
    +            out.writeByte(buf.length);
    +            out.write(buf);
    +        }
         }
     
    -    public static InetAddress deserialize(DataInput in) throws IOException
    +    public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
         {
    -        byte[] bytes = new byte[in.readByte()];
    -        in.readFully(bytes, 0, bytes.length);
    -        return InetAddress.getByAddress(bytes);
    +        int size = in.readByte() & 0xFF;
    +        switch(size)
    +        {
    +            //The original pre-4.0 serialization of just an address
    +            case 4:
    +            case 16:
    +            {
    +                byte[] bytes = new byte[size];
    +                in.readFully(bytes, 0, bytes.length);
    +                return InetAddressAndPort.getByAddress(bytes);
    +            }
    +            //Address and one port
    +            case 6:
    +            case 18:
    +            {
    +                byte[] bytes = new byte[size - 2];
    +                in.readFully(bytes);
    +
    +                int port = in.readShort() & 0xFFFF;
    +                return InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(bytes), port);
    +            }
    +            default:
    +                throw new AssertionError("Unexpected size " + size);
    +
    +        }
         }
     
    -    public static int serializedSize(InetAddress from)
    +    public long serializedSize(InetAddressAndPort from, int version)
         {
    -        if (from instanceof Inet4Address)
    -            return 1 + 4;
    -        assert from instanceof Inet6Address;
    -        return 1 + 16;
    +        //4.0 includes a port number
    +        if (version >= MessagingService.VERSION_40)
    +        {
    +            if (from.address instanceof Inet4Address)
    +                return 1 + 4 + 2;
    +            assert from.address instanceof Inet6Address;
    +            return 1 + 16 + 2;
    +        }
    +        else
    +        {
    +            if (from.address instanceof Inet4Address)
    +                return 1 + 4;
    +            assert from.address instanceof Inet6Address;
    +            return 1 + 16;
    +        }
         }
    +
    +    public static InetAddressAndPort fromBytes(ByteBuffer buffer, int version)
    --- End diff --
    
    You are right. Removed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163056742
  
    --- Diff: src/java/org/apache/cassandra/db/LegacySystemKeyspaceMigrator.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.db;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.schema.SchemaConstants;
    +import org.apache.cassandra.cql3.QueryProcessor;
    +import org.apache.cassandra.cql3.UntypedResultSet;
    +import org.apache.cassandra.db.marshal.BytesType;
    +import org.apache.cassandra.db.marshal.Int32Type;
    +import org.apache.cassandra.db.marshal.UTF8Type;
    +import org.apache.cassandra.db.marshal.UUIDType;
    +
    +/**
    + * Migrate 3.0 versions of some tables to 4.0. In this case it's just extra columns and some keys
    + * that are changed.
    + *
    + * Can't just add the additional columns because they are primary key columns and C* doesn't support changing
    + * key columns even if it's just clustering columns.
    + */
    +public class LegacySystemKeyspaceMigrator
    +{
    +    static final String legacyPeersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEERS);
    +    static final String peersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEERS_V2);
    +    static final String legacyPeerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEER_EVENTS);
    +    static final String peerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2);
    +    static final String legacyTransferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES);
    +    static final String transferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2);
    +
    +    private static final Logger logger = LoggerFactory.getLogger(LegacySystemKeyspaceMigrator.class);
    +
    +    private LegacySystemKeyspaceMigrator() {}
    +
    +    public static void migrate()
    +    {
    +        migratePeers();
    +        migratePeerEvents();
    +        migrateLegacyTransferredRanges();
    +    }
    +
    +    private static void migratePeers()
    +    {
    +        ColumnFamilyStore newPeers = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEERS_V2);
    +
    +        if (!newPeers.isEmpty())
    +             return;
    +
    +        logger.info("{} table was empty, migrating legacy {}, if this fails you should fix the issue and then truncate {} to have it try again.",
    +                                  peersName, legacyPeersName, peersName);
    +
    +        String query = String.format("SELECT * FROM %s",
    +                                     legacyPeersName);
    +
    +        String insert = String.format("INSERT INTO %s ( "
    +                                      + "peer, "
    +                                      + "peer_port, "
    +                                      + "data_center, "
    +                                      + "host_id, "
    +                                      + "preferred_ip, "
    +                                      + "preferred_port, "
    +                                      + "rack, "
    +                                      + "release_version, "
    +                                      + "native_address, "
    +                                      + "native_port, "
    +                                      + "schema_version, "
    +                                      + "tokens) "
    +                                      + " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?, ?, ?, ?)",
    +                                      peersName);
    +
    +        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
    +        int transferred = 0;
    +        for (UntypedResultSet.Row row : rows)
    +        {
    +            logger.debug("Transferring row {}", transferred);
    --- End diff --
    
    Do we really want to repeat the table name on every line? There is a row at the end that lists the count of rows transferred and the source and destination. I could repeat that without the count before the transfer starts.
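    
    For example, a sketch of that option (names taken from the hunk above; illustrative only):
    
    ```java
    // once, before the paging loop starts
    logger.info("Migrating rows from legacy {} to {}", legacyPeersName, peersName);
    
    // inside the loop: keep the per-row progress line free of the table names
    logger.debug("Transferring row {}", transferred);
    
    // after the loop, as today
    logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeersName, peersName);
    ```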


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163397924
  
    --- Diff: src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---
    @@ -74,48 +74,50 @@ private SystemDistributedKeyspace()
     
         private static final TableMetadata RepairHistory =
    --- End diff --
    
    ughh, I did realize this later, but I thought I had updated the comment. oh well, sgtm


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163404318
  
    --- Diff: src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---
    @@ -135,11 +136,13 @@ public StreamCoordinator getCoordinator()
             return coordinator;
         }
     
    -    private void attachConnection(InetAddress from, int sessionIndex, Channel channel)
    +    private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel)
         {
             SocketAddress addr = channel.remoteAddress();
    -        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from);
    -        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting);
    +        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from.address);
    +        //Need to turn connecting into a InetAddressAndPort with the correct port. I think getting the port from "from"
    +        //Will work since we don't actually have ports diverge across network interfaces
    +        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, InetAddressAndPort.getByAddressOverrideDefaults(connecting, from.port));
    --- End diff --
    
    Yes, you are correct; I made a mistake. We should keep your code, but can you add my comment: `In the case of unit tests, if you use the EmbeddedChannel, channel.remoteAddress() does not return an InetSocketAddress, but an EmbeddedSocketAddress. Hence why we need the type check here`
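    
    Putting those two together, the method might read roughly like the sketch below (everything here comes from the hunk above plus that comment, so treat it as illustrative rather than the final patch):
    
    ```java
    private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel)
    {
        SocketAddress addr = channel.remoteAddress();
        // In the case of unit tests, if you use the EmbeddedChannel, channel.remoteAddress() does not return
        // an InetSocketAddress, but an EmbeddedSocketAddress. Hence why we need the type check here.
        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from.address);
        // Getting the port from "from" should be fine since ports do not diverge across network interfaces.
        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex,
                                                                   InetAddressAndPort.getByAddressOverrideDefaults(connecting, from.port));
        // ... rest of the method unchanged
    }
    ```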


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160539548
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -80,12 +81,17 @@
     public class MessageOut<T>
     {
         private static final int SERIALIZED_SIZE_VERSION_UNDEFINED = -1;
    +    public static final int PARAMETER_TUPLE_SIZE = 2;
    --- End diff --
    
    Adding some comments would be helpful, esp how these constants relate to the member field `parameters`
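    
    Something along these lines, perhaps (wording is only a suggestion):
    
    ```java
    // `parameters` is a flat list of alternating entries: the entry at
    // i * PARAMETER_TUPLE_SIZE + PARAMETER_TUPLE_TYPE_OFFSET is the ParameterType,
    // and the entry at i * PARAMETER_TUPLE_SIZE + PARAMETER_TUPLE_PARAMETER_OFFSET is its value.
    public static final int PARAMETER_TUPLE_SIZE = 2;
    public static final int PARAMETER_TUPLE_TYPE_OFFSET = 0;
    public static final int PARAMETER_TUPLE_PARAMETER_OFFSET = 1;
    ```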


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r161921935
  
    --- Diff: src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---
    @@ -135,11 +136,13 @@ public StreamCoordinator getCoordinator()
             return coordinator;
         }
     
    -    private void attachConnection(InetAddress from, int sessionIndex, Channel channel)
    +    private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel)
         {
             SocketAddress addr = channel.remoteAddress();
    -        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from);
    -        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting);
    +        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from.address);
    +        //Need to turn connecting into a InetAddressAndPort with the correct port. I think getting the port from "from"
    +        //Will work since we don't actually have ports diverge across network interfaces
    +        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, InetAddressAndPort.getByAddressOverrideDefaults(connecting, from.port));
    --- End diff --
    
    well, because I didn't document this (at all), it's hard to know why the hell I have to check the `addr` coming from the `channel`. Shame on me.
    
    In the case of unit tests, if you use the `EmbeddedChannel`, `channel.remoteAddress()` does not return an `InetSocketAddress`, but an `EmbeddedSocketAddress`. I think the best thing to do here is:
    
    ```java
    private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel)
    {
        SocketAddress addr = channel.remoteAddress();
        final InetAddressAndPort connecting;
        if (addr instanceof InetSocketAddress)
        {
            InetSocketAddress address = (InetSocketAddress) addr;
            connecting = InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(), address.getPort());
        }
        else
        {
            // presumably the addr is an EmbeddedSocketAddress, and we only get that when running unit tests
            // where channel is an instance of EmbeddedChannel. In that case, it's safe to simply use the "from" parameter.
            connecting = from;
        }
        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting);
    }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown closed the pull request at:

    https://github.com/apache/cassandra/pull/184


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163396452
  
    --- Diff: src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---
    @@ -349,6 +350,16 @@ public static String getOutputInitialAddress(Configuration conf)
             return conf.get(OUTPUT_INITIAL_ADDRESS);
         }
     
    +    public static void setOutputInitialPort(Configuration conf, Integer port)
    +    {
    +        conf.set(OUTPUT_INITIAL_PORT, port.toString());
    +    }
    +
    +    public static Integer getOutputInitialPort(Configuration conf)
    +    {
    +        return Integer.valueOf(conf.get(OUTPUT_INITIAL_PORT, "7000"));
    --- End diff --
    
    sgtm. pretty nit: maybe make it a constant, instead of looking like a magic number?
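    
    e.g. something like (a sketch; the constant name is just a suggestion):
    
    ```java
    // Default storage port used when OUTPUT_INITIAL_PORT isn't set in the Hadoop config.
    private static final String DEFAULT_OUTPUT_INITIAL_PORT = "7000";
    
    public static Integer getOutputInitialPort(Configuration conf)
    {
        return Integer.valueOf(conf.get(OUTPUT_INITIAL_PORT, DEFAULT_OUTPUT_INITIAL_PORT));
    }
    ```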


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160288035
  
    --- Diff: src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---
    @@ -40,6 +40,7 @@
     import org.apache.cassandra.config.ParameterizedClass;
     import org.apache.cassandra.exceptions.ConfigurationException;
     import org.apache.cassandra.io.FSReadError;
    +import org.apache.cassandra.io.util.Memory;
    --- End diff --
    
    petty nit: unused import


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160496553
  
    --- Diff: src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---
    @@ -349,6 +350,16 @@ public static String getOutputInitialAddress(Configuration conf)
             return conf.get(OUTPUT_INITIAL_ADDRESS);
         }
     
    +    public static void setOutputInitialPort(Configuration conf, Integer port)
    +    {
    +        conf.set(OUTPUT_INITIAL_PORT, port.toString());
    +    }
    +
    +    public static Integer getOutputInitialPort(Configuration conf)
    +    {
    +        return Integer.valueOf(conf.get(OUTPUT_INITIAL_PORT, "7000"));
    --- End diff --
    
    Is this the correct thing to do, hard code `7000`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160533135
  
    --- Diff: src/java/org/apache/cassandra/net/ForwardToContainer.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.net;
    +
    +import java.util.Collection;
    +
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +
    +/**
    + * Contains forward to information until it can be serialized as part of a message using a version
    + * specific serialization
    + */
    +public class ForwardToContainer
    +{
    +    public final Collection<InetAddressAndPort> targets;
    +    public final int[] messageIds;
    +
    +    public ForwardToContainer(Collection<InetAddressAndPort> targets,
    +                              int[] messageIds)
    +    {
    +        this.targets = targets;
    --- End diff --
    
    Do we need any checking to to ensure that `targets` and `messageIds` have the same number of elements?
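    
    e.g. a sketch of such a check (assuming the Guava Preconditions already used elsewhere in the patch):
    
    ```java
    public ForwardToContainer(Collection<InetAddressAndPort> targets, int[] messageIds)
    {
        // fail fast if the two collections don't line up one-to-one
        Preconditions.checkArgument(targets.size() == messageIds.length,
                                    "Expected one message id per target (%s targets, %s ids)",
                                    targets.size(), messageIds.length);
        this.targets = targets;
        this.messageIds = messageIds;
    }
    ```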


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163086238
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -89,37 +88,39 @@ private MessageIn(InetAddress from,
     
         public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime) throws IOException
         {
    -        InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
    +        InetAddressAndPort from = CompactEndpointSerializationHelper.instance.deserialize(in, version);
     
             MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
    -        Map<String, byte[]> parameters = readParameters(in);
    +        Map<ParameterType, Object> parameters = readParameters(in, version);
             int payloadSize = in.readInt();
             return read(in, version, id, constructionTime, from, payloadSize, verb, parameters);
         }
     
    -    public static Map<String, byte[]> readParameters(DataInputPlus in) throws IOException
    +    public static Map<ParameterType, Object> readParameters(DataInputPlus in, int version) throws IOException
         {
             int parameterCount = in.readInt();
    +        Map<ParameterType, Object> parameters;
             if (parameterCount == 0)
             {
                 return Collections.emptyMap();
             }
             else
             {
    -            ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
    +            ImmutableMap.Builder<ParameterType, Object> builder = ImmutableMap.builder();
                 for (int i = 0; i < parameterCount; i++)
                 {
                     String key = in.readUTF();
    +                ParameterType type = ParameterType.byName.get(key);
    --- End diff --
    
    You are right; we should throw or skip it and not try to use null as a key. Should we now silently go ahead and skip? That's useful behavior for a protocol.
    
    But this illustrates a larger issue. The key used to be a string, so we could pass around parameters we didn't recognize. Now we can't; we can only drop them.
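    
    If we go with skipping, it might look roughly like this, assuming each parameter value stays length-prefixed on the wire as in the pre-4.0 format (the skip call is illustrative; the exact DataInputPlus method may differ):
    
    ```java
    String key = in.readUTF();
    ParameterType type = ParameterType.byName.get(key);
    int valueLength = in.readInt();
    if (type == null)
    {
        // Unknown parameter from a newer/older node: consume its bytes and drop it
        // rather than building a map entry with a null key.
        in.skipBytesFully(valueLength);
        continue;
    }
    ```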


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r159255891
  
    --- Diff: src/java/org/apache/cassandra/locator/InetAddressAndPort.java ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.locator;
    +
    +import java.io.Serializable;
    +import java.net.Inet6Address;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.nio.ByteBuffer;
    +import java.util.regex.Pattern;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.net.HostAndPort;
    +
    +import org.apache.cassandra.config.Config;
    +
    +/**
    + * A class to replace the usage of InetAddress to identify hosts in the cluster.
    + * Opting for a full replacement class so that in the future if we change the nature
    + * of the identifier the refactor will be easier in that we don't have to change the type
    + * just the methods.
    + *
    + * Because an IP might contain multiple C* instances the identification must be done
    + * using the IP + port. InetSocketAddress is undesirable for a couple of reasons. It's not comparable,
    + * it's toString() method doesn't correctly bracket IPv6, it doesn't handle optional default values,
    + * and a couple of other minor behaviors that are slightly less troublesome like handling the
    + * need to sometimes return a port and sometimes not.
    + *
    + */
    +public final class InetAddressAndPort implements Comparable<InetAddressAndPort>, Serializable
    +{
    +    private static final long serialVersionUID = 0;
    +
    +    //Store these here to avoid requiring DatabaseDescriptor to be loaded. DatabaseDescriptor will set
    +    //these when it loads the config. A lot of unit tests won't end up loading DatabaseDescriptor.
    +    //Tools that might use this class also might not load database descriptor. Those tools are expected
    +    //to always override the defaults.
    +    static volatile int defaultPort = 7000;
    +
    +    public final InetAddress address;
    +    public final int port;
    +
    +    private InetAddressAndPort(InetAddress address, int port)
    +    {
    +        Preconditions.checkNotNull(address);
    +        validatePortRange(port);
    +        this.address = address;
    +        this.port = port;
    +    }
    +
    +    private static void validatePortRange(int port)
    +    {
    +        if (port < 0 | port > 65535)
    +        {
    +            throw new IllegalArgumentException("Port " + port + " is not a valid port number in the range 0-65535");
    +        }
    +    }
    +
    +    @Override
    +    public boolean equals(Object o)
    +    {
    +        if (this == o) return true;
    +        if (o == null || getClass() != o.getClass()) return false;
    +
    +        InetAddressAndPort that = (InetAddressAndPort) o;
    +
    +        if (port != that.port) return false;
    +        return address.equals(that.address);
    +    }
    +
    +    @Override
    +    public int hashCode()
    +    {
    +        int result = address.hashCode();
    +        result = 31 * result + port;
    +        return result;
    +    }
    +
    +    @Override
    +    public int compareTo(InetAddressAndPort o)
    +    {
    +        int retval = ByteBuffer.wrap(address.getAddress()).compareTo(ByteBuffer.wrap(o.address.getAddress()));
    --- End diff --
    
    I'm not sure if `compareTo` is on a hot path anywhere, but I think we can avoid allocating the two `ByteBuffer`s by using `FastByteOperations#compareUnsigned`
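    
    e.g. a sketch (assuming the byte[]-with-offsets overload of `FastByteOperations.compareUnsigned`; check the exact signature):
    
    ```java
    byte[] b1 = address.getAddress();
    byte[] b2 = o.address.getAddress();
    // compare the raw address bytes without wrapping them in ByteBuffers
    int retval = FastByteOperations.compareUnsigned(b1, 0, b1.length, b2, 0, b2.length);
    // note: getAddress() still copies the underlying byte[] on every call
    ```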


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163069286
  
    --- Diff: src/java/org/apache/cassandra/locator/InetAddressAndPort.java ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.locator;
    +
    +import java.io.Serializable;
    +import java.net.Inet6Address;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.nio.ByteBuffer;
    +import java.util.regex.Pattern;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.net.HostAndPort;
    +
    +import org.apache.cassandra.config.Config;
    +
    +/**
    + * A class to replace the usage of InetAddress to identify hosts in the cluster.
    + * Opting for a full replacement class so that in the future if we change the nature
    + * of the identifier the refactor will be easier in that we don't have to change the type
    + * just the methods.
    + *
    + * Because an IP might contain multiple C* instances the identification must be done
    + * using the IP + port. InetSocketAddress is undesirable for a couple of reasons. It's not comparable,
    + * it's toString() method doesn't correctly bracket IPv6, it doesn't handle optional default values,
    + * and a couple of other minor behaviors that are slightly less troublesome like handling the
    + * need to sometimes return a port and sometimes not.
    + *
    + */
    +public final class InetAddressAndPort implements Comparable<InetAddressAndPort>, Serializable
    +{
    +    private static final long serialVersionUID = 0;
    +
    +    //Store these here to avoid requiring DatabaseDescriptor to be loaded. DatabaseDescriptor will set
    +    //these when it loads the config. A lot of unit tests won't end up loading DatabaseDescriptor.
    +    //Tools that might use this class also might not load database descriptor. Those tools are expected
    +    //to always override the defaults.
    +    static volatile int defaultPort = 7000;
    +
    +    public final InetAddress address;
    +    public final int port;
    +
    +    private InetAddressAndPort(InetAddress address, int port)
    +    {
    +        Preconditions.checkNotNull(address);
    +        validatePortRange(port);
    +        this.address = address;
    +        this.port = port;
    +    }
    +
    +    private static void validatePortRange(int port)
    +    {
    +        if (port < 0 | port > 65535)
    +        {
    +            throw new IllegalArgumentException("Port " + port + " is not a valid port number in the range 0-65535");
    +        }
    +    }
    +
    +    @Override
    +    public boolean equals(Object o)
    +    {
    +        if (this == o) return true;
    +        if (o == null || getClass() != o.getClass()) return false;
    +
    +        InetAddressAndPort that = (InetAddressAndPort) o;
    +
    +        if (port != that.port) return false;
    +        return address.equals(that.address);
    +    }
    +
    +    @Override
    +    public int hashCode()
    +    {
    +        int result = address.hashCode();
    +        result = 31 * result + port;
    +        return result;
    +    }
    +
    +    @Override
    +    public int compareTo(InetAddressAndPort o)
    +    {
    +        int retval = ByteBuffer.wrap(address.getAddress()).compareTo(ByteBuffer.wrap(o.address.getAddress()));
    --- End diff --
    
    Yeah, it's pretty bad. getAddress() allocates the address byte array. Maybe I should have these store the address array so I don't have to go to `address` to get it.
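    
    A sketch of that idea (the field name, the FastByteOperations call, and the port tie-break are assumptions carried over from the earlier comments, not the final patch):
    
    ```java
    public final InetAddress address;
    private final byte[] addressBytes; // cached copy so compareTo() doesn't call getAddress() each time
    public final int port;
    
    private InetAddressAndPort(InetAddress address, int port)
    {
        Preconditions.checkNotNull(address);
        validatePortRange(port);
        this.address = address;
        this.addressBytes = address.getAddress();
        this.port = port;
    }
    
    @Override
    public int compareTo(InetAddressAndPort o)
    {
        int retval = FastByteOperations.compareUnsigned(addressBytes, 0, addressBytes.length,
                                                        o.addressBytes, 0, o.addressBytes.length);
        if (retval != 0)
            return retval;
        return Integer.compare(port, o.port);
    }
    ```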


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163093996
  
    --- Diff: src/java/org/apache/cassandra/tools/LoaderOptions.java ---
    @@ -122,8 +129,11 @@
             EncryptionOptions clientEncOptions = new EncryptionOptions();
             int connectionsPerHost = 1;
             EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
    -        Set<InetAddress> hosts = new HashSet<>();
    -        Set<InetAddress> ignores = new HashSet<>();
    +        Set<InetAddress> hostsArg = new HashSet<>();
    +        Set<InetAddress> ignoresArg = new HashSet<>();
    +        Set<InetSocketAddress> hosts = new HashSet<>();
    --- End diff --
    
    The loader is a client, not really part of the database, and it interacts with the Java driver using InetSocketAddress.
    
    The rule is that an InetSocketAddress should never refer to the storage port. InetAddressAndPort might, however, refer to the client port in some of the internal server code.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163083984
  
    --- Diff: src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---
    @@ -40,6 +40,7 @@
     import org.apache.cassandra.config.ParameterizedClass;
     import org.apache.cassandra.exceptions.ConfigurationException;
     import org.apache.cassandra.io.FSReadError;
    +import org.apache.cassandra.io.util.Memory;
    --- End diff --
    
    Unused imports aren't petty!


---



[GitHub] cassandra pull request #184: Cassandra 7544 rebased2

Posted by aweisberg <gi...@git.apache.org>.
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r163074165
  
    --- Diff: src/java/org/apache/cassandra/locator/InetAddressAndPort.java ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.locator;
    +
    +import java.io.Serializable;
    +import java.net.Inet6Address;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.nio.ByteBuffer;
    +import java.util.regex.Pattern;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.net.HostAndPort;
    +
    +import org.apache.cassandra.config.Config;
    +
    +/**
    + * A class to replace the usage of InetAddress to identify hosts in the cluster.
    + * Opting for a full replacement class so that in the future if we change the nature
    + * of the identifier the refactor will be easier in that we don't have to change the type
    + * just the methods.
    + *
    + * Because an IP might contain multiple C* instances the identification must be done
    + * using the IP + port. InetSocketAddress is undesirable for a couple of reasons. It's not comparable,
    + * its toString() method doesn't correctly bracket IPv6, it doesn't handle optional default values,
    + * and a couple of other minor behaviors that are slightly less troublesome like handling the
    + * need to sometimes return a port and sometimes not.
    + *
    + */
    +public final class InetAddressAndPort implements Comparable<InetAddressAndPort>, Serializable
    +{
    +    private static final long serialVersionUID = 0;
    +
    +    //Store these here to avoid requiring DatabaseDescriptor to be loaded. DatabaseDescriptor will set
    +    //these when it loads the config. A lot of unit tests won't end up loading DatabaseDescriptor.
    +    //Tools that might use this class also might not load database descriptor. Those tools are expected
    +    //to always override the defaults.
    +    static volatile int defaultPort = 7000;
    +
    +    public final InetAddress address;
    +    public final int port;
    +
    +    private InetAddressAndPort(InetAddress address, int port)
    +    {
    +        Preconditions.checkNotNull(address);
    +        validatePortRange(port);
    +        this.address = address;
    +        this.port = port;
    +    }
    +
    +    private static void validatePortRange(int port)
    +    {
    +        if (port < 0 | port > 65535)
    +        {
    +            throw new IllegalArgumentException("Port " + port + " is not a valid port number in the range 0-65535");
    +        }
    +    }
    +
    +    @Override
    +    public boolean equals(Object o)
    +    {
    +        if (this == o) return true;
    +        if (o == null || getClass() != o.getClass()) return false;
    +
    +        InetAddressAndPort that = (InetAddressAndPort) o;
    +
    +        if (port != that.port) return false;
    +        return address.equals(that.address);
    +    }
    +
    +    @Override
    +    public int hashCode()
    +    {
    +        int result = address.hashCode();
    +        result = 31 * result + port;
    +        return result;
    +    }
    +
    +    @Override
    +    public int compareTo(InetAddressAndPort o)
    +    {
    +        int retval = ByteBuffer.wrap(address.getAddress()).compareTo(ByteBuffer.wrap(o.address.getAddress()));
    --- End diff --
    
    getHostAddress() is the same kind of thing: it also allocates on every call.
    
    I think the InetAddress implementations, for whatever reason, chose to trade footprint for allocations.
    
    For us it's fine to just store the allocation up front. We don't plan on storing a ton of these, I think.
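
    If we store the bytes (the addressBytes field I sketched in the other comment), compareTo can skip the wrapping and copying entirely. Rough sketch; the tie-breaks after the address bytes are my guess at what the rest of this method does, since the diff is cut off here:

        @Override
        public int compareTo(InetAddressAndPort o)
        {
            byte[] a = addressBytes;
            byte[] b = o.addressBytes;
            int length = Math.min(a.length, b.length);
            for (int i = 0; i < length; i++)
            {
                // Byte.compare keeps the signed-byte ordering that ByteBuffer.compareTo uses today
                int cmp = Byte.compare(a[i], b[i]);
                if (cmp != 0)
                    return cmp;
            }
            // Shorter address (IPv4) sorts before longer (IPv6), same as the ByteBuffer comparison
            int cmp = Integer.compare(a.length, b.length);
            if (cmp != 0)
                return cmp;
            // Assumed tie-break on port
            return Integer.compare(port, o.port);
        }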


---
