You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/08/07 20:41:13 UTC
git commit: Workaround for netty issue causing corrupted data to come
off the wire
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1.0 d8eff03df -> b3ada2bc4
Workaround for netty issue causing corrupted data to come off the wire
patch by tjake, test by Johan Bjork; reviewed by belliottsmith for (CASSANDRA-7695)
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3ada2bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3ada2bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3ada2bc
Branch: refs/heads/cassandra-2.1.0
Commit: b3ada2bc453e84a470b070b56e35a13e0913662b
Parents: d8eff03
Author: Jake Luciani <ja...@apache.org>
Authored: Thu Aug 7 14:35:28 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Thu Aug 7 14:35:28 2014 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/CassandraDaemon.java | 9 +
.../apache/cassandra/cql3/CorruptionTest.java | 195 +++++++++++++++++++
3 files changed, 205 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ada2bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26b39e0..aef0c40 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-final
+ * Workaround for netty issue causing corrupted data to come off the wire (CASSANDRA-7695)
* cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
* Fix binding null values inside UDT (CASSANDRA-7685)
* Fix UDT field selection with empty fields (CASSANDRA-7670)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ada2bc/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 7c85f81..5c88cb1 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.addthis.metrics.reporter.config.ReporterConfig;
+import io.netty.util.internal.PlatformDependent;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
@@ -69,6 +70,14 @@ import org.apache.cassandra.utils.Pair;
*/
public class CassandraDaemon
{
+
+ //Workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695): set io.netty.noUnsafe, then check that netty reports no Unsafe support
+ static
+ {
+ System.setProperty("io.netty.noUnsafe","true");
+ assert !PlatformDependent.hasUnsafe();
+ }
+
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=NativeAccess";
// Have a dedicated thread to call exit to avoid deadlock in the case where the thread that wants to invoke exit
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ada2bc/test/long/org/apache/cassandra/cql3/CorruptionTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/CorruptionTest.java b/test/long/org/apache/cassandra/cql3/CorruptionTest.java
new file mode 100644
index 0000000..1a42112
--- /dev/null
+++ b/test/long/org/apache/cassandra/cql3/CorruptionTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.policies.LoggingRetryPolicy;
+import com.datastax.driver.core.policies.Policies;
+import com.datastax.driver.core.utils.Bytes;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+
+/**
+ * Long-running stress test for CASSANDRA-7695.
+ *
+ * Starts an embedded Cassandra node, then has {@link #THREADPOOL_SIZE} threads repeatedly
+ * write one large blob under a single key and read it back through the native protocol.
+ * The test fails as soon as any thread reads back bytes that differ from what was written,
+ * and passes if no mismatch is seen within two minutes.
+ */
+public class CorruptionTest extends SchemaLoader
+{
+    private static EmbeddedCassandraService cassandra;
+    private static Cluster cluster;
+    private static Session session;
+
+    private static PreparedStatement getStatement;
+    private static PreparedStatement putStatement;
+
+    private static final String KEYSPACE = "cass_test";
+    private static final String TABLE = "put_test";
+    private static final String KEY = "SingleFailingKey";
+    private static final int THREADPOOL_SIZE = 40;
+
+    // Payload written and read back by every worker; built once in setup().
+    private static String VALUE;
+
+    /**
+     * Starts the embedded server, connects a driver session, creates the keyspace and table,
+     * prepares the read/write statements and builds the 500 KiB test payload.
+     */
+    @BeforeClass()
+    public static void setup() throws ConfigurationException, IOException
+    {
+        Schema.instance.clear();
+
+        cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        cluster = Cluster.builder().addContactPoint("127.0.0.1")
+                         .withRetryPolicy(new LoggingRetryPolicy(Policies.defaultRetryPolicy()))
+                         .withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        session = cluster.connect();
+
+        session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +" WITH replication " +
+                        "= {'class':'SimpleStrategy', 'replication_factor':1};");
+        session.execute("USE " + KEYSPACE);
+        session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (" +
+                        "key blob," +
+                        "value blob," +
+                        "PRIMARY KEY (key));");
+
+        // Prepared statements
+        getStatement = session.prepare("SELECT value FROM " + TABLE + " WHERE key = ?;");
+        getStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+
+        putStatement = session.prepare("INSERT INTO " + TABLE + " (key, value) VALUES (?, ?);");
+        putStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+
+        // Payload: the letters 'a'..'z' repeated, with every 'a' replaced by a newline.
+        final int length = 500 * 1024;
+        StringBuilder s = new StringBuilder(length);
+        char a = 'a';
+        char z = 'z';
+        for (int i = 0; i < length; i++)
+        {
+            char x = (char) ((i % ((z - a) + 1)) + a);
+            if (x == 'a')
+            {
+                x = '\n';
+            }
+            s.append(x);
+        }
+        VALUE = s.toString();
+    }
+
+    /**
+     * Floods the server with concurrent put/get round trips of the same key and fails
+     * if any read returns bytes different from the payload that was written.
+     */
+    @Test
+    public void runCorruptionTest()
+    {
+        final CountDownLatch failure = new CountDownLatch(1);
+
+        // Encode once; the arrays are only read by the workers, so sharing them is safe.
+        final byte[] key = KEY.getBytes();
+        final byte[] expected = VALUE.getBytes();
+
+        ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
+        try
+        {
+            for (int i = 0; i < THREADPOOL_SIZE; i++)
+            {
+                executor.execute(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        for (int attempt = 0; attempt < 100000; attempt++)
+                        {
+                            // shutdownNow() interrupts the workers once the test is over
+                            if (Thread.currentThread().isInterrupted())
+                                return;
+
+                            put(key, expected);
+                            byte[] res = get(key);
+                            //since we're flooding the server we might get some timeouts, that's not
+                            //relevant for this test
+                            if (res == null)
+                                continue;
+
+                            if (!Arrays.equals(expected, res))
+                            {
+                                // When debugging, call dumpKeys(expected, res) here to
+                                // write both payloads to disk for comparison.
+                                failure.countDown();
+                                return;
+                            }
+                        }
+                    }
+
+                    /**
+                     * Debugging aid: writes the written and the read payload to
+                     * "bad-data-tid&lt;thread id&gt;-put" and "...-get" in the working directory.
+                     */
+                    private void dumpKeys(byte[] putdata, byte[] getdata) throws IOException
+                    {
+                        String basename = "bad-data-tid" + Thread.currentThread().getId();
+                        File put = new File(basename+"-put");
+                        File get = new File(basename+"-get");
+                        try (FileWriter pw = new FileWriter(put))
+                        {
+                            pw.write(new String(putdata));
+                        }
+                        try (FileWriter pw = new FileWriter(get))
+                        {
+                            pw.write(new String(getdata));
+                        }
+                    }
+                });
+            }
+
+            // Use JUnit's assertion rather than the assert keyword so the check
+            // is enforced even when the JVM runs without -ea.
+            Assert.assertFalse("value read back differs from the value written",
+                               failure.await(2, TimeUnit.MINUTES));
+        }
+        catch (InterruptedException e)
+        {
+            // Preserve the interrupt for the test runner instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+        finally
+        {
+            // Always stop the workers, including when the assertion above fails.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Reads the blob stored under {@code key}.
+     *
+     * @param key raw key bytes
+     * @return the stored value, or {@code null} if the query returned no row
+     */
+    public static byte[] get(byte[] key)
+    {
+        BoundStatement boundStatement = new BoundStatement(getStatement);
+        boundStatement.setBytes(0, ByteBuffer.wrap(key));
+
+        final com.datastax.driver.core.ResultSet resultSet = session.execute(boundStatement);
+        final Row row = resultSet.one();
+        if (row != null)
+        {
+            final ByteBuffer byteBuf = row.getBytes("value");
+            return Bytes.getArray(byteBuf);
+        }
+
+        return null;
+    }
+
+    /**
+     * Writes {@code value} under {@code key}.
+     *
+     * @param key   raw key bytes
+     * @param value raw value bytes
+     */
+    public static void put(byte[] key, byte[] value)
+    {
+        BoundStatement boundStatement = new BoundStatement(putStatement);
+        boundStatement.setBytes(0, ByteBuffer.wrap(key));
+        boundStatement.setBytes(1, ByteBuffer.wrap(value));
+
+        session.execute(boundStatement);
+    }
+}