You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/14 16:25:05 UTC

cassandra git commit: Add client warnings to native protocol v4

Repository: cassandra
Updated Branches:
  refs/heads/trunk 1684e08cf -> 68722e7e5


Add client warnings to native protocol v4

patch by Carl Yeksigian; reviewed by Tyler Hobbs and Aleksey Yeschenko
for CASSANDRA-8930


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68722e7e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68722e7e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68722e7e

Branch: refs/heads/trunk
Commit: 68722e7e594d228b4bf14c8cd8cbee19b50835ec
Parents: 1684e08
Author: Carl Yeksigian <ca...@apache.org>
Authored: Thu May 14 17:23:15 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu May 14 17:24:57 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 doc/native_protocol_v4.spec                     |  9 ++-
 .../org/apache/cassandra/config/Config.java     |  4 +-
 .../cassandra/config/DatabaseDescriptor.java    | 10 +--
 .../cql3/statements/BatchStatement.java         |  2 +
 .../cassandra/db/filter/SliceQueryFilter.java   |  2 +
 .../apache/cassandra/service/ClientWarn.java    | 73 ++++++++++++++++++
 .../org/apache/cassandra/transport/Frame.java   |  3 +-
 .../org/apache/cassandra/transport/Message.java | 38 ++++++++-
 .../cassandra/transport/SimpleClient.java       |  3 +-
 .../cassandra/service/ClientWarningsTest.java   | 81 ++++++++++++++++++++
 11 files changed, 215 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e83b385..325a5f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2
+ * Add client warnings to native protocol v4 (CASSANDRA-8930)
  * Allow roles cache to be invalidated (CASSANDRA-8967)
  * Upgrade Snappy (CASSANDRA-9063)
  * Don't start Thrift rpc by default (CASSANDRA-9319)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 143fc4a..4014594 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -133,6 +133,12 @@ Table of Contents
           If both trace-flag and payload-flag are set, the generic key-value
           payload appears after trace's data.
           Type of custom payload is [bytes map] (see below).
+    0x08: Warning flag. The response contains warnings that the server
+          generated while processing the request, to go along with this response.
+          If a response frame has the warning flag set, its body will contain the
+          text of the warnings. The warnings are a [string list] and will be the
+          first value in the frame body if the tracing flag is not set, or directly
+          after the tracing ID if it is.
 
   The rest of the flags is currently unused and ignored.
 
@@ -772,7 +778,7 @@ Table of Contents
   Clients are expected to answer the server challenge by an AUTH_RESPONSE
   message.
 
-4.2.7. AUTH_SUCCESS
+4.2.8. AUTH_SUCCESS
 
   Indicate the success of the authentication phase. See Section 4.2.3 for more
   details.
@@ -1134,3 +1140,4 @@ Table of Contents
   * Function_failure error code was added.
   * Add custom payload to frames for custom QueryHandler implementations (ignored by Cassandra's standard QueryHandler)
   * Add "TRACE_COMPLETE" event (section 4.2.6).
+  * Add warnings to response frames when the server generated warnings during processing that the client needs to address.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 421794e..2ede76e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -148,8 +148,8 @@ public class Config
 
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
-    public Integer batch_size_warn_threshold_in_kb = 5;
-    public volatile Integer batch_size_fail_threshold_in_kb = 50;
+    public volatile int batch_size_warn_threshold_in_kb = 5;
+    public volatile int batch_size_fail_threshold_in_kb = 50;
     public Integer concurrent_compactors;
     public volatile Integer compaction_throughput_mb_per_sec = 16;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ec90be2..b5c5fb4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -617,11 +617,6 @@ public class DatabaseDescriptor
         }
         if (seedProvider.getSeeds().size() == 0)
             throw new ConfigurationException("The seed provider lists no seeds.", false);
-
-        if (conf.batch_size_fail_threshold_in_kb == null)
-        {
-            conf.batch_size_fail_threshold_in_kb = conf.batch_size_warn_threshold_in_kb * 10;
-        }
     }
 
     private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException
