You are viewing a plain text version of this content. The canonical link for it is not included in this plain text copy.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/25 00:43:03 UTC
[3/3] cassandra git commit: Compressed Commit Log
Compressed Commit Log
patch by branimir; reviewed by ariel for CASSANDRA-6809
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f8254d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f8254d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f8254d
Branch: refs/heads/trunk
Commit: 44f8254df850f17d0c9c940d69af7a2305beb4b0
Parents: 52ddfe4
Author: blambov <br...@datastax.com>
Authored: Tue Mar 24 23:42:30 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Mar 24 23:42:30 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 9 +-
conf/cassandra.yaml | 20 +-
.../org/apache/cassandra/config/Config.java | 2 +-
.../cassandra/config/DatabaseDescriptor.java | 39 +-
.../cassandra/config/ParametrizedClass.java | 60 +++
.../cassandra/config/SeedProviderDef.java | 35 --
.../config/YamlConfigurationLoader.java | 2 +-
.../db/commitlog/AbstractCommitLogService.java | 9 +-
.../db/commitlog/BatchCommitLogService.java | 2 +-
.../cassandra/db/commitlog/CommitLog.java | 55 +-
.../db/commitlog/CommitLogArchiver.java | 16 +-
.../db/commitlog/CommitLogDescriptor.java | 116 ++++-
.../db/commitlog/CommitLogReplayer.java | 509 +++++++++++--------
.../db/commitlog/CommitLogSegment.java | 297 +++++------
.../db/commitlog/CommitLogSegmentManager.java | 110 ++--
.../db/commitlog/CompressedSegment.java | 156 ++++++
.../db/commitlog/MemoryMappedSegment.java | 110 ++++
.../db/commitlog/PeriodicCommitLogService.java | 3 +-
.../io/compress/CompressionParameters.java | 19 +-
.../io/compress/DeflateCompressor.java | 2 +-
.../cassandra/io/util/ByteBufferDataInput.java | 172 +++++++
.../cassandra/io/util/MappedFileDataInput.java | 172 -------
.../cassandra/io/util/MmappedSegmentedFile.java | 2 +-
.../cassandra/metrics/CommitLogMetrics.java | 16 +-
test/conf/cassandra.yaml | 2 +-
test/conf/commitlog_compression.yaml | 2 +
.../cassandra/db/commitlog/ComitLogStress.java | 12 +-
.../db/commitlog/CommitLogStressTest.java | 412 +++++++++++++++
.../org/apache/cassandra/db/CommitLogTest.java | 72 ++-
30 files changed, 1668 insertions(+), 766 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3bbc48f..eb5acdb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Compressed Commit Log (CASSANDRA-6809)
* Optimise IntervalTree (CASSANDRA-8988)
* Add a key-value payload for third party usage (CASSANDRA-8553)
* Bump metrics-reporter-config dependency for metrics 3.0 (CASSANDRA-8149)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7f046b4..8442698 100644
--- a/build.xml
+++ b/build.xml
@@ -1250,13 +1250,20 @@
</target>
<target name="test-compression" depends="build-test" description="Execute unit tests with sstable compression enabled">
- <testmacro suitename="unit" inputdir="${test.unit.src}" exclude="**/pig/*.java" timeout="${test.timeout}">
+ <property name="compressed_yaml" value="${build.test.dir}/cassandra.compressed.yaml"/>
+ <concat destfile="${compressed_yaml}">
+ <fileset file="${test.conf}/cassandra.yaml"/>
+ <fileset file="${test.conf}/commitlog_compression.yaml"/>
+ </concat>
+ <echo>Compressed config: ${compressed_yaml}</echo>
+ <testmacro suitename="unit" inputdir="${test.unit.src}" exclude="**/pig/*.java" timeout="${test.timeout}">
<jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
<jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
<jvmarg value="-Dmigration-sstable-root=${test.data}/migration-sstables"/>
<jvmarg value="-Dcassandra.test.compression=true"/>
<jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
<jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
+ <jvmarg value="-Dcassandra.config=file:///${compressed_yaml}"/>
</testmacro>
<fileset dir="${test.unit.src}">
<exclude name="**/pig/*.java" />
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index f59c7a3..35326f3 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -289,6 +289,13 @@ commitlog_sync_period_in_ms: 10000
# is reasonable.
commitlog_segment_size_in_mb: 32
+# Compression to apply to the commit log. If omitted, the commit log
+# will be written uncompressed.
+#commitlog_compression:
+# - class_name: LZ4Compressor
+# parameters:
+# -
+
# any class that implements the SeedProvider interface and has a
# constructor that takes a Map<String, String> of parameters will do.
seed_provider:
@@ -344,14 +351,13 @@ concurrent_counter_writes: 32
# offheap_objects: native memory, eliminating nio buffer heap overhead
memtable_allocation_type: heap_buffers
-# Total space to use for commitlogs. Since commitlog segments are
-# mmapped, and hence use up address space, the default size is 32
-# on 32-bit JVMs, and 8192 on 64-bit JVMs.
+# Total uncompressed size of the commit log.
+#
+# If space gets above this value, Cassandra will flush every dirty CF
+# in the oldest segment and remove it. So a small total commitlog space
+# will tend to cause more flush activity on less-active columnfamilies.
#
-# If space gets above this value (it will round up to the next nearest
-# segment multiple), Cassandra will flush every dirty CF in the oldest
-# segment and remove it. So a small total commitlog space will tend
-# to cause more flush activity on less-active columnfamilies.
+# The default value is 8192.
# commitlog_total_space_in_mb: 8192
# This sets the amount of memtable flush writer threads. These will
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 7ade647..25a9b31 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -64,7 +64,7 @@ public class Config
public Set<String> hinted_handoff_enabled_by_dc = Sets.newConcurrentHashSet();
public volatile Integer max_hint_window_in_ms = 3600 * 1000; // one hour
- public SeedProviderDef seed_provider;
+ public ParametrizedClass seed_provider;
public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c5e185b..c36c9e9 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -25,10 +25,12 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Longs;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.*;
+import org.apache.cassandra.config.Config.CommitLogSync;
import org.apache.cassandra.config.Config.RequestSchedulerId;
import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
@@ -287,7 +289,7 @@ public class DatabaseDescriptor
}
if (conf.commitlog_total_space_in_mb == null)
- conf.commitlog_total_space_in_mb = hasLargeAddressSpace() ? 8192 : 32;
+ conf.commitlog_total_space_in_mb = 8192;
// Always force standard mode access on Windows - CASSANDRA-6993. Windows won't allow deletion of hard-links to files that
// are memory-mapped which causes trouble with snapshots.
@@ -1075,6 +1077,21 @@ public class DatabaseDescriptor
return conf.commitlog_directory;
}
+ public static ParametrizedClass getCommitLogCompression()
+ {
+ return conf.commitlog_compression;
+ }
+
+ public static void setCommitLogCompression(ParametrizedClass compressor)
+ {
+ conf.commitlog_compression = compressor;
+ }
+
+ public static int getCommitLogMaxCompressionBuffersInPool()
+ {
+ return conf.commitlog_max_compression_buffers_in_pool;
+ }
+
public static int getTombstoneWarnThreshold()
{
return conf.tombstone_warn_threshold;
@@ -1102,6 +1119,11 @@ public class DatabaseDescriptor
{
return conf.commitlog_segment_size_in_mb * 1024 * 1024;
}
+
+ public static void setCommitLogSegmentSize(int sizeMegabytes)
+ {
+ conf.commitlog_segment_size_in_mb = sizeMegabytes;
+ }
public static String getSavedCachesLocation()
{
@@ -1237,10 +1259,20 @@ public class DatabaseDescriptor
return conf.commitlog_sync_batch_window_in_ms;
}
+ public static void setCommitLogSyncBatchWindow(double windowMillis)
+ {
+ conf.commitlog_sync_batch_window_in_ms = windowMillis;
+ }
+
public static int getCommitLogSyncPeriod()
{
return conf.commitlog_sync_period_in_ms;
}
+
+ public static void setCommitLogSyncPeriod(int periodMillis)
+ {
+ conf.commitlog_sync_period_in_ms = periodMillis;
+ }
public static int getCommitLogPeriodicQueueSize()
{
@@ -1252,6 +1284,11 @@ public class DatabaseDescriptor
return conf.commitlog_sync;
}
+ public static void setCommitLogSync(CommitLogSync sync)
+ {
+ conf.commitlog_sync = sync;
+ }
+
public static Config.DiskAccessMode getDiskAccessMode()
{
return conf.disk_access_mode;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/ParametrizedClass.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ParametrizedClass.java b/src/java/org/apache/cassandra/config/ParametrizedClass.java
new file mode 100644
index 0000000..783b3b0
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/ParametrizedClass.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Objects;
+
+public class ParametrizedClass
+{
+ public String class_name;
+ public Map<String, String> parameters;
+
+ public ParametrizedClass(String class_name, Map<String, String> parameters)
+ {
+ this.class_name = class_name;
+ this.parameters = parameters;
+ }
+
+ @SuppressWarnings("unchecked")
+ public ParametrizedClass(LinkedHashMap<String, ?> p)
+ {
+ this((String)p.get("class_name"),
+ p.containsKey("parameters") ? (Map<String, String>)((List<?>)p.get("parameters")).get(0) : null);
+ }
+
+ @Override
+ public boolean equals(Object that)
+ {
+ return that instanceof ParametrizedClass && equals((ParametrizedClass) that);
+ }
+
+ public boolean equals(ParametrizedClass that)
+ {
+ return Objects.equal(class_name, that.class_name) && Objects.equal(parameters, that.parameters);
+ }
+
+ @Override
+ public String toString()
+ {
+ return class_name + (parameters == null ? "" : parameters.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/SeedProviderDef.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/SeedProviderDef.java b/src/java/org/apache/cassandra/config/SeedProviderDef.java
deleted file mode 100644
index cbe444a..0000000
--- a/src/java/org/apache/cassandra/config/SeedProviderDef.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.config;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-
-public class SeedProviderDef
-{
- public String class_name;
- public Map<String, String> parameters;
-
- public SeedProviderDef(LinkedHashMap<String, ?> p)
- {
- class_name = (String)p.get("class_name");
- parameters = (Map<String, String>)((List)p.get("parameters")).get(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index 98eb75e..28511fe 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -104,7 +104,7 @@ public class YamlConfigurationLoader implements ConfigurationLoader
logConfig(configBytes);
org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
- TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
+ TypeDescription seedDesc = new TypeDescription(ParametrizedClass.class);
seedDesc.putMapPropertyType("parameters", String.class, String.class);
constructor.addTypeDescription(seedDesc);
MissingPropertiesChecker propertiesChecker = new MissingPropertiesChecker();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 78da1d5..d8967d6 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -45,7 +45,7 @@ public abstract class AbstractCommitLogService
protected final WaitQueue syncComplete = new WaitQueue();
private final Semaphore haveWork = new Semaphore(1);
- private final CommitLog commitLog;
+ final CommitLog commitLog;
private final String name;
private final long pollIntervalMillis;
@@ -62,11 +62,10 @@ public abstract class AbstractCommitLogService
this.commitLog = commitLog;
this.name = name;
this.pollIntervalMillis = pollIntervalMillis;
- start();
}
- // Separated into individual method for unit testing stop/start capability
- private void start()
+ // Separated into its own method to ensure relevant objects are constructed before the service is started.
+ void start()
{
if (pollIntervalMillis < 1)
throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
@@ -220,4 +219,4 @@ public abstract class AbstractCommitLogService
{
return pending.get();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
index 65bee40..b433754 100644
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
@@ -30,7 +30,7 @@ class BatchCommitLogService extends AbstractCommitLogService
{
// wait until record has been safely persisted to disk
pending.incrementAndGet();
- alloc.awaitDiskSync();
+ alloc.awaitDiskSync(commitLog.metrics.waitingOnCommit);
pending.decrementAndGet();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index fe993e2..51b3e53 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -26,6 +26,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +35,11 @@ import org.apache.commons.lang3.StringUtils;
import com.github.tjake.ICRC32;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParametrizedClass;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.DataOutputByteBuffer;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
@@ -53,39 +57,58 @@ public class CommitLog implements CommitLogMBean
{
private static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
- public static final CommitLog instance = new CommitLog();
+ public static final CommitLog instance = CommitLog.construct();
// we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly
// empty segments when writing large records
- private static final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
+ private final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
public final CommitLogSegmentManager allocator;
- public final CommitLogArchiver archiver = new CommitLogArchiver();
+ public final CommitLogArchiver archiver;
final CommitLogMetrics metrics;
final AbstractCommitLogService executor;
- private CommitLog()
- {
- DatabaseDescriptor.createAllDirectories();
-
- allocator = new CommitLogSegmentManager();
+ final ICompressor compressor;
+ public ParametrizedClass compressorClass;
+ final public String location;
- executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
- ? new BatchCommitLogService(this)
- : new PeriodicCommitLogService(this);
+ static private CommitLog construct()
+ {
+ CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), new CommitLogArchiver());
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
+ mbs.registerMBean(log, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
+ return log;
+ }
+
+ @VisibleForTesting
+ CommitLog(String location, CommitLogArchiver archiver)
+ {
+ compressorClass = DatabaseDescriptor.getCommitLogCompression();
+ this.location = location;
+ ICompressor compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
+ DatabaseDescriptor.createAllDirectories();
+
+ this.compressor = compressor;
+ this.archiver = archiver;
+ metrics = new CommitLogMetrics();
+
+ executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
+ ? new BatchCommitLogService(this)
+ : new PeriodicCommitLogService(this);
+
+ allocator = new CommitLogSegmentManager(this);
+ executor.start();
// register metrics
- metrics = new CommitLogMetrics(executor, allocator);
+ metrics.attach(executor, allocator);
}
/**
@@ -102,7 +125,7 @@ public class CommitLog implements CommitLogMBean
// we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
// until after recover was finished. this turns out to be fragile; it is less error-prone to go
// ahead and allow writes before recover(), and just skip active segments when we do.
- return CommitLogDescriptor.isValid(name) && !instance.allocator.manages(name);
+ return CommitLogDescriptor.isValid(name) && !allocator.manages(name);
}
};
@@ -130,7 +153,7 @@ public class CommitLog implements CommitLogMBean
logger.info("Log replay complete, {} replayed mutations", replayed);
for (File f : files)
- CommitLog.instance.allocator.recycleSegment(f);
+ allocator.recycleSegment(f);
}
allocator.enableReserveSegmentCreation();
@@ -145,7 +168,7 @@ public class CommitLog implements CommitLogMBean
*/
public int recover(File... clogs) throws IOException
{
- CommitLogReplayer recovery = new CommitLogReplayer();
+ CommitLogReplayer recovery = CommitLogReplayer.create();
recovery.recover(clogs);
return recovery.blockForWrites();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 363fcef..79316c7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -32,10 +32,11 @@ import java.util.concurrent.*;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -203,7 +204,7 @@ public class CommitLogArchiver
CommitLogDescriptor descriptor;
if (fromHeader == null && fromName == null)
throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath());
- else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName))
+ else if (fromHeader != null && fromName != null && !fromHeader.equalsIgnoringCompression(fromName))
throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath()));
else if (fromName != null && fromHeader == null && fromName.version >= CommitLogDescriptor.VERSION_21)
throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath());
@@ -214,6 +215,17 @@ public class CommitLogArchiver
if (descriptor.version > CommitLogDescriptor.VERSION_30)
throw new IllegalStateException("Unsupported commit log version: " + descriptor.version);
+ if (descriptor.compression != null) {
+ try
+ {
+ CompressionParameters.createCompressor(descriptor.compression);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IllegalStateException("Unknown compression", e);
+ }
+ }
+
File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
if (toFile.exists())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index d127fb9..6e8c78c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -20,20 +20,29 @@
*/
package org.apache.cassandra.db.commitlog;
+import java.io.DataInput;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
-
+import com.google.common.base.Objects;
import com.github.tjake.ICRC32;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParametrizedClass;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.CRC32Factory;
+import org.json.simple.JSONValue;
public class CommitLogDescriptor
{
@@ -42,6 +51,8 @@ public class CommitLogDescriptor
private static final String FILENAME_EXTENSION = ".log";
// match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log.
private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
+ private static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
+ private static final String COMPRESSION_CLASS_KEY = "compressionClass";
public static final int VERSION_12 = 2;
public static final int VERSION_20 = 3;
@@ -54,32 +65,55 @@ public class CommitLogDescriptor
@VisibleForTesting
public static final int current_version = VERSION_30;
- // [version, id, checksum]
- static final int HEADER_SIZE = 4 + 8 + 4;
-
final int version;
public final long id;
+ public final ParametrizedClass compression;
- public CommitLogDescriptor(int version, long id)
+ public CommitLogDescriptor(int version, long id, ParametrizedClass compression)
{
this.version = version;
this.id = id;
+ this.compression = compression;
}
- public CommitLogDescriptor(long id)
+ public CommitLogDescriptor(long id, ParametrizedClass compression)
{
- this(current_version, id);
+ this(current_version, id, compression);
}
- static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+ public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
{
- out.putInt(0, descriptor.version);
- out.putLong(4, descriptor.id);
ICRC32 crc = CRC32Factory.instance.create();
+ out.putInt(descriptor.version);
crc.updateInt(descriptor.version);
+ out.putLong(descriptor.id);
crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
crc.updateInt((int) (descriptor.id >>> 32));
- out.putInt(12, crc.getCrc());
+ if (descriptor.version >= VERSION_30) {
+ String parametersString = constructParametersString(descriptor);
+ byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8);
+ if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF))
+ throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.",
+ parametersBytes.length));
+ out.putShort((short) parametersBytes.length);
+ crc.updateInt(parametersBytes.length);
+ out.put(parametersBytes);
+ crc.update(parametersBytes, 0, parametersBytes.length);
+ } else
+ assert descriptor.compression == null;
+ out.putInt(crc.getCrc());
+ }
+
+ private static String constructParametersString(CommitLogDescriptor descriptor)
+ {
+ Map<String, Object> params = new TreeMap<String, Object>();
+ ParametrizedClass compression = descriptor.compression;
+ if (compression != null)
+ {
+ params.put(COMPRESSION_PARAMETERS_KEY, compression.parameters);
+ params.put(COMPRESSION_CLASS_KEY, compression.class_name);
+ }
+ return JSONValue.toJSONString(params);
}
public static CommitLogDescriptor fromHeader(File file)
@@ -87,16 +121,7 @@ public class CommitLogDescriptor
try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
{
assert raf.getFilePointer() == 0;
- int version = raf.readInt();
- long id = raf.readLong();
- int crc = raf.readInt();
- ICRC32 checkcrc = CRC32Factory.instance.create();
- checkcrc.updateInt(version);
- checkcrc.updateInt((int) (id & 0xFFFFFFFFL));
- checkcrc.updateInt((int) (id >>> 32));
- if (crc == checkcrc.getCrc())
- return new CommitLogDescriptor(version, id);
- return null;
+ return readHeader(raf);
}
catch (EOFException e)
{
@@ -108,6 +133,44 @@ public class CommitLogDescriptor
}
}
+ public static CommitLogDescriptor readHeader(DataInput input) throws IOException
+ {
+ ICRC32 checkcrc = CRC32Factory.instance.create();
+ int version = input.readInt();
+ checkcrc.updateInt(version);
+ long id = input.readLong();
+ checkcrc.updateInt((int) (id & 0xFFFFFFFFL));
+ checkcrc.updateInt((int) (id >>> 32));
+ int parametersLength = 0;
+ if (version >= VERSION_30) {
+ parametersLength = input.readShort() & 0xFFFF;
+ checkcrc.updateInt(parametersLength);
+ }
+ // This allocation should always succeed: parametersLength is read as an unsigned 16-bit value
+ // (at most 65535), so it cannot be too long even for a corrupt segment file.
+ byte[] parametersBytes = new byte[parametersLength];
+ input.readFully(parametersBytes);
+ checkcrc.update(parametersBytes, 0, parametersBytes.length);
+ int crc = input.readInt();
+ if (crc == checkcrc.getCrc())
+ return new CommitLogDescriptor(version, id,
+ parseCompression((Map<?, ?>) JSONValue.parse(new String(parametersBytes, StandardCharsets.UTF_8))));
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static ParametrizedClass parseCompression(Map<?, ?> params)
+ {
+ if (params == null)
+ return null;
+ String className = (String) params.get(COMPRESSION_CLASS_KEY);
+ if (className == null)
+ return null;
+
+ Map<String, String> cparams = (Map<String, String>) params.get(COMPRESSION_PARAMETERS_KEY);
+ return new ParametrizedClass(className, cparams);
+ }
+
public static CommitLogDescriptor fromFileName(String name)
{
Matcher matcher;
@@ -118,7 +181,7 @@ public class CommitLogDescriptor
throw new UnsupportedOperationException("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
long id = Long.parseLong(matcher.group(3).split(SEPARATOR)[1]);
- return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id);
+ return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null);
}
public int getMessagingVersion()
@@ -154,7 +217,7 @@ public class CommitLogDescriptor
public String toString()
{
- return "(" + version + "," + id + ")";
+ return "(" + version + "," + id + (compression != null ? "," + compression : "") + ")";
}
public boolean equals(Object that)
@@ -162,9 +225,14 @@ public class CommitLogDescriptor
return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that);
}
- public boolean equals(CommitLogDescriptor that)
+ public boolean equalsIgnoringCompression(CommitLogDescriptor that)
{
return this.version == that.version && this.id == that.id;
}
+ public boolean equals(CommitLogDescriptor that)
+ {
+ return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 7090e06..0aea866 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -18,7 +18,14 @@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,20 +35,31 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
+
import org.apache.commons.lang3.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.tjake.ICRC32;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.ByteBufferDataInput;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.*;
-
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CRC32Factory;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
public class CommitLogReplayer
@@ -58,19 +76,26 @@ public class CommitLogReplayer
private final ReplayPosition globalPosition;
private final ICRC32 checksum;
private byte[] buffer;
+ private byte[] uncompressedBuffer;
- public CommitLogReplayer()
+ CommitLogReplayer(ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions)
{
this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
this.futures = new ArrayList<Future<?>>();
this.buffer = new byte[4096];
+ this.uncompressedBuffer = new byte[4096];
this.invalidMutations = new HashMap<UUID, AtomicInteger>();
// count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
this.checksum = CRC32Factory.instance.create();
+ this.cfPositions = cfPositions;
+ this.globalPosition = globalPosition;
+ }
+ public static CommitLogReplayer create()
+ {
// compute per-CF and global replay positions
- cfPositions = new HashMap<UUID, ReplayPosition>();
+ Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, ReplayPosition>();
Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
@@ -86,8 +111,9 @@ public class CommitLogReplayer
cfPositions.put(cfs.metadata.cfId, rp);
}
- globalPosition = replayPositionOrdering.min(cfPositions.values());
+ ReplayPosition globalPosition = replayPositionOrdering.min(cfPositions.values());
logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPositions));
+ return new CommitLogReplayer(globalPosition, cfPositions);
}
public void recover(File[] clogs) throws IOException
@@ -117,9 +143,7 @@ public class CommitLogReplayer
{
if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
{
- if (offset != reader.length() && offset != Integer.MAX_VALUE)
- logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
- // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
+ // There was no room in the segment to write a final header. No data could be present here.
return -1;
}
reader.seek(offset);
@@ -128,11 +152,7 @@ public class CommitLogReplayer
crc.updateInt((int) (descriptor.id >>> 32));
crc.updateInt((int) reader.getPosition());
int end = reader.readInt();
- long filecrc;
- if (descriptor.version < CommitLogDescriptor.VERSION_21)
- filecrc = reader.readLong();
- else
- filecrc = reader.readInt() & 0xffffffffL;
+ long filecrc = reader.readInt() & 0xffffffffL;
if (crc.getValue() != filecrc)
{
if (end != 0 || filecrc != 0)
@@ -148,23 +168,8 @@ public class CommitLogReplayer
}
return end;
}
-
- private int getStartOffset(long segmentId, int version)
- {
- if (globalPosition.segment < segmentId)
- {
- if (version >= CommitLogDescriptor.VERSION_21)
- return CommitLogDescriptor.HEADER_SIZE + CommitLogSegment.SYNC_MARKER_SIZE;
- else
- return 0;
- }
- else if (globalPosition.segment == segmentId)
- return globalPosition.position;
- else
- return -1;
- }
-
- private abstract static class ReplayFilter
+
+ abstract static class ReplayFilter
{
public abstract Iterable<ColumnFamily> filter(Mutation mutation);
@@ -229,218 +234,302 @@ public class CommitLogReplayer
public void recover(File file) throws IOException
{
final ReplayFilter replayFilter = ReplayFilter.create();
- logger.info("Replaying {}", file.getPath());
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
- final long segmentId = desc.id;
- logger.info("Replaying {} (CL version {}, messaging version {})",
- file.getPath(),
- desc.version,
- desc.getMessagingVersion());
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
-
try
{
- assert reader.length() <= Integer.MAX_VALUE;
- int offset = getStartOffset(segmentId, desc.version);
- if (offset < 0)
+ // Segments written before VERSION_21 are replayed as one section, without reading a header
+ // or any sync markers.
+ if (desc.version < CommitLogDescriptor.VERSION_21)
{
- logger.debug("skipping replay of fully-flushed {}", file);
+ if (logAndCheckIfShouldSkip(file, desc))
+ return;
+ if (globalPosition.segment == desc.id)
+ reader.seek(globalPosition.position);
+ // Read until EOF. replaySyncSection stops as soon as the reader position reaches its end
+ // argument, so a negative end would skip every mutation in the segment.
+ replaySyncSection(reader, Integer.MAX_VALUE, desc, replayFilter);
return;
}
- int prevEnd = CommitLogDescriptor.HEADER_SIZE;
- main: while (true)
+ final long segmentId = desc.id;
+ try
+ {
+ desc = CommitLogDescriptor.readHeader(reader);
+ }
+ catch (IOException e)
{
+ desc = null;
+ }
+ if (desc == null) {
+ logger.warn("Could not read commit log descriptor in file {}", file);
+ return;
+ }
+ assert segmentId == desc.id;
+ if (logAndCheckIfShouldSkip(file, desc))
+ return;
- int end = prevEnd;
- if (desc.version < CommitLogDescriptor.VERSION_21)
- end = Integer.MAX_VALUE;
- else
+ ICompressor compressor = null;
+ if (desc.compression != null)
+ {
+ try
{
- do { end = readSyncMarker(desc, end, reader); }
- while (end < offset && end > prevEnd);
+ compressor = CompressionParameters.createCompressor(desc.compression);
}
+ catch (ConfigurationException e)
+ {
+ logger.warn("Unknown compression: {}", e.getMessage());
+ return;
+ }
+ }
- if (end < prevEnd)
- break;
-
- if (logger.isDebugEnabled())
- logger.debug("Replaying {} between {} and {}", file, offset, end);
+ assert reader.length() <= Integer.MAX_VALUE;
+ int end = (int) reader.getFilePointer();
+ int replayEnd = end;
- reader.seek(offset);
+ // Walk the chain of sync markers; each marker yields the file position where its section ends.
+ while ((end = readSyncMarker(desc, end, reader)) >= 0)
+ {
+ int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
- /* read the logs populate Mutation and apply */
- while (reader.getPosition() < end && !reader.isEOF())
+ if (logger.isTraceEnabled())
+ logger.trace("Replaying {} between {} and {}", file, reader.getFilePointer(), end);
+ if (compressor != null)
{
- if (logger.isDebugEnabled())
- logger.debug("Reading mutation at {}", reader.getFilePointer());
-
- long claimedCRC32;
- int serializedSize;
- try
- {
- // any of the reads may hit EOF
- serializedSize = reader.readInt();
- if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
- {
- logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
- break main;
- }
-
- // Mutation must be at LEAST 10 bytes:
- // 3 each for a non-empty Keyspace and Key (including the
- // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
- // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
- if (serializedSize < 10)
- break main;
-
- long claimedSizeChecksum;
- if (desc.version < CommitLogDescriptor.VERSION_21)
- claimedSizeChecksum = reader.readLong();
- else
- claimedSizeChecksum = reader.readInt() & 0xffffffffL;
- checksum.reset();
- if (desc.version < CommitLogDescriptor.VERSION_20)
- checksum.update(serializedSize);
- else
- checksum.updateInt(serializedSize);
-
- if (checksum.getValue() != claimedSizeChecksum)
- break main; // entry wasn't synced correctly/fully. that's
- // ok.
-
- if (serializedSize > buffer.length)
- buffer = new byte[(int) (1.2 * serializedSize)];
- reader.readFully(buffer, 0, serializedSize);
- if (desc.version < CommitLogDescriptor.VERSION_21)
- claimedCRC32 = reader.readLong();
- else
- claimedCRC32 = reader.readInt() & 0xffffffffL;
- }
- catch (EOFException eof)
- {
- break main; // last CL entry didn't get completely written. that's ok.
- }
+ int uncompressedLength = reader.readInt();
+ replayEnd = replayPos + uncompressedLength;
+ } else
+ {
+ replayEnd = end;
+ }
- checksum.update(buffer, 0, serializedSize);
- if (claimedCRC32 != checksum.getValue())
- {
- // this entry must not have been fsynced. probably the rest is bad too,
- // but just in case there is no harm in trying them (since we still read on an entry boundary)
- continue;
- }
+ if (segmentId == globalPosition.segment && replayEnd < globalPosition.position)
+ // Skip over flushed section.
+ continue;
- /* deserialize the commit log entry */
- FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
- final Mutation mutation;
+ FileDataInput sectionReader = reader;
+ if (compressor != null)
try
{
- mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
- desc.getMessagingVersion(),
- ColumnSerializer.Flag.LOCAL);
- // doublecheck that what we read is [still] valid for the current schema
- for (ColumnFamily cf : mutation.getColumnFamilies())
- for (Cell cell : cf)
- cf.getComparator().validate(cell.name());
- }
- catch (UnknownColumnFamilyException ex)
- {
- if (ex.cfId == null)
- continue;
- AtomicInteger i = invalidMutations.get(ex.cfId);
- if (i == null)
- {
- i = new AtomicInteger(1);
- invalidMutations.put(ex.cfId, i);
- }
- else
- i.incrementAndGet();
- continue;
+ int start = (int) reader.getFilePointer();
+ int compressedLength = end - start;
+ if (logger.isTraceEnabled())
+ logger.trace("Decompressing {} between replay positions {} and {}",
+ file,
+ replayPos,
+ replayEnd);
+ if (compressedLength > buffer.length)
+ buffer = new byte[(int) (1.2 * compressedLength)];
+ reader.readFully(buffer, 0, compressedLength);
+ int uncompressedLength = replayEnd - replayPos;
+ if (uncompressedLength > uncompressedBuffer.length)
+ uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
+ compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
+ sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0);
}
- catch (Throwable t)
+ catch (IOException e)
{
- JVMStabilityInspector.inspectThrowable(t);
- File f = File.createTempFile("mutation", "dat");
- DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
- try
- {
- out.write(buffer, 0, serializedSize);
- }
- finally
- {
- out.close();
- }
- String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
- f.getAbsolutePath());
- logger.error(st, t);
+ // Name the file in the message; the exception goes last so it is logged with its stack trace.
+ logger.error("Unexpected exception decompressing section of {}", file, e);
continue;
}
- if (logger.isDebugEnabled())
- logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
+ if (!replaySyncSection(sectionReader, replayEnd, desc, replayFilter))
+ break;
+ }
+ }
+ finally
+ {
+ FileUtils.closeQuietly(reader);
+ logger.info("Finished reading {}", file);
+ }
+ }
- final long entryLocation = reader.getFilePointer();
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
- {
- if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
- return;
- if (pointInTimeExceeded(mutation))
- return;
-
- final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-
- // Rebuild the mutation, omitting column families that
- // a) the user has requested that we ignore,
- // b) have already been flushed,
- // or c) are part of a cf that was dropped.
- // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- Mutation newMutation = null;
- for (ColumnFamily columnFamily : replayFilter.filter(mutation))
- {
- if (Schema.instance.getCF(columnFamily.id()) == null)
- continue; // dropped
-
- ReplayPosition rp = cfPositions.get(columnFamily.id());
-
- // replay if current segment is newer than last flushed one or,
- // if it is the last known segment, if we are after the replay position
- if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
- {
- if (newMutation == null)
- newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
- newMutation.add(columnFamily);
- replayedCount.incrementAndGet();
- }
- }
- if (newMutation != null)
- {
- assert !newMutation.isEmpty();
- Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
- keyspacesRecovered.add(keyspace);
- }
- }
- };
- futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
- {
- FBUtilities.waitOnFutures(futures);
- futures.clear();
- }
+ /**
+ * Logs which segment is about to be replayed and decides whether it can be passed over.
+ *
+ * @param file the segment file, used for log output only
+ * @param desc descriptor of the segment
+ * @return true if the segment id is lower than the segment of the global replay position,
+ * false otherwise
+ */
+ public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
+ {
+ logger.info("Replaying {} (CL version {}, messaging version {}, compression {})",
+ file.getPath(), desc.version, desc.getMessagingVersion(), desc.compression);
+
+ boolean fullyFlushed = globalPosition.segment > desc.id;
+ if (fullyFlushed)
+ logger.debug("skipping replay of fully-flushed {}", file);
+ return fullyFlushed;
+ }
+
+ /**
+ * Replays a sync section containing a list of mutations.
+ *
+ * @param reader input to read the serialized mutations from
+ * @param end position at which reading stops; nothing is read once the reader's position has reached it
+ * @param desc descriptor of the segment; its version selects the checksum widths and update method
+ * @param replayFilter filter passed on to replayMutation for every entry with a valid checksum
+ * @return Whether replay should continue with the next section.
+ */
+ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc,
+ final ReplayFilter replayFilter) throws IOException, FileNotFoundException
+ {
+ /* read the logs populate Mutation and apply */
+ while (reader.getFilePointer() < end && !reader.isEOF())
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Reading mutation at {}", reader.getFilePointer());
+
+ long claimedCRC32;
+ int serializedSize;
+ try
+ {
+ // any of the reads may hit EOF
+ serializedSize = reader.readInt();
+ if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+ {
+ logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+ return false;
}
+ // Mutation must be at LEAST 10 bytes:
+ // 3 each for a non-empty Keyspace and Key (including the
+ // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+ // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
+ if (serializedSize < 10)
+ return false;
+
+ long claimedSizeChecksum;
if (desc.version < CommitLogDescriptor.VERSION_21)
- break;
+ claimedSizeChecksum = reader.readLong();
+ else
+ claimedSizeChecksum = reader.readInt() & 0xffffffffL;
+ checksum.reset();
+ if (desc.version < CommitLogDescriptor.VERSION_20)
+ checksum.update(serializedSize);
+ else
+ checksum.updateInt(serializedSize);
+
+ if (checksum.getValue() != claimedSizeChecksum)
+ return false; // entry wasn't synced correctly/fully.
+ // that's ok.
- offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
- prevEnd = end;
+ if (serializedSize > buffer.length)
+ buffer = new byte[(int) (1.2 * serializedSize)];
+ reader.readFully(buffer, 0, serializedSize);
+ if (desc.version < CommitLogDescriptor.VERSION_21)
+ claimedCRC32 = reader.readLong();
+ else
+ claimedCRC32 = reader.readInt() & 0xffffffffL;
+ }
+ catch (EOFException eof)
+ {
+ return false; // last CL entry didn't get completely written. that's ok.
}
+
+ checksum.update(buffer, 0, serializedSize);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ // this entry must not have been fsynced. probably the rest is bad too,
+ // but just in case there is no harm in trying them (since we still read on an entry boundary)
+ continue;
+ }
+ replayMutation(buffer, serializedSize, reader.getFilePointer(), desc, replayFilter);
}
- finally
+ return true;
+ }
+
+ /**
+ * Deserializes and replays a commit log entry.
+ *
+ * An entry that refers to an unknown column family is counted in invalidMutations and dropped. An
+ * entry that fails to deserialize for any other reason is written to a temporary file, logged and
+ * dropped. Otherwise a task applying the entry is submitted to the MUTATION stage.
+ *
+ * @param inputBuffer buffer holding the serialized mutation, starting at index 0
+ * @param size number of bytes of inputBuffer that belong to the mutation
+ * @param entryLocation position compared against each column family's replay position
+ * @param desc descriptor of the segment the entry was read from
+ * @param replayFilter selects which column families of the mutation are considered for replay
+ */
+ void replayMutation(byte[] inputBuffer, int size,
+ final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter) throws IOException,
+ FileNotFoundException
+ {
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+ final Mutation mutation;
+ try
+ {
+ mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+ desc.getMessagingVersion(),
+ ColumnSerializer.Flag.LOCAL);
+ // doublecheck that what we read is [still] valid for the current schema
+ for (ColumnFamily cf : mutation.getColumnFamilies())
+ for (Cell cell : cf)
+ cf.getComparator().validate(cell.name());
+ }
+ catch (UnknownColumnFamilyException ex)
+ {
+ if (ex.cfId == null)
+ return;
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
+ {
+ i = new AtomicInteger(1);
+ invalidMutations.put(ex.cfId, i);
+ }
+ else
+ i.incrementAndGet();
+ return;
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ File f = File.createTempFile("mutation", "dat");
+ // try-with-resources closes the dump file even if the write fails
+ try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
+ {
+ out.write(inputBuffer, 0, size);
+ }
+ String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
+ f.getAbsolutePath());
+ logger.error(st, t);
+ return;
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
+
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+ return;
+ if (pointInTimeExceeded(mutation))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+ // Rebuild the mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ Mutation newMutation = null;
+ for (ColumnFamily columnFamily : replayFilter.filter(mutation))
+ {
+ if (Schema.instance.getCF(columnFamily.id()) == null)
+ continue; // dropped
+
+ ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the replay position
+ if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
+ {
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(columnFamily);
+ replayedCount.incrementAndGet();
+ }
+ }
+ if (newMutation != null)
+ {
+ assert !newMutation.isEmpty();
+ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
+ keyspacesRecovered.add(keyspace);
+ }
+ }
+ };
+ // Bound the number of in-flight replay tasks by draining them once the limit is exceeded.
+ futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 1e5895b..d04690d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -19,9 +19,7 @@ package org.apache.cassandra.db.commitlog;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
@@ -35,9 +33,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import com.codahale.metrics.Timer;
import com.github.tjake.ICRC32;
+
import org.apache.cassandra.utils.CRC32Factory;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +58,7 @@ import org.apache.cassandra.utils.concurrent.WaitQueue;
* as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
* files are initially allocated to a fixed size and can grow to accomidate a larger value if necessary.
*/
-public class CommitLogSegment
+public abstract class CommitLogSegment
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
@@ -87,12 +88,13 @@ public class CommitLogSegment
// Everything before this offset has been synced and written. The SYNC_MARKER_SIZE bytes after
// each sync are reserved, and point forwards to the next such offset. The final
- // sync marker in a segment will be zeroed out, or point to EOF.
+ // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
private volatile int lastSyncedOffset;
- // the amount of the tail of the file we have allocated but not used - this is used when we discard a log segment
- // to ensure nobody writes to it after we've decided we're done with it
- private int discardedTailFrom;
+ // The end position of the buffer. Initially set to its capacity and updated to point to the last written position
+ // as the segment is being closed.
+ // No need to be volatile as writes are protected by appendOrder barrier.
+ private int endOfBuffer;
// a signal for writers to wait on to confirm the log message they provided has been written to disk
private final WaitQueue syncComplete = new WaitQueue();
@@ -105,20 +107,17 @@ public class CommitLogSegment
public final long id;
- private final File logFile;
- private final FileChannel channel;
- private final int fd;
+ final File logFile;
+ final FileChannel channel;
+ final int fd;
- private final MappedByteBuffer buffer;
+ ByteBuffer buffer;
public final CommitLogDescriptor descriptor;
- /**
- * @return a newly minted segment file
- */
- static CommitLogSegment freshSegment()
+ static CommitLogSegment createSegment(CommitLog commitLog)
{
- return new CommitLogSegment(null);
+ return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog);
}
static long getNextId()
@@ -131,66 +130,32 @@ public class CommitLogSegment
*
* @param filePath if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
*/
- CommitLogSegment(String filePath)
+ CommitLogSegment(CommitLog commitLog)
{
id = getNextId();
- descriptor = new CommitLogDescriptor(id);
- logFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
+ descriptor = new CommitLogDescriptor(id, commitLog.compressorClass);
+ logFile = new File(commitLog.location, descriptor.fileName());
try
{
- if (filePath != null)
- {
- File oldFile = new File(filePath);
-
- if (oldFile.exists())
- {
- logger.debug("Re-using discarded CommitLog segment for {} from {}", id, filePath);
- if (!oldFile.renameTo(logFile))
- throw new IOException("Rename from " + filePath + " to " + id + " failed");
- }
- else
- {
- logger.debug("Creating new CommitLog segment: {}", logFile);
- }
- }
-
- // Extend or truncate the file size to the standard segment size as we may have restarted after a segment
- // size configuration change, leaving "incorrectly" sized segments on disk.
- // NOTE: while we're using RAF to easily adjust file size, we need to avoid using RAF
- // for grabbing the FileChannel due to FILE_SHARE_DELETE flag bug on windows.
- // See: https://bugs.openjdk.java.net/browse/JDK-6357433 and CASSANDRA-8308
- if (logFile.length() != DatabaseDescriptor.getCommitLogSegmentSize())
- {
- try (RandomAccessFile raf = new RandomAccessFile(logFile, "rw"))
- {
- raf.setLength(DatabaseDescriptor.getCommitLogSegmentSize());
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, logFile);
- }
- }
-
- channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ);
-
+ channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
fd = CLibrary.getfd(channel);
- buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
-
- CommitLogDescriptor.writeHeader(buffer, descriptor);
-
- // mark the initial sync marker as uninitialised
- buffer.putInt(CommitLogDescriptor.HEADER_SIZE, 0);
- buffer.putLong(CommitLogDescriptor.HEADER_SIZE + 4, 0);
- allocatePosition.set(CommitLogDescriptor.HEADER_SIZE + SYNC_MARKER_SIZE);
- lastSyncedOffset = CommitLogDescriptor.HEADER_SIZE;
}
catch (IOException e)
{
throw new FSWriteError(e, logFile);
}
+
+ buffer = createBuffer(commitLog);
+ // write the header
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ endOfBuffer = buffer.capacity();
+ lastSyncedOffset = buffer.position();
+ allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
}
+ abstract ByteBuffer createBuffer(CommitLog commitLog);
+
/**
* Allocate space in this buffer for the provided mutation, and return the allocated Allocation object.
* Returns null if there is not enough space in this segment, and a new segment is needed.
@@ -223,32 +188,42 @@ public class CommitLogSegment
{
int prev = allocatePosition.get();
int next = prev + size;
- if (next >= buffer.capacity())
+ if (next >= endOfBuffer)
return -1;
if (allocatePosition.compareAndSet(prev, next))
+ {
+ assert buffer != null;
return prev;
+ }
}
}
// ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
void discardUnusedTail()
{
- // we guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
- // this actually isn't strictly necessary, as currently all calls to discardUnusedTail occur within a block
- // already protected by this OpOrdering, but to prevent future potential mistakes, we duplicate the protection here
- // so that the contract between discardUnusedTail() and sync() is more explicit.
+ // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
+ // Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
+ // This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
+ // running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
+ // we duplicate the protection here so that the contract between discardUnusedTail() and sync() is more explicit.
try (OpOrder.Group group = appendOrder.start())
{
while (true)
{
int prev = allocatePosition.get();
- // we set allocatePosition past buffer.capacity() to make sure we always set discardedTailFrom
- int next = buffer.capacity() + 1;
- if (prev == next)
+
+ int next = endOfBuffer + 1;
+ if (prev >= next)
+ {
+ // Already stopped allocating, might also be closed.
+ assert buffer == null || prev == buffer.capacity() + 1;
return;
+ }
if (allocatePosition.compareAndSet(prev, next))
{
- discardedTailFrom = prev;
+ // Stopped allocating now. Can only succeed once, no further allocation or discardUnusedTail can succeed.
+ endOfBuffer = prev;
+ assert buffer != null && next == buffer.capacity() + 1;
return;
}
}
@@ -269,83 +244,61 @@ public class CommitLogSegment
*/
synchronized void sync()
{
- try
+ boolean close = false;
+ // check we have more work to do
+ if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+ return;
+ // Note: Even if the very first allocation of this sync section failed, we still want to enter this
+ // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
+ // this will always be entered when a mutation allocation has been attempted after the marker allocation
+ // succeeded in the previous sync.
+ assert buffer != null; // Only close once.
+
+ int startMarker = lastSyncedOffset;
+ // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+ // the point at which we can safely consider records to have been completely written to.
+ int nextMarker = allocate(SYNC_MARKER_SIZE);
+ if (nextMarker < 0)
{
- // check we have more work to do
- if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
- return;
-
- // allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
- // the point at which we can safely consider records to have been completely written to
- int nextMarker;
- nextMarker = allocate(SYNC_MARKER_SIZE);
- boolean close = false;
- if (nextMarker < 0)
- {
- // ensure no more of this CLS is writeable, and mark ourselves for closing
- discardUnusedTail();
- close = true;
-
- // wait for modifications guards both discardedTailFrom, and any outstanding appends
- waitForModifications();
-
- if (discardedTailFrom < buffer.capacity() - SYNC_MARKER_SIZE)
- {
- // if there's room in the discard section to write an empty header, use that as the nextMarker
- nextMarker = discardedTailFrom;
- }
- else
- {
- // not enough space left in the buffer, so mark the next sync marker as the EOF position
- nextMarker = buffer.capacity();
- }
- }
- else
- {
- waitForModifications();
- }
-
- assert nextMarker > lastSyncedOffset;
-
- // write previous sync marker to point to next sync marker
- // we don't chain the crcs here to ensure this method is idempotent if it fails
- int offset = lastSyncedOffset;
- final ICRC32 crc = CRC32Factory.instance.create();
- crc.updateInt((int) (id & 0xFFFFFFFFL));
- crc.updateInt((int) (id >>> 32));
- crc.updateInt(offset);
- buffer.putInt(offset, nextMarker);
- buffer.putInt(offset + 4, crc.getCrc());
-
- // zero out the next sync marker so replayer can cleanly exit
- if (nextMarker < buffer.capacity())
- {
- buffer.putInt(nextMarker, 0);
- buffer.putInt(nextMarker + 4, 0);
- }
+ // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+ discardUnusedTail();
+ close = true;
+
+ // We use the buffer size as the synced position after a close instead of the end of the actual data
+ // to make sure we only close the buffer once.
+ // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+ nextMarker = buffer.capacity();
+ }
- // actually perform the sync and signal those waiting for it
- buffer.force();
+ // Wait for mutations to complete as well as endOfBuffer to have been written.
+ waitForModifications();
+ int sectionEnd = close ? endOfBuffer : nextMarker;
- if (close)
- nextMarker = buffer.capacity();
+ // Perform compression, writing to file and flush.
+ write(startMarker, sectionEnd);
- lastSyncedOffset = nextMarker;
- syncComplete.signalAll();
+ // Signal the sync as complete.
+ lastSyncedOffset = nextMarker;
+ if (close)
+ internalClose();
+ syncComplete.signalAll();
+ }
- CLibrary.trySkipCache(fd, offset, nextMarker);
- if (close)
- internalClose();
- }
- catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
- {
- throw new FSWriteError(e, getPath());
- }
+ /**
+ * Writes a sync marker into the given buffer: one int holding nextMarker, followed by one int
+ * holding a CRC.
+ *
+ * @param buffer buffer that receives the two marker ints
+ * @param offset index in the buffer at which the marker starts
+ * @param filePos position value fed into the CRC after the segment id
+ * @param nextMarker value stored in the first int of the marker
+ */
+ protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker)
+ {
+ // The checksum covers the segment id (low word first, then high word) and the file position.
+ int idLow = (int) (id & 0xFFFFFFFFL);
+ int idHigh = (int) (id >>> 32);
+ ICRC32 markerCrc = CRC32Factory.instance.create();
+ markerCrc.updateInt(idLow);
+ markerCrc.updateInt(idHigh);
+ markerCrc.updateInt(filePos);
+
+ buffer.putInt(offset, nextMarker);
+ buffer.putInt(offset + 4, markerCrc.getCrc());
+ }
+ abstract void write(int lastSyncedOffset, int nextMarker);
+
public boolean isStillAllocating()
{
- return allocatePosition.get() < buffer.capacity();
+ return allocatePosition.get() < endOfBuffer;
}
/**
@@ -357,28 +310,6 @@ public class CommitLogSegment
}
/**
- * Recycle processes an unneeded segment file for reuse.
- *
- * @return a new CommitLogSegment representing the newly reusable segment.
- */
- CommitLogSegment recycle()
- {
- try
- {
- sync();
- }
- catch (FSWriteError e)
- {
- logger.error("I/O error flushing {} {}", this, e.getMessage());
- throw e;
- }
-
- close();
-
- return new CommitLogSegment(getPath());
- }
-
- /**
* @return the current ReplayPosition for this log segment
*/
public ReplayPosition getContext()
@@ -407,7 +338,7 @@ public class CommitLogSegment
while (true)
{
WaitQueue.Signal signal = syncComplete.register();
- if (lastSyncedOffset < buffer.capacity())
+ if (lastSyncedOffset < endOfBuffer)
{
signal.awaitUninterruptibly();
}
@@ -419,24 +350,39 @@ public class CommitLogSegment
}
}
+ void waitForSync(int position, Timer waitingOnCommit)
+ {
+ while (lastSyncedOffset < position)
+ {
+ WaitQueue.Signal signal = waitingOnCommit != null ?
+ syncComplete.register(waitingOnCommit.time()) :
+ syncComplete.register();
+ if (lastSyncedOffset < position)
+ signal.awaitUninterruptibly();
+ else
+ signal.cancel();
+ }
+ }
+
/**
- * Close the segment file.
+ * Stop writing to this file, sync and close it. Does nothing if the file is already closed.
*/
synchronized void close()
{
discardUnusedTail();
- waitForModifications();
- lastSyncedOffset = buffer.capacity();
- internalClose();
+ sync();
+ assert buffer == null;
}
- void internalClose()
+ /**
+ * Close the segment file. Do not call from outside this class, use close() instead.
+ */
+ protected void internalClose()
{
try
{
- if (FileUtils.isCleanerAvailable())
- FileUtils.clean(buffer);
channel.close();
+ buffer = null;
}
catch (IOException e)
{
@@ -632,16 +578,9 @@ public class CommitLogSegment
appendOp.close();
}
- void awaitDiskSync()
+ void awaitDiskSync(Timer waitingOnCommit)
{
- while (segment.lastSyncedOffset < position)
- {
- WaitQueue.Signal signal = segment.syncComplete.register(CommitLog.instance.metrics.waitingOnCommit.time());
- if (segment.lastSyncedOffset < position)
- signal.awaitUninterruptibly();
- else
- signal.cancel();
- }
+ segment.waitForSync(position, waitingOnCommit);
}
public ReplayPosition getReplayPosition()