You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sm...@apache.org on 2022/10/18 12:16:39 UTC

[cassandra] branch trunk updated: Make Cassandra logs able to be viewed in the virtual table system_views.system_logs

This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c089818881 Make Cassandra logs able to be viewed in the virtual table system_views.system_logs
c089818881 is described below

commit c08981888181392017761606f4490cca7f977db9
Author: Stefan Miklosovic <sm...@apache.org>
AuthorDate: Wed Oct 5 17:02:34 2022 +0200

    Make Cassandra logs able to be viewed in the virtual table system_views.system_logs
    
    patch by Stefan Miklosovic; reviewed by Brandon Williams for CASSANDRA-17948
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   2 +
 conf/logback.xml                                   |  11 ++
 .../pages/configuration/cass_logback_xml_file.adoc |  33 +++-
 .../config/CassandraRelevantProperties.java        |   7 +-
 .../cassandra/db/virtual/LogMessagesTable.java     | 192 +++++++++++++++++++
 .../apache/cassandra/db/virtual/SimpleDataSet.java |  10 +-
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |   1 +
 .../apache/cassandra/service/CassandraDaemon.java  |   8 +
 .../utils/logging/LogbackLoggingSupport.java       |  45 +++++
 .../cassandra/utils/logging/LoggingSupport.java    |   8 +
 .../utils/logging/VirtualTableAppender.java        | 128 +++++++++++++
 test/conf/logback-dtest_with_vtable_appender.xml   |  66 +++++++
 .../logback-dtest_with_vtable_appender_invalid.xml |  73 +++++++
 .../cassandra/distributed/impl/Instance.java       |   2 +
 .../distributed/test/VirtualTableLogsTest.java     | 129 +++++++++++++
 .../cassandra/db/virtual/LogMessagesTableTest.java | 210 +++++++++++++++++++++
 17 files changed, 921 insertions(+), 5 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c3f0bcf534..1bae643c3e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Make Cassandra logs able to be viewed in the virtual table system_views.system_logs (CASSANDRA-17946)
  * IllegalArgumentException in Gossiper#order due to concurrent mutations to elements being applied (CASSANDRA-17908)
  * Include estimated active compaction remaining write size when starting a new compaction (CASSANDRA-17931)
  * Mixed mode support for internode authentication during TLS upgrades (CASSANDRA-17923)
diff --git a/NEWS.txt b/NEWS.txt
index 727e711ec4..2442572d7e 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -92,6 +92,8 @@ New features
       clear snapshots which are older than some period for example, "--older-than 5h" to remove 
       snapshots older than 5 hours and it is possible to clear all snapshots older than some timestamp, for example
       --older-than-timestamp 2022-12-03T10:15:30Z.
+    - Cassandra logs can be viewed in the virtual table system_views.system_logs.
+      Please uncomment the respective appender in logback.xml file to make logs flow into this table. This feature is turned off by default.
 
 Upgrading
 ---------
diff --git a/conf/logback.xml b/conf/logback.xml
index e98fea480f..102cf06352 100644
--- a/conf/logback.xml
+++ b/conf/logback.xml
@@ -111,6 +111,14 @@ appender reference in the root level section below.
   <appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" />
    -->
 
+  <!-- Uncomment below configuration and corresponding appender-ref to activate
+  logging into system_views.system_logs virtual table. -->
+  <!-- <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>WARN</level>
+    </filter>
+  </appender> -->
+
   <root level="INFO">
     <appender-ref ref="SYSTEMLOG" />
     <appender-ref ref="STDOUT" />
@@ -118,6 +126,9 @@ appender reference in the root level section below.
     <!--
     <appender-ref ref="LogbackMetrics" />
     -->
+    <!--
+    <appender-ref ref="CQLLOG"/>
+    -->
   </root>
 
   <logger name="org.apache.cassandra" level="DEBUG"/>
diff --git a/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc b/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc
index e673622099..7e64aea048 100644
--- a/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc
+++ b/doc/modules/cassandra/pages/configuration/cass_logback_xml_file.adoc
@@ -81,6 +81,27 @@ Specify the format of the message. Part of the rolling policy.
 <pattern>%-5level [%thread] %date\{ISO8601} %F:%L - %msg%n</pattern>
 </encoder>
 
