You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/05/11 12:45:58 UTC

[1/4] cassandra git commit: Audit logging for database activity

Repository: cassandra
Updated Branches:
  refs/heads/trunk aba582fb4 -> f56871b88


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/InMemoryAuditLogger.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/InMemoryAuditLogger.java b/test/unit/org/apache/cassandra/audit/InMemoryAuditLogger.java
new file mode 100644
index 0000000..7052d76
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/InMemoryAuditLogger.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.nio.file.Path;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Test-only {@link IAuditLogger} that keeps every logged entry in a package-visible,
+ * in-memory queue instead of writing it anywhere.
+ */
+public class InMemoryAuditLogger implements IAuditLogger
+{
+    /** Entries passed to {@link #log(AuditLogEntry)}, in arrival order; emptied by {@link #stop()}. */
+    final Queue<AuditLogEntry> inMemQueue = new LinkedList<>();
+
+    /** {@code true} from construction until {@link #stop()} runs; nothing in this class sets it back. */
+    private boolean active = true;
+
+    /** @return {@code true} until {@link #stop()} has been called, {@code false} afterwards */
+    @Override
+    public boolean enabled()
+    {
+        return active;
+    }
+
+    /** Appends the entry to {@link #inMemQueue}; the enabled flag is not consulted here. */
+    @Override
+    public void log(AuditLogEntry logMessage)
+    {
+        inMemQueue.add(logMessage);
+    }
+
+    /** Marks the logger as disabled and discards everything buffered so far. */
+    @Override
+    public void stop()
+    {
+        active = false;
+        inMemQueue.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 084ad7e..4078e2a 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -119,6 +119,11 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.LogbackStatusListener$1",
     "org.apache.cassandra.LogbackStatusListener$2",
     "org.apache.cassandra.TeeingAppender",
+    "org.apache.cassandra.audit.IAuditLogger",
+    "org.apache.cassandra.audit.BinAuditLogger",
+    "org.apache.cassandra.audit.BinLogAuditLogger",
+    "org.apache.cassandra.audit.FullQueryLogger",
+    "org.apache.cassandra.audit.AuditLogOptions",
     // generated classes
     "org.apache.cassandra.config.ConfigBeanInfo",
     "org.apache.cassandra.config.ConfigCustomizer",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index dacb37c..e53342d 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -543,6 +543,13 @@ public abstract class CQLTester
         return tables.get(tables.size() - 1);
     }
 
+    /** Returns the last element of {@code keyspaces}, or {@code null} when that list is empty. */
+    protected String currentKeyspace()
+    {
+        int count = keyspaces.size();
+        return count == 0 ? null : keyspaces.get(count - 1);
+    }
+
     protected ByteBuffer unset()
     {
         return ByteBufferUtil.UNSET_BYTE_BUFFER;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java b/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java
deleted file mode 100644
index 175cf8c..0000000
--- a/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java
+++ /dev/null
@@ -1,601 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.fullquerylog;
-
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import io.netty.buffer.Unpooled;
-import net.openhft.chronicle.queue.ChronicleQueue;
-import net.openhft.chronicle.queue.ChronicleQueueBuilder;
-import net.openhft.chronicle.queue.ExcerptTailer;
-import net.openhft.chronicle.queue.RollCycles;
-import net.openhft.chronicle.wire.ValueIn;
-import net.openhft.chronicle.wire.WireOut;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger.WeighableMarshallableQuery;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger.WeighableMarshallableBatch;
-import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.transport.Server;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.binlog.BinLogTest;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class FullQueryLoggerTest
-{
-    private static Path tempDir;
-
-    @BeforeClass
-    public static void beforeClass() throws Exception
-    {
-        tempDir = BinLogTest.tempDir();
-    }
-
-    @After
-    public void tearDown()
-    {
-        FullQueryLogger.instance.reset(tempDir.toString());
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testConfigureNullPath() throws Exception
-    {
-        FullQueryLogger.instance.configure(null, "", true, 1, 1);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testConfigureNullRollCycle() throws Exception
-    {
-        FullQueryLogger.instance.configure(BinLogTest.tempDir(), null, true, 1, 1);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testConfigureInvalidRollCycle() throws Exception
-    {
-        FullQueryLogger.instance.configure(BinLogTest.tempDir(), "foobar", true, 1, 1);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testConfigureInvalidMaxQueueWeight() throws Exception
-    {
-        FullQueryLogger.instance.configure(BinLogTest.tempDir(), "DAILY", true, 0, 1);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testConfigureInvalidMaxQueueLogSize() throws Exception
-    {
-        FullQueryLogger.instance.configure(BinLogTest.tempDir(), "DAILY", true, 1, 0);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testConfigureOverExistingFile() throws Exception
-    {
-        File f = File.createTempFile("foo", "bar");
-        f.deleteOnExit();
-        FullQueryLogger.instance.configure(f.toPath(), "TEST_SECONDLY", true, 1, 1);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testCanRead() throws Exception
-    {
-        tempDir.toFile().setReadable(false);
-        try
-        {
-            configureFQL();
-        }
-        finally
-        {
-            tempDir.toFile().setReadable(true);
-        }
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testCanWrite() throws Exception
-    {
-        tempDir.toFile().setWritable(false);
-        try
-        {
-            configureFQL();
-        }
-        finally
-        {
-            tempDir.toFile().setWritable(true);
-        }
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testCanExecute() throws Exception
-    {
-        tempDir.toFile().setExecutable(false);
-        try
-        {
-            configureFQL();
-        }
-        finally
-        {
-            tempDir.toFile().setExecutable(true);
-        }
-    }
-
-    @Test
-    public void testResetWithoutConfigure() throws Exception
-    {
-        FullQueryLogger.instance.reset(tempDir.toString());
-        FullQueryLogger.instance.reset(tempDir.toString());
-    }
-
-    @Test
-    public void stopWithoutConfigure() throws Exception
-    {
-        FullQueryLogger.instance.stop();
-        FullQueryLogger.instance.stop();
-    }
-
-    /**
-     * Both the last used and supplied directory should get cleaned
-     */
-    @Test
-    public void testResetCleansPaths() throws Exception
-    {
-        configureFQL();
-        File tempA = File.createTempFile("foo", "bar", tempDir.toFile());
-        assertTrue(tempA.exists());
-        File tempB = File.createTempFile("foo", "bar", BinLogTest.tempDir().toFile());
-        FullQueryLogger.instance.reset(tempB.getParent());
-        assertFalse(tempA.exists());
-        assertFalse(tempB.exists());
-    }
-
-    /**
-     * The last used and configured directory are the same and it shouldn't be an issue
-     */
-    @Test
-    public void testResetSamePath() throws Exception
-    {
-        configureFQL();
-        File tempA = File.createTempFile("foo", "bar", tempDir.toFile());
-        assertTrue(tempA.exists());
-        FullQueryLogger.instance.reset(tempA.getParent());
-        assertFalse(tempA.exists());
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testDoubleConfigure() throws Exception
-    {
-        configureFQL();
-        configureFQL();
-    }
-
-    @Test
-    public void testCleansDirectory() throws Exception
-    {
-        assertTrue(new File(tempDir.toFile(), "foobar").createNewFile());
-        configureFQL();
-        assertEquals(tempDir.toFile().listFiles().length, 1);
-        assertEquals("directory-listing.cq4t", tempDir.toFile().listFiles()[0].getName());
-    }
-
-    @Test
-    public void testEnabledReset() throws Exception
-    {
-        assertFalse(FullQueryLogger.instance.enabled());
-        configureFQL();
-        assertTrue(FullQueryLogger.instance.enabled());
-        FullQueryLogger.instance.reset(tempDir.toString());
-        assertFalse(FullQueryLogger.instance.enabled());
-    }
-
-    @Test
-    public void testEnabledStop() throws Exception
-    {
-        assertFalse(FullQueryLogger.instance.enabled());
-        configureFQL();
-        assertTrue(FullQueryLogger.instance.enabled());
-        FullQueryLogger.instance.stop();
-        assertFalse(FullQueryLogger.instance.enabled());
-    }
-
-    /**
-     * Test that a thread will block if the FQL is over weight, and unblock once the backup is cleared
-     */
-    @Test
-    public void testBlocking() throws Exception
-    {
-        configureFQL();
-        //Prevent the bin log thread from making progress, causing the task queue to block
-        Semaphore blockBinLog = new Semaphore(0);
-        try
-        {
-            //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
-            Semaphore binLogBlocked = new Semaphore(0);
-            FullQueryLogger.instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
-            {
-
-                public void writeMarshallable(WireOut wire)
-                {
-                    //Notify that the bin log is blocking now
-                    binLogBlocked.release();
-                    try
-                    {
-                        //Block the bin log thread so the task queue can be filled
-                        blockBinLog.acquire();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        e.printStackTrace();
-                    }
-                    super.writeMarshallable(wire);
-                }
-
-                public void release()
-                {
-                    super.release();
-                }
-            });
-
-            //Wait for the bin log thread to block so it can't batch drain tasks
-            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
-
-            //Now fill the task queue
-            logQuery("foo2");
-
-            //Start a thread to block waiting on the bin log queue
-            Thread t = new Thread(() ->
-                                  {
-                                      logQuery("foo3");
-                                      //Should be able to log another query without an issue
-                                      logQuery("foo4");
-                                  });
-            t.start();
-            Thread.sleep(500);
-            //If thread state is terminated then the thread started, finished, and didn't block on the full task queue
-            assertTrue(t.getState() != Thread.State.TERMINATED);
-        }
-        finally
-        {
-            //Unblock the binlog thread
-            blockBinLog.release();
-        }
-        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo3", "foo4")), 60);
-    }
-
-    private boolean checkForQueries(List<String> queries)
-    {
-        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
-        {
-            ExcerptTailer tailer = queue.createTailer();
-            List<String> expectedQueries = new LinkedList<>(queries);
-            while (!expectedQueries.isEmpty())
-            {
-                if (!tailer.readDocument(wire -> {
-                    assertEquals(expectedQueries.get(0), wire.read("query").text());
-                    expectedQueries.remove(0);
-                }))
-                {
-                    return false;
-                }
-            }
-            assertFalse(tailer.readDocument(wire -> {}));
-            return true;
-        }
-    }
-
-    @Test
-    public void testNonBlocking() throws Exception
-    {
-        FullQueryLogger.instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256);
-        //Prevent the bin log thread from making progress, causing the task queue to refuse tasks
-        Semaphore blockBinLog = new Semaphore(0);
-        try
-        {
-            //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
-            Semaphore binLogBlocked = new Semaphore(0);
-            FullQueryLogger.instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
-            {
-
-                public void writeMarshallable(WireOut wire)
-                {
-                    //Notify that the bin log is blocking now
-                    binLogBlocked.release();
-                    try
-                    {
-                        //Block the bin log thread so the task queue can be filled
-                        blockBinLog.acquire();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        e.printStackTrace();
-                    }
-                    super.writeMarshallable(wire);
-                }
-
-                public void release()
-                {
-                    super.release();
-                }
-            });
-
-            //Wait for the bin log thread to block so it can't batch drain tasks
-            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
-
-            //Now fill the task queue
-            logQuery("foo2");
-
-            //This sample should get dropped AKA released without being written
-            AtomicInteger releasedCount = new AtomicInteger(0);
-            AtomicInteger writtenCount = new AtomicInteger(0);
-            FullQueryLogger.instance.logRecord(new WeighableMarshallableQuery("foo3", QueryOptions.DEFAULT, 1) {
-                public void writeMarshallable(WireOut wire)
-                {
-                    writtenCount.incrementAndGet();
-                    super.writeMarshallable(wire);
-                }
-
-                public void release()
-                {
-                    releasedCount.incrementAndGet();
-                    super.release();
-                }
-            }, FullQueryLogger.instance.binLog);
-
-            Util.spinAssertEquals(1, releasedCount::get, 60);
-            assertEquals(0, writtenCount.get());
-        }
-        finally
-        {
-            blockBinLog.release();
-        }
-        //Wait for tasks to drain so there should be space in the queue
-        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2")), 60);
-        //Should be able to log again
-        logQuery("foo4");
-        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo4")), 60);
-    }
-
-    @Test
-    public void testRoundTripQuery() throws Exception
-    {
-        configureFQL();
-        logQuery("foo");
-        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo")), 60);
-        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
-        {
-            ExcerptTailer tailer = queue.createTailer();
-            assertTrue(tailer.readDocument(wire -> {
-                assertEquals("single", wire.read("type").text());
-                ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32());
-                assertEquals(ProtocolVersion.CURRENT, protocolVersion);
-                QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion);
-                compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
-                assertEquals(1L, wire.read("query-time").int64());
-                assertEquals("foo", wire.read("query").text());
-            }));
-        }
-    }
-
-    @Test
-    public void testRoundTripBatch() throws Exception
-    {
-        configureFQL();
-        FullQueryLogger.instance.logBatch("UNLOGGED", Arrays.asList("foo1", "foo2"), Arrays.asList(Arrays.asList(ByteBuffer.allocate(1) , ByteBuffer.allocateDirect(2)), Arrays.asList()), QueryOptions.DEFAULT, 1);
-        Util.spinAssertEquals(true, () ->
-        {
-            try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
-            {
-                return queue.createTailer().readingDocument().isPresent();
-            }
-        }, 60);
-        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
-        {
-            ExcerptTailer tailer = queue.createTailer();
-            assertTrue(tailer.readDocument(wire -> {
-                assertEquals("batch", wire.read("type").text());
-                ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32());
-                assertEquals(ProtocolVersion.CURRENT, protocolVersion);
-                QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion);
-                assertEquals(1L, wire.read("query-time").int64());
-                compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
-                assertEquals("UNLOGGED", wire.read("batch-type").text());
-                ValueIn in = wire.read("queries");
-                assertEquals(2, in.int32());
-                assertEquals("foo1", in.text());
-                assertEquals("foo2", in.text());
-                in = wire.read("values");
-                assertEquals(2, in.int32());
-                assertEquals(2, in.int32());
-                assertTrue(Arrays.equals(new byte[1], in.bytes()));
-                assertTrue(Arrays.equals(new byte[2], in.bytes()));
-                assertEquals(0, in.int32());
-            }));
-        }
-    }
-
-    @Test
-    public void testQueryWeight()
-    {
-        //Empty query should have some weight
-        WeighableMarshallableQuery query = new WeighableMarshallableQuery("", QueryOptions.DEFAULT, 1);
-        assertTrue(query.weight() >= 95);
-
-        StringBuilder sb = new StringBuilder();
-        for (int ii = 0; ii < 1024 * 1024; ii++)
-        {
-            sb.append('a');
-        }
-        query = new WeighableMarshallableQuery(sb.toString(), QueryOptions.DEFAULT, 1);
-
-        //A large query should be reflected in the size, * 2 since characters are still two bytes
-        assertTrue(query.weight() > ObjectSizes.measureDeep(sb.toString()));
-
-        //Large query options should be reflected
-        QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
-        query = new WeighableMarshallableQuery("", largeOptions, 1);
-        assertTrue(query.weight() > 1024 * 1024);
-        System.out.printf("weight %d%n", query.weight());
-    }
-
-    @Test
-    public void testBatchWeight()
-    {
-        //An empty batch should have weight
-        WeighableMarshallableBatch batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
-        assertTrue(batch.weight() >= 183);
-
-        StringBuilder sb = new StringBuilder();
-        for (int ii = 0; ii < 1024 * 1024; ii++)
-        {
-            sb.append('a');
-        }
-
-        //The weight of the type string should be reflected
-        batch = new WeighableMarshallableBatch(sb.toString(), new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
-        assertTrue(batch.weight() > ObjectSizes.measureDeep(sb.toString()));
-
-        //The weight of the list containing queries should be reflected
-        List<String> bigList = new ArrayList(100000);
-        for (int ii = 0; ii < 100000; ii++)
-        {
-            bigList.add("");
-        }
-        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
-        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
-
-        //The size of the query should be reflected
-        bigList = new ArrayList(1);
-        bigList.add(sb.toString());
-        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
-        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
-
-        bigList = null;
-        //The size of the list of values should be reflected
-        List<List<ByteBuffer>> bigValues = new ArrayList<>(100000);
-        for (int ii = 0; ii < 100000; ii++)
-        {
-            bigValues.add(new ArrayList<>(0));
-        }
-        bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5));
-        batch = new WeighableMarshallableBatch("", new ArrayList<>(),  bigValues, QueryOptions.DEFAULT, 1);
-        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues));
-
-        //As should the size of the values
-        QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
-        batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), largeOptions, 1);
-        assertTrue(batch.weight() > 1024 * 1024);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testLogBatchNullType() throws Exception
-    {
-        FullQueryLogger.instance.logBatch(null, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testLogBatchNullQueries() throws Exception
-    {
-        FullQueryLogger.instance.logBatch("", null, new ArrayList<>(), QueryOptions.DEFAULT, 1);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testLogBatchNullQueriesQuery() throws Exception
-    {
-        configureFQL();
-        FullQueryLogger.instance.logBatch("", Arrays.asList((String)null), new ArrayList<>(), QueryOptions.DEFAULT, 1);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testLogBatchNullValues() throws Exception
-    {
-        FullQueryLogger.instance.logBatch("", new ArrayList<>(), null, QueryOptions.DEFAULT, 1);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testLogBatchNullValuesValue() throws Exception
-    {
-        FullQueryLogger.instance.logBatch("", new ArrayList<>(), Arrays.asList((List<ByteBuffer>)null), null, 1);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testLogBatchNullQueryOptions() throws Exception
-    {
-        FullQueryLogger.instance.logBatch("", new ArrayList<>(), new ArrayList<>(), null, 1);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testLogBatchNegativeTime() throws Exception
-    {
-        FullQueryLogger.instance.logBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, -1);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testLogQueryNullQuery() throws Exception
-    {
-        FullQueryLogger.instance.logQuery(null, QueryOptions.DEFAULT, 1);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testLogQueryNullQueryOptions() throws Exception
-    {
-        FullQueryLogger.instance.logQuery("", null, 1);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testLogQueryNegativeTime() throws Exception
-    {
-        FullQueryLogger.instance.logQuery("", QueryOptions.DEFAULT, -1);
-    }
-
-    private static void compareQueryOptions(QueryOptions a, QueryOptions b)
-    {
-        assertEquals(a.getClass(), b.getClass());
-        assertEquals(a.getProtocolVersion(), b.getProtocolVersion());
-        assertEquals(a.getPageSize(), b.getPageSize());
-        assertEquals(a.getConsistency(), b.getConsistency());
-        assertEquals(a.getPagingState(), b.getPagingState());
-        assertEquals(a.getValues(), b.getValues());
-        assertEquals(a.getSerialConsistency(), b.getSerialConsistency());
-    }
-
-    private void configureFQL() throws Exception
-    {
-        FullQueryLogger.instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256);
-    }
-
-    private void logQuery(String query)
-    {
-        FullQueryLogger.instance.logQuery(query, QueryOptions.DEFAULT, 1);
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 8c4f5f6..e4ecbea 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -34,6 +34,7 @@ import org.junit.runner.RunWith;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
@@ -618,4 +619,52 @@ public class StorageServiceServerTest
         Gossiper.instance.getEndpointStateForEndpoint(internalAddress).addApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT, valueFactory.nativeaddressAndPort(InetAddressAndPort.getByName("127.0.0.3:666")));
         assertEquals("127.0.0.3:666", StorageService.instance.getNativeaddress(internalAddress, true));
     }
+
+    /**
+     * Requesting a logger name that is not expected to resolve ("foobar") while audit logging
+     * is already enabled must fail with an {@link IllegalStateException}.
+     */
+    @Test
+    public void testAuditLogEnableLoggerNotFound() throws Exception
+    {
+        StorageService.instance.enableAuditLog(null, null, null, null, null, null, null);
+        assertTrue(AuditLogManager.getInstance().isAuditingEnabled());
+
+        try
+        {
+            StorageService.instance.enableAuditLog("foobar", null, null, null, null, null, null);
+        }
+        catch (IllegalStateException expected)
+        {
+            // The expectation is scoped to this call rather than declared on @Test, so an
+            // IllegalStateException thrown by the setup call above cannot make the test pass.
+            return;
+        }
+        throw new AssertionError("enableAuditLog(\"foobar\", ...) should have thrown IllegalStateException");
+    }
+
+    /**
+     * Audit logging can be enabled again after an attempt to switch to the "foobar" logger.
+     * Whether that attempt throws is not asserted here; either exception type it may raise
+     * is tolerated so that the follow-up enable call is always exercised.
+     */
+    @Test
+    public void testAuditLogEnableLoggerTransitions() throws Exception
+    {
+        StorageService.instance.enableAuditLog(null, null, null, null, null, null, null);
+        assertTrue(AuditLogManager.getInstance().isAuditingEnabled());
+
+        try
+        {
+            StorageService.instance.enableAuditLog("foobar", null, null, null, null, null, null);
+        }
+        catch (ConfigurationException | IllegalStateException ignored)
+        {
+            // Deliberately ignored: a rejection of the bogus logger is acceptable here, the
+            // test only checks that enabling works afterwards. No stack trace is printed.
+        }
+
+        StorageService.instance.enableAuditLog(null, null, null, null, null, null, null);
+        assertTrue(AuditLogManager.getInstance().isAuditingEnabled());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
index 204e27b..64b0d4f 100644
--- a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
+++ b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
@@ -114,7 +114,7 @@ public class BinLogTest
         AtomicInteger releaseCount = new AtomicInteger();
         binLog.put(new BinLog.ReleaseableWriteMarshallable()
         {
-            protected void release()
+            public void release()
             {
                 releaseCount.incrementAndGet();
             }
@@ -139,7 +139,7 @@ public class BinLogTest
 
             }
 
-            protected void release()
+            public void release()
             {
                 releaseCount.incrementAndGet();
             }
@@ -174,7 +174,7 @@ public class BinLogTest
         Semaphore released = new Semaphore(0);
         binLog.sampleQueue.put(new BinLog.ReleaseableWriteMarshallable()
         {
-            protected void release()
+            public void release()
             {
                 released.release();
             }
@@ -220,7 +220,7 @@ public class BinLogTest
         {
             binLog.put(new BinLog.ReleaseableWriteMarshallable()
             {
-                protected void release()
+                public void release()
                 {
                 }
 
@@ -296,7 +296,7 @@ public class BinLogTest
         {
             assertTrue(binLog.offer(new BinLog.ReleaseableWriteMarshallable()
             {
-                protected void release()
+                public void release()
                 {
                 }
 
@@ -412,7 +412,7 @@ public class BinLogTest
     {
         return new BinLog.ReleaseableWriteMarshallable()
         {
-            protected void release()
+            public void release()
             {
                 //Do nothing
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/4] cassandra git commit: Audit logging for database activity

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 332b024..88c3085 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.transport.messages;
 import java.nio.ByteBuffer;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.AuthenticatedUser;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.exceptions.AuthenticationException;
@@ -79,6 +81,14 @@ public class AuthResponse extends Message.Request
                 AuthenticatedUser user = negotiator.getAuthenticatedUser();
                 queryState.getClientState().login(user);
                 AuthMetrics.instance.markSuccess();
+                if (auditLogEnabled)
+                {
+                    AuditLogEntry auditEntry = new AuditLogEntry.Builder(queryState.getClientState())
+                                               .setOperation("LOGIN SUCCESSFUL")
+                                               .setType(AuditLogEntryType.LOGIN_SUCCESS)
+                                               .build();
+                    auditLogManager.log(auditEntry);
+                }
                 // authentication is complete, send a ready message to the client
                 return new AuthSuccess(challenge);
             }
@@ -90,6 +100,14 @@ public class AuthResponse extends Message.Request
         catch (AuthenticationException e)
         {
             AuthMetrics.instance.markFailure();
+            if (auditLogEnabled)
+            {
+                AuditLogEntry auditEntry = new AuditLogEntry.Builder(queryState.getClientState())
+                                           .setOperation("LOGIN FAILURE")
+                                           .setType(AuditLogEntryType.LOGIN_ERROR)
+                                           .build();
+                auditLogManager.log(auditEntry, e);
+            }
             return ErrorMessage.fromException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index dcaa8da..5ffadac 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -19,24 +19,31 @@ package org.apache.cassandra.transport.messages;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
 
-import org.apache.cassandra.cql3.*;
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.BatchQueryOptions;
+import org.apache.cassandra.cql3.QueryHandler;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.UUIDGen;
@@ -176,30 +183,20 @@ public class BatchMessage extends Message.Request
 
             QueryHandler handler = ClientState.getCQLQueryHandler();
             List<ParsedStatement.Prepared> prepared = new ArrayList<>(queryOrIdList.size());
-            boolean fullQueryLogEnabled = FullQueryLogger.instance.enabled();
-            List<String> queryStrings = fullQueryLogEnabled ? new ArrayList<>(queryOrIdList.size()) : Collections.EMPTY_LIST;
             for (int i = 0; i < queryOrIdList.size(); i++)
             {
                 Object query = queryOrIdList.get(i);
-                String queryString;
                 ParsedStatement.Prepared p;
                 if (query instanceof String)
                 {
                     p = QueryProcessor.parseStatement((String)query,
                                                       state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
-                    queryString = (String)query;
                 }
                 else
                 {
                     p = handler.getPrepared((MD5Digest)query);
                     if (p == null)
                         throw new PreparedQueryNotFoundException((MD5Digest)query);
-                    queryString = p.rawCQLStatement;
-                }
-
-                if (fullQueryLogEnabled)
-                {
-                    queryStrings.add(queryString);
                 }
 
                 List<ByteBuffer> queryValues = values.get(i);
@@ -227,17 +224,16 @@ public class BatchMessage extends Message.Request
             // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
             // (and no value would be really correct, so we prefer passing a clearly wrong one).
             BatchStatement batch = new BatchStatement(-1, batchType, statements, Attributes.none());
-            long fqlTime = 0;
-            if (fullQueryLogEnabled)
-            {
-                fqlTime = System.currentTimeMillis();
-            }
+
+            long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0;
             Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), queryStartNanoTime);
-            if (fullQueryLogEnabled)
+
+            if (isLoggingEnabled)
             {
-                FullQueryLogger.instance.logBatch(batchType.name(), queryStrings, values, options, fqlTime);
+                auditLogManager.logBatch(batchType.name(), queryOrIdList, values, prepared, options, state, fqlTime);
             }
 
+
             if (tracingId != null)
                 response.setTracingId(tracingId);
 
@@ -245,6 +241,16 @@ public class BatchMessage extends Message.Request
         }
         catch (Exception e)
         {
+            if (auditLogEnabled)
+            {
+                AuditLogEntry entry = new AuditLogEntry.Builder(state.getClientState())
+                                      .setOperation(getAuditString())
+                                      .setOptions(options)
+                                      .setType(AuditLogEntryType.BATCH)
+                                      .build();
+                auditLogManager.log(entry, e);
+            }
+
             JVMStabilityInspector.inspectThrowable(e);
             return ErrorMessage.fromException(e);
         }
@@ -267,4 +273,13 @@ public class BatchMessage extends Message.Request
         sb.append("] at consistency ").append(options.getConsistency());
         return sb.toString();
     }
+
+    private String getAuditString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("BATCH of [");
+        sb.append(queryOrIdList.size());
+        sb.append("] statements at consistency ").append(options.getConsistency());
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index e969134..cd7f300 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -20,23 +20,24 @@ package org.apache.cassandra.transport.messages;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
 
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.ResultSet;
-import org.apache.cassandra.cql3.statements.BatchStatement;
-import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
-import org.apache.cassandra.cql3.statements.UpdateStatement;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.UUIDGen;
@@ -163,16 +164,21 @@ public class ExecuteMessage extends Message.Request
             // Some custom QueryHandlers are interested by the bound names. We provide them this information
             // by wrapping the QueryOptions.
             QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames);
-            boolean fqlEnabled = FullQueryLogger.instance.enabled();
-            long fqlTime = 0;
-            if (fqlEnabled)
-            {
-                fqlTime = System.currentTimeMillis();
-            }
+
+            long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0;
             Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime);
-            if (fqlEnabled)
+
+            if (isLoggingEnabled)
             {
-                FullQueryLogger.instance.logQuery(prepared.rawCQLStatement, options, fqlTime);
+                AuditLogEntry auditEntry = new AuditLogEntry.Builder(state.getClientState())
+                                           .setType(statement.getAuditLogContext().auditLogEntryType)
+                                           .setOperation(prepared.rawCQLStatement)
+                                           .setTimestamp(fqlTime)
+                                           .setScope(statement)
+                                           .setKeyspace(state, statement)
+                                           .setOptions(options)
+                                           .build();
+                AuditLogManager.getInstance().log(auditEntry);
             }
 
             if (response instanceof ResultMessage.Rows)
@@ -212,6 +218,33 @@ public class ExecuteMessage extends Message.Request
         }
         catch (Exception e)
         {
+            if (auditLogEnabled)
+            {
+                if (e instanceof PreparedQueryNotFoundException)
+                {
+                    AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                                  .setOperation(toString())
+                                                  .setOptions(options)
+                                                  .build();
+                    auditLogManager.log(auditLogEntry, e);
+                }
+                else
+                {
+                    ParsedStatement.Prepared prepared = ClientState.getCQLQueryHandler().getPrepared(statementId);
+                    if (prepared != null)
+                    {
+                        AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                                      .setOperation(toString())
+                                                      .setType(prepared.statement.getAuditLogContext().auditLogEntryType)
+                                                      .setScope(prepared.statement)
+                                                      .setKeyspace(state, prepared.statement)
+                                                      .setOptions(options)
+                                                      .build();
+                        auditLogManager.log(auditLogEntry, e);
+                    }
+                }
+            }
+
             JVMStabilityInspector.inspectThrowable(e);
             return ErrorMessage.fromException(e);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index b6bf055..e5e5248 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -20,12 +20,18 @@ package org.apache.cassandra.transport.messages;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
 
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -111,6 +117,17 @@ public class PrepareMessage extends Message.Request
             Message.Response response = ClientState.getCQLQueryHandler().prepare(query,
                                                                                  state.getClientState().cloneWithKeyspaceIfSet(keyspace),
                                                                                  getCustomPayload());
+            if (auditLogEnabled)
+            {
+                ParsedStatement.Prepared parsedStmt = QueryProcessor.parseStatement(query, state.getClientState());
+                AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                              .setOperation(query)
+                                              .setType(AuditLogEntryType.PREPARE_STATEMENT)
+                                              .setScope(parsedStmt.statement)
+                                              .setKeyspace(parsedStmt.statement)
+                                              .build();
+                auditLogManager.log(auditLogEntry);
+            }
 
             if (tracingId != null)
                 response.setTracingId(tracingId);
@@ -119,6 +136,15 @@ public class PrepareMessage extends Message.Request
         }
         catch (Exception e)
         {
+            if (auditLogEnabled)
+            {
+                AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                              .setOperation(query)
+                                              .setKeyspace(keyspace)
+                                              .setType(AuditLogEntryType.PREPARE_STATEMENT)
+                                              .build();
+                auditLogManager.log(auditLogEntry, e);
+            }
             JVMStabilityInspector.inspectThrowable(e);
             return ErrorMessage.fromException(e);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 8f64033..9df9205 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -22,8 +22,11 @@ import java.util.UUID;
 import com.google.common.collect.ImmutableMap;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
@@ -114,17 +117,24 @@ public class QueryMessage extends Message.Request
                 Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
             }
 
-            boolean fqlEnabled = FullQueryLogger.instance.enabled();
-            long fqlTime = 0;
-            if (fqlEnabled)
-            {
-                fqlTime = System.currentTimeMillis();
-            }
+            long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0;
             Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime);
-            if (fqlEnabled)
+
+            if (isLoggingEnabled)
             {
-                FullQueryLogger.instance.logQuery(query, options, fqlTime);
+                ParsedStatement.Prepared parsedStatement = QueryProcessor.parseStatement(query, state.getClientState());
+                AuditLogEntry auditEntry = new AuditLogEntry.Builder(state.getClientState())
+                                           .setType(parsedStatement.statement.getAuditLogContext().auditLogEntryType)
+                                           .setOperation(query)
+                                           .setTimestamp(fqlTime)
+                                           .setScope(parsedStatement.statement)
+                                           .setKeyspace(state, parsedStatement.statement)
+                                           .setOptions(options)
+                                           .build();
+                AuditLogManager.getInstance().log(auditEntry);
+
             }
+
             if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 
@@ -135,6 +145,14 @@ public class QueryMessage extends Message.Request
         }
         catch (Exception e)
         {
+            if (auditLogEnabled)
+            {
+                AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+                                              .setOperation(query)
+                                              .setOptions(options)
+                                              .build();
+                auditLogManager.log(auditLogEntry, e);
+            }
             JVMStabilityInspector.inspectThrowable(e);
             if (!((e instanceof RequestValidationException) || (e instanceof RequestExecutionException)))
                 logger.error("Unexpected error during query", e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 078b414..b621071 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.AllowAllNetworkAuthorizer;
+import org.apache.cassandra.audit.IAuditLogger;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.IAuthorizer;
 import org.apache.cassandra.auth.INetworkAuthorizer;
@@ -526,6 +527,29 @@ public class FBUtilities
         }
         return FBUtilities.construct(className, "network authorizer");
     }
+    
+    public static IAuditLogger newAuditLogger(String className) throws ConfigurationException
+    {
+        if (!className.contains("."))
+            className = "org.apache.cassandra.audit." + className;
+        return FBUtilities.construct(className, "Audit logger");
+    }
+
+    public static boolean isAuditLoggerClassExists(String className)
+    {
+        if (!className.contains("."))
+            className = "org.apache.cassandra.audit." + className;
+
+        try
+        {
+            FBUtilities.classForName(className, "Audit logger");
+        }
+        catch (ConfigurationException e)
+        {
+            return false;
+        }
+        return true;
+    }
 
     /**
      * @return The Class for the given name.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/utils/binlog/BinLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLog.java b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
index 070a151..0c8659e 100644
--- a/src/java/org/apache/cassandra/utils/binlog/BinLog.java
+++ b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
@@ -272,6 +272,6 @@ public class BinLog implements Runnable, StoreFileListener
 
     public abstract static class ReleaseableWriteMarshallable implements WriteMarshallable
     {
-        protected abstract void release();
+        public abstract void release();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java b/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java
new file mode 100644
index 0000000..8054f90
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.cassandra.audit.AuditLogFilter.isFiltered;
+
+public class AuditLogFilterTest
+{
+    @Test
+    public void isFiltered_IncludeSetOnly()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+
+        Assert.assertFalse(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("d", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_ExcludeSetOnly()
+    {
+        Set<String> includeSet = new HashSet<>();
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+        excludeSet.add("b");
+        excludeSet.add("c");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("b", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("c", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("d", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_MutualExclusive()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_MutualInclusive()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("c");
+        excludeSet.add("d");
+
+        Assert.assertFalse(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("d", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("f", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_UnSpecifiedInput()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("d", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_SpecifiedInput()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_FilteredInput_EmptyInclude()
+    {
+        Set<String> includeSet = new HashSet<>();
+        Set<String> excludeSet = new HashSet<>();
+        excludeSet.add("a");
+
+        Assert.assertTrue(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_FilteredInput_EmptyExclude()
+    {
+        Set<String> includeSet = new HashSet<>();
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+
+        Set<String> excludeSet = new HashSet<>();
+
+        Assert.assertFalse(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("b", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("c", includeSet, excludeSet));
+        Assert.assertTrue(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_EmptyInputs()
+    {
+        Set<String> includeSet = new HashSet<>();
+        Set<String> excludeSet = new HashSet<>();
+
+        Assert.assertFalse(isFiltered("a", includeSet, excludeSet));
+        Assert.assertFalse(isFiltered("e", includeSet, excludeSet));
+    }
+
+    @Test
+    public void isFiltered_NullInputs()
+    {
+        Set<String> includeSet = new HashSet<>();
+        Set<String> excludeSet = new HashSet<>();
+        Assert.assertFalse(isFiltered(null, includeSet, excludeSet));
+
+        includeSet.add("a");
+        includeSet.add("b");
+        includeSet.add("c");
+        Assert.assertTrue(isFiltered(null, includeSet, excludeSet));
+
+        includeSet = new HashSet<>();
+        excludeSet.add("a");
+        excludeSet.add("b");
+        Assert.assertFalse(isFiltered(null, includeSet, excludeSet));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java b/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java
new file mode 100644
index 0000000..40eadf8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.SyntaxError;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.service.StorageService;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public class AuditLoggerTest extends CQLTester
+{
+    @BeforeClass
+    public static void setUp()
+    {
+        AuditLogOptions inMemoryAudit = new AuditLogOptions();
+        inMemoryAudit.logger = "InMemoryAuditLogger"; // test logger that keeps entries in a queue the tests poll
+        inMemoryAudit.enabled = true;
+        DatabaseDescriptor.setAuditLoggingOptions(inMemoryAudit); // installed ahead of requireNetwork()
+        requireNetwork();
+    }
+
+    @Before
+    public void beforeTestMethod()
+    {
+        // Before each test, (re)enable auditing from a freshly constructed AuditLogOptions.
+        enableAuditLogOptions(new AuditLogOptions());
+    }
+
+    private void enableAuditLogOptions(AuditLogOptions options)
+    {
+        // Enable auditing through StorageService with the in-memory test logger,
+        // forwarding only the include/exclude filter fields of the given options.
+        StorageService.instance.enableAuditLog("InMemoryAuditLogger",
+                                               options.included_keyspaces,
+                                               options.excluded_keyspaces,
+                                               options.included_categories,
+                                               options.excluded_categories,
+                                               options.included_users,
+                                               options.excluded_users);
+    }
+
+    private void disableAuditLogOptions()
+    {
+        StorageService.instance.disableAuditLog(); // counterpart of enableAuditLogOptions(): switches auditing off via StorageService
+    }
+
+    @Test
+    public void testAuditLogFilters() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        // The same single-row SELECT is replayed under four filter configurations; only the
+        // expected audit outcome differs, the query itself must always return its one row.
+        final String select = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+
+        // 1. Keyspace excluded: nothing may reach the audit log.
+        AuditLogOptions excludeOnly = new AuditLogOptions();
+        excludeOnly.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(excludeOnly);
+        ResultSet rows = executeAndAssertNoAuditLog(select, 1);
+        assertEquals(1, rows.all().size());
+
+        // 2. Keyspace included: the prepare and the execution are both audited.
+        AuditLogOptions includeOnly = new AuditLogOptions();
+        includeOnly.included_keyspaces = KEYSPACE;
+        enableAuditLogOptions(includeOnly);
+        rows = executeAndAssertWithPrepare(select, AuditLogEntryType.SELECT, 1);
+        assertEquals(1, rows.all().size());
+
+        // 3. Keyspace both included and excluded: the test expects the exclusion to win.
+        AuditLogOptions includeAndExclude = new AuditLogOptions();
+        includeAndExclude.included_keyspaces = KEYSPACE;
+        includeAndExclude.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(includeAndExclude);
+        rows = executeAndAssertNoAuditLog(select, 1);
+        assertEquals(1, rows.all().size());
+
+        // 4. No filter overrides: the statement is audited again.
+        AuditLogOptions unfiltered = new AuditLogOptions();
+        enableAuditLogOptions(unfiltered);
+        rows = executeAndAssertWithPrepare(select, AuditLogEntryType.SELECT, 1);
+        assertEquals(1, rows.all().size());
+    }
+
+    @Test
+    public void testAuditLogFiltersTransitions() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        // One SELECT is reused across every enable/disable transition; it must return
+        // its single row regardless of the audit configuration in force.
+        final String select = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+
+        // Enabled with the keyspace excluded: no audit entries expected.
+        AuditLogOptions excludeOnly = new AuditLogOptions();
+        excludeOnly.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(excludeOnly);
+        ResultSet rows = executeAndAssertNoAuditLog(select, 1);
+        assertEquals(1, rows.all().size());
+
+        // Disabled: the manager is expected to expose the NoOpAuditLogger.
+        disableAuditLogOptions();
+        rows = executeAndAssertDisableAuditLog(select, 1);
+        assertEquals(1, rows.all().size());
+
+        // Re-enabled with the keyspace both included and excluded: still no entries expected.
+        AuditLogOptions includeAndExclude = new AuditLogOptions();
+        includeAndExclude.included_keyspaces = KEYSPACE;
+        includeAndExclude.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(includeAndExclude);
+        rows = executeAndAssertNoAuditLog(select, 1);
+        assertEquals(1, rows.all().size());
+
+        // Disabled a second time: same expectation as the first disable.
+        disableAuditLogOptions();
+        rows = executeAndAssertDisableAuditLog(select, 1);
+        assertEquals(1, rows.all().size());
+    }
+
+    @Test
+    public void testAuditLogExceptions()
+    {
+        // Enabling with a keyspace exclusion must leave auditing reported as enabled.
+        AuditLogOptions excludeOnly = new AuditLogOptions();
+        excludeOnly.excluded_keyspaces = KEYSPACE;
+        enableAuditLogOptions(excludeOnly);
+        Assert.assertTrue(AuditLogManager.getInstance().isAuditingEnabled());
+        disableAuditLogOptions();
+    }
+
+    @Test
+    public void testAuditLogFilterIncludeExclude() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        // QUERY is listed as both included and excluded; the assertions below expect
+        // the exclusion to take precedence over the inclusion.
+        AuditLogOptions options = new AuditLogOptions();
+        options.excluded_categories = "QUERY";
+        options.included_categories = "QUERY,DML,PREPARE";
+        enableAuditLogOptions(options);
+
+        // QUERY - Should be filtered, part of excluded categories
+        String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = 1";
+        Session session = sessionNet();
+        ResultSet rs = session.execute(cql);
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+        assertEquals(1, rs.all().size());
+
+        // DML - Should not be filtered, part of included categories
+        // (the helper checks for the PREPARE_STATEMENT entry followed by the UPDATE entry)
+        cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?, ?, ?)";
+        executeAndAssertWithPrepare(cql, AuditLogEntryType.UPDATE, 1, "insert_audit", "test");
+
+        // DDL - Should be filtered, not part of included categories
+        cql = "ALTER TABLE  " + KEYSPACE + '.' + currentTable() + " ADD v3 text";
+        sessionNet().execute(cql);
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+    }
+
+    @Test
+    public void testCqlSelectAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        // A prepared SELECT must yield a PREPARE_STATEMENT entry followed by a SELECT entry.
+        String select = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        ResultSet matched = executeAndAssertWithPrepare(select, AuditLogEntryType.SELECT, 1);
+        assertEquals(1, matched.all().size());
+    }
+
+    @Test
+    public void testCqlInsertAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        // An INSERT is expected to be audited with the UPDATE entry type.
+        String insert = "INSERT INTO " + KEYSPACE + '.' + currentTable() + "  (id, v1, v2) VALUES (?, ?, ?)";
+        executeAndAssertWithPrepare(insert, AuditLogEntryType.UPDATE, 1, "insert_audit", "test");
+    }
+
+    @Test
+    public void testCqlUpdateAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+        // Statement with literal values, executed directly.
+        String literalUpdate = "UPDATE " + KEYSPACE + '.' + currentTable() + "  SET v1 = 'ApacheCassandra' WHERE id = 1";
+        executeAndAssert(literalUpdate, AuditLogEntryType.UPDATE);
+        // Same kind of statement, this time prepared and executed with bound values.
+        String boundUpdate = "UPDATE " + KEYSPACE + '.' + currentTable() + "  SET v1 = ? WHERE id = ?";
+        executeAndAssertWithPrepare(boundUpdate, AuditLogEntryType.UPDATE, "AuditingTest", 2);
+    }
+
+    @Test
+    public void testCqlDeleteAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+        // A prepared DELETE must be audited with the DELETE entry type.
+        String delete = "DELETE FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+        executeAndAssertWithPrepare(delete, AuditLogEntryType.DELETE, 1);
+    }
+
+    @Test
+    public void testCqlTruncateAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+        // TRUNCATE takes no bind values but still goes through prepare + execute here.
+        String truncate = "TRUNCATE TABLE  " + KEYSPACE + '.' + currentTable();
+        executeAndAssertWithPrepare(truncate, AuditLogEntryType.TRUNCATE);
+    }
+
+    @Test
+    public void testCqlBatchAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+        Session session = sessionNet();
+        InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+        BatchStatement batch = new BatchStatement();
+        String qualifiedTable = KEYSPACE + "." + currentTable();
+
+        // Every prepare is audited on its own as a PREPARE_STATEMENT entry; each one is
+        // drained from the queue right away so that only the batch entries remain later.
+        String cqlInsert = "INSERT INTO " + qualifiedTable + " (id, v1, v2) VALUES (?, ?, ?)";
+        PreparedStatement insert = session.prepare(cqlInsert);
+        AuditLogEntry recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlInsert, AuditLogEntryType.PREPARE_STATEMENT, recorded, false);
+        batch.add(insert.bind(1, "Apapche", "Cassandra"));
+        batch.add(insert.bind(2, "Apapche1", "Cassandra1"));
+
+        String cqlUpdate = "UPDATE " + qualifiedTable + " SET v1 = ? WHERE id = ?";
+        PreparedStatement update = session.prepare(cqlUpdate);
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlUpdate, AuditLogEntryType.PREPARE_STATEMENT, recorded, false);
+        batch.add(update.bind("Apache Cassandra", 1));
+
+        String cqlDelete = "DELETE FROM " + qualifiedTable + " WHERE id = ?";
+        PreparedStatement delete = session.prepare(cqlDelete);
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlDelete, AuditLogEntryType.PREPARE_STATEMENT, recorded, false);
+        batch.add(delete.bind(1));
+
+        ResultSet rs = session.execute(batch);
+
+        // Five entries are expected for four statements. The leading entry is dropped without
+        // being inspected (presumably the batch-level record - TODO confirm and assert on it).
+        assertEquals(5, auditLogger.inMemQueue.size());
+        auditLogger.inMemQueue.poll();
+
+        // The remaining entries must follow the order in which the statements were added.
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlInsert, AuditLogEntryType.UPDATE, recorded, false);
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlInsert, AuditLogEntryType.UPDATE, recorded, false);
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlUpdate, AuditLogEntryType.UPDATE, recorded, false);
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlDelete, AuditLogEntryType.DELETE, recorded, false);
+
+        // The batch itself is expected to return no rows.
+        int rowCount = rs.all().size();
+        assertEquals(0, rowCount);
+    }
+
+    @Test
+    public void testCqlBatch_MultipleTablesAuditing()
+    {
+        // First table, in the default keyspace.
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String table1 = currentTable();
+
+        Session session = sessionNet();
+        InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+        BatchStatement batch = new BatchStatement();
+
+        String cqlInsert1 = "INSERT INTO " + KEYSPACE + "." + table1 + " (id, v1, v2) VALUES (?, ?, ?)";
+        PreparedStatement prepared = session.prepare(cqlInsert1);
+        AuditLogEntry recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlInsert1, AuditLogEntryType.PREPARE_STATEMENT, recorded, false);
+        batch.add(prepared.bind(1, "Apapche", "Cassandra"));
+
+        // Second table, still in the default keyspace.
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String table2 = currentTable();
+
+        String cqlInsert2 = "INSERT INTO " + KEYSPACE + "." + table2 + " (id, v1, v2) VALUES (?, ?, ?)";
+        prepared = session.prepare(cqlInsert2);
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlInsert2, AuditLogEntryType.PREPARE_STATEMENT, recorded, false);
+        batch.add(prepared.bind(1, "Apapche", "Cassandra"));
+
+        // Third table, in a keyspace created just for this test.
+        createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+        String ks2 = currentKeyspace();
+        createTable(ks2, "CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String table3 = currentTable();
+
+        String cqlInsert3 = "INSERT INTO " + ks2 + "." + table3 + " (id, v1, v2) VALUES (?, ?, ?)";
+        prepared = session.prepare(cqlInsert3);
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlInsert3, AuditLogEntryType.PREPARE_STATEMENT, recorded, false, ks2);
+        batch.add(prepared.bind(1, "Apapche", "Cassandra"));
+
+        ResultSet rs = session.execute(batch);
+
+        // Four entries are expected for three statements. The leading entry is dropped without
+        // being inspected (presumably the batch-level record - TODO confirm and assert on it).
+        assertEquals(4, auditLogger.inMemQueue.size());
+        auditLogger.inMemQueue.poll();
+
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlInsert1, table1, AuditLogEntryType.UPDATE, recorded, false, KEYSPACE);
+
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlInsert2, table2, AuditLogEntryType.UPDATE, recorded, false, KEYSPACE);
+
+        recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cqlInsert3, table3, AuditLogEntryType.UPDATE, recorded, false, ks2);
+
+        int rowCount = rs.all().size();
+        assertEquals(0, rowCount);
+    }
+
+    @Test
+    public void testCqlKeyspaceAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+        // isTableNull = true below: the scope of the entry is not compared for these statements.
+        final String replication = " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2}  ";
+
+        String statement = "CREATE KEYSPACE " + createKeyspaceName() + replication;
+        executeAndAssert(statement, AuditLogEntryType.CREATE_KEYSPACE, true, currentKeyspace());
+        statement = "CREATE KEYSPACE IF NOT EXISTS " + createKeyspaceName() + replication;
+        executeAndAssert(statement, AuditLogEntryType.CREATE_KEYSPACE, true, currentKeyspace());
+        statement = "ALTER KEYSPACE " + currentKeyspace() + replication;
+        executeAndAssert(statement, AuditLogEntryType.ALTER_KEYSPACE, true, currentKeyspace());
+        statement = "DROP KEYSPACE " + currentKeyspace();
+        executeAndAssert(statement, AuditLogEntryType.DROP_KEYSPACE, true, currentKeyspace());
+    }
+
+    @Test
+    public void testCqlTableAuditing() throws Throwable
+    {
+        // Both CREATE statements draw a new name from createTableName(); ALTER and DROP address currentTable().
+        String create = "CREATE TABLE " + KEYSPACE + "." + createTableName() + " (id int primary key, v1 text, v2 text)";
+        executeAndAssert(create, AuditLogEntryType.CREATE_TABLE);
+        String createIfAbsent = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + createTableName() + " (id int primary key, v1 text, v2 text)";
+        executeAndAssert(createIfAbsent, AuditLogEntryType.CREATE_TABLE);
+
+        String alter = "ALTER TABLE " + KEYSPACE + "." + currentTable() + " ADD v3 text";
+        executeAndAssert(alter, AuditLogEntryType.ALTER_TABLE);
+
+        String drop = "DROP TABLE " + KEYSPACE + "." + currentTable();
+        executeAndAssert(drop, AuditLogEntryType.DROP_TABLE);
+    }
+
+    @Test
+    public void testCqlMVAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        // Remember the base table now: createTableName() below changes what currentTable() returns.
+        String baseTable = currentTable();
+        String viewBody = " AS SELECT id,v1 FROM " + KEYSPACE + "." + baseTable + " WHERE id IS NOT NULL AND v1 IS NOT NULL PRIMARY KEY ( id, v1 ) ";
+
+        String statement = "CREATE MATERIALIZED VIEW " + KEYSPACE + "." + createTableName() + viewBody;
+        executeAndAssert(statement, AuditLogEntryType.CREATE_VIEW);
+        statement = "CREATE MATERIALIZED VIEW IF NOT EXISTS " + KEYSPACE + "." + currentTable() + viewBody;
+        executeAndAssert(statement, AuditLogEntryType.CREATE_VIEW);
+        statement = "ALTER MATERIALIZED VIEW " + KEYSPACE + "." + currentTable() + " WITH caching = {  'keys' : 'NONE' };";
+        executeAndAssert(statement, AuditLogEntryType.ALTER_VIEW);
+        statement = "DROP MATERIALIZED VIEW " + KEYSPACE + "." + currentTable();
+        executeAndAssert(statement, AuditLogEntryType.DROP_VIEW);
+    }
+
+    @Test
+    public void testCqlTypeAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+        String qualifiedType = KEYSPACE + "." + createTableName();
+
+        // CREATE, without and with IF NOT EXISTS
+        String statement = "CREATE TYPE " + qualifiedType + " (id int, v1 text, v2 text)";
+        executeAndAssert(statement, AuditLogEntryType.CREATE_TYPE);
+        statement = "CREATE TYPE IF NOT EXISTS " + qualifiedType + " (id int, v1 text, v2 text)";
+        executeAndAssert(statement, AuditLogEntryType.CREATE_TYPE);
+
+        // ALTER, adding a field and then renaming it
+        statement = "ALTER TYPE " + qualifiedType + " ADD v3 int";
+        executeAndAssert(statement, AuditLogEntryType.ALTER_TYPE);
+        statement = "ALTER TYPE " + qualifiedType + " RENAME v3 TO v4";
+        executeAndAssert(statement, AuditLogEntryType.ALTER_TYPE);
+
+        // DROP, without and with IF EXISTS
+        statement = "DROP TYPE " + qualifiedType;
+        executeAndAssert(statement, AuditLogEntryType.DROP_TYPE);
+        statement = "DROP TYPE IF EXISTS " + qualifiedType;
+        executeAndAssert(statement, AuditLogEntryType.DROP_TYPE);
+    }
+
+    @Test
+    public void testCqlIndexAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String indexedTable = currentTable();
+        String indexName = createTableName();
+
+        // Create a named index on column v1 of the table.
+        String create = "CREATE INDEX " + indexName + " ON " + KEYSPACE + "." + indexedTable + " (v1)";
+        executeAndAssert(create, AuditLogEntryType.CREATE_INDEX);
+
+        // Drop the same index again, addressed by its keyspace-qualified name.
+        String drop = "DROP INDEX " + KEYSPACE + "." + indexName;
+        executeAndAssert(drop, AuditLogEntryType.DROP_INDEX);
+    }
+
+    @Test
+    public void testCqlFunctionAuditing() throws Throwable
+    {
+        // The function name is taken from createTableName() and qualified with the test keyspace.
+        String qualifiedFunction = KEYSPACE + "." + createTableName();
+
+        String create = "CREATE FUNCTION IF NOT EXISTS  " + qualifiedFunction + " (column TEXT,num int) RETURNS NULL ON NULL INPUT RETURNS text LANGUAGE javascript AS $$ column.substring(0,num) $$";
+        executeAndAssert(create, AuditLogEntryType.CREATE_FUNCTION);
+        String drop = "DROP FUNCTION " + qualifiedFunction;
+        executeAndAssert(drop, AuditLogEntryType.DROP_FUNCTION);
+    }
+
+    @Test
+    public void testCqlTriggerAuditing() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String triggerTable = currentTable();
+        String triggerName = createTableName();
+
+        // This test never creates the trigger; the DROP ... IF EXISTS must be audited all the same.
+        String drop = "DROP TRIGGER IF EXISTS " + triggerName + " ON " + KEYSPACE + "." + triggerTable;
+        executeAndAssert(drop, AuditLogEntryType.DROP_TRIGGER);
+    }
+
+    @Test
+    public void testCqlAggregateAuditing() throws Throwable
+    {
+        // This test never creates the aggregate; the DROP ... IF EXISTS must be audited all the same.
+        String drop = "DROP AGGREGATE IF EXISTS " + KEYSPACE + "." + createTableName();
+        executeAndAssert(drop, AuditLogEntryType.DROP_AGGREGATE);
+    }
+
+    @Test
+    public void testCqlQuerySyntaxError()
+    {
+        // Create the table before building the statement so currentTable() refers to this test's table.
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + "1 (id, v1, v2) VALUES (1, 'insert_audit, 'test')";
+        try
+        {
+            sessionNet().execute(cql);
+            Assert.fail("should not succeed");
+        }
+        catch (SyntaxError expected)
+        {
+            // expected: the malformed statement is rejected
+        }
+
+        AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(logEntry, cql);
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+    }
+
+    @Test
+    public void testCqlSelectQuerySyntaxError()
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        // "2w" is not a valid LIMIT value; the test expects a SyntaxError for it.
+        String malformed = "SELECT * FROM " + KEYSPACE + '.' + currentTable() + " LIMIT 2w";
+        try
+        {
+            sessionNet().execute(malformed);
+            Assert.fail("should not succeed");
+        }
+        catch (SyntaxError expected)
+        {
+            // nop
+        }
+
+        // The failed request must leave exactly one REQUEST_FAILURE entry that quotes the statement.
+        InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+        assertLogEntry(auditLogger.inMemQueue.poll(), malformed);
+        assertEquals(0, auditLogger.inMemQueue.size());
+    }
+
+    @Test
+    public void testCqlPrepareQueryError()
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        String cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?,?,?)";
+        try
+        {
+            Session session = sessionNet();
+            PreparedStatement prepared = session.prepare(cql);
+            AuditLogEntry prepareEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+            assertLogEntry(cql, AuditLogEntryType.PREPARE_STATEMENT, prepareEntry, false);
+            // With the table dropped, executing the prepared statement is expected to fail.
+            dropTable("DROP TABLE %s");
+            session.execute(prepared.bind(1, "insert_audit", "test"));
+            Assert.fail("should not succeed");
+        }
+        catch (NoHostAvailableException expected)
+        {
+            // nop
+        }
+
+        // Two failure entries are expected; only the second one is checked against the statement text.
+        AuditLogEntry firstFailure = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(firstFailure, null);
+        AuditLogEntry secondFailure = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+        assertLogEntry(secondFailure, cql);
+        assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+    }
+
+    @Test
+    public void testCqlPrepareQuerySyntaxError()
+    {
+        // "VALES" is misspelled, so the statement is expected to be rejected with a SyntaxError.
+        String cql = "INSERT INTO " + KEYSPACE + '.' + "foo" + "(id, v1, v2) VALES (?,?,?)";
+        try
+        {
+            createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+            Session session = sessionNet();
+            session.execute(session.prepare(cql).bind(1, "insert_audit", "test"));
+            Assert.fail("should not succeed");
+        }
+        catch (SyntaxError expected)
+        {
+            // nop
+        }
+        InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+        assertLogEntry(auditLogger.inMemQueue.poll(), cql);
+        assertEquals(0, auditLogger.inMemQueue.size());
+    }
+
+    /**
+     * Helper methods for Audit Log CQL Testing
+     */
+
+    private ResultSet executeAndAssert(String cql, AuditLogEntryType type) throws Throwable
+    {
+        return executeAndAssert(cql, type, false, KEYSPACE); // defaults: scope is checked (isTableNull = false), keyspace is KEYSPACE
+    }
+
+    private ResultSet executeAndAssert(String cql, AuditLogEntryType type, boolean isTableNull, String keyspace) throws Throwable
+    {
+        // Run the statement as-is (no prepare step) through sessionNet().
+        ResultSet result = sessionNet().execute(cql);
+
+        // Exactly one audit entry is expected: verify it, then verify nothing else was queued.
+        InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+        AuditLogEntry recorded = auditLogger.inMemQueue.poll();
+        assertLogEntry(cql, type, recorded, isTableNull, keyspace);
+        assertEquals(0, auditLogger.inMemQueue.size());
+        return result;
+    }
+
+    private ResultSet executeAndAssertWithPrepare(String cql, AuditLogEntryType executeType, Object... bindValues) throws Throwable
+    {
+        return executeAndAssertWithPrepare(cql, executeType, false, bindValues); // default: scope is checked (isTableNull = false)
+    }
+
+    private ResultSet executeAndAssertWithPrepare(String cql, AuditLogEntryType executeType, boolean isTableNull, Object... bindValues) throws Throwable
+    {
+        Session session = sessionNet();
+        ResultSet result = session.execute(session.prepare(cql).bind(bindValues));
+
+        // The queue must now hold exactly two entries, in this order:
+        // a PREPARE_STATEMENT entry for the prepare call, then an executeType entry for the execution.
+        InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+        AuditLogEntry prepareEntry = auditLogger.inMemQueue.poll();
+        assertLogEntry(cql, AuditLogEntryType.PREPARE_STATEMENT, prepareEntry, isTableNull);
+        AuditLogEntry executeEntry = auditLogger.inMemQueue.poll();
+        assertLogEntry(cql, executeType, executeEntry, isTableNull);
+
+        assertEquals(0, auditLogger.inMemQueue.size());
+        return result;
+    }
+
+    private ResultSet executeAndAssertNoAuditLog(String cql, Object... bindValues)
+    {
+        // Prepare and execute as usual; neither step may have produced an audit entry.
+        Session session = sessionNet();
+        ResultSet result = session.execute(session.prepare(cql).bind(bindValues));
+
+        int queued = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size();
+        assertEquals(0, queued);
+        return result;
+    }
+
+    private ResultSet executeAndAssertDisableAuditLog(String cql, Object... bindValues)
+    {
+        // The statement is prepared and executed exactly as in the audited helpers.
+        Session session = sessionNet();
+        ResultSet result = session.execute(session.prepare(cql).bind(bindValues));
+
+        // With auditing disabled, the manager is expected to expose a NoOpAuditLogger.
+        assertThat(AuditLogManager.getInstance().getLogger(), instanceOf(NoOpAuditLogger.class));
+        return result;
+    }
+
+    private void assertLogEntry(String cql, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull)
+    {
+        assertLogEntry(cql, type, actual, isTableNull, KEYSPACE); // default: the entry is expected in KEYSPACE
+    }
+
+    private void assertLogEntry(String cql, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull, String keyspace)
+    {
+        assertLogEntry(cql, currentTable(), type, actual, isTableNull, keyspace); // expected scope is whatever currentTable() returns at call time
+    }
+
+    private void assertLogEntry(String cql, String table, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull, String keyspace)
+    {
+        // poll() yields null on an empty queue; report that as a test failure rather than an NPE.
+        Assert.assertNotNull("expected an audit log entry but none was recorded", actual);
+        assertEquals(keyspace, actual.getKeyspace());
+        if (!isTableNull)
+            assertEquals(table, actual.getScope());
+        assertEquals(type, actual.getType());
+        assertEquals(cql, actual.getOperation());
+        assertNotEquals(0, actual.getTimestamp());
+    }
+
+    private void assertLogEntry(AuditLogEntry logEntry, String cql)
+    {
+        // poll() yields null on an empty queue; report that as a test failure rather than an NPE.
+        Assert.assertNotNull("expected a REQUEST_FAILURE audit log entry but none was recorded", logEntry);
+        assertNull(logEntry.getKeyspace());
+        assertNull(logEntry.getScope());
+        assertNotEquals(0, logEntry.getTimestamp());
+        assertEquals(AuditLogEntryType.REQUEST_FAILURE, logEntry.getType());
+        if (null != cql && !cql.isEmpty())
+            assertThat(logEntry.getOperation(), containsString(cql));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
new file mode 100644
index 0000000..f9d2930
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.nio.file.Path;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.utils.binlog.BinLogTest;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class BinAuditLoggerTest extends CQLTester
+{
+    private static Path tempDir;
+
+    @BeforeClass
+    public static void setUp() throws Exception
+    {
+        tempDir = BinLogTest.tempDir();
+
+        AuditLogOptions options = new AuditLogOptions();
+        options.enabled = true;
+        options.logger = "BinAuditLogger";
+        options.roll_cycle = "TEST_SECONDLY";
+        options.audit_logs_dir = tempDir.toString();
+        DatabaseDescriptor.setAuditLoggingOptions(options);
+        requireNetwork();
+    }
+
+    @Test
+    public void testSelectRoundTripQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+        execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+        String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+
+        Session session = sessionNet();
+
+        PreparedStatement pstmt = session.prepare(cql);
+        ResultSet rs = session.execute(pstmt.bind(1));
+
+        assertEquals(1, rs.all().size());
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("AuditLog", wire.read("type").text());
+                assertThat(wire.read("message").text(), containsString(AuditLogEntryType.PREPARE_STATEMENT.toString()));
+            }));
+
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("AuditLog", wire.read("type").text());
+                assertThat(wire.read("message").text(), containsString(AuditLogEntryType.SELECT.toString()));
+            }));
+            assertFalse(tailer.readDocument(wire -> {
+            }));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
new file mode 100644
index 0000000..14f7f23
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableQuery;
+import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableBatch;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLogTest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FullQueryLoggerTest extends CQLTester
+{
+    private static Path tempDir;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception
+    {
+        tempDir = BinLogTest.tempDir();
+    }
+
+    private FullQueryLogger instance;
+    
+    @Before
+    public void setUp()
+    {
+        instance = AuditLogManager.getInstance().getFullQueryLogger();
+    }
+    
+    @After
+    public void tearDown()
+    {
+        instance.reset(tempDir.toString());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConfigureNullPath() throws Exception
+    {
+        instance.configure(null, "", true, 1, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConfigureNullRollCycle() throws Exception
+    {
+        instance.configure(BinLogTest.tempDir(), null, true, 1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureInvalidRollCycle() throws Exception
+    {
+        instance.configure(BinLogTest.tempDir(), "foobar", true, 1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureInvalidMaxQueueWeight() throws Exception
+    {
+        instance.configure(BinLogTest.tempDir(), "DAILY", true, 0, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureInvalidMaxQueueLogSize() throws Exception
+    {
+        instance.configure(BinLogTest.tempDir(), "DAILY", true, 1, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureOverExistingFile() throws Exception
+    {
+        File f = File.createTempFile("foo", "bar");
+        f.deleteOnExit();
+        instance.configure(f.toPath(), "TEST_SECONDLY", true, 1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanRead() throws Exception
+    {
+        tempDir.toFile().setReadable(false);
+        try
+        {
+            configureFQL();
+        }
+        finally
+        {
+            tempDir.toFile().setReadable(true);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanWrite() throws Exception
+    {
+        tempDir.toFile().setWritable(false);
+        try
+        {
+            configureFQL();
+        }
+        finally
+        {
+            tempDir.toFile().setWritable(true);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanExecute() throws Exception
+    {
+        tempDir.toFile().setExecutable(false);
+        try
+        {
+            configureFQL();
+        }
+        finally
+        {
+            tempDir.toFile().setExecutable(true);
+        }
+    }
+
+    @Test
+    public void testResetWithoutConfigure() throws Exception
+    {
+        instance.reset(tempDir.toString());
+        instance.reset(tempDir.toString());
+    }
+
+    @Test
+    public void stopWithoutConfigure() throws Exception
+    {
+        instance.stop();
+        instance.stop();
+    }
+
+    /**
+     * Both the last used and supplied directory should get cleaned
+     */
+    @Test
+    public void testResetCleansPaths() throws Exception
+    {
+        configureFQL();
+        File tempA = File.createTempFile("foo", "bar", tempDir.toFile());
+        assertTrue(tempA.exists());
+        File tempB = File.createTempFile("foo", "bar", BinLogTest.tempDir().toFile());
+        instance.reset(tempB.getParent());
+        assertFalse(tempA.exists());
+        assertFalse(tempB.exists());
+    }
+
+    /**
+     * The last used and configured directory are the same and it shouldn't be an issue
+     */
+    @Test
+    public void testResetSamePath() throws Exception
+    {
+        configureFQL();
+        File tempA = File.createTempFile("foo", "bar", tempDir.toFile());
+        assertTrue(tempA.exists());
+        instance.reset(tempA.getParent());
+        assertFalse(tempA.exists());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testDoubleConfigure() throws Exception
+    {
+        configureFQL();
+        configureFQL();
+    }
+
+    @Test
+    public void testCleansDirectory() throws Exception
+    {
+        assertTrue(new File(tempDir.toFile(), "foobar").createNewFile()); //Stray file that must not survive configure
+        configureFQL();
+        assertEquals(1, tempDir.toFile().listFiles().length); //Expected value goes first so failure messages read correctly
+        assertEquals("directory-listing.cq4t", tempDir.toFile().listFiles()[0].getName());
+    }
+
+    @Test
+    public void testEnabledReset() throws Exception
+    {
+        assertFalse(instance.enabled());
+        configureFQL();
+        assertTrue(instance.enabled());
+        instance.reset(tempDir.toString());
+        assertFalse(instance.enabled());
+    }
+
+    @Test
+    public void testEnabledStop() throws Exception
+    {
+        assertFalse(instance.enabled());
+        configureFQL();
+        assertTrue(instance.enabled());
+        instance.stop();
+        assertFalse(instance.enabled());
+    }
+
+    /**
+     * Test that a thread will block if the FQL is over weight, and unblock once the backup is cleared
+     */
+    @Test
+    public void testBlocking() throws Exception
+    {
+        configureFQL();
+        //Prevent the bin log thread from making progress, causing the task queue to block
+        Semaphore blockBinLog = new Semaphore(0);
+        try
+        {
+            //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
+            Semaphore binLogBlocked = new Semaphore(0);
+            instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
+            {
+
+                public void writeMarshallable(WireOut wire)
+                {
+                    //Notify that the bin log is blocking now
+                    binLogBlocked.release();
+                    try
+                    {
+                        //Block the bin log thread so the task queue can be filled
+                        blockBinLog.acquire();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();
+                    }
+                    super.writeMarshallable(wire);
+                }
+
+                public void release()
+                {
+                    super.release();
+                }
+            });
+
+            //Wait for the bin log thread to block so it can't batch drain tasks
+            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+            //Now fill the task queue
+            logQuery("foo2");
+
+            //Start a thread to block waiting on the bin log queue
+            Thread t = new Thread(() ->
+                                  {
+                                      logQuery("foo3");
+                                      //Should be able to log another query without an issue
+                                      logQuery("foo4");
+                                  });
+            t.start();
+            Thread.sleep(500);
+            //If thread state is terminated then the thread started, finished, and didn't block on the full task queue
+            assertTrue(t.getState() != Thread.State.TERMINATED);
+        }
+        finally
+        {
+            //Unblock the binlog thread
+            blockBinLog.release();
+        }
+        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo3", "foo4")), 60);
+    }
+
+    private boolean checkForQueries(List<String> queries)
+    {
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            List<String> expectedQueries = new LinkedList<>(queries);
+            while (!expectedQueries.isEmpty())
+            {
+                if (!tailer.readDocument(wire -> {
+                    assertEquals(expectedQueries.get(0), wire.read("query").text());
+                    expectedQueries.remove(0);
+                }))
+                {
+                    return false;
+                }
+            }
+            assertFalse(tailer.readDocument(wire -> {}));
+            return true;
+        }
+    }
+
+    @Test
+    public void testNonBlocking() throws Exception
+    {
+        instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256);
+        //Prevent the bin log thread from making progress, causing the task queue to refuse tasks
+        Semaphore blockBinLog = new Semaphore(0);
+        try
+        {
+            //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
+            Semaphore binLogBlocked = new Semaphore(0);
+            instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
+            {
+
+                public void writeMarshallable(WireOut wire)
+                {
+                    //Notify that the bin log is blocking now
+                    binLogBlocked.release();
+                    try
+                    {
+                        //Block the bin log thread so the task queue can be filled
+                        blockBinLog.acquire();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();
+                    }
+                    super.writeMarshallable(wire);
+                }
+
+                public void release()
+                {
+                    super.release();
+                }
+            });
+
+            //Wait for the bin log thread to block so it can't batch drain tasks
+            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+            //Now fill the task queue
+            logQuery("foo2");
+
+            //This sample should get dropped AKA released without being written
+            AtomicInteger releasedCount = new AtomicInteger(0);
+            AtomicInteger writtenCount = new AtomicInteger(0);
+            instance.logRecord(new WeighableMarshallableQuery("foo3", QueryOptions.DEFAULT, 1) {
+                public void writeMarshallable(WireOut wire)
+                {
+                    writtenCount.incrementAndGet();
+                    super.writeMarshallable(wire);
+                }
+
+                public void release()
+                {
+                    releasedCount.incrementAndGet();
+                    super.release();
+                }
+            }, instance.binLog);
+
+            Util.spinAssertEquals(1, releasedCount::get, 60);
+            assertEquals(0, writtenCount.get());
+        }
+        finally
+        {
+            blockBinLog.release();
+        }
+        //Wait for tasks to drain so there should be space in the queue
+        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2")), 60);
+        //Should be able to log again
+        logQuery("foo4");
+        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo4")), 60);
+    }
+
+    @Test
+    public void testRoundTripQuery() throws Exception
+    {
+        configureFQL();
+        logQuery("foo");
+        Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo")), 60);
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("single", wire.read("type").text());
+                ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32());
+                assertEquals(ProtocolVersion.CURRENT, protocolVersion);
+                QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion);
+                compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
+                assertEquals(1L, wire.read("query-time").int64());
+                assertEquals("foo", wire.read("query").text());
+            }));
+        }
+    }
+
+    @Test
+    public void testRoundTripBatch() throws Exception
+    {
+        configureFQL();
+        instance.logBatch("UNLOGGED", Arrays.asList("foo1", "foo2"), Arrays.asList(Arrays.asList(ByteBuffer.allocate(1) , ByteBuffer.allocateDirect(2)), Arrays.asList()), QueryOptions.DEFAULT, 1);
+        Util.spinAssertEquals(true, () ->
+        {
+            try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+            {
+                return queue.createTailer().readingDocument().isPresent();
+            }
+        }, 60);
+        try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("batch", wire.read("type").text());
+                ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32());
+                assertEquals(ProtocolVersion.CURRENT, protocolVersion);
+                QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion);
+                assertEquals(1L, wire.read("query-time").int64());
+                compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
+                assertEquals("UNLOGGED", wire.read("batch-type").text());
+                ValueIn in = wire.read("queries");
+                assertEquals(2, in.int32());
+                assertEquals("foo1", in.text());
+                assertEquals("foo2", in.text());
+                in = wire.read("values");
+                assertEquals(2, in.int32());
+                assertEquals(2, in.int32());
+                assertTrue(Arrays.equals(new byte[1], in.bytes()));
+                assertTrue(Arrays.equals(new byte[2], in.bytes()));
+                assertEquals(0, in.int32());
+            }));
+        }
+    }
+
+    @Test
+    public void testQueryWeight()
+    {
+        //Empty query should have some weight
+        WeighableMarshallableQuery query = new WeighableMarshallableQuery("", QueryOptions.DEFAULT, 1);
+        assertTrue(query.weight() >= 95);
+
+        StringBuilder sb = new StringBuilder();
+        for (int ii = 0; ii < 1024 * 1024; ii++)
+        {
+            sb.append('a');
+        }
+        query = new WeighableMarshallableQuery(sb.toString(), QueryOptions.DEFAULT, 1);
+
+        //A large query should be reflected in the size, * 2 since characters are still two bytes
+        assertTrue(query.weight() > ObjectSizes.measureDeep(sb.toString()));
+
+        //Large query options should be reflected
+        QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
+        query = new WeighableMarshallableQuery("", largeOptions, 1);
+        assertTrue(query.weight() > 1024 * 1024);
+        System.out.printf("weight %d%n", query.weight());
+    }
+
+    @Test
+    public void testBatchWeight()
+    {
+        //An empty batch should have weight
+        WeighableMarshallableBatch batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() >= 183);
+
+        StringBuilder sb = new StringBuilder();
+        for (int ii = 0; ii < 1024 * 1024; ii++)
+        {
+            sb.append('a');
+        }
+
+        //The weight of the type string should be reflected
+        batch = new WeighableMarshallableBatch(sb.toString(), new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(sb.toString()));
+
+        //The weight of the list containing queries should be reflected
+        List<String> bigList = new ArrayList<>(100000);
+        for (int ii = 0; ii < 100000; ii++)
+        {
+            bigList.add("");
+        }
+        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
+
+        //The size of the query should be reflected
+        bigList = new ArrayList<>(1);
+        bigList.add(sb.toString());
+        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
+
+        bigList = null;
+        //The size of the list of values should be reflected
+        List<List<ByteBuffer>> bigValues = new ArrayList<>(100000);
+        for (int ii = 0; ii < 100000; ii++)
+        {
+            bigValues.add(new ArrayList<>(0));
+        }
+        bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5));
+        batch = new WeighableMarshallableBatch("", new ArrayList<>(),  bigValues, QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues));
+
+        //As should the size of the values
+        QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
+        batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), largeOptions, 1);
+        assertTrue(batch.weight() > 1024 * 1024);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullType() throws Exception
+    {
+        instance.logBatch(null, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullQueries() throws Exception
+    {
+        instance.logBatch("", null, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullQueriesQuery() throws Exception
+    {
+        configureFQL();
+        instance.logBatch("", Arrays.asList((String)null), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullValues() throws Exception
+    {
+        instance.logBatch("", new ArrayList<>(), null, QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullValuesValue() throws Exception
+    {
+        instance.logBatch("", new ArrayList<>(), Arrays.asList((List<ByteBuffer>)null), null, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullQueryOptions() throws Exception
+    {
+        instance.logBatch("", new ArrayList<>(), new ArrayList<>(), null, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testLogBatchNegativeTime() throws Exception
+    {
+        instance.logBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, -1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogQueryNullQuery() throws Exception
+    {
+        instance.logQuery(null, QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogQueryNullQueryOptions() throws Exception
+    {
+        instance.logQuery("", null, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testLogQueryNegativeTime() throws Exception
+    {
+        instance.logQuery("", QueryOptions.DEFAULT, -1);
+    }
+
+    private static void compareQueryOptions(QueryOptions a, QueryOptions b)
+    {
+        assertEquals(a.getClass(), b.getClass());
+        assertEquals(a.getProtocolVersion(), b.getProtocolVersion());
+        assertEquals(a.getPageSize(), b.getPageSize());
+        assertEquals(a.getConsistency(), b.getConsistency());
+        assertEquals(a.getPagingState(), b.getPagingState());
+        assertEquals(a.getValues(), b.getValues());
+        assertEquals(a.getSerialConsistency(), b.getSerialConsistency());
+    }
+
+    private void configureFQL() throws Exception
+    {
+        instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256);
+    }
+
+    private void logQuery(String query)
+    {
+        instance.logQuery(query, QueryOptions.DEFAULT, 1);
+    }
+
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/4] cassandra git commit: Audit logging for database activity

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2c28796..d945368 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -32,6 +32,7 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 
 /**
@@ -380,6 +381,8 @@ public class Config
     public volatile boolean automatic_sstable_upgrade = false;
     public volatile int max_concurrent_automatic_sstable_upgrades = 1;
 
+    public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
+
 
     /**
      * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6b11974..699148d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -36,6 +36,7 @@ import com.google.common.primitives.Longs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogOptions;
 import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
 import org.apache.cassandra.auth.AuthConfig;
 import org.apache.cassandra.auth.IAuthenticator;
@@ -2579,4 +2580,14 @@ public class DatabaseDescriptor
         if (value > getConcurrentCompactors())
             logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
     }
+    
+    public static AuditLogOptions getAuditLoggingOptions()
+    {
+        return conf.audit_logging_options;
+    }
+
+    public static void setAuditLoggingOptions(AuditLogOptions auditLoggingOptions)
+    {
+        conf.audit_logging_options = auditLoggingOptions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index f3b2090..1e4dad3 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -17,13 +17,14 @@
  */
 package org.apache.cassandra.cql3;
 
+import org.apache.cassandra.audit.IAuditLogContext;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
-public interface CQLStatement
+public interface CQLStatement extends IAuditLogContext
 {
     /**
      * Returns the number of bound terms in this statement.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index 0f6d624..e26910d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.*;
@@ -117,4 +118,10 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.ALTER_KEYSPACE, keyspace(), null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
index 64ffe57..957ac97 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.auth.IRoleManager.Option;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -109,4 +110,10 @@ public class AlterRoleStatement extends AuthenticationStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type ALTER_ROLE; type-only constructor, no keyspace or object name supplied
+    {
+        return new AuditLogContext(AuditLogEntryType.ALTER_ROLE);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 1561c74..b3aeb74 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -22,6 +22,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -304,4 +305,10 @@ public class AlterTableStatement extends SchemaAlteringStatement
                              cfName,
                              oType);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type ALTER_TABLE, tagged with keyspace() and the table name from cfName
+    {
+        return new AuditLogContext(AuditLogEntryType.ALTER_TABLE, keyspace(), cfName.getColumnFamily());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
index e76cd12..3249af2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.util.*;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.marshal.*;
@@ -185,4 +186,10 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type ALTER_TYPE, tagged with keyspace() and the type's string name
+    {
+        return new AuditLogContext(AuditLogEntryType.ALTER_TYPE, keyspace(), name.getStringTypeName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java
index fbfc54c..938908c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.db.view.View;
@@ -93,4 +94,10 @@ public class AlterViewStatement extends SchemaAlteringStatement
     {
         return String.format("AlterViewStatement(name=%s)", cfName);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type ALTER_VIEW, tagged with keyspace() and the view name held in cfName
+    {
+        return new AuditLogContext(AuditLogEntryType.ALTER_VIEW, keyspace(), cfName.getColumnFamily());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index b54e3a0..2fcd867 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.helpers.MessageFormatter;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.ColumnMetadata;
@@ -598,4 +599,10 @@ public class BatchStatement implements CQLStatement
             return m;
         }
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type BATCH for the batch as a whole; no keyspace or table is attached here
+    {
+        return new AuditLogContext(AuditLogEntryType.BATCH);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
index cb24615..7cc8a99 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
@@ -57,7 +57,7 @@ public abstract class CFStatement extends ParsedStatement
 
     public String keyspace()
     {
-        assert cfName.hasKeyspace() : "The statement hasn't be prepared correctly";
+        assert cfName.hasKeyspace() : "The statement hasn't been prepared correctly"; // checked only when assertions are enabled; guards the getKeyspace() read below
         return cfName.getKeyspace();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
index b9643b0..d3ef599 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Objects;
 import java.util.List;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -266,4 +267,10 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type CREATE_AGGREGATE, tagged with the aggregate's keyspace and name
+    {   // keyspace is read from functionName, as the other function/aggregate statements do, rather than via CFStatement.keyspace(), which reads cfName
+        return new AuditLogContext(AuditLogEntryType.CREATE_AGGREGATE, functionName.keyspace, functionName.name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index 259fed9..1f0e703 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -188,4 +189,10 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type CREATE_FUNCTION, tagged with the function's keyspace and name from functionName
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_FUNCTION, functionName.keyspace, functionName.name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 05f3839..9d0a714 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -263,4 +264,9 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type CREATE_INDEX, tagged with keyspace() and indexName (the index, not the table)
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_INDEX, keyspace(), indexName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
index 3b6fd4e..b452d16 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.util.regex.Pattern;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.*;
@@ -142,4 +143,10 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type CREATE_KEYSPACE, tagged with keyspace() only (two-argument constructor)
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_KEYSPACE, keyspace());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
index 0e0afec..f12d7e6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.RoleName;
@@ -118,4 +119,9 @@ public class CreateRoleStatement extends AuthenticationStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type CREATE_ROLE; type-only constructor, no keyspace or object name supplied
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_ROLE);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 5315f95..55249c4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -24,6 +24,7 @@ import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
@@ -419,4 +420,9 @@ public class CreateTableStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type CREATE_TABLE, tagged with keyspace() and the table name from cfName
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_TABLE, keyspace(), cfName.getColumnFamily());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
index 58db887..d57bff7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -103,4 +104,11 @@ public class CreateTriggerStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type CREATE_TRIGGER, tagged with keyspace() and triggerName (the trigger, not its table)
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_TRIGGER, keyspace(), triggerName);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
index 66b8705..7dce478 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -140,4 +141,10 @@ public class CreateTypeStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type CREATE_TYPE, tagged with keyspace() and the type's string name
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_TYPE, keyspace(), name.getStringTypeName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index f2d2030..01ed6fe 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.*;
@@ -379,4 +380,9 @@ public class CreateViewStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type CREATE_VIEW, tagged with keyspace() and the view name held in cfName
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_VIEW, keyspace(), cfName.getColumnFamily());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 1e8abc5..af04572 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.util.List;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
 import org.apache.cassandra.cql3.conditions.Conditions;
@@ -191,4 +192,9 @@ public class DeleteStatement extends ModificationStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DELETE, tagged with this statement's keyspace() and columnFamily()
+    {
+        return new AuditLogContext(AuditLogEntryType.DELETE, keyspace(), columnFamily());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
index de80790..727e9f2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.functions.*;
@@ -157,4 +158,10 @@ public final class DropAggregateStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DROP_AGGREGATE, tagged with the aggregate's keyspace and name from functionName
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_AGGREGATE, functionName.keyspace, functionName.name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
index e82c257..0abcffa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import com.google.common.base.Joiner;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.FunctionResource;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.CQL3Type;
@@ -188,10 +189,15 @@ public final class DropFunctionStatement extends SchemaAlteringStatement
         }
         return old;
     }
-    
     @Override
     public String toString()
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DROP_FUNCTION, tagged with the function's keyspace and name from functionName
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_FUNCTION, functionName.keyspace, functionName.name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
index 636a7dc..f61faf1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
@@ -121,4 +122,10 @@ public class DropIndexStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DROP_INDEX, tagged with keyspace() and indexName (the index, not the table)
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_INDEX, keyspace(), indexName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
index f289c84..cfc6564 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -78,4 +79,10 @@ public class DropKeyspaceStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DROP_KEYSPACE, tagged with keyspace() only (two-argument constructor)
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_KEYSPACE, keyspace());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
index a858233..88b8b1a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.RoleName;
@@ -81,4 +82,10 @@ public class DropRoleStatement extends AuthenticationStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DROP_ROLE; type-only constructor, no keyspace or object name supplied
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_ROLE);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
index 09a8b61..d7801e5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -109,4 +110,10 @@ public class DropTableStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DROP_TABLE, tagged with keyspace() and the table name from cfName
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_TABLE, keyspace(), cfName.getColumnFamily());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DropTriggerStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTriggerStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTriggerStatement.java
index 1031b73..1d9e3e4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTriggerStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTriggerStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -90,4 +91,10 @@ public class DropTriggerStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DROP_TRIGGER, tagged with keyspace() and triggerName (the trigger, not its table)
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_TRIGGER, keyspace(), triggerName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index 4f649e9..cc4ca1c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
@@ -122,10 +123,16 @@ public class DropTypeStatement extends SchemaAlteringStatement
         MigrationManager.announceTypeDrop(toDrop, isLocalOnly);
         return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, keyspace(), name.getStringTypeName());
     }
-    
+
     @Override
     public String toString()
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DROP_TYPE, tagged with keyspace() and the type's string name
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_TYPE, keyspace(), name.getStringTypeName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
index 2b31b51..218807f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.db.view.View;
@@ -74,4 +75,10 @@ public class DropViewStatement extends SchemaAlteringStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type DROP_VIEW, tagged with keyspace() and the view name held in cfName
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_VIEW, keyspace(), cfName.getColumnFamily());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/GrantPermissionsStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/GrantPermissionsStatement.java b/src/java/org/apache/cassandra/cql3/statements/GrantPermissionsStatement.java
index 06a53e2..f5ac5ca 100644
--- a/src/java/org/apache/cassandra/cql3/statements/GrantPermissionsStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/GrantPermissionsStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.util.Set;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.IResource;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -40,4 +41,12 @@ public class GrantPermissionsStatement extends PermissionsManagementStatement
         DatabaseDescriptor.getAuthorizer().grant(state.getUser(), permissions, resource, grantee);
         return null;
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext() // audit entry type GRANT, tagged with a name derived from the granted resource plus the resource's own name
+    {
+        String keyspace = resource.hasParent() ? resource.getParent().getName() : resource.getName(); // parent's name when one exists, otherwise the resource's own name. NOTE(review): confirm getName() yields a bare keyspace name rather than a resource path
+        return new AuditLogContext(AuditLogEntryType.GRANT, keyspace, resource.getName());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/GrantRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/GrantRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/GrantRoleStatement.java
index 2aa3b7d..f234237 100644
--- a/src/java/org/apache/cassandra/cql3/statements/GrantRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/GrantRoleStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.RoleName;
 import org.apache.cassandra.exceptions.RequestExecutionException;
@@ -44,4 +45,10 @@ public class GrantRoleStatement extends RoleManagementStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.GRANT);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
index e742de5..80195c0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.util.*;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -137,4 +138,10 @@ public class ListPermissionsStatement extends AuthorizationStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.LIST_PERMISSIONS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
index 47077e7..8b51c59 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -136,4 +137,10 @@ public class ListRolesStatement extends AuthorizationStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.LIST_ROLES);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/RevokePermissionsStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/RevokePermissionsStatement.java b/src/java/org/apache/cassandra/cql3/statements/RevokePermissionsStatement.java
index cd35bf3..eb528fc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/RevokePermissionsStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/RevokePermissionsStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.util.Set;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.IResource;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -48,4 +49,11 @@ public class RevokePermissionsStatement extends PermissionsManagementStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        String keyspace = resource.hasParent() ? resource.getParent().getName() : resource.getName();
+        return new AuditLogContext(AuditLogEntryType.REVOKE, keyspace, resource.getName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/RevokeRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/RevokeRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/RevokeRoleStatement.java
index 512b84a..c497121 100644
--- a/src/java/org/apache/cassandra/cql3/statements/RevokeRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/RevokeRoleStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.RoleName;
 import org.apache.cassandra.exceptions.RequestExecutionException;
@@ -44,4 +45,10 @@ public class RevokeRoleStatement extends RoleManagementStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.REVOKE);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 84fa1d7..7fa9964 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -25,6 +25,8 @@ import com.google.common.base.MoreObjects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.audit.IAuditLogContext;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
@@ -287,6 +289,12 @@ public class SelectStatement implements CQLStatement
         }
     }
 
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.SELECT, keyspace(), table.name);
+    }
+
     // Simple wrapper class to avoid some code duplication
     private static abstract class Pager
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 3a6e22c..5d09cfa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -96,4 +97,10 @@ public class TruncateStatement extends CFStatement implements CQLStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.TRUNCATE, keyspace(), cfName.getColumnFamily());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 858a490..66addab 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
 import org.apache.cassandra.cql3.conditions.Conditions;
@@ -333,4 +334,10 @@ public class UpdateStatement extends ModificationStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.UPDATE, keyspace(), columnFamily());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index d51e67b..381ed3a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -73,4 +74,10 @@ public class UseStatement extends ParsedStatement implements CQLStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.USE_KEYSPACE, keyspace);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/db/fullquerylog/FullQueryLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/fullquerylog/FullQueryLogger.java b/src/java/org/apache/cassandra/db/fullquerylog/FullQueryLogger.java
deleted file mode 100644
index 8502664..0000000
--- a/src/java/org/apache/cassandra/db/fullquerylog/FullQueryLogger.java
+++ /dev/null
@@ -1,530 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.fullquerylog;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Ints;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.buffer.ByteBuf;
-import net.openhft.chronicle.bytes.BytesStore;
-import net.openhft.chronicle.queue.RollCycles;
-import net.openhft.chronicle.wire.ValueOut;
-import net.openhft.chronicle.wire.WireOut;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.io.FSError;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.transport.CBUtil;
-import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.Throwables;
-import org.apache.cassandra.utils.binlog.BinLog;
-import org.apache.cassandra.utils.concurrent.WeightedQueue;
-import org.github.jamm.MemoryLayoutSpecification;
-
-/**
- * A logger that logs entire query contents after the query finishes (or times out).
- */
-public class FullQueryLogger
-{
-    private static final int EMPTY_BYTEBUFFER_SIZE = Ints.checkedCast(ObjectSizes.sizeOnHeapExcludingData(ByteBuffer.allocate(0)));
-    private static final int EMPTY_LIST_SIZE = Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList(0)));
-    private static final int EMPTY_BYTEBUF_SIZE;
-    private static final int OBJECT_HEADER_SIZE = MemoryLayoutSpecification.SPEC.getObjectHeaderSize();
-    static
-    {
-        int tempSize = 0;
-        ByteBuf buf = CBUtil.allocator.buffer(0, 0);
-        try
-        {
-            tempSize = Ints.checkedCast(ObjectSizes.measure(buf));
-        }
-        finally
-        {
-            buf.release();
-        }
-        EMPTY_BYTEBUF_SIZE = tempSize;
-    }
-
-    private static final Logger logger = LoggerFactory.getLogger(FullQueryLogger.class);
-    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
-    private static final NoSpamLogger.NoSpamLogStatement droppedSamplesStatement = noSpamLogger.getStatement("Dropped {} binary log samples", 1, TimeUnit.MINUTES);
-
-    public static final FullQueryLogger instance = new FullQueryLogger();
-
-    volatile BinLog binLog;
-    private volatile boolean blocking;
-    private Path path;
-
-    private final AtomicLong droppedSamplesSinceLastLog = new AtomicLong();
-
-    private FullQueryLogger()
-    {
-    }
-
-    /**
-     * Configure the global instance of the FullQueryLogger
-     * @param path Dedicated path where the FQL can store it's files.
-     * @param rollCycle How often to roll FQL log segments so they can potentially be reclaimed
-     * @param blocking Whether the FQL should block if the FQL falls behind or should drop log records
-     * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
-     * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
-     */
-    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
-    {
-        Preconditions.checkNotNull(path, "path was null");
-        File pathAsFile = path.toFile();
-        Preconditions.checkNotNull(rollCycle, "rollCycle was null");
-        rollCycle = rollCycle.toUpperCase();
-
-        //Exists and is a directory or can be created
-        Preconditions.checkArgument((pathAsFile.exists() && pathAsFile.isDirectory()) || (!pathAsFile.exists() && pathAsFile.mkdirs()), "path exists and is not a directory or couldn't be created");
-        Preconditions.checkArgument(pathAsFile.canRead() && pathAsFile.canWrite() && pathAsFile.canExecute(), "path is not readable, writable, and executable");
-        Preconditions.checkNotNull(RollCycles.valueOf(rollCycle), "unrecognized roll cycle");
-        Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0");
-        Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0");
-        logger.info("Attempting to configure full query logger path: {} Roll cycle: {} Blocking: {} Max queue weight: {} Max log size:{}", path, rollCycle, blocking, maxQueueWeight, maxLogSize);
-        if (binLog != null)
-        {
-            logger.warn("Full query logger already configured. Ignoring requested configuration.");
-            throw new IllegalStateException("Already configured");
-        }
-
-        if (path.toFile().exists())
-        {
-            Throwable error = cleanDirectory(path.toFile(), null);
-            if (error != null)
-            {
-                throw new RuntimeException(error);
-            }
-        }
-
-        this.path = path;
-        this.blocking = blocking;
-        binLog = new BinLog(path, RollCycles.valueOf(rollCycle), maxQueueWeight, maxLogSize);
-        binLog.start();
-    }
-
-    /**
-     * Need the path as a parameter as well because if the process is restarted the config file might be the only
-     * location for retrieving the path to the full query log files, but JMX also allows you to specify a path
-     * that isn't persisted anywhere so we have to clean that one a well.
-     */
-    public synchronized void reset(String fullQueryLogPath)
-    {
-        try
-        {
-            Set<File> pathsToClean = Sets.newHashSet();
-
-            //First decide whether to clean the path configured in the YAML
-            if (fullQueryLogPath != null)
-            {
-                File fullQueryLogPathFile = new File(fullQueryLogPath);
-                if (fullQueryLogPathFile.exists())
-                {
-                    pathsToClean.add(fullQueryLogPathFile);
-                }
-            }
-
-            //Then decide whether to clean the last used path, possibly configured by JMX
-            if (path != null)
-            {
-                File pathFile = path.toFile();
-                if (pathFile.exists())
-                {
-                    pathsToClean.add(pathFile);
-                }
-            }
-
-            logger.info("Reset (and deactivation) of full query log requested.");
-            if (binLog != null)
-            {
-                logger.info("Stopping full query log. Cleaning {}.", pathsToClean);
-                binLog.stop();
-                binLog = null;
-            }
-            else
-            {
-                logger.info("Full query log already deactivated. Cleaning {}.", pathsToClean);
-            }
-
-            Throwable accumulate = null;
-            for (File f : pathsToClean)
-            {
-                accumulate = cleanDirectory(f, accumulate);
-            }
-            if (accumulate != null)
-            {
-                throw new RuntimeException(accumulate);
-            }
-        }
-        catch (Exception e)
-        {
-            if (e instanceof RuntimeException)
-            {
-                throw (RuntimeException)e;
-            }
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Stop the full query log leaving behind any generated files.
-     */
-    public synchronized void stop()
-    {
-        try
-        {
-            logger.info("Deactivation of full query log requested.");
-            if (binLog != null)
-            {
-                logger.info("Stopping full query log");
-                binLog.stop();
-                binLog = null;
-            }
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Check whether the full query log is enabled.
-     * @return true if records are recorded and false otherwise.
-     */
-    public boolean enabled()
-    {
-        return binLog != null;
-    }
-
-    /**
-     * This is potentially lossy, but it's not super critical as we will always generally know
-     * when this is happening and roughly how bad it is.
-     */
-    private void logDroppedSample()
-    {
-        droppedSamplesSinceLastLog.incrementAndGet();
-        if (droppedSamplesStatement.warn(new Object[] {droppedSamplesSinceLastLog.get()}))
-        {
-            droppedSamplesSinceLastLog.set(0);
-        }
-    }
-
-    /**
-     * Log an invocation of a batch of queries
-     * @param type The type of the batch
-     * @param queries CQL text of the queries
-     * @param values Values to bind to as parameters for the queries
-     * @param queryOptions Options associated with the query invocation
-     * @param batchTimeMillis Approximate time in milliseconds since the epoch since the batch was invoked
-     */
-    public void logBatch(String type, List<String> queries,  List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
-    {
-        Preconditions.checkNotNull(type, "type was null");
-        Preconditions.checkNotNull(queries, "queries was null");
-        Preconditions.checkNotNull(values, "value was null");
-        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
-        Preconditions.checkArgument(batchTimeMillis > 0, "batchTimeMillis must be > 0");
-
-        //Don't construct the wrapper if the log is disabled
-        BinLog binLog = this.binLog;
-        if (binLog == null)
-        {
-            return;
-        }
-
-        WeighableMarshallableBatch wrappedBatch = new WeighableMarshallableBatch(type, queries, values, queryOptions, batchTimeMillis);
-        logRecord(wrappedBatch, binLog);
-    }
-
-    void logRecord(AbstractWeighableMarshallable record, BinLog binLog)
-    {
-
-        boolean putInQueue = false;
-        try
-        {
-            if (blocking)
-            {
-                try
-                {
-                    binLog.put(record);
-                    putInQueue = true;
-                }
-                catch (InterruptedException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-            else
-            {
-                if (!binLog.offer(record))
-                {
-                    logDroppedSample();
-                }
-                else
-                {
-                    putInQueue = true;
-                }
-            }
-        }
-        finally
-        {
-            if (!putInQueue)
-            {
-                record.release();
-            }
-        }
-    }
-
-    /**
-     * Log a single CQL query
-     * @param query CQL query text
-     * @param queryOptions Options associated with the query invocation
-     * @param queryTimeMillis Approximate time in milliseconds since the epoch since the batch was invoked
-     */
-    public void logQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
-    {
-        Preconditions.checkNotNull(query, "query was null");
-        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
-        Preconditions.checkArgument(queryTimeMillis > 0, "queryTimeMillis must be > 0");
-
-        //Don't construct the wrapper if the log is disabled
-        BinLog binLog = this.binLog;
-        if (binLog == null)
-        {
-            return;
-        }
-
-        WeighableMarshallableQuery wrappedQuery = new WeighableMarshallableQuery(query, queryOptions, queryTimeMillis);
-        logRecord(wrappedQuery, binLog);
-    }
-
-    private static abstract class AbstractWeighableMarshallable extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
-    {
-        private final ByteBuf queryOptionsBuffer;
-        private final long timeMillis;
-        private final int protocolVersion;
-
-        private AbstractWeighableMarshallable(QueryOptions queryOptions, long timeMillis)
-        {
-            this.timeMillis = timeMillis;
-            ProtocolVersion version = queryOptions.getProtocolVersion();
-            this.protocolVersion = version.asInt();
-            int optionsSize = QueryOptions.codec.encodedSize(queryOptions, version);
-            queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, optionsSize);
-            /*
-             * Struggled with what tradeoff to make in terms of query options which is potentially large and complicated
-             * There is tension between low garbage production (or allocator overhead), small working set size, and CPU overhead reserializing the
-             * query options into binary format.
-             *
-             * I went with the lowest risk most predictable option which is allocator overhead and CPU overhead
-             * rather then keep the original query message around so I could just serialize that as a memcpy. It's more
-             * instructions when turned on, but it doesn't change memory footprint quite as much and it's more pay for what you use
-             * in terms of query volume. The CPU overhead is spread out across producers so we should at least get
-             * some scaling.
-             *
-             */
-            boolean success = false;
-            try
-            {
-                QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, version);
-                success = true;
-            }
-            finally
-            {
-                if (!success)
-                {
-                    queryOptionsBuffer.release();
-                }
-            }
-        }
-
-        @Override
-        public void writeMarshallable(WireOut wire)
-        {
-            wire.write("protocol-version").int32(protocolVersion);
-            wire.write("query-options").bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
-            wire.write("query-time").int64(timeMillis);
-        }
-
-        @Override
-        public void release()
-        {
-            queryOptionsBuffer.release();
-        }
-
-        //8-bytes for protocol version (assume alignment cost), 8-byte timestamp, 8-byte object header + other contents
-        @Override
-        public int weight()
-        {
-            return 8 + 8 + OBJECT_HEADER_SIZE + EMPTY_BYTEBUF_SIZE + queryOptionsBuffer.capacity();
-        }
-    }
-
-    static class WeighableMarshallableBatch extends AbstractWeighableMarshallable
-    {
-        private final int weight;
-        private final String batchType;
-        private final List<String> queries;
-        private final List<List<ByteBuffer>> values;
-
-        public WeighableMarshallableBatch(String batchType, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
-        {
-           super(queryOptions, batchTimeMillis);
-           this.queries = queries;
-           this.values = values;
-           this.batchType = batchType;
-           boolean success = false;
-           try
-           {
-
-               //weight, batch type, queries, values
-               int weightTemp = 8 + EMPTY_LIST_SIZE + EMPTY_LIST_SIZE;
-               for (int ii = 0; ii < queries.size(); ii++)
-               {
-                   weightTemp += ObjectSizes.sizeOf(queries.get(ii));
-               }
-
-               weightTemp += EMPTY_LIST_SIZE * values.size();
-               for (int ii = 0; ii < values.size(); ii++)
-               {
-                   List<ByteBuffer> sublist = values.get(ii);
-                   weightTemp += EMPTY_BYTEBUFFER_SIZE * sublist.size();
-                   for (int zz = 0; zz < sublist.size(); zz++)
-                   {
-                       weightTemp += sublist.get(zz).capacity();
-                   }
-               }
-               weightTemp += super.weight();
-               weightTemp += ObjectSizes.sizeOf(batchType);
-               weight = weightTemp;
-               success = true;
-           }
-           finally
-           {
-               if (!success)
-               {
-                   release();
-               }
-           }
-        }
-
-        @Override
-        public void writeMarshallable(WireOut wire)
-        {
-            wire.write("type").text("batch");
-            super.writeMarshallable(wire);
-            wire.write("batch-type").text(batchType);
-            ValueOut valueOut = wire.write("queries");
-            valueOut.int32(queries.size());
-            for (String query : queries)
-            {
-                valueOut.text(query);
-            }
-            valueOut = wire.write("values");
-            valueOut.int32(values.size());
-            for (List<ByteBuffer> subValues : values)
-            {
-                valueOut.int32(subValues.size());
-                for (ByteBuffer value : subValues)
-                {
-                    valueOut.bytes(BytesStore.wrap(value));
-                }
-            }
-        }
-
-        @Override
-        public int weight()
-        {
-            return weight;
-        }
-
-    }
-
-    static class WeighableMarshallableQuery extends AbstractWeighableMarshallable
-    {
-        private final String query;
-
-        public WeighableMarshallableQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
-        {
-            super(queryOptions, queryTimeMillis);
-            this.query = query;
-        }
-
-        @Override
-        public void writeMarshallable(WireOut wire)
-        {
-            wire.write("type").text("single");
-            super.writeMarshallable(wire);
-            wire.write("query").text(query);
-        }
-
-        @Override
-        public int weight()
-        {
-            return Ints.checkedCast(ObjectSizes.sizeOf(query)) + super.weight();
-        }
-    }
-
-    static Throwable cleanDirectory(File directory, Throwable accumulate)
-    {
-        if (!directory.exists())
-        {
-            return Throwables.merge(accumulate, new RuntimeException(String.format("%s does not exists", directory)));
-        }
-        if (!directory.isDirectory())
-        {
-            return Throwables.merge(accumulate, new RuntimeException(String.format("%s is not a directory", directory)));
-        }
-        for (File f : directory.listFiles())
-        {
-            accumulate = deleteRecursively(f, accumulate);
-        }
-        if (accumulate instanceof FSError)
-        {
-            FileUtils.handleFSError((FSError)accumulate);
-        }
-        return accumulate;
-    }
-
-    private static Throwable deleteRecursively(File fileOrDirectory, Throwable accumulate)
-    {
-        if (fileOrDirectory.isDirectory())
-        {
-            for (File f : fileOrDirectory.listFiles())
-            {
-                accumulate = FileUtils.deleteWithConfirm(f, true, accumulate);
-            }
-        }
-        return FileUtils.deleteWithConfirm(fileOrDirectory, true , accumulate);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 7f0837b..37bfd17 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -40,6 +40,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.Stage;
@@ -55,7 +56,6 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
@@ -78,7 +78,6 @@ import org.apache.cassandra.service.paxos.ProposeCallback;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
 import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.AbstractIterator;
@@ -2807,19 +2806,19 @@ public class StorageProxy implements StorageProxyMBean
     {
         path = path != null ? path : DatabaseDescriptor.getFullQueryLogPath();
         Preconditions.checkNotNull(path, "cassandra.yaml did not set full_query_log_dir and not set as parameter");
-        FullQueryLogger.instance.configure(Paths.get(path), rollCycle, blocking, maxQueueWeight, maxLogSize);
+        AuditLogManager.getInstance().configureFQL(Paths.get(path), rollCycle, blocking, maxQueueWeight, maxLogSize);
     }
 
     @Override
     public void resetFullQueryLogger()
     {
-        FullQueryLogger.instance.reset(DatabaseDescriptor.getFullQueryLogPath());
+        AuditLogManager.getInstance().resetFQL(DatabaseDescriptor.getFullQueryLogPath());
     }
 
     @Override
     public void stopFullQueryLogger()
     {
-        FullQueryLogger.instance.stop();
+        AuditLogManager.getInstance().disableFQL();
     }
 
     public int getOtcBacklogExpirationInterval() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 2dea41c..dd2f178 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -37,6 +37,7 @@ import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
@@ -46,6 +47,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogManager;
+import org.apache.cassandra.audit.AuditLogOptions;
 import org.apache.cassandra.auth.AuthKeyspace;
 import org.apache.cassandra.auth.AuthSchemaChangeListener;
 import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
@@ -5388,4 +5391,41 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         daemon.clearConnectionHistory();
         logger.info("Cleared connection history");
     }
+    public void disableAuditLog()
+    {
+        AuditLogManager.getInstance().disableAuditLog();
+        logger.info("Auditlog is disabled");
+    }
+
+    public void enableAuditLog(String loggerName, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories,
+                               String includedUsers, String excludedUsers) throws ConfigurationException, IllegalStateException
+    {
+        loggerName = loggerName != null ? loggerName : DatabaseDescriptor.getAuditLoggingOptions().logger;
+
+        Preconditions.checkNotNull(loggerName, "cassandra.yaml did not have logger in audit_logging_option and not set as parameter");
+        Preconditions.checkState(FBUtilities.isAuditLoggerClassExists(loggerName), "Unable to find AuditLogger class: "+loggerName);
+
+        AuditLogOptions auditLogOptions = new AuditLogOptions();
+        auditLogOptions.enabled = true;
+        auditLogOptions.logger = loggerName;
+        auditLogOptions.included_keyspaces = includedKeyspaces != null ? includedKeyspaces : DatabaseDescriptor.getAuditLoggingOptions().included_keyspaces;
+        auditLogOptions.excluded_keyspaces = excludedKeyspaces != null ? excludedKeyspaces : DatabaseDescriptor.getAuditLoggingOptions().excluded_keyspaces;
+        auditLogOptions.included_categories = includedCategories != null ? includedCategories : DatabaseDescriptor.getAuditLoggingOptions().included_categories;
+        auditLogOptions.excluded_categories = excludedCategories != null ? excludedCategories : DatabaseDescriptor.getAuditLoggingOptions().excluded_categories;
+        auditLogOptions.included_users = includedUsers != null ? includedUsers : DatabaseDescriptor.getAuditLoggingOptions().included_users;
+        auditLogOptions.excluded_users = excludedUsers != null ? excludedUsers : DatabaseDescriptor.getAuditLoggingOptions().excluded_users;
+
+        AuditLogManager.getInstance().enableAuditLog(auditLogOptions);
+
+        logger.info("AuditLog is enabled with logger: [{}], included_keyspaces: [{}], excluded_keyspaces: [{}], " +
+                    "included_categories: [{}], excluded_categories: [{}]," +
+                    "included_users: [{}], excluded_users: [{}],", loggerName, auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces,
+                    auditLogOptions.included_categories, auditLogOptions.excluded_categories, auditLogOptions.included_users, auditLogOptions.excluded_users);
+
+    }
+
+    public boolean isAuditLogEnabled()
+    {
+        return AuditLogManager.getInstance().isAuditingEnabled();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 20b7400..8c4b618 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -29,6 +29,8 @@ import javax.annotation.Nullable;
 import javax.management.NotificationEmitter;
 import javax.management.openmbean.TabularData;
 
+import org.apache.cassandra.exceptions.ConfigurationException;
+
 public interface StorageServiceMBean extends NotificationEmitter
 {
     /**
@@ -676,4 +678,7 @@ public interface StorageServiceMBean extends NotificationEmitter
 
     /** Clears the history of clients that have connected in the past **/
     void clearConnectionHistory();
+    public void disableAuditLog();
+    public void enableAuditLog(String loggerName, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories, String includedUsers, String excludedUsers) throws ConfigurationException;
+    public boolean isAuditLogEnabled();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index e221e11..f556ffc 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1675,6 +1675,16 @@ public class NodeProbe implements AutoCloseable
     {
         ssProxy.clearConnectionHistory();
     }
+    
+    public void disableAuditLog()
+    {
+        ssProxy.disableAuditLog();
+    }
+
+    public void enableAuditLog(String loggerName, String includedKeyspaces ,String excludedKeyspaces ,String includedCategories ,String excludedCategories ,String includedUsers ,String excludedUsers)
+    {
+        ssProxy.enableAuditLog(loggerName, includedKeyspaces, excludedKeyspaces, includedCategories, excludedCategories, includedUsers, excludedUsers);
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index fb9204c..b1d01d5 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -158,7 +158,9 @@ public class NodeTool
                 RelocateSSTables.class,
                 ViewBuildStatus.class,
                 HandoffWindow.class,
-                ReloadSslCertificates.class
+                ReloadSslCertificates.class,
+                EnableAuditLog.class,
+                DisableAuditLog.class
         );
 
         Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/tools/fqltool/Dump.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/Dump.java
index 9d70b8e..6a748bc 100644
--- a/src/java/org/apache/cassandra/tools/fqltool/Dump.java
+++ b/src/java/org/apache/cassandra/tools/fqltool/Dump.java
@@ -71,42 +71,52 @@ public class Dump implements Runnable
             sb.setLength(0);
             String type = wireIn.read("type").text();
             sb.append("Type: ").append(type).append(System.lineSeparator());
-            int protocolVersion = wireIn.read("protocol-version").int32();
-            sb.append("Protocol version: ").append(protocolVersion).append(System.lineSeparator());
-            QueryOptions options = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read("query-options").bytesStore().toTemporaryDirectByteBuffer()), ProtocolVersion.decode(protocolVersion));
-            sb.append("Query time: ").append(wireIn.read("query-time").int64()).append(System.lineSeparator());
-            if (type.equals("single"))
+            assert type != null;
+            if (type.equals("AuditLog"))
             {
-                sb.append("Query: ").append(wireIn.read("query").text()).append(System.lineSeparator());
-                List<ByteBuffer> values = options.getValues() != null ? options.getValues() : Collections.EMPTY_LIST;
-                sb.append("Values: ").append(System.lineSeparator());
-                valuesToStringBuilder(values, sb);
+                sb.append("LogMessage: ").append(wireIn.read("message").text()).append(System.lineSeparator());
+
             }
             else
             {
-                sb.append("Batch type: ").append(wireIn.read("batch-type").text()).append(System.lineSeparator());
-                ValueIn in = wireIn.read("queries");
-                int numQueries = in.int32();
-                List<String> queries = new ArrayList<>();
-                for (int ii = 0; ii < numQueries; ii++)
+                int protocolVersion = wireIn.read("protocol-version").int32();
+                sb.append("Protocol version: ").append(protocolVersion).append(System.lineSeparator());
+                QueryOptions options = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read("query-options").bytesStore().toTemporaryDirectByteBuffer()), ProtocolVersion.decode(protocolVersion));
+                sb.append("Query time: ").append(wireIn.read("query-time").int64()).append(System.lineSeparator());
+
+                if (type.equals("single"))
                 {
-                    queries.add(in.text());
+                    sb.append("Query: ").append(wireIn.read("query").text()).append(System.lineSeparator());
+                    List<ByteBuffer> values = options.getValues() != null ? options.getValues() : Collections.EMPTY_LIST;
+                    sb.append("Values: ").append(System.lineSeparator());
+                    valuesToStringBuilder(values, sb);
                 }
-                in = wireIn.read("values");
-                int numValues = in.int32();
-                List<List<ByteBuffer>> values = new ArrayList<>();
-                for (int ii = 0; ii < numValues; ii++)
+                else
                 {
-                    List<ByteBuffer> subValues = new ArrayList<>();
-                    values.add(subValues);
-                    int numSubValues = in.int32();
-                    for (int zz = 0; zz < numSubValues; zz++)
+                    sb.append("Batch type: ").append(wireIn.read("batch-type").text()).append(System.lineSeparator());
+                    ValueIn in = wireIn.read("queries");
+                    int numQueries = in.int32();
+                    List<String> queries = new ArrayList<>();
+                    for (int ii = 0; ii < numQueries; ii++)
                     {
-                        subValues.add(ByteBuffer.wrap(in.bytes()));
+                        queries.add(in.text());
+                    }
+                    in = wireIn.read("values");
+                    int numValues = in.int32();
+                    List<List<ByteBuffer>> values = new ArrayList<>();
+                    for (int ii = 0; ii < numValues; ii++)
+                    {
+                        List<ByteBuffer> subValues = new ArrayList<>();
+                        values.add(subValues);
+                        int numSubValues = in.int32();
+                        for (int zz = 0; zz < numSubValues; zz++)
+                        {
+                            subValues.add(ByteBuffer.wrap(in.bytes()));
+                        }
+                        sb.append("Query: ").append(queries.get(ii)).append(System.lineSeparator());
+                        sb.append("Values: ").append(System.lineSeparator());
+                        valuesToStringBuilder(subValues, sb);
                     }
-                    sb.append("Query: ").append(queries.get(ii)).append(System.lineSeparator());
-                    sb.append("Values: ").append(System.lineSeparator());
-                    valuesToStringBuilder(subValues, sb);
                 }
             }
             sb.append(System.lineSeparator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/tools/nodetool/DisableAuditLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/DisableAuditLog.java b/src/java/org/apache/cassandra/tools/nodetool/DisableAuditLog.java
new file mode 100644
index 0000000..35653ae
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/DisableAuditLog.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "disableauditlog", description = "Disable the audit log")
+public class DisableAuditLog extends NodeToolCmd
+{
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        probe.disableAuditLog();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/tools/nodetool/EnableAuditLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/EnableAuditLog.java b/src/java/org/apache/cassandra/tools/nodetool/EnableAuditLog.java
new file mode 100644
index 0000000..c71d210
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/EnableAuditLog.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "enableauditlog", description = "Enable the audit log")
+public class EnableAuditLog extends NodeToolCmd
+{
+    @Option(title = "logger", name = { "--logger" }, description = "Logger name to be used for AuditLogging. Default BinAuditLogger. If not set the value from cassandra.yaml will be used")
+    private String logger = null;
+
+    @Option(title = "included_keyspaces", name = { "--included-keyspaces" }, description = "Comma separated list of keyspaces to be included for audit log. If not set the value from cassandra.yaml will be used")
+    private String included_keyspaces = null;
+
+    @Option(title = "excluded_keyspaces", name = { "--excluded-keyspaces" }, description = "Comma separated list of keyspaces to be excluded for audit log. If not set the value from cassandra.yaml will be used")
+    private String excluded_keyspaces = null;
+
+    @Option(title = "included_categories", name = { "--included-categories" }, description = "Comma separated list of Audit Log Categories to be included for audit log. If not set the value from cassandra.yaml will be used")
+    private String included_categories = null;
+
+    @Option(title = "excluded_categories", name = { "--excluded-categories" }, description = "Comma separated list of Audit Log Categories to be excluded for audit log. If not set the value from cassandra.yaml will be used")
+    private String excluded_categories = null;
+
+    @Option(title = "included_users", name = { "--included-users" }, description = "Comma separated list of users to be included for audit log. If not set the value from cassandra.yaml will be used")
+    private String included_users = null;
+
+    @Option(title = "excluded_users", name = { "--excluded-users" }, description = "Comma separated list of users to be excluded for audit log. If not set the value from cassandra.yaml will be used")
+    private String excluded_users = null;
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        probe.enableAuditLog(logger, included_keyspaces, excluded_keyspaces, included_categories, excluded_categories, included_users, excluded_users);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index f71d640..531909f 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -42,6 +42,7 @@ import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.service.QueryState;
@@ -202,6 +203,10 @@ public abstract class Message
     {
         protected boolean tracingRequested;
 
+        protected final AuditLogManager auditLogManager = AuditLogManager.getInstance();
+        protected boolean auditLogEnabled = auditLogManager.isAuditingEnabled();
+        protected boolean isLoggingEnabled = auditLogManager.isLoggingEnabled();
+
         protected Request(Type type)
         {
             super(type);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/4] cassandra git commit: Audit logging for database activity

Posted by ja...@apache.org.
Audit logging for database activity

patch by Vinay Chella; reviewed by jasobrown for CASSANDRA-12151


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f56871b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f56871b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f56871b8

Branch: refs/heads/trunk
Commit: f56871b88be1e8965f166769c12cfa43313bac74
Parents: aba582f
Author: Vinay Chella <vi...@gmail.com>
Authored: Fri Feb 23 20:16:16 2018 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri May 11 05:44:16 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   2 +
 conf/cassandra.yaml                             |  14 +
 doc/source/operating/audit_logging.rst          | 203 ++++++
 .../apache/cassandra/audit/AuditLogEntry.java   | 298 ++++++++
 .../cassandra/audit/AuditLogEntryCategory.java  |  27 +
 .../cassandra/audit/AuditLogEntryType.java      |  83 +++
 .../apache/cassandra/audit/AuditLogFilter.java  | 162 +++++
 .../apache/cassandra/audit/AuditLogManager.java | 317 +++++++++
 .../apache/cassandra/audit/AuditLogOptions.java |  61 ++
 .../apache/cassandra/audit/BinAuditLogger.java  |  85 +++
 .../cassandra/audit/BinLogAuditLogger.java      | 387 +++++++++++
 .../apache/cassandra/audit/FileAuditLogger.java |  57 ++
 .../apache/cassandra/audit/FullQueryLogger.java | 197 ++++++
 .../cassandra/audit/IAuditLogContext.java       |  53 ++
 .../apache/cassandra/audit/IAuditLogger.java    |  47 ++
 .../apache/cassandra/audit/NoOpAuditLogger.java |  42 ++
 .../org/apache/cassandra/config/Config.java     |   3 +
 .../cassandra/config/DatabaseDescriptor.java    |  11 +
 .../org/apache/cassandra/cql3/CQLStatement.java |   3 +-
 .../cql3/statements/AlterKeyspaceStatement.java |   7 +
 .../cql3/statements/AlterRoleStatement.java     |   7 +
 .../cql3/statements/AlterTableStatement.java    |   7 +
 .../cql3/statements/AlterTypeStatement.java     |   7 +
 .../cql3/statements/AlterViewStatement.java     |   7 +
 .../cql3/statements/BatchStatement.java         |   7 +
 .../cassandra/cql3/statements/CFStatement.java  |   2 +-
 .../statements/CreateAggregateStatement.java    |   7 +
 .../statements/CreateFunctionStatement.java     |   7 +
 .../cql3/statements/CreateIndexStatement.java   |   6 +
 .../statements/CreateKeyspaceStatement.java     |   7 +
 .../cql3/statements/CreateRoleStatement.java    |   6 +
 .../cql3/statements/CreateTableStatement.java   |   6 +
 .../cql3/statements/CreateTriggerStatement.java |   8 +
 .../cql3/statements/CreateTypeStatement.java    |   7 +
 .../cql3/statements/CreateViewStatement.java    |   6 +
 .../cql3/statements/DeleteStatement.java        |   6 +
 .../cql3/statements/DropAggregateStatement.java |   7 +
 .../cql3/statements/DropFunctionStatement.java  |   8 +-
 .../cql3/statements/DropIndexStatement.java     |   7 +
 .../cql3/statements/DropKeyspaceStatement.java  |   7 +
 .../cql3/statements/DropRoleStatement.java      |   7 +
 .../cql3/statements/DropTableStatement.java     |   7 +
 .../cql3/statements/DropTriggerStatement.java   |   7 +
 .../cql3/statements/DropTypeStatement.java      |   9 +-
 .../cql3/statements/DropViewStatement.java      |   7 +
 .../statements/GrantPermissionsStatement.java   |   9 +
 .../cql3/statements/GrantRoleStatement.java     |   7 +
 .../statements/ListPermissionsStatement.java    |   7 +
 .../cql3/statements/ListRolesStatement.java     |   7 +
 .../statements/RevokePermissionsStatement.java  |   8 +
 .../cql3/statements/RevokeRoleStatement.java    |   7 +
 .../cql3/statements/SelectStatement.java        |   8 +
 .../cql3/statements/TruncateStatement.java      |   7 +
 .../cql3/statements/UpdateStatement.java        |   7 +
 .../cassandra/cql3/statements/UseStatement.java |   7 +
 .../db/fullquerylog/FullQueryLogger.java        | 530 --------------
 .../apache/cassandra/service/StorageProxy.java  |   9 +-
 .../cassandra/service/StorageService.java       |  40 ++
 .../cassandra/service/StorageServiceMBean.java  |   5 +
 .../org/apache/cassandra/tools/NodeProbe.java   |  10 +
 .../org/apache/cassandra/tools/NodeTool.java    |   4 +-
 .../apache/cassandra/tools/fqltool/Dump.java    |  64 +-
 .../tools/nodetool/DisableAuditLog.java         |  33 +
 .../tools/nodetool/EnableAuditLog.java          |  55 ++
 .../org/apache/cassandra/transport/Message.java |   5 +
 .../transport/messages/AuthResponse.java        |  18 +
 .../transport/messages/BatchMessage.java        |  59 +-
 .../transport/messages/ExecuteMessage.java      |  61 +-
 .../transport/messages/PrepareMessage.java      |  30 +-
 .../transport/messages/QueryMessage.java        |  36 +-
 .../org/apache/cassandra/utils/FBUtilities.java |  24 +
 .../apache/cassandra/utils/binlog/BinLog.java   |   2 +-
 .../cassandra/audit/AuditLogFilterTest.java     | 189 +++++
 .../apache/cassandra/audit/AuditLoggerTest.java | 690 +++++++++++++++++++
 .../cassandra/audit/BinAuditLoggerTest.java     |  91 +++
 .../cassandra/audit/FullQueryLoggerTest.java    | 610 ++++++++++++++++
 .../cassandra/audit/InMemoryAuditLogger.java    |  47 ++
 .../config/DatabaseDescriptorRefTest.java       |   5 +
 .../org/apache/cassandra/cql3/CQLTester.java    |   7 +
 .../db/fullquerylog/FullQueryLoggerTest.java    | 601 ----------------
 .../service/StorageServiceServerTest.java       |  28 +
 .../cassandra/utils/binlog/BinLogTest.java      |  12 +-
 83 files changed, 4346 insertions(+), 1222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e574d16..6308416 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Audit logging for database activity (CASSANDRA-12151)
  * Clean up build artifacts in docs container (CASSANDRA-14432)
  * Minor network authz improvements (Cassandra-14413)
  * Automatic sstable upgrades (CASSANDRA-14197)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 4885a12..da80684 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -79,6 +79,8 @@ New features
      in cassandra.yaml, and the docs for create and alter role statements. CASSANDRA-13985
    - Roles altered from login=true to login=false will prevent existing connections from executing any
      statements after the cache has been refreshed. CASSANDRA-13985
+   - Support for audit logging of database activity. If enabled, logs every incoming
+     CQL command request and authentication attempt (successful as well as unsuccessful logins) to a node.
 
 Upgrading
 ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 7cc9e32..49c6f03 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1184,3 +1184,17 @@ back_pressure_strategy:
 # automatic_sstable_upgrade: false
 # Limit the number of concurrent sstable upgrades
 # max_concurrent_automatic_sstable_upgrades: 1
+
+# Audit logging - Logs every incoming CQL command request and authentication attempt to a node. See the docs
+# on audit_logging for full details about the various configuration options.
+audit_logging_options:
+    enabled: false
+    logger: BinAuditLogger
+    # audit_logs_dir:
+    # included_keyspaces: 
+    # excluded_keyspaces:
+    # included_categories:
+    # excluded_categories:
+    # included_users:
+    # excluded_users:
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/doc/source/operating/audit_logging.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/audit_logging.rst b/doc/source/operating/audit_logging.rst
new file mode 100644
index 0000000..9be7a43
--- /dev/null
+++ b/doc/source/operating/audit_logging.rst
@@ -0,0 +1,203 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+..
+..     http://www.apache.org/licenses/LICENSE-2.0
+..
+.. Unless required by applicable law or agreed to in writing, software
+.. distributed under the License is distributed on an "AS IS" BASIS,
+.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+.. See the License for the specific language governing permissions and
+.. limitations under the License.
+
+.. highlight:: none
+
+
+
+Audit Logging
+------------------
+
+Audit logging in Cassandra logs every incoming CQL command request, as well as authentication (successful and unsuccessful login)
+to a C* node. Currently, there are two implementations provided; a custom logger can also be implemented and injected with the
+class name as a parameter in cassandra.yaml.
+
+- ``BinAuditLogger``: An efficient way to log events to a file in a binary format.
+- ``FileAuditLogger``: Logs events to the ``audit/audit.log`` file using the slf4j logger.
+
+*Recommendation*: ``BinAuditLogger`` is the community-recommended logger because of its performance.
+
+What does it capture
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Audit logging captures the following events:
+
+- Successful as well as unsuccessful login attempts.
+
+- All database commands executed via the native protocol (CQL), whether merely attempted or successfully executed.
+
+What does it log
+^^^^^^^^^^^^^^^^^^^
+Each audit log implementation has access to the following attributes, and for the default text based logger these fields are concatenated with `|` characters to yield the final message.
+
+ - ``user``: User name (if available)
+ - ``host``: Host IP, where the command is being executed
+ - ``source ip address``: Source IP address from where the request initiated
+ - ``source port``: Source port number from where the request initiated
+ - ``timestamp``: unix time stamp
+ - ``type``: Type of the request (SELECT, INSERT, etc.)
+ - ``category``: Category of the request (DDL, DML, etc.)
+ - ``keyspace``: Keyspace (if applicable) on which the request is targeted to be executed
+ - ``scope``: Table/aggregate name/function name/trigger name etc., as applicable
+ - ``operation``: CQL command being executed
+
+How to configure
+^^^^^^^^^^^^^^^^^^
+Auditlog can be configured using cassandra.yaml. If you want to try Auditlog on one node, it can also be enabled and configured using ``nodetool``.
+
+cassandra.yaml configurations for AuditLog
+"""""""""""""""""""""""""""""""""""""""""""""
+	- ``enabled``: This option enables/ disables audit log
+	- ``logger``: Class name of the logger/ custom logger.
+	- ``audit_logs_dir``: Audit logs directory location; if not set, defaults to `cassandra.logdir.audit` or `cassandra.logdir` + /audit/
+	- ``included_keyspaces``: Comma separated list of keyspaces to be included in audit log, default - includes all keyspaces
+	- ``excluded_keyspaces``: Comma separated list of keyspaces to be excluded from audit log, default - excludes no keyspace
+	- ``included_categories``: Comma separated list of Audit Log Categories to be included in audit log, default - includes all categories
+	- ``excluded_categories``: Comma separated list of Audit Log Categories to be excluded from audit log, default - excludes no category
+	- ``included_users``: Comma separated list of users to be included in audit log, default - includes all users
+	- ``excluded_users``: Comma separated list of users to be excluded from audit log, default - excludes no user
+
+	  
+The available categories are: QUERY, DML, DDL, DCL, OTHER, AUTH, ERROR, PREPARE
+
+NodeTool command to enable AuditLog
+"""""""""""""""""""""""""""""""""""""
+``enableauditlog``: Enables AuditLog with yaml defaults. yaml configurations can be overridden using options via nodetool command.
+
+::
+
+    nodetool enableauditlog
+
+Options
+**********
+
+
+``--excluded-categories``
+    Comma separated list of Audit Log Categories to be excluded for
+    audit log. If not set the value from cassandra.yaml will be used
+
+``--excluded-keyspaces``
+    Comma separated list of keyspaces to be excluded for audit log. If
+    not set the value from cassandra.yaml will be used
+
+``--excluded-users``
+    Comma separated list of users to be excluded for audit log. If not
+    set the value from cassandra.yaml will be used
+
+``--included-categories``
+    Comma separated list of Audit Log Categories to be included for
+    audit log. If not set the value from cassandra.yaml will be used
+
+``--included-keyspaces``
+    Comma separated list of keyspaces to be included for audit log. If
+    not set the value from cassandra.yaml will be used
+
+``--included-users``
+    Comma separated list of users to be included for audit log. If not
+    set the value from cassandra.yaml will be used
+
+``--logger``
+    Logger name to be used for AuditLogging. Default BinAuditLogger. If
+    not set the value from cassandra.yaml will be used
+
+
+NodeTool command to disable AuditLog
+"""""""""""""""""""""""""""""""""""""""
+
+``disableauditlog``: Disables AuditLog.
+
+::
+
+    nodetool disableauditlog
+
+
+
+
+
+
+
+NodeTool command to reload AuditLog filters
+"""""""""""""""""""""""""""""""""""""""""""""
+
+``enableauditlog``: NodeTool enableauditlog command can be used to reload auditlog filters when called with the default or previous ``--logger`` value and updated filters
+
+E.g.,
+::
+
+    nodetool enableauditlog --logger <Default/ existing loggerName> --included-keyspaces <New Filter values>
+
+
+
+
+
+
+
+
+Sample output
+^^^^^^^^^^^^^^^^
+::
+
+    LogMessage: user:anonymous|host:localhost/X.X.X.X|source:/X.X.X.X|port:60878|timestamp:1521158923615|type:USE_KEYSPACE|category:OTHER|ks:dev1|operation:USE "dev1"
+
+
+
+Configuring BinAuditLogger
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+To use ``BinAuditLogger`` as a logger in AuditLogging, set the logger to ``BinAuditLogger`` in cassandra.yaml under the ``audit_logging_options`` section. ``BinAuditLogger`` can be further configured using its advanced options in cassandra.yaml.
+
+
+Advanced Options for BinAuditLogger
+""""""""""""""""""""""""""""""""""""""
+
+``block``
+	Indicates whether the AuditLog should block if it falls behind or should drop audit log records. Default is set to ``true`` so that AuditLog records won't be lost
+
+``max_queue_weight``
+	Maximum weight of in memory queue for records waiting to be written to the audit log file before blocking or dropping the log records. Default is set to ``256 * 1024 * 1024``
+
+``max_log_size``
+	Maximum size of the rolled files to retain on disk before deleting the oldest file. Default is set to ``16L * 1024L * 1024L * 1024L``
+
+``roll_cycle``
+	How often to roll Audit log segments so they can potentially be reclaimed. Available options are: MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY. For more options, refer to net.openhft.chronicle.queue.RollCycles. Default is set to ``"HOURLY"``
+
+Configuring FileAuditLogger
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+To use ``FileAuditLogger`` as a logger in AuditLogging, apart from setting the class name in cassandra.yaml, the following logback configuration is needed to have the audit log events flow to a separate log file instead of system.log
+
+
+.. code-block:: xml
+
+    	<!-- Audit Logging (FileAuditLogger) rolling file appender to audit.log -->
+    	<appender name="AUDIT" class="ch.qos.logback.core.rolling.RollingFileAppender">
+    	  <file>${cassandra.logdir}/audit/audit.log</file>
+    	  <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+    	    <!-- rollover daily -->
+    	    <fileNamePattern>${cassandra.logdir}/audit/audit.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern>
+    	    <!-- each file should be at most 50MB, keep 30 days worth of history, but at most 5GB -->
+    	    <maxFileSize>50MB</maxFileSize>
+    	    <maxHistory>30</maxHistory>
+    	    <totalSizeCap>5GB</totalSizeCap>
+    	  </rollingPolicy>
+    	  <encoder>
+    	    <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+    	  </encoder>
+    	</appender>
+
+      	<!-- Audit Logging additivity to redirect audit logging events to audit/audit.log -->
+      	<logger name="org.apache.cassandra.audit" additivity="false" level="INFO">
+        	<appender-ref ref="AUDIT"/>
+      	</logger>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntry.java b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
new file mode 100644
index 0000000..d53fc6e
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class AuditLogEntry
+{
+    private final InetAddressAndPort host = FBUtilities.getBroadcastAddressAndPort();
+    private final InetAddressAndPort source;
+    private final String user;
+    private final long timestamp;
+    private final AuditLogEntryType type;
+    private final UUID batch;
+    private final String keyspace;
+    private final String scope;
+    private final String operation;
+    private final QueryOptions options;
+
+    private AuditLogEntry(AuditLogEntryType type, InetAddressAndPort source, String user, long timestamp, UUID batch, String keyspace, String scope, String operation, QueryOptions options)
+    {
+        this.type = type;
+        this.source = source;
+        this.user = user;
+        this.timestamp = timestamp;
+        this.batch = batch;
+        this.keyspace = keyspace;
+        this.scope = scope;
+        this.operation = operation;
+        this.options = options;
+    }
+
+    String getLogString()
+    {
+        StringBuilder builder = new StringBuilder(100);
+        builder.append("user:").append(user)
+               .append("|host:").append(host)
+               .append("|source:").append(source.address);
+        if (source.port > 0)
+        {
+            builder.append("|port:").append(source.port);
+        }
+
+        builder.append("|timestamp:").append(timestamp)
+               .append("|type:").append(type)
+               .append("|category:").append(type.getCategory());
+
+        if (batch != null)
+        {
+            builder.append("|batch:").append(batch);
+        }
+        if (StringUtils.isNotBlank(keyspace))
+        {
+            builder.append("|ks:").append(keyspace);
+        }
+        if (StringUtils.isNotBlank(scope))
+        {
+            builder.append("|scope:").append(scope);
+        }
+        if (StringUtils.isNotBlank(operation))
+        {
+            builder.append("|operation:").append(operation);
+        }
+        return builder.toString();
+    }
+
+    public InetAddressAndPort getHost()
+    {
+        return host;
+    }
+
+    public InetAddressAndPort getSource()
+    {
+        return source;
+    }
+
+    public String getUser()
+    {
+        return user;
+    }
+
+    public long getTimestamp()
+    {
+        return timestamp;
+    }
+
+    public AuditLogEntryType getType()
+    {
+        return type;
+    }
+
+    public UUID getBatch()
+    {
+        return batch;
+    }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    public String getScope()
+    {
+        return scope;
+    }
+
+    public String getOperation()
+    {
+        return operation;
+    }
+
+    public QueryOptions getOptions()
+    {
+        return options;
+    }
+
+    public static class Builder
+    {
+        private static final InetAddressAndPort DEFAULT_SOURCE;
+
+        static
+        {
+            try
+            {
+                DEFAULT_SOURCE = InetAddressAndPort.getByNameOverrideDefaults("0.0.0.0", 0);
+            }
+            catch (UnknownHostException e)
+            {
+
+                throw new RuntimeException("failed to create default source address", e);
+            }
+        }
+
+        private static final String DEFAULT_OPERATION = StringUtils.EMPTY;
+
+        private AuditLogEntryType type;
+        private InetAddressAndPort source;
+        private String user;
+        private long timestamp;
+        private UUID batch;
+        private String keyspace;
+        private String scope;
+        private String operation;
+        private QueryOptions options;
+
+        public Builder(ClientState clientState)
+        {
+            if (clientState != null)
+            {
+                if (clientState.getRemoteAddress() != null)
+                {
+                    InetSocketAddress addr = clientState.getRemoteAddress();
+                    source = InetAddressAndPort.getByAddressOverrideDefaults(addr.getAddress(), addr.getPort());
+                }
+
+                if (clientState.getUser() != null)
+                {
+                    user = clientState.getUser().getName();
+                }
+                keyspace = clientState.getRawKeyspace();
+            }
+            else
+            {
+                source = DEFAULT_SOURCE;
+                user = AuthenticatedUser.SYSTEM_USER.getName();
+            }
+
+            timestamp = System.currentTimeMillis();
+        }
+
+        public Builder(AuditLogEntry entry)
+        {
+            type = entry.type;
+            source = entry.source;
+            user = entry.user;
+            timestamp = entry.timestamp;
+            batch = entry.batch;
+            keyspace = entry.keyspace;
+            scope = entry.scope;
+            operation = entry.operation;
+            options = entry.options;
+        }
+
+        public Builder setType(AuditLogEntryType type)
+        {
+            this.type = type;
+            return this;
+        }
+
+        public Builder(AuditLogEntryType type)
+        {
+            this.type = type;
+            operation = DEFAULT_OPERATION;
+        }
+
+        public Builder setUser(String user)
+        {
+            this.user = user;
+            return this;
+        }
+
+        public Builder setBatch(UUID batch)
+        {
+            this.batch = batch;
+            return this;
+        }
+
+        public Builder setTimestamp(long timestampMillis)
+        {
+            this.timestamp = timestampMillis;
+            return this;
+        }
+
+        public Builder setKeyspace(QueryState queryState, @Nullable CQLStatement statement)
+        {
+            keyspace = statement != null && statement.getAuditLogContext().keyspace != null
+                       ? statement.getAuditLogContext().keyspace
+                       : queryState.getClientState().getRawKeyspace();
+            return this;
+        }
+
+        public Builder setKeyspace(String keyspace)
+        {
+            this.keyspace = keyspace;
+            return this;
+        }
+
+        public Builder setKeyspace(CQLStatement statement)
+        {
+            this.keyspace = statement.getAuditLogContext().keyspace;
+            return this;
+        }
+
+        public Builder setScope(CQLStatement statement)
+        {
+            this.scope = statement.getAuditLogContext().scope;
+            return this;
+        }
+
+        public Builder setOperation(String operation)
+        {
+            this.operation = operation;
+            return this;
+        }
+
+        public void appendToOperation(String str)
+        {
+            if (StringUtils.isNotBlank(str))
+            {
+                if (operation.isEmpty())
+                    operation = str;
+                else
+                    operation = operation.concat("; ").concat(str);
+            }
+        }
+
+        public Builder setOptions(QueryOptions options)
+        {
+            this.options = options;
+            return this;
+        }
+
+        public AuditLogEntry build()
+        {
+            timestamp = timestamp > 0 ? timestamp : System.currentTimeMillis();
+            return new AuditLogEntry(type, source, user, timestamp, batch, keyspace, scope, operation, options);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java b/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
new file mode 100644
index 0000000..616658c
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+/**
+ * Enum to categorize AuditLogEntries
+ */
+public enum AuditLogEntryCategory
+{
+    QUERY, DML, DDL, DCL, OTHER, AUTH, ERROR, PREPARE
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryType.java b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
new file mode 100644
index 0000000..4eb112b
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+public enum AuditLogEntryType
+{
+    /*
+     * CQL Audit Log Entry Types
+     */
+
+    SELECT(AuditLogEntryCategory.QUERY),
+    UPDATE(AuditLogEntryCategory.DML),
+    DELETE(AuditLogEntryCategory.DML),
+    TRUNCATE(AuditLogEntryCategory.DDL),
+    CREATE_KEYSPACE(AuditLogEntryCategory.DDL),
+    ALTER_KEYSPACE(AuditLogEntryCategory.DDL),
+    DROP_KEYSPACE(AuditLogEntryCategory.DDL),
+    CREATE_TABLE(AuditLogEntryCategory.DDL),
+    DROP_TABLE(AuditLogEntryCategory.DDL),
+    PREPARE_STATEMENT(AuditLogEntryCategory.PREPARE),
+    DROP_TRIGGER(AuditLogEntryCategory.DDL),
+    LIST_USERS(AuditLogEntryCategory.DCL),
+    CREATE_INDEX(AuditLogEntryCategory.DDL),
+    DROP_INDEX(AuditLogEntryCategory.DDL),
+    GRANT(AuditLogEntryCategory.DCL),
+    REVOKE(AuditLogEntryCategory.DCL),
+    CREATE_TYPE(AuditLogEntryCategory.DDL),
+    DROP_AGGREGATE(AuditLogEntryCategory.DDL),
+    ALTER_VIEW(AuditLogEntryCategory.DDL),
+    CREATE_VIEW(AuditLogEntryCategory.DDL),
+    DROP_ROLE(AuditLogEntryCategory.DCL),
+    CREATE_FUNCTION(AuditLogEntryCategory.DDL),
+    ALTER_TABLE(AuditLogEntryCategory.DDL),
+    BATCH(AuditLogEntryCategory.DML),
+    CREATE_AGGREGATE(AuditLogEntryCategory.DDL),
+    DROP_VIEW(AuditLogEntryCategory.DDL),
+    DROP_TYPE(AuditLogEntryCategory.DDL),
+    DROP_FUNCTION(AuditLogEntryCategory.DDL),
+    ALTER_ROLE(AuditLogEntryCategory.DCL),
+    CREATE_TRIGGER(AuditLogEntryCategory.DDL),
+    LIST_ROLES(AuditLogEntryCategory.DCL),
+    LIST_PERMISSIONS(AuditLogEntryCategory.DCL),
+    ALTER_TYPE(AuditLogEntryCategory.DDL),
+    CREATE_ROLE(AuditLogEntryCategory.DCL),
+    USE_KEYSPACE(AuditLogEntryCategory.OTHER),
+
+    /*
+     * Common Audit Log Entry Types
+     */
+
+    REQUEST_FAILURE(AuditLogEntryCategory.ERROR),
+    LOGIN_ERROR(AuditLogEntryCategory.AUTH),
+    UNAUTHORIZED_ATTEMPT(AuditLogEntryCategory.AUTH),
+    LOGIN_SUCCESS(AuditLogEntryCategory.AUTH);
+
+    private final AuditLogEntryCategory category;
+
+    AuditLogEntryType(AuditLogEntryCategory category)
+    {
+        this.category = category;
+    }
+
+    public AuditLogEntryCategory getCategory()
+    {
+        return category;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogFilter.java b/src/java/org/apache/cassandra/audit/AuditLogFilter.java
new file mode 100644
index 0000000..163114d
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogFilter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AuditLogFilter
+{
+    private static final Logger logger = LoggerFactory.getLogger(AuditLogFilter.class);
+
+    private static ImmutableSet<String> EMPTY_FILTERS = ImmutableSet.of();
+
+    private final ImmutableSet<String> excludedKeyspaces;
+    private final ImmutableSet<String> includedKeyspaces;
+    private final ImmutableSet<String> excludedCategories;
+    private final ImmutableSet<String> includedCategories;
+    private final ImmutableSet<String> includedUsers;
+    private final ImmutableSet<String> excludedUsers;
+
+    private AuditLogFilter(ImmutableSet<String> excludedKeyspaces, ImmutableSet<String> includedKeyspaces, ImmutableSet<String> excludedCategories, ImmutableSet<String> includedCategories, ImmutableSet<String> excludedUsers, ImmutableSet<String> includedUsers)
+    {
+        this.excludedKeyspaces = excludedKeyspaces;
+        this.includedKeyspaces = includedKeyspaces;
+        this.excludedCategories = excludedCategories;
+        this.includedCategories = includedCategories;
+        this.includedUsers = includedUsers;
+        this.excludedUsers = excludedUsers;
+    }
+
+    /**
+     * (Re-)Loads filters from config. Called during startup as well as JMX invocations.
+     */
+    public static AuditLogFilter create(AuditLogOptions auditLogOptions)
+    {
+        logger.trace("Loading AuditLog filters");
+
+        IncludeExcludeHolder keyspaces = loadInputSets(auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces);
+        IncludeExcludeHolder categories = loadInputSets(auditLogOptions.included_categories, auditLogOptions.excluded_categories);
+        IncludeExcludeHolder users = loadInputSets(auditLogOptions.included_users, auditLogOptions.excluded_users);
+
+        return new AuditLogFilter(keyspaces.excludedSet, keyspaces.includedSet,
+                                  categories.excludedSet, categories.includedSet,
+                                  users.excludedSet, users.includedSet);
+    }
+
+    /**
+     * Constructs mutually exclusive sets of included and excluded data. When there is a conflict,
+     * the entry is put into the excluded set (and removed from the included).
+     */
+    private static IncludeExcludeHolder loadInputSets(String includedInput, String excludedInput)
+    {
+        final ImmutableSet<String> excludedSet;
+        if (StringUtils.isEmpty(excludedInput))
+        {
+            excludedSet = EMPTY_FILTERS;
+        }
+        else
+        {
+            String[] excludes = excludedInput.split(",");
+            ImmutableSet.Builder<String> builder = ImmutableSet.builderWithExpectedSize(excludes.length);
+            for (String exclude : excludes)
+            {
+                if (!exclude.isEmpty())
+                {
+                    builder.add(exclude);
+                }
+            }
+            excludedSet = builder.build();
+        }
+
+        final ImmutableSet<String> includedSet;
+        if (StringUtils.isEmpty(includedInput))
+        {
+            includedSet = EMPTY_FILTERS;
+        }
+        else
+        {
+            String[] includes = includedInput.split(",");
+            ImmutableSet.Builder<String> builder = ImmutableSet.builderWithExpectedSize(includes.length);
+            for (String include : includes)
+            {
+                //Ensure both included and excluded sets are mutually exclusive
+                if (!include.isEmpty() && !excludedSet.contains(include))
+                {
+                    builder.add(include);
+                }
+            }
+            includedSet = builder.build();
+        }
+
+        return new IncludeExcludeHolder(includedSet, excludedSet);
+    }
+
+    /**
+     * Simple struct to hold inclusion/exclusion sets.
+     */
+    private static class IncludeExcludeHolder
+    {
+        private final ImmutableSet<String> includedSet;
+        private final ImmutableSet<String> excludedSet;
+
+        private IncludeExcludeHolder(ImmutableSet<String> includedSet, ImmutableSet<String> excludedSet)
+        {
+            this.includedSet = includedSet;
+            this.excludedSet = excludedSet;
+        }
+    }
+
+    /**
+     * Checks whether a given AuditLogEntry is filtered or not
+     *
+     * @param auditLogEntry AuditLogEntry to verify
+     * @return true if it is filtered, false otherwise
+     */
+    boolean isFiltered(AuditLogEntry auditLogEntry)
+    {
+        return isFiltered(auditLogEntry.getKeyspace(), includedKeyspaces, excludedKeyspaces)
+               || isFiltered(auditLogEntry.getType().getCategory().toString(), includedCategories, excludedCategories)
+               || isFiltered(auditLogEntry.getUser(), includedUsers, excludedUsers);
+    }
+
+    /**
+     * Checks whether given input is being filtered or not.
+     * If excludeSet does not contain any items, by default nothing is excluded (unless there are
+     * entries in the includeSet).
+     * If includeSet does not contain any items, by default everything is included
+     * If an input is part of both includeSet and excludeSet, excludeSet takes the priority over includeSet
+     *
+     * @param input      Input to be checked for filtering based on includeSet and excludeSet
+     * @param includeSet Include filtering set
+     * @param excludeSet Exclude filtering set
+     * @return true if the input is filtered, false when the input is not filtered
+     */
+    static boolean isFiltered(String input, Set<String> includeSet, Set<String> excludeSet)
+    {
+        if (!excludeSet.isEmpty() && excludeSet.contains(input))
+            return true;
+
+        return !(includeSet.isEmpty() || includeSet.contains(input));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java b/src/java/org/apache/cassandra/audit/AuditLogManager.java
new file mode 100644
index 0000000..090499c
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Central location for managing the logging of client/user-initiated actions (like queries, log in commands, and so on).
+ *
+ * We can run multiple {@link IAuditLogger}s at the same time, including the standard audit logger ({@link #auditLogger})
+ * and the full query logger ({@link #fullQueryLogger}).
+ */
+public class AuditLogManager
+{
+    private static final Logger logger = LoggerFactory.getLogger(AuditLogManager.class);
+    private static final AuditLogManager instance = new AuditLogManager();
+
+    // FQL always writes to a BinLog, but it is a type of IAuditLogger
+    private final FullQueryLogger fullQueryLogger;
+    private final ImmutableSet<AuditLogEntryCategory> fqlIncludeFilter = ImmutableSet.of(AuditLogEntryCategory.OTHER,
+                                                                                         AuditLogEntryCategory.QUERY,
+                                                                                         AuditLogEntryCategory.DCL,
+                                                                                         AuditLogEntryCategory.DML,
+                                                                                         AuditLogEntryCategory.DDL);
+
+    // auditLogger can write anywhere, as it's pluggable (logback, BinLog, DiagnosticEvents, etc ...)
+    private volatile IAuditLogger auditLogger;
+
+    private volatile AuditLogFilter filter;
+    private volatile boolean isAuditLogEnabled;
+
+    /**
+     * Builds the manager from the audit logging options held by {@link DatabaseDescriptor}: when auditing is
+     * enabled the configured audit logger is instantiated, otherwise a {@link NoOpAuditLogger} stands in.
+     * The full query logger instance and the audit filter are created in both cases.
+     */
+    private AuditLogManager()
+    {
+        fullQueryLogger = new FullQueryLogger();
+
+        boolean enabledInConfig = DatabaseDescriptor.getAuditLoggingOptions().enabled;
+        if (!enabledInConfig)
+        {
+            logger.debug("Audit logging is disabled.");
+            isAuditLogEnabled = false;
+            auditLogger = new NoOpAuditLogger();
+        }
+        else
+        {
+            logger.info("Audit logging is enabled.");
+            auditLogger = getAuditLogger(DatabaseDescriptor.getAuditLoggingOptions().logger);
+            isAuditLogEnabled = true;
+        }
+
+        filter = AuditLogFilter.create(DatabaseDescriptor.getAuditLoggingOptions());
+    }
+
+    /**
+     * @return the single shared {@link AuditLogManager}, which is created when this class is initialized
+     */
+    public static AuditLogManager getInstance()
+    {
+        return instance;
+    }
+
+    /**
+     * Instantiates an audit logger by class name, falling back to {@link BinAuditLogger} when no name is given.
+     *
+     * @param loggerClassName name of the logger class, or null for the default
+     * @return the newly created logger
+     * @throws ConfigurationException declared for failures while creating the logger
+     */
+    private IAuditLogger getAuditLogger(String loggerClassName) throws ConfigurationException
+    {
+        String className = loggerClassName == null ? BinAuditLogger.class.getName() : loggerClassName;
+        return FBUtilities.newAuditLogger(className);
+    }
+
+    /**
+     * @return the audit logger currently in use; exposed for tests
+     */
+    @VisibleForTesting
+    public IAuditLogger getLogger()
+    {
+        return auditLogger;
+    }
+
+    /**
+     * @return the current value of the audit logging flag
+     */
+    public boolean isAuditingEnabled()
+    {
+        return isAuditLogEnabled;
+    }
+
+    /**
+     * @return true when audit logging or the full query logger (or both) is enabled
+     */
+    public boolean isLoggingEnabled()
+    {
+        return isAuditingEnabled() || isFQLEnabled();
+    }
+
+    /**
+     * @return whether the full query logger reports itself as enabled
+     */
+    private boolean isFQLEnabled()
+    {
+        return fullQueryLogger.enabled();
+    }
+
+    /**
+     * Thin wrapper around {@link SchemaConstants#isLocalSystemKeyspace(String)}.
+     *
+     * @param keyspaceName keyspace name to test
+     * @return the result of the delegated check
+     */
+    private boolean isSystemKeyspace(String keyspaceName)
+    {
+        return SchemaConstants.isLocalSystemKeyspace(keyspaceName);
+    }
+
+    /**
+     * Hands an entry to the standard audit logger, unless it targets a local system keyspace or is rejected
+     * by the current filter. Entries without a keyspace skip the system keyspace check.
+     *
+     * @param logEntry AuditLogEntry to be logged
+     */
+    private void logAuditLoggerEntry(AuditLogEntry logEntry)
+    {
+        String keyspace = logEntry.getKeyspace();
+        if (keyspace != null && isSystemKeyspace(keyspace))
+            return;
+
+        if (filter.isFiltered(logEntry))
+            return;
+
+        auditLogger.log(logEntry);
+    }
+
+    /**
+     * Logs an AuditLogEntry to the standard audit logger and, when its category is one the full query logger
+     * accepts, to FQL as well. A null entry is ignored.
+     *
+     * @param logEntry AuditLogEntry to be logged
+     */
+    public void log(AuditLogEntry logEntry)
+    {
+        if (logEntry == null)
+            return;
+
+        if (isAuditingEnabled())
+            logAuditLoggerEntry(logEntry);
+
+        if (!isFQLEnabled())
+            return;
+
+        AuditLogEntryCategory category = logEntry.getType().getCategory();
+        if (fqlIncludeFilter.contains(category))
+            fullQueryLogger.log(logEntry);
+    }
+
+    /**
+     * Logs a failed operation. The entry is copied, its type is replaced by one derived from the exception
+     * ({@link UnauthorizedException}, {@link AuthenticationException} or anything else), the exception message
+     * is appended to the operation and the result goes through {@link #log(AuditLogEntry)}.
+     * Nothing happens when the entry is null or audit logging is disabled.
+     *
+     * @param logEntry entry describing the attempted operation
+     * @param e        exception raised by the operation
+     */
+    public void log(AuditLogEntry logEntry, Exception e)
+    {
+        if (logEntry == null || !isAuditingEnabled())
+            return;
+
+        AuditLogEntry.Builder builder = new AuditLogEntry.Builder(logEntry);
+
+        AuditLogEntryType failureType;
+        if (e instanceof UnauthorizedException)
+            failureType = AuditLogEntryType.UNAUTHORIZED_ATTEMPT;
+        else if (e instanceof AuthenticationException)
+            failureType = AuditLogEntryType.LOGIN_ERROR;
+        else
+            failureType = AuditLogEntryType.REQUEST_FAILURE;
+
+        builder.setType(failureType);
+        builder.appendToOperation(e.getMessage());
+
+        log(builder.build());
+    }
+
+    /**
+     * Logs a batch to the standard audit logger (one summary entry plus one entry per statement) and to FQL
+     * (a single batch record holding the raw CQL of every prepared statement).
+     */
+    public void logBatch(String batchTypeName, List<Object> queryOrIdList, List<List<ByteBuffer>> values, List<ParsedStatement.Prepared> prepared, QueryOptions options, QueryState state, long queryStartTimeMillis)
+    {
+        if (isAuditingEnabled())
+        {
+            for (AuditLogEntry batchEntry : buildEntriesForBatch(queryOrIdList, prepared, state, options, queryStartTimeMillis))
+                logAuditLoggerEntry(batchEntry);
+        }
+
+        if (!isFQLEnabled())
+            return;
+
+        List<String> rawStatements = new ArrayList<>(queryOrIdList.size());
+        for (ParsedStatement.Prepared statement : prepared)
+            rawStatements.add(statement.rawCQLStatement);
+
+        fullQueryLogger.logBatch(batchTypeName, rawStatements, values, options, queryStartTimeMillis);
+    }
+
+    /**
+     * Creates the audit entries for a batch: first a summary entry of type BATCH, then one entry per statement.
+     * All entries share the same freshly generated batch id and the query start time.
+     *
+     * @return the summary entry followed by the per-statement entries, in statement order
+     */
+    private static List<AuditLogEntry> buildEntriesForBatch(List<Object> queryOrIdList, List<ParsedStatement.Prepared> prepared, QueryState state, QueryOptions options, long queryStartTimeMillis)
+    {
+        int statementCount = queryOrIdList.size();
+        UUID batchId = UUID.randomUUID();
+        List<AuditLogEntry> entries = new ArrayList<>(statementCount + 1);
+
+        // summary entry describing the batch as a whole
+        String summary = String.format("BatchId:[%s] - BATCH of [%d] statements", batchId, statementCount);
+        entries.add(new AuditLogEntry.Builder(state.getClientState())
+                    .setOperation(summary)
+                    .setOptions(options)
+                    .setTimestamp(queryStartTimeMillis)
+                    .setBatch(batchId)
+                    .setType(AuditLogEntryType.BATCH)
+                    .build());
+
+        // one entry per statement of the batch
+        for (int index = 0; index < statementCount; index++)
+        {
+            ParsedStatement.Prepared current = prepared.get(index);
+            CQLStatement statement = current.statement;
+            entries.add(new AuditLogEntry.Builder(state.getClientState())
+                        .setType(statement.getAuditLogContext().auditLogEntryType)
+                        .setOperation(current.rawCQLStatement)
+                        .setTimestamp(queryStartTimeMillis)
+                        .setScope(statement)
+                        .setKeyspace(state, statement)
+                        .setOptions(options)
+                        .setBatch(batchId)
+                        .build());
+        }
+
+        return entries;
+    }
+
+    /**
+     * Disables AuditLog, designed to be invoked only via JMX/ Nodetool, not from anywhere else in the codepath.
+     * Does nothing when audit logging is already disabled.
+     */
+    public synchronized void disableAuditLog()
+    {
+        if (!isAuditLogEnabled)
+            return;
+
+        // Flip the flag before stopping the logger so that any incoming log() requests will be dropped.
+        isAuditLogEnabled = false;
+
+        // swap in the no-op logger first, then stop the one that was running
+        IAuditLogger previous = auditLogger;
+        auditLogger = new NoOpAuditLogger();
+        previous.stop();
+    }
+
+    /**
+     * Enables AuditLog, designed to be invoked only via JMX/ Nodetool, not from anywhere else in the codepath.
+     * The filters are always reloaded; the logger instance is only replaced when the requested logger differs
+     * from the one currently in use.
+     *
+     * @param auditLogOptions AuditLogOptions to be used for enabling AuditLog
+     * @throws ConfigurationException It can throw configuration exception when provided logger class does not exist in the classpath
+     * @throws IllegalArgumentException when the audit log directory is the directory used by the running full query logger
+     */
+    public synchronized void enableAuditLog(AuditLogOptions auditLogOptions) throws ConfigurationException
+    {
+        // Compare as Paths rather than Strings: audit_logs_dir commonly ends with a separator (its default ends
+        // in "/audit/") while Path.toString() does not keep one, so a textual comparison would miss the clash.
+        if (isFQLEnabled()
+            && auditLogOptions.audit_logs_dir != null
+            && java.nio.file.Paths.get(auditLogOptions.audit_logs_dir).equals(fullQueryLogger.path()))
+            throw new IllegalArgumentException(String.format("audit log path (%s) cannot be the same as the " +
+                                                             "running full query logger (%s)",
+                                                             auditLogOptions.audit_logs_dir,
+                                                             fullQueryLogger.path()));
+
+        // always reload the filters
+        filter = AuditLogFilter.create(auditLogOptions);
+
+        // next, check to see if we're changing the logging implementation; if not, keep the same instance and bail.
+        // note: auditLogger should never be null
+        IAuditLogger oldLogger = auditLogger;
+        if (oldLogger.getClass().getSimpleName().equals(auditLogOptions.logger))
+            return;
+
+        auditLogger = getAuditLogger(auditLogOptions.logger);
+        isAuditLogEnabled = true;
+
+        // ensure oldLogger's stop() is called after we swap it with new logger,
+        // otherwise, we might be calling log() on the stopped logger.
+        oldLogger.stop();
+    }
+
+    /**
+     * Configures the full query logger, refusing a directory that is already used by the audit logger.
+     *
+     * @throws IllegalArgumentException when path equals the path reported by the current audit logger
+     */
+    public void configureFQL(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+    {
+        Path auditPath = auditLogger.path();
+        if (path.equals(auditPath))
+        {
+            String message = String.format("fullquerylogger path (%s) cannot be the same as the " +
+                                           "running audit logger (%s)",
+                                           path,
+                                           auditPath);
+            throw new IllegalArgumentException(message);
+        }
+
+        fullQueryLogger.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+    }
+
+    /**
+     * Forwards a reset request to the full query logger.
+     *
+     * @param fullQueryLogPath path handed through unchanged to {@code FullQueryLogger.reset}
+     */
+    public void resetFQL(String fullQueryLogPath)
+    {
+        fullQueryLogger.reset(fullQueryLogPath);
+    }
+
+    /**
+     * Stops the full query logger.
+     */
+    public void disableFQL()
+    {
+        fullQueryLogger.stop();
+    }
+
+    /**
+     * ONLY FOR TESTING
+     *
+     * @return the full query logger instance owned by this manager
+     */
+    FullQueryLogger getFullQueryLogger()
+    {
+        return fullQueryLogger;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/AuditLogOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogOptions.java b/src/java/org/apache/cassandra/audit/AuditLogOptions.java
new file mode 100644
index 0000000..1888c45
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditLogOptions.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Plain holder for the audit logging settings. All fields are public and carry their default values here.
+ */
+public class AuditLogOptions
+{
+    /** Whether audit logging is turned on; off by default. */
+    public volatile boolean enabled = false;
+
+    /** Name of the audit logger implementation; defaults to the simple class name of {@link BinAuditLogger}. */
+    public String logger = BinAuditLogger.class.getSimpleName();
+
+    // Include/exclude settings for keyspaces, categories and users; all empty by default.
+    // NOTE(review): presumably parsed into the filter sets by AuditLogFilter.create - confirm the expected format there.
+    public String included_keyspaces = StringUtils.EMPTY;
+    public String excluded_keyspaces = StringUtils.EMPTY;
+    public String included_categories = StringUtils.EMPTY;
+    public String excluded_categories = StringUtils.EMPTY;
+    public String included_users = StringUtils.EMPTY;
+    public String excluded_users = StringUtils.EMPTY;
+
+    /**
+     * The audit logs directory can be configured using the `cassandra.logdir.audit` system property; otherwise it
+     * defaults to `cassandra.logdir` + /audit/ (with `.` standing in when `cassandra.logdir` is not set either).
+     */
+    public String audit_logs_dir = System.getProperty("cassandra.logdir.audit",
+                                                      System.getProperty("cassandra.logdir",".")+"/audit/");
+    /**
+     * Indicates whether the AuditLog should block when it falls behind or should drop audit log records instead.
+     * Default is set to true so that AuditLog records won't be lost
+     */
+    public boolean block = true;
+
+    /**
+     * Maximum weight of in memory queue for records waiting to be written to the audit log file
+     * before blocking or dropping the log records. For advanced configurations.
+     * Defaults to 256 * 1024 * 1024.
+     */
+    public int max_queue_weight = 256 * 1024 * 1024;
+
+    /**
+     * Maximum size of the rolled files to retain on disk before deleting the oldest file. For advanced configurations.
+     * Defaults to 16 * 1024^3.
+     */
+    public long max_log_size = 16L * 1024L * 1024L * 1024L;
+
+    /**
+     * How often to roll Audit log segments so they can potentially be reclaimed. Available options are:
+     * MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY.
+     * For more options, refer: net.openhft.chronicle.queue.RollCycles
+     */
+    public String roll_cycle = "HOURLY";
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/BinAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/BinAuditLogger.java b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
new file mode 100644
index 0000000..89b764c
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.nio.file.Paths;
+
+import com.google.common.primitives.Ints;
+
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLog;
+import org.apache.cassandra.utils.concurrent.WeightedQueue;
+
+/**
+ * Audit logger backed by a {@link BinLog}: every entry is wrapped into a {@link WeighableMarshallableMessage}
+ * holding the entry's log string and handed to the bin log.
+ */
+public class BinAuditLogger extends BinLogAuditLogger implements IAuditLogger
+{
+    /**
+     * Configures the logger straight from the audit logging options of {@link DatabaseDescriptor},
+     * without cleaning the target directory.
+     */
+    public BinAuditLogger()
+    {
+        // IAuditLogger instances are created reflectively by AuditLogManager, which never calls configure(),
+        // so the configuration has to happen here in the constructor.
+        AuditLogOptions options = DatabaseDescriptor.getAuditLoggingOptions();
+        configure(Paths.get(options.audit_logs_dir),
+                  options.roll_cycle,
+                  options.block,
+                  options.max_queue_weight,
+                  options.max_log_size,
+                  false);
+    }
+
+    /**
+     * Records the entry's log string in the bin log. Silently does nothing when there is no bin log
+     * or the entry is null.
+     */
+    @Override
+    public void log(AuditLogEntry auditLogEntry)
+    {
+        // read the volatile field once so the null check and the use see the same instance
+        BinLog current = this.binLog;
+        if (current != null && auditLogEntry != null)
+        {
+            super.logRecord(new WeighableMarshallableMessage(auditLogEntry.getLogString()), current);
+        }
+    }
+
+    /**
+     * Bin log record made of a single text message.
+     */
+    static class WeighableMarshallableMessage extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
+    {
+        private final String message;
+
+        WeighableMarshallableMessage(String message)
+        {
+            this.message = message;
+        }
+
+        /** Writes the fixed record type followed by the message text. */
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("AuditLog");
+            wire.write("message").text(message);
+        }
+
+        /** No resources are held by this record, so there is nothing to release. */
+        @Override
+        public void release()
+        {
+        }
+
+        /** @return the measured size of the message, used as this record's queue weight */
+        @Override
+        public int weight()
+        {
+            return Ints.checkedCast(ObjectSizes.sizeOf(message));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
new file mode 100644
index 0000000..a2426b9
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.binlog.BinLog;
+import org.apache.cassandra.utils.concurrent.WeightedQueue;
+import org.github.jamm.MemoryLayoutSpecification;
+
+abstract class BinLogAuditLogger implements IAuditLogger
+{
+    // Size constants, each measured once at class initialization from an empty instance of the respective type.
+    static final int EMPTY_BYTEBUFFER_SIZE = Ints.checkedCast(ObjectSizes.sizeOnHeapExcludingData(ByteBuffer.allocate(0)));
+    static final int EMPTY_LIST_SIZE = Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList(0)));
+    private static final int EMPTY_BYTEBUF_SIZE;
+    private static final int OBJECT_HEADER_SIZE = MemoryLayoutSpecification.SPEC.getObjectHeaderSize();
+    static
+    {
+        // Measure a zero-capacity buffer from the shared allocator, releasing it again whether or not
+        // the measurement succeeds.
+        int tempSize = 0;
+        ByteBuf buf = CBUtil.allocator.buffer(0, 0);
+        try
+        {
+            tempSize = Ints.checkedCast(ObjectSizes.measure(buf));
+        }
+        finally
+        {
+            buf.release();
+        }
+        EMPTY_BYTEBUF_SIZE = tempSize;
+    }
+
+    protected static final Logger logger = LoggerFactory.getLogger(BinLogAuditLogger.class);
+    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+    private static final NoSpamLogger.NoSpamLogStatement droppedSamplesStatement = noSpamLogger.getStatement("Dropped {} binary log samples", 1, TimeUnit.MINUTES);
+
+    volatile BinLog binLog;
+    protected volatile boolean blocking;
+    protected Path path;
+
+    private final AtomicLong droppedSamplesSinceLastLog = new AtomicLong();
+
+    /**
+     * Configure and start this logger, cleaning the provided directory before starting. Equivalent to calling the
+     * six-argument overload with {@code cleanDirectory} set to true.
+     * @param path Dedicated path where the log can store its files.
+     * @param rollCycle How often to roll log segments so they can potentially be reclaimed
+     * @param blocking Whether the logger should block if it falls behind or should drop log records
+     * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
+     * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
+     */
+    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+    {
+        this.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize, true);
+    }
+
+    /**
+     * Configure and start this logger: validates the arguments, optionally cleans the directory, then creates and
+     * starts the {@link BinLog}.
+     * @param path Dedicated path where the log can store its files.
+     * @param rollCycle How often to roll log segments so they can potentially be reclaimed; matched case-insensitively
+     * @param blocking Whether the logger should block if it falls behind or should drop log records
+     * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
+     * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
+     * @param cleanDirectory Indicates whether to clean the directory before starting
+     * @throws NullPointerException if path or rollCycle is null
+     * @throws IllegalArgumentException if the path is unusable, the roll cycle is unknown or a size is not positive
+     * @throws IllegalStateException if this logger is already configured
+     */
+    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, boolean cleanDirectory)
+    {
+        Preconditions.checkNotNull(path, "path was null");
+        File pathAsFile = path.toFile();
+        Preconditions.checkNotNull(rollCycle, "rollCycle was null");
+        // Locale.ROOT keeps the upper-casing independent of the JVM default locale; with e.g. a Turkish locale
+        // "daily" would otherwise become a name that RollCycles.valueOf() cannot resolve.
+        rollCycle = rollCycle.toUpperCase(java.util.Locale.ROOT);
+
+        //Exists and is a directory or can be created
+        Preconditions.checkArgument((pathAsFile.exists() && pathAsFile.isDirectory()) || (!pathAsFile.exists() && pathAsFile.mkdirs()), "path exists and is not a directory or couldn't be created");
+        Preconditions.checkArgument(pathAsFile.canRead() && pathAsFile.canWrite() && pathAsFile.canExecute(), "path is not readable, writable, and executable");
+        Preconditions.checkNotNull(RollCycles.valueOf(rollCycle), "unrecognized roll cycle");
+        Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0");
+        Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0");
+        logger.info("Attempting to configure full query logger path: {} Roll cycle: {} Blocking: {} Max queue weight: {} Max log size:{}", path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+
+        if (binLog != null)
+        {
+            logger.warn("Full query logger already configured. Ignoring requested configuration.");
+            throw new IllegalStateException("Already configured");
+        }
+
+        if (cleanDirectory)
+        {
+            logger.info("Cleaning directory: {} as requested",path);
+            if (pathAsFile.exists())
+            {
+                Throwable error = cleanDirectory(pathAsFile, null);
+                if (error != null)
+                {
+                    throw new RuntimeException(error);
+                }
+            }
+        }
+
+        this.path = path;
+        this.blocking = blocking;
+        binLog = new BinLog(path, RollCycles.valueOf(rollCycle), maxQueueWeight, maxLogSize);
+        binLog.start();
+    }
+
+    /**
+     * @return the directory stored by the last successful configure call; the field has no initial value,
+     *         so this is null until then. Neither stop() nor reset() clears it.
+     */
+    public Path path()
+    {
+        return path;
+    }
+
+    /**
+     * Stops the log (if running) and cleans its directories.
+     *
+     * The path is needed as a parameter as well because, after a process restart, the config file might be the
+     * only place the log path can be retrieved from, while JMX also allows specifying a path that isn't persisted
+     * anywhere; both locations have to be cleaned.
+     *
+     * @param fullQueryLogPath configured log directory, may be null
+     * @throws RuntimeException when stopping the log or cleaning a directory fails
+     */
+    public synchronized void reset(String fullQueryLogPath)
+    {
+        try
+        {
+            Set<File> pathsToClean = Sets.newHashSet();
+
+            //First decide whether to clean the path configured in the YAML
+            File configured = fullQueryLogPath == null ? null : new File(fullQueryLogPath);
+            if (configured != null && configured.exists())
+                pathsToClean.add(configured);
+
+            //Then decide whether to clean the last used path, possibly configured by JMX
+            File lastUsed = path == null ? null : path.toFile();
+            if (lastUsed != null && lastUsed.exists())
+                pathsToClean.add(lastUsed);
+
+            logger.info("Reset (and deactivation) of full query log requested.");
+            if (binLog == null)
+            {
+                logger.info("Full query log already deactivated. Cleaning {}.", pathsToClean);
+            }
+            else
+            {
+                logger.info("Stopping full query log. Cleaning {}.", pathsToClean);
+                binLog.stop();
+                binLog = null;
+            }
+
+            Throwable failures = null;
+            for (File directory : pathsToClean)
+                failures = cleanDirectory(directory, failures);
+
+            if (failures != null)
+                throw new RuntimeException(failures);
+        }
+        catch (RuntimeException e)
+        {
+            // runtime exceptions travel up untouched
+            throw e;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Stop the log, leaving behind any generated files. Does nothing beyond logging when no bin log is active.
+     *
+     * @throws RuntimeException wrapping any exception raised while stopping
+     */
+    public synchronized void stop()
+    {
+        try
+        {
+            logger.info("Deactivation of full query log requested.");
+            if (binLog != null)
+            {
+                logger.info("Stopping full query log");
+                binLog.stop();
+                // clearing the field is what makes enabled() report false afterwards
+                binLog = null;
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Check whether this logger is enabled, i.e. whether a bin log is currently attached.
+     * @return true if records are recorded and false otherwise.
+     */
+    public boolean enabled()
+    {
+        return binLog != null;
+    }
+
+    /**
+     * Hands a record to the given bin log. In blocking mode the call waits until the record is accepted; otherwise
+     * the record is offered and counted as dropped when the bin log refuses it. A record that did not make it
+     * into the queue, for whatever reason, is released before returning.
+     *
+     * @param record record to enqueue
+     * @param binLog bin log to enqueue into
+     * @throws RuntimeException wrapping the InterruptedException when a blocking put is interrupted; the thread's
+     *                          interrupt status is restored before throwing
+     */
+    void logRecord(BinLog.ReleaseableWriteMarshallable record, BinLog binLog)
+    {
+        boolean putInQueue = false;
+        try
+        {
+            if (blocking)
+            {
+                try
+                {
+                    binLog.put(record);
+                    putInQueue = true;
+                }
+                catch (InterruptedException e)
+                {
+                    // catching InterruptedException clears the flag; set it again so callers further up
+                    // the stack can still observe the interruption
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+            else
+            {
+                if (!binLog.offer(record))
+                {
+                    logDroppedSample();
+                }
+                else
+                {
+                    putInQueue = true;
+                }
+            }
+        }
+        finally
+        {
+            if (!putInQueue)
+            {
+                record.release();
+            }
+        }
+    }
+
+    /**
+     * Counts a dropped record and reports the running count through the rate-limited log statement.
+     *
+     * This is potentially lossy, but it's not super critical as we will always generally know
+     * when this is happening and roughly how bad it is.
+     */
+    private void logDroppedSample()
+    {
+        // increment, read and reset are three separate operations, so concurrent callers can lose counts
+        droppedSamplesSinceLastLog.incrementAndGet();
+        // NOTE(review): warn() presumably returns true only when the message was actually emitted - confirm in NoSpamLogger
+        if (droppedSamplesStatement.warn(new Object[] {droppedSamplesSinceLastLog.get()}))
+        {
+            droppedSamplesSinceLastLog.set(0);
+        }
+    }
+
+    /**
+     * Base class for bin log records that carry query options. The options are encoded into a buffer at
+     * construction time; the buffer is written out by {@link #writeMarshallable(WireOut)} and given back
+     * by {@link #release()}.
+     */
+    protected static abstract class AbstractWeighableMarshallable extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
+    {
+        private final ByteBuf queryOptionsBuffer;
+        private final long timeMillis;
+        private final int protocolVersion;
+
+        /**
+         * Encodes the query options into a buffer sized exactly to their encoded size.
+         *
+         * @param queryOptions options to encode; also the source of the protocol version
+         * @param timeMillis   time value written out as "query-time"
+         */
+        AbstractWeighableMarshallable(QueryOptions queryOptions, long timeMillis)
+        {
+            this.timeMillis = timeMillis;
+            ProtocolVersion version = queryOptions.getProtocolVersion();
+            this.protocolVersion = version.asInt();
+            int optionsSize = QueryOptions.codec.encodedSize(queryOptions, version);
+            queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, optionsSize);
+            /*
+             * Struggled with what tradeoff to make in terms of query options which is potentially large and complicated
+             * There is tension between low garbage production (or allocator overhead), small working set size, and CPU overhead reserializing the
+             * query options into binary format.
+             *
+             * I went with the lowest risk most predictable option which is allocator overhead and CPU overhead
+             * rather then keep the original query message around so I could just serialize that as a memcpy. It's more
+             * instructions when turned on, but it doesn't change memory footprint quite as much and it's more pay for what you use
+             * in terms of query volume. The CPU overhead is spread out across producers so we should at least get
+             * some scaling.
+             *
+             */
+            boolean success = false;
+            try
+            {
+                QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, version);
+                success = true;
+            }
+            finally
+            {
+                // if encoding threw, nobody else will ever see this object, so give the buffer back here
+                if (!success)
+                {
+                    queryOptionsBuffer.release();
+                }
+            }
+        }
+
+        /** Writes the protocol version, the encoded query options and the query time, in that order. */
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("protocol-version").int32(protocolVersion);
+            wire.write("query-options").bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
+            wire.write("query-time").int64(timeMillis);
+        }
+
+        /** Releases the buffer holding the encoded query options. */
+        @Override
+        public void release()
+        {
+            queryOptionsBuffer.release();
+        }
+
+        //8-bytes for protocol version (assume alignment cost), 8-byte timestamp, object header + other contents
+        @Override
+        public int weight()
+        {
+            return 8 + 8 + OBJECT_HEADER_SIZE + EMPTY_BYTEBUF_SIZE + queryOptionsBuffer.capacity();
+        }
+    }
+
+    /**
+     * Deletes everything inside the given directory, keeping the directory itself. Problems are merged into
+     * {@code accumulate} instead of being thrown.
+     *
+     * @param directory  directory whose content is to be removed
+     * @param accumulate previously collected failure, may be null
+     * @return the accumulated failure, or null when there was none
+     */
+    private static Throwable cleanDirectory(File directory, Throwable accumulate)
+    {
+        if (!directory.exists())
+        {
+            return Throwables.merge(accumulate, new RuntimeException(String.format("%s does not exists", directory)));
+        }
+        if (!directory.isDirectory())
+        {
+            return Throwables.merge(accumulate, new RuntimeException(String.format("%s is not a directory", directory)));
+        }
+        // listFiles() returns null (rather than an empty array) when the directory cannot be listed,
+        // e.g. on an I/O error or when it vanished after the checks above
+        File[] children = directory.listFiles();
+        if (children == null)
+        {
+            return Throwables.merge(accumulate, new RuntimeException(String.format("%s could not be listed", directory)));
+        }
+        for (File f : children)
+        {
+            accumulate = deleteRecursively(f, accumulate);
+        }
+        if (accumulate instanceof FSError)
+        {
+            FileUtils.handleFSError((FSError)accumulate);
+        }
+        return accumulate;
+    }
+
+    /**
+     * Deletes a file, or a directory together with everything below it at any depth. Children are removed
+     * before their parent. Symbolic links are deleted as links and never followed.
+     *
+     * @param fileOrDirectory file or directory to delete
+     * @param accumulate      previously collected failure, may be null
+     * @return the accumulated failure as returned by the delete calls
+     */
+    private static Throwable deleteRecursively(File fileOrDirectory, Throwable accumulate)
+    {
+        // do not descend through a symbolic link: only the link itself is removed, not what it points at
+        if (fileOrDirectory.isDirectory() && !java.nio.file.Files.isSymbolicLink(fileOrDirectory.toPath()))
+        {
+            // listFiles() returns null when the directory cannot be listed; in that case any failure
+            // is left to the delete attempt below
+            File[] children = fileOrDirectory.listFiles();
+            if (children != null)
+            {
+                for (File child : children)
+                {
+                    accumulate = deleteRecursively(child, accumulate);
+                }
+            }
+        }
+        return FileUtils.deleteWithConfirm(fileOrDirectory, true , accumulate);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/FileAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/FileAuditLogger.java b/src/java/org/apache/cassandra/audit/FileAuditLogger.java
new file mode 100644
index 0000000..9490bdd
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/FileAuditLogger.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous, file-based audit logger that simply delegates to the standard logging mechanism.
+ */
+public class FileAuditLogger implements IAuditLogger
+{
+    protected static final Logger logger = LoggerFactory.getLogger(FileAuditLogger.class);
+
+    // starts out enabled; flipped to false by stop()
+    private volatile boolean enabled = true;
+
+    public FileAuditLogger()
+    {
+    }
+
+    /**
+     * @return true until {@link #stop()} has been called
+     */
+    @Override
+    public boolean enabled()
+    {
+        return enabled;
+    }
+
+    /**
+     * Writes the entry's log string at INFO level.
+     */
+    @Override
+    public void log(AuditLogEntry auditLogEntry)
+    {
+        // Deliberately skips the volatile read of the enabled field: other components are expected
+        // to check it before calling, so just go ahead and log.
+        String line = auditLogEntry.getLogString();
+        logger.info(line);
+    }
+
+    /**
+     * Marks this logger as disabled.
+     */
+    @Override
+    public void stop()
+    {
+        enabled = false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/FullQueryLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
new file mode 100644
index 0000000..36d0127
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.wire.ValueOut;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+/**
+ * A logger that logs entire query contents after the query finishes (or times out).
+ */
+public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
+{
+    @Override
+    public void log(AuditLogEntry entry)
+    {
+        logQuery(entry.getOperation(), entry.getOptions(), entry.getTimestamp());
+    }
+
+    /**
+     * Log an invocation of a batch of queries
+     * @param type The type of the batch
+     * @param queries CQL text of the queries
+     * @param values Values to bind to as parameters for the queries
+     * @param queryOptions Options associated with the query invocation
+     * @param batchTimeMillis Approximate time in milliseconds since the epoch since the batch was invoked
+     */
+    void logBatch(String type, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
+    {
+        Preconditions.checkNotNull(type, "type was null");
+        Preconditions.checkNotNull(queries, "queries was null");
+        Preconditions.checkNotNull(values, "value was null");
+        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkArgument(batchTimeMillis > 0, "batchTimeMillis must be > 0");
+
+        //Don't construct the wrapper if the log is disabled
+        BinLog binLog = this.binLog;
+        if (binLog == null)
+        {
+            return;
+        }
+
+        WeighableMarshallableBatch wrappedBatch = new WeighableMarshallableBatch(type, queries, values, queryOptions, batchTimeMillis);
+        logRecord(wrappedBatch, binLog);
+    }
+
+    /**
+     * Log a single CQL query
+     * @param query CQL query text
+     * @param queryOptions Options associated with the query invocation
+     * @param queryTimeMillis Approximate time in milliseconds since the epoch since the batch was invoked
+     */
+    void logQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
+    {
+        Preconditions.checkNotNull(query, "query was null");
+        Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkArgument(queryTimeMillis > 0, "queryTimeMillis must be > 0");
+
+        //Don't construct the wrapper if the log is disabled
+        BinLog binLog = this.binLog;
+        if (binLog == null)
+        {
+            return;
+        }
+
+        WeighableMarshallableQuery wrappedQuery = new WeighableMarshallableQuery(query, queryOptions, queryTimeMillis);
+        logRecord(wrappedQuery, binLog);
+    }
+
+    static class WeighableMarshallableBatch extends AbstractWeighableMarshallable
+    {
+        private final int weight;
+        private final String batchType;
+        private final List<String> queries;
+        private final List<List<ByteBuffer>> values;
+
+        public WeighableMarshallableBatch(String batchType, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis)
+        {
+           super(queryOptions, batchTimeMillis);
+           this.queries = queries;
+           this.values = values;
+           this.batchType = batchType;
+           boolean success = false;
+           try
+           {
+               //weight, batch type, queries, values
+               int weightTemp = 8 + EMPTY_LIST_SIZE + EMPTY_LIST_SIZE;
+               for (int ii = 0; ii < queries.size(); ii++)
+               {
+                   weightTemp += ObjectSizes.sizeOf(queries.get(ii));
+               }
+
+               weightTemp += EMPTY_LIST_SIZE * values.size();
+               for (int ii = 0; ii < values.size(); ii++)
+               {
+                   List<ByteBuffer> sublist = values.get(ii);
+                   weightTemp += EMPTY_BYTEBUFFER_SIZE * sublist.size();
+                   for (int zz = 0; zz < sublist.size(); zz++)
+                   {
+                       weightTemp += sublist.get(zz).capacity();
+                   }
+               }
+               weightTemp += super.weight();
+               weightTemp += ObjectSizes.sizeOf(batchType);
+               weight = weightTemp;
+               success = true;
+           }
+           finally
+           {
+               if (!success)
+               {
+                   release();
+               }
+           }
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("batch");
+            super.writeMarshallable(wire);
+            wire.write("batch-type").text(batchType);
+            ValueOut valueOut = wire.write("queries");
+            valueOut.int32(queries.size());
+            for (String query : queries)
+            {
+                valueOut.text(query);
+            }
+            valueOut = wire.write("values");
+            valueOut.int32(values.size());
+            for (List<ByteBuffer> subValues : values)
+            {
+                valueOut.int32(subValues.size());
+                for (ByteBuffer value : subValues)
+                {
+                    valueOut.bytes(BytesStore.wrap(value));
+                }
+            }
+        }
+
+        @Override
+        public int weight()
+        {
+            return weight;
+        }
+    }
+
+    static class WeighableMarshallableQuery extends AbstractWeighableMarshallable
+    {
+        private final String query;
+
+        public WeighableMarshallableQuery(String query, QueryOptions queryOptions, long queryTimeMillis)
+        {
+            super(queryOptions, queryTimeMillis);
+            this.query = query;
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            wire.write("type").text("single");
+            super.writeMarshallable(wire);
+            wire.write("query").text(query);
+        }
+
+        @Override
+        public int weight()
+        {
+            return Ints.checkedCast(ObjectSizes.sizeOf(query)) + super.weight();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/IAuditLogContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/IAuditLogContext.java b/src/java/org/apache/cassandra/audit/IAuditLogContext.java
new file mode 100644
index 0000000..55c3e04
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/IAuditLogContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import org.apache.cassandra.cql3.CQLStatement;
+
+/**
+ * Supplies the context required to audit log a statement.
+ * {@link CQLStatement} implements this interface such that every CQL command provides the context needed for AuditLog.
+ */
+public interface IAuditLogContext
+{
+    AuditLogContext getAuditLogContext();
+
+    /**
+     * Immutable holder for the audit log entry type plus an optional keyspace and scope.
+     * {@code keyspace} and {@code scope} are null when the shorter constructors are used.
+     */
+    class AuditLogContext
+    {
+        public final AuditLogEntryType auditLogEntryType;
+        public final String keyspace;
+        public final String scope;
+
+        /** Context with neither keyspace nor scope. */
+        public AuditLogContext(AuditLogEntryType auditLogEntryType)
+        {
+            this(auditLogEntryType, null);
+        }
+
+        /** Context with a keyspace but no scope. */
+        public AuditLogContext(AuditLogEntryType auditLogEntryType, String keyspace)
+        {
+            this(auditLogEntryType, keyspace, null);
+        }
+
+        /** Context with every field supplied explicitly. */
+        public AuditLogContext(AuditLogEntryType auditLogEntryType, String keyspace, String scope)
+        {
+            this.auditLogEntryType = auditLogEntryType;
+            this.keyspace = keyspace;
+            this.scope = scope;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/IAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/IAuditLogger.java b/src/java/org/apache/cassandra/audit/IAuditLogger.java
new file mode 100644
index 0000000..b72a256
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/IAuditLogger.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.nio.file.Path;
+
+/**
+ * Contract for audit logger implementations: report whether logging is enabled, accept
+ * {@link AuditLogEntry} instances, and release resources when stopped.
+ */
+public interface IAuditLogger
+{
+    /**
+     * @return whether this logger is currently enabled
+     */
+    boolean enabled();
+
+    /**
+     * Logs AuditLogEntry. This method might be called after {@link #stop()},
+     * hence implementations need to handle the race condition.
+     */
+    void log(AuditLogEntry auditLogEntry);
+
+    /**
+     * Stop and cleanup any resources of IAuditLogger implementations. Please note that
+     * {@link #log(AuditLogEntry)} might be called after being stopped.
+     */
+    void stop();
+
+    /**
+     * @return the path to the logging files/directory if the implementation writes out to the local filesystem,
+     * or null if the implementation doesn't log locally.
+     */
+    default Path path()
+    {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java b/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java
new file mode 100644
index 0000000..8d3dd7d
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/NoOpAuditLogger.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+/**
+ * No-Op implementation of {@link IAuditLogger} to be used as a default audit logger when audit logging is disabled.
+ * It always reports itself as disabled, and both {@link #log(AuditLogEntry)} and {@link #stop()} do nothing.
+ */
+public class NoOpAuditLogger implements IAuditLogger
+{
+    /**
+     * @return always false
+     */
+    @Override
+    public boolean enabled()
+    {
+        return false;
+    }
+
+    @Override
+    public void log(AuditLogEntry logMessage)
+    {
+        // intentionally empty: the entry is discarded
+    }
+
+    @Override
+    public void stop()
+    {
+        // intentionally empty: nothing is done on stop
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org