You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/05/15 15:03:10 UTC

[1/3] cassandra git commit: Warn on unlogged batch misuse

Repository: cassandra
Updated Branches:
  refs/heads/trunk 1644e82f7 -> bf208377a


Warn on unlogged batch misuse

Patch by tjake; reviewed by jbellis for CASSANDRA-9282


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ebd05ddb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ebd05ddb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ebd05ddb

Branch: refs/heads/trunk
Commit: ebd05ddbe1fca8c70e9790628c0cce47327e4708
Parents: f5f5912
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon May 4 14:12:53 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri May 15 08:50:21 2015 -0400

----------------------------------------------------------------------
 .../cql3/statements/BatchStatement.java         | 40 +++++++++++++++++---
 1 file changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebd05ddb/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index c93bf64..6d4d3a1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -19,10 +19,13 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Function;
 import com.google.common.collect.*;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,10 +38,10 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.NoSpamLogger;
 
 /**
  * A <code>BATCH</code> statement parsed from a CQL query.
- *
  */
 public class BatchStatement implements CQLStatement
 {
@@ -53,14 +56,15 @@ public class BatchStatement implements CQLStatement
     private final Attributes attrs;
     private final boolean hasConditions;
     private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class);
+    private static final String unloggedBatchWarning = "Unlogged batch covering {} partition{} detected against table{} {}. You should use a logged batch for atomicity, or asynchronous writes for performance.";
 
     /**
      * Creates a new BatchStatement from a list of statements and a
      * Thrift consistency level.
      *
-     * @param type type of the batch
+     * @param type       type of the batch
      * @param statements a list of UpdateStatements
-     * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
+     * @param attrs      additional attributes for statement (CL, timestamp, timeToLive)
      */
     public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs)
     {
@@ -170,13 +174,16 @@ public class BatchStatement implements CQLStatement
 
     private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations)
     {
+
         // The case where all statement where on the same keyspace is pretty common
         if (mutations.size() == 1)
             return mutations.values().iterator().next().values();
 
+
         List<IMutation> ms = new ArrayList<>();
         for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
             ms.addAll(ksMap.values());
+
         return ms;
     }
 
@@ -214,7 +221,7 @@ public class BatchStatement implements CQLStatement
             }
             else
             {
-                mut = statement.cfm.isCounter() ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation;
+                mut = statement.cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation;
             }
 
             statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
@@ -223,6 +230,7 @@ public class BatchStatement implements CQLStatement
 
     /**
      * Checks batch size to ensure threshold is met. If not, a warning is logged.
+     *
      * @param cfs ColumnFamilies that will store the batch's mutations.
      */
     public static void verifyBatchSize(Iterable<ColumnFamily> cfs)
@@ -237,13 +245,33 @@ public class BatchStatement implements CQLStatement
         {
             Set<String> ksCfPairs = new HashSet<>();
             for (ColumnFamily cf : cfs)
-                ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName);
+                ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
 
             String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.";
             logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold);
         }
     }
 
+    private void verifyBatchType(Collection<? extends IMutation> mutations)
+    {
+        if (type != Type.LOGGED && mutations.size() > 1)
+        {
+            Set<String> ksCfPairs = new HashSet<>();
+            Set<ByteBuffer> keySet = new HashSet<>();
+
+            for (IMutation im : mutations)
+            {
+                keySet.add(im.key());
+                for (ColumnFamily cf : im.getColumnFamilies())
+                    ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
+            }
+
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning,
+                             keySet.size(), keySet.size() == 1 ? "" : "s",
+                             ksCfPairs.size() == 1 ? "" : "s", ksCfPairs);
+        }
+    }
+
     public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));
@@ -279,7 +307,9 @@ public class BatchStatement implements CQLStatement
                 return im.getColumnFamilies();
             }
         }));
+
         verifyBatchSize(cfs);
+        verifyBatchType(mutations);
 
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);


[2/3] cassandra git commit: changes.txt

