You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/05/11 12:45:59 UTC
[2/4] cassandra git commit: Audit logging for database activity
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 332b024..88c3085 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.transport.messages;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.AuthenticatedUser;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.exceptions.AuthenticationException;
@@ -79,6 +81,14 @@ public class AuthResponse extends Message.Request
AuthenticatedUser user = negotiator.getAuthenticatedUser();
queryState.getClientState().login(user);
AuthMetrics.instance.markSuccess();
+ if (auditLogEnabled)
+ {
+ AuditLogEntry auditEntry = new AuditLogEntry.Builder(queryState.getClientState())
+ .setOperation("LOGIN SUCCESSFUL")
+ .setType(AuditLogEntryType.LOGIN_SUCCESS)
+ .build();
+ auditLogManager.log(auditEntry);
+ }
// authentication is complete, send a ready message to the client
return new AuthSuccess(challenge);
}
@@ -90,6 +100,14 @@ public class AuthResponse extends Message.Request
catch (AuthenticationException e)
{
AuthMetrics.instance.markFailure();
+ if (auditLogEnabled)
+ {
+ AuditLogEntry auditEntry = new AuditLogEntry.Builder(queryState.getClientState())
+ .setOperation("LOGIN FAILURE")
+ .setType(AuditLogEntryType.LOGIN_ERROR)
+ .build();
+ auditLogManager.log(auditEntry, e);
+ }
return ErrorMessage.fromException(e);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index dcaa8da..5ffadac 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -19,24 +19,31 @@ package org.apache.cassandra.transport.messages;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.UUID;
import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
-import org.apache.cassandra.cql3.*;
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.BatchQueryOptions;
+import org.apache.cassandra.cql3.QueryHandler;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.cql3.statements.ParsedStatement;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MD5Digest;
import org.apache.cassandra.utils.UUIDGen;
@@ -176,30 +183,20 @@ public class BatchMessage extends Message.Request
QueryHandler handler = ClientState.getCQLQueryHandler();
List<ParsedStatement.Prepared> prepared = new ArrayList<>(queryOrIdList.size());
- boolean fullQueryLogEnabled = FullQueryLogger.instance.enabled();
- List<String> queryStrings = fullQueryLogEnabled ? new ArrayList<>(queryOrIdList.size()) : Collections.EMPTY_LIST;
for (int i = 0; i < queryOrIdList.size(); i++)
{
Object query = queryOrIdList.get(i);
- String queryString;
ParsedStatement.Prepared p;
if (query instanceof String)
{
p = QueryProcessor.parseStatement((String)query,
state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
- queryString = (String)query;
}
else
{
p = handler.getPrepared((MD5Digest)query);
if (p == null)
throw new PreparedQueryNotFoundException((MD5Digest)query);
- queryString = p.rawCQLStatement;
- }
-
- if (fullQueryLogEnabled)
- {
- queryStrings.add(queryString);
}
List<ByteBuffer> queryValues = values.get(i);
@@ -227,17 +224,16 @@ public class BatchMessage extends Message.Request
// Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
// (and no value would be really correct, so we prefer passing a clearly wrong one).
BatchStatement batch = new BatchStatement(-1, batchType, statements, Attributes.none());
- long fqlTime = 0;
- if (fullQueryLogEnabled)
- {
- fqlTime = System.currentTimeMillis();
- }
+
+ long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0;
Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), queryStartNanoTime);
- if (fullQueryLogEnabled)
+
+ if (isLoggingEnabled)
{
- FullQueryLogger.instance.logBatch(batchType.name(), queryStrings, values, options, fqlTime);
+ auditLogManager.logBatch(batchType.name(), queryOrIdList, values, prepared, options, state, fqlTime);
}
+
if (tracingId != null)
response.setTracingId(tracingId);
@@ -245,6 +241,16 @@ public class BatchMessage extends Message.Request
}
catch (Exception e)
{
+ if (auditLogEnabled)
+ {
+ AuditLogEntry entry = new AuditLogEntry.Builder(state.getClientState())
+ .setOperation(getAuditString())
+ .setOptions(options)
+ .setType(AuditLogEntryType.BATCH)
+ .build();
+ auditLogManager.log(entry, e);
+ }
+
JVMStabilityInspector.inspectThrowable(e);
return ErrorMessage.fromException(e);
}
@@ -267,4 +273,13 @@ public class BatchMessage extends Message.Request
sb.append("] at consistency ").append(options.getConsistency());
return sb.toString();
}
+
+    /**
+     * Produces the operation text used for the audit entry logged when batch
+     * processing throws: the number of queued statements (or prepared ids) and
+     * the consistency level taken from the batch options. The text of the
+     * individual statements is not included.
+     *
+     * @return summary string of the form {@code BATCH of [n] statements at consistency X}
+     */
+    private String getAuditString()
+    {
+        return new StringBuilder().append("BATCH of [")
+                                  .append(queryOrIdList.size())
+                                  .append("] statements at consistency ")
+                                  .append(options.getConsistency())
+                                  .toString();
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index e969134..cd7f300 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -20,23 +20,24 @@ package org.apache.cassandra.transport.messages;
import java.util.UUID;
import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogManager;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.ResultSet;
-import org.apache.cassandra.cql3.statements.BatchStatement;
-import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.cql3.statements.ParsedStatement;
-import org.apache.cassandra.cql3.statements.UpdateStatement;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MD5Digest;
import org.apache.cassandra.utils.UUIDGen;
@@ -163,16 +164,21 @@ public class ExecuteMessage extends Message.Request
// Some custom QueryHandlers are interested by the bound names. We provide them this information
// by wrapping the QueryOptions.
QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames);
- boolean fqlEnabled = FullQueryLogger.instance.enabled();
- long fqlTime = 0;
- if (fqlEnabled)
- {
- fqlTime = System.currentTimeMillis();
- }
+
+ long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0;
Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime);
- if (fqlEnabled)
+
+ if (isLoggingEnabled)
{
- FullQueryLogger.instance.logQuery(prepared.rawCQLStatement, options, fqlTime);
+ AuditLogEntry auditEntry = new AuditLogEntry.Builder(state.getClientState())
+ .setType(statement.getAuditLogContext().auditLogEntryType)
+ .setOperation(prepared.rawCQLStatement)
+ .setTimestamp(fqlTime)
+ .setScope(statement)
+ .setKeyspace(state, statement)
+ .setOptions(options)
+ .build();
+ AuditLogManager.getInstance().log(auditEntry);
}
if (response instanceof ResultMessage.Rows)
@@ -212,6 +218,33 @@ public class ExecuteMessage extends Message.Request
}
catch (Exception e)
{
+ if (auditLogEnabled)
+ {
+ if (e instanceof PreparedQueryNotFoundException)
+ {
+ AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+ .setOperation(toString())
+ .setOptions(options)
+ .build();
+ auditLogManager.log(auditLogEntry, e);
+ }
+ else
+ {
+ ParsedStatement.Prepared prepared = ClientState.getCQLQueryHandler().getPrepared(statementId);
+ if (prepared != null)
+ {
+ AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+ .setOperation(toString())
+ .setType(prepared.statement.getAuditLogContext().auditLogEntryType)
+ .setScope(prepared.statement)
+ .setKeyspace(state, prepared.statement)
+ .setOptions(options)
+ .build();
+ auditLogManager.log(auditLogEntry, e);
+ }
+ }
+ }
+
JVMStabilityInspector.inspectThrowable(e);
return ErrorMessage.fromException(e);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index b6bf055..e5e5248 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -20,12 +20,18 @@ package org.apache.cassandra.transport.messages;
import java.util.UUID;
import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.UUIDGen;
@@ -111,6 +117,17 @@ public class PrepareMessage extends Message.Request
Message.Response response = ClientState.getCQLQueryHandler().prepare(query,
state.getClientState().cloneWithKeyspaceIfSet(keyspace),
getCustomPayload());
+ if (auditLogEnabled)
+ {
+ ParsedStatement.Prepared parsedStmt = QueryProcessor.parseStatement(query, state.getClientState());
+ AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+ .setOperation(query)
+ .setType(AuditLogEntryType.PREPARE_STATEMENT)
+ .setScope(parsedStmt.statement)
+ .setKeyspace(parsedStmt.statement)
+ .build();
+ auditLogManager.log(auditLogEntry);
+ }
if (tracingId != null)
response.setTracingId(tracingId);
@@ -119,6 +136,15 @@ public class PrepareMessage extends Message.Request
}
catch (Exception e)
{
+ if (auditLogEnabled)
+ {
+ AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+ .setOperation(query)
+ .setKeyspace(keyspace)
+ .setType(AuditLogEntryType.PREPARE_STATEMENT)
+ .build();
+ auditLogManager.log(auditLogEntry, e);
+ }
JVMStabilityInspector.inspectThrowable(e);
return ErrorMessage.fromException(e);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 8f64033..9df9205 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -22,8 +22,11 @@ import java.util.UUID;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogManager;
import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.db.fullquerylog.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.service.ClientState;
@@ -114,17 +117,24 @@ public class QueryMessage extends Message.Request
Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
}
- boolean fqlEnabled = FullQueryLogger.instance.enabled();
- long fqlTime = 0;
- if (fqlEnabled)
- {
- fqlTime = System.currentTimeMillis();
- }
+ long fqlTime = isLoggingEnabled ? System.currentTimeMillis() : 0;
Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime);
- if (fqlEnabled)
+
+ if (isLoggingEnabled)
{
- FullQueryLogger.instance.logQuery(query, options, fqlTime);
+ ParsedStatement.Prepared parsedStatement = QueryProcessor.parseStatement(query, state.getClientState());
+ AuditLogEntry auditEntry = new AuditLogEntry.Builder(state.getClientState())
+ .setType(parsedStatement.statement.getAuditLogContext().auditLogEntryType)
+ .setOperation(query)
+ .setTimestamp(fqlTime)
+ .setScope(parsedStatement.statement)
+ .setKeyspace(state, parsedStatement.statement)
+ .setOptions(options)
+ .build();
+ AuditLogManager.getInstance().log(auditEntry);
+
}
+
if (options.skipMetadata() && response instanceof ResultMessage.Rows)
((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
@@ -135,6 +145,14 @@ public class QueryMessage extends Message.Request
}
catch (Exception e)
{
+ if (auditLogEnabled)
+ {
+ AuditLogEntry auditLogEntry = new AuditLogEntry.Builder(state.getClientState())
+ .setOperation(query)
+ .setOptions(options)
+ .build();
+ auditLogManager.log(auditLogEntry, e);
+ }
JVMStabilityInspector.inspectThrowable(e);
if (!((e instanceof RequestValidationException) || (e instanceof RequestExecutionException)))
logger.error("Unexpected error during query", e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 078b414..b621071 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.AllowAllNetworkAuthorizer;
+import org.apache.cassandra.audit.IAuditLogger;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.auth.IAuthorizer;
import org.apache.cassandra.auth.INetworkAuthorizer;
@@ -526,6 +527,29 @@ public class FBUtilities
}
return FBUtilities.construct(className, "network authorizer");
}
+
+    /**
+     * Creates the audit logger implementation identified by {@code className}.
+     * A name containing no {@code '.'} is treated as a simple class name and is
+     * resolved inside the {@code org.apache.cassandra.audit} package; any other
+     * name is used as given.
+     *
+     * @param className simple or fully qualified class name of the audit logger
+     * @return the object returned by {@code FBUtilities.construct} for that class
+     * @throws ConfigurationException as declared; raised by the underlying construction call
+     */
+    public static IAuditLogger newAuditLogger(String className) throws ConfigurationException
+    {
+        String qualifiedName = className.contains(".")
+                               ? className
+                               : "org.apache.cassandra.audit." + className;
+        return FBUtilities.construct(qualifiedName, "Audit logger");
+    }
+
+    /**
+     * Reports whether the audit logger class named by {@code className} can be
+     * looked up. A name containing no {@code '.'} is resolved inside the
+     * {@code org.apache.cassandra.audit} package, the same rule used when
+     * constructing an audit logger.
+     *
+     * @param className simple or fully qualified class name of the audit logger
+     * @return {@code true} if {@code FBUtilities.classForName} succeeds,
+     *         {@code false} if it throws {@link ConfigurationException}
+     */
+    public static boolean isAuditLoggerClassExists(String className)
+    {
+        String qualifiedName = className.contains(".")
+                               ? className
+                               : "org.apache.cassandra.audit." + className;
+        try
+        {
+            FBUtilities.classForName(qualifiedName, "Audit logger");
+            return true;
+        }
+        catch (ConfigurationException ignored)
+        {
+            // A failed lookup is the "does not exist" answer, not an error to report.
+            return false;
+        }
+    }
/**
* @return The Class for the given name.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/src/java/org/apache/cassandra/utils/binlog/BinLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLog.java b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
index 070a151..0c8659e 100644
--- a/src/java/org/apache/cassandra/utils/binlog/BinLog.java
+++ b/src/java/org/apache/cassandra/utils/binlog/BinLog.java
@@ -272,6 +272,6 @@ public class BinLog implements Runnable, StoreFileListener
public abstract static class ReleaseableWriteMarshallable implements WriteMarshallable
{
- protected abstract void release();
+ public abstract void release();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java b/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java
new file mode 100644
index 0000000..8054f90
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/AuditLogFilterTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.cassandra.audit.AuditLogFilter.isFiltered;
+
+/**
+ * Exercises {@link AuditLogFilter#isFiltered} with different combinations of
+ * include and exclude sets, covering empty sets and a {@code null} input value.
+ */
+public class AuditLogFilterTest
+{
+    /** Builds a fresh, mutable set containing exactly the given members. */
+    private static Set<String> setOf(String... members)
+    {
+        Set<String> result = new HashSet<>();
+        for (String member : members)
+            result.add(member);
+        return result;
+    }
+
+    /** Only an include set: listed values pass, everything else is filtered. */
+    @Test
+    public void isFiltered_IncludeSetOnly()
+    {
+        Set<String> included = setOf("a", "b", "c");
+        Set<String> excluded = setOf();
+
+        Assert.assertFalse(isFiltered("a", included, excluded));
+        Assert.assertFalse(isFiltered("b", included, excluded));
+        Assert.assertFalse(isFiltered("c", included, excluded));
+        Assert.assertTrue(isFiltered("d", included, excluded));
+        Assert.assertTrue(isFiltered("e", included, excluded));
+    }
+
+    /** Only an exclude set: listed values are filtered, everything else passes. */
+    @Test
+    public void isFiltered_ExcludeSetOnly()
+    {
+        Set<String> included = setOf();
+        Set<String> excluded = setOf("a", "b", "c");
+
+        Assert.assertTrue(isFiltered("a", included, excluded));
+        Assert.assertTrue(isFiltered("b", included, excluded));
+        Assert.assertTrue(isFiltered("c", included, excluded));
+        Assert.assertFalse(isFiltered("d", included, excluded));
+        Assert.assertFalse(isFiltered("e", included, excluded));
+    }
+
+    /** A value present in both sets is filtered. */
+    @Test
+    public void isFiltered_MutualExclusive()
+    {
+        Set<String> included = setOf("a", "b", "c");
+        Set<String> excluded = setOf("a");
+
+        Assert.assertTrue(isFiltered("a", included, excluded));
+        Assert.assertFalse(isFiltered("b", included, excluded));
+        Assert.assertFalse(isFiltered("c", included, excluded));
+        Assert.assertTrue(isFiltered("e", included, excluded));
+    }
+
+    /** Disjoint include and exclude sets: only included values pass. */
+    @Test
+    public void isFiltered_MutualInclusive()
+    {
+        Set<String> included = setOf("a", "b");
+        Set<String> excluded = setOf("c", "d");
+
+        Assert.assertFalse(isFiltered("a", included, excluded));
+        Assert.assertFalse(isFiltered("b", included, excluded));
+        Assert.assertTrue(isFiltered("c", included, excluded));
+        Assert.assertTrue(isFiltered("d", included, excluded));
+        Assert.assertTrue(isFiltered("e", included, excluded));
+        Assert.assertTrue(isFiltered("f", included, excluded));
+    }
+
+    /** Values mentioned in neither set are filtered when an include set is given. */
+    @Test
+    public void isFiltered_UnSpecifiedInput()
+    {
+        Set<String> included = setOf("a", "b", "c");
+        Set<String> excluded = setOf("a");
+
+        Assert.assertTrue(isFiltered("a", included, excluded));
+        Assert.assertFalse(isFiltered("b", included, excluded));
+        Assert.assertFalse(isFiltered("c", included, excluded));
+        Assert.assertTrue(isFiltered("d", included, excluded));
+        Assert.assertTrue(isFiltered("e", included, excluded));
+    }
+
+    /** Every probed value appears in at least one of the two sets. */
+    @Test
+    public void isFiltered_SpecifiedInput()
+    {
+        Set<String> included = setOf("a", "b", "c");
+        Set<String> excluded = setOf("a");
+
+        Assert.assertTrue(isFiltered("a", included, excluded));
+        Assert.assertFalse(isFiltered("b", included, excluded));
+        Assert.assertFalse(isFiltered("c", included, excluded));
+    }
+
+    /** Empty include set combined with a single excluded value. */
+    @Test
+    public void isFiltered_FilteredInput_EmptyInclude()
+    {
+        Set<String> included = setOf();
+        Set<String> excluded = setOf("a");
+
+        Assert.assertTrue(isFiltered("a", included, excluded));
+        Assert.assertFalse(isFiltered("b", included, excluded));
+    }
+
+    /** Empty exclude set combined with a populated include set. */
+    @Test
+    public void isFiltered_FilteredInput_EmptyExclude()
+    {
+        Set<String> included = setOf("a", "b", "c");
+        Set<String> excluded = setOf();
+
+        Assert.assertFalse(isFiltered("a", included, excluded));
+        Assert.assertFalse(isFiltered("b", included, excluded));
+        Assert.assertFalse(isFiltered("c", included, excluded));
+        Assert.assertTrue(isFiltered("e", included, excluded));
+    }
+
+    /** With both sets empty nothing is filtered. */
+    @Test
+    public void isFiltered_EmptyInputs()
+    {
+        Set<String> included = setOf();
+        Set<String> excluded = setOf();
+
+        Assert.assertFalse(isFiltered("a", included, excluded));
+        Assert.assertFalse(isFiltered("e", included, excluded));
+    }
+
+    /** A null input is filtered only when an include set is present. */
+    @Test
+    public void isFiltered_NullInputs()
+    {
+        // both sets empty
+        Assert.assertFalse(isFiltered(null, setOf(), setOf()));
+
+        // include set populated, exclude set empty
+        Assert.assertTrue(isFiltered(null, setOf("a", "b", "c"), setOf()));
+
+        // include set empty, exclude set populated
+        Assert.assertFalse(isFiltered(null, setOf(), setOf("a", "b")));
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java b/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java
new file mode 100644
index 0000000..40eadf8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/AuditLoggerTest.java
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.SyntaxError;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.service.StorageService;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public class AuditLoggerTest extends CQLTester
+{
+ /**
+  * Installs audit log options selecting the in-memory logger, then calls
+  * requireNetwork() so the tests can go through a driver session.
+  */
+ @BeforeClass
+ public static void setUp()
+ {
+     AuditLogOptions auditOptions = new AuditLogOptions();
+     auditOptions.logger = "InMemoryAuditLogger";
+     auditOptions.enabled = true;
+     DatabaseDescriptor.setAuditLoggingOptions(auditOptions);
+     requireNetwork();
+ }
+
+ /**
+  * Re-enables audit logging with freshly constructed options before every test.
+  */
+ @Before
+ public void beforeTestMethod()
+ {
+     enableAuditLogOptions(new AuditLogOptions());
+ }
+
+ /**
+  * Enables audit logging through StorageService with the in-memory logger
+  * and the keyspace/category/user filters carried by {@code options}.
+  *
+  * @param options source of the include/exclude filter strings
+  */
+ private void enableAuditLogOptions(AuditLogOptions options)
+ {
+     StorageService.instance.enableAuditLog("InMemoryAuditLogger",
+                                            options.included_keyspaces,
+                                            options.excluded_keyspaces,
+                                            options.included_categories,
+                                            options.excluded_categories,
+                                            options.included_users,
+                                            options.excluded_users);
+ }
+
+ /**
+  * Turns audit logging off by delegating to StorageService.
+  */
+ private void disableAuditLogOptions()
+ {
+ StorageService.instance.disableAuditLog();
+ }
+
+ /**
+  * Runs the same prepared SELECT under four keyspace filter configurations
+  * and checks whether audit entries are produced for each.
+  */
+ @Test
+ public void testAuditLogFilters() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+     String selectCql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+
+     // Keyspace excluded: no entries expected.
+     AuditLogOptions filterOptions = new AuditLogOptions();
+     filterOptions.excluded_keyspaces = KEYSPACE;
+     enableAuditLogOptions(filterOptions);
+     ResultSet rows = executeAndAssertNoAuditLog(selectCql, 1);
+     assertEquals(1, rows.all().size());
+
+     // Keyspace included: prepare and select entries expected.
+     filterOptions = new AuditLogOptions();
+     filterOptions.included_keyspaces = KEYSPACE;
+     enableAuditLogOptions(filterOptions);
+     rows = executeAndAssertWithPrepare(selectCql, AuditLogEntryType.SELECT, 1);
+     assertEquals(1, rows.all().size());
+
+     // Keyspace both included and excluded: no entries expected.
+     filterOptions = new AuditLogOptions();
+     filterOptions.included_keyspaces = KEYSPACE;
+     filterOptions.excluded_keyspaces = KEYSPACE;
+     enableAuditLogOptions(filterOptions);
+     rows = executeAndAssertNoAuditLog(selectCql, 1);
+     assertEquals(1, rows.all().size());
+
+     // No filters at all: prepare and select entries expected.
+     enableAuditLogOptions(new AuditLogOptions());
+     rows = executeAndAssertWithPrepare(selectCql, AuditLogEntryType.SELECT, 1);
+     assertEquals(1, rows.all().size());
+ }
+
+ /**
+  * Alternates between enabled-with-filters and disabled audit logging and
+  * checks the observable logger state after each transition.
+  */
+ @Test
+ public void testAuditLogFiltersTransitions() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+     String selectCql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+
+     // Enabled, keyspace excluded: nothing is queued.
+     AuditLogOptions filterOptions = new AuditLogOptions();
+     filterOptions.excluded_keyspaces = KEYSPACE;
+     enableAuditLogOptions(filterOptions);
+     ResultSet rows = executeAndAssertNoAuditLog(selectCql, 1);
+     assertEquals(1, rows.all().size());
+
+     // Disabled: the active logger is the no-op implementation.
+     disableAuditLogOptions();
+     rows = executeAndAssertDisableAuditLog(selectCql, 1);
+     assertEquals(1, rows.all().size());
+
+     // Enabled again, keyspace both included and excluded: nothing is queued.
+     filterOptions = new AuditLogOptions();
+     filterOptions.included_keyspaces = KEYSPACE;
+     filterOptions.excluded_keyspaces = KEYSPACE;
+     enableAuditLogOptions(filterOptions);
+     rows = executeAndAssertNoAuditLog(selectCql, 1);
+     assertEquals(1, rows.all().size());
+
+     // Disabled once more.
+     disableAuditLogOptions();
+     rows = executeAndAssertDisableAuditLog(selectCql, 1);
+     assertEquals(1, rows.all().size());
+ }
+
+ /**
+  * Enables audit logging with the test keyspace excluded, asserts that the
+  * manager reports auditing as enabled, then disables audit logging again.
+  *
+  * NOTE(review): despite the name, no exception path is exercised here and
+  * nothing is asserted after the disable call - confirm the intended coverage.
+  */
+ @Test
+ public void testAuditLogExceptions()
+ {
+ AuditLogOptions options = new AuditLogOptions();
+ options.excluded_keyspaces = KEYSPACE;
+ enableAuditLogOptions(options);
+ Assert.assertTrue(AuditLogManager.getInstance().isAuditingEnabled());
+
+ disableAuditLogOptions();
+ }
+
+ /**
+  * Checks category filtering when a category is listed as both included and
+  * excluded (QUERY), only included (DML, PREPARE), or in neither list (DDL).
+  */
+ @Test
+ public void testAuditLogFilterIncludeExclude() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+     AuditLogOptions options = new AuditLogOptions();
+     options.excluded_categories = "QUERY";
+     options.included_categories = "QUERY,DML,PREPARE";
+     enableAuditLogOptions(options);
+
+     Session session = sessionNet();
+
+     //QUERY - Should be filtered, part of excluded categories,
+     String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = 1";
+     ResultSet rs = session.execute(cql);
+
+     assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+     assertEquals(1, rs.all().size());
+
+     //DML - Should not be filtered, part of included categories
+     cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?, ?, ?)";
+     executeAndAssertWithPrepare(cql, AuditLogEntryType.UPDATE, 1, "insert_audit", "test");
+
+     //DDL - Should be filtered, not part of included categories
+     cql = "ALTER TABLE " + KEYSPACE + '.' + currentTable() + " ADD v3 text";
+     // The result of the ALTER is irrelevant; only the absence of an audit entry is checked.
+     session.execute(cql);
+     assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+ }
+
+ /**
+  * A prepared SELECT returning one row must yield the prepare and select
+  * entries verified by executeAndAssertWithPrepare.
+  */
+ @Test
+ public void testCqlSelectAuditing() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+     String selectCql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+     ResultSet rows = executeAndAssertWithPrepare(selectCql, AuditLogEntryType.SELECT, 1);
+
+     assertEquals(1, rows.all().size());
+ }
+
+ /**
+  * A prepared INSERT is expected to be audited with the UPDATE entry type.
+  */
+ @Test
+ public void testCqlInsertAuditing() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+     String insertCql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?, ?, ?)";
+     executeAndAssertWithPrepare(insertCql, AuditLogEntryType.UPDATE, 1, "insert_audit", "test");
+ }
+
+ /**
+  * Audits an UPDATE both as a plain statement and as a prepared statement.
+  */
+ @Test
+ public void testCqlUpdateAuditing() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+     String qualifiedTable = KEYSPACE + '.' + currentTable();
+
+     // Simple (unprepared) statement.
+     String literalUpdate = "UPDATE " + qualifiedTable + " SET v1 = 'ApacheCassandra' WHERE id = 1";
+     executeAndAssert(literalUpdate, AuditLogEntryType.UPDATE);
+
+     // Prepared statement with bind markers.
+     String preparedUpdate = "UPDATE " + qualifiedTable + " SET v1 = ? WHERE id = ?";
+     executeAndAssertWithPrepare(preparedUpdate, AuditLogEntryType.UPDATE, "AuditingTest", 2);
+ }
+
+ /**
+  * A prepared DELETE is expected to be audited with the DELETE entry type.
+  */
+ @Test
+ public void testCqlDeleteAuditing() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+     String deleteCql = "DELETE FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+     executeAndAssertWithPrepare(deleteCql, AuditLogEntryType.DELETE, 1);
+ }
+
+ /**
+  * A prepared TRUNCATE is expected to be audited with the TRUNCATE entry type.
+  */
+ @Test
+ public void testCqlTruncateAuditing() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+     execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+     String truncateCql = "TRUNCATE TABLE " + KEYSPACE + '.' + currentTable();
+     executeAndAssertWithPrepare(truncateCql, AuditLogEntryType.TRUNCATE);
+ }
+
+ /**
+  * Prepares an INSERT, an UPDATE and a DELETE, executes them in one batch
+  * and verifies the audit entries queued for the prepares and for the batch.
+  */
+ @Test
+ public void testCqlBatchAuditing() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+     Session session = sessionNet();
+     InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+     BatchStatement batch = new BatchStatement();
+
+     String cqlInsert = "INSERT INTO " + KEYSPACE + "." + currentTable() + " (id, v1, v2) VALUES (?, ?, ?)";
+     PreparedStatement insertStmt = session.prepare(cqlInsert);
+     assertLogEntry(cqlInsert, AuditLogEntryType.PREPARE_STATEMENT, auditLogger.inMemQueue.poll(), false);
+     batch.add(insertStmt.bind(1, "Apapche", "Cassandra"));
+     batch.add(insertStmt.bind(2, "Apapche1", "Cassandra1"));
+
+     String cqlUpdate = "UPDATE " + KEYSPACE + "." + currentTable() + " SET v1 = ? WHERE id = ?";
+     PreparedStatement updateStmt = session.prepare(cqlUpdate);
+     assertLogEntry(cqlUpdate, AuditLogEntryType.PREPARE_STATEMENT, auditLogger.inMemQueue.poll(), false);
+     batch.add(updateStmt.bind("Apache Cassandra", 1));
+
+     String cqlDelete = "DELETE FROM " + KEYSPACE + "." + currentTable() + " WHERE id = ?";
+     PreparedStatement deleteStmt = session.prepare(cqlDelete);
+     assertLogEntry(cqlDelete, AuditLogEntryType.PREPARE_STATEMENT, auditLogger.inMemQueue.poll(), false);
+     batch.add(deleteStmt.bind(1));
+
+     ResultSet rs = session.execute(batch);
+
+     // Five entries for four batched statements; the first one is removed
+     // without inspection (NOTE(review): presumably the batch-level entry - confirm).
+     assertEquals(5, auditLogger.inMemQueue.size());
+     auditLogger.inMemQueue.poll();
+
+     assertLogEntry(cqlInsert, AuditLogEntryType.UPDATE, auditLogger.inMemQueue.poll(), false);
+     assertLogEntry(cqlInsert, AuditLogEntryType.UPDATE, auditLogger.inMemQueue.poll(), false);
+     assertLogEntry(cqlUpdate, AuditLogEntryType.UPDATE, auditLogger.inMemQueue.poll(), false);
+     assertLogEntry(cqlDelete, AuditLogEntryType.DELETE, auditLogger.inMemQueue.poll(), false);
+
+     assertEquals(0, rs.all().size());
+ }
+
+ /**
+  * Batches three prepared INSERTs that target two tables in the default test
+  * keyspace and one table in a second keyspace, then verifies the keyspace and
+  * table recorded on each audit entry.
+  */
+ @Test
+ public void testCqlBatch_MultipleTablesAuditing()
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     String table1 = currentTable();
+
+     Session session = sessionNet();
+     InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+     BatchStatement batch = new BatchStatement();
+
+     // First table, default keyspace.
+     String cqlInsert1 = "INSERT INTO " + KEYSPACE + "." + table1 + " (id, v1, v2) VALUES (?, ?, ?)";
+     PreparedStatement prepared = session.prepare(cqlInsert1);
+     assertLogEntry(cqlInsert1, AuditLogEntryType.PREPARE_STATEMENT, auditLogger.inMemQueue.poll(), false);
+     batch.add(prepared.bind(1, "Apapche", "Cassandra"));
+
+     // Second table, default keyspace.
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     String table2 = currentTable();
+
+     String cqlInsert2 = "INSERT INTO " + KEYSPACE + "." + table2 + " (id, v1, v2) VALUES (?, ?, ?)";
+     prepared = session.prepare(cqlInsert2);
+     assertLogEntry(cqlInsert2, AuditLogEntryType.PREPARE_STATEMENT, auditLogger.inMemQueue.poll(), false);
+     batch.add(prepared.bind(1, "Apapche", "Cassandra"));
+
+     // Third table, in a newly created keyspace.
+     createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+     String ks2 = currentKeyspace();
+
+     createTable(ks2, "CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     String table3 = currentTable();
+
+     String cqlInsert3 = "INSERT INTO " + ks2 + "." + table3 + " (id, v1, v2) VALUES (?, ?, ?)";
+     prepared = session.prepare(cqlInsert3);
+     assertLogEntry(cqlInsert3, AuditLogEntryType.PREPARE_STATEMENT, auditLogger.inMemQueue.poll(), false, ks2);
+     batch.add(prepared.bind(1, "Apapche", "Cassandra"));
+
+     ResultSet rs = session.execute(batch);
+
+     // Four entries for three batched statements; the first one is removed
+     // without inspection (NOTE(review): presumably the batch-level entry - confirm).
+     assertEquals(4, auditLogger.inMemQueue.size());
+     auditLogger.inMemQueue.poll();
+
+     assertLogEntry(cqlInsert1, table1, AuditLogEntryType.UPDATE, auditLogger.inMemQueue.poll(), false, KEYSPACE);
+     assertLogEntry(cqlInsert2, table2, AuditLogEntryType.UPDATE, auditLogger.inMemQueue.poll(), false, KEYSPACE);
+     assertLogEntry(cqlInsert3, table3, AuditLogEntryType.UPDATE, auditLogger.inMemQueue.poll(), false, ks2);
+
+     assertEquals(0, rs.all().size());
+ }
+
+ /**
+  * Executes CREATE / CREATE IF NOT EXISTS / ALTER / DROP KEYSPACE statements and
+  * checks the audit entry type and keyspace of each. The table check is skipped
+  * (isTableNull = true) for every statement.
+  *
+  * Statement order matters: createKeyspaceName() is evaluated while the CQL string
+  * is built, before currentKeyspace() is read for the expected value.
+  * NOTE(review): presumably createKeyspaceName() makes the new name the current
+  * keyspace - confirm in CQLTester.
+  */
+ @Test
+ public void testCqlKeyspaceAuditing() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+ String cql = "CREATE KEYSPACE " + createKeyspaceName() + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2} ";
+ executeAndAssert(cql, AuditLogEntryType.CREATE_KEYSPACE, true, currentKeyspace());
+
+ cql = "CREATE KEYSPACE IF NOT EXISTS " + createKeyspaceName() + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2} ";
+ executeAndAssert(cql, AuditLogEntryType.CREATE_KEYSPACE, true, currentKeyspace());
+
+ cql = "ALTER KEYSPACE " + currentKeyspace() + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2} ";
+ executeAndAssert(cql, AuditLogEntryType.ALTER_KEYSPACE, true, currentKeyspace());
+
+ cql = "DROP KEYSPACE " + currentKeyspace();
+ executeAndAssert(cql, AuditLogEntryType.DROP_KEYSPACE, true, currentKeyspace());
+ }
+
+ /**
+  * Executes CREATE / CREATE IF NOT EXISTS / ALTER / DROP TABLE statements through
+  * the driver session and checks the audit entry type of each.
+  *
+  * Statement order matters: the table name comes from createTableName() while the
+  * CQL string is built, and the assertion helper later compares the entry's scope
+  * against currentTable().
+  * NOTE(review): presumably createTableName() makes the new name the current
+  * table - confirm in CQLTester.
+  */
+ @Test
+ public void testCqlTableAuditing() throws Throwable
+ {
+ String cql = "CREATE TABLE " + KEYSPACE + "." + createTableName() + " (id int primary key, v1 text, v2 text)";
+ executeAndAssert(cql, AuditLogEntryType.CREATE_TABLE);
+
+ cql = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + createTableName() + " (id int primary key, v1 text, v2 text)";
+ executeAndAssert(cql, AuditLogEntryType.CREATE_TABLE);
+
+ cql = "ALTER TABLE " + KEYSPACE + "." + currentTable() + " ADD v3 text";
+ executeAndAssert(cql, AuditLogEntryType.ALTER_TABLE);
+
+ cql = "DROP TABLE " + KEYSPACE + "." + currentTable();
+ executeAndAssert(cql, AuditLogEntryType.DROP_TABLE);
+ }
+
+ /**
+  * Executes CREATE / CREATE IF NOT EXISTS / ALTER / DROP MATERIALIZED VIEW
+  * statements over a populated base table and checks the audit entry type of each.
+  *
+  * The base table name is captured in tblName before createTableName() is called
+  * for the view, because later statements use currentTable() for the view name.
+  */
+ @Test
+ public void testCqlMVAuditing() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+ execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+ execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+ String tblName = currentTable();
+ String cql = "CREATE MATERIALIZED VIEW " + KEYSPACE + "." + createTableName() + " AS SELECT id,v1 FROM " + KEYSPACE + "." + tblName + " WHERE id IS NOT NULL AND v1 IS NOT NULL PRIMARY KEY ( id, v1 ) ";
+ executeAndAssert(cql, AuditLogEntryType.CREATE_VIEW);
+
+ cql = "CREATE MATERIALIZED VIEW IF NOT EXISTS " + KEYSPACE + "." + currentTable() + " AS SELECT id,v1 FROM " + KEYSPACE + "." + tblName + " WHERE id IS NOT NULL AND v1 IS NOT NULL PRIMARY KEY ( id, v1 ) ";
+ executeAndAssert(cql, AuditLogEntryType.CREATE_VIEW);
+
+ cql = "ALTER MATERIALIZED VIEW " + KEYSPACE + "." + currentTable() + " WITH caching = { 'keys' : 'NONE' };";
+ executeAndAssert(cql, AuditLogEntryType.ALTER_VIEW);
+
+ cql = "DROP MATERIALIZED VIEW " + KEYSPACE + "." + currentTable();
+ executeAndAssert(cql, AuditLogEntryType.DROP_VIEW);
+ }
+
+ /**
+  * Walks a user-defined type through create, alter (add and rename field) and
+  * drop, checking the audit entry type produced by every statement.
+  */
+ @Test
+ public void testCqlTypeAuditing() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+     String qualifiedType = KEYSPACE + "." + createTableName();
+
+     executeAndAssert("CREATE TYPE " + qualifiedType + " (id int, v1 text, v2 text)",
+                      AuditLogEntryType.CREATE_TYPE);
+
+     executeAndAssert("CREATE TYPE IF NOT EXISTS " + qualifiedType + " (id int, v1 text, v2 text)",
+                      AuditLogEntryType.CREATE_TYPE);
+
+     executeAndAssert("ALTER TYPE " + qualifiedType + " ADD v3 int",
+                      AuditLogEntryType.ALTER_TYPE);
+
+     executeAndAssert("ALTER TYPE " + qualifiedType + " RENAME v3 TO v4",
+                      AuditLogEntryType.ALTER_TYPE);
+
+     executeAndAssert("DROP TYPE " + qualifiedType,
+                      AuditLogEntryType.DROP_TYPE);
+
+     executeAndAssert("DROP TYPE IF EXISTS " + qualifiedType,
+                      AuditLogEntryType.DROP_TYPE);
+ }
+
+ /**
+  * Creates and then drops a secondary index, checking the audit entry type of
+  * both statements.
+  */
+ @Test
+ public void testCqlIndexAuditing() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+     // Capture the base table before a fresh name is generated for the index.
+     String baseTable = currentTable();
+     String indexName = createTableName();
+
+     String createIndexCql = "CREATE INDEX " + indexName + " ON " + KEYSPACE + "." + baseTable + " (v1)";
+     executeAndAssert(createIndexCql, AuditLogEntryType.CREATE_INDEX);
+
+     String dropIndexCql = "DROP INDEX " + KEYSPACE + "." + indexName;
+     executeAndAssert(dropIndexCql, AuditLogEntryType.DROP_INDEX);
+ }
+
+ /**
+  * Creates and then drops a user-defined function, checking the audit entry
+  * type of both statements.
+  */
+ @Test
+ public void testCqlFunctionAuditing() throws Throwable
+ {
+     String qualifiedFunction = KEYSPACE + "." + createTableName();
+
+     String createCql = "CREATE FUNCTION IF NOT EXISTS " + qualifiedFunction + " (column TEXT,num int) RETURNS NULL ON NULL INPUT RETURNS text LANGUAGE javascript AS $$ column.substring(0,num) $$";
+     executeAndAssert(createCql, AuditLogEntryType.CREATE_FUNCTION);
+
+     String dropCql = "DROP FUNCTION " + qualifiedFunction;
+     executeAndAssert(dropCql, AuditLogEntryType.DROP_FUNCTION);
+ }
+
+ /**
+  * Issues DROP TRIGGER IF EXISTS for a trigger name that was never created and
+  * checks that a DROP_TRIGGER audit entry is produced.
+  */
+ @Test
+ public void testCqlTriggerAuditing() throws Throwable
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+
+     // Capture the base table before a fresh name is generated for the trigger.
+     String baseTable = currentTable();
+     String triggerName = createTableName();
+
+     String dropTriggerCql = "DROP TRIGGER IF EXISTS " + triggerName + " ON " + KEYSPACE + "." + baseTable;
+     executeAndAssert(dropTriggerCql, AuditLogEntryType.DROP_TRIGGER);
+ }
+
+ /**
+  * Issues DROP AGGREGATE IF EXISTS for an aggregate name that was never created
+  * and checks that a DROP_AGGREGATE audit entry is produced.
+  */
+ @Test
+ public void testCqlAggregateAuditing() throws Throwable
+ {
+     String dropAggregateCql = "DROP AGGREGATE IF EXISTS " + KEYSPACE + "." + createTableName();
+     executeAndAssert(dropAggregateCql, AuditLogEntryType.DROP_AGGREGATE);
+ }
+
+ /**
+  * Sends an INSERT with an unbalanced quote through the driver and checks that
+  * exactly one REQUEST_FAILURE audit entry containing the statement text is queued.
+  */
+ @Test
+ public void testCqlQuerySyntaxError()
+ {
+     // Create the table before building the statement so that currentTable()
+     // refers to this test's own table, as in testCqlSelectQuerySyntaxError.
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     String cql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + "1 (id, v1, v2) VALUES (1, 'insert_audit, 'test')";
+     try
+     {
+         sessionNet().execute(cql);
+         Assert.fail("should not succeed");
+     }
+     catch (SyntaxError e)
+     {
+         // expected: the statement is deliberately malformed
+     }
+
+     AuditLogEntry logEntry = ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.poll();
+     assertLogEntry(logEntry, cql);
+     assertEquals(0, ((InMemoryAuditLogger) AuditLogManager.getInstance().getLogger()).inMemQueue.size());
+ }
+
+ /**
+  * Sends a SELECT with a malformed LIMIT clause and checks that exactly one
+  * REQUEST_FAILURE audit entry containing the statement text is queued.
+  */
+ @Test
+ public void testCqlSelectQuerySyntaxError()
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     String malformedSelect = "SELECT * FROM " + KEYSPACE + '.' + currentTable() + " LIMIT 2w";
+
+     try
+     {
+         sessionNet().execute(malformedSelect);
+         Assert.fail("should not succeed");
+     }
+     catch (SyntaxError e)
+     {
+         // nop
+     }
+
+     InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+     assertLogEntry(auditLogger.inMemQueue.poll(), malformedSelect);
+     assertEquals(0, auditLogger.inMemQueue.size());
+ }
+
+ /**
+  * Prepares an INSERT, drops the target table, then executes the prepared
+  * statement and checks the failure entries that end up in the audit queue.
+  */
+ @Test
+ public void testCqlPrepareQueryError()
+ {
+     createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+     String insertCql = "INSERT INTO " + KEYSPACE + '.' + currentTable() + " (id, v1, v2) VALUES (?,?,?)";
+     InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+     try
+     {
+         Session session = sessionNet();
+
+         PreparedStatement prepared = session.prepare(insertCql);
+         assertLogEntry(insertCql, AuditLogEntryType.PREPARE_STATEMENT, auditLogger.inMemQueue.poll(), false);
+
+         // The table disappears between prepare and execute, so execution must fail.
+         dropTable("DROP TABLE %s");
+         session.execute(prepared.bind(1, "insert_audit", "test"));
+         Assert.fail("should not succeed");
+     }
+     catch (NoHostAvailableException e)
+     {
+         // nop
+     }
+
+     // Two failure entries are expected: the first is checked without comparing
+     // its operation text, the second must contain the statement.
+     assertLogEntry(auditLogger.inMemQueue.poll(), null);
+     assertLogEntry(auditLogger.inMemQueue.poll(), insertCql);
+     assertEquals(0, auditLogger.inMemQueue.size());
+ }
+
+ /**
+  * Attempts to prepare an INSERT containing a misspelled keyword and checks that
+  * exactly one REQUEST_FAILURE audit entry containing the statement text is queued.
+  */
+ @Test
+ public void testCqlPrepareQuerySyntaxError()
+ {
+     String malformedInsert = "INSERT INTO " + KEYSPACE + '.' + "foo" + "(id, v1, v2) VALES (?,?,?)";
+     try
+     {
+         createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+         Session session = sessionNet();
+         PreparedStatement prepared = session.prepare(malformedInsert);
+         session.execute(prepared.bind(1, "insert_audit", "test"));
+         Assert.fail("should not succeed");
+     }
+     catch (SyntaxError e)
+     {
+         // nop
+     }
+
+     InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+     assertLogEntry(auditLogger.inMemQueue.poll(), malformedInsert);
+     assertEquals(0, auditLogger.inMemQueue.size());
+ }
+
+ /**
+ * Helper methods for Audit Log CQL Testing
+ */
+
+ /**
+  * Convenience overload: executes {@code cql} and asserts a single audit entry of
+  * {@code type}, with the table check enabled and KEYSPACE as the expected keyspace.
+  */
+ private ResultSet executeAndAssert(String cql, AuditLogEntryType type) throws Throwable
+ {
+ return executeAndAssert(cql, type, false, KEYSPACE);
+ }
+
+ /**
+  * Executes {@code cql} as a simple statement over the driver session and asserts
+  * that it produced exactly one audit entry matching the expectations.
+  *
+  * @param cql         statement to execute; also the expected operation text
+  * @param type        expected audit entry type
+  * @param isTableNull when true the entry's scope is not compared
+  * @param keyspace    expected keyspace of the entry
+  * @return the driver result set of the execution
+  */
+ private ResultSet executeAndAssert(String cql, AuditLogEntryType type, boolean isTableNull, String keyspace) throws Throwable
+ {
+     ResultSet result = sessionNet().execute(cql);
+
+     InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+     assertLogEntry(cql, type, auditLogger.inMemQueue.poll(), isTableNull, keyspace);
+     assertEquals(0, auditLogger.inMemQueue.size());
+
+     return result;
+ }
+
+ /**
+  * Convenience overload of the prepare-and-execute helper with the table check
+  * enabled (isTableNull = false).
+  *
+  * @param cql         statement to prepare and execute
+  * @param executeType expected audit entry type of the execution
+  * @param bindValues  values bound to the prepared statement
+  * @return the driver result set of the execution
+  */
+ private ResultSet executeAndAssertWithPrepare(String cql, AuditLogEntryType executeType, Object... bindValues) throws Throwable
+ {
+     return executeAndAssertWithPrepare(cql, executeType, false, bindValues);
+ }
+
+ /**
+  * Prepares and executes {@code cql}, then asserts that exactly two audit entries
+  * were queued: a PREPARE_STATEMENT entry followed by one of {@code executeType}.
+  *
+  * @param cql         statement to prepare and execute; also the expected operation text
+  * @param executeType expected audit entry type of the execution
+  * @param isTableNull when true the entries' scope is not compared
+  * @param bindValues  values bound to the prepared statement
+  * @return the driver result set of the execution
+  */
+ private ResultSet executeAndAssertWithPrepare(String cql, AuditLogEntryType executeType, boolean isTableNull, Object... bindValues) throws Throwable
+ {
+     Session session = sessionNet();
+     PreparedStatement prepared = session.prepare(cql);
+     ResultSet result = session.execute(prepared.bind(bindValues));
+
+     InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+
+     AuditLogEntry prepareEntry = auditLogger.inMemQueue.poll();
+     assertLogEntry(cql, AuditLogEntryType.PREPARE_STATEMENT, prepareEntry, isTableNull);
+
+     AuditLogEntry executeEntry = auditLogger.inMemQueue.poll();
+     assertLogEntry(cql, executeType, executeEntry, isTableNull);
+
+     assertEquals(0, auditLogger.inMemQueue.size());
+     return result;
+ }
+
+ /**
+  * Prepares and executes {@code cql} and asserts that the in-memory audit queue
+  * is empty afterwards.
+  *
+  * @param cql        statement to prepare and execute
+  * @param bindValues values bound to the prepared statement
+  * @return the driver result set of the execution
+  */
+ private ResultSet executeAndAssertNoAuditLog(String cql, Object... bindValues)
+ {
+     Session session = sessionNet();
+     PreparedStatement prepared = session.prepare(cql);
+     ResultSet result = session.execute(prepared.bind(bindValues));
+
+     InMemoryAuditLogger auditLogger = (InMemoryAuditLogger) AuditLogManager.getInstance().getLogger();
+     assertEquals(0, auditLogger.inMemQueue.size());
+     return result;
+ }
+
+ /**
+  * Prepares and executes {@code cql} and asserts that the active audit logger is
+  * a NoOpAuditLogger.
+  *
+  * @param cql        statement to prepare and execute
+  * @param bindValues values bound to the prepared statement
+  * @return the driver result set of the execution
+  */
+ private ResultSet executeAndAssertDisableAuditLog(String cql, Object... bindValues)
+ {
+     Session session = sessionNet();
+     PreparedStatement prepared = session.prepare(cql);
+     ResultSet result = session.execute(prepared.bind(bindValues));
+
+     assertThat(AuditLogManager.getInstance().getLogger(), instanceOf(NoOpAuditLogger.class));
+     return result;
+ }
+
+ /**
+  * Convenience overload that expects the entry's keyspace to be KEYSPACE.
+  */
+ private void assertLogEntry(String cql, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull)
+ {
+ assertLogEntry(cql, type, actual, isTableNull, KEYSPACE);
+ }
+
+ /**
+  * Convenience overload that uses currentTable() as the expected table; the
+  * value is read at the moment this method is called.
+  */
+ private void assertLogEntry(String cql, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull, String keyspace)
+ {
+ assertLogEntry(cql, currentTable(), type, actual, isTableNull, keyspace);
+ }
+
+ /**
+  * Asserts that {@code actual} describes a successfully audited statement.
+  *
+  * @param cql         expected operation text
+  * @param table       expected scope; compared only when {@code isTableNull} is false
+  * @param type        expected entry type
+  * @param actual      entry taken from the audit queue; must not be null
+  * @param isTableNull when true the scope comparison is skipped
+  * @param keyspace    expected keyspace
+  */
+ private void assertLogEntry(String cql, String table, AuditLogEntryType type, AuditLogEntry actual, boolean isTableNull, String keyspace)
+ {
+     // An empty queue yields null from poll(); fail with a readable message instead of an NPE.
+     Assert.assertNotNull("expected an audit log entry for: " + cql, actual);
+     assertEquals(keyspace, actual.getKeyspace());
+     if (!isTableNull)
+     {
+         assertEquals(table, actual.getScope());
+     }
+     assertEquals(type, actual.getType());
+     assertEquals(cql, actual.getOperation());
+     assertNotEquals(0, actual.getTimestamp());
+ }
+
+ /**
+  * Asserts that {@code logEntry} describes a failed request: no keyspace, no
+  * scope, a non-zero timestamp and the REQUEST_FAILURE type.
+  *
+  * @param logEntry entry taken from the audit queue; must not be null
+  * @param cql      text the operation must contain; not checked when null or empty
+  */
+ private void assertLogEntry(AuditLogEntry logEntry, String cql)
+ {
+     // An empty queue yields null from poll(); fail with a readable message instead of an NPE.
+     Assert.assertNotNull("expected a REQUEST_FAILURE audit log entry", logEntry);
+     assertNull(logEntry.getKeyspace());
+     assertNull(logEntry.getScope());
+     assertNotEquals(0, logEntry.getTimestamp());
+     assertEquals(AuditLogEntryType.REQUEST_FAILURE, logEntry.getType());
+     if (null != cql && !cql.isEmpty())
+     {
+         assertThat(logEntry.getOperation(), containsString(cql));
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
new file mode 100644
index 0000000..f9d2930
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/BinAuditLoggerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.audit;
+
+import java.nio.file.Path;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.utils.binlog.BinLogTest;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class BinAuditLoggerTest extends CQLTester
+{
+ private static Path tempDir;
+
+ @BeforeClass
+ public static void setUp() throws Exception
+ {
+ tempDir = BinLogTest.tempDir();
+
+ AuditLogOptions options = new AuditLogOptions();
+ options.enabled = true;
+ options.logger = "BinAuditLogger";
+ options.roll_cycle = "TEST_SECONDLY";
+ options.audit_logs_dir = tempDir.toString();
+ DatabaseDescriptor.setAuditLoggingOptions(options);
+ requireNetwork();
+ }
+
+ @Test
+ public void testSelectRoundTripQuery() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int primary key, v1 text, v2 text)");
+ execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 1, "Apache", "Cassandra");
+ execute("INSERT INTO %s (id, v1, v2) VALUES (?, ?, ?)", 2, "trace", "test");
+
+ String cql = "SELECT id, v1, v2 FROM " + KEYSPACE + '.' + currentTable() + " WHERE id = ?";
+
+ Session session = sessionNet();
+
+ PreparedStatement pstmt = session.prepare(cql);
+ ResultSet rs = session.execute(pstmt.bind(1));
+
+ assertEquals(1, rs.all().size());
+ try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+ {
+ ExcerptTailer tailer = queue.createTailer();
+ assertTrue(tailer.readDocument(wire -> {
+ assertEquals("AuditLog", wire.read("type").text());
+ assertThat(wire.read("message").text(), containsString(AuditLogEntryType.PREPARE_STATEMENT.toString()));
+ }));
+
+ assertTrue(tailer.readDocument(wire -> {
+ assertEquals("AuditLog", wire.read("type").text());
+ assertThat(wire.read("message").text(), containsString(AuditLogEntryType.SELECT.toString()));
+ }));
+ assertFalse(tailer.readDocument(wire -> {
+ }));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f56871b8/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
new file mode 100644
index 0000000..14f7f23
--- /dev/null
+++ b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableQuery;
+import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableBatch;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLogTest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FullQueryLoggerTest extends CQLTester
+{
+ private static Path tempDir;
+
+ /**
+  * Stores the directory returned by BinLogTest.tempDir() in the static field used by the tests of this class.
+  */
+ @BeforeClass
+ public static void beforeClass() throws Exception
+ {
+     FullQueryLoggerTest.tempDir = BinLogTest.tempDir();
+ }
+
+ private FullQueryLogger instance;
+
+ /**
+  * Fetches the FullQueryLogger from the AuditLogManager singleton before each test.
+  */
+ @Before
+ public void setUp()
+ {
+     AuditLogManager manager = AuditLogManager.getInstance();
+     instance = manager.getFullQueryLogger();
+ }
+
+ /**
+  * Resets the logger against the shared temporary directory after each test.
+  */
+ @After
+ public void tearDown()
+ {
+     String directory = tempDir.toString();
+     instance.reset(directory);
+ }
+
+ /**
+  * Configuring with a null path is expected to raise a NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testConfigureNullPath() throws Exception
+ {
+     final Path missingPath = null;
+     instance.configure(missingPath, "", true, 1, 1);
+ }
+
+ /**
+  * Configuring with a null roll cycle is expected to raise a NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testConfigureNullRollCycle() throws Exception
+ {
+     Path directory = BinLogTest.tempDir();
+     final String missingRollCycle = null;
+     instance.configure(directory, missingRollCycle, true, 1, 1);
+ }
+
+ /**
+  * Configuring with the roll cycle name "foobar" is expected to raise an IllegalArgumentException.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testConfigureInvalidRollCycle() throws Exception
+ {
+     Path directory = BinLogTest.tempDir();
+     String bogusRollCycle = "foobar";
+     instance.configure(directory, bogusRollCycle, true, 1, 1);
+ }
+
+ /**
+  * Configuring with 0 as the fourth argument is expected to raise an IllegalArgumentException.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testConfigureInvalidMaxQueueWeight() throws Exception
+ {
+     Path directory = BinLogTest.tempDir();
+     final int zeroQueueWeight = 0;
+     instance.configure(directory, "DAILY", true, zeroQueueWeight, 1);
+ }
+
+ /**
+  * Configuring with 0 as the fifth argument is expected to raise an IllegalArgumentException.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testConfigureInvalidMaxQueueLogSize() throws Exception
+ {
+     Path directory = BinLogTest.tempDir();
+     final int zeroLogSize = 0;
+     instance.configure(directory, "DAILY", true, 1, zeroLogSize);
+ }
+
+ /**
+  * Configuring with a path that points at a regular file is expected to raise an IllegalArgumentException.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testConfigureOverExistingFile() throws Exception
+ {
+     File regularFile = File.createTempFile("foo", "bar");
+     regularFile.deleteOnExit();
+     Path notADirectory = regularFile.toPath();
+     instance.configure(notADirectory, "TEST_SECONDLY", true, 1, 1);
+ }
+
+ /**
+  * Configuring over the shared directory after its read permission was cleared is expected to raise
+  * an IllegalArgumentException. The permission is set back in the finally block.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testCanRead() throws Exception
+ {
+     File directory = tempDir.toFile();
+     directory.setReadable(false);
+     try
+     {
+         configureFQL();
+     }
+     finally
+     {
+         //Undo the permission change whether or not configure threw
+         directory.setReadable(true);
+     }
+ }
+
+ /**
+  * Configuring over the shared directory after its write permission was cleared is expected to raise
+  * an IllegalArgumentException. The permission is set back in the finally block.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testCanWrite() throws Exception
+ {
+     File directory = tempDir.toFile();
+     directory.setWritable(false);
+     try
+     {
+         configureFQL();
+     }
+     finally
+     {
+         //Undo the permission change whether or not configure threw
+         directory.setWritable(true);
+     }
+ }
+
+ /**
+  * Configuring over the shared directory after its execute permission was cleared is expected to raise
+  * an IllegalArgumentException. The permission is set back in the finally block.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testCanExecute() throws Exception
+ {
+     File directory = tempDir.toFile();
+     directory.setExecutable(false);
+     try
+     {
+         configureFQL();
+     }
+     finally
+     {
+         //Undo the permission change whether or not configure threw
+         directory.setExecutable(true);
+     }
+ }
+
+ /**
+  * Calls reset twice in a row without this test configuring the logger; neither call may throw.
+  */
+ @Test
+ public void testResetWithoutConfigure() throws Exception
+ {
+     for (int attempt = 0; attempt < 2; attempt++)
+     {
+         instance.reset(tempDir.toString());
+     }
+ }
+
+ /**
+  * Calls stop twice in a row without this test configuring the logger; neither call may throw.
+  */
+ @Test
+ public void stopWithoutConfigure() throws Exception
+ {
+     for (int attempt = 0; attempt < 2; attempt++)
+     {
+         instance.stop();
+     }
+ }
+
+ /**
+  * After reset is given a directory different from the configured one, files in both
+  * the configured directory and the directory passed to reset must be gone.
+  */
+ @Test
+ public void testResetCleansPaths() throws Exception
+ {
+     configureFQL();
+
+     //A file inside the directory the logger was configured with
+     File inConfiguredDir = File.createTempFile("foo", "bar", tempDir.toFile());
+     assertTrue(inConfiguredDir.exists());
+
+     //A file inside a second directory that is only handed to reset
+     File otherDir = BinLogTest.tempDir().toFile();
+     File inOtherDir = File.createTempFile("foo", "bar", otherDir);
+
+     instance.reset(inOtherDir.getParent());
+
+     assertFalse(inConfiguredDir.exists());
+     assertFalse(inOtherDir.exists());
+ }
+
+ /**
+  * Passing reset the same directory the logger was configured with must work and remove the file in it.
+  */
+ @Test
+ public void testResetSamePath() throws Exception
+ {
+     configureFQL();
+
+     File inConfiguredDir = File.createTempFile("foo", "bar", tempDir.toFile());
+     assertTrue(inConfiguredDir.exists());
+
+     //The parent of the file is the configured directory itself
+     String sameDirectory = inConfiguredDir.getParent();
+     instance.reset(sameDirectory);
+
+     assertFalse(inConfiguredDir.exists());
+ }
+
+ /**
+  * Configuring twice without a reset or stop in between is expected to raise an IllegalStateException.
+  */
+ @Test(expected = IllegalStateException.class)
+ public void testDoubleConfigure() throws Exception
+ {
+     //First configuration
+     configureFQL();
+     //Second configuration on the same instance
+     configureFQL();
+ }
+
+ @Test
+ public void testCleansDirectory() throws Exception
+ {
+ assertTrue(new File(tempDir.toFile(), "foobar").createNewFile());
+ configureFQL();
+ assertEquals(tempDir.toFile().listFiles().length, 1);
+ assertEquals("directory-listing.cq4t", tempDir.toFile().listFiles()[0].getName());
+ }
+
+ @Test
+ public void testEnabledReset() throws Exception
+ {
+ assertFalse(instance.enabled());
+ configureFQL();
+ assertTrue(instance.enabled());
+ instance.reset(tempDir.toString());
+ assertFalse(instance.enabled());
+ }
+
+ @Test
+ public void testEnabledStop() throws Exception
+ {
+ assertFalse(instance.enabled());
+ configureFQL();
+ assertTrue(instance.enabled());
+ instance.stop();
+ assertFalse(instance.enabled());
+ }
+
+ /**
+  * Test that a thread will block if the FQL is over weight, and unblock once the backup is cleared.
+  *
+  * The first record ("foo1") parks the bin log thread inside writeMarshallable until the test releases
+  * a semaphore. "foo2" is then logged from the test thread, and a second thread that logs "foo3" and
+  * "foo4" is expected to still be alive half a second later. After the semaphore is released all four
+  * queries must show up in the queue in order.
+  */
+ @Test
+ public void testBlocking() throws Exception
+ {
+     configureFQL();
+     //Prevent the bin log thread from making progress, causing the task queue to block
+     Semaphore blockBinLog = new Semaphore(0);
+     try
+     {
+         //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
+         Semaphore binLogBlocked = new Semaphore(0);
+         instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
+         {
+             @Override
+             public void writeMarshallable(WireOut wire)
+             {
+                 //Notify that the bin log is blocking now
+                 binLogBlocked.release();
+                 try
+                 {
+                     //Block the bin log thread so the task queue can be filled
+                     blockBinLog.acquire();
+                 }
+                 catch (InterruptedException e)
+                 {
+                     //Restore the interrupt status for the calling thread instead of swallowing it
+                     Thread.currentThread().interrupt();
+                 }
+                 super.writeMarshallable(wire);
+             }
+         });
+
+         //Wait for the bin log thread to block so it can't batch drain tasks
+         Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+         //Now fill the task queue
+         logQuery("foo2");
+
+         //Start a thread to block waiting on the bin log queue
+         Thread t = new Thread(() ->
+         {
+             logQuery("foo3");
+             //Should be able to log another query without an issue
+             logQuery("foo4");
+         });
+         t.start();
+         Thread.sleep(500);
+         //If thread state is terminated then the thread started, finished, and didn't block on the full task queue
+         assertTrue(t.getState() != Thread.State.TERMINATED);
+     }
+     finally
+     {
+         //Unblock the binlog thread
+         blockBinLog.release();
+     }
+     Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo3", "foo4")), 60);
+ }
+
+ /**
+  * Reads the queue in the shared directory from the start and compares the "query" field of each
+  * document with the given queries, in order.
+  *
+  * @param queries the query strings expected in the queue, in order
+  * @return false as soon as a document is missing; true once every expected query was matched
+  * @throws AssertionError if a document carries a different query, or if an extra document follows
+  */
+ private boolean checkForQueries(List<String> queries)
+ {
+     try (ChronicleQueue chronicle = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+     {
+         ExcerptTailer reader = chronicle.createTailer();
+         //Working copy: the head is consumed each time a document matches it
+         LinkedList<String> pending = new LinkedList<>(queries);
+         for (;;)
+         {
+             if (pending.isEmpty())
+                 break;
+
+             boolean documentPresent = reader.readDocument(wire -> {
+                 assertEquals(pending.getFirst(), wire.read("query").text());
+                 pending.removeFirst();
+             });
+             if (!documentPresent)
+                 return false;
+         }
+         //Nothing may follow the expected queries
+         assertFalse(reader.readDocument(wire -> {}));
+         return true;
+     }
+ }
+
+ /**
+  * Configures the logger with the blocking flag set to false and checks that a record offered while
+  * the queue is full is released without being written, and that logging works again once the bin
+  * log thread is unblocked.
+  */
+ @Test
+ public void testNonBlocking() throws Exception
+ {
+     instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256);
+     //Prevent the bin log thread from making progress, causing the task queue to refuse tasks
+     Semaphore blockBinLog = new Semaphore(0);
+     try
+     {
+         //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior
+         Semaphore binLogBlocked = new Semaphore(0);
+         instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
+         {
+             @Override
+             public void writeMarshallable(WireOut wire)
+             {
+                 //Notify that the bin log is blocking now
+                 binLogBlocked.release();
+                 try
+                 {
+                     //Block the bin log thread so the task queue can be filled
+                     blockBinLog.acquire();
+                 }
+                 catch (InterruptedException e)
+                 {
+                     //Restore the interrupt status for the calling thread instead of swallowing it
+                     Thread.currentThread().interrupt();
+                 }
+                 super.writeMarshallable(wire);
+             }
+         });
+
+         //Wait for the bin log thread to block so it can't batch drain tasks
+         Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+         //Now fill the task queue
+         logQuery("foo2");
+
+         //This sample should get dropped AKA released without being written
+         AtomicInteger releasedCount = new AtomicInteger(0);
+         AtomicInteger writtenCount = new AtomicInteger(0);
+         instance.logRecord(new WeighableMarshallableQuery("foo3", QueryOptions.DEFAULT, 1)
+         {
+             @Override
+             public void writeMarshallable(WireOut wire)
+             {
+                 writtenCount.incrementAndGet();
+                 super.writeMarshallable(wire);
+             }
+
+             @Override
+             public void release()
+             {
+                 releasedCount.incrementAndGet();
+                 super.release();
+             }
+         }, instance.binLog);
+
+         Util.spinAssertEquals(1, releasedCount::get, 60);
+         assertEquals(0, writtenCount.get());
+     }
+     finally
+     {
+         //Unblock the binlog thread
+         blockBinLog.release();
+     }
+     //Wait for tasks to drain so there should be space in the queue
+     Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2")), 60);
+     //Should be able to log again
+     logQuery("foo4");
+     Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo4")), 60);
+ }
+
+ @Test
+ public void testRoundTripQuery() throws Exception
+ {
+ configureFQL();
+ logQuery("foo");
+ Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo")), 60);
+ try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+ {
+ ExcerptTailer tailer = queue.createTailer();
+ assertTrue(tailer.readDocument(wire -> {
+ assertEquals("single", wire.read("type").text());
+ ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32());
+ assertEquals(ProtocolVersion.CURRENT, protocolVersion);
+ QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion);
+ compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
+ assertEquals(1L, wire.read("query-time").int64());
+ assertEquals("foo", wire.read("query").text());
+ }));
+ }
+ }
+
+ @Test
+ public void testRoundTripBatch() throws Exception
+ {
+ configureFQL();
+ instance.logBatch("UNLOGGED", Arrays.asList("foo1", "foo2"), Arrays.asList(Arrays.asList(ByteBuffer.allocate(1) , ByteBuffer.allocateDirect(2)), Arrays.asList()), QueryOptions.DEFAULT, 1);
+ Util.spinAssertEquals(true, () ->
+ {
+ try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+ {
+ return queue.createTailer().readingDocument().isPresent();
+ }
+ }, 60);
+ try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+ {
+ ExcerptTailer tailer = queue.createTailer();
+ assertTrue(tailer.readDocument(wire -> {
+ assertEquals("batch", wire.read("type").text());
+ ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32());
+ assertEquals(ProtocolVersion.CURRENT, protocolVersion);
+ QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion);
+ assertEquals(1L, wire.read("query-time").int64());
+ compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
+ assertEquals("UNLOGGED", wire.read("batch-type").text());
+ ValueIn in = wire.read("queries");
+ assertEquals(2, in.int32());
+ assertEquals("foo1", in.text());
+ assertEquals("foo2", in.text());
+ in = wire.read("values");
+ assertEquals(2, in.int32());
+ assertEquals(2, in.int32());
+ assertTrue(Arrays.equals(new byte[1], in.bytes()));
+ assertTrue(Arrays.equals(new byte[2], in.bytes()));
+ assertEquals(0, in.int32());
+ }));
+ }
+ }
+
+ @Test
+ public void testQueryWeight()
+ {
+ //Empty query should have some weight
+ WeighableMarshallableQuery query = new WeighableMarshallableQuery("", QueryOptions.DEFAULT, 1);
+ assertTrue(query.weight() >= 95);
+
+ StringBuilder sb = new StringBuilder();
+ for (int ii = 0; ii < 1024 * 1024; ii++)
+ {
+ sb.append('a');
+ }
+ query = new WeighableMarshallableQuery(sb.toString(), QueryOptions.DEFAULT, 1);
+
+ //A large query should be reflected in the size, * 2 since characters are still two bytes
+ assertTrue(query.weight() > ObjectSizes.measureDeep(sb.toString()));
+
+ //Large query options should be reflected
+ QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
+ query = new WeighableMarshallableQuery("", largeOptions, 1);
+ assertTrue(query.weight() > 1024 * 1024);
+ System.out.printf("weight %d%n", query.weight());
+ }
+
+ /**
+  * Checks that WeighableMarshallableBatch.weight() is at least 183 for an empty batch and grows with
+  * each part of a batch: the type string, the list of queries, the query text, the lists of values
+  * and the query options.
+  */
+ @Test
+ public void testBatchWeight()
+ {
+     //An empty batch should have weight
+     WeighableMarshallableBatch batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+     assertTrue(batch.weight() >= 183);
+
+     StringBuilder sb = new StringBuilder();
+     for (int ii = 0; ii < 1024 * 1024; ii++)
+     {
+         sb.append('a');
+     }
+
+     //The weight of the type string should be reflected
+     batch = new WeighableMarshallableBatch(sb.toString(), new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+     assertTrue(batch.weight() > ObjectSizes.measureDeep(sb.toString()));
+
+     //The weight of the list containing queries should be reflected
+     //Diamond instead of the raw ArrayList type, so the assignment is checked by the compiler
+     List<String> bigList = new ArrayList<>(100000);
+     for (int ii = 0; ii < 100000; ii++)
+     {
+         bigList.add("");
+     }
+     batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+     assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
+
+     //The size of the query should be reflected
+     bigList = new ArrayList<>(1);
+     bigList.add(sb.toString());
+     batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+     assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
+
+     //Drop the reference to the large list before building the large values
+     bigList = null;
+     //The size of the list of values should be reflected
+     List<List<ByteBuffer>> bigValues = new ArrayList<>(100000);
+     for (int ii = 0; ii < 100000; ii++)
+     {
+         bigValues.add(new ArrayList<>(0));
+     }
+     bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5));
+     batch = new WeighableMarshallableBatch("", new ArrayList<>(), bigValues, QueryOptions.DEFAULT, 1);
+     assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues));
+
+     //As should the size of the values
+     QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
+     batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), largeOptions, 1);
+     assertTrue(batch.weight() > 1024 * 1024);
+ }
+
+ /**
+  * Logging a batch with a null type is expected to raise a NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testLogBatchNullType() throws Exception
+ {
+     final String missingType = null;
+     List<String> queries = new ArrayList<>();
+     List<List<ByteBuffer>> values = new ArrayList<>();
+     instance.logBatch(missingType, queries, values, QueryOptions.DEFAULT, 1);
+ }
+
+ /**
+  * Logging a batch with a null list of queries is expected to raise a NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testLogBatchNullQueries() throws Exception
+ {
+     final List<String> missingQueries = null;
+     List<List<ByteBuffer>> values = new ArrayList<>();
+     instance.logBatch("", missingQueries, values, QueryOptions.DEFAULT, 1);
+ }
+
+ /**
+  * With the logger configured, logging a batch whose list of queries contains a null entry is
+  * expected to raise a NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testLogBatchNullQueriesQuery() throws Exception
+ {
+     configureFQL();
+     List<String> queriesWithNull = Arrays.asList((String)null);
+     List<List<ByteBuffer>> values = new ArrayList<>();
+     instance.logBatch("", queriesWithNull, values, QueryOptions.DEFAULT, 1);
+ }
+
+ /**
+  * Logging a batch with a null list of values is expected to raise a NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testLogBatchNullValues() throws Exception
+ {
+     List<String> queries = new ArrayList<>();
+     final List<List<ByteBuffer>> missingValues = null;
+     instance.logBatch("", queries, missingValues, QueryOptions.DEFAULT, 1);
+ }
+
+ /**
+  * Logging a batch whose list of values contains a null entry is expected to raise a
+  * NullPointerException.
+  *
+  * Valid query options are passed so that the null entry is the only null argument; otherwise the
+  * expected exception could not be attributed to it. The logger is configured first, in the same
+  * way testLogBatchNullQueriesQuery does for a null entry in the list of queries.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testLogBatchNullValuesValue() throws Exception
+ {
+     configureFQL();
+     instance.logBatch("", new ArrayList<>(), Arrays.asList((List<ByteBuffer>)null), QueryOptions.DEFAULT, 1);
+ }
+
+ /**
+  * Logging a batch with null query options is expected to raise a NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testLogBatchNullQueryOptions() throws Exception
+ {
+     List<String> queries = new ArrayList<>();
+     List<List<ByteBuffer>> values = new ArrayList<>();
+     final QueryOptions missingOptions = null;
+     instance.logBatch("", queries, values, missingOptions, 1);
+ }
+
+ /**
+  * Logging a batch with -1 as the time argument is expected to raise an IllegalArgumentException.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testLogBatchNegativeTime() throws Exception
+ {
+     List<String> queries = new ArrayList<>();
+     List<List<ByteBuffer>> values = new ArrayList<>();
+     instance.logBatch("", queries, values, QueryOptions.DEFAULT, -1);
+ }
+
+ /**
+  * Logging a null query string is expected to raise a NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testLogQueryNullQuery() throws Exception
+ {
+     final String missingQuery = null;
+     instance.logQuery(missingQuery, QueryOptions.DEFAULT, 1);
+ }
+
+ /**
+  * Logging a query with null query options is expected to raise a NullPointerException.
+  */
+ @Test(expected = NullPointerException.class)
+ public void testLogQueryNullQueryOptions() throws Exception
+ {
+     final QueryOptions missingOptions = null;
+     instance.logQuery("", missingOptions, 1);
+ }
+
+ /**
+  * Logging a query with -1 as the time argument is expected to raise an IllegalArgumentException.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testLogQueryNegativeTime() throws Exception
+ {
+     String emptyQuery = "";
+     instance.logQuery(emptyQuery, QueryOptions.DEFAULT, -1);
+ }
+
+ private static void compareQueryOptions(QueryOptions a, QueryOptions b)
+ {
+ assertEquals(a.getClass(), b.getClass());
+ assertEquals(a.getProtocolVersion(), b.getProtocolVersion());
+ assertEquals(a.getPageSize(), b.getPageSize());
+ assertEquals(a.getConsistency(), b.getConsistency());
+ assertEquals(a.getPagingState(), b.getPagingState());
+ assertEquals(a.getValues(), b.getValues());
+ assertEquals(a.getSerialConsistency(), b.getSerialConsistency());
+ }
+
+ private void configureFQL() throws Exception
+ {
+ instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256);
+ }
+
+ /**
+  * Logs the given query with QueryOptions.DEFAULT and 1 as the time argument.
+  *
+  * @param query the query string to log
+  */
+ private void logQuery(String query)
+ {
+     QueryOptions options = QueryOptions.DEFAULT;
+     instance.logQuery(query, options, 1);
+ }
+
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org