Posted to pr@cassandra.apache.org by jasobrown <gi...@git.apache.org> on 2018/03/12 12:52:55 UTC
[GitHub] cassandra pull request #204: 14115 v1
GitHub user jasobrown opened a pull request:
https://github.com/apache/cassandra/pull/204
14115 v1
Opening a PR for review
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/bdeggleston/cassandra 14115-v1
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/cassandra/pull/204.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #204
----
commit f85c9849aeb718f1ace17220366c5915d3b0013b
Author: Blake Eggleston <bd...@...>
Date: 2018-03-04T22:44:30Z
renaming incoming/outgoing stream messages
commit db7d5177b3cc3ae7a718d53a88753b1af6370077
Author: Blake Eggleston <bd...@...>
Date: 2018-03-04T23:06:01Z
renaming file header
commit 22c3fbca652169004927cf1033b5ef83c1ff520c
Author: Blake Eggleston <bd...@...>
Date: 2018-03-04T23:27:43Z
infer 'keepSSTableLevel' from streamOperation
commit 6b64216c7b9451189a096a0b4cae5562d6bd361b
Author: Blake Eggleston <bd...@...>
Date: 2018-03-05T00:22:26Z
refactoring streamwriter
commit e782e5dac6828bb45d16a530eabb5cd365e98bf8
Author: Blake Eggleston <bd...@...>
Date: 2018-03-05T03:10:49Z
refactoring streamwriter some more
commit 4b7fc853f534f0b8e6034aeb0b7d7d3f0a7b1093
Author: Blake Eggleston <bd...@...>
Date: 2018-03-05T03:13:32Z
beginning stream read refactor
commit 6a6649b539c21b142af45086e5554de30478560f
Author: Blake Eggleston <bd...@...>
Date: 2018-03-05T03:27:40Z
refactoring streamreader
commit b39cd935ce5dfd0506fec44a21e1461a723738ee
Author: Blake Eggleston <bd...@...>
Date: 2018-03-06T01:32:57Z
refactoring streamreader some more
commit d8ffade23dd70e16b625c1cbf03a7c0faf71ba7a
Author: Blake Eggleston <bd...@...>
Date: 2018-03-06T16:42:51Z
refactoring management of received streams
commit efc70ffa5fb639dcfc91bf8c8e7a48d34d50c926
Author: Blake Eggleston <bd...@...>
Date: 2018-03-07T01:40:41Z
refactoring stream preparation
commit 712bb68cdf3dfd18d6312073503b84b1778522cd
Author: Blake Eggleston <bd...@...>
Date: 2018-03-07T01:45:54Z
making preview kind backend agnostic
commit b1ddb8c99861665d9f5586d06bf2268e1be2509f
Author: Blake Eggleston <bd...@...>
Date: 2018-03-07T02:15:32Z
additional cleanup
commit 8f2f060d439a66e8de98da2d8f9e8fa36b787937
Author: Blake Eggleston <bd...@...>
Date: 2018-03-07T02:17:21Z
renaming StreamAggregator to StreamReceiver
commit fbeb67ebf5cfb6b9d44da80115900ecddc0eeede
Author: Blake Eggleston <bd...@...>
Date: 2018-03-07T06:11:33Z
little more renaming
commit 017210bbb363b98bda73896da8d2221824ca0f57
Author: Blake Eggleston <bd...@...>
Date: 2018-03-07T15:42:30Z
more renaming, adding interface docs
commit 032b7c8ef9be576a6516c1e8b3ec217bf676896a
Author: Blake Eggleston <bd...@...>
Date: 2018-03-08T05:38:33Z
fixing tests
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #204: 14115 v1
Posted by bdeggleston <gi...@git.apache.org>.
Github user bdeggleston commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/204#discussion_r174927597
--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.IncomingStream;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.TableStreamManager;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Implements the streaming interface for the native cassandra storage engine.
+ *
+ * Handles the streaming of one or more sections of one or more sstables to and from a specific
+ * remote node. The sending side performs a block-level transfer of the source stream, while the receiver
+ * must deserialize that data stream into partitions and rows, and then write that out as an sstable.
+ */
+public class CassandraStreamManager implements TableStreamManager
+{
+ private static final Logger logger = LoggerFactory.getLogger(CassandraStreamManager.class);
+
+ private final ColumnFamilyStore cfs;
+
+ public CassandraStreamManager(ColumnFamilyStore cfs)
+ {
+ this.cfs = cfs;
+ }
+
+ @Override
+ public IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header)
+ {
+ return new CassandraIncomingFile(cfs, session, header);
+ }
+
+ @Override
+ public StreamReceiver createStreamReceiver(StreamSession session, int totalStreams)
+ {
+ return new CassandraStreamReceiver(cfs, session, totalStreams);
+ }
+
+ private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind kind)
+ {
+ switch (kind)
+ {
+ case ALL:
+ return Predicates.alwaysTrue();
+ case UNREPAIRED:
+ return Predicates.not(SSTableReader::isRepaired);
+ case REPAIRED:
+ return SSTableReader::isRepaired;
+ default:
+ throw new IllegalArgumentException("Unsupported kind: " + kind);
+ }
+ }
+
+ @Override
+ public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind)
+ {
+ Refs<SSTableReader> refs = new Refs<>();
+ final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size());
--- End diff --
Pushed up a new commit to fix this. The idea was that releasing the refs had moved to the caller, `StreamSession.getOutgoingStreamsForRanges`, because `StreamSession.getSSTableSectionsForRanges` worked on all tables being streamed, while the stream handler only works on one table. However, that didn't cover refs taken in this method but not yet returned as streams, so we need both.
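The two release points described above (the caller releasing refs after streaming, and the method releasing refs it took but never returned) can be sketched as follows. This is a minimal illustration under simplifying assumptions: `CountedRef`, `OutgoingStreamSketch`, `createOutgoingStreams(int, int, List)` and `streamAndRelease` are hypothetical names, not Cassandra's `Ref`/`Refs` or `StreamSession` API, and a failure is simulated by index rather than by real sstable selection.

```java
import java.util.ArrayList;
import java.util.List;

public final class RefOwnershipSketch
{
    // Hypothetical stand-in for a reference-counted sstable handle (not Cassandra's Ref/Refs).
    static final class CountedRef
    {
        private int count = 1; // the single reference taken when the handle is acquired

        void release()
        {
            if (count <= 0)
                throw new IllegalStateException("ref released twice");
            count--;
        }

        boolean isReleased()
        {
            return count == 0;
        }
    }

    // Hypothetical outgoing stream that owns one ref until the caller releases it.
    static final class OutgoingStreamSketch
    {
        final CountedRef ref;

        OutgoingStreamSketch(CountedRef ref)
        {
            this.ref = ref;
        }
    }

    /**
     * Acquires one ref per sstable and wraps each in a stream. If anything fails before
     * the streams are returned, every ref acquired here is released and the error rethrown.
     * On success, ownership of the refs passes to the caller through the returned streams.
     * failAt simulates a failure right after acquiring the ref at that index (-1 = no failure);
     * acquiredLog records every ref taken so callers can inspect it.
     */
    static List<OutgoingStreamSketch> createOutgoingStreams(int sstableCount, int failAt, List<CountedRef> acquiredLog)
    {
        List<CountedRef> refs = new ArrayList<>();
        try
        {
            for (int i = 0; i < sstableCount; i++)
            {
                CountedRef ref = new CountedRef();
                refs.add(ref);
                acquiredLog.add(ref);
                if (i == failAt)
                    throw new IllegalStateException("simulated failure at sstable " + i);
            }
            List<OutgoingStreamSketch> streams = new ArrayList<>(refs.size());
            for (CountedRef ref : refs)
                streams.add(new OutgoingStreamSketch(ref));
            return streams; // the caller now owns these refs
        }
        catch (RuntimeException | Error e)
        {
            // Refs taken here but never handed back as streams must not leak.
            refs.forEach(CountedRef::release);
            throw e;
        }
    }

    // Caller side: releases each stream's ref once streaming completes, even on error.
    static void streamAndRelease(List<OutgoingStreamSketch> streams)
    {
        try
        {
            for (OutgoingStreamSketch stream : streams)
            {
                // a real session would transfer this stream's data to the peer here
            }
        }
        finally
        {
            streams.forEach(s -> s.ref.release());
        }
    }
}
```

On success nothing is released until `streamAndRelease` runs; with three sstables and `failAt = 1`, the two refs acquired before the failure are released before the exception propagates, so neither path leaks a ref.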
[GitHub] cassandra pull request #204: 14115 v1
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/204#discussion_r174135042
--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java ---
--- End diff --
`StreamSession.getSSTableSectionsForRanges` had a `try` block after this point that wrapped most of the function and released the `refs` on any failure. Was that unnecessary, or should we keep it?