Posted by ja...@apache.org.
changes.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6544fb7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6544fb7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6544fb7a

Branch: refs/heads/trunk
Commit: 6544fb7aaa7f338cc5db5017b65274d3cf99011c
Parents: ebd05dd
Author: T Jake Luciani <ja...@apache.org>
Authored: Fri May 15 08:55:12 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri May 15 08:55:12 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6544fb7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 46a1374..651f49a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.6
+ * Warn on misuse of unlogged batches (CASSANDRA-9282)
  * Failure detector detects and ignores local pauses (CASSANDRA-9183)
  * Add utility class to support for rate limiting a given log statement (CASSANDRA-9029)
  * Add missing consistency levels to cassandra-stess (CASSANDRA-9361)


[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/cql3/statements/BatchStatement.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf208377
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf208377
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf208377

Branch: refs/heads/trunk
Commit: bf208377a1ad73d90dd6b6181049fdc3ed25e3bb
Parents: 1644e82 6544fb7
Author: T Jake Luciani <ja...@apache.org>
Authored: Fri May 15 09:02:44 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri May 15 09:02:44 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/BatchStatement.java         | 43 +++++++++++++++++---
 .../cassandra/service/ClientWarningsTest.java   | 32 +++++++++++++++
 3 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf208377/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bcae76f,651f49a..6aa3059
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,112 -1,5 +1,113 @@@
 +2.2
 + * Populate TokenMetadata early during startup (CASSANDRA-9317)
 + * Make Functions.declared thread-safe
 + * Add client warnings to native protocol v4 (CASSANDRA-8930)
 + * Allow roles cache to be invalidated (CASSANDRA-8967)
 + * Upgrade Snappy (CASSANDRA-9063)
 + * Don't start Thrift rpc by default (CASSANDRA-9319)
 + * Only stream from unrepaired sstables with incremental repair (CASSANDRA-8267)
 + * Aggregate UDFs allow SFUNC return type to differ from STYPE if FFUNC specified (CASSANDRA-9321)
 + * Remove Thrift dependencies in bundled tools (CASSANDRA-8358)
 + * Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242)
 + * Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049)
 + * Distinguish between null and unset in protocol v4 (CASSANDRA-7304)
 + * Add user/role permissions for user-defined functions (CASSANDRA-7557)
 + * Allow cassandra config to be updated to restart daemon without unloading classes (CASSANDRA-9046)
 + * Don't initialize compaction writer before checking if iter is empty (CASSANDRA-9117)
 + * Don't execute any functions at prepare-time (CASSANDRA-9037)
 + * Share file handles between all instances of a SegmentedFile (CASSANDRA-8893)
 + * Make it possible to major compact LCS (CASSANDRA-7272)
 + * Make FunctionExecutionException extend RequestExecutionException
 +   (CASSANDRA-9055)
 + * Add support for SELECT JSON, INSERT JSON syntax and new toJson(), fromJson()
 +   functions (CASSANDRA-7970)
 + * Optimise max purgeable timestamp calculation in compaction (CASSANDRA-8920)
 + * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670) 
 + * New tool added to validate all sstables in a node (CASSANDRA-5791)
 + * Push notification when tracing completes for an operation (CASSANDRA-7807)
 + * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236)
 + * Compressed Commit Log (CASSANDRA-6809)
 + * Optimise IntervalTree (CASSANDRA-8988)
 + * Add a key-value payload for third party usage (CASSANDRA-8553, 9212)
 + * Bump metrics-reporter-config dependency for metrics 3.0 (CASSANDRA-8149)
 + * Partition intra-cluster message streams by size, not type (CASSANDRA-8789)
 + * Add WriteFailureException to native protocol, notify coordinator of
 +   write failures (CASSANDRA-8592)
 + * Convert SequentialWriter to nio (CASSANDRA-8709)
 + * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850)
 + * Record client ip address in tracing sessions (CASSANDRA-8162)
 + * Indicate partition key columns in response metadata for prepared
 +   statements (CASSANDRA-7660)
 + * Merge UUIDType and TimeUUIDType parse logic (CASSANDRA-8759)
 + * Avoid memory allocation when searching index summary (CASSANDRA-8793)
 + * Optimise (Time)?UUIDType Comparisons (CASSANDRA-8730)
 + * Make CRC32Ex into a separate maven dependency (CASSANDRA-8836)
 + * Use preloaded jemalloc w/ Unsafe (CASSANDRA-8714, 9197)
 + * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
 + * Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657)
 + * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
 + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
 + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
 + * Support direct buffer decompression for reads (CASSANDRA-8464)
 + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
 + * Group sstables for anticompaction correctly (CASSANDRA-8578)
 + * Add ReadFailureException to native protocol, respond
 +   immediately when replicas encounter errors while handling
 +   a read request (CASSANDRA-7886)
 + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
 + * Allow mixing token and partition key restrictions (CASSANDRA-7016)
 + * Support index key/value entries on map collections (CASSANDRA-8473)
 + * Modernize schema tables (CASSANDRA-8261)
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer
 +   APIs (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any partition key column (CASSANDRA-7855)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813, 7708)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208, 9145)
 + * Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
 + * Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
 + * Generalize progress reporting (CASSANDRA-8901)
 + * Resumable bootstrap streaming (CASSANDRA-8838, CASSANDRA-8942)
 + * Allow scrub for secondary index (CASSANDRA-5174)
 + * Save repair data to system table (CASSANDRA-5839)
 + * fix nodetool names that reference column families (CASSANDRA-8872)
 +
 +
  2.1.6
