You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/05/11 12:46:01 UTC
[4/4] cassandra git commit: Audit logging for database activity
Audit logging for database activity
patch by Vinay Chella; reviewed by jasobrown for CASSANDRA-12151
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f56871b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f56871b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f56871b8
Branch: refs/heads/trunk
Commit: f56871b88be1e8965f166769c12cfa43313bac74
Parents: aba582f
Author: Vinay Chella <vi...@gmail.com>
Authored: Fri Feb 23 20:16:16 2018 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri May 11 05:44:16 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 2 +
conf/cassandra.yaml | 14 +
doc/source/operating/audit_logging.rst | 203 ++++++
.../apache/cassandra/audit/AuditLogEntry.java | 298 ++++++++
.../cassandra/audit/AuditLogEntryCategory.java | 27 +
.../cassandra/audit/AuditLogEntryType.java | 83 +++
.../apache/cassandra/audit/AuditLogFilter.java | 162 +++++
.../apache/cassandra/audit/AuditLogManager.java | 317 +++++++++
.../apache/cassandra/audit/AuditLogOptions.java | 61 ++
.../apache/cassandra/audit/BinAuditLogger.java | 85 +++
.../cassandra/audit/BinLogAuditLogger.java | 387 +++++++++++
.../apache/cassandra/audit/FileAuditLogger.java | 57 ++
.../apache/cassandra/audit/FullQueryLogger.java | 197 ++++++
.../cassandra/audit/IAuditLogContext.java | 53 ++
.../apache/cassandra/audit/IAuditLogger.java | 47 ++
.../apache/cassandra/audit/NoOpAuditLogger.java | 42 ++
.../org/apache/cassandra/config/Config.java | 3 +
.../cassandra/config/DatabaseDescriptor.java | 11 +
.../org/apache/cassandra/cql3/CQLStatement.java | 3 +-
.../cql3/statements/AlterKeyspaceStatement.java | 7 +
.../cql3/statements/AlterRoleStatement.java | 7 +
.../cql3/statements/AlterTableStatement.java | 7 +
.../cql3/statements/AlterTypeStatement.java | 7 +
.../cql3/statements/AlterViewStatement.java | 7 +
.../cql3/statements/BatchStatement.java | 7 +
.../cassandra/cql3/statements/CFStatement.java | 2 +-
.../statements/CreateAggregateStatement.java | 7 +
.../statements/CreateFunctionStatement.java | 7 +
.../cql3/statements/CreateIndexStatement.java | 6 +
.../statements/CreateKeyspaceStatement.java | 7 +
.../cql3/statements/CreateRoleStatement.java | 6 +
.../cql3/statements/CreateTableStatement.java | 6 +
.../cql3/statements/CreateTriggerStatement.java | 8 +
.../cql3/statements/CreateTypeStatement.java | 7 +
.../cql3/statements/CreateViewStatement.java | 6 +
.../cql3/statements/DeleteStatement.java | 6 +
.../cql3/statements/DropAggregateStatement.java | 7 +
.../cql3/statements/DropFunctionStatement.java | 8 +-
.../cql3/statements/DropIndexStatement.java | 7 +
.../cql3/statements/DropKeyspaceStatement.java | 7 +
.../cql3/statements/DropRoleStatement.java | 7 +
.../cql3/statements/DropTableStatement.java | 7 +
.../cql3/statements/DropTriggerStatement.java | 7 +
.../cql3/statements/DropTypeStatement.java | 9 +-
.../cql3/statements/DropViewStatement.java | 7 +
.../statements/GrantPermissionsStatement.java | 9 +
.../cql3/statements/GrantRoleStatement.java | 7 +
.../statements/ListPermissionsStatement.java | 7 +
.../cql3/statements/ListRolesStatement.java | 7 +
.../statements/RevokePermissionsStatement.java | 8 +
.../cql3/statements/RevokeRoleStatement.java | 7 +
.../cql3/statements/SelectStatement.java | 8 +
.../cql3/statements/TruncateStatement.java | 7 +
.../cql3/statements/UpdateStatement.java | 7 +
.../cassandra/cql3/statements/UseStatement.java | 7 +
.../db/fullquerylog/FullQueryLogger.java | 530 --------------
.../apache/cassandra/service/StorageProxy.java | 9 +-
.../cassandra/service/StorageService.java | 40 ++
.../cassandra/service/StorageServiceMBean.java | 5 +
.../org/apache/cassandra/tools/NodeProbe.java | 10 +
.../org/apache/cassandra/tools/NodeTool.java | 4 +-
.../apache/cassandra/tools/fqltool/Dump.java | 64 +-
.../tools/nodetool/DisableAuditLog.java | 33 +
.../tools/nodetool/EnableAuditLog.java | 55 ++
.../org/apache/cassandra/transport/Message.java | 5 +
.../transport/messages/AuthResponse.java | 18 +
.../transport/messages/BatchMessage.java | 59 +-
.../transport/messages/ExecuteMessage.java | 61 +-
.../transport/messages/PrepareMessage.java | 30 +-
.../transport/messages/QueryMessage.java | 36 +-
.../org/apache/cassandra/utils/FBUtilities.java | 24 +
.../apache/cassandra/utils/binlog/BinLog.java | 2 +-
.../cassandra/audit/AuditLogFilterTest.java | 189 +++++
.../apache/cassandra/audit/AuditLoggerTest.java | 690 +++++++++++++++++++
.../cassandra/audit/BinAuditLoggerTest.java | 91 +++
.../cassandra/audit/FullQueryLoggerTest.java | 610 ++++++++++++++++
.../cassandra/audit/InMemoryAuditLogger.java | 47 ++
.../config/DatabaseDescriptorRefTest.java | 5 +
.../org/apache/cassandra/cql3/CQLTester.java | 7 +
.../db/fullquerylog/FullQueryLoggerTest.java | 601 ----------------
.../service/StorageServiceServerTest.java | 28 +
.../cassandra/utils/binlog/BinLogTest.java | 12 +-
83 files changed, 4346 insertions(+), 1222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e574d16..6308416 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Audit logging for database activity (CASSANDRA-12151)
* Clean up build artifacts in docs container (CASSANDRA-14432)
* Minor network authz improvements (Cassandra-14413)
* Automatic sstable upgrades (CASSANDRA-14197)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 4885a12..da80684 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -79,6 +79,8 @@ New features
in cassandra.yaml, and the docs for create and alter role statements. CASSANDRA-13985
- Roles altered from login=true to login=false will prevent existing connections from executing any
statements after the cache has been refreshed. CASSANDRA-13985
+ - Support for audit logging of database activity. If enabled, logs every incoming
+ CQL command request, Authentication (successful as well as unsuccessful login) to a node.
Upgrading
---------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 7cc9e32..49c6f03 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1184,3 +1184,17 @@ back_pressure_strategy:
# automatic_sstable_upgrade: false
# Limit the number of concurrent sstable upgrades
# max_concurrent_automatic_sstable_upgrades: 1
+
+# Audit logging - Logs every incoming CQL command request, authentication to a node. See the docs
+# on audit_logging for full details about the various configuration options.
+audit_logging_options:
+ enabled: false
+ logger: BinAuditLogger
+ # audit_logs_dir:
+ # included_keyspaces:
+ # excluded_keyspaces:
+ # included_categories:
+ # excluded_categories:
+ # included_users:
+ # excluded_users:
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/doc/source/operating/audit_logging.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/audit_logging.rst b/doc/source/operating/audit_logging.rst
new file mode 100644
index 0000000..9be7a43
--- /dev/null
+++ b/doc/source/operating/audit_logging.rst
@@ -0,0 +1,203 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+..
+.. http://www.apache.org/licenses/LICENSE-2.0
+..
+.. Unless required by applicable law or agreed to in writing, software
+.. distributed under the License is distributed on an "AS IS" BASIS,
+.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+.. See the License for the specific language governing permissions and
+.. limitations under the License.
+
+.. highlight:: none
+
+
+
+Audit Logging
+------------------
+
+Audit logging in Cassandra logs every incoming CQL command request, as well as authentication (both successful and unsuccessful login attempts)
+to a C* node. Currently, two implementations are provided; a custom logger can also be implemented and injected by specifying its
+class name as a parameter in cassandra.yaml.
+
+- ``BinAuditLogger`` An efficient way to log events to file in a binary format.
+- ``FileAuditLogger`` Logs events to ``audit/audit.log`` file using slf4j logger.
+
+*Recommendation* ``BinAuditLogger`` is a community recommended logger considering the performance
+
+What does it capture
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Audit logging captures following events
+
+- Successful as well as unsuccessful login attempts.
+
+- All database commands executed via Native protocol (CQL) attempted or successfully executed.
+
+What does it log
+^^^^^^^^^^^^^^^^^^^
+Each audit log implementation has access to the following attributes, and for the default text based logger these fields are concatenated with `|` s to yield the final message.
+
+ - ``user``: User name(if available)
+ - ``host``: Host IP, where the command is being executed
+ - ``source ip address``: Source IP address from where the request initiated
+ - ``source port``: Source port number from where the request initiated
+ - ``timestamp``: unix time stamp
+ - ``type``: Type of the request (SELECT, INSERT, etc.,)
+ - ``category`` - Category of the request (DDL, DML, etc.,)
+ - ``keyspace`` - Keyspace(If applicable) on which request is targeted to be executed
+ - ``scope`` - Table/Aggregate name/ function name/ trigger name etc., as applicable
+ - ``operation`` - CQL command being executed
+
+How to configure
+^^^^^^^^^^^^^^^^^^
+Auditlog can be configured using cassandra.yaml. If you want to try Auditlog on one node, it can also be enabled and configured using ``nodetool``.
+
+cassandra.yaml configurations for AuditLog
+"""""""""""""""""""""""""""""""""""""""""""""
+ - ``enabled``: This option enables/ disables audit log
+ - ``logger``: Class name of the logger/ custom logger.
+ - ``audit_logs_dir``: Auditlogs directory location, if not set, default to `cassandra.logdir.audit` or `cassandra.logdir` + /audit/
+ - ``included_keyspaces``: Comma separated list of keyspaces to be included in audit log, default - includes all keyspaces
+ - ``excluded_keyspaces``: Comma separated list of keyspaces to be excluded from audit log, default - excludes no keyspace
+ - ``included_categories``: Comma separated list of Audit Log Categories to be included in audit log, default - includes all categories
+ - ``excluded_categories``: Comma separated list of Audit Log Categories to be excluded from audit log, default - excludes no category
+ - ``included_users``: Comma separated list of users to be included in audit log, default - includes all users
+ - ``excluded_users``: Comma separated list of users to be excluded from audit log, default - excludes no user
+
+
+List of available categories are: QUERY, DML, DDL, DCL, OTHER, AUTH, ERROR, PREPARE
+
+NodeTool command to enable AuditLog
+"""""""""""""""""""""""""""""""""""""
+``enableauditlog``: Enables AuditLog with yaml defaults. yaml configurations can be overridden using options via nodetool command.
+
+::
+
+ nodetool enableauditlog
+
+Options
+**********
+
+
+``--excluded-categories``
+ Comma separated list of Audit Log Categories to be excluded for
+ audit log. If not set the value from cassandra.yaml will be used
+
+``--excluded-keyspaces``
+ Comma separated list of keyspaces to be excluded for audit log. If
+ not set the value from cassandra.yaml will be used
+
+``--excluded-users``
+ Comma separated list of users to be excluded for audit log. If not
+ set the value from cassandra.yaml will be used
+
+``--included-categories``
+ Comma separated list of Audit Log Categories to be included for
+ audit log. If not set the value from cassandra.yaml will be used
+
+``--included-keyspaces``
+ Comma separated list of keyspaces to be included for audit log. If
+ not set the value from cassandra.yaml will be used
+
+``--included-users``
+ Comma separated list of users to be included for audit log. If not
+ set the value from cassandra.yaml will be used
+
+``--logger``
+ Logger name to be used for AuditLogging. Default BinAuditLogger. If
+ not set the value from cassandra.yaml will be used
+
+
+NodeTool command to disable AuditLog
+"""""""""""""""""""""""""""""""""""""""
+
+``disableauditlog``: Disables AuditLog.
+
+::
+
+ nodetool disableauditlog
+
+
+
+
+
+
+
+NodeTool command to reload AuditLog filters
+"""""""""""""""""""""""""""""""""""""""""""""
+
+``enableauditlog``: NodeTool enableauditlog command can be used to reload auditlog filters when called with default or previous ``loggername`` and updated filters
+
+E.g.,
+::
+
+ nodetool enableauditlog --logger <Default/ existing loggerName> --included-keyspaces <New Filter values>
+
+
+
+
+
+
+
+
+Sample output
+^^^^^^^^^^^^^^^^
+::
+
+ LogMessage: user:anonymous|host:localhost/X.X.X.X|source:/X.X.X.X|port:60878|timestamp:1521158923615|type:USE_KEYSPACE|category:OTHER|ks:dev1|operation:USE "dev1"
+
+
+
+Configuring BinAuditLogger
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+To use ``BinAuditLogger`` as a logger in AuditLogging, set the logger to ``BinAuditLogger`` in cassandra.yaml under ``audit_logging_options`` section. ``BinAuditLogger`` can be further configured using its advanced options in cassandra.yaml.
+
+
+Advanced Options for BinAuditLogger
+""""""""""""""""""""""""""""""""""""""
+
+``block``
+ Indicates if the AuditLog should block if it falls behind or should drop audit log records. Default is set to ``true`` so that AuditLog records won't be lost
+
+``max_queue_weight``
+ Maximum weight of in memory queue for records waiting to be written to the audit log file before blocking or dropping the log records. Default is set to ``256 * 1024 * 1024``
+
+``max_log_size``
+ Maximum size of the rolled files to retain on disk before deleting the oldest file. Default is set to ``16L * 1024L * 1024L * 1024L``
+
+``roll_cycle``
+ How often to roll Audit log segments so they can potentially be reclaimed. Available options are: MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY. For more options, refer to: net.openhft.chronicle.queue.RollCycles. Default is set to ``"HOURLY"``
+
+Configuring FileAuditLogger
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+To use ``FileAuditLogger`` as a logger in AuditLogging, apart from setting the class name in cassandra.yaml, the following configuration is needed to have the audit log events flow through a separate log file instead of system.log
+
+
+.. code-block:: xml
+
+ <!-- Audit Logging (FileAuditLogger) rolling file appender to audit.log -->
+ <appender name="AUDIT" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${cassandra.logdir}/audit/audit.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <!-- rollover daily -->
+ <fileNamePattern>${cassandra.logdir}/audit/audit.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern>
+ <!-- each file should be at most 50MB, keep 30 days worth of history, but at most 5GB -->
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>5GB</totalSizeCap>
+ </rollingPolicy>
+ <encoder>
+ <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Audit Logging additivity to redirect audit logging events to audit/audit.log -->
+ <logger name="org.apache.cassandra.audit" additivity="false" level="INFO">
+ <appender-ref ref="AUDIT"/>
+ </logger>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntry.java b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
new file mode 100644
index 0000000..d53fc6e
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A single audit event: the user and source address that issued it, the node ({@code host})
+ * that recorded it, when it happened, its {@link AuditLogEntryType}, and the keyspace, scope,
+ * operation text and query options it refers to.
+ *
+ * Instances are immutable and are created through {@link Builder}.
+ */
+public class AuditLogEntry
+{
+    // Broadcast address of the node on which this entry is created.
+    private final InetAddressAndPort host = FBUtilities.getBroadcastAddressAndPort();
+    // May be null when the builder was not given a client state with a remote address.
+    private final InetAddressAndPort source;
+    private final String user;
+    private final long timestamp;
+    private final AuditLogEntryType type;
+    private final UUID batch;
+    private final String keyspace;
+    private final String scope;
+    private final String operation;
+    private final QueryOptions options;
+
+    private AuditLogEntry(AuditLogEntryType type, InetAddressAndPort source, String user, long timestamp, UUID batch, String keyspace, String scope, String operation, QueryOptions options)
+    {
+        this.type = type;
+        this.source = source;
+        this.user = user;
+        this.timestamp = timestamp;
+        this.batch = batch;
+        this.keyspace = keyspace;
+        this.scope = scope;
+        this.operation = operation;
+        this.options = options;
+    }
+
+    /**
+     * Renders this entry as a single line of {@code key:value} pairs separated by {@code |}.
+     *
+     * The user, host, timestamp, type and category are always written. The source address is
+     * written only when one is known, and its port only when it is greater than zero. The batch
+     * id is written only when non-null; keyspace, scope and operation only when non-blank.
+     *
+     * @return the textual form of this entry
+     */
+    String getLogString()
+    {
+        StringBuilder builder = new StringBuilder(100);
+        builder.append("user:").append(user)
+               .append("|host:").append(host);
+
+        // The source is not guaranteed to be set by every Builder constructor; skip it rather
+        // than fail with a NullPointerException while formatting the log line.
+        if (source != null)
+        {
+            builder.append("|source:").append(source.address);
+            if (source.port > 0)
+            {
+                builder.append("|port:").append(source.port);
+            }
+        }
+
+        builder.append("|timestamp:").append(timestamp)
+               .append("|type:").append(type)
+               .append("|category:").append(type.getCategory());
+
+        if (batch != null)
+        {
+            builder.append("|batch:").append(batch);
+        }
+        if (StringUtils.isNotBlank(keyspace))
+        {
+            builder.append("|ks:").append(keyspace);
+        }
+        if (StringUtils.isNotBlank(scope))
+        {
+            builder.append("|scope:").append(scope);
+        }
+        if (StringUtils.isNotBlank(operation))
+        {
+            builder.append("|operation:").append(operation);
+        }
+        return builder.toString();
+    }
+
+    public InetAddressAndPort getHost()
+    {
+        return host;
+    }
+
+    public InetAddressAndPort getSource()
+    {
+        return source;
+    }
+
+    public String getUser()
+    {
+        return user;
+    }
+
+    public long getTimestamp()
+    {
+        return timestamp;
+    }
+
+    public AuditLogEntryType getType()
+    {
+        return type;
+    }
+
+    public UUID getBatch()
+    {
+        return batch;
+    }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    public String getScope()
+    {
+        return scope;
+    }
+
+    public String getOperation()
+    {
+        return operation;
+    }
+
+    public QueryOptions getOptions()
+    {
+        return options;
+    }
+
+    /**
+     * Mutable builder for {@link AuditLogEntry}.
+     */
+    public static class Builder
+    {
+        // Source used when no client state is available: 0.0.0.0 with port 0.
+        private static final InetAddressAndPort DEFAULT_SOURCE;
+
+        static
+        {
+            try
+            {
+                DEFAULT_SOURCE = InetAddressAndPort.getByNameOverrideDefaults("0.0.0.0", 0);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException("failed to create default source address", e);
+            }
+        }
+
+        private static final String DEFAULT_OPERATION = StringUtils.EMPTY;
+
+        private AuditLogEntryType type;
+        private InetAddressAndPort source;
+        private String user;
+        private long timestamp;
+        private UUID batch;
+        private String keyspace;
+        private String scope;
+        private String operation;
+        private QueryOptions options;
+
+        /**
+         * Starts an entry from a client's state, stamped with the current time.
+         *
+         * With a non-null state, the source, user and keyspace are taken from it where available
+         * (source and user are left unset when the state does not provide them). With a null
+         * state, the default source and the system user are used.
+         *
+         * @param clientState state of the client issuing the request, may be null
+         */
+        public Builder(ClientState clientState)
+        {
+            if (clientState != null)
+            {
+                InetSocketAddress addr = clientState.getRemoteAddress();
+                if (addr != null)
+                {
+                    source = InetAddressAndPort.getByAddressOverrideDefaults(addr.getAddress(), addr.getPort());
+                }
+
+                if (clientState.getUser() != null)
+                {
+                    user = clientState.getUser().getName();
+                }
+                keyspace = clientState.getRawKeyspace();
+            }
+            else
+            {
+                source = DEFAULT_SOURCE;
+                user = AuthenticatedUser.SYSTEM_USER.getName();
+            }
+
+            timestamp = System.currentTimeMillis();
+        }
+
+        /**
+         * Starts a builder pre-populated with every field of an existing entry.
+         *
+         * @param entry the entry to copy
+         */
+        public Builder(AuditLogEntry entry)
+        {
+            type = entry.type;
+            source = entry.source;
+            user = entry.user;
+            timestamp = entry.timestamp;
+            batch = entry.batch;
+            keyspace = entry.keyspace;
+            scope = entry.scope;
+            operation = entry.operation;
+            options = entry.options;
+        }
+
+        /**
+         * Starts an entry of the given type with an empty operation; all other fields are unset.
+         *
+         * @param type the type of the entry
+         */
+        public Builder(AuditLogEntryType type)
+        {
+            this.type = type;
+            operation = DEFAULT_OPERATION;
+        }
+
+        public Builder setType(AuditLogEntryType type)
+        {
+            this.type = type;
+            return this;
+        }
+
+        public Builder setUser(String user)
+        {
+            this.user = user;
+            return this;
+        }
+
+        public Builder setBatch(UUID batch)
+        {
+            this.batch = batch;
+            return this;
+        }
+
+        public Builder setTimestamp(long timestampMillis)
+        {
+            this.timestamp = timestampMillis;
+            return this;
+        }
+
+        /**
+         * Sets the keyspace from the statement's audit log context when it has one, otherwise
+         * from the raw keyspace of the query's client state.
+         *
+         * @param queryState state of the query being audited
+         * @param statement  the statement being audited, may be null
+         * @return this builder
+         */
+        public Builder setKeyspace(QueryState queryState, @Nullable CQLStatement statement)
+        {
+            keyspace = statement != null && statement.getAuditLogContext().keyspace != null
+                       ? statement.getAuditLogContext().keyspace
+                       : queryState.getClientState().getRawKeyspace();
+            return this;
+        }
+
+        public Builder setKeyspace(String keyspace)
+        {
+            this.keyspace = keyspace;
+            return this;
+        }
+
+        public Builder setKeyspace(CQLStatement statement)
+        {
+            this.keyspace = statement.getAuditLogContext().keyspace;
+            return this;
+        }
+
+        public Builder setScope(CQLStatement statement)
+        {
+            this.scope = statement.getAuditLogContext().scope;
+            return this;
+        }
+
+        public Builder setOperation(String operation)
+        {
+            this.operation = operation;
+            return this;
+        }
+
+        /**
+         * Appends text to the operation, separated from any existing text by {@code "; "}.
+         * Blank input is ignored.
+         *
+         * @param str the text to append
+         */
+        public void appendToOperation(String str)
+        {
+            if (StringUtils.isNotBlank(str))
+            {
+                // operation is null unless a constructor or setOperation() initialised it,
+                // so treat null the same as empty instead of dereferencing it.
+                if (StringUtils.isEmpty(operation))
+                    operation = str;
+                else
+                    operation = operation.concat("; ").concat(str);
+            }
+        }
+
+        public Builder setOptions(QueryOptions options)
+        {
+            this.options = options;
+            return this;
+        }
+
+        /**
+         * Creates the entry. If no positive timestamp has been set, the current time is used.
+         *
+         * @return a new immutable entry
+         */
+        public AuditLogEntry build()
+        {
+            timestamp = timestamp > 0 ? timestamp : System.currentTimeMillis();
+            return new AuditLogEntry(type, source, user, timestamp, batch, keyspace, scope, operation, options);
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java b/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
new file mode 100644
index 0000000..616658c
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+/**
+ * Enum to categorize AuditLogEntries
+ */
+public enum AuditLogEntryCategory
+{
+ QUERY, DML, DDL, DCL, OTHER, AUTH, ERROR, PREPARE
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryType.java b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
new file mode 100644
index 0000000..4eb112b
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+/**
+ * The kinds of events that can appear in the audit log. Each type carries the
+ * {@link AuditLogEntryCategory} it is reported under.
+ */
+public enum AuditLogEntryType
+{
+    // ---- Types produced by CQL statements ----
+
+    SELECT(AuditLogEntryCategory.QUERY),
+    UPDATE(AuditLogEntryCategory.DML),
+    DELETE(AuditLogEntryCategory.DML),
+    TRUNCATE(AuditLogEntryCategory.DDL),
+    CREATE_KEYSPACE(AuditLogEntryCategory.DDL),
+    ALTER_KEYSPACE(AuditLogEntryCategory.DDL),
+    DROP_KEYSPACE(AuditLogEntryCategory.DDL),
+    CREATE_TABLE(AuditLogEntryCategory.DDL),
+    DROP_TABLE(AuditLogEntryCategory.DDL),
+    PREPARE_STATEMENT(AuditLogEntryCategory.PREPARE),
+    DROP_TRIGGER(AuditLogEntryCategory.DDL),
+    LIST_USERS(AuditLogEntryCategory.DCL),
+    CREATE_INDEX(AuditLogEntryCategory.DDL),
+    DROP_INDEX(AuditLogEntryCategory.DDL),
+    GRANT(AuditLogEntryCategory.DCL),
+    REVOKE(AuditLogEntryCategory.DCL),
+    CREATE_TYPE(AuditLogEntryCategory.DDL),
+    DROP_AGGREGATE(AuditLogEntryCategory.DDL),
+    ALTER_VIEW(AuditLogEntryCategory.DDL),
+    CREATE_VIEW(AuditLogEntryCategory.DDL),
+    DROP_ROLE(AuditLogEntryCategory.DCL),
+    CREATE_FUNCTION(AuditLogEntryCategory.DDL),
+    ALTER_TABLE(AuditLogEntryCategory.DDL),
+    BATCH(AuditLogEntryCategory.DML),
+    CREATE_AGGREGATE(AuditLogEntryCategory.DDL),
+    DROP_VIEW(AuditLogEntryCategory.DDL),
+    DROP_TYPE(AuditLogEntryCategory.DDL),
+    DROP_FUNCTION(AuditLogEntryCategory.DDL),
+    ALTER_ROLE(AuditLogEntryCategory.DCL),
+    CREATE_TRIGGER(AuditLogEntryCategory.DDL),
+    LIST_ROLES(AuditLogEntryCategory.DCL),
+    LIST_PERMISSIONS(AuditLogEntryCategory.DCL),
+    ALTER_TYPE(AuditLogEntryCategory.DDL),
+    CREATE_ROLE(AuditLogEntryCategory.DCL),
+    USE_KEYSPACE(AuditLogEntryCategory.OTHER),
+
+    // ---- Types common to all requests (failures and authentication) ----
+
+    REQUEST_FAILURE(AuditLogEntryCategory.ERROR),
+    LOGIN_ERROR(AuditLogEntryCategory.AUTH),
+    UNAUTHORIZED_ATTEMPT(AuditLogEntryCategory.AUTH),
+    LOGIN_SUCCESS(AuditLogEntryCategory.AUTH);
+
+    private final AuditLogEntryCategory category;
+
+    AuditLogEntryType(AuditLogEntryCategory entryCategory)
+    {
+        category = entryCategory;
+    }
+
+    /**
+     * @return the category this entry type is reported under
+     */
+    public AuditLogEntryCategory getCategory()
+    {
+        return this.category;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogFilter.java b/src/java/org/apache/cassandra/audit/AuditLogFilter.java
new file mode 100644
index 0000000..163114d
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogFilter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AuditLogFilter
+{
+ private static final Logger logger = LoggerFactory.getLogger(AuditLogFilter.class);
+
+ private static ImmutableSet<String> EMPTY_FILTERS = ImmutableSet.of();
+
+ private final ImmutableSet<String> excludedKeyspaces;
+ private final ImmutableSet<String> includedKeyspaces;
+ private final ImmutableSet<String> excludedCategories;
+ private final ImmutableSet<String> includedCategories;
+ private final ImmutableSet<String> includedUsers;
+ private final ImmutableSet<String> excludedUsers;
+
+ private AuditLogFilter(ImmutableSet<String> excludedKeyspaces, ImmutableSet<String> includedKeyspaces, ImmutableSet<String> excludedCategories, ImmutableSet<String> includedCategories, ImmutableSet<String> excludedUsers, ImmutableSet<String> includedUsers)
+ {
+ this.excludedKeyspaces = excludedKeyspaces;
+ this.includedKeyspaces = includedKeyspaces;
+ this.excludedCategories = excludedCategories;
+ this.includedCategories = includedCategories;
+ this.includedUsers = includedUsers;
+ this.excludedUsers = excludedUsers;
+ }
+
+ /**
+ * (Re-)Loads filters from config. Called during startup as well as JMX invocations.
+ */
+ public static AuditLogFilter create(AuditLogOptions auditLogOptions)
+ {
+ logger.trace("Loading AuditLog filters");
+
+ IncludeExcludeHolder keyspaces = loadInputSets(auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces);
+ IncludeExcludeHolder categories = loadInputSets(auditLogOptions.included_categories, auditLogOptions.excluded_categories);
+ IncludeExcludeHolder users = loadInputSets(auditLogOptions.included_users, auditLogOptions.excluded_users);
+
+ return new AuditLogFilter(keyspaces.excludedSet, keyspaces.includedSet,
+ categories.excludedSet, categories.includedSet,
+ users.excludedSet, users.includedSet);
+ }
+
+ /**
+ * Constructs mutually exclusive sets of included and excluded data. When there is a conflict,
+ * the entry is put into the excluded set (and removed fron the included).
+ */
+ private static IncludeExcludeHolder loadInputSets(String includedInput, String excludedInput)
+ {
+ final ImmutableSet<String> excludedSet;
+ if (StringUtils.isEmpty(excludedInput))
+ {
+ excludedSet = EMPTY_FILTERS;
+ }
+ else
+ {
+ String[] excludes = excludedInput.split(",");
+ ImmutableSet.Builder<String> builder = ImmutableSet.builderWithExpectedSize(excludes.length);
+ for (String exclude : excludes)
+ {
+ if (!exclude.isEmpty())
+ {
+ builder.add(exclude);
+ }
+ }
+ excludedSet = builder.build();
+ }
+
+ final ImmutableSet<String> includedSet;
+ if (StringUtils.isEmpty(includedInput))
+ {
+ includedSet = EMPTY_FILTERS;
+ }
+ else
+ {
+ String[] includes = includedInput.split(",");
+ ImmutableSet.Builder<String> builder = ImmutableSet.builderWithExpectedSize(includes.length);
+ for (String include : includes)
+ {
+ //Ensure both included and excluded sets are mutually exclusive
+ if (!include.isEmpty() && !excludedSet.contains(include))
+ {
+ builder.add(include);
+ }
+ }
+ includedSet = builder.build();
+ }
+
+ return new IncludeExcludeHolder(includedSet, excludedSet);
+ }
+
+ /**
+ * Simple struct to hold inclusion/exclusion sets.
+ */
+ private static class IncludeExcludeHolder
+ {
+ private final ImmutableSet<String> includedSet;
+ private final ImmutableSet<String> excludedSet;
+
+ private IncludeExcludeHolder(ImmutableSet<String> includedSet, ImmutableSet<String> excludedSet)
+ {
+ this.includedSet = includedSet;
+ this.excludedSet = excludedSet;
+ }
+ }
+
+ /**
+ * Checks whether a give AuditLog Entry is filtered or not
+ *
+ * @param auditLogEntry AuditLogEntry to verify
+ * @return true if it is filtered, false otherwise
+ */
+ boolean isFiltered(AuditLogEntry auditLogEntry)
+ {
+ return isFiltered(auditLogEntry.getKeyspace(), includedKeyspaces, excludedKeyspaces)
+ || isFiltered(auditLogEntry.getType().getCategory().toString(), includedCategories, excludedCategories)
+ || isFiltered(auditLogEntry.getUser(), includedUsers, excludedUsers);
+ }
+
+ /**
+ * Checks whether given input is being filtered or not.
+ * If excludeSet does not contain any items, by default nothing is excluded (unless there are
+ * entries in the includeSet).
+ * If includeSet does not contain any items, by default everything is included
+ * If an input is part of both includeSet and excludeSet, excludeSet takes the priority over includeSet
+ *
+ * @param input Input to be checked for filtereing based on includeSet and excludeSet
+ * @param includeSet Include filtering set
+ * @param excludeSet Exclude filtering set
+ * @return true if the input is filtered, false when the input is not filtered
+ */
+ static boolean isFiltered(String input, Set<String> includeSet, Set<String> excludeSet)
+ {
+ if (!excludeSet.isEmpty() && excludeSet.contains(input))
+ return true;
+
+ return !(includeSet.isEmpty() || includeSet.contains(input));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java b/src/java/org/apache/cassandra/audit/AuditLogManager.java
new file mode 100644
index 0000000..090499c
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Central location for managing the logging of client/user-initiated actions (like queries, log in commands, and so on).
+ *
+ * We can run multiple {@link IAuditLogger}s at the same time, including the standard audit logger ({@link #auditLogger})
+ * and the full query logger ({@link #fullQueryLogger}).
+ */
+public class AuditLogManager
+{
+ private static final Logger logger = LoggerFactory.getLogger(AuditLogManager.class);
+ private static final AuditLogManager instance = new AuditLogManager();
+
+ // FQL always writes to a BinLog, but it is a type of IAuditLogger
+ private final FullQueryLogger fullQueryLogger;
+ private final ImmutableSet<AuditLogEntryCategory> fqlIncludeFilter = ImmutableSet.of(AuditLogEntryCategory.OTHER,
+ AuditLogEntryCategory.QUERY,
+ AuditLogEntryCategory.DCL,
+ AuditLogEntryCategory.DML,
+ AuditLogEntryCategory.DDL);
+
+ // auditLogger can write anywhere, as it's pluggable (logback, BinLog, DiagnosticEvents, etc ...)
+ private volatile IAuditLogger auditLogger;
+
+ private volatile AuditLogFilter filter;
+ private volatile boolean isAuditLogEnabled;
+
+ /**
+ * Builds the manager: always creates the full query logger, then chooses the audit logger
+ * according to the audit logging options exposed by {@link DatabaseDescriptor}, and finally
+ * loads the filters from those same options.
+ */
+ private AuditLogManager()
+ {
+ fullQueryLogger = new FullQueryLogger();
+
+ boolean enabledInConfig = DatabaseDescriptor.getAuditLoggingOptions().enabled;
+ if (!enabledInConfig)
+ {
+ logger.debug("Audit logging is disabled.");
+ isAuditLogEnabled = false;
+ auditLogger = new NoOpAuditLogger();
+ }
+ else
+ {
+ logger.info("Audit logging is enabled.");
+ auditLogger = getAuditLogger(DatabaseDescriptor.getAuditLoggingOptions().logger);
+ isAuditLogEnabled = true;
+ }
+
+ filter = AuditLogFilter.create(DatabaseDescriptor.getAuditLoggingOptions());
+ }
+
+ /**
+ * @return the single instance created when this class is initialized
+ */
+ public static AuditLogManager getInstance()
+ {
+ return instance;
+ }
+
+ /**
+ * Instantiates an audit logger through {@link FBUtilities#newAuditLogger}.
+ *
+ * @param loggerClassName class name of the logger to create; when null, {@link BinAuditLogger} is used
+ * @return the newly created logger
+ */
+ private IAuditLogger getAuditLogger(String loggerClassName) throws ConfigurationException
+ {
+ String className = loggerClassName == null ? BinAuditLogger.class.getName() : loggerClassName;
+ return FBUtilities.newAuditLogger(className);
+ }
+
+ /**
+ * @return the audit logger currently in use
+ */
+ @VisibleForTesting
+ public IAuditLogger getLogger()
+ {
+ return auditLogger;
+ }
+
+ /**
+ * @return the current value of the isAuditLogEnabled flag
+ */
+ public boolean isAuditingEnabled()
+ {
+ return isAuditLogEnabled;
+ }
+
+ /**
+ * @return true if audit logging or the full query logger (or both) is enabled
+ */
+ public boolean isLoggingEnabled()
+ {
+ return isAuditingEnabled() || isFQLEnabled();
+ }
+
+ /**
+ * @return what the full query logger reports through its enabled() method
+ */
+ private boolean isFQLEnabled()
+ {
+ return fullQueryLogger.enabled();
+ }
+
+ /**
+ * Delegates to {@link SchemaConstants#isLocalSystemKeyspace}.
+ *
+ * @param keyspaceName keyspace name to check
+ */
+ private boolean isSystemKeyspace(String keyspaceName)
+ {
+ return SchemaConstants.isLocalSystemKeyspace(keyspaceName);
+ }
+
+ /**
+ * Hands the entry to the standard audit logger, unless its keyspace is a local system keyspace
+ * or the current filter rejects it.
+ * @param logEntry AuditLogEntry to be logged
+ */
+ private void logAuditLoggerEntry(AuditLogEntry logEntry)
+ {
+ String keyspace = logEntry.getKeyspace();
+ if (keyspace != null && isSystemKeyspace(keyspace))
+ return;
+
+ if (filter.isFiltered(logEntry))
+ return;
+
+ auditLogger.log(logEntry);
+ }
+
+ /**
+ * Logs AuditLogEntry to both FQL and standard audit logger. The full query logger only receives
+ * entries whose category is part of fqlIncludeFilter.
+ * @param logEntry AuditLogEntry to be logged; a null entry is ignored
+ */
+ public void log(AuditLogEntry logEntry)
+ {
+ if (logEntry != null)
+ {
+ if (isAuditingEnabled())
+ logAuditLoggerEntry(logEntry);
+
+ if (isFQLEnabled() && fqlIncludeFilter.contains(logEntry.getType().getCategory()))
+ fullQueryLogger.log(logEntry);
+ }
+ }
+
+ /**
+ * Logs a failed operation. The entry is copied, its type is replaced according to the kind of
+ * exception, and the exception message is appended to the operation before logging.
+ * Nothing happens when the entry is null or audit logging is disabled.
+ *
+ * @param logEntry entry describing the attempted operation
+ * @param e exception raised by the operation
+ */
+ public void log(AuditLogEntry logEntry, Exception e)
+ {
+ if (logEntry == null || !isAuditingEnabled())
+ return;
+
+ final AuditLogEntryType failureType;
+ if (e instanceof UnauthorizedException)
+ failureType = AuditLogEntryType.UNAUTHORIZED_ATTEMPT;
+ else if (e instanceof AuthenticationException)
+ failureType = AuditLogEntryType.LOGIN_ERROR;
+ else
+ failureType = AuditLogEntryType.REQUEST_FAILURE;
+
+ AuditLogEntry.Builder builder = new AuditLogEntry.Builder(logEntry);
+ builder.setType(failureType);
+ builder.appendToOperation(e.getMessage());
+
+ log(builder.build());
+ }
+
+ /**
+ * Logs Batch queries to both FQL and standard audit logger.
+ * The audit logger gets the entries produced by buildEntriesForBatch; the full query logger gets
+ * the raw CQL string of every prepared statement in a single logBatch call.
+ */
+ public void logBatch(String batchTypeName, List<Object> queryOrIdList, List<List<ByteBuffer>> values, List<ParsedStatement.Prepared> prepared, QueryOptions options, QueryState state, long queryStartTimeMillis)
+ {
+ if (isAuditingEnabled())
+ {
+ for (AuditLogEntry batchEntry : buildEntriesForBatch(queryOrIdList, prepared, state, options, queryStartTimeMillis))
+ {
+ logAuditLoggerEntry(batchEntry);
+ }
+ }
+
+ if (!isFQLEnabled())
+ return;
+
+ List<String> cqlStrings = new ArrayList<>(queryOrIdList.size());
+ for (ParsedStatement.Prepared statement : prepared)
+ {
+ cqlStrings.add(statement.rawCQLStatement);
+ }
+ fullQueryLogger.logBatch(batchTypeName, cqlStrings, values, options, queryStartTimeMillis);
+ }
+
+ /**
+ * Builds the audit entries for one batch: a leading summary entry of type BATCH followed by one
+ * entry per statement. All entries carry the same randomly generated batch id and the same timestamp.
+ *
+ * @param queryOrIdList batch items; only its size is used here (number of statements)
+ * @param prepared prepared statements, read by index for each position of queryOrIdList
+ * @param state query state providing the client state
+ * @param options query options attached to every entry
+ * @param queryStartTimeMillis timestamp attached to every entry
+ * @return the summary entry followed by the per-statement entries
+ */
+ private static List<AuditLogEntry> buildEntriesForBatch(List<Object> queryOrIdList, List<ParsedStatement.Prepared> prepared, QueryState state, QueryOptions options, long queryStartTimeMillis)
+ {
+ List<AuditLogEntry> auditLogEntries = new ArrayList<>(queryOrIdList.size() + 1);
+ UUID batchId = UUID.randomUUID();
+ String queryString = String.format("BatchId:[%s] - BATCH of [%d] statements", batchId, queryOrIdList.size());
+ // summary entry describing the batch as a whole
+ AuditLogEntry entry = new AuditLogEntry.Builder(state.getClientState())
+ .setOperation(queryString)
+ .setOptions(options)
+ .setTimestamp(queryStartTimeMillis)
+ .setBatch(batchId)
+ .setType(AuditLogEntryType.BATCH)
+ .build();
+ auditLogEntries.add(entry);
+
+ // one entry per statement, typed from the statement's own audit log context
+ for (int i = 0; i < queryOrIdList.size(); i++)
+ {
+ CQLStatement statement = prepared.get(i).statement;
+ entry = new AuditLogEntry.Builder(state.getClientState())
+ .setType(statement.getAuditLogContext().auditLogEntryType)
+ .setOperation(prepared.get(i).rawCQLStatement)
+ .setTimestamp(queryStartTimeMillis)
+ .setScope(statement)
+ .setKeyspace(state, statement)
+ .setOptions(options)
+ .setBatch(batchId)
+ .build();
+ auditLogEntries.add(entry);
+ }
+
+ return auditLogEntries;
+ }
+
+ /**
+ * Disables AuditLog, designed to be invoked only via JMX/ Nodetool, not from anywhere else in the codepath.
+ * Does nothing when audit logging is already disabled.
+ */
+ public synchronized void disableAuditLog()
+ {
+ if (isAuditLogEnabled)
+ {
+ // Disable isAuditLogEnabled before attempting to cleanup/ stop AuditLogger so that any incoming log() requests will be dropped.
+ isAuditLogEnabled = false;
+ IAuditLogger oldLogger = auditLogger;
+ auditLogger = new NoOpAuditLogger();
+ // stop the previous logger only once the no-op replacement has been installed
+ oldLogger.stop();
+ }
+ }
+
+ /**
+ * Enables AuditLog, designed to be invoked only via JMX/ Nodetool, not from anywhere else in the codepath.
+ * The filters are always reloaded; the logger instance is replaced only when the configured logger
+ * name differs from the simple class name of the logger currently in use.
+ * @param auditLogOptions AuditLogOptions to be used for enabling AuditLog
+ * @throws ConfigurationException It can throw configuration exception when provided logger class does not exist in the classpath
+ * @throws IllegalArgumentException when the audit log directory is the directory used by the running full query logger
+ */
+ public synchronized void enableAuditLog(AuditLogOptions auditLogOptions) throws ConfigurationException
+ {
+ if (isFQLEnabled() && auditLogOptions.audit_logs_dir != null)
+ {
+ // Compare normalized paths rather than raw strings: the configured directory usually carries a
+ // trailing separator (the default ends in "/audit/") which Path.toString() never includes,
+ // so a plain string comparison would miss the clash.
+ Path auditLogPath = java.nio.file.Paths.get(auditLogOptions.audit_logs_dir).toAbsolutePath().normalize();
+ Path fqlPath = fullQueryLogger.path().toAbsolutePath().normalize();
+ if (fqlPath.equals(auditLogPath))
+ throw new IllegalArgumentException(String.format("audit log path (%s) cannot be the same as the " +
+ "running full query logger (%s)",
+ auditLogOptions.audit_logs_dir,
+ fullQueryLogger.path()));
+ }
+
+ // always reload the filters
+ filter = AuditLogFilter.create(auditLogOptions);
+
+ // next, check to see if we're changing the logging implementation; if not, keep the same instance and bail.
+ // note: auditLogger should never be null
+ IAuditLogger oldLogger = auditLogger;
+ if (oldLogger.getClass().getSimpleName().equals(auditLogOptions.logger))
+ return;
+
+ auditLogger = getAuditLogger(auditLogOptions.logger);
+ isAuditLogEnabled = true;
+
+ // ensure oldLogger's stop() is called after we swap it with new logger,
+ // otherwise, we might be calling log() on the stopped logger.
+ oldLogger.stop();
+ }
+
+ /**
+ * Configures the full query logger after checking that the requested path is not equal to the
+ * path reported by the current audit logger. All arguments are passed unchanged to fullQueryLogger.configure.
+ *
+ * @throws IllegalArgumentException if path equals the audit logger's path
+ */
+ public void configureFQL(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+ {
+ if (path.equals(auditLogger.path()))
+ throw new IllegalArgumentException(String.format("fullquerylogger path (%s) cannot be the same as the " +
+ "running audit logger (%s)",
+ path,
+ auditLogger.path()));
+
+ fullQueryLogger.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+ }
+
+ /**
+ * Delegates to the full query logger's reset with the given path.
+ *
+ * @param fullQueryLogPath path handed to fullQueryLogger.reset
+ */
+ public void resetFQL(String fullQueryLogPath)
+ {
+ fullQueryLogger.reset(fullQueryLogPath);
+ }
+
+ /**
+ * Stops the full query logger by calling its stop() method.
+ */
+ public void disableFQL()
+ {
+ fullQueryLogger.stop();
+ }
+
+ /**
+ * ONLY FOR TESTING
+ *
+ * @return the full query logger owned by this manager
+ */
+ FullQueryLogger getFullQueryLogger()
+ {
+ return fullQueryLogger;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogOptions.java b/src/java/org/apache/cassandra/audit/AuditLogOptions.java
new file mode 100644
index 0000000..1888c45
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogOptions.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import org.apache.commons.lang3.StringUtils;
+
+ /**
+ * Plain holder for the audit log settings. All fields are public and initialized with their defaults.
+ */
+ public class AuditLogOptions
+ {
+ // audit logging is off unless explicitly enabled
+ public volatile boolean enabled = false;
+ // simple class name of the logger implementation; defaults to BinAuditLogger
+ public String logger = BinAuditLogger.class.getSimpleName();
+ // include/exclude lists; AuditLogFilter splits each of them on commas. Empty by default.
+ public String included_keyspaces = StringUtils.EMPTY;
+ public String excluded_keyspaces = StringUtils.EMPTY;
+ public String included_categories = StringUtils.EMPTY;
+ public String excluded_categories = StringUtils.EMPTY;
+ public String included_users = StringUtils.EMPTY;
+ public String excluded_users = StringUtils.EMPTY;
+
+ /**
+ * AuditLogs directory can be configured using `cassandra.logdir.audit` or default is set to `cassandra.logdir` + /audit/
+ * (with "." used when `cassandra.logdir` is not set either).
+ */
+ public String audit_logs_dir = System.getProperty("cassandra.logdir.audit",
+ System.getProperty("cassandra.logdir",".")+"/audit/");
+ /**
+ * Indicates if the AuditLog should block if it falls behind or should drop audit log records.
+ * Default is set to true so that AuditLog records won't be lost
+ */
+ public boolean block = true;
+
+ /**
+ * Maximum weight of in memory queue for records waiting to be written to the audit log file
+ * before blocking or dropping the log records. For advanced configurations
+ */
+ public int max_queue_weight = 256 * 1024 * 1024;
+
+ /**
+ * Maximum size of the rolled files to retain on disk before deleting the oldest file. For advanced configurations
+ */
+ public long max_log_size = 16L * 1024L * 1024L * 1024L;
+
+ /**
+ * How often to roll Audit log segments so they can potentially be reclaimed. Available options are:
+ * MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY.
+ * For more options, refer: net.openhft.chronicle.queue.RollCycles
+ */
+ public String roll_cycle = "HOURLY";
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/BinAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/BinAuditLogger.java b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
new file mode 100644
index 0000000..89b764c
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.nio.file.Paths;
+
+import com.google.common.primitives.Ints;
+
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLog;
+import org.apache.cassandra.utils.concurrent.WeightedQueue;
+
+public class BinAuditLogger extends BinLogAuditLogger implements IAuditLogger
+{
+ public BinAuditLogger()
+ {
+ // due to the way that IAuditLogger instance are created in AuditLogManager, via reflection, we can't assume
+ // the manager will call configure() (it won't). thus, we have to call it here from the constructor.
+ AuditLogOptions auditLoggingOptions = DatabaseDescriptor.getAuditLoggingOptions();
+ configure(Paths.get(auditLoggingOptions.audit_logs_dir),
+ auditLoggingOptions.roll_cycle,
+ auditLoggingOptions.block,
+ auditLoggingOptions.max_queue_weight,
+ auditLoggingOptions.max_log_size,
+ false);
+ }
+
+ @Override
+ public void log(AuditLogEntry auditLogEntry)
+ {
+ BinLog binLog = this.binLog;
+ if (binLog == null || auditLogEntry == null)
+ {
+ return;
+ }
+
+ super.logRecord(new WeighableMarshallableMessage(auditLogEntry.getLogString()), binLog);
+ }
+
+ static class WeighableMarshallableMessage extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
+ {
+ private final String message;
+
+ WeighableMarshallableMessage(String message)
+ {
+ this.message = message;
+ }
+
+ @Override
+ public void writeMarshallable(WireOut wire)
+ {
+ wire.write("type").text("AuditLog");
+ wire.write("message").text(message);
+ }
+
+ @Override
+ public void release()
+ {
+
+ }
+
+ @Override
+ public int weight()
+ {
+ return Ints.checkedCast(ObjectSizes.sizeOf(message));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
new file mode 100644
index 0000000..a2426b9
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.binlog.BinLog;
+import org.apache.cassandra.utils.concurrent.WeightedQueue;
+import org.github.jamm.MemoryLayoutSpecification;
+
+abstract class BinLogAuditLogger implements IAuditLogger
+{
+ // Sizes measured once at class initialization, used as fixed overheads when weighing records.
+ static final int EMPTY_BYTEBUFFER_SIZE = Ints.checkedCast(ObjectSizes.sizeOnHeapExcludingData(ByteBuffer.allocate(0)));
+ static final int EMPTY_LIST_SIZE = Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList(0)));
+ private static final int EMPTY_BYTEBUF_SIZE;
+ private static final int OBJECT_HEADER_SIZE = MemoryLayoutSpecification.SPEC.getObjectHeaderSize();
+ static
+ {
+ // measure a zero-capacity ByteBuf from the shared allocator, making sure it is released afterwards
+ int tempSize = 0;
+ ByteBuf buf = CBUtil.allocator.buffer(0, 0);
+ try
+ {
+ tempSize = Ints.checkedCast(ObjectSizes.measure(buf));
+ }
+ finally
+ {
+ buf.release();
+ }
+ EMPTY_BYTEBUF_SIZE = tempSize;
+ }
+
+ protected static final Logger logger = LoggerFactory.getLogger(BinLogAuditLogger.class);
+ private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+ private static final NoSpamLogger.NoSpamLogStatement droppedSamplesStatement = noSpamLogger.getStatement("Dropped {} binary log samples", 1, TimeUnit.MINUTES);
+
+ volatile BinLog binLog;
+ protected volatile boolean blocking;
+ protected Path path;
+
+ private final AtomicLong droppedSamplesSinceLastLog = new AtomicLong();
+
+ /**
+ * Configure the global instance of the FullQueryLogger. Clean the provided directory before starting
+ * (this overload always passes true for cleanDirectory).
+ * @param path Dedicated path where the FQL can store its files.
+ * @param rollCycle How often to roll FQL log segments so they can potentially be reclaimed
+ * @param blocking Whether the FQL should block if the FQL falls behind or should drop log records
+ * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
+ * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
+ */
+ public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+ {
+ this.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize, true);
+ }
+
+ /**
+ * Configure the global instance of the FullQueryLogger
+ * @param path Dedicated path where the FQL can store its files.
+ * @param rollCycle How often to roll FQL log segments so they can potentially be reclaimed; matched case-insensitively against RollCycles
+ * @param blocking Whether the FQL should block if the FQL falls behind or should drop log records
+ * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
+ * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
+ * @param cleanDirectory Indicates to clean the directory before starting FullQueryLogger or not
+ * @throws NullPointerException if path or rollCycle is null
+ * @throws IllegalArgumentException if the path is unusable, the roll cycle is unknown, or a size is not positive
+ * @throws IllegalStateException if a bin log is already configured
+ */
+ public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, boolean cleanDirectory)
+ {
+ Preconditions.checkNotNull(path, "path was null");
+ File pathAsFile = path.toFile();
+ Preconditions.checkNotNull(rollCycle, "rollCycle was null");
+ // Locale.ROOT keeps the upper-casing independent of the JVM default locale (e.g. Turkish dotted/dotless i)
+ rollCycle = rollCycle.toUpperCase(java.util.Locale.ROOT);
+
+ //Exists and is a directory or can be created
+ Preconditions.checkArgument((pathAsFile.exists() && pathAsFile.isDirectory()) || (!pathAsFile.exists() && pathAsFile.mkdirs()), "path exists and is not a directory or couldn't be created");
+ Preconditions.checkArgument(pathAsFile.canRead() && pathAsFile.canWrite() && pathAsFile.canExecute(), "path is not readable, writable, and executable");
+
+ // Enum valueOf never returns null, it throws for unknown names; translate that into a clear message
+ final RollCycles parsedRollCycle;
+ try
+ {
+ parsedRollCycle = RollCycles.valueOf(rollCycle);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new IllegalArgumentException("unrecognized roll cycle: " + rollCycle, e);
+ }
+
+ Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0");
+ Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0");
+ logger.info("Attempting to configure full query logger path: {} Roll cycle: {} Blocking: {} Max queue weight: {} Max log size:{}", path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+
+ if (binLog != null)
+ {
+ logger.warn("Full query logger already configured. Ignoring requested configuration.");
+ throw new IllegalStateException("Already configured");
+ }
+
+ if (cleanDirectory)
+ {
+ logger.info("Cleaning directory: {} as requested",path);
+ if (pathAsFile.exists())
+ {
+ Throwable error = cleanDirectory(pathAsFile, null);
+ if (error != null)
+ {
+ throw new RuntimeException(error);
+ }
+ }
+ }
+
+ this.path = path;
+ this.blocking = blocking;
+ binLog = new BinLog(path, parsedRollCycle, maxQueueWeight, maxLogSize);
+ binLog.start();
+ }
+
+ /**
+ * @return the path stored by the last successful configure call, or null if configure has never completed
+ */
+ public Path path()
+ {
+ return path;
+ }
+
+ /**
+ * Stops the bin log (if running) and deletes the contents of the log directories.
+ *
+ * Need the path as a parameter as well because if the process is restarted the config file might be the only
+ * location for retrieving the path to the full query log files, but JMX also allows you to specify a path
+ * that isn't persisted anywhere so we have to clean that one as well.
+ *
+ * @param fullQueryLogPath additional directory to clean; may be null
+ * @throws RuntimeException wrapping any error accumulated while cleaning the directories
+ */
+ public synchronized void reset(String fullQueryLogPath)
+ {
+ try
+ {
+ // a Set, so the same directory is cleaned only once when both paths are equal
+ Set<File> pathsToClean = Sets.newHashSet();
+
+ //First decide whether to clean the path configured in the YAML
+ if (fullQueryLogPath != null)
+ {
+ File fullQueryLogPathFile = new File(fullQueryLogPath);
+ if (fullQueryLogPathFile.exists())
+ {
+ pathsToClean.add(fullQueryLogPathFile);
+ }
+ }
+
+ //Then decide whether to clean the last used path, possibly configured by JMX
+ if (path != null)
+ {
+ File pathFile = path.toFile();
+ if (pathFile.exists())
+ {
+ pathsToClean.add(pathFile);
+ }
+ }
+
+ logger.info("Reset (and deactivation) of full query log requested.");
+ if (binLog != null)
+ {
+ logger.info("Stopping full query log. Cleaning {}.", pathsToClean);
+ binLog.stop();
+ binLog = null;
+ }
+ else
+ {
+ logger.info("Full query log already deactivated. Cleaning {}.", pathsToClean);
+ }
+
+ // keep cleaning the remaining directories even if one fails; errors are merged and reported at the end
+ Throwable accumulate = null;
+ for (File f : pathsToClean)
+ {
+ accumulate = cleanDirectory(f, accumulate);
+ }
+ if (accumulate != null)
+ {
+ throw new RuntimeException(accumulate);
+ }
+ }
+ catch (Exception e)
+ {
+ // runtime exceptions are rethrown as-is, anything else is wrapped
+ if (e instanceof RuntimeException)
+ {
+ throw (RuntimeException)e;
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Stop the full query log leaving behind any generated files.
+ *
+ * @throws RuntimeException wrapping any exception raised while stopping the bin log
+ */
+ public synchronized void stop()
+ {
+ try
+ {
+ logger.info("Deactivation of full query log requested.");
+ if (binLog != null)
+ {
+ logger.info("Stopping full query log");
+ binLog.stop();
+ // clearing the field makes enabled() return false and allows configure() to be called again
+ binLog = null;
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Check whether the full query log is enabled, i.e. whether a bin log is currently set
+ * (it is set by configure and cleared by stop and reset).
+ * @return true if records are recorded and false otherwise.
+ */
+ public boolean enabled()
+ {
+ return binLog != null;
+ }
+
+ /**
+ * Enqueues a record on the given bin log. In blocking mode the record is put with binLog.put;
+ * otherwise it is offered and counted as dropped when the offer is rejected.
+ * The record is released whenever it did not end up in the queue.
+ *
+ * @param record record to enqueue
+ * @param binLog bin log to enqueue the record on
+ * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted during a blocking put
+ */
+ void logRecord(BinLog.ReleaseableWriteMarshallable record, BinLog binLog)
+ {
+ boolean putInQueue = false;
+ try
+ {
+ if (blocking)
+ {
+ try
+ {
+ binLog.put(record);
+ putInQueue = true;
+ }
+ catch (InterruptedException e)
+ {
+ // restore the interrupt status so code further up the stack can still observe the interruption
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ else if (binLog.offer(record))
+ {
+ putInQueue = true;
+ }
+ else
+ {
+ logDroppedSample();
+ }
+ }
+ finally
+ {
+ // the queue did not take ownership, so the record has to be released here
+ if (!putInQueue)
+ {
+ record.release();
+ }
+ }
+ }
+
+ /**
+ * Counts a dropped record and reports the running total through the rate limited log statement.
+ *
+ * This is potentially lossy, but it's not super critical as we will always generally know
+ * when this is happening and roughly how bad it is.
+ */
+ private void logDroppedSample()
+ {
+ droppedSamplesSinceLastLog.incrementAndGet();
+ // the counter is reset only when warn() returns true (presumably meaning the message was actually emitted)
+ if (droppedSamplesStatement.warn(new Object[] {droppedSamplesSinceLastLog.get()}))
+ {
+ droppedSamplesSinceLastLog.set(0);
+ }
+ }
+
+ /**
+ * Base record carrying the protocol version, the encoded query options and the query time.
+ * The query options are encoded into a buffer at construction time; that buffer is freed by release().
+ */
+ protected static abstract class AbstractWeighableMarshallable extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
+ {
+ private final ByteBuf queryOptionsBuffer;
+ private final long timeMillis;
+ private final int protocolVersion;
+
+ /**
+ * @param queryOptions options to encode; their protocol version is also recorded
+ * @param timeMillis time written as "query-time"
+ */
+ AbstractWeighableMarshallable(QueryOptions queryOptions, long timeMillis)
+ {
+ this.timeMillis = timeMillis;
+ ProtocolVersion version = queryOptions.getProtocolVersion();
+ this.protocolVersion = version.asInt();
+ int optionsSize = QueryOptions.codec.encodedSize(queryOptions, version);
+ queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, optionsSize);
+ /*
+ * Struggled with what tradeoff to make in terms of query options which is potentially large and complicated
+ * There is tension between low garbage production (or allocator overhead), small working set size, and CPU overhead reserializing the
+ * query options into binary format.
+ *
+ * I went with the lowest risk most predictable option which is allocator overhead and CPU overhead
+ * rather than keep the original query message around so I could just serialize that as a memcpy. It's more
+ * instructions when turned on, but it doesn't change memory footprint quite as much and it's more pay for what you use
+ * in terms of query volume. The CPU overhead is spread out across producers so we should at least get
+ * some scaling.
+ *
+ */
+ // release the buffer if encoding fails, since no caller would hold a reference to do it
+ boolean success = false;
+ try
+ {
+ QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, version);
+ success = true;
+ }
+ finally
+ {
+ if (!success)
+ {
+ queryOptionsBuffer.release();
+ }
+ }
+ }
+
+ @Override
+ public void writeMarshallable(WireOut wire)
+ {
+ wire.write("protocol-version").int32(protocolVersion);
+ wire.write("query-options").bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
+ wire.write("query-time").int64(timeMillis);
+ }
+
+ @Override
+ public void release()
+ {
+ queryOptionsBuffer.release();
+ }
+
+ //8-bytes for protocol version (assume alignment cost), 8-byte timestamp, 8-byte object header + other contents
+ @Override
+ public int weight()
+ {
+ return 8 + 8 + OBJECT_HEADER_SIZE + EMPTY_BYTEBUF_SIZE + queryOptionsBuffer.capacity();
+ }
+ }
+
+ /**
+ * Deletes everything inside the given directory, keeping the directory itself.
+ *
+ * @param directory directory to empty
+ * @param accumulate errors collected so far, may be null
+ * @return accumulate merged with any error met here; null when there was no error at all
+ */
+ private static Throwable cleanDirectory(File directory, Throwable accumulate)
+ {
+ if (!directory.exists())
+ {
+ return Throwables.merge(accumulate, new RuntimeException(String.format("%s does not exists", directory)));
+ }
+ if (!directory.isDirectory())
+ {
+ return Throwables.merge(accumulate, new RuntimeException(String.format("%s is not a directory", directory)));
+ }
+ // listFiles() returns null when the directory cannot be read (I/O error, permissions)
+ File[] children = directory.listFiles();
+ if (children == null)
+ {
+ return Throwables.merge(accumulate, new RuntimeException(String.format("%s could not be listed", directory)));
+ }
+ for (File f : children)
+ {
+ accumulate = deleteRecursively(f, accumulate);
+ }
+ if (accumulate instanceof FSError)
+ {
+ FileUtils.handleFSError((FSError)accumulate);
+ }
+ return accumulate;
+ }
+
+ /**
+ * Deletes a file, or a directory together with everything below it.
+ *
+ * @param fileOrDirectory file or directory to delete
+ * @param accumulate errors collected so far, may be null
+ * @return accumulate merged with any error reported by the deletions
+ */
+ private static Throwable deleteRecursively(File fileOrDirectory, Throwable accumulate)
+ {
+ if (fileOrDirectory.isDirectory())
+ {
+ // listFiles() returns null when the directory cannot be read; the final delete below then reports the failure
+ File[] children = fileOrDirectory.listFiles();
+ if (children != null)
+ {
+ for (File child : children)
+ {
+ // recurse so that nested directories are emptied before being deleted
+ accumulate = deleteRecursively(child, accumulate);
+ }
+ }
+ }
+ return FileUtils.deleteWithConfirm(fileOrDirectory, true, accumulate);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/FileAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/FileAuditLogger.java b/src/java/org/apache/cassandra/audit/FileAuditLogger.java
new file mode 100644
index 0000000..9490bdd
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/FileAuditLogger.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous, file-based audit logger; it simply hands every entry to the standard logging mechanism.
+ */
+public class FileAuditLogger implements IAuditLogger
+{
+    protected static final Logger logger = LoggerFactory.getLogger(FileAuditLogger.class);
+
+    // set to true on construction and flipped to false by stop()
+    private volatile boolean enabled;
+
+    /**
+     * Creates a logger that reports itself as enabled until {@link #stop()} is called.
+     */
+    public FileAuditLogger()
+    {
+        this.enabled = true;
+    }
+
+    /**
+     * @return true from construction until {@link #stop()} has been called
+     */
+    @Override
+    public boolean enabled()
+    {
+        return this.enabled;
+    }
+
+    /**
+     * Writes the log string of the given entry at INFO level.
+     */
+    @Override
+    public void log(AuditLogEntry auditLogEntry)
+    {
+        // The volatile 'enabled' flag is deliberately not read here: just go ahead and log,
+        // other components will check the enabled field.
+        final String logLine = auditLogEntry.getLogString();
+        logger.info(logLine);
+    }
+
+    /**
+     * Marks this logger as no longer enabled.
+     */
+    @Override
+    public void stop()
+    {
+        this.enabled = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/FullQueryLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
new file mode 100644
index 0000000..36d0127
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.wire.ValueOut;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+/**
+ * A logger that records the entire contents of queries and batches after they finish (or time out).
+ */
+public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
+{
+    /**
+     * Logs the operation, options and timestamp of the given audit entry as a single query.
+     */
+    @Override
+    public void log(AuditLogEntry entry)
+    {
+        logQuery(entry.getOperation(), entry.getOptions(), entry.getTimestamp());
+    }
+
+    /**
+     * Log an invocation of a batch of queries.
+     *
+     * @param type The type of the batch
+     * @param queries CQL text of the queries
+     * @param values Values to bind to as parameters for the queries
+     * @param queryOptions Options associated with the query invocation
+     * @param batchTimeMillis Approximate time in milliseconds since the epoch at which the batch was invoked; must be > 0
+     */
+    void logBatch(String type, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
+    {
+        Preconditions.checkNotNull(type, "type was null");
+        Preconditions.checkNotNull(queries, "queries was null");
+        Preconditions.checkNotNull(values, "value was null");
+        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkArgument(batchTimeMillis > 0, "batchTimeMillis must be > 0");
+
+        // Read the field once; when the log is disabled (null) the wrapper is never constructed
+        BinLog currentLog = this.binLog;
+        if (currentLog != null)
+        {
+            WeighableMarshallableBatch record = new WeighableMarshallableBatch(type, queries, values, queryOptions, batchTimeMillis);
+            logRecord(record, currentLog);
+        }
+    }
+
+    /**
+     * Log a single CQL query.
+     *
+     * @param query CQL query text
+     * @param queryOptions Options associated with the query invocation
+     * @param queryTimeMillis Approximate time in milliseconds since the epoch at which the query was invoked; must be > 0
+     */
+    void logQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
+    {
+        Preconditions.checkNotNull(query, "query was null");
+        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkArgument(queryTimeMillis > 0, "queryTimeMillis must be > 0");
+
+        // Read the field once; when the log is disabled (null) the wrapper is never constructed
+        BinLog currentLog = this.binLog;
+        if (currentLog != null)
+        {
+            WeighableMarshallableQuery record = new WeighableMarshallableQuery(query, queryOptions, queryTimeMillis);
+            logRecord(record, currentLog);
+        }
+    }
+
+    /**
+     * Record for a batch of queries; its weight is computed once, at construction time.
+     */
+    static class WeighableMarshallableBatch extends AbstractWeighableMarshallable
+    {
+        private final int weight;
+        private final String batchType;
+        private final List<String> queries;
+        private final List<List<ByteBuffer>> values;
+
+        public WeighableMarshallableBatch(String batchType, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
+        {
+            super(queryOptions, batchTimeMillis);
+            this.queries = queries;
+            this.values = values;
+            this.batchType = batchType;
+
+            // If anything below throws, release what the superclass constructor acquired
+            boolean completed = false;
+            try
+            {
+                // weight, batch type, queries, values
+                int total = 8 + EMPTY_LIST_SIZE + EMPTY_LIST_SIZE;
+                for (String queryText : queries)
+                {
+                    total += ObjectSizes.sizeOf(queryText);
+                }
+
+                // one list overhead per group of bound values
+                total += EMPTY_LIST_SIZE * values.size();
+                for (List<ByteBuffer> boundValues : values)
+                {
+                    // one buffer overhead per value, plus the capacity of each buffer
+                    total += EMPTY_BYTEBUFFER_SIZE * boundValues.size();
+                    for (ByteBuffer boundValue : boundValues)
+                    {
+                        total += boundValue.capacity();
+                    }
+                }
+                total += super.weight();
+                total += ObjectSizes.sizeOf(batchType);
+                weight = total;
+                completed = true;
+            }
+            finally
+            {
+                if (!completed)
+                {
+                    release();
+                }
+            }
+        }
+
+        /**
+         * Writes the record type, the fields written by the superclass, the batch type, and then
+         * the queries and the bound values, each preceded by its element count.
+         */
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("batch");
+            super.writeMarshallable(wire);
+            wire.write("batch-type").text(batchType);
+
+            ValueOut queriesOut = wire.write("queries");
+            queriesOut.int32(queries.size());
+            for (String queryText : queries)
+            {
+                queriesOut.text(queryText);
+            }
+
+            ValueOut valuesOut = wire.write("values");
+            valuesOut.int32(values.size());
+            for (List<ByteBuffer> boundValues : values)
+            {
+                valuesOut.int32(boundValues.size());
+                for (ByteBuffer boundValue : boundValues)
+                {
+                    valuesOut.bytes(BytesStore.wrap(boundValue));
+                }
+            }
+        }
+
+        /**
+         * @return the weight computed in the constructor
+         */
+        @Override
+        public int weight()
+        {
+            return weight;
+        }
+    }
+
+    /**
+     * Record for a single query.
+     */
+    static class WeighableMarshallableQuery extends AbstractWeighableMarshallable
+    {
+        private final String query;
+
+        public WeighableMarshallableQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
+        {
+            super(queryOptions, queryTimeMillis);
+            this.query = query;
+        }
+
+        /**
+         * Writes the record type, the fields written by the superclass and the query text.
+         */
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("single");
+            super.writeMarshallable(wire);
+            wire.write("query").text(query);
+        }
+
+        /**
+         * @return the size of the query string plus the weight reported by the superclass
+         */
+        @Override
+        public int weight()
+        {
+            int queryWeight = Ints.checkedCast(ObjectSizes.sizeOf(query));
+            return queryWeight + super.weight();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/IAuditLogContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/IAuditLogContext.java b/src/java/org/apache/cassandra/audit/IAuditLogContext.java
new file mode 100644
index 0000000..55c3e04
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/IAuditLogContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import org.apache.cassandra.cql3.CQLStatement;
+
+/**
+ * Supplies the context required to audit log a statement.
+ * {@link CQLStatement} implements this interface so that every CQL command can provide the context needed for AuditLog.
+ */
+public interface IAuditLogContext
+{
+    /**
+     * @return the audit log context of this object
+     */
+    AuditLogContext getAuditLogContext();
+
+    /**
+     * Holder for an audit log entry type plus an optional keyspace and scope;
+     * values that are not supplied are left as null.
+     */
+    static class AuditLogContext
+    {
+        public final AuditLogEntryType auditLogEntryType;
+        public final String keyspace;
+        public final String scope;
+
+        /**
+         * Creates a context with neither keyspace nor scope.
+         */
+        public AuditLogContext(AuditLogEntryType auditLogEntryType)
+        {
+            this(auditLogEntryType, null);
+        }
+
+        /**
+         * Creates a context for a keyspace, without a scope.
+         */
+        public AuditLogContext(AuditLogEntryType auditLogEntryType, String keyspace)
+        {
+            this(auditLogEntryType, keyspace, null);
+        }
+
+        /**
+         * Creates a context with every field supplied explicitly.
+         */
+        public AuditLogContext(AuditLogEntryType auditLogEntryType, String keyspace, String scope)
+        {
+            this.auditLogEntryType = auditLogEntryType;
+            this.keyspace = keyspace;
+            this.scope = scope;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/IAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/IAuditLogger.java b/src/java/org/apache/cassandra/audit/IAuditLogger.java
new file mode 100644
index 0000000..b72a256
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/IAuditLogger.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.nio.file.Path;
+
+/**
+ * Contract for audit logger implementations: an implementation reports whether it is enabled,
+ * accepts {@link AuditLogEntry} instances to log, and can be stopped.
+ */
+public interface IAuditLogger
+{
+ /**
+ * @return whether this logger is currently enabled
+ */
+ boolean enabled();
+
+ /**
+ * Logs AuditLogEntry. This method might be called after {@link #stop()},
+ * hence implementations need to handle the race condition.
+ */
+ void log(AuditLogEntry auditLogEntry);
+
+ /**
+ * Stop and cleanup any resources of IAuditLogger implementations. Please note that
+ * {@link #log(AuditLogEntry)} might be called after being stopped.
+ */
+ void stop();
+
+ /**
+ * @return the path to the logging files/directory if the implementation writes out to the local filesystem,
+ * or null if the implementation doesn't log locally.
+ */
+ default Path path()
+ {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java b/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java
new file mode 100644
index 0000000..8d3dd7d
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+/**
+ * No-Op implementation of {@link IAuditLogger} to be used as a default audit logger when audit logging is disabled.
+ * It always reports itself as disabled, and both logging and stopping do nothing.
+ */
+public class NoOpAuditLogger implements IAuditLogger
+{
+ /**
+ * @return always false
+ */
+ @Override
+ public boolean enabled()
+ {
+ return false;
+ }
+
+ @Override
+ public void log(AuditLogEntry logMessage)
+ {
+ // intentionally empty: the entry is discarded
+
+ }
+
+ @Override
+ public void stop()
+ {
+ // intentionally empty: there is nothing to stop or clean up
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org