@@ -801,6 +796,11 @@ public class DatabaseDescriptor
         return conf.batch_size_fail_threshold_in_kb;
     }
 
+    public static void setBatchSizeWarnThresholdInKB(int threshold)
+    {
+        conf.batch_size_warn_threshold_in_kb = threshold;
+    }
+
     public static void setBatchSizeFailThresholdInKB(int threshold)
     {
         conf.batch_size_fail_threshold_in_kb = threshold;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 465b2d9..ddc46c1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.tracing.Tracing;
@@ -260,6 +261,7 @@ public class BatchStatement implements CQLStatement
             {
                 logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, "");
             }
+            ClientWarn.warn(String.format(format, ksCfPairs, size, warnThreshold, size - warnThreshold, ""));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index d914f51..697c715 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 
 public class SliceQueryFilter implements IDiskAtomFilter
@@ -241,6 +242,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
                                        container.metadata().getKeyValidator().getString(key.getKey()),
                                        count,
                                        getSlicesInfo(container));
+            ClientWarn.warn(msg);
             logger.warn(msg);
         }
         Tracing.trace("Read {} live and {} tombstone cells{}",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/service/ClientWarn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java b/src/java/org/apache/cassandra/service/ClientWarn.java
new file mode 100644
index 0000000..2ed0a6c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ClientWarn
+{
+    private static final String TRUNCATED = " [truncated]";
+    private static final ThreadLocal<ClientWarn> warnLocal = new ThreadLocal<>();
+
+    private final List<String> warnings = new ArrayList<>();
+
+    private ClientWarn()
+    {
+    }
+
+    public static void warn(String text)
+    {
+        ClientWarn warner = warnLocal.get();
+        if (warner != null)
+            warner.add(text);
+    }
+
+    private void add(String warning)
+    {
+        if (warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT)
+            warnings.add(maybeTruncate(warning));
+    }
+
+    private static String maybeTruncate(String warning)
+    {
+        return warning.length() > FBUtilities.MAX_UNSIGNED_SHORT
+             ? warning.substring(0, FBUtilities.MAX_UNSIGNED_SHORT - TRUNCATED.length()) + TRUNCATED
+             : warning;
+    }
+
+    public static void captureWarnings()
+    {
+        warnLocal.set(new ClientWarn());
+    }
+
+    public static List<String> getWarnings()
+    {
+        ClientWarn warner = warnLocal.get();
+        if (warner == null || warner.warnings.isEmpty())
+            return null;
+        return warner.warnings;
+    }
+
+    public static void resetWarnings()
+    {
+        warnLocal.remove();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index b72259d..0c038ea 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -112,7 +112,8 @@ public class Frame
             // The order of that enum matters!!
             COMPRESSED,
             TRACING,
-            CUSTOM_PAYLOAD;
+            CUSTOM_PAYLOAD,
+            WARNING;
 
             private static final Flag[] ALL_VALUES = values();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 3382593..b6d5a95 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -223,6 +224,7 @@ public abstract class Message
     public static abstract class Response extends Message
     {
         protected UUID tracingId;
+        protected List<String> warnings;
 
         protected Response(Type type)
         {
@@ -242,6 +244,17 @@ public abstract class Message
         {
             return tracingId;
         }
+
+        public Message setWarnings(List<String> warnings)
+        {
+            this.warnings = warnings;
+            return this;
+        }
+
+        public List<String> getWarnings()
+        {
+            return warnings;
+        }
     }
 
     @ChannelHandler.Sharable
@@ -252,8 +265,10 @@ public abstract class Message
             boolean isRequest = frame.header.type.direction == Direction.REQUEST;
             boolean isTracing = frame.header.flags.contains(Frame.Header.Flag.TRACING);
             boolean isCustomPayload = frame.header.flags.contains(Frame.Header.Flag.CUSTOM_PAYLOAD);
+            boolean hasWarning = frame.header.flags.contains(Frame.Header.Flag.WARNING);
 
             UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUUID(frame.body);
+            List<String> warnings = isRequest || !hasWarning ? null : CBUtil.readStringList(frame.body);
             Map<String, byte[]> customPayload = !isCustomPayload ? null : CBUtil.readBytesMap(frame.body);
 
             try
@@ -280,6 +295,8 @@ public abstract class Message
                     assert message instanceof Response;
                     if (isTracing)
                         ((Response)message).setTracingId(tracingId);
+                    if (hasWarning)
+                        ((Response)message).setWarnings(warnings);
                 }
 
                 results.add(message);
@@ -315,6 +332,13 @@ public abstract class Message
                     Map<String, byte[]> customPayload = message.getCustomPayload();
                     if (tracingId != null)
                         messageSize += CBUtil.sizeOfUUID(tracingId);
+                    List<String> warnings = ((Response)message).getWarnings();
+                    if (warnings != null)
+                    {
+                        if (version < Server.VERSION_4)
+                            throw new ProtocolException("Must not send frame with WARNING flag for native protocol version < 4");
+                        messageSize += CBUtil.sizeOfStringList(warnings);
+                    }
                     if (customPayload != null)
                     {
                         if (version < Server.VERSION_4)
@@ -327,6 +351,11 @@ public abstract class Message
                         CBUtil.writeUUID(tracingId, body);
                         flags.add(Frame.Header.Flag.TRACING);
                     }
+                    if (warnings != null)
+                    {
+                        CBUtil.writeStringList(warnings, body);
+                        flags.add(Frame.Header.Flag.WARNING);
+                    }
                     if (customPayload != null)
                     {
                         CBUtil.writeBytesMap(customPayload, body);
@@ -468,12 +497,15 @@ public abstract class Message
             {
                 assert request.connection() instanceof ServerConnection;
                 connection = (ServerConnection)request.connection();
+                if (connection.getVersion() >= Server.VERSION_4)
+                    ClientWarn.captureWarnings();
+
                 QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
 
                 logger.debug("Received: {}, v={}", request, connection.getVersion());
-
                 response = request.execute(qstate);
                 response.setStreamId(request.getStreamId());
+                response.setWarnings(ClientWarn.getWarnings());
                 response.attach(connection);
                 connection.applyStateTransition(request.type, response.type);
             }
@@ -484,6 +516,10 @@ public abstract class Message
                 flush(new FlushItem(ctx, ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId()), request.getSourceFrame()));
                 return;
             }
+            finally
+            {
+                ClientWarn.resetWarnings();
+            }
 
             logger.debug("Responding: {}, v={}", response, connection.getVersion());
             flush(new FlushItem(ctx, response, request.getSourceFrame()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index b39f166..701a24c 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.transport;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -60,7 +61,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslHandler;
 import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 
-public class SimpleClient
+public class SimpleClient implements Closeable
 {
     static
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
new file mode 100644
index 0000000..ce35169
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.QueryMessage;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+
+public class ClientWarningsTest extends CQLTester
+{
+    @BeforeClass
+    public static void setUp()
+    {
+        requireNetwork();
+        DatabaseDescriptor.setBatchSizeWarnThresholdInKB(1);
+    }
+
+    @Test
+    public void testLargeBatchWithProtoV4() throws Exception
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
+
+        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4))
+        {
+            client.connect(false);
+
+            QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
+            Message.Response resp = client.execute(query);
+            assertEquals(1, resp.getWarnings().size());
+        }
+    }
+
+    @Test
+    public void testLargeBatchWithProtoV2() throws Exception
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
+
+        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_2))
+        {
+            client.connect(false);
+
+            QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
+            Message.Response resp = client.execute(query);
+            assertNull(resp.getWarnings());
+        }
+    }
+
+    private String createBatchStatement(int minSize)
+    {
+        return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) VALUES (1, '%s') APPLY BATCH;",
+                             KEYSPACE,
+                             currentTable(),
+                             StringUtils.repeat('1', minSize));
+    }
+}