You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/08/29 17:20:25 UTC

[3/6] git commit: Fix streaming does not transfer wrapped range

Fix streaming does not transfer wrapped range

patch by Sergio Bossa; reviewed by yukim for CASSANDRA-5948


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/18be7fa8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/18be7fa8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/18be7fa8

Branch: refs/heads/trunk
Commit: 18be7fa8760c578a9d42f7512e7767992025ac18
Parents: 254d315
Author: Sergio Bossa <se...@gmail.com>
Authored: Wed Aug 28 14:43:04 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Aug 29 10:16:57 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/streaming/StreamOut.java   |   9 +-
 .../streaming/StreamingTransferTest.java        | 181 +++++++++++--------
 3 files changed, 112 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/18be7fa8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9ef0651..5cb1522 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
  * Fix CqlRecordWriter with composite keys (CASSANDRA-5949)
  * Allow disabling SlabAllocator (CASSANDRA-5935)
  * Make user-defined compaction JMX blocking (CASSANDRA-4952)
+ * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
 
 
 1.2.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/18be7fa8/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 5a5ab9a..7035ec7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -126,8 +126,11 @@ public class StreamOut
                                       boolean flushTables)
     {
         assert ranges.size() > 0;
+
+        List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+
         logger.info("Beginning transfer to {}", session.getHost());
-        logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
+        logger.debug("Ranges are {}", StringUtils.join(normalizedRanges, ","));
 
         if (flushTables)
             flushSSTables(cfses);
@@ -136,13 +139,13 @@ public class StreamOut
         for (ColumnFamilyStore cfStore : cfses)
         {
             List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
-            for (Range<Token> range : ranges)
+            for (Range<Token> range : normalizedRanges)
                 rowBoundsList.add(range.toRowBounds());
             ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
             sstables.addAll(view.sstables);
         }
 
-        transferSSTables(session, sstables, ranges, type);
+        transferSSTables(session, sstables, normalizedRanges, type);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/18be7fa8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 2befe45..82c6b1c 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -1,23 +1,21 @@
-package org.apache.cassandra.streaming;
-
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
 
 import static junit.framework.Assert.assertEquals;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
@@ -73,7 +71,7 @@ public class StreamingTransferTest extends SchemaLoader
      * Create and transfer a single sstable, and return the keys that should have been transferred.
      * The Mutator must create the given column, but it may also create any other columns it pleases.
      */
-    private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator) throws Exception
+    private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator, boolean transferSSTables) throws Exception
     {
         // write a temporary SSTable, and unregister it
         logger.debug("Mutating " + cfs.columnFamily);
@@ -83,18 +81,29 @@ public class StreamingTransferTest extends SchemaLoader
         cfs.forceBlockingFlush();
         Util.compactAll(cfs).get();
         assertEquals(1, cfs.getSSTables().size());
-        SSTableReader sstable = cfs.getSSTables().iterator().next();
-        cfs.clearUnsafe();
 
         // transfer the first and last key
         logger.debug("Transferring " + cfs.columnFamily);
-        transfer(table, sstable);
+        int[] offs;
+        if (transferSSTables)
+        {
+            SSTableReader sstable = cfs.getSSTables().iterator().next();
+            cfs.clearUnsafe();
+            transferSSTables(table, sstable);
+            offs = new int[]{1, 3};
+        }
+        else
+        {
+            long beforeStreaming = System.currentTimeMillis();
+            transferRanges(table, cfs);
+            cfs.discardSSTables(beforeStreaming);
+            offs = new int[]{2, 3};
+        }
 
         // confirm that a single SSTable was transferred and registered
         assertEquals(1, cfs.getSSTables().size());
 
         // and that the index and filter were properly recovered
-        int[] offs = new int[]{1, 3};
         List<Row> rows = Util.getRangeSlice(cfs);
         assertEquals(offs.length, rows.size());
         for (int i = 0; i < offs.length; i++)
@@ -108,7 +117,6 @@ public class StreamingTransferTest extends SchemaLoader
 
         // and that the max timestamp for the file was rediscovered
         assertEquals(timestamp, cfs.getSSTables().iterator().next().getMaxTimestamp());
-
         List<String> keys = new ArrayList<String>();
         for (int off : offs)
             keys.add("key" + off);
@@ -117,17 +125,64 @@ public class StreamingTransferTest extends SchemaLoader
         return keys;
     }
 
-    private void transfer(Table table, SSTableReader sstable) throws Exception
+    private void transferRanges(Table table, ColumnFamilyStore cfs) throws Exception
+    {
+        IPartitioner p = StorageService.getPartitioner();
+        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
+        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback) null);
+        StreamOut.transferRanges(session, Arrays.asList(cfs), ranges, OperationType.BOOTSTRAP);
+        session.await();
+    }
+
+    private void transferSSTables(Table table, SSTableReader sstable) throws Exception
     {
         IPartitioner p = StorageService.getPartitioner();
         List<Range<Token>> ranges = new ArrayList<Range<Token>>();
         ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
         ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
-        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback)null);
+        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback) null);
         StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
         session.await();
     }
 