+  * Warn on misuse of unlogged batches (CASSANDRA-9282)
   * Failure detector detects and ignores local pauses (CASSANDRA-9183)
   * Add utility class to support for rate limiting a given log statement (CASSANDRA-9029)
   * Add missing consistency levels to cassandra-stess (CASSANDRA-9361)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf208377/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 82e122c,6d4d3a1..b1751a2
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -19,25 -19,26 +19,28 @@@ package org.apache.cassandra.cql3.state
  
  import java.nio.ByteBuffer;
  import java.util.*;
+ import java.util.concurrent.TimeUnit;
  
  import com.google.common.base.Function;
 -import com.google.common.collect.*;
 -
 -import org.apache.cassandra.config.DatabaseDescriptor;
 +import com.google.common.collect.Iterables;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 +import org.slf4j.helpers.MessageFormatter;
+ 
  import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.cql3.*;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.composites.Composite;
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.ClientWarn;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.ResultMessage;
+ import org.apache.cassandra.utils.NoSpamLogger;
  
  /**
   * A <code>BATCH</code> statement parsed from a CQL query.
@@@ -234,9 -230,10 +240,10 @@@ public class BatchStatement implements 
  
      /**
       * Checks batch size to ensure threshold is met. If not, a warning is logged.
+      *
       * @param cfs ColumnFamilies that will store the batch's mutations.
       */
 -    public static void verifyBatchSize(Iterable<ColumnFamily> cfs)
 +    public static void verifyBatchSize(Iterable<ColumnFamily> cfs) throws InvalidRequestException
      {
          long size = 0;
          long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
@@@ -249,23 -245,33 +256,47 @@@
          {
              Set<String> ksCfPairs = new HashSet<>();
              for (ColumnFamily cf : cfs)
-                 ksCfPairs.add(cf.metadata().ksName + "." + cf.metadata().cfName);
+                 ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
  
 -            String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.";
 -            logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold);
 +            String format = "Batch of prepared statements for {} is of size {}, exceeding specified threshold of {} by {}.{}";
 +            if (size > failThreshold)
 +            {
 +                Tracing.trace(format, ksCfPairs, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)");
 +                logger.error(format, ksCfPairs, size, failThreshold, size - failThreshold, " (see batch_size_fail_threshold_in_kb)");
 +                throw new InvalidRequestException("Batch too large");
 +            }
 +            else if (logger.isWarnEnabled())
 +            {
 +                logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, "");
 +            }
 +            ClientWarn.warn(MessageFormatter.arrayFormat(format, new Object[] {ksCfPairs, size, warnThreshold, size - warnThreshold, ""}).getMessage());
          }
      }
  
