You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/08/07 20:41:13 UTC

git commit: Workaround for netty issue causing corrupted data to come off the wire

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1.0 d8eff03df -> b3ada2bc4


Workaround for netty issue causing corrupted data to come off the wire

patch by tjake, test by Johan Bjork; reviewed by belliottsmith for (CASSANDRA-7695)


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3ada2bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3ada2bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3ada2bc

Branch: refs/heads/cassandra-2.1.0
Commit: b3ada2bc453e84a470b070b56e35a13e0913662b
Parents: d8eff03
Author: Jake Luciani <ja...@apache.org>
Authored: Thu Aug 7 14:35:28 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Thu Aug 7 14:35:28 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/service/CassandraDaemon.java      |   9 +
 .../apache/cassandra/cql3/CorruptionTest.java   | 195 +++++++++++++++++++
 3 files changed, 205 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ada2bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26b39e0..aef0c40 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-final
+ * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
  * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
  * Fix binding null values inside UDT (CASSANDRA-7685)
  * Fix UDT field selection with empty fields (CASSANDRA-7670)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ada2bc/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 7c85f81..5c88cb1 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.addthis.metrics.reporter.config.ReporterConfig;
+import io.netty.util.internal.PlatformDependent;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -69,6 +70,14 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CassandraDaemon
 {
+
+    // Workaround for a netty issue that caused corrupted data to come off the wire
+    // (CASSANDRA-7695): set the io.netty.noUnsafe system property when this class is loaded.
+    // NOTE(review): presumably the property must be set before netty's PlatformDependent
+    // is first initialized, which is why this lives in a static initializer - confirm.
+    static 
+    {
+        System.setProperty("io.netty.noUnsafe","true");
+        // Sanity check (only active with -ea): netty must now report Unsafe as unavailable.
+        assert !PlatformDependent.hasUnsafe();
+    }
+
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=NativeAccess";
 
     // Have a dedicated thread to call exit to avoid deadlock in the case where the thread that wants to invoke exit

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ada2bc/test/long/org/apache/cassandra/cql3/CorruptionTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/CorruptionTest.java b/test/long/org/apache/cassandra/cql3/CorruptionTest.java
new file mode 100644
index 0000000..1a42112
--- /dev/null
+++ b/test/long/org/apache/cassandra/cql3/CorruptionTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.policies.LoggingRetryPolicy;
+import com.datastax.driver.core.policies.Policies;
+import com.datastax.driver.core.utils.Bytes;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+
+public class CorruptionTest extends SchemaLoader
+{
+
+    private static EmbeddedCassandraService cassandra;
+    private static Cluster cluster;
+    private static Session session;
+
+    private static PreparedStatement getStatement;
+    private static PreparedStatement putStatement;
+    private static String KEYSPACE = "cass_test";
+    private static final String TABLE="put_test";
+    private static final String KEY = "SingleFailingKey";
+    private static String VALUE;
+    private final int THREADPOOL_SIZE=40;
+
+    @BeforeClass()
+    public static void setup() throws ConfigurationException, IOException
+    {
+        Schema.instance.clear();
+
+        cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        cluster = Cluster.builder().addContactPoint("127.0.0.1")
+                         .withRetryPolicy(new LoggingRetryPolicy(Policies.defaultRetryPolicy()))
+                         .withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        session = cluster.connect();
+
+        session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +" WITH replication " +
+                        "= {'class':'SimpleStrategy', 'replication_factor':1};");
+        session.execute("USE " + KEYSPACE);
+        session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (" +
+                         "key blob," +
+                         "value blob," +
+                         "PRIMARY KEY (key));");
+
+
+        // Prepared statements
+        getStatement = session.prepare("SELECT value FROM " + TABLE + " WHERE key = ?;");
+        getStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+
+        putStatement = session.prepare("INSERT INTO " + TABLE + " (key, value) VALUES (?, ?);");
+        putStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+
+
+
+        StringBuilder s = new StringBuilder();
+        char a='a';
+        char z='z';
+        for (int i = 0; i < 500*1024; i++)
+        {
+            char x = (char)((i%((z-a)+1))+a);
+            if (x == 'a')
+            {
+                x = '\n';
+            }
+            s.append(x);
+        }
+        VALUE = s.toString();
+    }
+
+    @Test
+    public void runCorruptionTest()
+    {
+
+        final CountDownLatch failure = new CountDownLatch(1);
+
+
+        ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
+        for (int i = 0; i < THREADPOOL_SIZE; i++)
+        {
+            executor.execute(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    for (int i = 0; i < 100000; i++)
+                    {
+                        put(KEY.getBytes(), VALUE.getBytes());
+                        byte[] res = get(KEY.getBytes());
+                        //since we're flooding the server we might get some timeouts, that's not
+                        //relevant for this test
+                        if (res == null)
+                            continue;
+
+                        if (!Arrays.equals(VALUE.getBytes(), res))
+                        {
+                            /*try
+                            {
+                                dumpKeys(VALUE.getBytes(), res);
+                            }
+                            catch (IOException e)
+                            {
+                                e.printStackTrace();
+                            }*/
+                            failure.countDown();
+                        }
+                    }
+                }
+
+                private void dumpKeys(byte[] putdata, byte[] getdata) throws IOException {
+                    String basename = "bad-data-tid" + Thread.currentThread().getId();
+                    File put = new File(basename+"-put");
+                    File get = new File(basename+"-get");
+                    try(FileWriter pw = new FileWriter(put)) {
+                        pw.write(new String(putdata));
+                    }
+                    try(FileWriter pw = new FileWriter(get)) {
+                        pw.write(new String(getdata));
+                    }
+                }
+            });
+        }
+
+        try
+        {
+            assert!failure.await(2, TimeUnit.MINUTES);
+        }
+        catch (InterruptedException e)
+        {
+
+        }
+        executor.shutdownNow();
+
+    }
+
+    public static byte[] get(byte[] key)
+    {
+        BoundStatement boundStatement = new BoundStatement(getStatement);
+        boundStatement.setBytes(0, ByteBuffer.wrap(key));
+
+        final com.datastax.driver.core.ResultSet resultSet =  session.execute(boundStatement);
+        final Row row = resultSet.one();
+        if (row != null)
+        {
+            final ByteBuffer byteBuf = row.getBytes("value");
+            return Bytes.getArray(byteBuf);
+        }
+
+        return null;
+    }
+
+    public static void put(byte[] key, byte[] value)
+    {
+        BoundStatement boundStatement = new BoundStatement(putStatement);
+        boundStatement.setBytes(0, ByteBuffer.wrap(key));
+        boundStatement.setBytes(1, ByteBuffer.wrap(value));
+
+        session.execute(boundStatement);
+    }
+}