+=== Logging to Cassandra virtual table
+
+It is possible to configure logback.xml in such a way that logs would appear in `system_views.system_log` table.
+This is achieved by appender implemented in class `VirtualTableAppender` which is called `CQLLOG` in the
+default configration. When the appender is commented out, no system logs are written to the virtual table.
+
+CQLLOG appender is special as the underlying structure it saves log messages into can not grow without any bound
+as a node would run out of memory. For this reason, `system_log` table is limited on its size.
+By default, it can hold at most 50 000 log messages, it can never hold more than 100 000 log messages.
+
+To specify how many rows you want that virtual table to hold at most, there is
+a system property called `cassandra.virtual.logs.max.rows` which takes an integer as value.
+
+You can execute CQL `truncate` query for `system_views.system_log` if you want to wipe out all the logs in virtual table
+to e.g. save some memory.
+
+It is recommended to set filter to at least `WARN` level so this table holds only important logging messages as
+each message will occupy memory.
+
+The appender to virtual table is commented out by default so logging to virtual table is not active.
+
 === Contents of default `logback.xml`
 
 [source,XML]
@@ -151,6 +172,14 @@ Specify the format of the message. Part of the rolling policy.
   <appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" />
    -->
 
+  <!-- Uncomment below configuration and corresponding appender-ref to activate
+  logging into system_views.system_logs virtual table. -->
+  <!-- <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>WARN</level>
+    </filter>
+  </appender> -->
+
   <root level="INFO">
     <appender-ref ref="SYSTEMLOG" />
     <appender-ref ref="STDOUT" />
@@ -158,9 +187,11 @@ Specify the format of the message. Part of the rolling policy.
     <!--
     <appender-ref ref="LogbackMetrics" />
     -->
+    <!--
+    <appender-ref ref="CQLLOG"/>
+    -->
   </root>
 
   <logger name="org.apache.cassandra" level="DEBUG"/>
-  <logger name="com.thinkaurelius.thrift" level="ERROR"/>
 </configuration>
 ----
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 49df7404e7..d7c543da67 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.config;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.db.virtual.LogMessagesTable;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.service.FileSystemOwnershipCheck;
 
@@ -299,8 +300,10 @@ public enum CassandraRelevantProperties
     ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"),
 
     // Loosen the definition of "empty" for gossip state, for use during host replacements if things go awry
-    LOOSE_DEF_OF_EMPTY_ENABLED(Config.PROPERTY_PREFIX + "gossiper.loose_empty_enabled");
-    ;
+    LOOSE_DEF_OF_EMPTY_ENABLED(Config.PROPERTY_PREFIX + "gossiper.loose_empty_enabled"),
+
+    // Maximum number of rows in system_views.logs table
+    LOGS_VIRTUAL_TABLE_MAX_ROWS("cassandra.virtual.logs.max.rows", Integer.toString(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS));
 
 
     CassandraRelevantProperties(String key, String defaultVal)
