You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/05/11 12:45:59 UTC

[2/4] cassandra git commit: Audit logging for database activity

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 332b024..88c3085 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.transport.messages;
 import java.nio.ByteBuffer;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.AuthenticatedUser;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.exceptions.AuthenticationException;
@@ -79,6 +81,14 @@ public class AuthResponse extends Message.Request
                 AuthenticatedUser user = negotiator.getAuthenticatedUser();
                 queryState.getClientState().login(user);
                 AuthMetrics.instance.markSuccess();
+                if (auditLogEnabled)
+                {
+                    AuditLogEntry auditEntry = new AuditLogEntry.Builder(queryState.getClientState())
+                                               .setOperation("LOGIN SUCCESSFUL")
+                                               .setType(AuditLogEntryType.LOGIN_SUCCESS)
+                                               .build();
+                    auditLogManager.log(auditEntry);
+                }
                 // authentication is complete, send a ready message to the client
                 return new AuthSuccess(challenge);
             }
@@ -90,6 +100,14 @@ public class AuthResponse extends Message.Request
         catch (AuthenticationException e)
         {
             AuthMetrics.instance.markFailure();
+            if (auditLogEnabled)
+            {
+                AuditLogEntry auditEntry = new AuditLogEntry.Builder(queryState.getClientState())
+                                           .setOperation("LOGIN FAILURE")
+                                           .setType(AuditLogEntryType.LOGIN_ERROR)
+                                           .build();
+                auditLogManager.log(auditEntry, e);
+            }
             return ErrorMessage.fromException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index dcaa8da..5ffadac 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -19,24 +19,31 @@ package org.apache.cassandra.transport.messages;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
 
-import org.apache.cassandra.cql3.*;
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.BatchQueryOptions;
+import org.apache.cassandra.cql3.QueryHandler;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.UUIDGen;
@@ -176,30 +183,20 @@ public class BatchMessage extends Message.Request
 
             QueryHandler handler = ClientState.getCQLQueryHandler();
             List<ParsedStatement.Prepared> prepared = new ArrayList<>(queryOrIdList.size());
-            boolean fullQueryLogEnabled = FullQueryLogger.instance.enabled();
-            List<String> queryStrings = fullQueryLogEnabled ? new ArrayList<>(queryOrIdList.size()) : Collections.EMPTY_LIST;
             for (int i = 0; i < queryOrIdList.size(); i++)
             {
                 Object query = queryOrIdList.get(i);
-                String queryString;
                 ParsedStatement.Prepared p;
                 if (query instanceof String)
                 {
                     p = QueryProcessor.parseStatement((String)query,
                                                       state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
-                    queryString = (String)query;
                 }
                 else
                 {
                     p = handler.getPrepared((MD5Digest)query);
                     if (p == null)
                         throw new PreparedQueryNotFoundException((MD5Digest)query);
-                    queryString = p.rawCQLStatement;
-                }
-
-                if (fullQueryLogEnabled)
-                {
-                    queryStrings.add(queryString);
                 }
 
                 List<ByteBuffer> queryValues = values.get(i);
@@ -227,17 +224,16 @@ public class BatchMessage extends Message.Request
             // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
             // (and no value would be really correct, so we prefer passing a clearly wrong one).
             BatchStatement batch = new BatchStatement(-1, batchType, statements, Attributes.none());
-            long fqlTime = 0;
-            if (fullQueryLogEnabled)
-            {
-                fqlTime = System.currentTimeMillis();
-            }
+
+            long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0;
             Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), queryStartNanoTime);
-            if (fullQueryLogEnabled)
+
+            if (isLoggingEnabled)
             {
-                FullQueryLogger.instance.logBatch(batchType.name(), queryStrings, values, options, fqlTime);
+                auditLogManager.logBatch(batchType.name(), queryOrIdList, values, prepared, options, state, fqlTime);
             }
 
+
             if (tracingId != null)
                 response.setTracingId(tracingId);
 
@@ -245,6 +241,16 @@ public class BatchMessage extends Message.Request
         }
         catch (Exception e)
         {
+            if (auditLogEnabled)
+            {
+                AuditLogEntry entry = new AuditLogEntry.Builder(state.getClientState())
+                                      .setOperation(getAuditString())
+                                      .setOptions(options)
+                                      .setType(AuditLogEntryType.BATCH)
+                                      .build();
+                auditLogManager.log(entry, e);
+            }
+
             JVMStabilityInspector.inspectThrowable(e);
             return ErrorMessage.fromException(e);
         }
@@ -267,4 +273,13 @@ public class BatchMessage extends Message.Request
         sb.append("] at consistency ").append(options.getConsistency());
         return sb.toString();
     }
+
+    private String getAuditString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("BATCH of [");
+        sb.append(queryOrIdList.size());
+        sb.append("] statements at consistency ").append(options.getConsistency());
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index e969134..cd7f300 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -20,23 +20,24 @@ package org.apache.cassandra.transport.messages;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
 
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.ResultSet;
-import org.apache.cassandra.cql3.statements.BatchStatement;
-import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
-import org.apache.cassandra.cql3.statements.UpdateStatement;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.UUIDGen;
@@ -163,16 +164,21 @@ public class ExecuteMessage extends Message.Request
             // Some custom QueryHandlers are interested by the bound names. We provide them this information
             // by wrapping the QueryOptions.
             QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames);
-            boolean fqlEnabled = FullQueryLogger.instance.enabled();
-            long fqlTime = 0;
-            if (fqlEnabled)
-            {
-                fqlTime = System.currentTimeMillis();
-            }
+
+            long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0;
             Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime);
-            if (fqlEnabled)
+
+            if (isLoggingEnabled)
             {
-                FullQueryLogger.instance.logQuery(prepared.rawCQLStatement, options, fqlTime);
+                AuditLogEntry auditEntry = new AuditLogEntry.Builder(state.getClientState())
+                                           .setType(statement.getAuditLogContext().auditLogEntryType)
+                                           .setOperation(prepared.rawCQLStatement)
+                                           .setTimestamp(fqlTime)
+                                           .setScope(statement)
+                                           .setKeyspace(state, statement)
+                                           .setOptions(options)
+                                           .build();
+                AuditLogManager.getInstance().log(auditEntry);
             }
 
             if (response instanceof ResultMessage.Rows)
