You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/05/11 12:46:01 UTC

[4/4] cassandra git commit: Audit logging for database activity

Audit logging for database activity

patch by Vinay Chella; reviewed by jasobrown for CASSANDRA-12151


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f56871b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f56871b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f56871b8

Branch: refs/heads/trunk
Commit: f56871b88be1e8965f166769c12cfa43313bac74
Parents: aba582f
Author: Vinay Chella <vi...@gmail.com>
Authored: Fri Feb 23 20:16:16 2018 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri May 11 05:44:16 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   2 +
 conf/cassandra.yaml                             |  14 +
 doc/source/operating/audit_logging.rst          | 203 ++++++
 .../apache/cassandra/audit/AuditLogEntry.java   | 298 ++++++++
 .../cassandra/audit/AuditLogEntryCategory.java  |  27 +
 .../cassandra/audit/AuditLogEntryType.java      |  83 +++
 .../apache/cassandra/audit/AuditLogFilter.java  | 162 +++++
 .../apache/cassandra/audit/AuditLogManager.java | 317 +++++++++
 .../apache/cassandra/audit/AuditLogOptions.java |  61 ++
 .../apache/cassandra/audit/BinAuditLogger.java  |  85 +++
 .../cassandra/audit/BinLogAuditLogger.java      | 387 +++++++++++
 .../apache/cassandra/audit/FileAuditLogger.java |  57 ++
 .../apache/cassandra/audit/FullQueryLogger.java | 197 ++++++
 .../cassandra/audit/IAuditLogContext.java       |  53 ++
 .../apache/cassandra/audit/IAuditLogger.java    |  47 ++
 .../apache/cassandra/audit/NoOpAuditLogger.java |  42 ++
 .../org/apache/cassandra/config/Config.java     |   3 +
 .../cassandra/config/DatabaseDescriptor.java    |  11 +
 .../org/apache/cassandra/cql3/CQLStatement.java |   3 +-
 .../cql3/statements/AlterKeyspaceStatement.java |   7 +
 .../cql3/statements/AlterRoleStatement.java     |   7 +
 .../cql3/statements/AlterTableStatement.java    |   7 +
 .../cql3/statements/AlterTypeStatement.java     |   7 +
 .../cql3/statements/AlterViewStatement.java     |   7 +
 .../cql3/statements/BatchStatement.java         |   7 +
 .../cassandra/cql3/statements/CFStatement.java  |   2 +-
 .../statements/CreateAggregateStatement.java    |   7 +
 .../statements/CreateFunctionStatement.java     |   7 +
 .../cql3/statements/CreateIndexStatement.java   |   6 +
 .../statements/CreateKeyspaceStatement.java     |   7 +
 .../cql3/statements/CreateRoleStatement.java    |   6 +
 .../cql3/statements/CreateTableStatement.java   |   6 +
 .../cql3/statements/CreateTriggerStatement.java |   8 +
 .../cql3/statements/CreateTypeStatement.java    |   7 +
 .../cql3/statements/CreateViewStatement.java    |   6 +
 .../cql3/statements/DeleteStatement.java        |   6 +
 .../cql3/statements/DropAggregateStatement.java |   7 +
 .../cql3/statements/DropFunctionStatement.java  |   8 +-
 .../cql3/statements/DropIndexStatement.java     |   7 +
 .../cql3/statements/DropKeyspaceStatement.java  |   7 +
 .../cql3/statements/DropRoleStatement.java      |   7 +
 .../cql3/statements/DropTableStatement.java     |   7 +
 .../cql3/statements/DropTriggerStatement.java   |   7 +
 .../cql3/statements/DropTypeStatement.java      |   9 +-
 .../cql3/statements/DropViewStatement.java      |   7 +
 .../statements/GrantPermissionsStatement.java   |   9 +
 .../cql3/statements/GrantRoleStatement.java     |   7 +
 .../statements/ListPermissionsStatement.java    |   7 +
 .../cql3/statements/ListRolesStatement.java     |   7 +
 .../statements/RevokePermissionsStatement.java  |   8 +
 .../cql3/statements/RevokeRoleStatement.java    |   7 +
 .../cql3/statements/SelectStatement.java        |   8 +
 .../cql3/statements/TruncateStatement.java      |   7 +
 .../cql3/statements/UpdateStatement.java        |   7 +
 .../cassandra/cql3/statements/UseStatement.java |   7 +
 .../db/fullquerylog/FullQueryLogger.java        | 530 --------------
 .../apache/cassandra/service/StorageProxy.java  |   9 +-
 .../cassandra/service/StorageService.java       |  40 ++
 .../cassandra/service/StorageServiceMBean.java  |   5 +
 .../org/apache/cassandra/tools/NodeProbe.java   |  10 +
 .../org/apache/cassandra/tools/NodeTool.java    |   4 +-
 .../apache/cassandra/tools/fqltool/Dump.java    |  64 +-
 .../tools/nodetool/DisableAuditLog.java         |  33 +
 .../tools/nodetool/EnableAuditLog.java          |  55 ++
 .../org/apache/cassandra/transport/Message.java |   5 +
 .../transport/messages/AuthResponse.java        |  18 +
 .../transport/messages/BatchMessage.java        |  59 +-
 .../transport/messages/ExecuteMessage.java      |  61 +-
 .../transport/messages/PrepareMessage.java      |  30 +-
 .../transport/messages/QueryMessage.java        |  36 +-
 .../org/apache/cassandra/utils/FBUtilities.java |  24 +
 .../apache/cassandra/utils/binlog/BinLog.java   |   2 +-
 .../cassandra/audit/AuditLogFilterTest.java     | 189 +++++
 .../apache/cassandra/audit/AuditLoggerTest.java | 690 +++++++++++++++++++
 .../cassandra/audit/BinAuditLoggerTest.java     |  91 +++
 .../cassandra/audit/FullQueryLoggerTest.java    | 610 ++++++++++++++++
 .../cassandra/audit/InMemoryAuditLogger.java    |  47 ++
 .../config/DatabaseDescriptorRefTest.java       |   5 +
 .../org/apache/cassandra/cql3/CQLTester.java    |   7 +
 .../db/fullquerylog/FullQueryLoggerTest.java    | 601 ----------------
 .../service/StorageServiceServerTest.java       |  28 +
 .../cassandra/utils/binlog/BinLogTest.java      |  12 +-
 83 files changed, 4346 insertions(+), 1222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e574d16..6308416 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Audit logging for database activity (CASSANDRA-12151)
  * Clean up build artifacts in docs container (CASSANDRA-14432)
  * Minor network authz improvements (Cassandra-14413)
  * Automatic sstable upgrades (CASSANDRA-14197)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 4885a12..da80684 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -79,6 +79,8 @@ New features
      in cassandra.yaml, and the docs for create and alter role statements. CASSANDRA-13985
    - Roles altered from login=true to login=false will prevent existing connections from executing any
      statements after the cache has been refreshed. CASSANDRA-13985
+   - Support for audit logging of database activity. If enabled, logs every incoming
+     CQL command request, Authentication (successful as well as unsuccessful login) to a node.
 
 Upgrading
 ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 7cc9e32..49c6f03 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1184,3 +1184,17 @@ back_pressure_strategy:
 # automatic_sstable_upgrade: false
 # Limit the number of concurrent sstable upgrades
 # max_concurrent_automatic_sstable_upgrades: 1