diff --git a/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java
new file mode 100644
index 0000000000..cd7999b308
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.TimestampType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Virtual table for holding Cassandra logs. Entries to this table are added via log appender.
+ * <p>
+ * The virtual table is bounded in its size. If a new log message is appended to virtual table,
+ * the oldest one is removed.
+ * <p>
+ * This virtual table can be truncated.
+ *
+ * @see org.apache.cassandra.utils.logging.VirtualTableAppender
+ */
+public final class LogMessagesTable extends AbstractMutableVirtualTable
+{
+    private static final Logger logger = LoggerFactory.getLogger(LogMessagesTable.class);
+
+    public static final int LOGS_VIRTUAL_TABLE_MIN_ROWS = 1000;
+    public static final int LOGS_VIRTUAL_TABLE_DEFAULT_ROWS = 50_000;
+    public static final int LOGS_VIRTUAL_TABLE_MAX_ROWS = 100_000;
+
+    public static final String TABLE_NAME = "system_logs";
+    private static final String TABLE_COMMENT = "Cassandra logs";
+
+    public static final String TIMESTAMP_COLUMN_NAME = "timestamp";
+    public static final String LOGGER_COLUMN_NAME = "logger";
+    public static final String ORDER_IN_MILLISECOND_COLUMN_NAME = "order_in_millisecond";
+    public static final String LEVEL_COLUMN_NAME = "level";
+    public static final String MESSAGE_COLUMN_NAME = "message";
+
+    private final List<LogMessage> buffer;
+
+    LogMessagesTable(String keyspace)
+    {
+        this(keyspace, resolveBufferSize());
+    }
+
+    @VisibleForTesting
+    LogMessagesTable(String keyspace, int size)
+    {
+        super(TableMetadata.builder(keyspace, TABLE_NAME)
+                           .comment(TABLE_COMMENT)
+                           .kind(TableMetadata.Kind.VIRTUAL)
+                           .partitioner(new LocalPartitioner(TimestampType.instance))
+                           .addPartitionKeyColumn(TIMESTAMP_COLUMN_NAME, TimestampType.instance)
+                           .addClusteringColumn(ORDER_IN_MILLISECOND_COLUMN_NAME, Int32Type.instance)
+                           .addRegularColumn(LOGGER_COLUMN_NAME, UTF8Type.instance)
+                           .addRegularColumn(LEVEL_COLUMN_NAME, UTF8Type.instance)
+                           .addRegularColumn(MESSAGE_COLUMN_NAME, UTF8Type.instance).build());
+
+        logger.debug("capacity of virtual table {} is set to be at most {} rows", metadata().toString(), size);
+        buffer = BoundedLinkedList.create(size);
+    }
+
+    @Override
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata(), DecoratedKey.comparator.reversed());
+
+        synchronized (buffer)
+        {
+            long milliSecondsOfPreviousLog = 0;
+            long milliSecondsOfCurrentLog;
+
+            int index = 0;
+
+            Iterator<LogMessage> iterator = buffer.listIterator();
+            while (iterator.hasNext())
+            {
+                LogMessage log = iterator.next();
+
+                milliSecondsOfCurrentLog = log.timestamp;
+                if (milliSecondsOfPreviousLog == milliSecondsOfCurrentLog)
+                    ++index;
+                else
+                    index = 0;
+
+                milliSecondsOfPreviousLog = milliSecondsOfCurrentLog;
+
+                result.row(new Date(log.timestamp), index)
+                      .column(LOGGER_COLUMN_NAME, log.logger)
+                      .column(LEVEL_COLUMN_NAME, log.level)
+                      .column(MESSAGE_COLUMN_NAME, log.message);
+            }
+        }
+
+        return result;
+    }
+
+    public void add(LoggingEvent event)
+    {
+        buffer.add(new LogMessage(event));
+    }
+
+    @Override
+    public void truncate()
+    {
+        buffer.clear();
+    }
+
+    @VisibleForTesting
+    static int resolveBufferSize()
+    {
+        int size = CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS.getInt();
+        return (size < LOGS_VIRTUAL_TABLE_MIN_ROWS || size > LOGS_VIRTUAL_TABLE_MAX_ROWS)
+               ? LOGS_VIRTUAL_TABLE_DEFAULT_ROWS : size;
+    }
+
+    @VisibleForTesting
+    public static class LogMessage
+    {
+        public final long timestamp;
+        public final String logger;
+        public final String level;
+        public final String message;
+
+        public LogMessage(LoggingEvent event)
+        {
+            this(event.getTimeStamp(), event.getLoggerName(), event.getLevel().toString(), event.getFormattedMessage());
+        }
+
+        public LogMessage(long timestamp, String logger, String level, String message)
+        {
+            this.timestamp = timestamp;
+            this.logger = logger;
+            this.level = level;
+            this.message = message;
+        }
+    }
+
+    private static final class BoundedLinkedList<T> extends LinkedList<T>
+    {
+        private final int maxSize;
+
+        public static <T> List<T> create(int size)
+        {
+            return Collections.synchronizedList(new BoundedLinkedList<>(size));
+        }
+
+        private BoundedLinkedList(int maxSize)
+        {
+            this.maxSize = maxSize;
+        }
+
+        @Override
+        public boolean add(T t)
+        {
+            if (size() == maxSize)
+                removeLast();
+
+            addFirst(t);
+
+            return true;
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
index 715f4f89d7..6f3052d8be 100644
--- a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
+++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.virtual;
 
 import java.nio.ByteBuffer;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -54,12 +55,17 @@ public class SimpleDataSet extends AbstractVirtualTable.AbstractDataSet
 
     private Row currentRow;
 
-    public SimpleDataSet(TableMetadata metadata)
+    public SimpleDataSet(TableMetadata metadata, Comparator<DecoratedKey> comparator)
     {
-        super(new TreeMap<>(DecoratedKey.comparator));
+        super(new TreeMap<>(comparator));
         this.metadata = metadata;
     }
 
+    public SimpleDataSet(TableMetadata metadata)
+    {
+        this(metadata, DecoratedKey.comparator);
+    }
+
     public SimpleDataSet row(Object... primaryKeyValues)
     {
         if (Iterables.size(metadata.primaryKeyColumns()) != primaryKeyValues.length)
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 59a0aba809..d2aac53764 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -48,6 +48,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
                     .add(new StreamingVirtualTable(VIRTUAL_VIEWS))
                     .add(new GossipInfoTable(VIRTUAL_VIEWS))
                     .add(new QueriesTable(VIRTUAL_VIEWS))
+                    .add(new LogMessagesTable(VIRTUAL_VIEWS))
                     .addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS))
                     .build());
     }
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 90b9496138..3ca42513a6 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -88,6 +88,8 @@ import org.apache.cassandra.utils.Mx4jTool;
 import org.apache.cassandra.utils.NativeLibrary;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import org.apache.cassandra.utils.logging.LoggingSupportFactory;