@@ -212,6 +218,33 @@ public class ExecuteMessage extends Message.Request
         }
         catch (Exception e)
         {
+            if (auditLogEnabled)
+            {
+                if (e instanceof PreparedQueryNotFoundException)
+                {
+                    AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                                  .setOperation(toString())
+                                                  .setOptions(options)
+                                                  .build();
+                    auditLogManager.log(auditLogEntry, e);
+                }
+                else
+                {
+                    ParsedStatement.Prepared prepared = ClientState.getCQLQueryHandler().getPrepared(statementId);
+                    if (prepared != null)
+                    {
+                        AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                                      .setOperation(toString())
+                                                      .setType(prepared.statement.getAuditLogContext().auditLogEntryType)
+                                                      .setScope(prepared.statement)
+                                                      .setKeyspace(state, prepared.statement)
+                                                      .setOptions(options)
+                                                      .build();
+                        auditLogManager.log(auditLogEntry, e);
+                    }
+                }
+            }
+
             JVMStabilityInspector.inspectThrowable(e);
             return ErrorMessage.fromException(e);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index b6bf055..e5e5248 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -20,12 +20,18 @@ package org.apache.cassandra.transport.messages;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
 
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -111,6 +117,17 @@ public class PrepareMessage extends Message.Request
             Message.Response response = ClientState.getCQLQueryHandler().prepare(query,
                                                                                  state.getClientState().cloneWithKeyspaceIfSet(keyspace),
                                                                                  getCustomPayload());
+            if (auditLogEnabled)
+            {
+                ParsedStatement.Prepared parsedStmt = QueryProcessor.parseStatement(query, state.getClientState());
+                AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                              .setOperation(query)
+                                              .setType(AuditLogEntryType.PREPARE_STATEMENT)
+                                              .setScope(parsedStmt.statement)
+                                              .setKeyspace(parsedStmt.statement)
+                                              .build();
+                auditLogManager.log(auditLogEntry);
+            }
 
             if (tracingId != null)
                 response.setTracingId(tracingId);
@@ -119,6 +136,15 @@ public class PrepareMessage extends Message.Request
         }
         catch (Exception e)
         {
+            if (auditLogEnabled)
+            {
+                AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                              .setOperation(query)
+                                              .setKeyspace(keyspace)
+                                              .setType(AuditLogEntryType.PREPARE_STATEMENT)
+                                              .build();
+                auditLogManager.log(auditLogEntry, e);
+            }
             JVMStabilityInspector.inspectThrowable(e);
             return ErrorMessage.fromException(e);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 8f64033..9df9205 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -22,8 +22,11 @@ import java.util.UUID;
 import com.google.common.collect.ImmutableMap;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
@@ -114,17 +117,24 @@ public class QueryMessage extends Message.Request
                 Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
             }
 
-            boolean fqlEnabled = FullQueryLogger.instance.enabled();
-            long fqlTime = 0;
-            if (fqlEnabled)
-            {
-                fqlTime = System.currentTimeMillis();
-            }
+            long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0;
             Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime);
-            if (fqlEnabled)
+
+            if (isLoggingEnabled)
             {
-                FullQueryLogger.instance.logQuery(query, options, fqlTime);
+                ParsedStatement.Prepared parsedStatement = QueryProcessor.parseStatement(query, state.getClientState());
+                AuditLogEntry auditEntry = new AuditLogEntry.Builder(state.getClientState())
+                                           .setType(parsedStatement.statement.getAuditLogContext().auditLogEntryType)
+                                           .setOperation(query)
+                                           .setTimestamp(fqlTime)
+                                           .setScope(parsedStatement.statement)
+                                           .setKeyspace(state, parsedStatement.statement)
+                                           .setOptions(options)
+                                           .build();
+                AuditLogManager.getInstance().log(auditEntry);
+
             }
+
             if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 
@@ -135,6 +145,14 @@ public class QueryMessage extends Message.Request
         }
         catch (Exception e)
         {
+            if (auditLogEnabled)
+            {
+                AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                              .setOperation(query)
+                                              .setOptions(options)
+                                              .build();
+                auditLogManager.log(auditLogEntry, e);
+            }
             JVMStabilityInspector.inspectThrowable(e);
             if (!((e instanceof RequestValidationException) || (e instanceof RequestExecutionException)))
                 logger.error("Unexpected error during query", e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 078b414..b621071 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.AllowAllNetworkAuthorizer;
+import org.apache.cassandra.audit.IAuditLogger;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.IAuthorizer;
 import org.apache.cassandra.auth.INetworkAuthorizer;
@@ -526,6 +527,29 @@ public class FBUtilities
         }
         return FBUtilities.construct(className, "network authorizer");
     }
