Posted to pr@cassandra.apache.org by jasobrown <gi...@git.apache.org> on 2018/03/12 12:52:55 UTC

[GitHub] cassandra pull request #204: 14115 v1

GitHub user jasobrown opened a pull request:

    https://github.com/apache/cassandra/pull/204

    14115 v1

    Opening a PR for review

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bdeggleston/cassandra 14115-v1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/cassandra/pull/204.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #204
    
----
commit f85c9849aeb718f1ace17220366c5915d3b0013b
Author: Blake Eggleston <bd...@...>
Date:   2018-03-04T22:44:30Z

    remaing incoming/outgoing stream messages

commit db7d5177b3cc3ae7a718d53a88753b1af6370077
Author: Blake Eggleston <bd...@...>
Date:   2018-03-04T23:06:01Z

    renaming file header

commit 22c3fbca652169004927cf1033b5ef83c1ff520c
Author: Blake Eggleston <bd...@...>
Date:   2018-03-04T23:27:43Z

    infer 'keepSSTableLevel' from streamOperation

commit 6b64216c7b9451189a096a0b4cae5562d6bd361b
Author: Blake Eggleston <bd...@...>
Date:   2018-03-05T00:22:26Z

    refactoring streamwriter

commit e782e5dac6828bb45d16a530eabb5cd365e98bf8
Author: Blake Eggleston <bd...@...>
Date:   2018-03-05T03:10:49Z

    refactoring streamwriter some more

commit 4b7fc853f534f0b8e6034aeb0b7d7d3f0a7b1093
Author: Blake Eggleston <bd...@...>
Date:   2018-03-05T03:13:32Z

    beginning stream read refactor

commit 6a6649b539c21b142af45086e5554de30478560f
Author: Blake Eggleston <bd...@...>
Date:   2018-03-05T03:27:40Z

    refactoring streamreader

commit b39cd935ce5dfd0506fec44a21e1461a723738ee
Author: Blake Eggleston <bd...@...>
Date:   2018-03-06T01:32:57Z

    refactoring streamreader some more

commit d8ffade23dd70e16b625c1cbf03a7c0faf71ba7a
Author: Blake Eggleston <bd...@...>
Date:   2018-03-06T16:42:51Z

    refactoring management of received streams

commit efc70ffa5fb639dcfc91bf8c8e7a48d34d50c926
Author: Blake Eggleston <bd...@...>
Date:   2018-03-07T01:40:41Z

    refactoring stream preparation

commit 712bb68cdf3dfd18d6312073503b84b1778522cd
Author: Blake Eggleston <bd...@...>
Date:   2018-03-07T01:45:54Z

    making preview kind backend agnostic

commit b1ddb8c99861665d9f5586d06bf2268e1be2509f
Author: Blake Eggleston <bd...@...>
Date:   2018-03-07T02:15:32Z

    additional cleanup

commit 8f2f060d439a66e8de98da2d8f9e8fa36b787937
Author: Blake Eggleston <bd...@...>
Date:   2018-03-07T02:17:21Z

    renaming StreamAggregator to StreamReceiver

commit fbeb67ebf5cfb6b9d44da80115900ecddc0eeede
Author: Blake Eggleston <bd...@...>
Date:   2018-03-07T06:11:33Z

    little more renaming

commit 017210bbb363b98bda73896da8d2221824ca0f57
Author: Blake Eggleston <bd...@...>
Date:   2018-03-07T15:42:30Z

    more renaming, adding interface docs

commit 032b7c8ef9be576a6516c1e8b3ec217bf676896a
Author: Blake Eggleston <bd...@...>
Date:   2018-03-08T05:38:33Z

    fixing tests

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #204: 14115 v1

Posted by bdeggleston <gi...@git.apache.org>.
GitHub user bdeggleston commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/204#discussion_r174927597
  
    --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.db.streaming;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.UUID;
    +
    +import com.google.common.base.Predicate;
    +import com.google.common.base.Predicates;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Sets;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.db.ColumnFamilyStore;
    +import org.apache.cassandra.db.PartitionPosition;
    +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
    +import org.apache.cassandra.db.lifecycle.SSTableSet;
    +import org.apache.cassandra.db.lifecycle.View;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.io.sstable.format.SSTableReader;
    +import org.apache.cassandra.service.ActiveRepairService;
    +import org.apache.cassandra.streaming.IncomingStream;
    +import org.apache.cassandra.streaming.OutgoingStream;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.streaming.StreamReceiver;
    +import org.apache.cassandra.streaming.StreamSession;
    +import org.apache.cassandra.streaming.TableStreamManager;
    +import org.apache.cassandra.streaming.messages.StreamMessageHeader;
    +import org.apache.cassandra.utils.Pair;
    +import org.apache.cassandra.utils.concurrent.Ref;
    +import org.apache.cassandra.utils.concurrent.Refs;
    +
    +/**
    + * Implements the streaming interface for the native cassandra storage engine.
    + *
    + * Handles the streaming of one or more sections of one or more sstables to and from a specific
    + * remote node. The sending side performs a block-level transfer of the source stream, while the receiver
    + * must deserialize that data stream into partitions and rows, and then write that out as an sstable.
    + */
    +public class CassandraStreamManager implements TableStreamManager
    +{
    +    private static final Logger logger = LoggerFactory.getLogger(CassandraStreamManager.class);
    +
    +    private final ColumnFamilyStore cfs;
    +
    +    public CassandraStreamManager(ColumnFamilyStore cfs)
    +    {
    +        this.cfs = cfs;
    +    }
    +
    +    @Override
    +    public IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header)
    +    {
    +        return new CassandraIncomingFile(cfs, session, header);
    +    }
    +
    +    @Override
    +    public StreamReceiver createStreamReceiver(StreamSession session, int totalStreams)
    +    {
    +        return new CassandraStreamReceiver(cfs, session, totalStreams);
    +    }
    +
    +    private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind kind)
    +    {
    +        switch (kind)
    +        {
    +            case ALL:
    +                return Predicates.alwaysTrue();
    +            case UNREPAIRED:
    +                return Predicates.not(SSTableReader::isRepaired);
    +            case REPAIRED:
    +                return SSTableReader::isRepaired;
    +            default:
    +                throw new IllegalArgumentException("Unsupported kind: " + kind);
    +        }
    +    }
    +
    +    @Override
    +    public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind)
    +    {
    +        Refs<SSTableReader> refs = new Refs<>();
    +        final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size());
    --- End diff --
    
    Pushed a new commit to fix this. The idea was that the refs release had moved to the caller, `StreamSession.getOutgoingStreamsForRanges`, because `StreamSession.getSSTableSectionsForRanges` worked on all tables being streamed, whereas the stream handler works on only one table. Still, that didn't cover the refs taken inside the method but not yet returned as streams, so we need both.
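
The "we need both" point can be illustrated with a minimal, self-contained sketch. It does not use Cassandra's `Ref`/`Refs` classes or the PR's actual code: `FakeRef`, `streamsForTable`, `streamsForAllTables`, and the `ALL` list are hypothetical names invented for illustration. The per-table method releases refs it has taken but not yet handed back, and the caller releases refs already returned by earlier tables if a later table fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoLevelReleaseSketch
{
    /** Hypothetical stand-in for a reference-counted sstable ref; not Cassandra's Ref. */
    static final class FakeRef
    {
        final String id;
        boolean released = false;

        FakeRef(String id) { this.id = id; }

        void release() { released = true; }
    }

    /** Every ref ever created, kept only so the demo can inspect the final state. */
    static final List<FakeRef> ALL = new ArrayList<>();

    /** Callee: handles a single table and owns its refs until it returns them. */
    static List<FakeRef> streamsForTable(String table, int sstables, boolean failMidway)
    {
        List<FakeRef> taken = new ArrayList<>();
        try
        {
            for (int i = 0; i < sstables; i++)
            {
                // Simulated failure on the last sstable, after earlier refs were taken.
                if (failMidway && i == sstables - 1)
                    throw new IllegalStateException("failed preparing " + table + "-" + i);
                FakeRef ref = new FakeRef(table + "-" + i);
                ALL.add(ref);
                taken.add(ref);
            }
            return taken;
        }
        catch (Throwable t)
        {
            // These refs were never returned, so only the callee can release them.
            taken.forEach(FakeRef::release);
            throw t;
        }
    }

    /** Caller: spans all tables and owns every ref that has been returned so far. */
    static List<FakeRef> streamsForAllTables(List<String> tables, String failingTable)
    {
        List<FakeRef> returned = new ArrayList<>();
        try
        {
            for (String table : tables)
                returned.addAll(streamsForTable(table, 2, table.equals(failingTable)));
            return returned;
        }
        catch (Throwable t)
        {
            // Refs from tables that succeeded earlier must be released here.
            returned.forEach(FakeRef::release);
            throw t;
        }
    }

    public static void main(String[] args)
    {
        try
        {
            streamsForAllTables(Arrays.asList("t1", "t2"), "t2");
        }
        catch (IllegalStateException e)
        {
            for (FakeRef ref : ALL)
                System.out.println(ref.id + " released=" + ref.released);
        }
    }
}
```

In this sketch, dropping either `catch` block leaks refs: without the callee's, the partially prepared table leaks; without the caller's, the tables that completed earlier leak.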


---



[GitHub] cassandra pull request #204: 14115 v1

Posted by jasobrown <gi...@git.apache.org>.
GitHub user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/204#discussion_r174135042
  
    --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.db.streaming;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.UUID;
    +
    +import com.google.common.base.Predicate;
    +import com.google.common.base.Predicates;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Sets;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.db.ColumnFamilyStore;
    +import org.apache.cassandra.db.PartitionPosition;
    +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
    +import org.apache.cassandra.db.lifecycle.SSTableSet;
    +import org.apache.cassandra.db.lifecycle.View;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.io.sstable.format.SSTableReader;
    +import org.apache.cassandra.service.ActiveRepairService;
    +import org.apache.cassandra.streaming.IncomingStream;
    +import org.apache.cassandra.streaming.OutgoingStream;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.streaming.StreamReceiver;
    +import org.apache.cassandra.streaming.StreamSession;
    +import org.apache.cassandra.streaming.TableStreamManager;
    +import org.apache.cassandra.streaming.messages.StreamMessageHeader;
    +import org.apache.cassandra.utils.Pair;
    +import org.apache.cassandra.utils.concurrent.Ref;
    +import org.apache.cassandra.utils.concurrent.Refs;
    +
    +/**
    + * Implements the streaming interface for the native cassandra storage engine.
    + *
    + * Handles the streaming of one or more sections of one or more sstables to and from a specific
    + * remote node. The sending side performs a block-level transfer of the source stream, while the receiver
    + * must deserialize that data stream into partitions and rows, and then write that out as an sstable.
    + */
    +public class CassandraStreamManager implements TableStreamManager
    +{
    +    private static final Logger logger = LoggerFactory.getLogger(CassandraStreamManager.class);
    +
    +    private final ColumnFamilyStore cfs;
    +
    +    public CassandraStreamManager(ColumnFamilyStore cfs)
    +    {
    +        this.cfs = cfs;
    +    }
    +
    +    @Override
    +    public IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header)
    +    {
    +        return new CassandraIncomingFile(cfs, session, header);
    +    }
    +
    +    @Override
    +    public StreamReceiver createStreamReceiver(StreamSession session, int totalStreams)
    +    {
    +        return new CassandraStreamReceiver(cfs, session, totalStreams);
    +    }
    +
    +    private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind kind)
    +    {
    +        switch (kind)
    +        {
    +            case ALL:
    +                return Predicates.alwaysTrue();
    +            case UNREPAIRED:
    +                return Predicates.not(SSTableReader::isRepaired);
    +            case REPAIRED:
    +                return SSTableReader::isRepaired;
    +            default:
    +                throw new IllegalArgumentException("Unsupported kind: " + kind);
    +        }
    +    }
    +
    +    @Override
    +    public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind)
    +    {
    +        Refs<SSTableReader> refs = new Refs<>();
    +        final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size());
    --- End diff --
    
    `StreamSession.getSSTableSectionsForRanges` included a `try` block after this point that encompassed most of the function and released the `refs` on any failure. Was that unnecessary, or should we keep it?


---
