Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/02 09:51:53 UTC
[1/4] cassandra git commit: Improve batchlog write path
Repository: cassandra
Updated Branches:
refs/heads/trunk 2dbf029f0 -> 79628dd70
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
deleted file mode 100644
index 568e23d..0000000
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import com.google.common.collect.Lists;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.Util.PartitionerSwitcher;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.UUIDGen;
-
-public class BatchlogManagerTest
-{
- private static final String KEYSPACE1 = "BatchlogManagerTest1";
- private static final String CF_STANDARD1 = "Standard1";
- private static final String CF_STANDARD2 = "Standard2";
- private static final String CF_STANDARD3 = "Standard3";
- private static final String CF_STANDARD4 = "Standard4";
-
- static PartitionerSwitcher sw;
-
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- sw = Util.switchPartitioner(Murmur3Partitioner.instance);
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance));
- System.out.println(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata.partitionKeyColumns());
- }
-
- @AfterClass
- public static void cleanup()
- {
- sw.close();
- }
-
- @Before
- public void setUp() throws Exception
- {
- TokenMetadata metadata = StorageService.instance.getTokenMetadata();
- InetAddress localhost = InetAddress.getByName("127.0.0.1");
- metadata.updateNormalToken(Util.token("A"), localhost);
- metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
- Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking();
- Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).truncateBlocking();
- }
-
- @Test
- public void testDelete()
- {
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
- CFMetaData cfm = cfs.metadata;
- new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes("1234"))
- .clustering("c")
- .add("val", "val" + 1234)
- .build()
- .applyUnsafe();
-
- DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
- ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
- Iterator<Row> iter = results.iterator();
- assert iter.hasNext();
-
- Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(cfm,
- dk,
- FBUtilities.timestampMicros(),
- FBUtilities.nowInSeconds()));
- mutation.applyUnsafe();
-
- Util.assertEmpty(Util.cmd(cfs, dk).build());
- }
-
- // TODO: Fix. Currently endlessly looping on BatchLogManager.replayAllFailedBatches
- @Test
- public void testReplay() throws Exception
- {
- long initialAllBatches = BatchlogManager.instance.countAllBatches();
- long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
-
- CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata;
-
- // Generate 1000 mutations and put them all into the batchlog.
- // Half (500) ready to be replayed, half not.
- for (int i = 0; i < 1000; i++)
- {
- Mutation m = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
- .clustering("name" + i)
- .add("val", "val" + i)
- .build();
-
- long timestamp = i < 500
- ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
- : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
-
- BatchlogManager.getBatchlogMutationFor(Collections.singleton(m),
- UUIDGen.getTimeUUID(timestamp, i),
- MessagingService.current_version)
- .applyUnsafe();
- }
-
- // Flush the batchlog to disk (see CASSANDRA-6822).
- Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
-
- assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches);
- assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
-
- // Force batchlog replay and wait for it to complete.
- BatchlogManager.instance.startBatchlogReplay().get();
-
- // Ensure that the first half, and only the first half, got replayed.
- assertEquals(500, BatchlogManager.instance.countAllBatches() - initialAllBatches);
- assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
-
- for (int i = 0; i < 1000; i++)
- {
- UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD1, i));
- if (i < 500)
- {
- assertEquals(bytes(i), result.one().getBytes("key"));
- assertEquals("name" + i, result.one().getString("name"));
- assertEquals("val" + i, result.one().getString("val"));
- }
- else
- {
- assertTrue(result.isEmpty());
- }
- }
-
- // Ensure that no stray mutations got somehow applied.
- UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD1));
- assertEquals(500, result.one().getLong("count"));
- }
-
- @Test
- public void testTruncatedReplay() throws InterruptedException, ExecutionException
- {
- CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2);
- CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3);
- // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
- // Each batchlog entry with a mutation for Standard2 and Standard3.
- // In the middle of the process, 'truncate' Standard2.
- for (int i = 0; i < 1000; i++)
- {
- Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), bytes(i))
- .clustering("name" + i)
- .add("val", "val" + i)
- .build();
- Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), bytes(i))
- .clustering("name" + i)
- .add("val", "val" + i)
- .build();
-
- List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
-
- // Make sure it's ready to be replayed, so adjust the timestamp.
- long timestamp = System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout();
-
- if (i == 500)
- SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2"),
- timestamp,
- ReplayPosition.NONE);
-
- // Adjust the timestamp (slightly) to make the test deterministic.
- if (i >= 500)
- timestamp++;
- else
- timestamp--;
-
- BatchlogManager.getBatchlogMutationFor(mutations,
- UUIDGen.getTimeUUID(timestamp, i),
- MessagingService.current_version)
- .applyUnsafe();
- }
-
- // Flush the batchlog to disk (see CASSANDRA-6822).
- Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
-
- // Force batchlog replay and wait for it to complete.
- BatchlogManager.instance.startBatchlogReplay().get();
-
- // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
- for (int i = 0; i < 1000; i++)
- {
- UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD2,i));
- if (i >= 500)
- {
- assertEquals(bytes(i), result.one().getBytes("key"));
- assertEquals("name" + i, result.one().getString("name"));
- assertEquals("val" + i, result.one().getString("val"));
- }
- else
- {
- assertTrue(result.isEmpty());
- }
- }
-
- for (int i = 0; i < 1000; i++)
- {
- UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i));
- assertEquals(bytes(i), result.one().getBytes("key"));
- assertEquals("name" + i, result.one().getString("name"));
- assertEquals("val" + i, result.one().getString("val"));
- }
- }
-
- static Mutation fakeVersion12MutationFor(Collection<Mutation> mutations, long now) throws IOException
- {
- // Serialization can't write version 1.2 mutations, pretend this is old by using random id and written_at and
- // saving it in the legacy batchlog.
- UUID uuid = UUID.randomUUID();
- ByteBuffer writtenAt = LongType.instance.decompose(now);
- int version = MessagingService.VERSION_30;
- ByteBuffer data = BatchlogManager.serializeMutations(mutations, version);
-
- return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
- .clustering()
- .add("written_at", writtenAt)
- .add("data", data)
- .add("version", version)
- .build();
- }
-
- static Mutation fakeVersion20MutationFor(Collection<Mutation> mutations, UUID uuid)
- {
- // Serialization can't write version 1.2 mutations, pretend this is old by saving it in the legacy batchlog.
- int version = MessagingService.VERSION_30;
- ByteBuffer writtenAt = LongType.instance.decompose(UUIDGen.unixTimestamp(uuid));
- return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
- .clustering()
- .add("data", BatchlogManager.serializeMutations(mutations, version))
- .add("written_at", writtenAt)
- .add("version", version)
- .build();
- }
-
- @Test
- public void testConversion() throws Exception
- {
- long initialAllBatches = BatchlogManager.instance.countAllBatches();
- long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
- CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4);
-
- // Generate 1000 mutations and put them all into the batchlog.
- // Half (500) ready to be replayed, half not.
- for (int i = 0; i < 1000; i++)
- {
- Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
- .clustering("name" + i)
- .add("val", "val" + i)
- .build();
-
- long timestamp = i < 500
- ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
- : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
-
-
- fakeVersion12MutationFor(Collections.singleton(mutation), timestamp).applyUnsafe();
- }
-
- // Add 400 version 2.0 mutations and put them all into the batchlog.
- // Half (200) ready to be replayed, half not.
- for (int i = 1000; i < 1400; i++)
- {
- Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
- .clustering("name" + i)
- .add("val", "val" + i)
- .build();
-
- long timestamp = i < 1200
- ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
- : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
-
-
- fakeVersion20MutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(timestamp, i)).applyUnsafe();
- }
-
- // Mix in 100 current version mutations, 50 ready for replay.
- for (int i = 1400; i < 1500; i++)
- {
- Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
- .clustering("name" + i)
- .add("val", "val" + i)
- .build();
-
- long timestamp = i < 1450
- ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
- : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
-
-
- BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation),
- UUIDGen.getTimeUUID(timestamp, i),
- MessagingService.current_version)
- .applyUnsafe();
- }
-
- // Flush the batchlog to disk (see CASSANDRA-6822).
- Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush();
- Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
-
- assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches);
- assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
-
- UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
- assertEquals("Count in blog legacy", 1400, result.one().getLong("count"));
- result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
- assertEquals("Count in blog", 100, result.one().getLong("count"));
-
- // Force batchlog replay and wait for it to complete.
- BatchlogManager.instance.performInitialReplay();
-
- // Ensure that the first half, and only the first half, got replayed.
- assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches);
- assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
-
- for (int i = 0; i < 1500; i++)
- {
- result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i));
- if (i < 500 || i >= 1000 && i < 1200 || i >= 1400 && i < 1450)
- {
- assertEquals(bytes(i), result.one().getBytes("key"));
- assertEquals("name" + i, result.one().getString("name"));
- assertEquals("val" + i, result.one().getString("val"));
- }
- else
- {
- assertTrue("Present at " + i, result.isEmpty());
- }
- }
-
- // Ensure that no stray mutations got somehow applied.
- result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4));
- assertEquals(750, result.one().getLong("count"));
-
- // Ensure batchlog is left as expected.
- result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
- assertEquals("Count in blog after initial replay", 750, result.one().getLong("count"));
- result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
- assertEquals("Count in blog legacy after initial replay ", 0, result.one().getLong("count"));
- }
-}
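The deleted test above drives replay eligibility entirely through the batch id: a batch whose time-UUID embeds a timestamp at least one batchlog timeout in the past is due for replay, anything newer is not (the new BatchlogManager later in this patch computes a matching limitUuid with UUIDGen.maxTimeUUID). A small hypothetical helper, not part of the patch, restating that relationship:

import java.util.UUID;

import org.apache.cassandra.utils.UUIDGen;

final class ReplayEligibilitySketch
{
    // Hypothetical helper: a batch is due for replay once the timestamp embedded in its
    // time-UUID id is older than "now" minus the batchlog timeout (2x the write RPC timeout).
    static boolean dueForReplay(UUID batchId, long batchlogTimeoutMillis)
    {
        long writtenAt = UUIDGen.unixTimestamp(batchId); // milliseconds embedded in the id
        return writtenAt <= System.currentTimeMillis() - batchlogTimeoutMillis;
    }
}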
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
deleted file mode 100644
index be33e3f..0000000
--- a/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.HashSet;
-
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
-import org.junit.Test;
-import org.junit.matchers.JUnitMatchers;
-
-import org.apache.cassandra.db.BatchlogManager;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public class BatchlogEndpointFilterTest
-{
- private static final String LOCAL = "local";
-
- @Test
- public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
- {
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .put(LOCAL, InetAddress.getByName("00"))
- .put("1", InetAddress.getByName("1"))
- .put("1", InetAddress.getByName("11"))
- .put("2", InetAddress.getByName("2"))
- .put("2", InetAddress.getByName("22"))
- .build();
- Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
- assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
- }
-
- @Test
- public void shouldSelectHostFromLocal() throws UnknownHostException
- {
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .put(LOCAL, InetAddress.getByName("00"))
- .put("1", InetAddress.getByName("1"))
- .build();
- Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
- assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
- }
-
- @Test
- public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
- {
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .build();
- Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
- assertThat(result.size(), is(1));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
- }
-
- @Test
- public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostException
- {
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .put(LOCAL, InetAddress.getByName("00"))
- .put("1", InetAddress.getByName("1"))
- .put("1", InetAddress.getByName("11"))
- .put("1", InetAddress.getByName("111"))
- .build();
- Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
- // result should contain random two distinct values
- assertThat(new HashSet<>(result).size(), is(2));
- }
-
- private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
- {
- public TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
- {
- super(localRack, endpoints);
- }
-
- @Override
- protected boolean isValid(InetAddress input)
- {
- // We will use always alive non-localhost endpoints
- return true;
- }
-
- @Override
- protected int getRandomInt(int bound)
- {
- // We don't need random behavior here
- return bound - 1;
- }
- }
-}
[4/4] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/79628dd7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/79628dd7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/79628dd7
Branch: refs/heads/trunk
Commit: 79628dd7076c125c9b434e90689057cdf08bb87b
Parents: 2dbf029 53a177a
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Sep 2 08:51:29 2015 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 2 08:51:29 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 1 -
.../org/apache/cassandra/batchlog/Batch.java | 155 +++++
.../batchlog/BatchRemoveVerbHandler.java | 31 +
.../batchlog/BatchStoreVerbHandler.java | 32 +
.../cassandra/batchlog/BatchlogManager.java | 554 +++++++++++++++++
.../batchlog/BatchlogManagerMBean.java | 38 ++
.../batchlog/LegacyBatchlogMigrator.java | 196 ++++++
.../org/apache/cassandra/concurrent/Stage.java | 2 -
.../cassandra/concurrent/StageManager.java | 1 -
.../org/apache/cassandra/config/Config.java | 1 -
.../cassandra/config/DatabaseDescriptor.java | 7 +-
.../cql3/statements/BatchStatement.java | 5 -
.../apache/cassandra/db/BatchlogManager.java | 596 -------------------
.../cassandra/db/BatchlogManagerMBean.java | 38 --
.../db/CounterMutationVerbHandler.java | 2 +-
src/java/org/apache/cassandra/db/Mutation.java | 7 +-
.../cassandra/db/MutationVerbHandler.java | 43 +-
.../cassandra/db/ReadRepairVerbHandler.java | 3 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 2 +-
.../org/apache/cassandra/db/WriteResponse.java | 18 +-
.../cassandra/hints/EncodedHintMessage.java | 6 +-
src/java/org/apache/cassandra/hints/Hint.java | 11 +-
.../org/apache/cassandra/hints/HintMessage.java | 14 +-
.../cassandra/hints/LegacyHintsMigrator.java | 2 +-
.../apache/cassandra/net/MessagingService.java | 26 +-
.../cassandra/service/CassandraDaemon.java | 6 +-
.../apache/cassandra/service/StorageProxy.java | 208 ++++---
.../cassandra/service/StorageService.java | 19 +-
.../service/paxos/CommitVerbHandler.java | 6 +-
.../org/apache/cassandra/tools/NodeProbe.java | 4 +-
.../cql3/MaterializedViewLongTest.java | 2 +-
.../apache/cassandra/batchlog/BatchTest.java | 153 +++++
.../batchlog/BatchlogEndpointFilterTest.java | 115 ++++
.../cassandra/batchlog/BatchlogManagerTest.java | 460 ++++++++++++++
.../cassandra/db/BatchlogManagerTest.java | 394 ------------
.../service/BatchlogEndpointFilterTest.java | 117 ----
37 files changed, 1945 insertions(+), 1331 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/79628dd7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c916ac9,751b75d..2a1bd1c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
+3.2
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+
+
3.0.0-beta2
+ * Improve batchlog write path (CASSANDRA-9673)
* Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)
* Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901)
* Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/79628dd7/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/79628dd7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
[3/4] cassandra git commit: Improve batchlog write path
Posted by al...@apache.org.
Improve batchlog write path
patch by Stefania Alborghetti; reviewed by Aleksey Yeschenko for
CASSANDRA-9673
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53a177a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53a177a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53a177a9
Branch: refs/heads/trunk
Commit: 53a177a9150586e56408f25c959f75110a2997e7
Parents: 5f02f20
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Jul 10 17:03:06 2015 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 2 08:43:42 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 1 -
.../org/apache/cassandra/batchlog/Batch.java | 155 +++++
.../batchlog/BatchRemoveVerbHandler.java | 31 +
.../batchlog/BatchStoreVerbHandler.java | 32 +
.../cassandra/batchlog/BatchlogManager.java | 554 +++++++++++++++++
.../batchlog/BatchlogManagerMBean.java | 38 ++
.../batchlog/LegacyBatchlogMigrator.java | 196 ++++++
.../org/apache/cassandra/concurrent/Stage.java | 2 -
.../cassandra/concurrent/StageManager.java | 1 -
.../org/apache/cassandra/config/Config.java | 1 -
.../cassandra/config/DatabaseDescriptor.java | 7 +-
.../cql3/statements/BatchStatement.java | 5 -
.../apache/cassandra/db/BatchlogManager.java | 596 -------------------
.../cassandra/db/BatchlogManagerMBean.java | 38 --
.../db/CounterMutationVerbHandler.java | 2 +-
src/java/org/apache/cassandra/db/Mutation.java | 7 +-
.../cassandra/db/MutationVerbHandler.java | 43 +-
.../cassandra/db/ReadRepairVerbHandler.java | 3 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 2 +-
.../org/apache/cassandra/db/WriteResponse.java | 18 +-
.../cassandra/hints/EncodedHintMessage.java | 6 +-
src/java/org/apache/cassandra/hints/Hint.java | 11 +-
.../org/apache/cassandra/hints/HintMessage.java | 14 +-
.../cassandra/hints/LegacyHintsMigrator.java | 2 +-
.../apache/cassandra/net/MessagingService.java | 26 +-
.../cassandra/service/CassandraDaemon.java | 6 +-
.../apache/cassandra/service/StorageProxy.java | 208 ++++---
.../cassandra/service/StorageService.java | 19 +-
.../service/paxos/CommitVerbHandler.java | 6 +-
.../org/apache/cassandra/tools/NodeProbe.java | 4 +-
.../cql3/MaterializedViewLongTest.java | 2 +-
.../apache/cassandra/batchlog/BatchTest.java | 153 +++++
.../batchlog/BatchlogEndpointFilterTest.java | 115 ++++
.../cassandra/batchlog/BatchlogManagerTest.java | 460 ++++++++++++++
.../cassandra/db/BatchlogManagerTest.java | 394 ------------
.../service/BatchlogEndpointFilterTest.java | 117 ----
37 files changed, 1945 insertions(+), 1331 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fe8f453..751b75d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta2
+ * Improve batchlog write path (CASSANDRA-9673)
* Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)
* Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901)
* Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 16108bd..0f8b829 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -355,7 +355,6 @@ seed_provider:
concurrent_reads: 32
concurrent_writes: 32
concurrent_counter_writes: 32
-concurrent_batchlog_writes: 32
# For materialized view writes, as there is a read involved, this should
# be limited by the lesser of concurrent reads or concurrent writes.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/Batch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java
new file mode 100644
index 0000000..caa2682
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/Batch.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.db.TypeSizes.sizeofVInt;
+
+public final class Batch
+{
+ public static final Serializer serializer = new Serializer();
+
+ public final UUID id;
+ public final long creationTime; // time of batch creation (in microseconds)
+
+ // one of these will always be empty
+ final Collection<Mutation> decodedMutations;
+ final Collection<ByteBuffer> encodedMutations;
+
+ private Batch(UUID id, long creationTime, Collection<Mutation> decodedMutations, Collection<ByteBuffer> encodedMutations)
+ {
+ this.id = id;
+ this.creationTime = creationTime;
+
+ this.decodedMutations = decodedMutations;
+ this.encodedMutations = encodedMutations;
+ }
+
+ /**
+ * Creates a 'local' batch - with all enclosed mutations in decoded form (as Mutation instances)
+ */
+ public static Batch createLocal(UUID id, long creationTime, Collection<Mutation> mutations)
+ {
+ return new Batch(id, creationTime, mutations, Collections.emptyList());
+ }
+
+ /**
+ * Creates a 'remote' batch - with all enclosed mutations in encoded form (as ByteBuffer instances)
+ *
+ * The mutations will always be encoded using the current messaging version.
+ */
+ public static Batch createRemote(UUID id, long creationTime, Collection<ByteBuffer> mutations)
+ {
+ return new Batch(id, creationTime, Collections.<Mutation>emptyList(), mutations);
+ }
+
+ /**
+ * Count of the mutations in the batch.
+ */
+ public int size()
+ {
+ return decodedMutations.size() + encodedMutations.size();
+ }
+
+ static final class Serializer implements IVersionedSerializer<Batch>
+ {
+ public long serializedSize(Batch batch, int version)
+ {
+ assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+
+ long size = UUIDSerializer.serializer.serializedSize(batch.id, version);
+ size += sizeof(batch.creationTime);
+
+ size += sizeofVInt(batch.decodedMutations.size());
+ for (Mutation mutation : batch.decodedMutations)
+ {
+ int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+ size += sizeofVInt(mutationSize);
+ size += mutationSize;
+ }
+
+ return size;
+ }
+
+ public void serialize(Batch batch, DataOutputPlus out, int version) throws IOException
+ {
+ assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+
+ UUIDSerializer.serializer.serialize(batch.id, out, version);
+ out.writeLong(batch.creationTime);
+
+ out.writeVInt(batch.decodedMutations.size());
+ for (Mutation mutation : batch.decodedMutations)
+ {
+ out.writeVInt(Mutation.serializer.serializedSize(mutation, version));
+ Mutation.serializer.serialize(mutation, out, version);
+ }
+ }
+
+ public Batch deserialize(DataInputPlus in, int version) throws IOException
+ {
+ UUID id = UUIDSerializer.serializer.deserialize(in, version);
+ long creationTime = in.readLong();
+
+ /*
+ * If version doesn't match the current one, we cannot just read the encoded mutations verbatim,
+ * so we decode them instead, to deal with compatibility.
+ */
+ return version == MessagingService.current_version
+ ? createRemote(id, creationTime, readEncodedMutations(in))
+ : createLocal(id, creationTime, decodeMutations(in, version));
+ }
+
+ private static Collection<ByteBuffer> readEncodedMutations(DataInputPlus in) throws IOException
+ {
+ int count = (int) in.readVInt();
+
+ ArrayList<ByteBuffer> mutations = new ArrayList<>(count);
+ for (int i = 0; i < count; i++)
+ mutations.add(ByteBufferUtil.readWithVIntLength(in));
+
+ return mutations;
+ }
+
+ private static Collection<Mutation> decodeMutations(DataInputPlus in, int version) throws IOException
+ {
+ int count = (int) in.readVInt();
+
+ ArrayList<Mutation> mutations = new ArrayList<>(count);
+ for (int i = 0; i < count; i++)
+ {
+ in.readVInt(); // skip mutation size
+ mutations.add(Mutation.serializer.deserialize(in, version));
+ }
+
+ return mutations;
+ }
+ }
+}
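Batch holds either decoded Mutation instances (a 'local' batch) or pre-encoded ByteBuffers (a 'remote' batch), never both, and the serializer only accepts the local form. A minimal usage sketch of the two factory methods, assuming it sits in the same package; the empty mutation collections are placeholders purely to keep the sketch self-contained:

package org.apache.cassandra.batchlog;

import java.nio.ByteBuffer;
import java.util.Collections;

import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;

final class BatchSketch
{
    static void sketch()
    {
        // 'Local' batches carry decoded Mutation instances; empty here only for illustration.
        Batch local = Batch.createLocal(UUIDGen.getTimeUUID(), FBUtilities.timestampMicros(), Collections.emptyList());

        // 'Remote' batches carry mutations already encoded with the current messaging version.
        Batch remote = Batch.createRemote(UUIDGen.getTimeUUID(), FBUtilities.timestampMicros(),
                                          Collections.<ByteBuffer>emptyList());

        // size() is simply the sum of decoded and encoded mutation counts.
        assert local.size() == 0 && remote.size() == 0;
    }
}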
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
new file mode 100644
index 0000000..3c3fcec
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.util.UUID;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+
+public final class BatchRemoveVerbHandler implements IVerbHandler<UUID>
+{
+ public void doVerb(MessageIn<UUID> message, int id)
+ {
+ BatchlogManager.remove(message.payload);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
new file mode 100644
index 0000000..4bc878c
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import org.apache.cassandra.db.WriteResponse;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+
+public final class BatchStoreVerbHandler implements IVerbHandler<Batch>
+{
+ public void doVerb(MessageIn<Batch> message, int id)
+ {
+ BatchlogManager.store(message.payload);
+ MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
new file mode 100644
index 0000000..934ebaa
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static com.google.common.collect.Iterables.transform;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
+
+public class BatchlogManager implements BatchlogManagerMBean
+{
+ public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
+ private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
+ static final int DEFAULT_PAGE_SIZE = 128;
+
+ private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
+ public static final BatchlogManager instance = new BatchlogManager();
+
+ private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
+ private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
+
+ // Single-thread executor service for scheduling and serializing log replay.
+ private final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
+
+ public void start()
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
+ StorageService.RING_DELAY,
+ REPLAY_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void shutdown() throws InterruptedException
+ {
+ batchlogTasks.shutdown();
+ batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
+ public static void remove(UUID id)
+ {
+ new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
+ UUIDType.instance.decompose(id),
+ FBUtilities.timestampMicros(),
+ FBUtilities.nowInSeconds()))
+ .apply();
+ }
+
+ public static void store(Batch batch)
+ {
+ store(batch, true);
+ }
+
+ public static void store(Batch batch, boolean durableWrites)
+ {
+ RowUpdateBuilder builder =
+ new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id)
+ .clustering()
+ .add("version", MessagingService.current_version);
+
+ for (ByteBuffer mutation : batch.encodedMutations)
+ builder.addListEntry("mutations", mutation);
+
+ for (Mutation mutation : batch.decodedMutations)
+ {
+ try (DataOutputBuffer buffer = new DataOutputBuffer())
+ {
+ Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version);
+ builder.addListEntry("mutations", buffer.buffer());
+ }
+ catch (IOException e)
+ {
+ // shouldn't happen
+ throw new AssertionError(e);
+ }
+ }
+
+ builder.build().apply(durableWrites);
+ }
+
+ @VisibleForTesting
+ public int countAllBatches()
+ {
+ String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
+ UntypedResultSet results = executeInternal(query);
+ if (results == null || results.isEmpty())
+ return 0;
+
+ return (int) results.one().getLong("count");
+ }
+
+ public long getTotalBatchesReplayed()
+ {
+ return totalBatchesReplayed;
+ }
+
+ public void forceBatchlogReplay() throws Exception
+ {
+ startBatchlogReplay().get();
+ }
+
+ public Future<?> startBatchlogReplay()
+ {
+ // If a replay is already in progress this request will be executed after it completes.
+ return batchlogTasks.submit(this::replayFailedBatches);
+ }
+
+ void performInitialReplay() throws InterruptedException, ExecutionException
+ {
+ // Invokes initial replay. Used for testing only.
+ batchlogTasks.submit(this::replayFailedBatches).get();
+ }
+
+ private void replayFailedBatches()
+ {
+ logger.debug("Started replayFailedBatches");
+
+ // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+ // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+ int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+ RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
+ UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
+ ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
+ int pageSize = calculatePageSize(store);
+ // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
+ // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
+ // token(id) > token(lastReplayedUuid) as part of the query.
+ String query = String.format("SELECT id, mutations, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
+ SystemKeyspace.NAME,
+ SystemKeyspace.BATCHES);
+ UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
+ processBatchlogEntries(batches, pageSize, rateLimiter);
+ lastReplayedUuid = limitUuid;
+ logger.debug("Finished replayFailedBatches");
+ }
+
+ // read less rows (batches) per page if they are very large
+ static int calculatePageSize(ColumnFamilyStore store)
+ {
+ double averageRowSize = store.getMeanPartitionSize();
+ if (averageRowSize <= 0)
+ return DEFAULT_PAGE_SIZE;
+
+ return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
+ }
+
+ private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
+ {
+ int positionInPage = 0;
+ ArrayList<ReplayingBatch> unfinishedBatches = new ArrayList<>(pageSize);
+
+ Set<InetAddress> hintedNodes = new HashSet<>();
+ Set<UUID> replayedBatches = new HashSet<>();
+
+ // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
+ for (UntypedResultSet.Row row : batches)
+ {
+ UUID id = row.getUUID("id");
+ int version = row.getInt("version");
+ try
+ {
+ ReplayingBatch batch = new ReplayingBatch(id, version, row.getList("mutations", BytesType.instance));
+ if (batch.replay(rateLimiter, hintedNodes) > 0)
+ {
+ unfinishedBatches.add(batch);
+ }
+ else
+ {
+ remove(id); // no write mutations were sent (either expired or all CFs involved truncated).
+ ++totalBatchesReplayed;
+ }
+ }
+ catch (IOException e)
+ {
+ logger.warn("Skipped batch replay of {} due to {}", id, e);
+ remove(id);
+ }
+
+ if (++positionInPage == pageSize)
+ {
+ // We have reached the end of a batch. To avoid keeping more than a page of mutations in memory,
+ // finish processing the page before requesting the next row.
+ finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+ positionInPage = 0;
+ }
+ }
+
+ finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+
+ // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
+ HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
+
+ // once all generated hints are fsynced, actually delete the batches
+ replayedBatches.forEach(BatchlogManager::remove);
+ }
+
+ private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
+ {
+ // schedule hints for timed out deliveries
+ for (ReplayingBatch batch : batches)
+ {
+ batch.finish(hintedNodes);
+ replayedBatches.add(batch.id);
+ }
+
+ totalBatchesReplayed += batches.size();
+ batches.clear();
+ }
+
+ public static long getBatchlogTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+ }
+
+ private static class ReplayingBatch
+ {
+ private final UUID id;
+ private final long writtenAt;
+ private final List<Mutation> mutations;
+ private final int replayedBytes;
+
+ private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
+
+ ReplayingBatch(UUID id, int version, List<ByteBuffer> serializedMutations) throws IOException
+ {
+ this.id = id;
+ this.writtenAt = UUIDGen.unixTimestamp(id);
+ this.mutations = new ArrayList<>(serializedMutations.size());
+ this.replayedBytes = addMutations(version, serializedMutations);
+ }
+
+ public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
+ {
+ logger.debug("Replaying batch {}", id);
+
+ if (mutations.isEmpty())
+ return 0;
+
+ int gcgs = gcgs(mutations);
+ if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+ return 0;
+
+ replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
+
+ rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation.
+
+ return replayHandlers.size();
+ }
+
+ public void finish(Set<InetAddress> hintedNodes)
+ {
+ for (int i = 0; i < replayHandlers.size(); i++)
+ {
+ ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+ try
+ {
+ handler.get();
+ }
+ catch (WriteTimeoutException|WriteFailureException e)
+ {
+ logger.debug("Failed replaying a batched mutation to a node, will write a hint");
+ logger.debug("Failure was : {}", e.getMessage());
+ // writing hints for the rest to hints, starting from i
+ writeHintsForUndeliveredEndpoints(i, hintedNodes);
+ return;
+ }
+ }
+ }
+
+ private int addMutations(int version, List<ByteBuffer> serializedMutations) throws IOException
+ {
+ int ret = 0;
+ for (ByteBuffer serializedMutation : serializedMutations)
+ {
+ ret += serializedMutation.remaining();
+ try (DataInputBuffer in = new DataInputBuffer(serializedMutation, true))
+ {
+ addMutation(Mutation.serializer.deserialize(in, version));
+ }
+ }
+
+ return ret;
+ }
+
+ // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+ // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
+ // truncated.
+ private void addMutation(Mutation mutation)
+ {
+ for (UUID cfId : mutation.getColumnFamilyIds())
+ if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+ mutation = mutation.without(cfId);
+
+ if (!mutation.isEmpty())
+ mutations.add(mutation);
+ }
+
+ private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
+ {
+ int gcgs = gcgs(mutations);
+
+ // expired
+ if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+ return;
+
+ for (int i = startFrom; i < replayHandlers.size(); i++)
+ {
+ ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+ Mutation undeliveredMutation = mutations.get(i);
+
+ if (handler != null)
+ {
+ hintedNodes.addAll(handler.undelivered);
+ HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
+ Hint.create(undeliveredMutation, writtenAt));
+ }
+ }
+ }
+
+ private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
+ long writtenAt,
+ Set<InetAddress> hintedNodes)
+ {
+ List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
+ for (Mutation mutation : mutations)
+ {
+ ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
+ if (handler != null)
+ handlers.add(handler);
+ }
+ return handlers;
+ }
+
+ /**
+ * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+ * when a replica is down or a write request times out.
+ *
+ * @return direct delivery handler to wait on or null, if no live nodes found
+ */
+ private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
+ long writtenAt,
+ Set<InetAddress> hintedNodes)
+ {
+ Set<InetAddress> liveEndpoints = new HashSet<>();
+ String ks = mutation.getKeyspaceName();
+ Token tk = mutation.key().getToken();
+
+ for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+ StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+ {
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ mutation.apply();
+ }
+ else if (FailureDetector.instance.isAlive(endpoint))
+ {
+ liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+ }
+ else
+ {
+ hintedNodes.add(endpoint);
+ HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
+ Hint.create(mutation, writtenAt));
+ }
+ }
+
+ if (liveEndpoints.isEmpty())
+ return null;
+
+ ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
+ MessageOut<Mutation> message = mutation.createMessage();
+ for (InetAddress endpoint : liveEndpoints)
+ MessagingService.instance().sendRR(message, endpoint, handler, false);
+ return handler;
+ }
+
+ private static int gcgs(Collection<Mutation> mutations)
+ {
+ int gcgs = Integer.MAX_VALUE;
+ for (Mutation mutation : mutations)
+ gcgs = Math.min(gcgs, mutation.smallestGCGS());
+ return gcgs;
+ }
+
+ /**
+ * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
+ * which we did not receive a successful reply.
+ */
+ private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
+ {
+ private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+ {
+ super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
+ undelivered.addAll(writeEndpoints);
+ }
+
+ @Override
+ protected int totalBlockFor()
+ {
+ return this.naturalEndpoints.size();
+ }
+
+ @Override
+ public void response(MessageIn<T> m)
+ {
+ boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
+ assert removed;
+ super.response(m);
+ }
+ }
+ }
+
+ public static class EndpointFilter
+ {
+ private final String localRack;
+ private final Multimap<String, InetAddress> endpoints;
+
+ public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+ {
+ this.localRack = localRack;
+ this.endpoints = endpoints;
+ }
+
+ /**
+ * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
+ */
+ public Collection<InetAddress> filter()
+ {
+ // special case for single-node data centers
+ if (endpoints.values().size() == 1)
+ return endpoints.values();
+
+ // strip out dead endpoints and localhost
+ ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
+ for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+ if (isValid(entry.getValue()))
+ validated.put(entry.getKey(), entry.getValue());
+
+ if (validated.size() <= 2)
+ return validated.values();
+
+ if (validated.size() - validated.get(localRack).size() >= 2)
+ {
+ // we have enough endpoints in other racks
+ validated.removeAll(localRack);
+ }
+
+ if (validated.keySet().size() == 1)
+ {
+ // we have only 1 `other` rack
+ Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
+ return Lists.newArrayList(Iterables.limit(otherRack, 2));
+ }
+
+ // randomize which racks we pick from if more than 2 remaining
+ Collection<String> racks;
+ if (validated.keySet().size() == 2)
+ {
+ racks = validated.keySet();
+ }
+ else
+ {
+ racks = Lists.newArrayList(validated.keySet());
+ Collections.shuffle((List<String>) racks);
+ }
+
+ // grab a random member of up to two racks
+ List<InetAddress> result = new ArrayList<>(2);
+ for (String rack : Iterables.limit(racks, 2))
+ {
+ List<InetAddress> rackMembers = validated.get(rack);
+ result.add(rackMembers.get(getRandomInt(rackMembers.size())));
+ }
+
+ return result;
+ }
+
+ @VisibleForTesting
+ protected boolean isValid(InetAddress input)
+ {
+ return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
+ }
+
+ @VisibleForTesting
+ protected int getRandomInt(int bound)
+ {
+ return ThreadLocalRandom.current().nextInt(bound);
+ }
+ }
+}
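
For readers following the new rack-aware batchlog endpoint selection above, here is a minimal, self-contained sketch of exercising EndpointFilter outside the server. It assumes the class stays nested in BatchlogManager under the new org.apache.cassandra.batchlog package (the file header for this hunk sits above this excerpt), overrides isValid() so the example does not depend on gossip or failure-detector state, and uses hypothetical rack names and addresses.

    import java.net.InetAddress;
    import java.util.Collection;

    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.Multimap;

    // Assumed location of the relocated class; adjust if the package differs.
    import org.apache.cassandra.batchlog.BatchlogManager;

    public class EndpointFilterSketch
    {
        public static void main(String[] args) throws Exception
        {
            // Hypothetical topology: two endpoints in the local rack, one elsewhere.
            Multimap<String, InetAddress> byRack = ArrayListMultimap.create();
            byRack.put("rack1", InetAddress.getByName("10.0.0.1"));
            byRack.put("rack1", InetAddress.getByName("10.0.0.2"));
            byRack.put("rack2", InetAddress.getByName("10.0.0.3"));

            // isValid() normally drops the local node and dead endpoints;
            // overriding it keeps this sketch independent of a running cluster.
            BatchlogManager.EndpointFilter filter = new BatchlogManager.EndpointFilter("rack1", byRack)
            {
                @Override
                protected boolean isValid(InetAddress input)
                {
                    return true;
                }
            };

            // Returns at most two candidates, preferring endpoints outside the local rack.
            Collection<InetAddress> candidates = filter.filter();
            System.out.println(candidates);
        }
    }
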
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
new file mode 100644
index 0000000..4dcc9f2
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+public interface BatchlogManagerMBean
+{
+ /**
+ * Counts all batches currently in the batchlog.
+ *
+ * @return total batch count
+ */
+ public int countAllBatches();
+
+ /**
+ * @return total count of batches replayed since node start
+ */
+ public long getTotalBatchesReplayed();
+
+ /**
+ * Forces batchlog replay. Returns immediately if replay is already in progress.
+ */
+ public void forceBatchlogReplay() throws Exception;
+}
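
A hedged usage sketch for the MBean above, connecting over JMX from a client process. The service URL and port are hypothetical defaults, and the ObjectName is assumed to remain "org.apache.cassandra.db:type=BatchlogManager" as registered by the pre-3.0 manager shown later in this diff; the new manager's registration is not visible in this excerpt.

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.batchlog.BatchlogManagerMBean;

    public class ForceBatchlogReplaySketch
    {
        public static void main(String[] args) throws Exception
        {
            // Hypothetical local node; 7199 is Cassandra's customary JMX port.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try
            {
                MBeanServerConnection connection = connector.getMBeanServerConnection();
                // ObjectName assumed unchanged from the pre-3.0 BatchlogManager.
                ObjectName name = new ObjectName("org.apache.cassandra.db:type=BatchlogManager");
                BatchlogManagerMBean proxy = JMX.newMBeanProxy(connection, name, BatchlogManagerMBean.class);

                System.out.println("batches pending: " + proxy.countAllBatches());
                proxy.forceBatchlogReplay();
                System.out.println("batches replayed since start: " + proxy.getTotalBatchesReplayed());
            }
            finally
            {
                connector.close();
            }
        }
    }
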
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
new file mode 100644
index 0000000..13ff81a
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+public final class LegacyBatchlogMigrator
+{
+ private static final Logger logger = LoggerFactory.getLogger(LegacyBatchlogMigrator.class);
+
+ private LegacyBatchlogMigrator()
+ {
+ // static class
+ }
+
+ @SuppressWarnings("deprecation")
+ public static void migrate()
+ {
+ ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG);
+
+ // nothing to migrate
+ if (store.isEmpty())
+ return;
+
+ logger.info("Migrating legacy batchlog to new storage");
+
+ int convertedBatches = 0;
+ String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
+ SystemKeyspace.NAME,
+ SystemKeyspace.LEGACY_BATCHLOG);
+
+ int pageSize = BatchlogManager.calculatePageSize(store);
+
+ UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize);
+ for (UntypedResultSet.Row row : rows)
+ {
+ if (apply(row, convertedBatches))
+ convertedBatches++;
+ }
+
+ if (convertedBatches > 0)
+ Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
+ }
+
+ @SuppressWarnings("deprecation")
+ public static boolean isLegacyBatchlogMutation(Mutation mutation)
+ {
+ return mutation.getKeyspaceName().equals(SystemKeyspace.NAME)
+ && mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId) != null;
+ }
+
+ @SuppressWarnings("deprecation")
+ public static void handleLegacyMutation(Mutation mutation)
+ {
+ PartitionUpdate update = mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId);
+ logger.debug("Applying legacy batchlog mutation {}", update);
+ update.forEach(row -> apply(UntypedResultSet.Row.fromInternalRow(update.metadata(), update.partitionKey(), row), -1));
+ }
+
+ private static boolean apply(UntypedResultSet.Row row, long counter)
+ {
+ UUID id = row.getUUID("id");
+ long timestamp = id.version() == 1 ? UUIDGen.unixTimestamp(id) : row.getLong("written_at");
+ int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+
+ if (id.version() != 1)
+ id = UUIDGen.getTimeUUID(timestamp, counter);
+
+ logger.debug("Converting mutation at {}", timestamp);
+
+ try (DataInputBuffer in = new DataInputBuffer(row.getBytes("data"), false))
+ {
+ int numMutations = in.readInt();
+ List<Mutation> mutations = new ArrayList<>(numMutations);
+ for (int i = 0; i < numMutations; i++)
+ mutations.add(Mutation.serializer.deserialize(in, version));
+
+ BatchlogManager.store(Batch.createLocal(id, TimeUnit.MILLISECONDS.toMicros(timestamp), mutations));
+ return true;
+ }
+ catch (Throwable t)
+ {
+ logger.error("Failed to convert mutation {} at timestamp {}", id, timestamp, t);
+ return false;
+ }
+ }
+
+ public static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
+ throws WriteTimeoutException, WriteFailureException
+ {
+ for (InetAddress target : endpoints)
+ {
+ logger.debug("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+
+ int targetVersion = MessagingService.instance().getVersion(target);
+ MessagingService.instance().sendRR(getStoreMutation(batch, targetVersion).createMessage(MessagingService.Verb.MUTATION),
+ target,
+ handler,
+ false);
+ }
+ }
+
+ public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
+ {
+ AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
+ Collections.<InetAddress>emptyList(),
+ ConsistencyLevel.ANY,
+ Keyspace.open(SystemKeyspace.NAME),
+ null,
+ WriteType.SIMPLE);
+ Mutation mutation = getRemoveMutation(uuid);
+
+ for (InetAddress target : endpoints)
+ {
+ logger.debug("Sending legacy batchlog remove request {} to {}", uuid, target);
+ MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION), target, handler, false);
+ }
+ }
+
+ static void store(Batch batch, int version)
+ {
+ getStoreMutation(batch, version).apply();
+ }
+
+ @SuppressWarnings("deprecation")
+ static Mutation getStoreMutation(Batch batch, int version)
+ {
+ return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, batch.creationTime, batch.id)
+ .clustering()
+ .add("written_at", new Date(batch.creationTime / 1000))
+ .add("data", getSerializedMutations(version, batch.decodedMutations))
+ .add("version", version)
+ .build();
+ }
+
+ @SuppressWarnings("deprecation")
+ private static Mutation getRemoveMutation(UUID uuid)
+ {
+ return new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyBatchlog,
+ UUIDType.instance.decompose(uuid),
+ FBUtilities.timestampMicros(),
+ FBUtilities.nowInSeconds()));
+ }
+
+ private static ByteBuffer getSerializedMutations(int version, Collection<Mutation> mutations)
+ {
+ try (DataOutputBuffer buf = new DataOutputBuffer())
+ {
+ buf.writeInt(mutations.size());
+ for (Mutation mutation : mutations)
+ Mutation.serializer.serialize(mutation, buf, version);
+ return buf.buffer();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
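
The migrator above round-trips the legacy "data" blob, whose layout is simply a 4-byte mutation count followed by each mutation serialized at the stored messaging version. Below is a minimal sketch of that framing, using opaque byte payloads in place of real serialized mutations so it runs without a schema or live node.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class LegacyBatchBlobSketch
    {
        // Legacy framing: an int count, then each serialized mutation back to back.
        static byte[] frame(List<byte[]> serializedMutations) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes))
            {
                out.writeInt(serializedMutations.size());
                for (byte[] mutation : serializedMutations)
                    out.write(mutation);
            }
            return bytes.toByteArray();
        }

        public static void main(String[] args) throws IOException
        {
            byte[] blob = frame(Arrays.asList(new byte[]{ 1, 2 }, new byte[]{ 3 }));
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob)))
            {
                // The real code reads this count and then deserializes that many Mutations;
                // mutations are self-delimiting, so no per-entry length is stored.
                System.out.println("mutations in blob: " + in.readInt());
            }
        }
    }
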
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/Stage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java
index e91c515..a57587c 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -27,7 +27,6 @@ public enum Stage
READ,
MUTATION,
COUNTER_MUTATION,
- BATCHLOG_MUTATION,
MATERIALIZED_VIEW_MUTATION,
GOSSIP,
REQUEST_RESPONSE,
@@ -62,7 +61,6 @@ public enum Stage
return "internal";
case MUTATION:
case COUNTER_MUTATION:
- case BATCHLOG_MUTATION:
case MATERIALIZED_VIEW_MUTATION:
case READ:
case REQUEST_RESPONSE:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index ca83829..ee1fbe5 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -47,7 +47,6 @@ public class StageManager
{
stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters()));
stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
- stages.put(Stage.BATCHLOG_MUTATION, multiThreadedLowSignalStage(Stage.BATCHLOG_MUTATION, getConcurrentBatchlogWriters()));
stages.put(Stage.MATERIALIZED_VIEW_MUTATION, multiThreadedLowSignalStage(Stage.MATERIALIZED_VIEW_MUTATION, getConcurrentMaterializedViewWriters()));
stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders()));
stages.put(Stage.REQUEST_RESPONSE, multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9d55fc8..22b09d3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -93,7 +93,6 @@ public class Config
public Integer concurrent_reads = 32;
public Integer concurrent_writes = 32;
public Integer concurrent_counter_writes = 32;
- public Integer concurrent_batchlog_writes = 32;
public Integer concurrent_materialized_view_writes = 32;
@Deprecated
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4e13911..31a4e9d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1080,8 +1080,9 @@ public class DatabaseDescriptor
case PAXOS_COMMIT:
case PAXOS_PREPARE:
case PAXOS_PROPOSE:
- case BATCHLOG_MUTATION:
case HINT:
+ case BATCH_STORE:
+ case BATCH_REMOVE:
return getWriteRpcTimeout();
case COUNTER_MUTATION:
return getCounterWriteRpcTimeout();
@@ -1128,10 +1129,6 @@ public class DatabaseDescriptor
return conf.concurrent_counter_writes;
}
- public static int getConcurrentBatchlogWriters()
- {
- return conf.concurrent_batchlog_writes;
- }
public static int getConcurrentMaterializedViewWriters()
{
return conf.concurrent_materialized_view_writes;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 5de4b6c..c8482b3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -524,11 +524,6 @@ public class BatchStatement implements CQLStatement
}
}
- public interface BatchVariables
- {
- public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
- }
-
public String toString()
{
return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
deleted file mode 100644
index de85925..0000000
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
-import com.google.common.util.concurrent.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteFailureException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.hints.Hint;
-import org.apache.cassandra.hints.HintsService;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static com.google.common.collect.Iterables.transform;
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
-
-public class BatchlogManager implements BatchlogManagerMBean
-{
- public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
- private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
- private static final int DEFAULT_PAGE_SIZE = 128;
-
- private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
- public static final BatchlogManager instance = new BatchlogManager();
-
- private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
- private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
-
- // Single-thread executor service for scheduling and serializing log replay.
- private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
-
- public void start()
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- batchlogTasks.schedule(this::replayInitially, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
-
- batchlogTasks.scheduleWithFixedDelay(this::replayAllFailedBatches,
- StorageService.RING_DELAY + REPLAY_INTERVAL,
- REPLAY_INTERVAL,
- TimeUnit.MILLISECONDS);
- }
-
- private void replayInitially()
- {
- // Initial run must take care of non-time-uuid batches as written by Version 1.2.
- convertOldBatchEntries();
-
- replayAllFailedBatches();
- }
-
- public static void shutdown() throws InterruptedException
- {
- batchlogTasks.shutdown();
- batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
- }
-
- public int countAllBatches()
- {
- String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
- UntypedResultSet results = executeInternal(query);
- if (results.isEmpty())
- return 0;
- return (int) results.one().getLong("count");
- }
-
- public long getTotalBatchesReplayed()
- {
- return totalBatchesReplayed;
- }
-
- public void forceBatchlogReplay() throws Exception
- {
- startBatchlogReplay().get();
- }
-
- public Future<?> startBatchlogReplay()
- {
- // If a replay is already in progress this request will be executed after it completes.
- return batchlogTasks.submit(this::replayAllFailedBatches);
- }
-
- void performInitialReplay() throws InterruptedException, ExecutionException
- {
- // Invokes initial replay. Used for testing only.
- batchlogTasks.submit(this::replayInitially).get();
- }
-
- public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
- {
- return new RowUpdateBuilder(SystemKeyspace.Batches, FBUtilities.timestampMicros(), uuid)
- .clustering()
- .add("data", serializeMutations(mutations, version))
- .add("version", version)
- .build();
- }
-
- @VisibleForTesting
- static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
- {
- try (DataOutputBuffer buf = new DataOutputBuffer())
- {
- buf.writeInt(mutations.size());
- for (Mutation mutation : mutations)
- Mutation.serializer.serialize(mutation, buf, version);
- return buf.buffer();
- }
- catch (IOException e)
- {
- throw new AssertionError(); // cannot happen.
- }
- }
-
- private void replayAllFailedBatches()
- {
- logger.debug("Started replayAllFailedBatches");
-
- // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
- // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
- int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
- RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
-
- UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
- int pageSize = calculatePageSize();
- // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
- // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
- // token(id) > token(lastReplayedUuid) as part of the query.
- String query = String.format("SELECT id, data, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
- SystemKeyspace.NAME,
- SystemKeyspace.BATCHES);
- UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
- processBatchlogEntries(batches, pageSize, rateLimiter);
- lastReplayedUuid = limitUuid;
- logger.debug("Finished replayAllFailedBatches");
- }
-
- // read less rows (batches) per page if they are very large
- private static int calculatePageSize()
- {
- ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
- double averageRowSize = store.getMeanPartitionSize();
- if (averageRowSize <= 0)
- return DEFAULT_PAGE_SIZE;
-
- return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
- }
-
- private static void deleteBatch(UUID id)
- {
- Mutation mutation = new Mutation(
- PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
- UUIDType.instance.decompose(id),
- FBUtilities.timestampMicros(),
- FBUtilities.nowInSeconds()));
- mutation.apply();
- }
-
- private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
- {
- int positionInPage = 0;
- ArrayList<Batch> unfinishedBatches = new ArrayList<>(pageSize);
-
- Set<InetAddress> hintedNodes = new HashSet<>();
- Set<UUID> replayedBatches = new HashSet<>();
-
- // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
- for (UntypedResultSet.Row row : batches)
- {
- UUID id = row.getUUID("id");
- int version = row.getInt("version");
- Batch batch = new Batch(id, row.getBytes("data"), version);
- try
- {
- if (batch.replay(rateLimiter, hintedNodes) > 0)
- {
- unfinishedBatches.add(batch);
- }
- else
- {
- deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
- ++totalBatchesReplayed;
- }
- }
- catch (IOException e)
- {
- logger.warn("Skipped batch replay of {} due to {}", id, e);
- deleteBatch(id);
- }
-
- if (++positionInPage == pageSize)
- {
- // We have reached the end of a batch. To avoid keeping more than a page of mutations in memory,
- // finish processing the page before requesting the next row.
- finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
- positionInPage = 0;
- }
- }
-
- finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
-
- // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
- HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
-
- // once all generated hints are fsynced, actually delete the batches
- replayedBatches.forEach(BatchlogManager::deleteBatch);
- }
-
- private void finishAndClearBatches(ArrayList<Batch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
- {
- // schedule hints for timed out deliveries
- for (Batch batch : batches)
- {
- batch.finish(hintedNodes);
- replayedBatches.add(batch.id);
- }
-
- totalBatchesReplayed += batches.size();
- batches.clear();
- }
-
- public static long getBatchlogTimeout()
- {
- return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
- }
-
- private static class Batch
- {
- private final UUID id;
- private final long writtenAt;
- private final ByteBuffer data;
- private final int version;
-
- private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
-
- Batch(UUID id, ByteBuffer data, int version)
- {
- this.id = id;
- this.writtenAt = UUIDGen.unixTimestamp(id);
- this.data = data;
- this.version = version;
- }
-
- public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
- {
- logger.debug("Replaying batch {}", id);
-
- List<Mutation> mutations = replayingMutations();
-
- if (mutations.isEmpty())
- return 0;
-
- int gcgs = gcgs(mutations);
- if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
- return 0;
-
- replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
-
- rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
-
- return replayHandlers.size();
- }
-
- public void finish(Set<InetAddress> hintedNodes)
- {
- for (int i = 0; i < replayHandlers.size(); i++)
- {
- ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
- try
- {
- handler.get();
- }
- catch (WriteTimeoutException|WriteFailureException e)
- {
- logger.debug("Failed replaying a batched mutation to a node, will write a hint");
- logger.debug("Failure was : {}", e.getMessage());
- // writing hints for the rest to hints, starting from i
- writeHintsForUndeliveredEndpoints(i, hintedNodes);
- return;
- }
- }
- }
-
- private List<Mutation> replayingMutations() throws IOException
- {
- DataInputPlus in = new DataInputBuffer(data, true);
- int size = in.readInt();
- List<Mutation> mutations = new ArrayList<>(size);
- for (int i = 0; i < size; i++)
- {
- Mutation mutation = Mutation.serializer.deserialize(in, version);
-
- // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
- // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
- // truncated.
- for (UUID cfId : mutation.getColumnFamilyIds())
- if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
- mutation = mutation.without(cfId);
-
- if (!mutation.isEmpty())
- mutations.add(mutation);
- }
- return mutations;
- }
-
- private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
- {
- try
- {
- // Here we deserialize mutations 2nd time from byte buffer.
- // but this is ok, because timeout on batch direct delivery is rare
- // (it can happen only several seconds until node is marked dead)
- // so trading some cpu to keep less objects
- List<Mutation> replayingMutations = replayingMutations();
- for (int i = startFrom; i < replayHandlers.size(); i++)
- {
- Mutation undeliveredMutation = replayingMutations.get(i);
- int gcgs = gcgs(replayingMutations);
- ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
-
- if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs > FBUtilities.nowInSeconds() && handler != null)
- {
- hintedNodes.addAll(handler.undelivered);
- HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
- Hint.create(undeliveredMutation, writtenAt));
- }
- }
- }
- catch (IOException e)
- {
- logger.error("Cannot schedule hints for undelivered batch", e);
- }
- }
-
- private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
- long writtenAt,
- Set<InetAddress> hintedNodes)
- {
- List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
- for (Mutation mutation : mutations)
- {
- ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
- if (handler != null)
- handlers.add(handler);
- }
- return handlers;
- }
-
- /**
- * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
- * when a replica is down or a write request times out.
- *
- * @return direct delivery handler to wait on or null, if no live nodes found
- */
- private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
- long writtenAt,
- Set<InetAddress> hintedNodes)
- {
- Set<InetAddress> liveEndpoints = new HashSet<>();
- String ks = mutation.getKeyspaceName();
- Token tk = mutation.key().getToken();
-
- for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
- StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
- {
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- {
- mutation.apply();
- }
- else if (FailureDetector.instance.isAlive(endpoint))
- {
- liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
- }
- else
- {
- hintedNodes.add(endpoint);
- HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
- Hint.create(mutation, writtenAt));
- }
- }
-
- if (liveEndpoints.isEmpty())
- return null;
-
- ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
- MessageOut<Mutation> message = mutation.createMessage();
- for (InetAddress endpoint : liveEndpoints)
- MessagingService.instance().sendRR(message, endpoint, handler, false);
- return handler;
- }
-
- private static int gcgs(Collection<Mutation> mutations)
- {
- int gcgs = Integer.MAX_VALUE;
- for (Mutation mutation : mutations)
- gcgs = Math.min(gcgs, mutation.smallestGCGS());
- return gcgs;
- }
-
- /**
- * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
- * which we did not receive a successful reply.
- */
- private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
- {
- private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
- ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
- {
- super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
- undelivered.addAll(writeEndpoints);
- }
-
- @Override
- protected int totalBlockFor()
- {
- return this.naturalEndpoints.size();
- }
-
- @Override
- public void response(MessageIn<T> m)
- {
- boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
- assert removed;
- super.response(m);
- }
- }
- }
-
- @SuppressWarnings("deprecation")
- private static void convertOldBatchEntries()
- {
- logger.debug("Started convertOldBatchEntries");
-
- String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
- SystemKeyspace.NAME,
- SystemKeyspace.LEGACY_BATCHLOG);
- UntypedResultSet batches = executeInternalWithPaging(query, DEFAULT_PAGE_SIZE);
- int convertedBatches = 0;
- for (UntypedResultSet.Row row : batches)
- {
- UUID id = row.getUUID("id");
- long timestamp = row.getLong("written_at");
- int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
- logger.debug("Converting mutation at " + timestamp);
-
- UUID newId = id;
- if (id.version() != 1 || timestamp != UUIDGen.unixTimestamp(id))
- newId = UUIDGen.getTimeUUID(timestamp, convertedBatches);
- ++convertedBatches;
-
- Mutation addRow = new RowUpdateBuilder(SystemKeyspace.Batches,
- FBUtilities.timestampMicros(),
- newId)
- .clustering()
- .add("data", row.getBytes("data"))
- .add("version", version)
- .build();
-
- addRow.apply();
- }
- if (convertedBatches > 0)
- Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
- // cleanup will be called after replay
- logger.debug("Finished convertOldBatchEntries");
- }
-
- public static class EndpointFilter
- {
- private final String localRack;
- private final Multimap<String, InetAddress> endpoints;
-
- public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
- {
- this.localRack = localRack;
- this.endpoints = endpoints;
- }
-
- /**
- * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
- */
- public Collection<InetAddress> filter()
- {
- // special case for single-node data centers
- if (endpoints.values().size() == 1)
- return endpoints.values();
-
- // strip out dead endpoints and localhost
- ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
- for (Map.Entry<String, InetAddress> entry : endpoints.entries())
- if (isValid(entry.getValue()))
- validated.put(entry.getKey(), entry.getValue());
-
- if (validated.size() <= 2)
- return validated.values();
-
- if (validated.size() - validated.get(localRack).size() >= 2)
- {
- // we have enough endpoints in other racks
- validated.removeAll(localRack);
- }
-
- if (validated.keySet().size() == 1)
- {
- // we have only 1 `other` rack
- Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
- return Lists.newArrayList(Iterables.limit(otherRack, 2));
- }
-
- // randomize which racks we pick from if more than 2 remaining
- Collection<String> racks;
- if (validated.keySet().size() == 2)
- {
- racks = validated.keySet();
- }
- else
- {
- racks = Lists.newArrayList(validated.keySet());
- Collections.shuffle((List<String>) racks);
- }
-
- // grab a random member of up to two racks
- List<InetAddress> result = new ArrayList<>(2);
- for (String rack : Iterables.limit(racks, 2))
- {
- List<InetAddress> rackMembers = validated.get(rack);
- result.add(rackMembers.get(getRandomInt(rackMembers.size())));
- }
-
- return result;
- }
-
- @VisibleForTesting
- protected boolean isValid(InetAddress input)
- {
- return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
- }
-
- @VisibleForTesting
- protected int getRandomInt(int bound)
- {
- return ThreadLocalRandom.current().nextInt(bound);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
deleted file mode 100644
index a688117..0000000
--- a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-public interface BatchlogManagerMBean
-{
- /**
- * Counts all batches currently in the batchlog.
- *
- * @return total batch count
- */
- public int countAllBatches();
-
- /**
- * @return total count of batches replayed since node start
- */
- public long getTotalBatchesReplayed();
-
- /**
- * Forces batchlog replay. Returns immediately if replay is already in progress.
- */
- public void forceBatchlogReplay() throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d9ee38a..e349bfc 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -49,7 +49,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
{
public void run()
{
- MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from);
+ MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 6e78b0e..da7d13d 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -58,7 +58,7 @@ public class Mutation implements IMutation
public final long createdAt = System.currentTimeMillis();
public Mutation(String keyspaceName, DecoratedKey key)
{
- this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());
+ this(keyspaceName, key, new HashMap<>());
}
public Mutation(PartitionUpdate update)
@@ -201,6 +201,11 @@ public class Mutation implements IMutation
ks.apply(this, ks.getMetadata().params.durableWrites);
}
+ public void apply(boolean durableWrites)
+ {
+ Keyspace.open(keyspaceName).apply(this, durableWrites);
+ }
+
public void applyUnsafe()
{
Keyspace.open(keyspaceName).apply(this, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 640e45f..d4670a2 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -18,10 +18,10 @@
package org.apache.cassandra.db;
import java.io.DataInputStream;
-import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.*;
@@ -29,31 +29,32 @@ import org.apache.cassandra.tracing.Tracing;
public class MutationVerbHandler implements IVerbHandler<Mutation>
{
- private static final boolean TEST_FAIL_WRITES = System.getProperty("cassandra.test.fail_writes", "false").equalsIgnoreCase("true");
-
public void doVerb(MessageIn<Mutation> message, int id) throws IOException
{
- // Check if there were any forwarding headers in this message
- byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
- InetAddress replyTo;
- if (from == null)
- {
- replyTo = message.from;
- byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
- if (forwardBytes != null)
- forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
- }
- else
- {
- replyTo = InetAddress.getByAddress(from);
- }
+ // Check if there were any forwarding headers in this message
+ byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
+ InetAddress replyTo;
+ if (from == null)
+ {
+ replyTo = message.from;
+ byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
+ if (forwardBytes != null)
+ forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
+ }
+ else
+ {
+ replyTo = InetAddress.getByAddress(from);
+ }
try
{
- message.payload.apply();
- WriteResponse response = new WriteResponse();
+ if (message.version < MessagingService.VERSION_30 && LegacyBatchlogMigrator.isLegacyBatchlogMutation(message.payload))
+ LegacyBatchlogMigrator.handleLegacyMutation(message.payload);
+ else
+ message.payload.apply();
+
Tracing.trace("Enqueuing response to {}", replyTo);
- MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
+ MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo);
}
catch (WriteTimeoutException wto)
{
@@ -65,7 +66,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
* Older version (< 1.0) will not send this message at all, hence we don't
* need to check the version of the data.
*/
- private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
+ private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
{
try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index 849ac70..2e499e7 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -26,7 +26,6 @@ public class ReadRepairVerbHandler implements IVerbHandler<Mutation>
public void doVerb(MessageIn<Mutation> message, int id)
{
message.payload.apply();
- WriteResponse response = new WriteResponse();
- MessagingService.instance().sendReply(response.createMessage(), id, message.from);
+ MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index fb9eb48..cf8e14d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -113,7 +113,7 @@ public final class SystemKeyspace
"batches awaiting replay",
"CREATE TABLE %s ("
+ "id timeuuid,"
- + "data blob,"
+ + "mutations list<blob>,"
+ "version int,"
+ "PRIMARY KEY ((id)))")
.copy(new LocalPartitioner(TimeUUIDType.instance))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/WriteResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java
index 824368e..0dddaab 100644
--- a/src/java/org/apache/cassandra/db/WriteResponse.java
+++ b/src/java/org/apache/cassandra/db/WriteResponse.java
@@ -28,16 +28,22 @@ import org.apache.cassandra.net.MessagingService;
/*
* This empty response is sent by a replica to inform the coordinator that the write succeeded
*/
-public class WriteResponse
+public final class WriteResponse
{
- public static final WriteResponseSerializer serializer = new WriteResponseSerializer();
+ public static final Serializer serializer = new Serializer();
- public MessageOut<WriteResponse> createMessage()
+ private static final WriteResponse instance = new WriteResponse();
+
+ private WriteResponse()
+ {
+ }
+
+ public static MessageOut<WriteResponse> createMessage()
{
- return new MessageOut<WriteResponse>(MessagingService.Verb.REQUEST_RESPONSE, this, serializer);
+ return new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, instance, serializer);
}
- public static class WriteResponseSerializer implements IVersionedSerializer<WriteResponse>
+ public static class Serializer implements IVersionedSerializer<WriteResponse>
{
public void serialize(WriteResponse wm, DataOutputPlus out, int version) throws IOException
{
@@ -45,7 +51,7 @@ public class WriteResponse
public WriteResponse deserialize(DataInputPlus in, int version) throws IOException
{
- return new WriteResponse();
+ return instance;
}
public long serializedSize(WriteResponse response, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncodedHintMessage.java b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
index 2797495..56727fc 100644
--- a/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
+++ b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
@@ -65,8 +65,8 @@ final class EncodedHintMessage
if (version != message.version)
throw new IllegalArgumentException("serializedSize() called with non-matching version " + version);
- int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version);
- size += TypeSizes.sizeof(message.hint.remaining());
+ long size = UUIDSerializer.serializer.serializedSize(message.hostId, version);
+ size += TypeSizes.sizeofVInt(message.hint.remaining());
size += message.hint.remaining();
return size;
}
@@ -77,7 +77,7 @@ final class EncodedHintMessage
throw new IllegalArgumentException("serialize() called with non-matching version " + version);
UUIDSerializer.serializer.serialize(message.hostId, out, version);
- out.writeInt(message.hint.remaining());
+ out.writeVInt(message.hint.remaining());
out.write(message.hint);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index d8f85c5..c88c494 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -26,6 +26,9 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.db.TypeSizes.sizeofVInt;
+
/**
* Encapsulates the hinted mutation, its creation time, and the gc grace seconds param for each table involved.
*
@@ -107,8 +110,8 @@ public final class Hint
{
public long serializedSize(Hint hint, int version)
{
- long size = TypeSizes.sizeof(hint.creationTime);
- size += TypeSizes.sizeof(hint.gcgs);
+ long size = sizeof(hint.creationTime);
+ size += sizeofVInt(hint.gcgs);
size += Mutation.serializer.serializedSize(hint.mutation, version);
return size;
}
@@ -116,14 +119,14 @@ public final class Hint
public void serialize(Hint hint, DataOutputPlus out, int version) throws IOException
{
out.writeLong(hint.creationTime);
- out.writeInt(hint.gcgs);
+ out.writeVInt(hint.gcgs);
Mutation.serializer.serialize(hint.mutation, out, version);
}
public Hint deserialize(DataInputPlus in, int version) throws IOException
{
long creationTime = in.readLong();
- int gcgs = in.readInt();
+ int gcgs = (int) in.readVInt();
return new Hint(Mutation.serializer.deserialize(in, version), creationTime, gcgs);
}
}
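
The Hint serializer now writes gcgs as a vint rather than a fixed 4-byte int. A small sketch of the size difference using the same TypeSizes helpers referenced in the hunk; the gc_grace_seconds value below is just the common default.

    import org.apache.cassandra.db.TypeSizes;

    public class VIntSizeSketch
    {
        public static void main(String[] args)
        {
            int gcgs = 864000; // 10 days, the usual default gc_grace_seconds
            // A fixed-width int always costs 4 bytes; the vint encoding shrinks with magnitude.
            System.out.println("sizeof:     " + TypeSizes.sizeof(gcgs));
            System.out.println("sizeofVInt: " + TypeSizes.sizeofVInt(gcgs));
        }
    }
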
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/HintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java
index 89baa89..6296a8c 100644
--- a/src/java/org/apache/cassandra/hints/HintMessage.java
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@ -24,6 +24,8 @@ import java.util.UUID;
import javax.annotation.Nullable;
+import com.google.common.primitives.Ints;
+
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -81,10 +83,10 @@ public final class HintMessage
{
public long serializedSize(HintMessage message, int version)
{
- int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version);
+ long size = UUIDSerializer.serializer.serializedSize(message.hostId, version);
- int hintSize = (int) Hint.serializer.serializedSize(message.hint, version);
- size += TypeSizes.sizeof(hintSize);
+ long hintSize = Hint.serializer.serializedSize(message.hint, version);
+ size += TypeSizes.sizeofVInt(hintSize);
size += hintSize;
return size;
@@ -100,7 +102,7 @@ public final class HintMessage
* We are serializing the hint size so that the receiver of the message could gracefully handle
* deserialize failure when a table had been dropped, by simply skipping the unread bytes.
*/
- out.writeInt((int) Hint.serializer.serializedSize(message.hint, version));
+ out.writeVInt(Hint.serializer.serializedSize(message.hint, version));
Hint.serializer.serialize(message.hint, out, version);
}
@@ -114,7 +116,7 @@ public final class HintMessage
{
UUID hostId = UUIDSerializer.serializer.deserialize(in, version);
- int hintSize = in.readInt();
+ long hintSize = in.readVInt();
BytesReadTracker countingIn = new BytesReadTracker(in);
try
{
@@ -122,7 +124,7 @@ public final class HintMessage
}
catch (UnknownColumnFamilyException e)
{
- in.skipBytes(hintSize - (int) countingIn.getBytesRead());
+ in.skipBytes(Ints.checkedCast(hintSize - countingIn.getBytesRead()));
return new HintMessage(hostId, e.cfId);
}
}
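
The hunk above keeps a length prefix in front of the serialized hint so the receiving node can skip a hint for a table it no longer knows. Here is a plain-stream sketch of that prefix-then-skip pattern, independent of the Cassandra types involved.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SkipUnknownPayloadSketch
    {
        public static void main(String[] args) throws IOException
        {
            // Writer side: length prefix, then the payload itself, then more data.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes))
            {
                byte[] payload = { 1, 2, 3, 4, 5 };
                out.writeInt(payload.length);
                out.write(payload);
                out.writeUTF("next message");
            }

            // Reader side: if the payload cannot be decoded, the prefix still tells us
            // how far to skip so the rest of the stream stays readable.
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                int size = in.readInt();
                in.skipBytes(size); // pretend decoding failed, e.g. the table was dropped
                System.out.println(in.readUTF()); // prints "next message"
            }
        }
    }
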
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
index 196f184..b0095ed 100644
--- a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
+++ b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
@@ -92,7 +92,7 @@ public final class LegacyHintsMigrator
compactLegacyHints();
// paginate over legacy hints and write them to the new storage
- logger.info("Migrating legacy hints to the new storage");
+ logger.info("Writing legacy hints to the new storage");
migrateLegacyHints();
// truncate the legacy hints table
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e59cd58..15199fe 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.concurrent.TracingAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.dht.IPartitioner;
@@ -103,8 +104,8 @@ public final class MessagingService implements MessagingServiceMBean
READ_REPAIR,
READ,
REQUEST_RESPONSE, // client-initiated reads and writes
- @Deprecated STREAM_INITIATE,
- @Deprecated STREAM_INITIATE_DONE,
+ BATCH_STORE, // was @Deprecated STREAM_INITIATE,
+ BATCH_REMOVE, // was @Deprecated STREAM_INITIATE_DONE,
@Deprecated STREAM_REPLY,
@Deprecated STREAM_REQUEST,
RANGE_SLICE,
@@ -135,7 +136,6 @@ public final class MessagingService implements MessagingServiceMBean
PAXOS_PROPOSE,
PAXOS_COMMIT,
@Deprecated PAGED_RANGE,
- BATCHLOG_MUTATION,
// remember to add new verbs at the end, since we serialize by ordinal
UNUSED_1,
UNUSED_2,
@@ -149,13 +149,14 @@ public final class MessagingService implements MessagingServiceMBean
{{
put(Verb.MUTATION, Stage.MUTATION);
put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
- put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION);
put(Verb.READ_REPAIR, Stage.MUTATION);
put(Verb.HINT, Stage.MUTATION);
put(Verb.TRUNCATE, Stage.MUTATION);
put(Verb.PAXOS_PREPARE, Stage.MUTATION);
put(Verb.PAXOS_PROPOSE, Stage.MUTATION);
put(Verb.PAXOS_COMMIT, Stage.MUTATION);
+ put(Verb.BATCH_STORE, Stage.MUTATION);
+ put(Verb.BATCH_REMOVE, Stage.MUTATION);
put(Verb.READ, Stage.READ);
put(Verb.RANGE_SLICE, Stage.READ);
@@ -209,7 +210,6 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
put(Verb.MUTATION, Mutation.serializer);
- put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
put(Verb.READ_REPAIR, Mutation.serializer);
put(Verb.READ, ReadCommand.serializer);
put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer);
@@ -229,6 +229,8 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.PAXOS_PROPOSE, Commit.serializer);
put(Verb.PAXOS_COMMIT, Commit.serializer);
put(Verb.HINT, HintMessage.serializer);
+ put(Verb.BATCH_STORE, Batch.serializer);
+ put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
}};
/**
@@ -238,7 +240,6 @@ public final class MessagingService implements MessagingServiceMBean
{{
put(Verb.MUTATION, WriteResponse.serializer);
put(Verb.HINT, HintResponse.serializer);
- put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
put(Verb.READ_REPAIR, WriteResponse.serializer);
put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer);
@@ -254,6 +255,9 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.PAXOS_PREPARE, PrepareResponse.serializer);
put(Verb.PAXOS_PROPOSE, BooleanSerializer.serializer);
+
+ put(Verb.BATCH_STORE, WriteResponse.serializer);
+ put(Verb.BATCH_REMOVE, WriteResponse.serializer);
}};
/* This records all the results mapped by message Id */
@@ -286,7 +290,7 @@ public final class MessagingService implements MessagingServiceMBean
/* Lookup table for registering message handlers based on the verb. */
private final Map<Verb, IVerbHandler> verbHandlers;
- private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
+ private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
@@ -301,14 +305,15 @@ public final class MessagingService implements MessagingServiceMBean
*/
public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE,
Verb.MUTATION,
- Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable??
Verb.COUNTER_MUTATION,
Verb.HINT,
Verb.READ_REPAIR,
Verb.READ,
Verb.RANGE_SLICE,
Verb.PAGED_RANGE,
- Verb.REQUEST_RESPONSE);
+ Verb.REQUEST_RESPONSE,
+ Verb.BATCH_STORE,
+ Verb.BATCH_REMOVE);
private static final class DroppedMessages
@@ -372,7 +377,7 @@ public final class MessagingService implements MessagingServiceMBean
droppedMessagesMap.put(verb, new DroppedMessages(verb));
listenGate = new SimpleCondition();
- verbHandlers = new EnumMap<Verb, IVerbHandler>(Verb.class);
+ verbHandlers = new EnumMap<>(Verb.class);
if (!testOnly)
{
Runnable logDropped = new Runnable()
@@ -630,7 +635,6 @@ public final class MessagingService implements MessagingServiceMBean
boolean allowHints)
{
assert message.verb == Verb.MUTATION
- || message.verb == Verb.BATCHLOG_MUTATION
|| message.verb == Verb.COUNTER_MUTATION
|| message.verb == Verb.PAXOS_COMMIT;
int messageId = nextId();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index cf1e021..c8b9677 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -27,7 +27,6 @@ import java.rmi.registry.LocateRegistry;
import java.rmi.server.RMIServerSocketFactory;
import java.util.Collections;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -41,7 +40,6 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistryListener;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
@@ -52,6 +50,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.StartupException;
@@ -280,6 +279,9 @@ public class CassandraDaemon
// migrate any legacy (pre-3.0) hints from system.hints table into the new store
new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
+ // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
+ LegacyBatchlogMigrator.migrate();
+
// enable auto compaction
for (Keyspace keyspace : Keyspace.all())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 4952959..59f1c1c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,12 +24,9 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.*;
@@ -44,14 +41,19 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.batchlog.*;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.MaterializedViewManager;
import org.apache.cassandra.db.view.MaterializedViewUtils;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.RingPosition;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
@@ -67,8 +69,8 @@ import org.apache.cassandra.service.paxos.PrepareCallback;
import org.apache.cassandra.service.paxos.ProposeCallback;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
-import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.AbstractIterator;
public class StorageProxy implements StorageProxyMBean
{
@@ -687,31 +689,19 @@ public class StorageProxy implements StorageProxyMBean
WriteType.BATCH,
cleanup);
- //When local node is the endpoint and there are no pending nodes we can
+ // When the local node is the paired endpoint and there are no pending nodes,
 // just apply the mutation locally.
- if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) &&
- wrapper.handler.pendingEndpoints.isEmpty())
- {
- if (writeCommitLog)
- mutation.apply();
- else
- mutation.applyUnsafe();
- }
+ if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty())
+ mutation.apply(writeCommitLog);
else
- {
wrappers.add(wrapper);
- }
}
if (!wrappers.isEmpty())
{
- Mutation blMutation = BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version);
-
- //Apply to local batchlog memtable in this thread
- if (writeCommitLog)
- blMutation.apply();
- else
- blMutation.applyUnsafe();
+ // Apply to local batchlog memtable in this thread
+ BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)),
+ writeCommitLog);
// now actually perform the writes and wait for them to complete
asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
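The removed apply()/applyUnsafe() branch is collapsed into a boolean-taking overload. That overload is not part of this hunk, so the following is only an assumed reading of mutation.apply(writeCommitLog), not the shipped code:

    // Assumed shape of the Mutation.apply(boolean) overload used above; it simply
    // folds the branch that this hunk deletes.
    public void apply(boolean durableWrites)
    {
        if (durableWrites)
            apply();         // goes through the commit log
        else
            applyUnsafe();   // skips the commit log
    }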
@@ -781,16 +771,10 @@ public class StorageProxy implements StorageProxyMBean
batchConsistencyLevel = consistency_level;
}
- final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
+ final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
final UUID batchUUID = UUIDGen.getTimeUUID();
BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
- new BatchlogResponseHandler.BatchlogCleanupCallback()
- {
- public void invoke()
- {
- asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
- }
- });
+ () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
@@ -840,75 +824,64 @@ public class StorageProxy implements StorageProxyMBean
return replica.equals(FBUtilities.getBroadcastAddress());
}
+ private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid)
+ throws WriteTimeoutException, WriteFailureException
+ {
+ WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
+ Collections.<InetAddress>emptyList(),
+ endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
+ Keyspace.open(SystemKeyspace.NAME),
+ null,
+ WriteType.BATCH_LOG);
+
+ Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
+
+ if (!endpoints.current.isEmpty())
+ syncWriteToBatchlog(handler, batch, endpoints.current);
+
+ if (!endpoints.legacy.isEmpty())
+ LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy);
+
+ handler.get();
+ }
- private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
+ private static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
throws WriteTimeoutException, WriteFailureException
{
- AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
- Collections.<InetAddress>emptyList(),
- ConsistencyLevel.ONE,
- Keyspace.open(SystemKeyspace.NAME),
- null,
- WriteType.BATCH_LOG);
+ MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
- MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
- .createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
for (InetAddress target : endpoints)
{
- int targetVersion = MessagingService.instance().getVersion(target);
+ logger.debug("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+
if (canDoLocalRequest(target))
- {
- insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
- }
- else if (targetVersion < MessagingService.VERSION_30)
- {
- MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
- .createMessage(MessagingService.Verb.MUTATION),
- target,
- handler,
- false);
- }
+ performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler);
else
- {
- MessagingService.instance().sendRR(message, target, handler, false);
- }
+ MessagingService.instance().sendRR(message, target, handler);
}
+ }
- handler.get();
+ private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid)
+ {
+ if (!endpoints.current.isEmpty())
+ asyncRemoveFromBatchlog(endpoints.current, uuid);
+
+ if (!endpoints.legacy.isEmpty())
+ LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid);
}
private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
{
- AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
- Collections.<InetAddress>emptyList(),
- ConsistencyLevel.ANY,
- Keyspace.open(SystemKeyspace.NAME),
- null,
- WriteType.SIMPLE);
- Mutation mutation = new Mutation(
- PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
- UUIDType.instance.decompose(uuid),
- FBUtilities.timestampMicros(),
- FBUtilities.nowInSeconds()));
- MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
+ MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
for (InetAddress target : endpoints)
{
- int targetVersion = MessagingService.instance().getVersion(target);
+ if (logger.isDebugEnabled())
+ logger.debug("Sending batchlog remove request {} to {}", uuid, target);
+
if (canDoLocalRequest(target))
- {
- insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
- }
- else if (targetVersion < MessagingService.VERSION_30)
- {
- MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION),
- target,
- handler,
- false);
- }
+ performLocally(Stage.MUTATION, () -> BatchlogManager.remove(uuid));
else
- {
- MessagingService.instance().sendRR(message, target, handler, false);
- }
+ MessagingService.instance().sendOneWay(message, target);
}
}
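Taken together, the helpers above give the coordinator-side batchlog flow. An outline, using only names that appear in this patch (the surrounding coordinator method is elided), is roughly:

    BatchlogEndpoints endpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
    UUID batchId = UUIDGen.getTimeUUID();

    // 1. Persist the batch on the batchlog replicas: BATCH_STORE for 3.0+ nodes,
    //    LegacyBatchlogMigrator.syncWriteToBatchlog(...) for pre-3.0 nodes.
    syncWriteToBatchlog(mutations, endpoints, batchId);

    // 2. Send the mutations themselves; once the BatchlogCleanup callback fires,
    //    asyncRemoveFromBatchlog(endpoints, batchId) clears the stored batch.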
@@ -1034,13 +1007,38 @@ public class StorageProxy implements StorageProxyMBean
}
/*
+ * A class that splits batchlog endpoints into legacy (pre-3.0) endpoints and current ones.
+ */
+ private static final class BatchlogEndpoints
+ {
+ public final Collection<InetAddress> all;
+ public final Collection<InetAddress> current;
+ public final Collection<InetAddress> legacy;
+
+ BatchlogEndpoints(Collection<InetAddress> endpoints)
+ {
+ all = endpoints;
+ current = new ArrayList<>(2);
+ legacy = new ArrayList<>(2);
+
+ for (InetAddress ep : endpoints)
+ {
+ if (MessagingService.instance().getVersion(ep) >= MessagingService.VERSION_30)
+ current.add(ep);
+ else
+ legacy.add(ep);
+ }
+ }
+ }
+
+ /*
* Replicas are picked manually:
* - replicas should be alive according to the failure detector
* - replicas should be in the local datacenter
* - choose min(2, number of qualifying candidates above)
* - allow the local node to be the only replica only if it's a single-node DC
*/
- private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
+ private static BatchlogEndpoints getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
throws UnavailableException
{
TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
@@ -1051,12 +1049,12 @@ public class StorageProxy implements StorageProxyMBean
if (chosenEndpoints.isEmpty())
{
if (consistencyLevel == ConsistencyLevel.ANY)
- return Collections.singleton(FBUtilities.getBroadcastAddress());
+ return new BatchlogEndpoints(Collections.singleton(FBUtilities.getBroadcastAddress()));
throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
}
- return chosenEndpoints;
+ return new BatchlogEndpoints(chosenEndpoints);
}
/**
@@ -1109,7 +1107,8 @@ public class StorageProxy implements StorageProxyMBean
if (canDoLocalRequest(destination))
{
insertLocal = true;
- } else
+ }
+ else
{
// belongs on a different server
if (message == null)
@@ -1120,14 +1119,15 @@ public class StorageProxy implements StorageProxyMBean
if (localDataCenter.equals(dc))
{
MessagingService.instance().sendRR(message, destination, responseHandler, true);
- } else
+ }
+ else
{
Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
if (messages == null)
{
- messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+ messages = new ArrayList<>(3); // most DCs will have <= 3 replicas
if (dcGroups == null)
- dcGroups = new HashMap<String, Collection<InetAddress>>();
+ dcGroups = new HashMap<>();
dcGroups.put(dc, messages);
}
messages.add(destination);
@@ -1149,7 +1149,7 @@ public class StorageProxy implements StorageProxyMBean
submitHint(mutation, endpointsToHint, responseHandler);
if (insertLocal)
- insertLocal(stage, mutation, responseHandler);
+ performLocally(stage, mutation::apply, responseHandler);
if (dcGroups != null)
{
@@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static void insertLocal(Stage stage, final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
+ private static void performLocally(Stage stage, final Runnable runnable)
{
StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
{
@@ -1206,14 +1206,32 @@ public class StorageProxy implements StorageProxyMBean
{
try
{
- mutation.apply();
- responseHandler.response(null);
+ runnable.run();
+ }
+ catch (Exception ex)
+ {
+ logger.error("Failed to apply mutation locally : {}", ex);
+ }
+ }
+ });
+ }
+
+ private static void performLocally(Stage stage, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
+ {
+ StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
+ {
+ public void runMayThrow()
+ {
+ try
+ {
+ runnable.run();
+ handler.response(null);
}
catch (Exception ex)
{
if (!(ex instanceof WriteTimeoutException))
logger.error("Failed to apply mutation locally : {}", ex);
- responseHandler.onFailure(FBUtilities.getBroadcastAddress());
+ handler.onFailure(FBUtilities.getBroadcastAddress());
}
}
});
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a6c2f8b..13dc29c7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -55,6 +55,9 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
+import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
+import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.lifecycle.TransactionLog;
@@ -282,7 +285,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
/* register the verb handlers */
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
@@ -311,6 +313,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler());
+
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE, new BatchStoreVerbHandler());
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE, new BatchRemoveVerbHandler());
}
public void registerDaemon(CassandraDaemon daemon)
@@ -620,12 +625,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
inShutdownHook = true;
ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
- ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isShutdown()
&& counterMutationStage.isShutdown()
- && batchlogMutationStage.isShutdown()
&& materializedViewMutationStage.isShutdown())
return; // drained already
@@ -638,12 +641,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// before mutation stage, so we can get all the hints saved before shutting down
MessagingService.instance().shutdown();
materializedViewMutationStage.shutdown();
- batchlogMutationStage.shutdown();
HintsService.instance.pauseDispatch();
counterMutationStage.shutdown();
mutationStage.shutdown();
materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
- batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
@@ -3846,17 +3847,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
inShutdownHook = true;
- BatchlogManager.shutdown();
+ BatchlogManager.instance.shutdown();
HintsService.instance.pauseDispatch();
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
- ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isTerminated()
&& counterMutationStage.isTerminated()
- && batchlogMutationStage.isTerminated()
&& materializedViewMutationStage.isTerminated())
{
logger.warn("Cannot drain node (did it already happen?)");
@@ -3872,11 +3871,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
setMode(Mode.DRAINING, "clearing mutation stage", false);
materializedViewMutationStage.shutdown();
- batchlogMutationStage.shutdown();
counterMutationStage.shutdown();
mutationStage.shutdown();
materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
- batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
@@ -3913,6 +3910,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
FBUtilities.waitOnFutures(flushes);
+ BatchlogManager.instance.shutdown();
+
HintsService.instance.shutdownBlocking();
// whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
index 213023e..a702a4d 100644
--- a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
@@ -1,4 +1,3 @@
-package org.apache.cassandra.service.paxos;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,7 @@ package org.apache.cassandra.service.paxos;
* under the License.
*
*/
-
+package org.apache.cassandra.service.paxos;
import org.apache.cassandra.db.WriteResponse;
import org.apache.cassandra.net.IVerbHandler;
@@ -33,8 +32,7 @@ public class CommitVerbHandler implements IVerbHandler<Commit>
{
PaxosState.commit(message.payload);
- WriteResponse response = new WriteResponse();
Tracing.trace("Enqueuing acknowledge to {}", message.from);
- MessagingService.instance().sendReply(response.createMessage(), id, message.from);
+ MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
}
}
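The BATCH_STORE and BATCH_REMOVE handlers registered in StorageService are not included in this excerpt of the patch. Modeled on the CommitVerbHandler above, a sketch of what BatchStoreVerbHandler plausibly looks like (the shipped class may differ) is:

    package org.apache.cassandra.batchlog;

    import org.apache.cassandra.db.WriteResponse;
    import org.apache.cassandra.net.IVerbHandler;
    import org.apache.cassandra.net.MessageIn;
    import org.apache.cassandra.net.MessagingService;

    // Sketch modeled on CommitVerbHandler; not the authoritative implementation.
    public class BatchStoreVerbHandler implements IVerbHandler<Batch>
    {
        public void doVerb(MessageIn<Batch> message, int id)
        {
            BatchlogManager.store(message.payload);
            MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
        }
    }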
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 6909ea4..5f77097 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -54,8 +54,8 @@ import javax.management.remote.JMXServiceURL;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.db.BatchlogManager;
-import org.apache.cassandra.db.BatchlogManagerMBean;
+import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.batchlog.BatchlogManagerMBean;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.HintedHandOffManagerMBean;
import org.apache.cassandra.db.compaction.CompactionManager;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
index 9738103..b833e60 100644
--- a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
+++ b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
@@ -35,7 +35,7 @@ import com.datastax.driver.core.exceptions.WriteTimeoutException;
import org.apache.cassandra.concurrent.SEPExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.BatchlogManager;
+import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.utils.WrappedRunnable;
public class MaterializedViewLongTest extends CQLTester
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchTest.java b/test/unit/org/apache/cassandra/batchlog/BatchTest.java
new file mode 100644
index 0000000..b7a4100
--- /dev/null
+++ b/test/unit/org/apache/cassandra/batchlog/BatchTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+
+public class BatchTest
+{
+ private static final String KEYSPACE = "BatchRequestTest";
+ private static final String CF_STANDARD = "Standard";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD, 1, BytesType.instance));
+ }
+
+ @Test
+ public void testSerialization() throws IOException
+ {
+ CFMetaData cfm = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata;
+
+ long now = FBUtilities.timestampMicros();
+ int version = MessagingService.current_version;
+ UUID uuid = UUIDGen.getTimeUUID();
+
+ List<Mutation> mutations = new ArrayList<>(10);
+ for (int i = 0; i < 10; i++)
+ {
+ mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build());
+ }
+
+ Batch batch1 = Batch.createLocal(uuid, now, mutations);
+ assertEquals(uuid, batch1.id);
+ assertEquals(now, batch1.creationTime);
+ assertEquals(mutations, batch1.decodedMutations);
+
+ DataOutputBuffer out = new DataOutputBuffer();
+ Batch.serializer.serialize(batch1, out, version);
+
+ assertEquals(out.getLength(), Batch.serializer.serializedSize(batch1, version));
+
+ DataInputPlus dis = new DataInputBuffer(out.getData());
+ Batch batch2 = Batch.serializer.deserialize(dis, version);
+
+ assertEquals(batch1.id, batch2.id);
+ assertEquals(batch1.creationTime, batch2.creationTime);
+ assertEquals(batch1.decodedMutations.size(), batch2.encodedMutations.size());
+
+ Iterator<Mutation> it1 = batch1.decodedMutations.iterator();
+ Iterator<ByteBuffer> it2 = batch2.encodedMutations.iterator();
+ while (it1.hasNext())
+ {
+ try (DataInputBuffer in = new DataInputBuffer(it2.next().array()))
+ {
+ assertEquals(it1.next().toString(), Mutation.serializer.deserialize(in, version).toString());
+ }
+ }
+ }
+
+ /**
+ * This is just to test decodeMutations() when deserializing,
+ * since a Batch will never be serialized at version 2.2.
+ * @throws IOException
+ */
+ @Test
+ public void testSerializationNonCurrentVersion() throws IOException
+ {
+ CFMetaData cfm = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata;
+
+ long now = FBUtilities.timestampMicros();
+ int version = MessagingService.VERSION_22;
+ UUID uuid = UUIDGen.getTimeUUID();
+
+ List<Mutation> mutations = new ArrayList<>(10);
+ for (int i = 0; i < 10; i++)
+ {
+ mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build());
+ }
+
+ Batch batch1 = Batch.createLocal(uuid, now, mutations);
+ assertEquals(uuid, batch1.id);
+ assertEquals(now, batch1.creationTime);
+ assertEquals(mutations, batch1.decodedMutations);
+
+ DataOutputBuffer out = new DataOutputBuffer();
+ Batch.serializer.serialize(batch1, out, version);
+
+ assertEquals(out.getLength(), Batch.serializer.serializedSize(batch1, version));
+
+ DataInputPlus dis = new DataInputBuffer(out.getData());
+ Batch batch2 = Batch.serializer.deserialize(dis, version);
+
+ assertEquals(batch1.id, batch2.id);
+ assertEquals(batch1.creationTime, batch2.creationTime);
+ assertEquals(batch1.decodedMutations.size(), batch2.decodedMutations.size());
+
+ Iterator<Mutation> it1 = batch1.decodedMutations.iterator();
+ Iterator<Mutation> it2 = batch2.decodedMutations.iterator();
+ while (it1.hasNext())
+ assertEquals(it1.next().toString(), it2.next().toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
new file mode 100644
index 0000000..23aeaaa
--- /dev/null
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.junit.Test;
+import org.junit.matchers.JUnitMatchers;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class BatchlogEndpointFilterTest
+{
+ private static final String LOCAL = "local";
+
+ @Test
+ public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .put(LOCAL, InetAddress.getByName("00"))
+ .put("1", InetAddress.getByName("1"))
+ .put("1", InetAddress.getByName("11"))
+ .put("2", InetAddress.getByName("2"))
+ .put("2", InetAddress.getByName("22"))
+ .build();
+ Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ assertThat(result.size(), is(2));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
+ }
+
+ @Test
+ public void shouldSelectHostFromLocal() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .put(LOCAL, InetAddress.getByName("00"))
+ .put("1", InetAddress.getByName("1"))
+ .build();
+ Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ assertThat(result.size(), is(2));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+ }
+
+ @Test
+ public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .build();
+ Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ assertThat(result.size(), is(1));
+ assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+ }
+
+ @Test
+ public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostException
+ {
+ Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+ .put(LOCAL, InetAddress.getByName("0"))
+ .put(LOCAL, InetAddress.getByName("00"))
+ .put("1", InetAddress.getByName("1"))
+ .put("1", InetAddress.getByName("11"))
+ .put("1", InetAddress.getByName("111"))
+ .build();
+ Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ // result should contain two distinct, randomly chosen values
+ assertThat(new HashSet<>(result).size(), is(2));
+ }
+
+ private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
+ {
+ TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+ {
+ super(localRack, endpoints);
+ }
+
+ @Override
+ protected boolean isValid(InetAddress input)
+ {
+ // Treat every endpoint as alive and non-local for this test
+ return true;
+ }
+
+ @Override
+ protected int getRandomInt(int bound)
+ {
+ // We don't need random behavior here
+ return bound - 1;
+ }
+ }
+}
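The tests above pin down the same selection policy spelled out in the StorageProxy comment: prefer two endpoints from distinct non-local racks, fall back to the local rack when there are not enough candidates elsewhere, and return everything as-is when two or fewer candidates exist. A standalone approximation of that policy, with the liveness check and the randomized picks elided, is sketched below; it is illustrative only and is not the EndpointFilter implementation shipped in this commit:

    import java.net.InetAddress;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.Multimap;

    final class EndpointSelectionSketch // hypothetical name, illustration only
    {
        static Collection<InetAddress> pick(String localRack, Multimap<String, InetAddress> endpoints)
        {
            // 1. Two or fewer candidates: return them as-is.
            if (endpoints.values().size() <= 2)
                return new ArrayList<>(endpoints.values());

            // 2. If at least two candidates live outside the local rack, drop the local rack.
            Multimap<String, InetAddress> pool = ArrayListMultimap.create(endpoints);
            if (pool.values().size() - pool.get(localRack).size() >= 2)
                pool.removeAll(localRack);

            // 3. Only one rack left: take two of its endpoints.
            if (pool.keySet().size() == 1)
            {
                List<InetAddress> inRack = new ArrayList<>(pool.values());
                return inRack.subList(inRack.size() - 2, inRack.size());
            }

            // 4. Otherwise take one endpoint from each of two different racks.
            List<InetAddress> result = new ArrayList<>(2);
            for (String rack : pool.keySet())
            {
                if (result.size() == 2)
                    break;
                List<InetAddress> inRack = new ArrayList<>(pool.get(rack));
                result.add(inRack.get(inRack.size() - 1)); // deterministic stand-in for a random pick
            }
            return result;
        }
    }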
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
new file mode 100644
index 0000000..dfb17c3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import org.junit.*;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.junit.Assert.*;
+
+public class BatchlogManagerTest
+{
+ private static final String KEYSPACE1 = "BatchlogManagerTest1";
+ private static final String CF_STANDARD1 = "Standard1";
+ private static final String CF_STANDARD2 = "Standard2";
+ private static final String CF_STANDARD3 = "Standard3";
+ private static final String CF_STANDARD4 = "Standard4";
+ private static final String CF_STANDARD5 = "Standard5";
+
+ static PartitionerSwitcher sw;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ sw = Util.switchPartitioner(Murmur3Partitioner.instance);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD5, 1, BytesType.instance));
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ sw.close();
+ }
+
+ @Before
+ @SuppressWarnings("deprecation")
+ public void setUp() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ InetAddress localhost = InetAddress.getByName("127.0.0.1");
+ metadata.updateNormalToken(Util.token("A"), localhost);
+ metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking();
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).truncateBlocking();
+ }
+
+ @Test
+ public void testDelete()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+ CFMetaData cfm = cfs.metadata;
+ new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes("1234"))
+ .clustering("c")
+ .add("val", "val" + 1234)
+ .build()
+ .applyUnsafe();
+
+ DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
+ ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
+ Iterator<Row> iter = results.iterator();
+ assert iter.hasNext();
+
+ Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(cfm,
+ dk,
+ FBUtilities.timestampMicros(),
+ FBUtilities.nowInSeconds()));
+ mutation.applyUnsafe();
+
+ Util.assertEmpty(Util.cmd(cfs, dk).build());
+ }
+
+ @Test
+ public void testReplay() throws Exception
+ {
+ testReplay(false);
+ }
+
+ @Test
+ public void testLegacyReplay() throws Exception
+ {
+ testReplay(true);
+ }
+
+ @SuppressWarnings("deprecation")
+ private static void testReplay(boolean legacy) throws Exception
+ {
+ long initialAllBatches = BatchlogManager.instance.countAllBatches();
+ long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
+
+ CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata;
+
+ // Generate 1000 mutations (100 batches of 10 mutations each) and put them all into the batchlog.
+ // Half of the batches (50) are ready to be replayed, half are not.
+ for (int i = 0; i < 100; i++)
+ {
+ List<Mutation> mutations = new ArrayList<>(10);
+ for (int j = 0; j < 10; j++)
+ {
+ mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+ .clustering("name" + j)
+ .add("val", "val" + j)
+ .build());
+ }
+
+ long timestamp = i < 50
+ ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
+ : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());
+
+ if (legacy)
+ LegacyBatchlogMigrator.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations), MessagingService.current_version);
+ else
+ BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations));
+ }
+
+ if (legacy)
+ {
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush();
+ LegacyBatchlogMigrator.migrate();
+ }
+
+ // Flush the batchlog to disk (see CASSANDRA-6822).
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+ assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+ assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+ // Force batchlog replay and wait for it to complete.
+ BatchlogManager.instance.startBatchlogReplay().get();
+
+ // Ensure that the first half, and only the first half, got replayed.
+ assertEquals(50, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+ assertEquals(50, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+ for (int i = 0; i < 100; i++)
+ {
+ String query = String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD1, i);
+ UntypedResultSet result = executeInternal(query);
+ assertNotNull(result);
+ if (i < 50)
+ {
+ Iterator<UntypedResultSet.Row> it = result.iterator();
+ assertNotNull(it);
+ for (int j = 0; j < 10; j++)
+ {
+ assertTrue(it.hasNext());
+ UntypedResultSet.Row row = it.next();
+
+ assertEquals(ByteBufferUtil.bytes(i), row.getBytes("key"));
+ assertEquals("name" + j, row.getString("name"));
+ assertEquals("val" + j, row.getString("val"));
+ }
+
+ assertFalse(it.hasNext());
+ }
+ else
+ {
+ assertTrue(result.isEmpty());
+ }
+ }
+
+ // Ensure that no stray mutations got somehow applied.
+ UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD1));
+ assertNotNull(result);
+ assertEquals(500, result.one().getLong("count"));
+ }
+
+ @Test
+ public void testTruncatedReplay() throws InterruptedException, ExecutionException
+ {
+ CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2);
+ CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3);
+ // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
+ // Each batchlog entry contains one mutation for Standard2 and one for Standard3.
+ // In the middle of the process, 'truncate' Standard2.
+ for (int i = 0; i < 1000; i++)
+ {
+ Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build();
+ Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build();
+
+ List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
+
+ // Make sure it's ready to be replayed, so adjust the timestamp.
+ long timestamp = System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout();
+
+ if (i == 500)
+ SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2),
+ timestamp,
+ ReplayPosition.NONE);
+
+ // Adjust the timestamp (slightly) to make the test deterministic.
+ if (i >= 500)
+ timestamp++;
+ else
+ timestamp--;
+
+ BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), FBUtilities.timestampMicros(), mutations));
+ }
+
+ // Flush the batchlog to disk (see CASSANDRA-6822).
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+ // Force batchlog replay and wait for it to complete.
+ BatchlogManager.instance.startBatchlogReplay().get();
+
+ // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD2, i));
+ assertNotNull(result);
+ if (i >= 500)
+ {
+ assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key"));
+ assertEquals("name" + i, result.one().getString("name"));
+ assertEquals("val" + i, result.one().getString("val"));
+ }
+ else
+ {
+ assertTrue(result.isEmpty());
+ }
+ }
+
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i));
+ assertNotNull(result);
+ assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key"));
+ assertEquals("name" + i, result.one().getString("name"));
+ assertEquals("val" + i, result.one().getString("val"));
+ }
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testConversion() throws Exception
+ {
+ long initialAllBatches = BatchlogManager.instance.countAllBatches();
+ long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
+ CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4);
+
+ // Generate 1400 version 2.0 mutations and put them all into the batchlog.
+ // Half ready to be replayed, half not.
+ for (int i = 0; i < 1400; i++)
+ {
+ Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build();
+
+ long timestamp = i < 700
+ ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
+ : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());
+
+
+ Mutation batchMutation = LegacyBatchlogMigrator.getStoreMutation(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i),
+ TimeUnit.MILLISECONDS.toMicros(timestamp),
+ Collections.singleton(mutation)),
+ MessagingService.VERSION_20);
+ assertTrue(LegacyBatchlogMigrator.isLegacyBatchlogMutation(batchMutation));
+ LegacyBatchlogMigrator.handleLegacyMutation(batchMutation);
+ }
+
+ // Mix in 100 current version mutations, 50 ready for replay.
+ for (int i = 1400; i < 1500; i++)
+ {
+ Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+ .clustering("name" + i)
+ .add("val", "val" + i)
+ .build();
+
+ long timestamp = i < 1450
+ ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
+ : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());
+
+
+ BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i),
+ FBUtilities.timestampMicros(),
+ Collections.singleton(mutation)));
+ }
+
+ // Flush the batchlog to disk (see CASSANDRA-6822).
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+ assertEquals(1500, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+ assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+ UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+ assertNotNull(result);
+ assertEquals("Count in blog legacy", 0, result.one().getLong("count"));
+ result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+ assertNotNull(result);
+ assertEquals("Count in blog", 1500, result.one().getLong("count"));
+
+ // Force batchlog replay and wait for it to complete.
+ BatchlogManager.instance.performInitialReplay();
+
+ // Ensure that the first half, and only the first half, got replayed.
+ assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+ assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+ for (int i = 0; i < 1500; i++)
+ {
+ result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i));
+ assertNotNull(result);
+ if (i < 700 || i >= 1400 && i < 1450)
+ {
+ assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key"));
+ assertEquals("name" + i, result.one().getString("name"));
+ assertEquals("val" + i, result.one().getString("val"));
+ }
+ else
+ {
+ assertTrue("Present at " + i, result.isEmpty());
+ }
+ }
+
+ // Ensure that no stray mutations got somehow applied.
+ result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4));
+ assertNotNull(result);
+ assertEquals(750, result.one().getLong("count"));
+
+ // Ensure batchlog is left as expected.
+ result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+ assertNotNull(result);
+ assertEquals("Count in blog after initial replay", 750, result.one().getLong("count"));
+ result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+ assertNotNull(result);
+ assertEquals("Count in blog legacy after initial replay ", 0, result.one().getLong("count"));
+ }
+
+ @Test
+ public void testAddBatch() throws IOException
+ {
+ long initialAllBatches = BatchlogManager.instance.countAllBatches();
+ CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;
+
+ long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
+ UUID uuid = UUIDGen.getTimeUUID();
+
+ // Add a batch with 10 mutations
+ List<Mutation> mutations = new ArrayList<>(10);
+ for (int j = 0; j < 10; j++)
+ {
+ mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(j))
+ .clustering("name" + j)
+ .add("val", "val" + j)
+ .build());
+ }
+
+
+ BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations));
+ Assert.assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches());
+
+ String query = String.format("SELECT count(*) FROM %s.%s where id = %s",
+ SystemKeyspace.NAME,
+ SystemKeyspace.BATCHES,
+ uuid);
+ UntypedResultSet result = executeInternal(query);
+ assertNotNull(result);
+ assertEquals(1L, result.one().getLong("count"));
+ }
+
+ @Test
+ public void testRemoveBatch()
+ {
+ long initialAllBatches = BatchlogManager.instance.countAllBatches();
+ CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;
+
+ long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
+ UUID uuid = UUIDGen.getTimeUUID();
+
+ // Add a batch with 10 mutations
+ List<Mutation> mutations = new ArrayList<>(10);
+ for (int j = 0; j < 10; j++)
+ {
+ mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(j))
+ .clustering("name" + j)
+ .add("val", "val" + j)
+ .build());
+ }
+
+ // Store the batch
+ BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations));
+ Assert.assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches());
+
+ // Remove the batch
+ BatchlogManager.remove(uuid);
+
+ assertEquals(initialAllBatches, BatchlogManager.instance.countAllBatches());
+
+ String query = String.format("SELECT count(*) FROM %s.%s where id = %s",
+ SystemKeyspace.NAME,
+ SystemKeyspace.BATCHES,
+ uuid);
+ UntypedResultSet result = executeInternal(query);
+ assertNotNull(result);
+ assertEquals(0L, result.one().getLong("count"));
+ }
+}