+    
+    public static IAuditLogger newAuditLogger(String className) throws ConfigurationException
+    {
+        if (!className.contains("."))
+            className = "org.apache.cassandra.audit." + className;
+        return FBUtilities.construct(className, "Audit logger");
+    }
+
+    public static boolean isAuditLoggerClassExists(String className)
+    {
+        if (!className.contains("."))
+            className = "org.apache.cassandra.audit." + className;
+
+        try
+        {
+            FBUtilities.classForName(className, "Audit logger");
+        }
+        catch (ConfigurationException e)
+        {
+            return false;
+        }
+        return true;
+    }
 
     /**
      * @return The Class for the given name.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/utils/binlog/BinLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLog.java b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
index 070a151..0c8659e 100644
--- a/src/java/org/apache/cassandra/utils/binlog/BinLog.java
+++ b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
@@ -272,6 +272,6 @@ public class BinLog implements Runnable, StoreFileListener
 
     public abstract static class ReleaseableWriteMarshallable implements WriteMarshallable
     {
-        protected abstract void release();
+        public abstract void release();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java b/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java
new file mode 100644
index 0000000..8054f90
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.cassandra.audit.AuditLogFilter.isFiltered;
+
+public class AuditLogFilterTest
+{
+    @Test
+    public void isFiltered_IncludeSetOnly()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+
+        Assert.assertFalse(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("d", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_ExcludeSetOnly()
+    {
+        Set<String> includeSet = new HashSet<>();
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+        excludeSet.add("b");
+        excludeSet.add("c");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("b", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("c", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("d", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_MutualExclusive()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_MutualInclusive()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("c");
+        excludeSet.add("d");
+
+        Assert.assertFalse(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("d", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("f", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_UnSpecifiedInput()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("d", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_SpecifiedInput()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_FilteredInput_EmptyInclude()
+    {
+        Set<String> includeSet = new HashSet<>();
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_FilteredInput_EmptyExclude()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+
+        Assert.assertFalse(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_EmptyInputs()
+    {
+        Set<String> includeSet = new HashSet<>();
+        Set<String> excludeSet = new HashSet<>();
+
+        Assert.assertFalse(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_NullInputs()
+    {
+        Set<String> includeSet = new HashSet<>();
+        Set<String> excludeSet = new HashSet<>();
+        Assert.assertFalse(isFiltered(null, includeSet, excludeSet));
+
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+        Assert.assertTrue(isFiltered(null, includeSet, excludeSet));
+
+        includeSet = new HashSet<>();
+        excludeSet.add("a");
+        excludeSet.add("b");
+        Assert.assertFalse(isFiltered(null, includeSet, excludeSet));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java b/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java
new file mode 100644
index 0000000..40eadf8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.SyntaxError;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.service.StorageService;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public class AuditLoggerTest extends CQLTester
+{
+    @BeforeClass
+    public static void setUp()
+    {
+        AuditLogOptions options = new AuditLogOptions();
+        options.enabled = true;
+        options.logger = "InMemoryAuditLogger";
+        DatabaseDescriptor.setAuditLoggingOptions(options);
+        requireNetwork();
+    }
+
+    @Before
+    public void beforeTestMethod()
+    {
+        AuditLogOptions options = new AuditLogOptions();
+        enableAuditLogOptions(options);
+    }
+
+    private void enableAuditLogOptions(AuditLogOptions options)
+    {
+        String loggerName = "InMemoryAuditLogger";
+        String includedKeyspaces = options.included_keyspaces;
+        String excludedKeyspaces = options.excluded_keyspaces;
+        String includedCategories = options.included_categories;
+        String excludedCategories = options.excluded_categories;
+        String includedUsers = options.included_users;
+        String excludedUsers = options.excluded_users;
+
+        StorageService.instance.enableAuditLog(loggerName, includedKeyspaces, excludedKeyspaces, includedCategories, excludedCategories, includedUsers, excludedUsers);
+    }
+
+    private void disableAuditLogOptions()
+    {
+        StorageService.instance.disableAuditLog();
+    }
+
+    @Test
+    public void testAuditLogFilters() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        AuditLogOptions options = new AuditLogOptions();
+        options.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(options);
+
+        String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        ResultSet rs = executeAndAssertNoAuditLog(cql, 1);
+        assertEquals(1, rs.all().size());
+
+        options = new AuditLogOptions();
+        options.included_keyspaces = KEYSPACE;
+        enableAuditLogOptions(options);
+
+        cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        rs = executeAndAssertWithPrepare(cql, AuditLogEntryType.SELECT, 1);
+        assertEquals(1, rs.all().size());
+
+        options = new AuditLogOptions();
+        options.included_keyspaces = KEYSPACE;
+        options.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(options);
+
+        cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        rs = executeAndAssertNoAuditLog(cql, 1);
+        assertEquals(1, rs.all().size());
+
+        options = new AuditLogOptions();
+        enableAuditLogOptions(options);
+
+        cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        rs = executeAndAssertWithPrepare(cql, AuditLogEntryType.SELECT, 1);
+        assertEquals(1, rs.all().size());
+    }
+
+    @Test
+    public void testAuditLogFiltersTransitions() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        AuditLogOptions options = new AuditLogOptions();
+        options.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(options);
+
+        String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        ResultSet rs = executeAndAssertNoAuditLog(cql, 1);
+        assertEquals(1, rs.all().size());
+
+        disableAuditLogOptions();
+
+        cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        rs = executeAndAssertDisableAuditLog(cql, 1);
+        assertEquals(1, rs.all().size());
+
+        options = new AuditLogOptions();
+        options.included_keyspaces = KEYSPACE;
+        options.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(options);
+
+        cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        rs = executeAndAssertNoAuditLog(cql, 1);
+        assertEquals(1, rs.all().size());
+
+        disableAuditLogOptions();
+
+        cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        rs = executeAndAssertDisableAuditLog(cql, 1);
+        assertEquals(1, rs.all().size());
+    }
+
+    @Test
+    public void testAuditLogExceptions()
+    {
+        AuditLogOptions options = new AuditLogOptions();
+        options.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(options);
+        Assert.assertTrue(AuditLogManager.getInstance().isAuditingEnabled());
+
+        disableAuditLogOptions();
+    }
+
+    @Test
+    public void testAuditLogFilterIncludeExclude() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String tbl1 = currentTable();
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        AuditLogOptions options = new AuditLogOptions();
+        options.excluded_categories = "QUERY";
+        options.included_categories = "QUERY,DML,PREPARE";
+        enableAuditLogOptions(options);
+
+        //QUERY - Should be filtered, part of excluded categories,
+        String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = 1";
+        Session session = sessionNet();
+        ResultSet rs = session.execute(cql);
+
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+        assertEquals(1, rs.all().size());
+
+        //DML - Should not be filtered, part of included categories
+        cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?, ?, ?)";
+        executeAndAssertWithPrepare(cql, AuditLogEntryType.UPDATE, 1, "insert_audit", "test");
+
+        //DDL - Should be filtered, not part of included categories
+        cql = "ALTER TABLE  " + KEYSPACE + '.' + currentTable() + " ADD v3 text";
+        session = sessionNet();
+        rs = session.execute(cql);
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+    }
+
+    @Test
+    public void testCqlSelectAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        ResultSet rs = executeAndAssertWithPrepare(cql, AuditLogEntryType.SELECT, 1);
+
+        assertEquals(1, rs.all().size());
+    }
+
+    @Test
+    public void testCqlInsertAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+        String cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + "  (id, v1, v2) VALUES (?, ?, ?)";
+        executeAndAssertWithPrepare(cql, AuditLogEntryType.UPDATE, 1, "insert_audit", "test");
+    }
+
+    @Test
+    public void testCqlUpdateAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        String cql = "UPDATE " + KEYSPACE + '.' + currentTable() + "  SET v1 = 'ApacheCassandra' WHERE id = 1";
+        executeAndAssert(cql, AuditLogEntryType.UPDATE);
+
+        cql = "UPDATE " + KEYSPACE + '.' + currentTable() + "  SET v1 = ? WHERE id = ?";
+        executeAndAssertWithPrepare(cql, AuditLogEntryType.UPDATE, "AuditingTest", 2);
+    }
+
+    @Test
+    public void testCqlDeleteAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        String cql = "DELETE FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        executeAndAssertWithPrepare(cql, AuditLogEntryType.DELETE, 1);
+    }
+
+    @Test
+    public void testCqlTruncateAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        String cql = "TRUNCATE TABLE  " + KEYSPACE + '.' + currentTable();
+        executeAndAssertWithPrepare(cql, AuditLogEntryType.TRUNCATE);
+    }
+
+    @Test
+    public void testCqlBatchAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+        Session session = sessionNet();
+
+        BatchStatement batchStatement = new BatchStatement();
+
+        String cqlInsert = "INSERT INTO " + KEYSPACE + "." + currentTable() + " (id, v1, v2) VALUES (?, ?, ?)";
+        PreparedStatement prep = session.prepare(cqlInsert);
+        AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlInsert, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false);
+
+        batchStatement.add(prep.bind(1, "Apapche", "Cassandra"));
+        batchStatement.add(prep.bind(2, "Apapche1", "Cassandra1"));
+
+        String cqlUpdate = "UPDATE " + KEYSPACE + "." + currentTable() + " SET v1 = ? WHERE id = ?";
+        prep = session.prepare(cqlUpdate);
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlUpdate, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false);
+
+        batchStatement.add(prep.bind("Apache Cassandra", 1));
+
+        String cqlDelete = "DELETE FROM " + KEYSPACE + "." + currentTable() + " WHERE id = ?";
+        prep = session.prepare(cqlDelete);
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlDelete, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false);
+
+        batchStatement.add(prep.bind(1));
+
+        ResultSet rs = session.execute(batchStatement);
+
+        assertEquals(5, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlInsert, AuditLogEntryType.UPDATE, logEntry, false);
+
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlInsert, AuditLogEntryType.UPDATE, logEntry, false);
+
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlUpdate, AuditLogEntryType.UPDATE, logEntry, false);
+
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlDelete, AuditLogEntryType.DELETE, logEntry, false);
+
+        int size = rs.all().size();
+
+        assertEquals(0, size);
+    }
+
+    @Test
+    public void testCqlBatch_MultipleTablesAuditing()
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String table1 = currentTable();
+
+        Session session = sessionNet();
+
+        BatchStatement batchStatement = new BatchStatement();
+
+        String cqlInsert1 = "INSERT INTO " + KEYSPACE + "." + table1 + " (id, v1, v2) VALUES (?, ?, ?)";
+        PreparedStatement prep = session.prepare(cqlInsert1);
+        AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlInsert1, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false);
+
+        batchStatement.add(prep.bind(1, "Apapche", "Cassandra"));
+
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String table2 = currentTable();
+
+        String cqlInsert2 = "INSERT INTO " + KEYSPACE + "." + table2 + " (id, v1, v2) VALUES (?, ?, ?)";
+        prep = session.prepare(cqlInsert2);
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlInsert2, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false);
+
+        batchStatement.add(prep.bind(1, "Apapche", "Cassandra"));
+
+        createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+        String ks2 = currentKeyspace();
+
+        createTable(ks2, "CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String table3 = currentTable();
+
+        String cqlInsert3 = "INSERT INTO " + ks2 + "." + table3 + " (id, v1, v2) VALUES (?, ?, ?)";
+        prep = session.prepare(cqlInsert3);
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlInsert3, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false, ks2);
+
+        batchStatement.add(prep.bind(1, "Apapche", "Cassandra"));
+
+        ResultSet rs = session.execute(batchStatement);
+
+        assertEquals(4, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlInsert1, table1, AuditLogEntryType.UPDATE, logEntry, false, KEYSPACE);
+
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlInsert2, table2, AuditLogEntryType.UPDATE, logEntry, false, KEYSPACE);
+
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cqlInsert3, table3, AuditLogEntryType.UPDATE, logEntry, false, ks2);
+
+        int size = rs.all().size();
+
+        assertEquals(0, size);
+    }
+
+    @Test
+    public void testCqlKeyspaceAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+        String cql = "CREATE KEYSPACE " + createKeyspaceName() + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2}  ";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_KEYSPACE, true, currentKeyspace());
+
+        cql = "CREATE KEYSPACE IF NOT EXISTS " + createKeyspaceName() + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2}  ";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_KEYSPACE, true, currentKeyspace());
+
+        cql = "ALTER KEYSPACE " + currentKeyspace() + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2}  ";
+        executeAndAssert(cql, AuditLogEntryType.ALTER_KEYSPACE, true, currentKeyspace());
+
+        cql = "DROP KEYSPACE " + currentKeyspace();
+        executeAndAssert(cql, AuditLogEntryType.DROP_KEYSPACE, true, currentKeyspace());
+    }
+
+    @Test
+    public void testCqlTableAuditing() throws Throwable
+    {
+        String cql = "CREATE TABLE " + KEYSPACE + "." + createTableName() + " (id int primary key, v1 text, v2 text)";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_TABLE);
+
+        cql = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + createTableName() + " (id int primary key, v1 text, v2 text)";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_TABLE);
+
+        cql = "ALTER TABLE " + KEYSPACE + "." + currentTable() + " ADD v3 text";
+        executeAndAssert(cql, AuditLogEntryType.ALTER_TABLE);
+
+        cql = "DROP TABLE " + KEYSPACE + "." + currentTable();
+        executeAndAssert(cql, AuditLogEntryType.DROP_TABLE);
+    }
+
+    @Test
+    public void testCqlMVAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        String tblName = currentTable();
+        String cql = "CREATE MATERIALIZED VIEW " + KEYSPACE + "." + createTableName() + " AS SELECT id,v1 FROM " + KEYSPACE + "." + tblName + " WHERE id IS NOT NULL AND v1 IS NOT NULL PRIMARY KEY ( id, v1 ) ";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_VIEW);
+
+        cql = "CREATE MATERIALIZED VIEW IF NOT EXISTS " + KEYSPACE + "." + currentTable() + " AS SELECT id,v1 FROM " + KEYSPACE + "." + tblName + " WHERE id IS NOT NULL AND v1 IS NOT NULL PRIMARY KEY ( id, v1 ) ";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_VIEW);
+
+        cql = "ALTER MATERIALIZED VIEW " + KEYSPACE + "." + currentTable() + " WITH caching = {  'keys' : 'NONE' };";
+        executeAndAssert(cql, AuditLogEntryType.ALTER_VIEW);
+
+        cql = "DROP MATERIALIZED VIEW " + KEYSPACE + "." + currentTable();
+        executeAndAssert(cql, AuditLogEntryType.DROP_VIEW);
+    }
+
+    @Test
+    public void testCqlTypeAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+        String tblName = createTableName();
+
+        String cql = "CREATE TYPE " + KEYSPACE + "." + tblName + " (id int, v1 text, v2 text)";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_TYPE);
+
+        cql = "CREATE TYPE IF NOT EXISTS " + KEYSPACE + "." + tblName + " (id int, v1 text, v2 text)";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_TYPE);
+
+        cql = "ALTER TYPE " + KEYSPACE + "." + tblName + " ADD v3 int";
+        executeAndAssert(cql, AuditLogEntryType.ALTER_TYPE);
+
+        cql = "ALTER TYPE " + KEYSPACE + "." + tblName + " RENAME v3 TO v4";
+        executeAndAssert(cql, AuditLogEntryType.ALTER_TYPE);
+
+        cql = "DROP TYPE " + KEYSPACE + "." + tblName;
+        executeAndAssert(cql, AuditLogEntryType.DROP_TYPE);
+
+        cql = "DROP TYPE IF EXISTS " + KEYSPACE + "." + tblName;
+        executeAndAssert(cql, AuditLogEntryType.DROP_TYPE);
+    }
+
+    @Test
+    public void testCqlIndexAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+        String tblName = currentTable();
+
+        String indexName = createTableName();
+
+        String cql = "CREATE INDEX " + indexName + " ON " + KEYSPACE + "." + tblName + " (v1)";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_INDEX);
+
+        cql = "DROP INDEX " + KEYSPACE + "." + indexName;
+        executeAndAssert(cql, AuditLogEntryType.DROP_INDEX);
+    }
+
+    @Test
+    public void testCqlFunctionAuditing() throws Throwable
+    {
+        String tblName = createTableName();
+
+        String cql = "CREATE FUNCTION IF NOT EXISTS  " + KEYSPACE + "." + tblName + " (column TEXT,num int) RETURNS NULL ON NULL INPUT RETURNS text LANGUAGE javascript AS $$ column.substring(0,num) $$";
+        executeAndAssert(cql, AuditLogEntryType.CREATE_FUNCTION);
+
+        cql = "DROP FUNCTION " + KEYSPACE + "." + tblName;
+        executeAndAssert(cql, AuditLogEntryType.DROP_FUNCTION);
+    }
+
+    @Test
+    public void testCqlTriggerAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+        String tblName = currentTable();
+        String triggerName = createTableName();
+
+        String cql = "DROP TRIGGER IF EXISTS " + triggerName + " ON " + KEYSPACE + "." + tblName;
+        executeAndAssert(cql, AuditLogEntryType.DROP_TRIGGER);
+    }
+
+    @Test
+    public void testCqlAggregateAuditing() throws Throwable
+    {
+        String aggName = createTableName();
+        String cql = "DROP AGGREGATE IF EXISTS " + KEYSPACE + "." + aggName;
+        executeAndAssert(cql, AuditLogEntryType.DROP_AGGREGATE);
+    }
+
+    @Test
+    public void testCqlQuerySyntaxError()
+    {
+        String cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + "1 (id, v1, v2) VALUES (1, 'insert_audit, 'test')";
+        try
+        {
+            createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+            Session session = sessionNet();
+            ResultSet rs = session.execute(cql);
+            Assert.fail("should not succeed");
+        }
+        catch (SyntaxError e)
+        {
+            // nop
+        }
+
+        AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(logEntry, cql);
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+    }
+
+    @Test
+    public void testCqlSelectQuerySyntaxError()
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String cql = "SELECT * FROM " + KEYSPACE + '.' + currentTable() + " LIMIT 2w";
+
+        try
+        {
+            Session session = sessionNet();
+            ResultSet rs = session.execute(cql);
+            Assert.fail("should not succeed");
+        }
+        catch (SyntaxError e)
+        {
+            // nop
+        }
+
+        AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(logEntry, cql);
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+    }
+
+    @Test
+    public void testCqlPrepareQueryError()
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?,?,?)";
+        try
+        {
+            Session session = sessionNet();
+
+            PreparedStatement pstmt = session.prepare(cql);
+            AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+            assertLogEntry(cql, AuditLogEntryType.PREPARE_STATEMENT, logEntry, false);
+
+            dropTable("DROP TABLE %s");
+            ResultSet rs = session.execute(pstmt.bind(1, "insert_audit", "test"));
+            Assert.fail("should not succeed");
+        }
+        catch (NoHostAvailableException e)
+        {
+            // nop
+        }
+
+        AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(logEntry, null);
+        logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(logEntry, cql);
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+    }
+
+    @Test
+    public void testCqlPrepareQuerySyntaxError()
+    {
+        String cql = "INSERT INTO " + KEYSPACE + '.' + "foo" + "(id, v1, v2) VALES (?,?,?)";
+        try
+        {
+            createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+            Session session = sessionNet();
+            PreparedStatement pstmt = session.prepare(cql);
+            ResultSet rs = session.execute(pstmt.bind(1, "insert_audit", "test"));
+            Assert.fail("should not succeed");
+        }
+        catch (SyntaxError e)
+        {
+            // nop
+        }
+        AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(logEntry, cql);
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+    }
+
+    /**
+     * Helper methods for Audit Log CQL Testing
+     */
+
+    private ResultSet executeAndAssert(String cql, AuditLogEntryType type) throws Throwable
+    {
+        return executeAndAssert(cql, type, false, KEYSPACE);
+    }
+
+    private ResultSet executeAndAssert(String cql, AuditLogEntryType type, boolean isTableNull, String keyspace) throws Throwable
+    {
+        Session session = sessionNet();
+
+        ResultSet rs = session.execute(cql);
+
+        AuditLogEntry logEntry1 = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cql, type, logEntry1, isTableNull, keyspace);
+
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+        return rs;
+    }
+
+    private ResultSet executeAndAssertWithPrepare(String cql, AuditLogEntryType exceuteType, Object... bindValues) throws Throwable
+    {
+        return executeAndAssertWithPrepare(cql, exceuteType, false, bindValues);
+    }
+
+    private ResultSet executeAndAssertWithPrepare(String cql, AuditLogEntryType executeType, boolean isTableNull, Object... bindValues) throws Throwable
+    {
+        Session session = sessionNet();
+
+        PreparedStatement pstmt = session.prepare(cql);
+        ResultSet rs = session.execute(pstmt.bind(bindValues));
+
+        AuditLogEntry logEntry1 = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cql, AuditLogEntryType.PREPARE_STATEMENT, logEntry1, isTableNull);
+
+        AuditLogEntry logEntry2 = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(cql, executeType, logEntry2, isTableNull);
+
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+        return rs;
+    }
+
+    private ResultSet executeAndAssertNoAuditLog(String cql, Object... bindValues)
+    {
+        Session session = sessionNet();
+
+        PreparedStatement pstmt = session.prepare(cql);
+        ResultSet rs = session.execute(pstmt.bind(bindValues));
+
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+        return rs;
+    }
+
+    private ResultSet executeAndAssertDisableAuditLog(String cql, Object... bindValues)
+    {
+        Session session = sessionNet();
+
+        PreparedStatement pstmt = session.prepare(cql);
+        ResultSet rs = session.execute(pstmt.bind(bindValues));
+
+        assertThat(AuditLogManager.getInstance().getLogger(),instanceOf(NoOpAuditLogger.class));
+        return rs;
+    }
+
+    private void assertLogEntry(String cql, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull)
+    {
+        assertLogEntry(cql, type, actual, isTableNull, KEYSPACE);
+    }
+
+    private void assertLogEntry(String cql, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull, String keyspace)
+    {
+        assertLogEntry(cql, currentTable(), type, actual, isTableNull, keyspace);
+    }
+
+    private void assertLogEntry(String cql, String table, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull, String keyspace)
+    {
+        assertEquals(keyspace, actual.getKeyspace());
+        if (!isTableNull)
+        {
+            assertEquals(table, actual.getScope());
+        }
+        assertEquals(type, actual.getType());
+        assertEquals(cql, actual.getOperation());
+        assertNotEquals(0,actual.getTimestamp());
+    }
+
+    private void assertLogEntry(AuditLogEntry logEntry, String cql)
+    {
+        assertNull(logEntry.getKeyspace());
+        assertNull(logEntry.getScope());
+        assertNotEquals(0,logEntry.getTimestamp());
+        assertEquals(AuditLogEntryType.REQUEST_FAILURE, logEntry.getType());
+        if (null != cql && !cql.isEmpty())
+        {
+            assertThat(logEntry.getOperation(), containsString(cql));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
new file mode 100644
index 0000000..f9d2930
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.nio.file.Path;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.utils.binlog.BinLogTest;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class BinAuditLoggerTest extends CQLTester
+{
+    private static Path tempDir;
+
+    @BeforeClass
+    public static void setUp() throws Exception
+    {
+        tempDir = BinLogTest.tempDir();
+
+        AuditLogOptions options = new AuditLogOptions();
+        options.enabled = true;
+        options.logger = "BinAuditLogger";
+        options.roll_cycle = "TEST_SECONDLY";
+        options.audit_logs_dir = tempDir.toString();
+        DatabaseDescriptor.setAuditLoggingOptions(options);
+        requireNetwork();
+    }
+
+    @Test
+    public void testSelectRoundTripQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+
+        Session session = sessionNet();
+
+        PreparedStatement pstmt = session.prepare(cql);
+        ResultSet rs = session.execute(pstmt.bind(1));
+
+        assertEquals(1, rs.all().size());
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("AuditLog", wire.read("type").text());
+                assertThat(wire.read("message").text(), containsString(AuditLogEntryType.PREPARE_STATEMENT.toString()));
+            }));
+
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("AuditLog", wire.read("type").text());
+                assertThat(wire.read("message").text(), containsString(AuditLogEntryType.SELECT.toString()));
+            }));
+            assertFalse(tailer.readDocument(wire -> {
+            }));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
new file mode 100644
index 0000000..14f7f23
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableQuery;
+import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableBatch;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLogTest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FullQueryLoggerTest extends CQLTester
+{
+    private static Path tempDir;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception
+    {
+        tempDir = BinLogTest.tempDir();
+    }
+
+    private FullQueryLogger instance;
+    
+    @Before
+    public void setUp()
+    {
+        instance = AuditLogManager.getInstance().getFullQueryLogger();
+    }
+    
+    @After
+    public void tearDown()
+    {
+        instance.reset(tempDir.toString());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConfigureNullPath() throws Exception
+    {
+        instance.configure(null, "", true, 1, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConfigureNullRollCycle() throws Exception
+    {
+        instance.configure(BinLogTest.tempDir(), null, true, 1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureInvalidRollCycle() throws Exception
+    {
+        instance.configure(BinLogTest.tempDir(), "foobar", true, 1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureInvalidMaxQueueWeight() throws Exception
+    {
+        instance.configure(BinLogTest.tempDir(), "DAILY", true, 0, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureInvalidMaxQueueLogSize() throws Exception
+    {
+        instance.configure(BinLogTest.tempDir(), "DAILY", true, 1, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureOverExistingFile() throws Exception
+    {
+        File f = File.createTempFile("foo", "bar");
+        f.deleteOnExit();
+        instance.configure(f.toPath(), "TEST_SECONDLY", true, 1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanRead() throws Exception
+    {
+        tempDir.toFile().setReadable(false);
+        try
+        {
+            configureFQL();
+        }
+        finally
+        {
+            tempDir.toFile().setReadable(true);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanWrite() throws Exception
+    {
+        tempDir.toFile().setWritable(false);
+        try
+        {
+            configureFQL();
+        }
+        finally
+        {
+            tempDir.toFile().setWritable(true);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanExecute() throws Exception
+    {
+        tempDir.toFile().setExecutable(false);
+        try
+        {
+            configureFQL();
+        }
+        finally
+        {
+            tempDir.toFile().setExecutable(true);
+        }
+    }
+
+    @Test
+    public void testResetWithoutConfigure() throws Exception
+    {
+        instance.reset(tempDir.toString());
+        instance.reset(tempDir.toString());
+    }
+
+    @Test
+    public void stopWithoutConfigure() throws Exception
+    {
+        instance.stop();
+        instance.stop();
+    }
+
+    /**
+     * Both the last used and supplied directory should get cleaned
+     */
+    @Test
+    public void testResetCleansPaths() throws Exception
+    {
+        configureFQL();
+        File tempA = File.createTempFile("foo", "bar", tempDir.toFile());
+        assertTrue(tempA.exists());
+        File tempB = File.createTempFile("foo", "bar", BinLogTest.tempDir().toFile());
+        instance.reset(tempB.getParent());
+        assertFalse(tempA.exists());
+        assertFalse(tempB.exists());
+    }
+
+    /**
+     * The last used and configured directory are the same and it shouldn't be an issue
+     */
+    @Test
+    public void testResetSamePath() throws Exception
+    {
+        configureFQL();
+        File tempA = File.createTempFile("foo", "bar", tempDir.toFile());
+        assertTrue(tempA.exists());
+        instance.reset(tempA.getParent());
+        assertFalse(tempA.exists());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testDoubleConfigure() throws Exception
+    {
+        configureFQL();
+        configureFQL();
+    }
+
+    @Test
+    public void testCleansDirectory() throws Exception
+    {
+        assertTrue(new File(tempDir.toFile(), "foobar").createNewFile());
+        configureFQL();
+        assertEquals(tempDir.toFile().listFiles().length, 1);
+        assertEquals("directory-listing.cq4t", tempDir.toFile().listFiles()[0].getName());
+    }
+
+    @Test
+    public void testEnabledReset() throws Exception
+    {
+        assertFalse(instance.enabled());
+        configureFQL();
+        assertTrue(instance.enabled());
+        instance.reset(tempDir.toString());
+        assertFalse(instance.enabled());
+    }
+
+    @Test
+    public void testEnabledStop() throws Exception
+    {
+        assertFalse(instance.enabled());
+        configureFQL();
+        assertTrue(instance.enabled());
+        instance.stop();
+        assertFalse(instance.enabled());
+    }
+
+    /**
+     * Test that a thread will block if the FQL is over weight, and unblock once the backup is cleared
+     */
+    @Test
+    public void testBlocking() throws Exception
+    {
+        configureFQL();
+        //Prevent the bin log thread from making progress, causing the task queue to block
+        Semaphore blockBinLog = new Semaphore(0);
+        try
+        {
+            //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
+            Semaphore binLogBlocked = new Semaphore(0);
+            instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
+            {
+
+                public void writeMarshallable(WireOut wire)
+                {
+                    //Notify that the bin log is blocking now
+                    binLogBlocked.release();
+                    try
+                    {
+                        //Block the bin log thread so the task queue can be filled
+                        blockBinLog.acquire();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();
+                    }
+                    super.writeMarshallable(wire);
+                }
+
+                public void release()
+                {
+                    super.release();
+                }
+            });
+
+            //Wait for the bin log thread to block so it can't batch drain tasks
+            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+            //Now fill the task queue
+            logQuery("foo2");
+
+            //Start a thread to block waiting on the bin log queue
+            Thread t = new Thread(() ->
+                                  {
+                                      logQuery("foo3");
+                                      //Should be able to log another query without an issue
+                                      logQuery("foo4");
+                                  });
+            t.start();
+            Thread.sleep(500);
+            //If thread state is terminated then the thread started, finished, and didn't block on the full task queue
+            assertTrue(t.getState() != Thread.State.TERMINATED);
+        }
+        finally
+        {
+            //Unblock the binlog thread
+            blockBinLog.release();
+        }
+        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo3", "foo4")), 60);
+    }
+
+    private boolean checkForQueries(List<String> queries)
+    {
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            List<String> expectedQueries = new LinkedList<>(queries);
+            while (!expectedQueries.isEmpty())
+            {
+                if (!tailer.readDocument(wire -> {
+                    assertEquals(expectedQueries.get(0), wire.read("query").text());
+                    expectedQueries.remove(0);
+                }))
+                {
+                    return false;
+                }
+            }
+            assertFalse(tailer.readDocument(wire -> {}));
+            return true;
+        }
+    }
+
+    @Test
+    public void testNonBlocking() throws Exception
+    {
+        instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256);
+        //Prevent the bin log thread from making progress, causing the task queue to refuse tasks
+        Semaphore blockBinLog = new Semaphore(0);
+        try
+        {
+            //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
+            Semaphore binLogBlocked = new Semaphore(0);
+            instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
+            {
+
+                public void writeMarshallable(WireOut wire)
+                {
+                    //Notify that the bin log is blocking now
+                    binLogBlocked.release();
+                    try
+                    {
+                        //Block the bin log thread so the task queue can be filled
+                        blockBinLog.acquire();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();
+                    }
+                    super.writeMarshallable(wire);
+                }
+
+                public void release()
+                {
+                    super.release();
+                }
+            });
+
+            //Wait for the bin log thread to block so it can't batch drain tasks
+            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+            //Now fill the task queue
+            logQuery("foo2");
+
+            //This sample should get dropped AKA released without being written
+            AtomicInteger releasedCount = new AtomicInteger(0);
+            AtomicInteger writtenCount = new AtomicInteger(0);
+            instance.logRecord(new WeighableMarshallableQuery("foo3", QueryOptions.DEFAULT, 1) {
+                public void writeMarshallable(WireOut wire)
+                {
+                    writtenCount.incrementAndGet();
+                    super.writeMarshallable(wire);
+                }
+
+                public void release()
+                {
+                    releasedCount.incrementAndGet();
+                    super.release();
+                }
+            }, instance.binLog);
+
+            Util.spinAssertEquals(1, releasedCount::get, 60);
+            assertEquals(0, writtenCount.get());
+        }
+        finally
+        {
+            blockBinLog.release();
+        }
+        //Wait for tasks to drain so there should be space in the queue
+        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2")), 60);
+        //Should be able to log again
+        logQuery("foo4");
+        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo4")), 60);
+    }
+
+    @Test
+    public void testRoundTripQuery() throws Exception
+    {
+        configureFQL();
+        logQuery("foo");
+        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo")), 60);
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("single", wire.read("type").text());
+                ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32());
+                assertEquals(ProtocolVersion.CURRENT, protocolVersion);
+                QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion);
+                compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
+                assertEquals(1L, wire.read("query-time").int64());
+                assertEquals("foo", wire.read("query").text());
+            }));
+        }
+    }
+
+    @Test
+    public void testRoundTripBatch() throws Exception
+    {
+        configureFQL();
+        instance.logBatch("UNLOGGED", Arrays.asList("foo1", "foo2"), Arrays.asList(Arrays.asList(ByteBuffer.allocate(1) , ByteBuffer.allocateDirect(2)), Arrays.asList()), QueryOptions.DEFAULT, 1);
+        Util.spinAssertEquals(true, () ->
+        {
+            try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+            {
+                return queue.createTailer().readingDocument().isPresent();
+            }
+        }, 60);
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("batch", wire.read("type").text());
+                ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32());
+                assertEquals(ProtocolVersion.CURRENT, protocolVersion);
+                QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion);
+                assertEquals(1L, wire.read("query-time").int64());
+                compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
+                assertEquals("UNLOGGED", wire.read("batch-type").text());
+                ValueIn in = wire.read("queries");
+                assertEquals(2, in.int32());
+                assertEquals("foo1", in.text());
+                assertEquals("foo2", in.text());
+                in = wire.read("values");
+                assertEquals(2, in.int32());
+                assertEquals(2, in.int32());
+                assertTrue(Arrays.equals(new byte[1], in.bytes()));
+                assertTrue(Arrays.equals(new byte[2], in.bytes()));
+                assertEquals(0, in.int32());
+            }));
+        }
+    }
+
+    @Test
+    public void testQueryWeight()
+    {
+        //Empty query should have some weight
+        WeighableMarshallableQuery query = new WeighableMarshallableQuery("", QueryOptions.DEFAULT, 1);
+        assertTrue(query.weight() >= 95);
+
+        StringBuilder sb = new StringBuilder();
+        for (int ii = 0; ii < 1024 * 1024; ii++)
+        {
+            sb.append('a');
+        }
+        query = new WeighableMarshallableQuery(sb.toString(), QueryOptions.DEFAULT, 1);
+
+        //A large query should be reflected in the size, * 2 since characters are still two bytes
+        assertTrue(query.weight() > ObjectSizes.measureDeep(sb.toString()));
+
+        //Large query options should be reflected
+        QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
+        query = new WeighableMarshallableQuery("", largeOptions, 1);
+        assertTrue(query.weight() > 1024 * 1024);
+        System.out.printf("weight %d%n", query.weight());
+    }
+
+    @Test
+    public void testBatchWeight()
+    {
+        //An empty batch should have weight
+        WeighableMarshallableBatch batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() >= 183);
+
+        StringBuilder sb = new StringBuilder();
+        for (int ii = 0; ii < 1024 * 1024; ii++)
+        {
+            sb.append('a');
+        }
+
+        //The weight of the type string should be reflected
+        batch = new WeighableMarshallableBatch(sb.toString(), new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(sb.toString()));
+
+        //The weight of the list containing queries should be reflected
+        List<String> bigList = new ArrayList(100000);
+        for (int ii = 0; ii < 100000; ii++)
+        {
+            bigList.add("");
+        }
+        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
+
+        //The size of the query should be reflected
+        bigList = new ArrayList(1);
+        bigList.add(sb.toString());
+        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
+
+        bigList = null;
+        //The size of the list of values should be reflected
+        List<List<ByteBuffer>> bigValues = new ArrayList<>(100000);
+        for (int ii = 0; ii < 100000; ii++)
+        {
+            bigValues.add(new ArrayList<>(0));
+        }
+        bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5));
+        batch = new WeighableMarshallableBatch("", new ArrayList<>(),  bigValues, QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues));
+
+        //As should the size of the values
+        QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
+        batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), largeOptions, 1);
+        assertTrue(batch.weight() > 1024 * 1024);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullType() throws Exception
+    {
+        instance.logBatch(null, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullQueries() throws Exception
+    {
+        instance.logBatch("", null, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullQueriesQuery() throws Exception
+    {
+        configureFQL();
+        instance.logBatch("", Arrays.asList((String)null), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullValues() throws Exception
+    {
+        instance.logBatch("", new ArrayList<>(), null, QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullValuesValue() throws Exception
+    {
+        instance.logBatch("", new ArrayList<>(), Arrays.asList((List<ByteBuffer>)null), null, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullQueryOptions() throws Exception
+    {
+        instance.logBatch("", new ArrayList<>(), new ArrayList<>(), null, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testLogBatchNegativeTime() throws Exception
+    {
+        instance.logBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, -1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogQueryNullQuery() throws Exception
+    {
+        instance.logQuery(null, QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogQueryNullQueryOptions() throws Exception
+    {
+        instance.logQuery("", null, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testLogQueryNegativeTime() throws Exception
+    {
+        instance.logQuery("", QueryOptions.DEFAULT, -1);
+    }
+
+    private static void compareQueryOptions(QueryOptions a, QueryOptions b)
+    {
+        assertEquals(a.getClass(), b.getClass());
+        assertEquals(a.getProtocolVersion(), b.getProtocolVersion());
+        assertEquals(a.getPageSize(), b.getPageSize());
+        assertEquals(a.getConsistency(), b.getConsistency());
+        assertEquals(a.getPagingState(), b.getPagingState());
+        assertEquals(a.getValues(), b.getValues());
+        assertEquals(a.getSerialConsistency(), b.getSerialConsistency());
+    }
+
+    private void configureFQL() throws Exception
+    {
+        instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256);
+    }
+
+    private void logQuery(String query)
+    {
+        instance.logQuery(query, QueryOptions.DEFAULT, 1);
+    }
+
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org