You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/14 16:25:05 UTC
cassandra git commit: Add client warnings to native protocol v4
Repository: cassandra
Updated Branches:
refs/heads/trunk 1684e08cf -> 68722e7e5
Add client warnings to native protocol v4
patch by Carl Yeksigan; reviewed by Tyler Hobbs and Aleksey Yeschenko
for CASSANDRA-8930
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68722e7e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68722e7e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68722e7e
Branch: refs/heads/trunk
Commit: 68722e7e594d228b4bf14c8cd8cbee19b50835ec
Parents: 1684e08
Author: Carl Yeksigian <ca...@apache.org>
Authored: Thu May 14 17:23:15 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu May 14 17:24:57 2015 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
doc/native_protocol_v4.spec | 9 ++-
.../org/apache/cassandra/config/Config.java | 4 +-
.../cassandra/config/DatabaseDescriptor.java | 10 +--
.../cql3/statements/BatchStatement.java | 2 +
.../cassandra/db/filter/SliceQueryFilter.java | 2 +
.../apache/cassandra/service/ClientWarn.java | 73 ++++++++++++++++++
.../org/apache/cassandra/transport/Frame.java | 3 +-
.../org/apache/cassandra/transport/Message.java | 38 ++++++++-
.../cassandra/transport/SimpleClient.java | 3 +-
.../cassandra/service/ClientWarningsTest.java | 81 ++++++++++++++++++++
11 files changed, 215 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e83b385..325a5f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2
+ * Add client warnings to native protocol v4 (CASSANDRA-8930)
* Allow roles cache to be invalidated (CASSANDRA-8967)
* Upgrade Snappy (CASSANDRA-9063)
* Don't start Thrift rpc by default (CASSANDRA-9319)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 143fc4a..4014594 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -133,6 +133,12 @@ Table of Contents
If both trace-flag and payload-flag are set, the generic key-value
payload appears after trace's data.
Type of custom payload is [bytes map] (see below).
+ 0x08: Warning flag. The response contains warnings from the server which
+ were generated by the server to go along with this response.
+ If a response frame has the warning flag set, its body will contain the
+ text of the warnings. The warnings are a [string list] and will be the
+ first value in the frame body if the tracing flag is not set, or directly
+ after the tracing ID if it is.
The rest of the flags is currently unused and ignored.
@@ -772,7 +778,7 @@ Table of Contents
Clients are expected to answer the server challenge by an AUTH_RESPONSE
message.
-4.2.7. AUTH_SUCCESS
+4.2.8. AUTH_SUCCESS
Indicate the success of the authentication phase. See Section 4.2.3 for more
details.
@@ -1134,3 +1140,4 @@ Table of Contents
* Function_failure error code was added.
* Add custom payload to frames for custom QueryHandler implementations (ignored by Cassandra's standard QueryHandler)
* Add "TRACE_COMPLETE" event (section 4.2.6).
+ * Add warnings to frames for responses for which the server generated a warning during processing, which the client needs to address.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 421794e..2ede76e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -148,8 +148,8 @@ public class Config
/* if the size of columns or super-columns are more than this, indexing will kick in */
public Integer column_index_size_in_kb = 64;
- public Integer batch_size_warn_threshold_in_kb = 5;
- public volatile Integer batch_size_fail_threshold_in_kb = 50;
+ public volatile int batch_size_warn_threshold_in_kb = 5;
+ public volatile int batch_size_fail_threshold_in_kb = 50;
public Integer concurrent_compactors;
public volatile Integer compaction_throughput_mb_per_sec = 16;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ec90be2..b5c5fb4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -617,11 +617,6 @@ public class DatabaseDescriptor
}
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
-
- if (conf.batch_size_fail_threshold_in_kb == null)
- {
- conf.batch_size_fail_threshold_in_kb = conf.batch_size_warn_threshold_in_kb * 10;
- }
}
private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException
@@ -801,6 +796,11 @@ public class DatabaseDescriptor
return conf.batch_size_fail_threshold_in_kb;
}
+ public static void setBatchSizeWarnThresholdInKB(int threshold)
+ {
+ conf.batch_size_warn_threshold_in_kb = threshold;
+ }
+
public static void setBatchSizeFailThresholdInKB(int threshold)
{
conf.batch_size_fail_threshold_in_kb = threshold;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 465b2d9..ddc46c1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.tracing.Tracing;
@@ -260,6 +261,7 @@ public class BatchStatement implements CQLStatement
{
logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, "");
}
+ ClientWarn.warn(String.format(format, ksCfPairs, size, warnThreshold, size - warnThreshold, ""));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index d914f51..697c715 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
public class SliceQueryFilter implements IDiskAtomFilter
@@ -241,6 +242,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
container.metadata().getKeyValidator().getString(key.getKey()),
count,
getSlicesInfo(container));
+ ClientWarn.warn(msg);
logger.warn(msg);
}
Tracing.trace("Read {} live and {} tombstone cells{}",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/service/ClientWarn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java b/src/java/org/apache/cassandra/service/ClientWarn.java
new file mode 100644
index 0000000..2ed0a6c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ClientWarn
+{
+ private static final String TRUNCATED = " [truncated]";
+ private static final ThreadLocal<ClientWarn> warnLocal = new ThreadLocal<>();
+
+ private final List<String> warnings = new ArrayList<>();
+
+ private ClientWarn()
+ {
+ }
+
+ public static void warn(String text)
+ {
+ ClientWarn warner = warnLocal.get();
+ if (warner != null)
+ warner.add(text);
+ }
+
+ private void add(String warning)
+ {
+ if (warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT)
+ warnings.add(maybeTruncate(warning));
+ }
+
+ private static String maybeTruncate(String warning)
+ {
+ return warning.length() > FBUtilities.MAX_UNSIGNED_SHORT
+ ? warning.substring(0, FBUtilities.MAX_UNSIGNED_SHORT - TRUNCATED.length()) + TRUNCATED
+ : warning;
+ }
+
+ public static void captureWarnings()
+ {
+ warnLocal.set(new ClientWarn());
+ }
+
+ public static List<String> getWarnings()
+ {
+ ClientWarn warner = warnLocal.get();
+ if (warner == null || warner.warnings.isEmpty())
+ return null;
+ return warner.warnings;
+ }
+
+ public static void resetWarnings()
+ {
+ warnLocal.remove();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index b72259d..0c038ea 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -112,7 +112,8 @@ public class Frame
// The order of that enum matters!!
COMPRESSED,
TRACING,
- CUSTOM_PAYLOAD;
+ CUSTOM_PAYLOAD,
+ WARNING;
private static final Flag[] ALL_VALUES = values();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 3382593..b6d5a95 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -223,6 +224,7 @@ public abstract class Message
public static abstract class Response extends Message
{
protected UUID tracingId;
+ protected List<String> warnings;
protected Response(Type type)
{
@@ -242,6 +244,17 @@ public abstract class Message
{
return tracingId;
}
+
+ public Message setWarnings(List<String> warnings)
+ {
+ this.warnings = warnings;
+ return this;
+ }
+
+ public List<String> getWarnings()
+ {
+ return warnings;
+ }
}
@ChannelHandler.Sharable
@@ -252,8 +265,10 @@ public abstract class Message
boolean isRequest = frame.header.type.direction == Direction.REQUEST;
boolean isTracing = frame.header.flags.contains(Frame.Header.Flag.TRACING);
boolean isCustomPayload = frame.header.flags.contains(Frame.Header.Flag.CUSTOM_PAYLOAD);
+ boolean hasWarning = frame.header.flags.contains(Frame.Header.Flag.WARNING);
UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUUID(frame.body);
+ List<String> warnings = isRequest || !hasWarning ? null : CBUtil.readStringList(frame.body);
Map<String, byte[]> customPayload = !isCustomPayload ? null : CBUtil.readBytesMap(frame.body);
try
@@ -280,6 +295,8 @@ public abstract class Message
assert message instanceof Response;
if (isTracing)
((Response)message).setTracingId(tracingId);
+ if (hasWarning)
+ ((Response)message).setWarnings(warnings);
}
results.add(message);
@@ -315,6 +332,13 @@ public abstract class Message
Map<String, byte[]> customPayload = message.getCustomPayload();
if (tracingId != null)
messageSize += CBUtil.sizeOfUUID(tracingId);
+ List<String> warnings = ((Response)message).getWarnings();
+ if (warnings != null)
+ {
+ if (version < Server.VERSION_4)
+ throw new ProtocolException("Must not send frame with WARNING flag for native protocol version < 4");
+ messageSize += CBUtil.sizeOfStringList(warnings);
+ }
if (customPayload != null)
{
if (version < Server.VERSION_4)
@@ -327,6 +351,11 @@ public abstract class Message
CBUtil.writeUUID(tracingId, body);
flags.add(Frame.Header.Flag.TRACING);
}
+ if (warnings != null)
+ {
+ CBUtil.writeStringList(warnings, body);
+ flags.add(Frame.Header.Flag.WARNING);
+ }
if (customPayload != null)
{
CBUtil.writeBytesMap(customPayload, body);
@@ -468,12 +497,15 @@ public abstract class Message
{
assert request.connection() instanceof ServerConnection;
connection = (ServerConnection)request.connection();
+ if (connection.getVersion() >= Server.VERSION_4)
+ ClientWarn.captureWarnings();
+
QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
logger.debug("Received: {}, v={}", request, connection.getVersion());
-
response = request.execute(qstate);
response.setStreamId(request.getStreamId());
+ response.setWarnings(ClientWarn.getWarnings());
response.attach(connection);
connection.applyStateTransition(request.type, response.type);
}
@@ -484,6 +516,10 @@ public abstract class Message
flush(new FlushItem(ctx, ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId()), request.getSourceFrame()));
return;
}
+ finally
+ {
+ ClientWarn.resetWarnings();
+ }
logger.debug("Responding: {}, v={}", response, connection.getVersion());
flush(new FlushItem(ctx, response, request.getSourceFrame()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index b39f166..701a24c 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.transport;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -60,7 +61,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslHandler;
import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
-public class SimpleClient
+public class SimpleClient implements Closeable
{
static
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
new file mode 100644
index 0000000..ce35169
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.QueryMessage;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+
+public class ClientWarningsTest extends CQLTester
+{
+ @BeforeClass
+ public static void setUp()
+ {
+ requireNetwork();
+ DatabaseDescriptor.setBatchSizeWarnThresholdInKB(1);
+ }
+
+ @Test
+ public void testLargeBatchWithProtoV4() throws Exception
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
+
+ try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4))
+ {
+ client.connect(false);
+
+ QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
+ Message.Response resp = client.execute(query);
+ assertEquals(1, resp.getWarnings().size());
+ }
+ }
+
+ @Test
+ public void testLargeBatchWithProtoV2() throws Exception
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
+
+ try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_2))
+ {
+ client.connect(false);
+
+ QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
+ Message.Response resp = client.execute(query);
+ assertNull(resp.getWarnings());
+ }
+ }
+
+ private String createBatchStatement(int minSize)
+ {
+ return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) VALUES (1, '%s') APPLY BATCH;",
+ KEYSPACE,
+ currentTable(),
+ StringUtils.repeat('1', minSize));
+ }
+}