You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/08/29 17:20:25 UTC
[3/6] git commit: Fix streaming does not transfer wrapped range
Fix streaming does not transfer wrapped range
patch by Sergio Bossa; reviewed by yukim for CASSANDRA-5948
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/18be7fa8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/18be7fa8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/18be7fa8
Branch: refs/heads/trunk
Commit: 18be7fa8760c578a9d42f7512e7767992025ac18
Parents: 254d315
Author: Sergio Bossa <se...@gmail.com>
Authored: Wed Aug 28 14:43:04 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Aug 29 10:16:57 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamOut.java | 9 +-
.../streaming/StreamingTransferTest.java | 181 +++++++++++--------
3 files changed, 112 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18be7fa8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9ef0651..5cb1522 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
* Fix CqlRecordWriter with composite keys (CASSANDRA-5949)
* Allow disabling SlabAllocator (CASSANDRA-5935)
* Make user-defined compaction JMX blocking (CASSANDRA-4952)
+ * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
1.2.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18be7fa8/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 5a5ab9a..7035ec7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -126,8 +126,11 @@ public class StreamOut
boolean flushTables)
{
assert ranges.size() > 0;
+
+ List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+
logger.info("Beginning transfer to {}", session.getHost());
- logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
+ logger.debug("Ranges are {}", StringUtils.join(normalizedRanges, ","));
if (flushTables)
flushSSTables(cfses);
@@ -136,13 +139,13 @@ public class StreamOut
for (ColumnFamilyStore cfStore : cfses)
{
List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
- for (Range<Token> range : ranges)
+ for (Range<Token> range : normalizedRanges)
rowBoundsList.add(range.toRowBounds());
ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
sstables.addAll(view.sstables);
}
- transferSSTables(session, sstables, ranges, type);
+ transferSSTables(session, sstables, normalizedRanges, type);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18be7fa8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 2befe45..82c6b1c 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -1,23 +1,21 @@
-package org.apache.cassandra.streaming;
-
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
import static junit.framework.Assert.assertEquals;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
@@ -73,7 +71,7 @@ public class StreamingTransferTest extends SchemaLoader
* Create and transfer a single sstable, and return the keys that should have been transferred.
* The Mutator must create the given column, but it may also create any other columns it pleases.
*/
- private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator) throws Exception
+ private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator, boolean transferSSTables) throws Exception
{
// write a temporary SSTable, and unregister it
logger.debug("Mutating " + cfs.columnFamily);
@@ -83,18 +81,29 @@ public class StreamingTransferTest extends SchemaLoader
cfs.forceBlockingFlush();
Util.compactAll(cfs).get();
assertEquals(1, cfs.getSSTables().size());
- SSTableReader sstable = cfs.getSSTables().iterator().next();
- cfs.clearUnsafe();
// transfer the first and last key
logger.debug("Transferring " + cfs.columnFamily);
- transfer(table, sstable);
+ int[] offs;
+ if (transferSSTables)
+ {
+ SSTableReader sstable = cfs.getSSTables().iterator().next();
+ cfs.clearUnsafe();
+ transferSSTables(table, sstable);
+ offs = new int[]{1, 3};
+ }
+ else
+ {
+ long beforeStreaming = System.currentTimeMillis();
+ transferRanges(table, cfs);
+ cfs.discardSSTables(beforeStreaming);
+ offs = new int[]{2, 3};
+ }
// confirm that a single SSTable was transferred and registered
assertEquals(1, cfs.getSSTables().size());
// and that the index and filter were properly recovered
- int[] offs = new int[]{1, 3};
List<Row> rows = Util.getRangeSlice(cfs);
assertEquals(offs.length, rows.size());
for (int i = 0; i < offs.length; i++)
@@ -108,7 +117,6 @@ public class StreamingTransferTest extends SchemaLoader
// and that the max timestamp for the file was rediscovered
assertEquals(timestamp, cfs.getSSTables().iterator().next().getMaxTimestamp());
-
List<String> keys = new ArrayList<String>();
for (int off : offs)
keys.add("key" + off);
@@ -117,17 +125,64 @@ public class StreamingTransferTest extends SchemaLoader
return keys;
}
- private void transfer(Table table, SSTableReader sstable) throws Exception
+ private void transferRanges(Table table, ColumnFamilyStore cfs) throws Exception
+ {
+ IPartitioner p = StorageService.getPartitioner();
+ List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+ ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
+ StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback) null);
+ StreamOut.transferRanges(session, Arrays.asList(cfs), ranges, OperationType.BOOTSTRAP);
+ session.await();
+ }
+
+ private void transferSSTables(Table table, SSTableReader sstable) throws Exception
{
IPartitioner p = StorageService.getPartitioner();
List<Range<Token>> ranges = new ArrayList<Range<Token>>();
ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
- StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback)null);
+ StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback) null);
StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
session.await();
}
+ private void doTransferTable(boolean transferSSTables) throws Exception
+ {
+ final Table table = Table.open("Keyspace1");
+ final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
+
+ List<String> keys = createAndTransfer(table, cfs, new Mutator()
+ {
+ public void mutate(String key, String col, long timestamp) throws Exception
+ {
+ long val = key.hashCode();
+ RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
+ ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily);
+ cf.addColumn(column(col, "v", timestamp));
+ cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp));
+ rm.add(cf);
+ logger.debug("Applying row to transfer " + rm);
+ rm.apply();
+ }
+ }, transferSSTables);
+
+ // confirm that the secondary index was recovered
+ for (String key : keys)
+ {
+ long val = key.hashCode();
+ IPartitioner p = StorageService.getPartitioner();
+ IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"),
+ IndexOperator.EQ,
+ ByteBufferUtil.bytes(val));
+ List<IndexExpression> clause = Arrays.asList(expr);
+ IDiskAtomFilter filter = new IdentityQueryFilter();
+ Range<RowPosition> range = Util.range("", "");
+ List<Row> rows = cfs.search(clause, range, 100, filter);
+ assertEquals(1, rows.size());
+ assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key));
+ }
+ }
+
/**
* Test to make sure RangeTombstones at column index boundary transferred correctly.
*/
@@ -153,7 +208,7 @@ public class StreamingTransferTest extends SchemaLoader
SSTableReader sstable = cfs.getSSTables().iterator().next();
cfs.clearUnsafe();
- transfer(table, sstable);
+ transferSSTables(table, sstable);
// confirm that a single SSTable was transferred and registered
assertEquals(1, cfs.getSSTables().size());
@@ -163,41 +218,15 @@ public class StreamingTransferTest extends SchemaLoader
}
@Test
- public void testTransferTable() throws Exception
+ public void testTransferTableViaRanges() throws Exception
{
- final Table table = Table.open("Keyspace1");
- final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
-
- List<String> keys = createAndTransfer(table, cfs, new Mutator()
- {
- public void mutate(String key, String col, long timestamp) throws Exception
- {
- long val = key.hashCode();
- RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
- ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily);
- cf.addColumn(column(col, "v", timestamp));
- cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp));
- rm.add(cf);
- logger.debug("Applying row to transfer " + rm);
- rm.apply();
- }
- });
+ doTransferTable(false);
+ }
- // confirm that the secondary index was recovered
- for (String key : keys)
- {
- long val = key.hashCode();
- IPartitioner p = StorageService.getPartitioner();
- IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"),
- IndexOperator.EQ,
- ByteBufferUtil.bytes(val));
- List<IndexExpression> clause = Arrays.asList(expr);
- IDiskAtomFilter filter = new IdentityQueryFilter();
- Range<RowPosition> range = Util.range("", "");
- List<Row> rows = cfs.search(clause, range, 100, filter);
- assertEquals(1, rows.size());
- assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key));
- }
+ @Test
+ public void testTransferTableViaSSTables() throws Exception
+ {
+ doTransferTable(true);
}
@Test
@@ -214,7 +243,7 @@ public class StreamingTransferTest extends SchemaLoader
addMutation(rm, cfs.columnFamily, col, 1, "val1", timestamp);
rm.apply();
}
- });
+ }, true);
}
@Test
@@ -240,35 +269,35 @@ public class StreamingTransferTest extends SchemaLoader
state.writeElement(CounterId.fromInt(6), 3L, 3L);
state.writeElement(CounterId.fromInt(8), 2L, 4L);
cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col),
- state.context,
- timestamp));
+ state.context,
+ timestamp));
cfCleaned.addColumn(new CounterColumn(ByteBufferUtil.bytes(col),
- cc.clearAllDelta(state.context),
- timestamp));
+ cc.clearAllDelta(state.context),
+ timestamp));
entries.put(key, cf);
cleanedEntries.put(key, cfCleaned);
cfs.addSSTable(SSTableUtils.prepare()
- .ks(table.name)
- .cf(cfs.columnFamily)
- .generation(0)
- .write(entries));
+ .ks(table.name)
+ .cf(cfs.columnFamily)
+ .generation(0)
+ .write(entries));
}
- });
+ }, true);
// filter pre-cleaned entries locally, and ensure that the end result is equal
cleanedEntries.keySet().retainAll(keys);
SSTableReader cleaned = SSTableUtils.prepare()
- .ks(table.name)
- .cf(cfs.columnFamily)
- .generation(0)
- .write(cleanedEntries);
+ .ks(table.name)
+ .cf(cfs.columnFamily)
+ .generation(0)
+ .write(cleanedEntries);
SSTableReader streamed = cfs.getSSTables().iterator().next();
SSTableUtils.assertContentEquals(cleaned, streamed);
// Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481)
cfs.clearUnsafe();
- transfer(table, streamed);
+ transferSSTables(table, streamed);
SSTableReader restreamed = cfs.getSSTables().iterator().next();
SSTableUtils.assertContentEquals(streamed, restreamed);
}