You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sm...@apache.org on 2022/10/18 12:16:39 UTC
[cassandra] branch trunk updated: Make Cassandra logs able to be viewed in the virtual table system_views.system_logs
This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c089818881 Make Cassandra logs able to be viewed in the virtual table system_views.system_logs
c089818881 is described below
commit c08981888181392017761606f4490cca7f977db9
Author: Stefan Miklosovic <sm...@apache.org>
AuthorDate: Wed Oct 5 17:02:34 2022 +0200
Make Cassandra logs able to be viewed in the virtual table system_views.system_logs
patch by Stefan Miklosovic; reviewed by Brandon Williams for CASSANDRA-17948
---
CHANGES.txt | 1 +
NEWS.txt | 2 +
conf/logback.xml | 11 ++
.../pages/configuration/cass_logback_xml_file.adoc | 33 +++-
.../config/CassandraRelevantProperties.java | 7 +-
.../cassandra/db/virtual/LogMessagesTable.java | 192 +++++++++++++++++++
.../apache/cassandra/db/virtual/SimpleDataSet.java | 10 +-
.../cassandra/db/virtual/SystemViewsKeyspace.java | 1 +
.../apache/cassandra/service/CassandraDaemon.java | 8 +
.../utils/logging/LogbackLoggingSupport.java | 45 +++++
.../cassandra/utils/logging/LoggingSupport.java | 8 +
.../utils/logging/VirtualTableAppender.java | 128 +++++++++++++
test/conf/logback-dtest_with_vtable_appender.xml | 66 +++++++
.../logback-dtest_with_vtable_appender_invalid.xml | 73 +++++++
.../cassandra/distributed/impl/Instance.java | 2 +
.../distributed/test/VirtualTableLogsTest.java | 129 +++++++++++++
.../cassandra/db/virtual/LogMessagesTableTest.java | 210 +++++++++++++++++++++
17 files changed, 921 insertions(+), 5 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c3f0bcf534..1bae643c3e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Make Cassandra logs able to be viewed in the virtual table system_views.system_logs (CASSANDRA-17946)
* IllegalArgumentException in Gossiper#order due to concurrent mutations to elements being applied (CASSANDRA-17908)
* Include estimated active compaction remaining write size when starting a new compaction (CASSANDRA-17931)
* Mixed mode support for internode authentication during TLS upgrades (CASSANDRA-17923)
diff --git a/NEWS.txt b/NEWS.txt
index 727e711ec4..2442572d7e 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -92,6 +92,8 @@ New features
clear snapshots which are older than some period for example, "--older-than 5h" to remove
snapshots older than 5 hours and it is possible to clear all snapshots older than some timestamp, for example
--older-than-timestamp 2022-12-03T10:15:30Z.
+ - Cassandra logs can be viewed in the virtual table system_views.system_logs.
+ Please uncomment the respective appender in logback.xml file to make logs flow into this table. This feature is turned off by default.
Upgrading
---------
diff --git a/conf/logback.xml b/conf/logback.xml
index e98fea480f..102cf06352 100644
--- a/conf/logback.xml
+++ b/conf/logback.xml
@@ -111,6 +111,14 @@ appender reference in the root level section below.
<appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" />
-->
+ <!-- Uncomment below configuration and corresponding appender-ref to activate
+ logging into system_views.system_logs virtual table. -->
+ <!-- <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>WARN</level>
+ </filter>
+ </appender> -->
+
<root level="INFO">
<appender-ref ref="SYSTEMLOG" />
<appender-ref ref="STDOUT" />
@@ -118,6 +126,9 @@ appender reference in the root level section below.
<!--
<appender-ref ref="LogbackMetrics" />
-->
+ <!--
+ <appender-ref ref="CQLLOG"/>
+ -->
</root>
<logger name="org.apache.cassandra" level="DEBUG"/>
diff --git a/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc b/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc
index e673622099..7e64aea048 100644
--- a/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc
+++ b/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc
@@ -81,6 +81,27 @@ Specify the format of the message. Part of the rolling policy.
<pattern>%-5level [%thread] %date\{ISO8601} %F:%L - %msg%n</pattern>
</encoder>
+=== Logging to Cassandra virtual table
+
+It is possible to configure logback.xml in such a way that logs would appear in `system_views.system_log` table.
+This is achieved by appender implemented in class `VirtualTableAppender` which is called `CQLLOG` in the
+default configration. When the appender is commented out, no system logs are written to the virtual table.
+
+CQLLOG appender is special as the underlying structure it saves log messages into can not grow without any bound
+as a node would run out of memory. For this reason, `system_log` table is limited on its size.
+By default, it can hold at most 50 000 log messages, it can never hold more than 100 000 log messages.
+
+To specify how many rows you want that virtual table to hold at most, there is
+a system property called `cassandra.virtual.logs.max.rows` which takes an integer as value.
+
+You can execute CQL `truncate` query for `system_views.system_log` if you want to wipe out all the logs in virtual table
+to e.g. save some memory.
+
+It is recommended to set filter to at least `WARN` level so this table holds only important logging messages as
+each message will occupy memory.
+
+The appender to virtual table is commented out by default so logging to virtual table is not active.
+
=== Contents of default `logback.xml`
[source,XML]
@@ -151,6 +172,14 @@ Specify the format of the message. Part of the rolling policy.
<appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" />
-->
+ <!-- Uncomment below configuration and corresponding appender-ref to activate
+ logging into system_views.system_logs virtual table. -->
+ <!-- <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>WARN</level>
+ </filter>
+ </appender> -->
+
<root level="INFO">
<appender-ref ref="SYSTEMLOG" />
<appender-ref ref="STDOUT" />
@@ -158,9 +187,11 @@ Specify the format of the message. Part of the rolling policy.
<!--
<appender-ref ref="LogbackMetrics" />
-->
+ <!--
+ <appender-ref ref="CQLLOG"/>
+ -->
</root>
<logger name="org.apache.cassandra" level="DEBUG"/>
- <logger name="com.thinkaurelius.thrift" level="ERROR"/>
</configuration>
----
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 49df7404e7..d7c543da67 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.config;
import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.db.virtual.LogMessagesTable;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.service.FileSystemOwnershipCheck;
@@ -299,8 +300,10 @@ public enum CassandraRelevantProperties
ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"),
// Loosen the definition of "empty" for gossip state, for use during host replacements if things go awry
- LOOSE_DEF_OF_EMPTY_ENABLED(Config.PROPERTY_PREFIX + "gossiper.loose_empty_enabled");
- ;
+ LOOSE_DEF_OF_EMPTY_ENABLED(Config.PROPERTY_PREFIX + "gossiper.loose_empty_enabled"),
+
+ // Maximum number of rows in system_views.logs table
+ LOGS_VIRTUAL_TABLE_MAX_ROWS("cassandra.virtual.logs.max.rows", Integer.toString(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS));
CassandraRelevantProperties(String key, String defaultVal)
diff --git a/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java
new file mode 100644
index 0000000000..cd7999b308
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.TimestampType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Virtual table for holding Cassandra logs. Entries to this table are added via log appender.
+ * <p>
+ * The virtual table is bounded in its size. If a new log message is appended to virtual table,
+ * the oldest one is removed.
+ * <p>
+ * This virtual table can be truncated.
+ *
+ * @see org.apache.cassandra.utils.logging.VirtualTableAppender
+ */
+public final class LogMessagesTable extends AbstractMutableVirtualTable
+{
+ private static final Logger logger = LoggerFactory.getLogger(LogMessagesTable.class);
+
+ public static final int LOGS_VIRTUAL_TABLE_MIN_ROWS = 1000;
+ public static final int LOGS_VIRTUAL_TABLE_DEFAULT_ROWS = 50_000;
+ public static final int LOGS_VIRTUAL_TABLE_MAX_ROWS = 100_000;
+
+ public static final String TABLE_NAME = "system_logs";
+ private static final String TABLE_COMMENT = "Cassandra logs";
+
+ public static final String TIMESTAMP_COLUMN_NAME = "timestamp";
+ public static final String LOGGER_COLUMN_NAME = "logger";
+ public static final String ORDER_IN_MILLISECOND_COLUMN_NAME = "order_in_millisecond";
+ public static final String LEVEL_COLUMN_NAME = "level";
+ public static final String MESSAGE_COLUMN_NAME = "message";
+
+ private final List<LogMessage> buffer;
+
+ LogMessagesTable(String keyspace)
+ {
+ this(keyspace, resolveBufferSize());
+ }
+
+ @VisibleForTesting
+ LogMessagesTable(String keyspace, int size)
+ {
+ super(TableMetadata.builder(keyspace, TABLE_NAME)
+ .comment(TABLE_COMMENT)
+ .kind(TableMetadata.Kind.VIRTUAL)
+ .partitioner(new LocalPartitioner(TimestampType.instance))
+ .addPartitionKeyColumn(TIMESTAMP_COLUMN_NAME, TimestampType.instance)
+ .addClusteringColumn(ORDER_IN_MILLISECOND_COLUMN_NAME, Int32Type.instance)
+ .addRegularColumn(LOGGER_COLUMN_NAME, UTF8Type.instance)
+ .addRegularColumn(LEVEL_COLUMN_NAME, UTF8Type.instance)
+ .addRegularColumn(MESSAGE_COLUMN_NAME, UTF8Type.instance).build());
+
+ logger.debug("capacity of virtual table {} is set to be at most {} rows", metadata().toString(), size);
+ buffer = BoundedLinkedList.create(size);
+ }
+
+ @Override
+ public DataSet data()
+ {
+ SimpleDataSet result = new SimpleDataSet(metadata(), DecoratedKey.comparator.reversed());
+
+ synchronized (buffer)
+ {
+ long milliSecondsOfPreviousLog = 0;
+ long milliSecondsOfCurrentLog;
+
+ int index = 0;
+
+ Iterator<LogMessage> iterator = buffer.listIterator();
+ while (iterator.hasNext())
+ {
+ LogMessage log = iterator.next();
+
+ milliSecondsOfCurrentLog = log.timestamp;
+ if (milliSecondsOfPreviousLog == milliSecondsOfCurrentLog)
+ ++index;
+ else
+ index = 0;
+
+ milliSecondsOfPreviousLog = milliSecondsOfCurrentLog;
+
+ result.row(new Date(log.timestamp), index)
+ .column(LOGGER_COLUMN_NAME, log.logger)
+ .column(LEVEL_COLUMN_NAME, log.level)
+ .column(MESSAGE_COLUMN_NAME, log.message);
+ }
+ }
+
+ return result;
+ }
+
+ public void add(LoggingEvent event)
+ {
+ buffer.add(new LogMessage(event));
+ }
+
+ @Override
+ public void truncate()
+ {
+ buffer.clear();
+ }
+
+ @VisibleForTesting
+ static int resolveBufferSize()
+ {
+ int size = CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS.getInt();
+ return (size < LOGS_VIRTUAL_TABLE_MIN_ROWS || size > LOGS_VIRTUAL_TABLE_MAX_ROWS)
+ ? LOGS_VIRTUAL_TABLE_DEFAULT_ROWS : size;
+ }
+
+ @VisibleForTesting
+ public static class LogMessage
+ {
+ public final long timestamp;
+ public final String logger;
+ public final String level;
+ public final String message;
+
+ public LogMessage(LoggingEvent event)
+ {
+ this(event.getTimeStamp(), event.getLoggerName(), event.getLevel().toString(), event.getFormattedMessage());
+ }
+
+ public LogMessage(long timestamp, String logger, String level, String message)
+ {
+ this.timestamp = timestamp;
+ this.logger = logger;
+ this.level = level;
+ this.message = message;
+ }
+ }
+
+ private static final class BoundedLinkedList<T> extends LinkedList<T>
+ {
+ private final int maxSize;
+
+ public static <T> List<T> create(int size)
+ {
+ return Collections.synchronizedList(new BoundedLinkedList<>(size));
+ }
+
+ private BoundedLinkedList(int maxSize)
+ {
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ public boolean add(T t)
+ {
+ if (size() == maxSize)
+ removeLast();
+
+ addFirst(t);
+
+ return true;
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
index 715f4f89d7..6f3052d8be 100644
--- a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
+++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.virtual;
import java.nio.ByteBuffer;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -54,12 +55,17 @@ public class SimpleDataSet extends AbstractVirtualTable.AbstractDataSet
private Row currentRow;
- public SimpleDataSet(TableMetadata metadata)
+ public SimpleDataSet(TableMetadata metadata, Comparator<DecoratedKey> comparator)
{
- super(new TreeMap<>(DecoratedKey.comparator));
+ super(new TreeMap<>(comparator));
this.metadata = metadata;
}
+ public SimpleDataSet(TableMetadata metadata)
+ {
+ this(metadata, DecoratedKey.comparator);
+ }
+
public SimpleDataSet row(Object... primaryKeyValues)
{
if (Iterables.size(metadata.primaryKeyColumns()) != primaryKeyValues.length)
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 59a0aba809..d2aac53764 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -48,6 +48,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
.add(new StreamingVirtualTable(VIRTUAL_VIEWS))
.add(new GossipInfoTable(VIRTUAL_VIEWS))
.add(new QueriesTable(VIRTUAL_VIEWS))
+ .add(new LogMessagesTable(VIRTUAL_VIEWS))
.addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS))
.build());
}
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 90b9496138..3ca42513a6 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -88,6 +88,8 @@ import org.apache.cassandra.utils.Mx4jTool;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import org.apache.cassandra.utils.logging.LoggingSupportFactory;
+import org.apache.cassandra.utils.logging.VirtualTableAppender;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_FOREGROUND;
@@ -566,6 +568,12 @@ public class CassandraDaemon
{
VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
+
+ // flush log messages to system_views.system_logs virtual table as there were messages already logged
+ // before that virtual table was instantiated
+ LoggingSupportFactory.getLoggingSupport()
+ .getAppender(VirtualTableAppender.class, VirtualTableAppender.APPENDER_NAME)
+ .ifPresent(appender -> ((VirtualTableAppender) appender).flushBuffer());
}
public void scrubDataDirectories() throws StartupException
diff --git a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java
index eda9153e95..e710d44dd1 100644
--- a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java
+++ b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java
@@ -20,8 +20,11 @@ package org.apache.cassandra.utils.logging;
import java.lang.management.ManagementFactory;
import java.security.AccessControlException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import javax.management.JMX;
import javax.management.ObjectName;
@@ -57,6 +60,8 @@ public class LogbackLoggingSupport implements LoggingSupport
@Override
public void onStartup()
{
+ checkOnlyOneVirtualTableAppender();
+
// The default logback configuration in conf/logback.xml allows reloading the
// configuration when the configuration file has changed (every 60 seconds by default).
// This requires logback to use file I/O APIs. But file I/O is not allowed from UDFs.
@@ -132,6 +137,46 @@ public class LogbackLoggingSupport implements LoggingSupport
return logLevelMaps;
}
+ @Override
+ public Optional<Appender<?>> getAppender(Class<?> appenderClass, String name)
+ {
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ for (Logger logBackLogger : lc.getLoggerList())
+ {
+ for (Iterator<Appender<ILoggingEvent>> iterator = logBackLogger.iteratorForAppenders(); iterator.hasNext();)
+ {
+ Appender<ILoggingEvent> appender = iterator.next();
+ if (appender.getClass() == appenderClass && appender.getName().equals(name))
+ return Optional.of(appender);
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ private void checkOnlyOneVirtualTableAppender()
+ {
+ int count = 0;
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ List<String> virtualAppenderNames = new ArrayList<>();
+ for (Logger logBackLogger : lc.getLoggerList())
+ {
+ for (Iterator<Appender<ILoggingEvent>> iterator = logBackLogger.iteratorForAppenders(); iterator.hasNext();)
+ {
+ Appender<?> appender = iterator.next();
+ if (appender instanceof VirtualTableAppender)
+ {
+ virtualAppenderNames.add(appender.getName());
+ count += 1;
+ }
+ }
+ }
+
+ if (count > 1)
+ throw new IllegalStateException(String.format("There are multiple appenders of class %s of names %s. There is only one appender of such class allowed.",
+ VirtualTableAppender.class.getName(), String.join(",", virtualAppenderNames)));
+ }
+
private boolean hasAppenders(Logger logBackLogger)
{
Iterator<Appender<ILoggingEvent>> it = logBackLogger.iteratorForAppenders();
diff --git a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java
index 8ea83be004..35e11975f9 100644
--- a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java
+++ b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java
@@ -19,6 +19,9 @@
package org.apache.cassandra.utils.logging;
import java.util.Map;
+import java.util.Optional;
+
+import ch.qos.logback.core.Appender;
/**
* Common abstraction of functionality which can be implemented for different logging backend implementations (slf4j bindings).
@@ -49,4 +52,9 @@ public interface LoggingSupport
* @return a map of logger names and their associated log level as string representations.
*/
Map<String, String> getLoggingLevels();
+
+ default Optional<Appender<?>> getAppender(Class<?> appenderClass, String appenderName)
+ {
+ return Optional.empty();
+ }
}
diff --git a/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java
new file mode 100644
index 0000000000..2820b2936f
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.logging;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import ch.qos.logback.core.AppenderBase;
+import org.apache.cassandra.audit.FileAuditLogger;
+import org.apache.cassandra.db.virtual.LogMessagesTable;
+import org.apache.cassandra.db.virtual.VirtualKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+
+import static org.apache.cassandra.db.virtual.LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.TABLE_NAME;
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
+
+/**
+ * Appends Cassandra logs to virtual table system_views.system_logs
+ */
+public final class VirtualTableAppender extends AppenderBase<LoggingEvent>
+{
+ public static final String APPENDER_NAME = "CQLLOG";
+
+ private static final Set<String> forbiddenLoggers = ImmutableSet.of(FileAuditLogger.class.getName());
+
+ private LogMessagesTable logs;
+
+ // for holding messages until virtual registry contains logs virtual table
+ // as it takes some time during startup of a node to initialise virtual tables but messages are
+ // logged already
+ private final List<LoggingEvent> messageBuffer = new LinkedList<>();
+
+ @Override
+ protected void append(LoggingEvent eventObject)
+ {
+ if (!forbiddenLoggers.contains(eventObject.getLoggerName()))
+ {
+ if (logs == null)
+ {
+ logs = getVirtualTable();
+ if (logs == null)
+ addToBuffer(eventObject);
+ else
+ logs.add(eventObject);
+ }
+ else
+ logs.add(eventObject);
+ }
+ }
+
+ @Override
+ public void stop()
+ {
+ messageBuffer.clear();
+ super.stop();
+ }
+
+ /**
+ * Flushes all logs which were appended before virtual table was registered.
+ *
+ * @see org.apache.cassandra.service.CassandraDaemon#setupVirtualKeyspaces
+ */
+ public void flushBuffer()
+ {
+ Optional.ofNullable(getVirtualTable()).ifPresent(vtable -> {
+ messageBuffer.forEach(vtable::add);
+ messageBuffer.clear();
+ });
+ }
+
+ private LogMessagesTable getVirtualTable()
+ {
+ VirtualKeyspace keyspace = VirtualKeyspaceRegistry.instance.getKeyspaceNullable(VIRTUAL_VIEWS);
+
+ if (keyspace == null)
+ return null;
+
+ Optional<VirtualTable> logsTable = keyspace.tables()
+ .stream()
+ .filter(vt -> vt.name().equals(TABLE_NAME))
+ .findFirst();
+
+ if (!logsTable.isPresent())
+ return null;
+
+ VirtualTable vt = logsTable.get();
+
+ if (!(vt instanceof LogMessagesTable))
+ throw new IllegalStateException(String.format("Virtual table %s.%s is not backed by an instance of %s but by %s",
+ VIRTUAL_VIEWS,
+ TABLE_NAME,
+ LogMessagesTable.class.getName(),
+ vt.getClass().getName()));
+
+ return (LogMessagesTable) vt;
+ }
+
+ private void addToBuffer(LoggingEvent eventObject)
+ {
+ // we restrict how many logging events we can put into buffer,
+ // so we are not growing without any bound when things go south
+ if (messageBuffer.size() < LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)
+ messageBuffer.add(eventObject);
+ }
+}
diff --git a/test/conf/logback-dtest_with_vtable_appender.xml b/test/conf/logback-dtest_with_vtable_appender.xml
new file mode 100644
index 0000000000..c9fd108c77
--- /dev/null
+++ b/test/conf/logback-dtest_with_vtable_appender.xml
@@ -0,0 +1,66 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+ <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" />
+ <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+ <!-- Shutdown hook ensures that async appender flushes -->
+ <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+ <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
+ <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file>
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+ </encoder>
+ <immediateFlush>true</immediateFlush>
+ </appender>
+
+ <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>WARN</level>
+ </filter>
+ </appender>
+
+ <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>DEBUG</level>
+ </filter>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN"/>
+
+ <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
+
+ <root level="DEBUG">
+ <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race conditions with appending and searching -->
+ <appender-ref ref="INSTANCESTDERR" />
+ <appender-ref ref="INSTANCESTDOUT" />
+ <appender-ref ref="CQLLOG" />
+ </root>
+</configuration>
diff --git a/test/conf/logback-dtest_with_vtable_appender_invalid.xml b/test/conf/logback-dtest_with_vtable_appender_invalid.xml
new file mode 100644
index 0000000000..1b30c141c2
--- /dev/null
+++ b/test/conf/logback-dtest_with_vtable_appender_invalid.xml
@@ -0,0 +1,73 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+ <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" />
+ <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+ <!-- Shutdown hook ensures that async appender flushes -->
+ <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+ <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
+ <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file>
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+ </encoder>
+ <immediateFlush>true</immediateFlush>
+ </appender>
+
+ <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>WARN</level>
+ </filter>
+ </appender>
+
+ <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>DEBUG</level>
+ </filter>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN"/>
+
+ <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
+
+ <appender name="CQLLOG2" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
+
+ <root level="DEBUG">
+ <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race conditions with appending and searching -->
+ <appender-ref ref="INSTANCESTDERR" />
+ <appender-ref ref="INSTANCESTDOUT" />
+ <appender-ref ref="CQLLOG" />
+ <appender-ref ref="CQLLOG2" /> <!-- invalid, we can not have multiple appenders of VirtualTableAppender class -->
+ </root>
+</configuration>
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 2c3123546f..cba3807237 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -114,6 +114,7 @@ import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.MigrationCoordinator;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.security.ThreadAwareSecurityManager;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.ClientState;
@@ -590,6 +591,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
DistributedTestSnitch.assign(config.networkTopology());
DatabaseDescriptor.daemonInitialization();
+ ThreadAwareSecurityManager.install();
FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
DatabaseDescriptor.createAllDirectories();
CassandraDaemon.getInstanceForTesting().migrateSystemDataIfNeeded();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
new file mode 100644
index 0000000000..17cd994256
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import ch.qos.logback.classic.Level;
+import org.apache.cassandra.db.virtual.LogMessagesTable;
+import org.apache.cassandra.db.virtual.LogMessagesTable.LogMessage;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.logging.VirtualTableAppender;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.LEVEL_COLUMN_NAME;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.LOGGER_COLUMN_NAME;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.MESSAGE_COLUMN_NAME;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.ORDER_IN_MILLISECOND_COLUMN_NAME;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.TIMESTAMP_COLUMN_NAME;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class VirtualTableLogsTest extends TestBaseImpl
+{
+ @Test
+ public void testVTableOutput() throws Throwable
+ {
+ System.setProperty("logback.configurationFile", "test/conf/logback-dtest_with_vtable_appender.xml");
+
+ try (Cluster cluster = Cluster.build(1)
+ .withConfig(c -> c.with(Feature.values()))
+ .start())
+ {
+ List<TestingLogMessage> rows = getRows(cluster);
+ assertFalse(rows.isEmpty());
+
+ rows.forEach(message -> assertTrue(Level.toLevel(message.level).isGreaterOrEqual(Level.INFO)));
+ }
+ finally
+ {
+ System.clearProperty("logback.configurationFile");
+ }
+ }
+
+ @Test
+ public void testMultipleAppendersFailToStartNode() throws Throwable
+ {
+ System.setProperty("logback.configurationFile", "test/conf/logback-dtest_with_vtable_appender_invalid.xml");
+
+ try (Cluster ignored = Cluster.build(1)
+ .withConfig(c -> c.with(Feature.values()))
+ .start())
+ {
+ fail("Node should not start as there is supposed to be invalid logback configuration file.");
+ }
+ catch (IllegalStateException ex)
+ {
+ assertEquals(format("There are multiple appenders of class %s " +
+ "of names CQLLOG,CQLLOG2. There is only one appender of such class allowed.",
+ VirtualTableAppender.class.getName()),
+ ex.getMessage());
+ }
+ finally
+ {
+ System.clearProperty("logback.configurationFile");
+ }
+ }
+
+ private List<TestingLogMessage> getRows(Cluster cluster)
+ {
+ SimpleQueryResult simpleQueryResult = cluster.coordinator(1).executeWithResult(query("select * from %s"), ONE);
+ List<TestingLogMessage> rows = new ArrayList<>();
+ simpleQueryResult.forEachRemaining(row -> {
+ long timestamp = row.getTimestamp(TIMESTAMP_COLUMN_NAME).getTime();
+ String logger = row.getString(LOGGER_COLUMN_NAME);
+ String level = row.getString(LEVEL_COLUMN_NAME);
+ String message = row.getString(MESSAGE_COLUMN_NAME);
+ int order = row.getInteger(ORDER_IN_MILLISECOND_COLUMN_NAME);
+ TestingLogMessage logMessage = new TestingLogMessage(timestamp, logger, level, message, order);
+ rows.add(logMessage);
+ });
+ return rows;
+ }
+
+ private String query(String template)
+ {
+ return format(template, getTableName());
+ }
+
+ private String getTableName()
+ {
+ return format("%s.%s", SchemaConstants.VIRTUAL_VIEWS, LogMessagesTable.TABLE_NAME);
+ }
+
+ private static class TestingLogMessage extends LogMessage
+ {
+ private int order;
+
+ public TestingLogMessage(long timestamp, String logger, String level, String message, int order)
+ {
+ super(timestamp, logger, level, message);
+ this.order = order;
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java
new file mode 100644
index 0000000000..2225c4a1dc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.time.Instant;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.LoggingEvent;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.marshal.TimestampType;
+import org.apache.cassandra.db.virtual.AbstractVirtualTable.DataSet;
+import org.apache.cassandra.db.virtual.AbstractVirtualTable.Partition;
+import org.apache.cassandra.dht.LocalPartitioner;
+
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class LogMessagesTableTest extends CQLTester
+{
+ private String keyspace = createKeyspaceName();
+ private LogMessagesTable table;
+
+ @BeforeClass
+ public static void setup()
+ {
+ CQLTester.setUpClass();
+ }
+
+ @Test
+ public void testTruncate() throws Throwable
+ {
+ registerVirtualTable();
+
+ int numberOfRows = 100;
+ List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
+ loggingEvents.forEach(table::add);
+
+ execute(query("truncate %s"));
+
+ assertTrue(executeNet(query("select timestamp from %s")).all().isEmpty());
+ }
+
+ @Test
+ public void empty() throws Throwable
+ {
+ registerVirtualTable();
+ assertEmpty(execute(query("select * from %s")));
+ }
+
+ @Test
+ public void testInsert()
+ {
+ registerVirtualTable();
+
+ int numberOfRows = 1000;
+ List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
+ loggingEvents.forEach(table::add);
+
+ assertEquals(numberOfRows, numberOfPartitions());
+ }
+
+ @Test
+ public void testLimitedCapacity() throws Throwable
+ {
+ registerVirtualTable(100);
+
+ int numberOfRows = 1000;
+ List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
+ loggingEvents.forEach(table::add);
+
+ // even we inserted 1000 rows, only 100 are present as its capacity is bounded
+ assertEquals(100, numberOfPartitions());
+
+ // the first record in the table will be the last one which we inserted
+ LoggingEvent firstEvent = loggingEvents.get(999);
+ assertRowsNet(executeNet(query("select timestamp from %s limit 1")),
+ new Object[] { new Date(firstEvent.getTimeStamp()) });
+
+ // the last record in the table will be 900th we inserted
+ List<Row> all = executeNet(query("select timestamp from %s")).all();
+ assertEquals(100, all.size());
+ Row row = all.get(all.size() - 1);
+ Date timestamp = row.getTimestamp(0);
+ assertEquals(loggingEvents.get(900).getTimeStamp(), timestamp.getTime());
+ }
+
+ @Test
+ public void testMultipleLogsInSameMillisecond()
+ {
+ registerVirtualTable(10);
+ List<LoggingEvent> loggingEvents = getLoggingEvents(10, Instant.now(), 5);
+ loggingEvents.forEach(table::add);
+
+ // 2 partitions, 5 rows in each
+ assertEquals(2, numberOfPartitions());
+ }
+
+ @Test
+ public void testResolvingBufferSize()
+ {
+ System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "-1");
+ assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize());
+
+ System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "0");
+ assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize());
+
+ System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "1000001");
+ assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize());
+
+ System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "999");
+ assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize());
+
+ System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "50001");
+ assertEquals(50001, LogMessagesTable.resolveBufferSize());
+ }
+
+ private void registerVirtualTable()
+ {
+ registerVirtualTable(LogMessagesTable.LOGS_VIRTUAL_TABLE_MIN_ROWS);
+ }
+
+ private void registerVirtualTable(int size)
+ {
+ table = new LogMessagesTable(keyspace, size);
+ VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(keyspace, ImmutableList.of(table)));
+ }
+
+ private int numberOfPartitions()
+ {
+ DataSet data = table.data();
+
+ Iterator<Partition> partitions = data.getPartitions(DataRange.allData(new LocalPartitioner(TimestampType.instance)));
+
+ int numberOfPartitions = 0;
+
+ while (partitions.hasNext())
+ {
+ partitions.next();
+ numberOfPartitions += 1;
+ }
+
+ return numberOfPartitions;
+ }
+
+ private String query(String query)
+ {
+ return String.format(query, table.toString());
+ }
+
+ private List<LoggingEvent> getLoggingEvents(int size)
+ {
+ return getLoggingEvents(size, Instant.now(), 1);
+ }
+
+ private List<LoggingEvent> getLoggingEvents(int size, Instant firstTimestamp, int logsInMillisecond)
+ {
+ List<LoggingEvent> logs = new LinkedList<>();
+ int partitions = size / logsInMillisecond;
+
+ for (int i = 0; i < partitions; i++)
+ {
+ long timestamp = firstTimestamp.toEpochMilli();
+ firstTimestamp = firstTimestamp.plusSeconds(1);
+
+ for (int j = 0; j < logsInMillisecond; j++)
+ logs.add(getLoggingEvent(timestamp));
+ }
+
+ return logs;
+ }
+
+ private LoggingEvent getLoggingEvent(long timestamp)
+ {
+ LoggingEvent event = new LoggingEvent();
+ event.setLevel(Level.INFO);
+ event.setMessage("message " + timestamp);
+ event.setLoggerName("logger " + timestamp);
+ event.setThreadName(Thread.currentThread().getName());
+ event.setTimeStamp(timestamp);
+
+ return event;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org