+import org.apache.cassandra.utils.logging.VirtualTableAppender;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_FOREGROUND;
@@ -566,6 +568,12 @@ public class CassandraDaemon
     {
         VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
         VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
+
+        // flush log messages to system_views.system_logs virtual table as there were messages already logged
+        // before that virtual table was instantiated
+        LoggingSupportFactory.getLoggingSupport()
+                             .getAppender(VirtualTableAppender.class, VirtualTableAppender.APPENDER_NAME)
+                             .ifPresent(appender -> ((VirtualTableAppender) appender).flushBuffer());
     }
 
     public void scrubDataDirectories() throws StartupException
diff --git a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java
index eda9153e95..e710d44dd1 100644
--- a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java
+++ b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java
@@ -20,8 +20,11 @@ package org.apache.cassandra.utils.logging;
 
 import java.lang.management.ManagementFactory;
 import java.security.AccessControlException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import javax.management.JMX;
 import javax.management.ObjectName;
@@ -57,6 +60,8 @@ public class LogbackLoggingSupport implements LoggingSupport
     @Override
     public void onStartup()
     {
+        checkOnlyOneVirtualTableAppender();
+
         // The default logback configuration in conf/logback.xml allows reloading the
         // configuration when the configuration file has changed (every 60 seconds by default).
         // This requires logback to use file I/O APIs. But file I/O is not allowed from UDFs.
@@ -132,6 +137,46 @@ public class LogbackLoggingSupport implements LoggingSupport
         return logLevelMaps;
     }
 