+    private void doTransferTable(boolean transferSSTables) throws Exception
+    {
+        final Table table = Table.open("Keyspace1");
+        final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
+
+        List<String> keys = createAndTransfer(table, cfs, new Mutator()
+        {
+            public void mutate(String key, String col, long timestamp) throws Exception
+            {
+                long val = key.hashCode();
+                RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
+                ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily);
+                cf.addColumn(column(col, "v", timestamp));
+                cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp));
+                rm.add(cf);
+                logger.debug("Applying row to transfer " + rm);
+                rm.apply();
+            }
+        }, transferSSTables);
+
+        // confirm that the secondary index was recovered
+        for (String key : keys)
+        {
+            long val = key.hashCode();
+            IPartitioner p = StorageService.getPartitioner();
+            IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"),
+                    IndexOperator.EQ,
+                    ByteBufferUtil.bytes(val));
+            List<IndexExpression> clause = Arrays.asList(expr);
+            IDiskAtomFilter filter = new IdentityQueryFilter();
+            Range<RowPosition> range = Util.range("", "");
+            List<Row> rows = cfs.search(clause, range, 100, filter);
+            assertEquals(1, rows.size());
+            assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key));
+        }
+    }
+
     /**
      * Test to make sure RangeTombstones at column index boundary transferred correctly.
      */
@@ -153,7 +208,7 @@ public class StreamingTransferTest extends SchemaLoader
 
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         cfs.clearUnsafe();
-        transfer(table, sstable);
+        transferSSTables(table, sstable);
 
         // confirm that a single SSTable was transferred and registered
         assertEquals(1, cfs.getSSTables().size());
@@ -163,41 +218,15 @@ public class StreamingTransferTest extends SchemaLoader
     }
 
     @Test
-    public void testTransferTable() throws Exception
+    public void testTransferTableViaRanges() throws Exception
     {
-        final Table table = Table.open("Keyspace1");
-        final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
-
-        List<String> keys = createAndTransfer(table, cfs, new Mutator()
-        {
-            public void mutate(String key, String col, long timestamp) throws Exception
-            {
-                long val = key.hashCode();
-                RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
-                ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily);
-                cf.addColumn(column(col, "v", timestamp));
-                cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp));
-                rm.add(cf);
-                logger.debug("Applying row to transfer " + rm);
-                rm.apply();
-            }
-        });
+        doTransferTable(false);
+    }
 
-        // confirm that the secondary index was recovered
-        for (String key : keys)
-        {
-            long val = key.hashCode();
-            IPartitioner p = StorageService.getPartitioner();
-            IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"),
-                                                       IndexOperator.EQ,
-                                                       ByteBufferUtil.bytes(val));
-            List<IndexExpression> clause = Arrays.asList(expr);
-            IDiskAtomFilter filter = new IdentityQueryFilter();
-            Range<RowPosition> range = Util.range("", "");
-            List<Row> rows = cfs.search(clause, range, 100, filter);
-            assertEquals(1, rows.size());
-            assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key));
-        }
+    @Test
+    public void testTransferTableViaSSTables() throws Exception
+    {
+        doTransferTable(true);
     }
 
     @Test
@@ -214,7 +243,7 @@ public class StreamingTransferTest extends SchemaLoader
                 addMutation(rm, cfs.columnFamily, col, 1, "val1", timestamp);
                 rm.apply();
             }
-        });
+        }, true);
     }
 
     @Test
@@ -240,35 +269,35 @@ public class StreamingTransferTest extends SchemaLoader
                 state.writeElement(CounterId.fromInt(6), 3L, 3L);
                 state.writeElement(CounterId.fromInt(8), 2L, 4L);
                 cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col),
-                                               state.context,
-                                               timestamp));
+                        state.context,
+                        timestamp));
                 cfCleaned.addColumn(new CounterColumn(ByteBufferUtil.bytes(col),
-                                                      cc.clearAllDelta(state.context),
-                                                      timestamp));
+                        cc.clearAllDelta(state.context),
+                        timestamp));
 
                 entries.put(key, cf);
                 cleanedEntries.put(key, cfCleaned);
                 cfs.addSSTable(SSTableUtils.prepare()
-                    .ks(table.name)
-                    .cf(cfs.columnFamily)
-                    .generation(0)
-                    .write(entries));
+                        .ks(table.name)
+                        .cf(cfs.columnFamily)
+                        .generation(0)
+                        .write(entries));
             }
-        });
+        }, true);
 
         // filter pre-cleaned entries locally, and ensure that the end result is equal
         cleanedEntries.keySet().retainAll(keys);
         SSTableReader cleaned = SSTableUtils.prepare()
-            .ks(table.name)
-            .cf(cfs.columnFamily)
-            .generation(0)
-            .write(cleanedEntries);
+                .ks(table.name)
+                .cf(cfs.columnFamily)
+                .generation(0)
+                .write(cleanedEntries);
         SSTableReader streamed = cfs.getSSTables().iterator().next();
         SSTableUtils.assertContentEquals(cleaned, streamed);
 
         // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481)
         cfs.clearUnsafe();
-        transfer(table, streamed);
+        transferSSTables(table, streamed);
         SSTableReader restreamed = cfs.getSSTables().iterator().next();
         SSTableUtils.assertContentEquals(streamed, restreamed);
     }