+     private void verifyBatchType(Collection<? extends IMutation> mutations)
+     {
+         if (type != Type.LOGGED && mutations.size() > 1)
+         {
+             Set<String> ksCfPairs = new HashSet<>();
+             Set<ByteBuffer> keySet = new HashSet<>();
+ 
+             for (IMutation im : mutations)
+             {
+                 keySet.add(im.key());
+                 for (ColumnFamily cf : im.getColumnFamilies())
+                     ksCfPairs.add(String.format("%s.%s", cf.metadata().ksName, cf.metadata().cfName));
+             }
+ 
+             NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning,
+                              keySet.size(), keySet.size() == 1 ? "" : "s",
+                              ksCfPairs.size() == 1 ? "" : "s", ksCfPairs);
++
++            ClientWarn.warn(MessageFormatter.arrayFormat(unloggedBatchWarning, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
++                                                    ksCfPairs.size() == 1 ? "" : "s", ksCfPairs}).getMessage());
++
+         }
+     }
+ 
      public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
      {
          return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf208377/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index ce35169,0000000..da7577e
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@@ -1,81 -1,0 +1,113 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service;
 +
 +import org.apache.commons.lang3.StringUtils;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.CQLTester;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.transport.Message;
 +import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.transport.SimpleClient;
 +import org.apache.cassandra.transport.messages.QueryMessage;
 +
 +import static junit.framework.Assert.assertEquals;
 +import static junit.framework.Assert.assertNull;
 +
 +public class ClientWarningsTest extends CQLTester
 +{
 +    @BeforeClass
 +    public static void setUp()
 +    {
 +        requireNetwork();
 +        DatabaseDescriptor.setBatchSizeWarnThresholdInKB(1);
 +    }
 +
 +    @Test
++    public void testUnloggedBatchWithProtoV4() throws Exception
++    {
++        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
++
++        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4))
++        {
++            client.connect(false);
++
++            QueryMessage query = new QueryMessage(createBatchStatement2(1), QueryOptions.DEFAULT);
++            Message.Response resp = client.execute(query);
++            assertEquals(1, resp.getWarnings().size());
++
++            query = new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
++            resp = client.execute(query);
++            assertEquals(2, resp.getWarnings().size());
++
++        }
++    }
++
++    @Test
 +    public void testLargeBatchWithProtoV4() throws Exception
 +    {
 +        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
 +
 +        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4))
 +        {
 +            client.connect(false);
 +
 +            QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
 +            Message.Response resp = client.execute(query);
 +            assertEquals(1, resp.getWarnings().size());
 +        }
 +    }
 +
 +    @Test
 +    public void testLargeBatchWithProtoV2() throws Exception
 +    {
 +        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
 +
 +        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_2))
 +        {
 +            client.connect(false);
 +
 +            QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
 +            Message.Response resp = client.execute(query);
 +            assertNull(resp.getWarnings());
 +        }
 +    }
 +
 +    private String createBatchStatement(int minSize)
 +    {
 +        return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) VALUES (1, '%s') APPLY BATCH;",
 +                             KEYSPACE,
 +                             currentTable(),
 +                             StringUtils.repeat('1', minSize));
 +    }
++
++    private String createBatchStatement2(int minSize)
++    {
++        return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) VALUES (1, '%s'); INSERT INTO %s.%s (pk, v) VALUES (2, '%s'); APPLY BATCH;",
++                             KEYSPACE,
++                             currentTable(),
++                             StringUtils.repeat('1', minSize),
++                             KEYSPACE,
++                             currentTable(),
++                             StringUtils.repeat('1', minSize));
++    }
++
 +}