+    @Override
+    public Optional<Appender<?>> getAppender(Class<?> appenderClass, String name)
+    {
+        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+        for (Logger logBackLogger : lc.getLoggerList())
+        {
+            for (Iterator<Appender<ILoggingEvent>> iterator = logBackLogger.iteratorForAppenders(); iterator.hasNext();)
+            {
+                Appender<ILoggingEvent> appender = iterator.next();
+                if (appender.getClass() == appenderClass && appender.getName().equals(name))
+                    return Optional.of(appender);
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    private void checkOnlyOneVirtualTableAppender()
+    {
+        int count = 0;
+        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+        List<String> virtualAppenderNames = new ArrayList<>();
+        for (Logger logBackLogger : lc.getLoggerList())
+        {
+            for (Iterator<Appender<ILoggingEvent>> iterator = logBackLogger.iteratorForAppenders(); iterator.hasNext();)
+            {
+                Appender<?> appender = iterator.next();
+                if (appender instanceof VirtualTableAppender)
+                {
+                    virtualAppenderNames.add(appender.getName());
+                    count += 1;
+                }
+            }
+        }
+
+        if (count > 1)
+            throw new IllegalStateException(String.format("There are multiple appenders of class %s of names %s. There is only one appender of such class allowed.",
+                                                          VirtualTableAppender.class.getName(), String.join(",", virtualAppenderNames)));
+    }
+
     private boolean hasAppenders(Logger logBackLogger)
     {
         Iterator<Appender<ILoggingEvent>> it = logBackLogger.iteratorForAppenders();
diff --git a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java
index 8ea83be004..35e11975f9 100644
--- a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java
+++ b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java
@@ -19,6 +19,9 @@
 package org.apache.cassandra.utils.logging;
 
 import java.util.Map;
+import java.util.Optional;
+
+import ch.qos.logback.core.Appender;
 
 /**
  * Common abstraction of functionality which can be implemented for different logging backend implementations (slf4j bindings).
@@ -49,4 +52,9 @@ public interface LoggingSupport
      * @return a map of logger names and their associated log level as string representations.
      */
     Map<String, String> getLoggingLevels();
+
+    default Optional<Appender<?>> getAppender(Class<?> appenderClass, String appenderName)
+    {
+        return Optional.empty();
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java
new file mode 100644
index 0000000000..2820b2936f
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.logging;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import ch.qos.logback.core.AppenderBase;
+import org.apache.cassandra.audit.FileAuditLogger;
+import org.apache.cassandra.db.virtual.LogMessagesTable;
+import org.apache.cassandra.db.virtual.VirtualKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+
+import static org.apache.cassandra.db.virtual.LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.TABLE_NAME;
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
+
+/**
+ * Appends Cassandra logs to virtual table system_views.system_logs
+ */
+public final class VirtualTableAppender extends AppenderBase<LoggingEvent>
+{
+    public static final String APPENDER_NAME = "CQLLOG";
+
+    private static final Set<String> forbiddenLoggers = ImmutableSet.of(FileAuditLogger.class.getName());
+
+    private LogMessagesTable logs;
+
+    // for holding messages until virtual registry contains logs virtual table
+    // as it takes some time during startup of a node to initialise virtual tables but messages are
+    // logged already
+    private final List<LoggingEvent> messageBuffer = new LinkedList<>();
+
+    @Override
+    protected void append(LoggingEvent eventObject)
+    {
+        if (!forbiddenLoggers.contains(eventObject.getLoggerName()))
+        {
+            if (logs == null)
+            {
+                logs = getVirtualTable();
+                if (logs == null)
+                    addToBuffer(eventObject);
+                else
+                    logs.add(eventObject);
+            }
+            else
+                logs.add(eventObject);
+        }
+    }
+
+    @Override
+    public void stop()
+    {
+        messageBuffer.clear();
+        super.stop();
+    }
+
+    /**
+     * Flushes all logs which were appended before virtual table was registered.
+     *
+     * @see org.apache.cassandra.service.CassandraDaemon#setupVirtualKeyspaces
+     */
+    public void flushBuffer()
+    {
+        Optional.ofNullable(getVirtualTable()).ifPresent(vtable -> {
+            messageBuffer.forEach(vtable::add);
+            messageBuffer.clear();
+        });
+    }
+
+    private LogMessagesTable getVirtualTable()
+    {
+        VirtualKeyspace keyspace = VirtualKeyspaceRegistry.instance.getKeyspaceNullable(VIRTUAL_VIEWS);
+
+        if (keyspace == null)
+            return null;
+
+        Optional<VirtualTable> logsTable = keyspace.tables()
+                                                   .stream()
+                                                   .filter(vt -> vt.name().equals(TABLE_NAME))
+                                                   .findFirst();
+
+        if (!logsTable.isPresent())
+            return null;
+
+        VirtualTable vt = logsTable.get();
+
+        if (!(vt instanceof LogMessagesTable))
+            throw new IllegalStateException(String.format("Virtual table %s.%s is not backed by an instance of %s but by %s",
+                                                          VIRTUAL_VIEWS,
+                                                          TABLE_NAME,
+                                                          LogMessagesTable.class.getName(),
+                                                          vt.getClass().getName()));
+
+        return (LogMessagesTable) vt;
+    }
+
+    private void addToBuffer(LoggingEvent eventObject)
+    {
+        // we restrict how many logging events we can put into buffer,
+        // so we are not growing without any bound when things go south
+        if (messageBuffer.size() < LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)
+            messageBuffer.add(eventObject);
+    }
+}
diff --git a/test/conf/logback-dtest_with_vtable_appender.xml b/test/conf/logback-dtest_with_vtable_appender.xml
new file mode 100644
index 0000000000..c9fd108c77
--- /dev/null
+++ b/test/conf/logback-dtest_with_vtable_appender.xml
@@ -0,0 +1,66 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+    <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" />
+    <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+    <!-- Shutdown hook ensures that async appender flushes -->
+    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+    <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
+        <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file>
+        <encoder>
+            <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+        </encoder>
+        <immediateFlush>true</immediateFlush>
+    </appender>
+
+    <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>WARN</level>
+        </filter>
+    </appender>
+
+    <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>DEBUG</level>
+        </filter>
+    </appender>
+
+    <logger name="org.apache.hadoop" level="WARN"/>
+
+    <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
+
+    <root level="DEBUG">
+        <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race conditions with appending and searching -->
+        <appender-ref ref="INSTANCESTDERR" />
+        <appender-ref ref="INSTANCESTDOUT" />
+        <appender-ref ref="CQLLOG" />
+    </root>
+</configuration>
diff --git a/test/conf/logback-dtest_with_vtable_appender_invalid.xml b/test/conf/logback-dtest_with_vtable_appender_invalid.xml
new file mode 100644
index 0000000000..1b30c141c2
--- /dev/null
+++ b/test/conf/logback-dtest_with_vtable_appender_invalid.xml
@@ -0,0 +1,73 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+    <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" />
+    <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+    <!-- Shutdown hook ensures that async appender flushes -->
+    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+    <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
+        <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file>
+        <encoder>
+            <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+        </encoder>
+        <immediateFlush>true</immediateFlush>
+    </appender>
+
+    <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>WARN</level>
+        </filter>
+    </appender>
+
+    <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>DEBUG</level>
+        </filter>
+    </appender>
+
+    <logger name="org.apache.hadoop" level="WARN"/>
+
+    <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
+
+    <appender name="CQLLOG2" class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
+
+    <root level="DEBUG">
+        <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race conditions with appending and searching -->
+        <appender-ref ref="INSTANCESTDERR" />
+        <appender-ref ref="INSTANCESTDOUT" />
+        <appender-ref ref="CQLLOG" />
+        <appender-ref ref="CQLLOG2" /> <!-- invalid, we can not have multiple appenders of VirtualTableAppender class -->
+    </root>
+</configuration>
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 2c3123546f..cba3807237 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -114,6 +114,7 @@ import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.MigrationCoordinator;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.security.ThreadAwareSecurityManager;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.service.ClientState;
@@ -590,6 +591,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 DistributedTestSnitch.assign(config.networkTopology());
 
                 DatabaseDescriptor.daemonInitialization();
+                ThreadAwareSecurityManager.install();
                 FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
                 DatabaseDescriptor.createAllDirectories();
                 CassandraDaemon.getInstanceForTesting().migrateSystemDataIfNeeded();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
new file mode 100644
index 0000000000..17cd994256
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import ch.qos.logback.classic.Level;
+import org.apache.cassandra.db.virtual.LogMessagesTable;
+import org.apache.cassandra.db.virtual.LogMessagesTable.LogMessage;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.logging.VirtualTableAppender;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.LEVEL_COLUMN_NAME;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.LOGGER_COLUMN_NAME;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.MESSAGE_COLUMN_NAME;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.ORDER_IN_MILLISECOND_COLUMN_NAME;
+import static org.apache.cassandra.db.virtual.LogMessagesTable.TIMESTAMP_COLUMN_NAME;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class VirtualTableLogsTest extends TestBaseImpl
+{
+    @Test
+    public void testVTableOutput() throws Throwable
+    {
+        System.setProperty("logback.configurationFile", "test/conf/logback-dtest_with_vtable_appender.xml");
+
+        try (Cluster cluster = Cluster.build(1)
+                                      .withConfig(c -> c.with(Feature.values()))
+                                      .start())
+        {
+            List<TestingLogMessage> rows = getRows(cluster);
+            assertFalse(rows.isEmpty());
+
+            rows.forEach(message -> assertTrue(Level.toLevel(message.level).isGreaterOrEqual(Level.INFO)));
+        }
+        finally
+        {
+            System.clearProperty("logback.configurationFile");
+        }
+    }
+
+    @Test
+    public void testMultipleAppendersFailToStartNode() throws Throwable
+    {
+        System.setProperty("logback.configurationFile", "test/conf/logback-dtest_with_vtable_appender_invalid.xml");
+
+        try (Cluster ignored = Cluster.build(1)
+                                      .withConfig(c -> c.with(Feature.values()))
+                                      .start())
+        {
+            fail("Node should not start as there is supposed to be invalid logback configuration file.");
+        }
+        catch (IllegalStateException ex)
+        {
+            assertEquals(format("There are multiple appenders of class %s " +
+                                "of names CQLLOG,CQLLOG2. There is only one appender of such class allowed.",
+                                VirtualTableAppender.class.getName()),
+                         ex.getMessage());
+        }
+        finally
+        {
+            System.clearProperty("logback.configurationFile");
+        }
+    }
+
+    private List<TestingLogMessage> getRows(Cluster cluster)
+    {
+        SimpleQueryResult simpleQueryResult = cluster.coordinator(1).executeWithResult(query("select * from %s"), ONE);
+        List<TestingLogMessage> rows = new ArrayList<>();
+        simpleQueryResult.forEachRemaining(row -> {
+            long timestamp = row.getTimestamp(TIMESTAMP_COLUMN_NAME).getTime();
+            String logger = row.getString(LOGGER_COLUMN_NAME);
+            String level = row.getString(LEVEL_COLUMN_NAME);
+            String message = row.getString(MESSAGE_COLUMN_NAME);
+            int order = row.getInteger(ORDER_IN_MILLISECOND_COLUMN_NAME);
+            TestingLogMessage logMessage = new TestingLogMessage(timestamp, logger, level, message, order);
+            rows.add(logMessage);
+        });
+        return rows;
+    }
+
+    private String query(String template)
+    {
+        return format(template, getTableName());
+    }
+
+    private String getTableName()
+    {
+        return format("%s.%s", SchemaConstants.VIRTUAL_VIEWS, LogMessagesTable.TABLE_NAME);
+    }
+
+    private static class TestingLogMessage extends LogMessage
+    {
+        private int order;
+
+        public TestingLogMessage(long timestamp, String logger, String level, String message, int order)
+        {
+            super(timestamp, logger, level, message);
+            this.order = order;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java
new file mode 100644
index 0000000000..2225c4a1dc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.time.Instant;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.LoggingEvent;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.marshal.TimestampType;
+import org.apache.cassandra.db.virtual.AbstractVirtualTable.DataSet;
+import org.apache.cassandra.db.virtual.AbstractVirtualTable.Partition;
+import org.apache.cassandra.dht.LocalPartitioner;
+
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class LogMessagesTableTest extends CQLTester
+{
+    private String keyspace = createKeyspaceName();
+    private LogMessagesTable table;
+
+    @BeforeClass
+    public static void setup()
+    {
+        CQLTester.setUpClass();
+    }
+
+    @Test
+    public void testTruncate() throws Throwable
+    {
+        registerVirtualTable();
+
+        int numberOfRows = 100;
+        List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
+        loggingEvents.forEach(table::add);
+
+        execute(query("truncate %s"));
+
+        assertTrue(executeNet(query("select timestamp from %s")).all().isEmpty());
+    }
+
+    @Test
+    public void empty() throws Throwable
+    {
+        registerVirtualTable();
+        assertEmpty(execute(query("select * from %s")));
+    }
+
+    @Test
+    public void testInsert()
+    {
+        registerVirtualTable();
+
+        int numberOfRows = 1000;
+        List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
+        loggingEvents.forEach(table::add);
+
+        assertEquals(numberOfRows, numberOfPartitions());
+    }
+
+    @Test
+    public void testLimitedCapacity() throws Throwable
+    {
+        registerVirtualTable(100);
+
+        int numberOfRows = 1000;
+        List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
+        loggingEvents.forEach(table::add);
+
+        // even we inserted 1000 rows, only 100 are present as its capacity is bounded
+        assertEquals(100, numberOfPartitions());
+
+        // the first record in the table will be the last one which we inserted
+        LoggingEvent firstEvent = loggingEvents.get(999);
+        assertRowsNet(executeNet(query("select timestamp from %s limit 1")),
+                      new Object[] { new Date(firstEvent.getTimeStamp()) });
+
+        // the last record in the table will be 900th we inserted
+        List<Row> all = executeNet(query("select timestamp from %s")).all();
+        assertEquals(100, all.size());
+        Row row = all.get(all.size() - 1);
+        Date timestamp = row.getTimestamp(0);
+        assertEquals(loggingEvents.get(900).getTimeStamp(), timestamp.getTime());
+    }
+
+    @Test
+    public void testMultipleLogsInSameMillisecond()
+    {
+        registerVirtualTable(10);
+        List<LoggingEvent> loggingEvents = getLoggingEvents(10, Instant.now(), 5);
+        loggingEvents.forEach(table::add);
+
+        // 2 partitions, 5 rows in each
+        assertEquals(2, numberOfPartitions());
+    }
+
+    @Test
+    public void testResolvingBufferSize()
+    {
+        System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "-1");
+        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize());
+
+        System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "0");
+        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize());
+
+        System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "1000001");
+        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize());
+
+        System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "999");
+        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize());
+
+        System.setProperty(LOGS_VIRTUAL_TABLE_MAX_ROWS.getKey(), "50001");
+        assertEquals(50001, LogMessagesTable.resolveBufferSize());
+    }
+
+    private void registerVirtualTable()
+    {
+        registerVirtualTable(LogMessagesTable.LOGS_VIRTUAL_TABLE_MIN_ROWS);
+    }
+
+    private void registerVirtualTable(int size)
+    {
+        table = new LogMessagesTable(keyspace, size);
+        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(keyspace, ImmutableList.of(table)));
+    }
+
+    private int numberOfPartitions()
+    {
+        DataSet data = table.data();
+
+        Iterator<Partition> partitions = data.getPartitions(DataRange.allData(new LocalPartitioner(TimestampType.instance)));
+
+        int numberOfPartitions = 0;
+
+        while (partitions.hasNext())
+        {
+            partitions.next();
+            numberOfPartitions += 1;
+        }
+
+        return numberOfPartitions;
+    }
+
+    private String query(String query)
+    {
+        return String.format(query, table.toString());
+    }
+
+    private List<LoggingEvent> getLoggingEvents(int size)
+    {
+        return getLoggingEvents(size, Instant.now(), 1);
+    }
+
+    private List<LoggingEvent> getLoggingEvents(int size, Instant firstTimestamp, int logsInMillisecond)
+    {
+        List<LoggingEvent> logs = new LinkedList<>();
+        int partitions = size / logsInMillisecond;
+
+        for (int i = 0; i < partitions; i++)
+        {
+            long timestamp = firstTimestamp.toEpochMilli();
+            firstTimestamp = firstTimestamp.plusSeconds(1);
+
+            for (int j = 0; j < logsInMillisecond; j++)
+                logs.add(getLoggingEvent(timestamp));
+        }
+
+        return logs;
+    }
+
+    private LoggingEvent getLoggingEvent(long timestamp)
+    {
+        LoggingEvent event = new LoggingEvent();
+        event.setLevel(Level.INFO);
+        event.setMessage("message " + timestamp);
+        event.setLoggerName("logger " + timestamp);
+        event.setThreadName(Thread.currentThread().getName());
+        event.setTimeStamp(timestamp);
+
+        return event;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org