+
+# Audit logging - Logs every incoming CQL command request, authentication to a node. See the docs
+# on audit_logging for full details about the various configuration options.
+audit_logging_options:
+    enabled: false
+    logger: BinAuditLogger
+    # audit_logs_dir:
+    # included_keyspaces: 
+    # excluded_keyspaces:
+    # included_categories:
+    # excluded_categories:
+    # included_users:
+    # excluded_users:
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/doc/source/operating/audit_logging.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/audit_logging.rst b/doc/source/operating/audit_logging.rst
new file mode 100644
index 0000000..9be7a43
--- /dev/null
+++ b/doc/source/operating/audit_logging.rst
@@ -0,0 +1,203 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+..
+..     http://www.apache.org/licenses/LICENSE-2.0
+..
+.. Unless required by applicable law or agreed to in writing, software
+.. distributed under the License is distributed on an "AS IS" BASIS,
+.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+.. See the License for the specific language governing permissions and
+.. limitations under the License.
+
+.. highlight:: none
+
+
+
+Audit Logging
+------------------
+
+Audit logging in Cassandra logs every incoming CQL command request, as well as authentication (successful and unsuccessful login)
+to a C* node. Currently, there are two implementations provided; a custom logger can also be implemented and injected with the
+class name as a parameter in cassandra.yaml.
+
+- ``BinAuditLogger`` An efficient way to log events to file in a binary format.
+- ``FileAuditLogger`` Logs events to  ``audit/audit.log`` file using slf4j logger.
+
+*Recommendation*: ``BinAuditLogger`` is the community-recommended logger for performance reasons.
+
+What does it capture
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Audit logging captures the following events:
+
+- Successful as well as unsuccessful login attempts.
+
+- All database commands executed via Native protocol (CQL) attempted or successfully executed.
+
+What does it log
+^^^^^^^^^^^^^^^^^^^
+Each audit log implementation has access to the following attributes, and for the default text based logger these fields are concatenated with ``|`` separators to yield the final message.
+
+ - ``user``: User name (if available)
+ - ``host``: Host IP, where the command is being executed
+ - ``source ip address``: Source IP address from where the request initiated
+ - ``source port``: Source port number from where the request initiated
+ - ``timestamp``: unix time stamp
+ - ``type``: Type of the request (SELECT, INSERT, etc.)
+ - ``category``: Category of the request (DDL, DML, etc.)
+ - ``keyspace``: Keyspace (if applicable) on which the request is targeted to be executed
+ - ``scope``: Table/aggregate name/function name/trigger name etc., as applicable
+ - ``operation``: CQL command being executed
+
+How to configure
+^^^^^^^^^^^^^^^^^^
+Auditlog can be configured using cassandra.yaml. If you want to try Auditlog on one node, it can also be enabled and configured using ``nodetool``.
+
+cassandra.yaml configurations for AuditLog
+"""""""""""""""""""""""""""""""""""""""""""""
+	- ``enabled``: This option enables/ disables audit log
+	- ``logger``: Class name of the logger/ custom logger.
+	- ``audit_logs_dir``: Auditlogs directory location, if not set, defaults to ``cassandra.logdir.audit`` or ``cassandra.logdir`` + ``/audit/``
+	- ``included_keyspaces``: Comma separated list of keyspaces to be included in audit log, default - includes all keyspaces
+	- ``excluded_keyspaces``: Comma separated list of keyspaces to be excluded from audit log, default - excludes no keyspace
+	- ``included_categories``: Comma separated list of Audit Log Categories to be included in audit log, default - includes all categories
+	- ``excluded_categories``: Comma separated list of Audit Log Categories to be excluded from audit log, default - excludes no category
+	- ``included_users``: Comma separated list of users to be included in audit log, default - includes all users
+	- ``excluded_users``: Comma separated list of users to be excluded from audit log, default - excludes no user
+
+	  
+The available categories are: QUERY, DML, DDL, DCL, OTHER, AUTH, ERROR, PREPARE
+
+NodeTool command to enable AuditLog
+"""""""""""""""""""""""""""""""""""""
+``enableauditlog``: Enables AuditLog with yaml defaults. yaml configurations can be overridden using options via nodetool command.
+
+::
+
+    nodetool enableauditlog
+
+Options
+**********
+
+
+``--excluded-categories``
+    Comma separated list of Audit Log Categories to be excluded for
+    audit log. If not set the value from cassandra.yaml will be used
+
+``--excluded-keyspaces``
+    Comma separated list of keyspaces to be excluded for audit log. If
+    not set the value from cassandra.yaml will be used
+
+``--excluded-users``
+    Comma separated list of users to be excluded for audit log. If not
+    set the value from cassandra.yaml will be used
+
+``--included-categories``
+    Comma separated list of Audit Log Categories to be included for
+    audit log. If not set the value from cassandra.yaml will be used
+
+``--included-keyspaces``
+    Comma separated list of keyspaces to be included for audit log. If
+    not set the value from cassandra.yaml will be used
+
+``--included-users``
+    Comma separated list of users to be included for audit log. If not
+    set the value from cassandra.yaml will be used
+
+``--logger``
+    Logger name to be used for AuditLogging. Default BinAuditLogger. If
+    not set the value from cassandra.yaml will be used
+
+
+NodeTool command to disable AuditLog
+"""""""""""""""""""""""""""""""""""""""
+
+``disableauditlog``: Disables AuditLog.
+
+::
+
+    nodetool disableauditlog
+
+
+
+
+
+
+
+NodeTool command to reload AuditLog filters
+"""""""""""""""""""""""""""""""""""""""""""""
+
+``enableauditlog``: NodeTool enableauditlog command can be used to reload auditlog filters when called with default or previous ``logger`` and updated filters
+
+E.g.,
+::
+
+    nodetool enableauditlog --logger <Default/ existing loggerName> --included-keyspaces <New Filter values>
+
+
+
+
+
+
+
+
+Sample output
+^^^^^^^^^^^^^^^^
+::
+
+    LogMessage: user:anonymous|host:localhost/X.X.X.X|source:/X.X.X.X|port:60878|timestamp:1521158923615|type:USE_KEYSPACE|category:OTHER|ks:dev1|operation:USE "dev1"
+
+
+
+Configuring BinAuditLogger
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+To use ``BinAuditLogger`` as a logger in AuditLogging, set the logger to ``BinAuditLogger`` in cassandra.yaml under ``audit_logging_options`` section. ``BinAuditLogger`` can be further configured using its advanced options in cassandra.yaml.
+
+
+Advanced Options for BinAuditLogger
+""""""""""""""""""""""""""""""""""""""
+
+``block``
+	Indicates if the AuditLog should block if it falls behind or should drop audit log records. Default is set to ``true`` so that AuditLog records won't be lost
+
+``max_queue_weight``
+	Maximum weight of in memory queue for records waiting to be written to the audit log file before blocking or dropping the log records. Default is set to ``256 * 1024 * 1024``
+
+``max_log_size``
+	Maximum size of the rolled files to retain on disk before deleting the oldest file. Default is set to ``16L * 1024L * 1024L * 1024L``
+
+``roll_cycle``
+	How often to roll Audit log segments so they can potentially be reclaimed. Available options are: MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY. For more options, refer to net.openhft.chronicle.queue.RollCycles. Default is set to ``"HOURLY"``
+
+Configuring FileAuditLogger
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+To use ``FileAuditLogger`` as a logger in AuditLogging, apart from setting the class name in cassandra.yaml, the following configuration is needed to have the audit log events flow to a separate log file instead of system.log
+
+
+.. code-block:: xml
+
+    	<!-- Audit Logging (FileAuditLogger) rolling file appender to audit.log -->
+    	<appender name="AUDIT" class="ch.qos.logback.core.rolling.RollingFileAppender">
+    	  <file>${cassandra.logdir}/audit/audit.log</file>
+    	  <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+    	    <!-- rollover daily -->
+    	    <fileNamePattern>${cassandra.logdir}/audit/audit.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern>
+    	    <!-- each file should be at most 50MB, keep 30 days worth of history, but at most 5GB -->
+    	    <maxFileSize>50MB</maxFileSize>
+    	    <maxHistory>30</maxHistory>
+    	    <totalSizeCap>5GB</totalSizeCap>
+    	  </rollingPolicy>
+    	  <encoder>
+    	    <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+    	  </encoder>
+    	</appender>
+
+      	<!-- Audit Logging additivity to redirect audit logging events to audit/audit.log -->
+      	<logger name="org.apache.cassandra.audit" additivity="false" level="INFO">
+        	<appender-ref ref="AUDIT"/>
+      	</logger>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntry.java b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
new file mode 100644
index 0000000..d53fc6e
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class AuditLogEntry
+{
+    private final InetAddressAndPort host = FBUtilities.getBroadcastAddressAndPort();
+    private final InetAddressAndPort source;
+    private final String user;
+    private final long timestamp;
+    private final AuditLogEntryType type;
+    private final UUID batch;
+    private final String keyspace;
+    private final String scope;
+    private final String operation;
+    private final QueryOptions options;
+
+    private AuditLogEntry(AuditLogEntryType type, InetAddressAndPort source, String user, long timestamp, UUID batch, String keyspace, String scope, String operation, QueryOptions options)
+    {
+        this.type = type;
+        this.source = source;
+        this.user = user;
+        this.timestamp = timestamp;
+        this.batch = batch;
+        this.keyspace = keyspace;
+        this.scope = scope;
+        this.operation = operation;
+        this.options = options;
+    }
+
+    String getLogString()
+    {
+        StringBuilder builder = new StringBuilder(100);
+        builder.append("user:").append(user)
+               .append("|host:").append(host)
+               .append("|source:").append(source.address);
+        if (source.port > 0)
+        {
+            builder.append("|port:").append(source.port);
+        }
+
+        builder.append("|timestamp:").append(timestamp)
+               .append("|type:").append(type)
+               .append("|category:").append(type.getCategory());
+
+        if (batch != null)
+        {
+            builder.append("|batch:").append(batch);
+        }
+        if (StringUtils.isNotBlank(keyspace))
+        {
+            builder.append("|ks:").append(keyspace);
+        }
+        if (StringUtils.isNotBlank(scope))
+        {
+            builder.append("|scope:").append(scope);
+        }
+        if (StringUtils.isNotBlank(operation))
+        {
+            builder.append("|operation:").append(operation);
+        }
+        return builder.toString();
+    }
+
+    public InetAddressAndPort getHost()
+    {
+        return host;
+    }
+
+    public InetAddressAndPort getSource()
+    {
+        return source;
+    }
+
+    public String getUser()
+    {
+        return user;
+    }
+
+    public long getTimestamp()
+    {
+        return timestamp;
+    }
+
+    public AuditLogEntryType getType()
+    {
+        return type;
+    }
+
+    public UUID getBatch()
+    {
+        return batch;
+    }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    public String getScope()
+    {
+        return scope;
+    }
+
+    public String getOperation()
+    {
+        return operation;
+    }
+
+    public QueryOptions getOptions()
+    {
+        return options;
+    }
+
+    public static class Builder
+    {
+        private static final InetAddressAndPort DEFAULT_SOURCE;
+
+        static
+        {
+            try
+            {
+                DEFAULT_SOURCE = InetAddressAndPort.getByNameOverrideDefaults("0.0.0.0", 0);
+            }
+            catch (UnknownHostException e)
+            {
+
+                throw new RuntimeException("failed to create default source address", e);
+            }
+        }
+
+        private static final String DEFAULT_OPERATION = StringUtils.EMPTY;
+
+        private AuditLogEntryType type;
+        private InetAddressAndPort source;
+        private String user;
+        private long timestamp;
+        private UUID batch;
+        private String keyspace;
+        private String scope;
+        private String operation;
+        private QueryOptions options;
+
+        public Builder(ClientState clientState)
+        {
+            if (clientState != null)
+            {
+                if (clientState.getRemoteAddress() != null)
+                {
+                    InetSocketAddress addr = clientState.getRemoteAddress();
+                    source = InetAddressAndPort.getByAddressOverrideDefaults(addr.getAddress(), addr.getPort());
+                }
+
+                if (clientState.getUser() != null)
+                {
+                    user = clientState.getUser().getName();
+                }
+                keyspace = clientState.getRawKeyspace();
+            }
+            else
+            {
+                source = DEFAULT_SOURCE;
+                user = AuthenticatedUser.SYSTEM_USER.getName();
+            }
+
+            timestamp = System.currentTimeMillis();
+        }
+
+        public Builder(AuditLogEntry entry)
+        {
+            type = entry.type;
+            source = entry.source;
+            user = entry.user;
+            timestamp = entry.timestamp;
+            batch = entry.batch;
+            keyspace = entry.keyspace;
+            scope = entry.scope;
+            operation = entry.operation;
+            options = entry.options;
+        }
+
+        public Builder setType(AuditLogEntryType type)
+        {
+            this.type = type;
+            return this;
+        }
+
+        public Builder(AuditLogEntryType type)
+        {
+            this.type = type;
+            operation = DEFAULT_OPERATION;
+        }
+
+        public Builder setUser(String user)
+        {
+            this.user = user;
+            return this;
+        }
+
+        public Builder setBatch(UUID batch)
+        {
+            this.batch = batch;
+            return this;
+        }
+
+        public Builder setTimestamp(long timestampMillis)
+        {
+            this.timestamp = timestampMillis;
+            return this;
+        }
+
+        public Builder setKeyspace(QueryState queryState, @Nullable CQLStatement statement)
+        {
+            keyspace = statement != null && statement.getAuditLogContext().keyspace != null
+                       ? statement.getAuditLogContext().keyspace
+                       : queryState.getClientState().getRawKeyspace();
+            return this;
+        }
+
+        public Builder setKeyspace(String keyspace)
+        {
+            this.keyspace = keyspace;
+            return this;
+        }
+
+        public Builder setKeyspace(CQLStatement statement)
+        {
+            this.keyspace = statement.getAuditLogContext().keyspace;
+            return this;
+        }
+
+        public Builder setScope(CQLStatement statement)
+        {
+            this.scope = statement.getAuditLogContext().scope;
+            return this;
+        }
+
+        public Builder setOperation(String operation)
+        {
+            this.operation = operation;
+            return this;
+        }
+
+        public void appendToOperation(String str)
+        {
+            if (StringUtils.isNotBlank(str))
+            {
+                if (StringUtils.isEmpty(operation)) // null-safe: operation stays null unless a constructor or setOperation() assigned it
+                    operation = str;
+                else
+                    operation = operation.concat("; ").concat(str);
+            }
+        }
+
+        public Builder setOptions(QueryOptions options)
+        {
+            this.options = options;
+            return this;
+        }
+
+        public AuditLogEntry build()
+        {
+            timestamp = timestamp > 0 ? timestamp : System.currentTimeMillis();
+            return new AuditLogEntry(type, source, user, timestamp, batch, keyspace, scope, operation, options);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java b/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
new file mode 100644
index 0000000..616658c
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+/**
+ * Enum to categorize AuditLogEntries
+ */
+public enum AuditLogEntryCategory
+{
+    QUERY, DML, DDL, DCL, OTHER, AUTH, ERROR, PREPARE
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryType.java b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
new file mode 100644
index 0000000..4eb112b
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+public enum AuditLogEntryType
+{
+    /*
+     * CQL Audit Log Entry Types
+     */
+
+    SELECT(AuditLogEntryCategory.QUERY),
+    UPDATE(AuditLogEntryCategory.DML),
+    DELETE(AuditLogEntryCategory.DML),
+    TRUNCATE(AuditLogEntryCategory.DDL),
+    CREATE_KEYSPACE(AuditLogEntryCategory.DDL),
+    ALTER_KEYSPACE(AuditLogEntryCategory.DDL),
+    DROP_KEYSPACE(AuditLogEntryCategory.DDL),
+    CREATE_TABLE(AuditLogEntryCategory.DDL),
+    DROP_TABLE(AuditLogEntryCategory.DDL),
+    PREPARE_STATEMENT(AuditLogEntryCategory.PREPARE),
+    DROP_TRIGGER(AuditLogEntryCategory.DDL),
+    LIST_USERS(AuditLogEntryCategory.DCL),
+    CREATE_INDEX(AuditLogEntryCategory.DDL),
+    DROP_INDEX(AuditLogEntryCategory.DDL),
+    GRANT(AuditLogEntryCategory.DCL),
+    REVOKE(AuditLogEntryCategory.DCL),
+    CREATE_TYPE(AuditLogEntryCategory.DDL),
+    DROP_AGGREGATE(AuditLogEntryCategory.DDL),
+    ALTER_VIEW(AuditLogEntryCategory.DDL),
+    CREATE_VIEW(AuditLogEntryCategory.DDL),
+    DROP_ROLE(AuditLogEntryCategory.DCL),
+    CREATE_FUNCTION(AuditLogEntryCategory.DDL),
+    ALTER_TABLE(AuditLogEntryCategory.DDL),
+    BATCH(AuditLogEntryCategory.DML),
+    CREATE_AGGREGATE(AuditLogEntryCategory.DDL),
+    DROP_VIEW(AuditLogEntryCategory.DDL),
+    DROP_TYPE(AuditLogEntryCategory.DDL),
+    DROP_FUNCTION(AuditLogEntryCategory.DDL),
+    ALTER_ROLE(AuditLogEntryCategory.DCL),
+    CREATE_TRIGGER(AuditLogEntryCategory.DDL),
+    LIST_ROLES(AuditLogEntryCategory.DCL),
+    LIST_PERMISSIONS(AuditLogEntryCategory.DCL),
+    ALTER_TYPE(AuditLogEntryCategory.DDL),
+    CREATE_ROLE(AuditLogEntryCategory.DCL),
+    USE_KEYSPACE(AuditLogEntryCategory.OTHER),
+
+    /*
+     * Common Audit Log Entry Types
+     */
+
+    REQUEST_FAILURE(AuditLogEntryCategory.ERROR),
+    LOGIN_ERROR(AuditLogEntryCategory.AUTH),
+    UNAUTHORIZED_ATTEMPT(AuditLogEntryCategory.AUTH),
+    LOGIN_SUCCESS(AuditLogEntryCategory.AUTH);
+
+    private final AuditLogEntryCategory category;
+
+    AuditLogEntryType(AuditLogEntryCategory category)
+    {
+        this.category = category;
+    }
+
+    public AuditLogEntryCategory getCategory()
+    {
+        return category;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogFilter.java b/src/java/org/apache/cassandra/audit/AuditLogFilter.java
new file mode 100644
index 0000000..163114d
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogFilter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decides whether an {@link AuditLogEntry} should be dropped, based on include/exclude lists of
+ * keyspaces, categories and users taken from {@link AuditLogOptions}.
+ *
+ * Instances are immutable; a new filter is built via {@link #create(AuditLogOptions)} whenever
+ * the options change.
+ */
+public class AuditLogFilter
+{
+    private static final Logger logger = LoggerFactory.getLogger(AuditLogFilter.class);
+
+    private static final ImmutableSet<String> EMPTY_FILTERS = ImmutableSet.of();
+
+    private final ImmutableSet<String> excludedKeyspaces;
+    private final ImmutableSet<String> includedKeyspaces;
+    private final ImmutableSet<String> excludedCategories;
+    private final ImmutableSet<String> includedCategories;
+    private final ImmutableSet<String> includedUsers;
+    private final ImmutableSet<String> excludedUsers;
+
+    private AuditLogFilter(ImmutableSet<String> excludedKeyspaces, ImmutableSet<String> includedKeyspaces, ImmutableSet<String> excludedCategories, ImmutableSet<String> includedCategories, ImmutableSet<String> excludedUsers, ImmutableSet<String> includedUsers)
+    {
+        this.excludedKeyspaces = excludedKeyspaces;
+        this.includedKeyspaces = includedKeyspaces;
+        this.excludedCategories = excludedCategories;
+        this.includedCategories = includedCategories;
+        this.includedUsers = includedUsers;
+        this.excludedUsers = excludedUsers;
+    }
+
+    /**
+     * (Re-)Loads filters from config. Called during startup as well as JMX invocations.
+     *
+     * @param auditLogOptions options holding the comma-separated include/exclude lists
+     * @return a new filter reflecting the given options
+     */
+    public static AuditLogFilter create(AuditLogOptions auditLogOptions)
+    {
+        logger.trace("Loading AuditLog filters");
+
+        IncludeExcludeHolder keyspaces = loadInputSets(auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces);
+        IncludeExcludeHolder categories = loadInputSets(auditLogOptions.included_categories, auditLogOptions.excluded_categories);
+        IncludeExcludeHolder users = loadInputSets(auditLogOptions.included_users, auditLogOptions.excluded_users);
+
+        return new AuditLogFilter(keyspaces.excludedSet, keyspaces.includedSet,
+                                  categories.excludedSet, categories.includedSet,
+                                  users.excludedSet, users.includedSet);
+    }
+
+    /**
+     * Constructs mutually exclusive sets of included and excluded data. When there is a conflict,
+     * the entry is put into the excluded set (and removed from the included).
+     *
+     * @param includedInput comma-separated entries to include; may be null or empty
+     * @param excludedInput comma-separated entries to exclude; may be null or empty
+     * @return holder with the parsed, disjoint included and excluded sets
+     */
+    private static IncludeExcludeHolder loadInputSets(String includedInput, String excludedInput)
+    {
+        // The excluded set is parsed first so it can be used to strip conflicting entries from the included set.
+        ImmutableSet<String> excludedSet = parseEntries(excludedInput, EMPTY_FILTERS);
+        ImmutableSet<String> includedSet = parseEntries(includedInput, excludedSet);
+
+        return new IncludeExcludeHolder(includedSet, excludedSet);
+    }
+
+    /**
+     * Splits a comma-separated list into a set of entries.
+     *
+     * Each entry is trimmed, so a list written as {@code "a, b"} yields {@code a} and {@code b}
+     * rather than {@code a} and {@code " b"}. Blank entries and entries contained in
+     * {@code rejected} are skipped.
+     *
+     * @param input    comma-separated list; may be null or empty
+     * @param rejected entries that must not appear in the result
+     * @return the parsed entries, or an empty set when {@code input} is null or empty
+     */
+    private static ImmutableSet<String> parseEntries(String input, Set<String> rejected)
+    {
+        if (StringUtils.isEmpty(input))
+        {
+            return EMPTY_FILTERS;
+        }
+
+        String[] tokens = input.split(",");
+        ImmutableSet.Builder<String> builder = ImmutableSet.builderWithExpectedSize(tokens.length);
+        for (String token : tokens)
+        {
+            String entry = token.trim();
+            if (!entry.isEmpty() && !rejected.contains(entry))
+            {
+                builder.add(entry);
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Simple struct to hold inclusion/exclusion sets.
+     */
+    private static class IncludeExcludeHolder
+    {
+        private final ImmutableSet<String> includedSet;
+        private final ImmutableSet<String> excludedSet;
+
+        private IncludeExcludeHolder(ImmutableSet<String> includedSet, ImmutableSet<String> excludedSet)
+        {
+            this.includedSet = includedSet;
+            this.excludedSet = excludedSet;
+        }
+    }
+
+    /**
+     * Checks whether a given AuditLog Entry is filtered or not. The entry is filtered as soon as
+     * any one of its keyspace, category or user is filtered.
+     *
+     * @param auditLogEntry AuditLogEntry to verify
+     * @return true if it is filtered, false otherwise
+     */
+    boolean isFiltered(AuditLogEntry auditLogEntry)
+    {
+        return isFiltered(auditLogEntry.getKeyspace(), includedKeyspaces, excludedKeyspaces)
+               || isFiltered(auditLogEntry.getType().getCategory().toString(), includedCategories, excludedCategories)
+               || isFiltered(auditLogEntry.getUser(), includedUsers, excludedUsers);
+    }
+
+    /**
+     * Checks whether given input is being filtered or not.
+     * If excludeSet does not contain any items, by default nothing is excluded (unless there are
+     * entries in the includeSet).
+     * If includeSet does not contain any items, by default everything is included
+     * If an input is part of both includeSet and excludeSet, excludeSet takes the priority over includeSet
+     *
+     * @param input      Input to be checked for filtering based on includeSet and excludeSet
+     * @param includeSet Include filtering set
+     * @param excludeSet Exclude filtering set
+     * @return true if the input is filtered, false when the input is not filtered
+     */
+    static boolean isFiltered(String input, Set<String> includeSet, Set<String> excludeSet)
+    {
+        if (!excludeSet.isEmpty() && excludeSet.contains(input))
+            return true;
+
+        return !(includeSet.isEmpty() || includeSet.contains(input));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java b/src/java/org/apache/cassandra/audit/AuditLogManager.java
new file mode 100644
index 0000000..090499c
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Central location for managing the logging of client/user-initated actions (like queries, log in commands, and so on).
+ *
+ * We can run multiple {@link IAuditLogger}s at the same time, including the standard audit logger ({@link #auditLogger}
+ * and the full query logger ({@link #fullQueryLogger}.
+ */
+/**
+ * Central location for managing the logging of client/user-initiated actions (like queries, log in commands, and so on).
+ *
+ * We can run multiple {@link IAuditLogger}s at the same time, including the standard audit logger ({@link #auditLogger})
+ * and the full query logger ({@link #fullQueryLogger}).
+ */
+public class AuditLogManager
+{
+    private static final Logger logger = LoggerFactory.getLogger(AuditLogManager.class);
+    private static final AuditLogManager instance = new AuditLogManager();
+
+    // FQL always writes to a BinLog, but it is a type of IAuditLogger
+    private final FullQueryLogger fullQueryLogger;
+    // only entries whose category is listed here are forwarded to the full query logger
+    private final ImmutableSet<AuditLogEntryCategory> fqlIncludeFilter = ImmutableSet.of(AuditLogEntryCategory.OTHER,
+                                                                                         AuditLogEntryCategory.QUERY,
+                                                                                         AuditLogEntryCategory.DCL,
+                                                                                         AuditLogEntryCategory.DML,
+                                                                                         AuditLogEntryCategory.DDL);
+
+    // auditLogger can write anywhere, as it's pluggable (logback, BinLog, DiagnosticEvents, etc ...)
+    private volatile IAuditLogger auditLogger;
+
+    private volatile AuditLogFilter filter;
+    private volatile boolean isAuditLogEnabled;
+
+    /**
+     * Creates the full query logger and, depending on the {@code enabled} flag of the configured
+     * {@link AuditLogOptions}, either the configured audit logger or a {@link NoOpAuditLogger}.
+     * The filter is always loaded from the configured options.
+     */
+    private AuditLogManager()
+    {
+        fullQueryLogger = new FullQueryLogger();
+
+        if (DatabaseDescriptor.getAuditLoggingOptions().enabled)
+        {
+            logger.info("Audit logging is enabled.");
+            auditLogger = getAuditLogger(DatabaseDescriptor.getAuditLoggingOptions().logger);
+            isAuditLogEnabled = true;
+        }
+        else
+        {
+            logger.debug("Audit logging is disabled.");
+            isAuditLogEnabled = false;
+            auditLogger = new NoOpAuditLogger();
+        }
+
+        filter = AuditLogFilter.create(DatabaseDescriptor.getAuditLoggingOptions());
+    }
+
+    /**
+     * @return the singleton manager instance
+     */
+    public static AuditLogManager getInstance()
+    {
+        return instance;
+    }
+
+    /**
+     * Instantiates an audit logger by class name, falling back to {@link BinAuditLogger} when no name is given.
+     *
+     * @param loggerClassName name of the logger class to instantiate; may be null
+     * @return the newly created logger
+     */
+    private IAuditLogger getAuditLogger(String loggerClassName) throws ConfigurationException
+    {
+        if (loggerClassName != null)
+        {
+            return FBUtilities.newAuditLogger(loggerClassName);
+        }
+
+        return FBUtilities.newAuditLogger(BinAuditLogger.class.getName());
+    }
+
+    @VisibleForTesting
+    public IAuditLogger getLogger()
+    {
+        return auditLogger;
+    }
+
+    /**
+     * @return true if the standard audit logger is currently enabled
+     */
+    public boolean isAuditingEnabled()
+    {
+        return isAuditLogEnabled;
+    }
+
+    /**
+     * @return true if either the standard audit logger or the full query logger is enabled
+     */
+    public boolean isLoggingEnabled()
+    {
+        return isAuditingEnabled() || isFQLEnabled();
+    }
+
+    private boolean isFQLEnabled()
+    {
+        return fullQueryLogger.enabled();
+    }
+
+    private boolean isSystemKeyspace(String keyspaceName)
+    {
+        return SchemaConstants.isLocalSystemKeyspace(keyspaceName);
+    }
+
+    /**
+     * Tells whether two paths designate the same location once made absolute and normalized, so that
+     * differences such as a trailing separator or a redundant "./" do not hide a clash.
+     *
+     * @param first  first path; may be null
+     * @param second second path; may be null
+     * @return true only if both paths are non-null and resolve to the same normalized absolute path
+     */
+    private static boolean isSameLocation(Path first, Path second)
+    {
+        if (first == null || second == null)
+            return false;
+
+        return first.toAbsolutePath().normalize().equals(second.toAbsolutePath().normalize());
+    }
+
+    /**
+     * Logs AuditLogEntry to standard audit logger, unless the entry targets a local system keyspace
+     * or is rejected by the current filter.
+     * @param logEntry AuditLogEntry to be logged
+     */
+    private void logAuditLoggerEntry(AuditLogEntry logEntry)
+    {
+        if ((logEntry.getKeyspace() == null || !isSystemKeyspace(logEntry.getKeyspace()))
+            && !filter.isFiltered(logEntry))
+        {
+            auditLogger.log(logEntry);
+        }
+    }
+
+    /**
+     * Logs AuditLogEntry to both FQL and standard audit logger
+     * @param logEntry AuditLogEntry to be logged; null entries are ignored
+     */
+    public void log(AuditLogEntry logEntry)
+    {
+        if (logEntry == null)
+            return;
+
+        if (isAuditingEnabled())
+        {
+            logAuditLoggerEntry(logEntry);
+        }
+
+        if (isFQLEnabled() && fqlIncludeFilter.contains(logEntry.getType().getCategory()))
+        {
+            fullQueryLogger.log(logEntry);
+        }
+    }
+
+    /**
+     * Logs a failed operation. Does nothing unless the standard audit logger is enabled.
+     *
+     * The entry type is replaced according to the exception: {@link UnauthorizedException} becomes
+     * UNAUTHORIZED_ATTEMPT, {@link AuthenticationException} becomes LOGIN_ERROR and anything else
+     * REQUEST_FAILURE. The exception message is appended to the operation before the entry is logged.
+     *
+     * @param logEntry entry describing the attempted operation; null entries are ignored
+     * @param e        exception raised by the operation
+     */
+    public void log(AuditLogEntry logEntry, Exception e)
+    {
+        if ((logEntry != null) && (isAuditingEnabled()))
+        {
+            AuditLogEntry.Builder builder = new AuditLogEntry.Builder(logEntry);
+
+            if (e instanceof UnauthorizedException)
+            {
+                builder.setType(AuditLogEntryType.UNAUTHORIZED_ATTEMPT);
+            }
+            else if (e instanceof AuthenticationException)
+            {
+                builder.setType(AuditLogEntryType.LOGIN_ERROR);
+            }
+            else
+            {
+                builder.setType(AuditLogEntryType.REQUEST_FAILURE);
+            }
+
+            builder.appendToOperation(e.getMessage());
+
+            log(builder.build());
+        }
+    }
+
+    /**
+     * Logs Batch queries to both FQL and standard audit logger.
+     *
+     * The audit logger receives one summary entry plus one entry per statement; the full query logger
+     * receives the raw CQL of every prepared statement in a single batch record.
+     */
+    public void logBatch(String batchTypeName, List<Object> queryOrIdList, List<List<ByteBuffer>> values, List<ParsedStatement.Prepared> prepared, QueryOptions options, QueryState state, long queryStartTimeMillis)
+    {
+        if (isAuditingEnabled())
+        {
+            List<AuditLogEntry> entries = buildEntriesForBatch(queryOrIdList, prepared, state, options, queryStartTimeMillis);
+            for (AuditLogEntry auditLogEntry : entries)
+            {
+                logAuditLoggerEntry(auditLogEntry);
+            }
+        }
+
+        if (isFQLEnabled())
+        {
+            List<String> queryStrings = new ArrayList<>(queryOrIdList.size());
+            for (ParsedStatement.Prepared prepStatment : prepared)
+            {
+                queryStrings.add(prepStatment.rawCQLStatement);
+            }
+            fullQueryLogger.logBatch(batchTypeName, queryStrings, values, options, queryStartTimeMillis);
+        }
+    }
+
+    /**
+     * Builds the audit entries for a batch: a leading BATCH summary entry followed by one entry per
+     * statement, all sharing a freshly generated batch id.
+     */
+    private static List<AuditLogEntry> buildEntriesForBatch(List<Object> queryOrIdList, List<ParsedStatement.Prepared> prepared, QueryState state, QueryOptions options, long queryStartTimeMillis)
+    {
+        List<AuditLogEntry> auditLogEntries = new ArrayList<>(queryOrIdList.size() + 1);
+        UUID batchId = UUID.randomUUID();
+        String queryString = String.format("BatchId:[%s] - BATCH of [%d] statements", batchId, queryOrIdList.size());
+        AuditLogEntry entry = new AuditLogEntry.Builder(state.getClientState())
+                              .setOperation(queryString)
+                              .setOptions(options)
+                              .setTimestamp(queryStartTimeMillis)
+                              .setBatch(batchId)
+                              .setType(AuditLogEntryType.BATCH)
+                              .build();
+        auditLogEntries.add(entry);
+
+        for (int i = 0; i < queryOrIdList.size(); i++)
+        {
+            CQLStatement statement = prepared.get(i).statement;
+            entry = new AuditLogEntry.Builder(state.getClientState())
+                    .setType(statement.getAuditLogContext().auditLogEntryType)
+                    .setOperation(prepared.get(i).rawCQLStatement)
+                    .setTimestamp(queryStartTimeMillis)
+                    .setScope(statement)
+                    .setKeyspace(state, statement)
+                    .setOptions(options)
+                    .setBatch(batchId)
+                    .build();
+            auditLogEntries.add(entry);
+        }
+
+        return auditLogEntries;
+    }
+
+    /**
+     * Disables AuditLog, designed to be invoked only via JMX/ Nodetool, not from anywhere else in the codepath.
+     */
+    public synchronized void disableAuditLog()
+    {
+        if (isAuditLogEnabled)
+        {
+            // Disable isAuditLogEnabled before attempting to cleanup/ stop AuditLogger so that any incoming log() requests will be dropped.
+            isAuditLogEnabled = false;
+            IAuditLogger oldLogger = auditLogger;
+            auditLogger = new NoOpAuditLogger();
+            oldLogger.stop();
+        }
+    }
+
+    /**
+     * Enables AuditLog, designed to be invoked only via JMX/ Nodetool, not from anywhere else in the codepath.
+     *
+     * The filters are always reloaded. The logger itself is only replaced when the simple class name of the
+     * current logger differs from {@code auditLogOptions.logger}; otherwise the method returns early.
+     *
+     * @param auditLogOptions AuditLogOptions to be used for enabling AuditLog
+     * @throws ConfigurationException It can throw configuration exception when provided logger class does not exist in the classpath
+     * @throws IllegalArgumentException if the full query logger is running on the same directory as {@code audit_logs_dir}
+     */
+    public synchronized void enableAuditLog(AuditLogOptions auditLogOptions) throws ConfigurationException
+    {
+        // Compare as paths rather than strings: Path.toString() drops a trailing separator, so a string
+        // comparison against a directory such as ".../audit/" would never detect the clash.
+        if (isFQLEnabled()
+            && auditLogOptions.audit_logs_dir != null
+            && isSameLocation(fullQueryLogger.path(), java.nio.file.Paths.get(auditLogOptions.audit_logs_dir)))
+            throw new IllegalArgumentException(String.format("audit log path (%s) cannot be the same as the " +
+                                                             "running full query logger (%s)",
+                                                             auditLogOptions.audit_logs_dir,
+                                                             fullQueryLogger.path()));
+
+        // always reload the filters
+        filter = AuditLogFilter.create(auditLogOptions);
+
+        // next, check to see if we're changing the logging implementation; if not, keep the same instance and bail.
+        // note: auditLogger should never be null
+        IAuditLogger oldLogger = auditLogger;
+        if (oldLogger.getClass().getSimpleName().equals(auditLogOptions.logger))
+            return;
+
+        auditLogger = getAuditLogger(auditLogOptions.logger);
+        isAuditLogEnabled = true;
+
+        // ensure oldLogger's stop() is called after we swap it with new logger,
+        // otherwise, we might be calling log() on the stopped logger.
+        oldLogger.stop();
+    }
+
+    /**
+     * Configures the full query logger.
+     *
+     * @throws IllegalArgumentException if {@code path} designates the same location as the running audit logger's path
+     */
+    public void configureFQL(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+    {
+        if (isSameLocation(path, auditLogger.path()))
+            throw new IllegalArgumentException(String.format("fullquerylogger path (%s) cannot be the same as the " +
+                                                             "running audit logger (%s)",
+                                                             path,
+                                                             auditLogger.path()));
+
+        fullQueryLogger.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+    }
+
+    /**
+     * Delegates to {@link FullQueryLogger#reset(String)} with the given path.
+     */
+    public void resetFQL(String fullQueryLogPath)
+    {
+        fullQueryLogger.reset(fullQueryLogPath);
+    }
+
+    /**
+     * Stops the full query logger.
+     */
+    public void disableFQL()
+    {
+        fullQueryLogger.stop();
+    }
+
+    /**
+     * ONLY FOR TESTING
+     */
+    FullQueryLogger getFullQueryLogger()
+    {
+        return fullQueryLogger;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogOptions.java b/src/java/org/apache/cassandra/audit/AuditLogOptions.java
new file mode 100644
index 0000000..1888c45
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogOptions.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class AuditLogOptions // plain holder for the audit log settings; every field carries its default value
+{
+    public volatile boolean enabled = false; // AuditLogManager creates a real audit logger at construction only when this is true
+    public String logger = BinAuditLogger.class.getSimpleName(); // class name of the IAuditLogger implementation to instantiate
+    public String included_keyspaces = StringUtils.EMPTY; // comma-separated list, parsed by AuditLogFilter
+    public String excluded_keyspaces = StringUtils.EMPTY; // comma-separated list, parsed by AuditLogFilter
+    public String included_categories = StringUtils.EMPTY; // comma-separated list, parsed by AuditLogFilter
+    public String excluded_categories = StringUtils.EMPTY; // comma-separated list, parsed by AuditLogFilter
+    public String included_users = StringUtils.EMPTY; // comma-separated list, parsed by AuditLogFilter
+    public String excluded_users = StringUtils.EMPTY; // comma-separated list, parsed by AuditLogFilter
+
+    /**
+     * AuditLogs directory. It can be configured with the `cassandra.logdir.audit` system property; when that is
+     * not set, it defaults to `cassandra.logdir` + /audit/ (with `cassandra.logdir` itself falling back to ".").
+     */
+    public String audit_logs_dir = System.getProperty("cassandra.logdir.audit",
+                                                      System.getProperty("cassandra.logdir",".")+"/audit/");
+    /**
+     * Indicates whether the AuditLog should block when it falls behind, or drop audit log records instead.
+     * Default is set to true so that AuditLog records won't be lost.
+     */
+    public boolean block = true;
+
+    /**
+     * Maximum weight of the in-memory queue for records waiting to be written to the audit log file
+     * before blocking or dropping the log records. For advanced configurations.
+     */
+    public int max_queue_weight = 256 * 1024 * 1024;
+
+    /**
+     * Maximum size of the rolled files to retain on disk before deleting the oldest file. For advanced configurations.
+     */
+    public long max_log_size = 16L * 1024L * 1024L * 1024L;
+
+    /**
+     * How often to roll Audit log segments so they can potentially be reclaimed. Available options are:
+     * MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY.
+     * For more options, refer to: net.openhft.chronicle.queue.RollCycles
+     */
+    public String roll_cycle = "HOURLY";
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/BinAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/BinAuditLogger.java b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
new file mode 100644
index 0000000..89b764c
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.nio.file.Paths;
+
+import com.google.common.primitives.Ints;
+
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLog;
+import org.apache.cassandra.utils.concurrent.WeightedQueue;
+
+public class BinAuditLogger extends BinLogAuditLogger implements IAuditLogger
+{
+    public BinAuditLogger()
+    {
+        // due to the way that IAuditLogger instance are created in AuditLogManager, via reflection, we can't assume
+        // the manager will call configure() (it won't). thus, we have to call it here from the constructor.
+        AuditLogOptions auditLoggingOptions = DatabaseDescriptor.getAuditLoggingOptions();
+        configure(Paths.get(auditLoggingOptions.audit_logs_dir),
+                  auditLoggingOptions.roll_cycle,
+                  auditLoggingOptions.block,
+                  auditLoggingOptions.max_queue_weight,
+                  auditLoggingOptions.max_log_size,
+                  false);
+    }
+
+    @Override
+    public void log(AuditLogEntry auditLogEntry)
+    {
+        BinLog binLog = this.binLog;
+        if (binLog == null || auditLogEntry == null)
+        {
+            return;
+        }
+
+        super.logRecord(new WeighableMarshallableMessage(auditLogEntry.getLogString()), binLog);
+    }
+
+    static class WeighableMarshallableMessage extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
+    {
+        private final String message;
+
+        WeighableMarshallableMessage(String message)
+        {
+            this.message = message;
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("AuditLog");
+            wire.write("message").text(message);
+        }
+
+        @Override
+        public void release()
+        {
+
+        }
+
+        @Override
+        public int weight()
+        {
+            return Ints.checkedCast(ObjectSizes.sizeOf(message));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
new file mode 100644
index 0000000..a2426b9
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.binlog.BinLog;
+import org.apache.cassandra.utils.concurrent.WeightedQueue;
+import org.github.jamm.MemoryLayoutSpecification;
+
+abstract class BinLogAuditLogger implements IAuditLogger
+{
+    static final int EMPTY_BYTEBUFFER_SIZE = Ints.checkedCast(ObjectSizes.sizeOnHeapExcludingData(ByteBuffer.allocate(0)));
+    static final int EMPTY_LIST_SIZE = Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList(0)));
+    private static final int EMPTY_BYTEBUF_SIZE;
+    private static final int OBJECT_HEADER_SIZE = MemoryLayoutSpecification.SPEC.getObjectHeaderSize();
+    static
+    {
+        int tempSize = 0;
+        ByteBuf buf = CBUtil.allocator.buffer(0, 0);
+        try
+        {
+            tempSize = Ints.checkedCast(ObjectSizes.measure(buf));
+        }
+        finally
+        {
+            buf.release();
+        }
+        EMPTY_BYTEBUF_SIZE = tempSize;
+    }
+
+    protected static final Logger logger = LoggerFactory.getLogger(BinLogAuditLogger.class);
+    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+    private static final NoSpamLogger.NoSpamLogStatement droppedSamplesStatement = noSpamLogger.getStatement("Dropped {} binary log samples", 1, TimeUnit.MINUTES);
+
+    volatile BinLog binLog;
+    protected volatile boolean blocking;
+    protected Path path;
+
+    private final AtomicLong droppedSamplesSinceLastLog = new AtomicLong();
+
+    /**
+     * Configure the global instance of the FullQueryLogger. Clean the provided directory before starting
+     * @param path Dedicated path where the FQL can store its files.
+     * @param rollCycle How often to roll FQL log segments so they can potentially be reclaimed
+     * @param blocking Whether the FQL should block if the FQL falls behind or should drop log records
+     * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
+     * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
+     */
+    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+    {
+        this.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize, true);
+    }
+
+    /**
+     * Configure the global instance of the FullQueryLogger
+     * @param path Dedicated path where the FQL can store its files.
+     * @param rollCycle How often to roll FQL log segments so they can potentially be reclaimed
+     * @param blocking Whether the FQL should block if the FQL falls behind or should drop log records
+     * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
+     * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
+     * @param cleanDirectory Whether to clean the directory before starting the FullQueryLogger
+     */
+    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, boolean cleanDirectory)
+    {
+        Preconditions.checkNotNull(path, "path was null");
+        File pathAsFile = path.toFile();
+        Preconditions.checkNotNull(rollCycle, "rollCycle was null");
+        rollCycle = rollCycle.toUpperCase();
+
+        //Exists and is a directory or can be created
+        Preconditions.checkArgument((pathAsFile.exists() && pathAsFile.isDirectory()) || (!pathAsFile.exists() && pathAsFile.mkdirs()), "path exists and is not a directory or couldn't be created");
+        Preconditions.checkArgument(pathAsFile.canRead() && pathAsFile.canWrite() && pathAsFile.canExecute(), "path is not readable, writable, and executable");
+        Preconditions.checkNotNull(RollCycles.valueOf(rollCycle), "unrecognized roll cycle");
+        Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0");
+        Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0");
+        logger.info("Attempting to configure full query logger path: {} Roll cycle: {} Blocking: {} Max queue weight: {} Max log size:{}", path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+
+        if (binLog != null)
+        {
+            logger.warn("Full query logger already configured. Ignoring requested configuration.");
+            throw new IllegalStateException("Already configured");
+        }
+
+        if (cleanDirectory)
+        {
+            logger.info("Cleaning directory: {} as requested",path);
+            if (path.toFile().exists())
+            {
+                Throwable error = cleanDirectory(path.toFile(), null);
+                if (error != null)
+                {
+                    throw new RuntimeException(error);
+                }
+            }
+        }
+
+        this.path = path;
+        this.blocking = blocking;
+        binLog = new BinLog(path, RollCycles.valueOf(rollCycle), maxQueueWeight, maxLogSize);
+        binLog.start();
+    }
+
+    public Path path()
+    {
+        return path;
+    }
+
+    /**
+     * Need the path as a parameter as well because if the process is restarted the config file might be the only
+     * location for retrieving the path to the full query log files, but JMX also allows you to specify a path
+     * that isn't persisted anywhere so we have to clean that one as well.
+     */
+    public synchronized void reset(String fullQueryLogPath)
+    {
+        try
+        {
+            Set<File> pathsToClean = Sets.newHashSet();
+
+            //First decide whether to clean the path configured in the YAML
+            if (fullQueryLogPath != null)
+            {
+                File fullQueryLogPathFile = new File(fullQueryLogPath);
+                if (fullQueryLogPathFile.exists())
+                {
+                    pathsToClean.add(fullQueryLogPathFile);
+                }
+            }
+
+            //Then decide whether to clean the last used path, possibly configured by JMX
+            if (path != null)
+            {
+                File pathFile = path.toFile();
+                if (pathFile.exists())
+                {
+                    pathsToClean.add(pathFile);
+                }
+            }
+
+            logger.info("Reset (and deactivation) of full query log requested.");
+            if (binLog != null)
+            {
+                logger.info("Stopping full query log. Cleaning {}.", pathsToClean);
+                binLog.stop();
+                binLog = null;
+            }
+            else
+            {
+                logger.info("Full query log already deactivated. Cleaning {}.", pathsToClean);
+            }
+
+            Throwable accumulate = null;
+            for (File f : pathsToClean)
+            {
+                accumulate = cleanDirectory(f, accumulate);
+            }
+            if (accumulate != null)
+            {
+                throw new RuntimeException(accumulate);
+            }
+        }
+        catch (Exception e)
+        {
+            if (e instanceof RuntimeException)
+            {
+                throw (RuntimeException)e;
+            }
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Stop the full query log leaving behind any generated files.
+     */
+    public synchronized void stop()
+    {
+        try
+        {
+            logger.info("Deactivation of full query log requested.");
+            if (binLog != null)
+            {
+                logger.info("Stopping full query log");
+                binLog.stop();
+                binLog = null;
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Check whether the full query log is enabled.
+     * @return true if records are recorded and false otherwise.
+     */
+    public boolean enabled()
+    {
+        return binLog != null;
+    }
+
+    void logRecord(BinLog.ReleaseableWriteMarshallable record, BinLog binLog)
+    {
+        boolean putInQueue = false;
+        try
+        {
+            if (blocking)
+            {
+                try
+                {
+                    binLog.put(record);
+                    putInQueue = true;
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            else
+            {
+                if (!binLog.offer(record))
+                {
+                    logDroppedSample();
+                }
+                else
+                {
+                    putInQueue = true;
+                }
+            }
+        }
+        finally
+        {
+            if (!putInQueue)
+            {
+                record.release();
+            }
+        }
+    }
+
+    /**
+     * This is potentially lossy, but it's not super critical as we will generally know
+     * when this is happening and roughly how bad it is.
+     */
+    private void logDroppedSample()
+    {
+        droppedSamplesSinceLastLog.incrementAndGet();
+        if (droppedSamplesStatement.warn(new Object[] {droppedSamplesSinceLastLog.get()}))
+        {
+            droppedSamplesSinceLastLog.set(0);
+        }
+    }
+
+    protected static abstract class AbstractWeighableMarshallable extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
+    {
+        private final ByteBuf queryOptionsBuffer;
+        private final long timeMillis;
+        private final int protocolVersion;
+
+        AbstractWeighableMarshallable(QueryOptions queryOptions, long timeMillis)
+        {
+            this.timeMillis = timeMillis;
+            ProtocolVersion version = queryOptions.getProtocolVersion();
+            this.protocolVersion = version.asInt();
+            int optionsSize = QueryOptions.codec.encodedSize(queryOptions, version);
+            queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, optionsSize);
+            /*
+             * Struggled with what tradeoff to make in terms of query options which is potentially large and complicated
+             * There is tension between low garbage production (or allocator overhead), small working set size, and CPU overhead reserializing the
+             * query options into binary format.
+             *
+             * I went with the lowest risk most predictable option which is allocator overhead and CPU overhead
+             * rather than keep the original query message around so I could just serialize that as a memcpy. It's more
+             * instructions when turned on, but it doesn't change memory footprint quite as much and it's more pay for what you use
+             * in terms of query volume. The CPU overhead is spread out across producers so we should at least get
+             * some scaling.
+             *
+             */
+            boolean success = false;
+            try
+            {
+                QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, version);
+                success = true;
+            }
+            finally
+            {
+                if (!success)
+                {
+                    queryOptionsBuffer.release();
+                }
+            }
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("protocol-version").int32(protocolVersion);
+            wire.write("query-options").bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
+            wire.write("query-time").int64(timeMillis);
+        }
+
+        @Override
+        public void release()
+        {
+            queryOptionsBuffer.release();
+        }
+
+        //8-bytes for protocol version (assume alignment cost), 8-byte timestamp, 8-byte object header + other contents
+        @Override
+        public int weight()
+        {
+            return 8 + 8 + OBJECT_HEADER_SIZE + EMPTY_BYTEBUF_SIZE + queryOptionsBuffer.capacity();
+        }
+    }
+
+    private static Throwable cleanDirectory(File directory, Throwable accumulate)
+    {
+        if (!directory.exists())
+        {
+            return Throwables.merge(accumulate, new RuntimeException(String.format("%s does not exists", directory)));
+        }
+        if (!directory.isDirectory())
+        {
+            return Throwables.merge(accumulate, new RuntimeException(String.format("%s is not a directory", directory)));
+        }
+        for (File f : directory.listFiles())
+        {
+            accumulate = deleteRecursively(f, accumulate);
+        }
+        if (accumulate instanceof FSError)
+        {
+            FileUtils.handleFSError((FSError)accumulate);
+        }
+        return accumulate;
+    }
+
+    private static Throwable deleteRecursively(File fileOrDirectory, Throwable accumulate)
+    {
+        if (fileOrDirectory.isDirectory())
+        {
+            for (File f : fileOrDirectory.listFiles())
+            {
+                accumulate = FileUtils.deleteWithConfirm(f, true, accumulate);
+            }
+        }
+        return FileUtils.deleteWithConfirm(fileOrDirectory, true , accumulate);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/FileAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/FileAuditLogger.java b/src/java/org/apache/cassandra/audit/FileAuditLogger.java
new file mode 100644
index 0000000..9490bdd
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/FileAuditLogger.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous, file-based audit logger; just uses the standard logging mechanism.
+ */
+public class FileAuditLogger implements IAuditLogger
+{
+    protected static final Logger logger = LoggerFactory.getLogger(FileAuditLogger.class);
+
+    private volatile boolean enabled;
+
+    public FileAuditLogger()
+    {
+        enabled = true;
+    }
+
+    @Override
+    public boolean enabled()
+    {
+        return enabled;
+    }
+
+    @Override
+    public void log(AuditLogEntry auditLogEntry)
+    {
+        // don't bother with the volatile read of enabled here. just go ahead and log, other components
+        // will check the enabled field.
+        logger.info(auditLogEntry.getLogString());
+    }
+
+    @Override
+    public void stop()
+    {
+        enabled = false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/FullQueryLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
new file mode 100644
index 0000000..36d0127
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.wire.ValueOut;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+/**
+ * A logger that logs entire query contents after the query finishes (or times out).
+ */
+public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
+{
+    @Override
+    public void log(AuditLogEntry entry)
+    {
+        logQuery(entry.getOperation(), entry.getOptions(), entry.getTimestamp());
+    }
+
+    /**
+     * Log an invocation of a batch of queries
+     * @param type The type of the batch
+     * @param queries CQL text of the queries
+     * @param values Values to bind to as parameters for the queries
+     * @param queryOptions Options associated with the query invocation
+     * @param batchTimeMillis Approximate time, in milliseconds since the epoch, at which the batch was invoked
+     */
+    void logBatch(String type, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
+    {
+        Preconditions.checkNotNull(type, "type was null");
+        Preconditions.checkNotNull(queries, "queries was null");
+        Preconditions.checkNotNull(values, "value was null");
+        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkArgument(batchTimeMillis > 0, "batchTimeMillis must be > 0");
+
+        //Don't construct the wrapper if the log is disabled
+        BinLog binLog = this.binLog;
+        if (binLog == null)
+        {
+            return;
+        }
+
+        WeighableMarshallableBatch wrappedBatch = new WeighableMarshallableBatch(type, queries, values, queryOptions, batchTimeMillis);
+        logRecord(wrappedBatch, binLog);
+    }
+
+    /**
+     * Log a single CQL query
+     * @param query CQL query text
+     * @param queryOptions Options associated with the query invocation
+     * @param queryTimeMillis Approximate time, in milliseconds since the epoch, at which the query was invoked
+     */
+    void logQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
+    {
+        Preconditions.checkNotNull(query, "query was null");
+        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkArgument(queryTimeMillis > 0, "queryTimeMillis must be > 0");
+
+        //Don't construct the wrapper if the log is disabled
+        BinLog binLog = this.binLog;
+        if (binLog == null)
+        {
+            return;
+        }
+
+        WeighableMarshallableQuery wrappedQuery = new WeighableMarshallableQuery(query, queryOptions, queryTimeMillis);
+        logRecord(wrappedQuery, binLog);
+    }
+
+    static class WeighableMarshallableBatch extends AbstractWeighableMarshallable
+    {
+        private final int weight;
+        private final String batchType;
+        private final List<String> queries;
+        private final List<List<ByteBuffer>> values;
+
+        public WeighableMarshallableBatch(String batchType, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
+        {
+           super(queryOptions, batchTimeMillis);
+           this.queries = queries;
+           this.values = values;
+           this.batchType = batchType;
+           boolean success = false;
+           try
+           {
+               //weight, batch type, queries, values
+               int weightTemp = 8 + EMPTY_LIST_SIZE + EMPTY_LIST_SIZE;
+               for (int ii = 0; ii < queries.size(); ii++)
+               {
+                   weightTemp += ObjectSizes.sizeOf(queries.get(ii));
+               }
+
+               weightTemp += EMPTY_LIST_SIZE * values.size();
+               for (int ii = 0; ii < values.size(); ii++)
+               {
+                   List<ByteBuffer> sublist = values.get(ii);
+                   weightTemp += EMPTY_BYTEBUFFER_SIZE * sublist.size();
+                   for (int zz = 0; zz < sublist.size(); zz++)
+                   {
+                       weightTemp += sublist.get(zz).capacity();
+                   }
+               }
+               weightTemp += super.weight();
+               weightTemp += ObjectSizes.sizeOf(batchType);
+               weight = weightTemp;
+               success = true;
+           }
+           finally
+           {
+               if (!success)
+               {
+                   release();
+               }
+           }
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("batch");
+            super.writeMarshallable(wire);
+            wire.write("batch-type").text(batchType);
+            ValueOut valueOut = wire.write("queries");
+            valueOut.int32(queries.size());
+            for (String query : queries)
+            {
+                valueOut.text(query);
+            }
+            valueOut = wire.write("values");
+            valueOut.int32(values.size());
+            for (List<ByteBuffer> subValues : values)
+            {
+                valueOut.int32(subValues.size());
+                for (ByteBuffer value : subValues)
+                {
+                    valueOut.bytes(BytesStore.wrap(value));
+                }
+            }
+        }
+
+        @Override
+        public int weight()
+        {
+            return weight;
+        }
+    }
+
+    static class WeighableMarshallableQuery extends AbstractWeighableMarshallable
+    {
+        private final String query;
+
+        public WeighableMarshallableQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
+        {
+            super(queryOptions, queryTimeMillis);
+            this.query = query;
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("single");
+            super.writeMarshallable(wire);
+            wire.write("query").text(query);
+        }
+
+        @Override
+        public int weight()
+        {
+            return Ints.checkedCast(ObjectSizes.sizeOf(query)) + super.weight();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/IAuditLogContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/IAuditLogContext.java b/src/java/org/apache/cassandra/audit/IAuditLogContext.java
new file mode 100644
index 0000000..55c3e04
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/IAuditLogContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import org.apache.cassandra.cql3.CQLStatement;
+
+/**
+ * Provides the context needed for audit logging statements.
+ * {@link CQLStatement} implements this interface such that every CQL command provides the context needed for AuditLog.
+ */
+public interface IAuditLogContext
+{
+    AuditLogContext getAuditLogContext();
+
+    static class AuditLogContext
+    {
+        public final AuditLogEntryType auditLogEntryType;
+        public final String keyspace;
+        public final String scope;
+
+        public AuditLogContext(AuditLogEntryType auditLogEntryType)
+        {
+            this(auditLogEntryType,null,null);
+        }
+
+        public AuditLogContext(AuditLogEntryType auditLogEntryType, String keyspace)
+        {
+           this(auditLogEntryType,keyspace,null);
+        }
+
+        public AuditLogContext(AuditLogEntryType auditLogEntryType, String keyspace, String scope)
+        {
+            this.auditLogEntryType = auditLogEntryType;
+            this.keyspace = keyspace;
+            this.scope = scope;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/IAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/IAuditLogger.java b/src/java/org/apache/cassandra/audit/IAuditLogger.java
new file mode 100644
index 0000000..b72a256
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/IAuditLogger.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.nio.file.Path;
+
+public interface IAuditLogger
+{
+    boolean enabled();
+
+    /**
+     * Logs AuditLogEntry. This method might be called after {@link #stop()},
+     * hence implementations need to handle the race condition.
+     */
+    void log(AuditLogEntry auditLogEntry);
+
+    /**
+     * Stop and cleanup any resources of IAuditLogger implementations. Please note that
+     * {@link #log(AuditLogEntry)} might be called after being stopped.
+     */
+    void stop();
+
+    /**
+     * @return the path to the logging files/directory if the implementation writes out to the local filesystem,
+     * or null if the implementation doesn't log locally.
+     */
+    default Path path()
+    {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java b/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java
new file mode 100644
index 0000000..8d3dd7d
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+/**
+ * No-Op implementation of {@link IAuditLogger} to be used as a default audit logger when audit logging is disabled.
+ */
+public class NoOpAuditLogger implements IAuditLogger
+{
+    @Override
+    public boolean enabled()
+    {
+        return false;
+    }
+
+    @Override
+    public void log(AuditLogEntry logMessage)
+    {
+
+    }
+
+    @Override
+    public void stop()
+    {
+
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org