You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/03/27 16:53:08 UTC

[8/12] git commit: clean up DataOutputBuffer

clean up DataOutputBuffer


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9252ee95
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9252ee95
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9252ee95

Branch: refs/heads/trunk
Commit: 9252ee95b9f8e30f94ffb2eaf3a3828dadee3610
Parents: b4af12b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Feb 29 13:59:20 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Mar 27 09:51:53 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../apache/cassandra/db/marshal/CompositeType.java |   92 ++++++---------
 .../apache/cassandra/io/util/DataOutputBuffer.java |   46 +++++---
 .../org/apache/cassandra/io/util/OutputBuffer.java |   73 ------------
 .../apache/cassandra/thrift/CassandraServer.java   |   19 +--
 5 files changed, 76 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8355543..052d357 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.1.1-dev
+ * clean up and optimize DataOutputBuffer, used by CQL compression and
+   CompositeType (CASSANDRA-4072)
  * optimize commitlog checksumming (CASSANDRA-3610)
  * identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261)
  * Move CfDef and KsDef validation out of thrift (CASSANDRA-4037)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index a3e9e98..67323bf 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -18,17 +18,18 @@
  */
 package org.apache.cassandra.db.marshal;
 
-import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.cql3.Relation;
 import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /*
  * The encoding of a CompositeType column name should be:
@@ -171,8 +172,7 @@ public class CompositeType extends AbstractCompositeType
         private final CompositeType composite;
         private int current;
 
-        private final FastByteArrayOutputStream baos = new FastByteArrayOutputStream();
-        private final DataOutput out = new DataOutputStream(baos);
+        private final DataOutputBuffer out = new DataOutputBuffer();
 
         public Builder(CompositeType composite)
         {
@@ -183,14 +183,7 @@ public class CompositeType extends AbstractCompositeType
         {
             this(b.composite);
             this.current = b.current;
-            try
-            {
-                out.write(b.baos.toByteArray());
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
+            out.write(b.out.getData(), 0, b.out.getLength());
         }
 
         public Builder add(Term t, Relation.Type op, List<ByteBuffer> variables) throws InvalidRequestException
@@ -200,38 +193,31 @@ public class CompositeType extends AbstractCompositeType
 
             AbstractType currentType = composite.types.get(current++);
             ByteBuffer buffer = t.getByteBuffer(currentType, variables);
-            try
-            {
-                ByteBufferUtil.writeWithShortLength(buffer, out);
-
-                /*
-                 * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()),
-                 * We can select:
-                 *   - = 'a' by using <'a'><0>
-                 *   - < 'a' by using <'a'><-1>
-                 *   - <= 'a' by using <'a'><1>
-                 *   - > 'a' by using <'a'><1>
-                 *   - >= 'a' by using <'a'><0>
-                 */
-                switch (op)
-                {
-                    case LT:
-                        out.write((byte) -1);
-                        break;
-                    case GT:
-                    case LTE:
-                        out.write((byte) 1);
-                        break;
-                    default:
-                        out.write((byte) 0);
-                        break;
-                }
-                return this;
-            }
-            catch (IOException e)
+            ByteBufferUtil.writeWithShortLength(buffer, out);
+
+            /*
+             * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()),
+             * We can select:
+             *   - = 'a' by using <'a'><0>
+             *   - < 'a' by using <'a'><-1>
+             *   - <= 'a' by using <'a'><1>
+             *   - > 'a' by using <'a'><1>
+             *   - >= 'a' by using <'a'><0>
+             */
+            switch (op)
             {
-                throw new RuntimeException(e);
+                case LT:
+                    out.write((byte) -1);
+                    break;
+                case GT:
+                case LTE:
+                    out.write((byte) 1);
+                    break;
+                default:
+                    out.write((byte) 0);
+                    break;
             }
+            return this;
         }
 
         public Builder add(ByteBuffer bb)
@@ -239,16 +225,9 @@ public class CompositeType extends AbstractCompositeType
             if (current >= composite.types.size())
                 throw new IllegalStateException("Composite column is already fully constructed");
 
-            try
-            {
-                ByteBufferUtil.writeWithShortLength(bb, out);
-                out.write((byte) 0);
-                return this;
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
+            ByteBufferUtil.writeWithShortLength(bb, out);
+            out.write((byte) 0);
+            return this;
         }
 
         public int componentCount()
@@ -258,7 +237,8 @@ public class CompositeType extends AbstractCompositeType
 
         public ByteBuffer build()
         {
-            return ByteBuffer.wrap(baos.toByteArray());
+            // potentially slightly space-wasteful in favor of avoiding a copy
+            return ByteBuffer.wrap(out.getData(), 0, out.getLength());
         }
 
         public ByteBuffer buildAsEndOfRange()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index 6248657..93b9bc5 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -19,11 +19,14 @@
 package org.apache.cassandra.io.util;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
 
 
 /**
- * An implementation of the DataOutputStream interface. This class is completely thread
- * unsafe.
+ * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing
+ * its buffer so copies can be avoided.
+ *
+ * This class is completely thread unsafe.
  */
 public final class DataOutputBuffer extends DataOutputStream
 {
@@ -34,12 +37,33 @@ public final class DataOutputBuffer extends DataOutputStream
 
     public DataOutputBuffer(int size)
     {
-        super(new OutputBuffer(size));
+        super(new FastByteArrayOutputStream(size));
+    }
+
+    @Override
+    public void write(int b)
+    {
+        try
+        {
+            super.write(b);
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e); // FBOS does not throw IOE
+        }
     }
 
-    private OutputBuffer buffer()
+    @Override
+    public void write(byte[] b, int off, int len)
     {
-        return (OutputBuffer)out;
+        try
+        {
+            super.write(b, off, len);
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e); // FBOS does not throw IOE
+        }
     }
 
     /**
@@ -48,20 +72,12 @@ public final class DataOutputBuffer extends DataOutputStream
      */
     public byte[] getData()
     {
-        return buffer().getData();
+        return ((FastByteArrayOutputStream) out).buf;
     }
 
     /** Returns the length of the valid data currently in the buffer. */
     public int getLength()
     {
-        return buffer().getLength();
-    }
-
-    /** Resets the buffer to empty. */
-    public DataOutputBuffer reset()
-    {
-        this.written = 0;
-        buffer().reset();
-        return this;
+        return ((FastByteArrayOutputStream) out).count;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/io/util/OutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/OutputBuffer.java b/src/java/org/apache/cassandra/io/util/OutputBuffer.java
deleted file mode 100644
index 2a64430..0000000
--- a/src/java/org/apache/cassandra/io/util/OutputBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/**
- * Extends FastByteArrayOutputStream to minimize copies.
- */
-public final class OutputBuffer extends FastByteArrayOutputStream
-{
-    public OutputBuffer()
-    {
-        this(128);
-    }
-
-    public OutputBuffer(int size)
-    {
-        super(size);
-    }
-
-    public byte[] getData()
-    {
-        return buf;
-    }
-
-    public int getLength()
-    {
-        return count;
-    }
-
-    public void write(DataInput in, int len) throws IOException
-    {
-        int newcount = count + len;
-        if (newcount > buf.length)
-        {
-            byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
-            System.arraycopy(buf, 0, newbuf, 0, count);
-            buf = newbuf;
-        }
-        in.readFully(buf, count, len);
-        count = newcount;
-    }
-
-    /**
-     * @return The valid contents of the buffer, possibly by copying: only safe for one-time-use buffers.
-     */
-    public byte[] asByteArray()
-    {
-        if (count == buf.length)
-            // no-copy
-            return buf;
-        // copy
-        return this.toByteArray();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 5fd3a45..27c7cb9 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -24,7 +24,6 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
@@ -40,18 +39,14 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql.CQLStatement;
 import org.apache.cassandra.cql.QueryProcessor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
-import org.apache.cassandra.locator.*;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.scheduler.IRequestScheduler;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.SocketSessionManagementService;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.thrift.TException;
 
@@ -1170,7 +1165,7 @@ public class CassandraServer implements Cassandra.Iface
             switch (compression)
             {
                 case GZIP:
-                    FastByteArrayOutputStream byteArray = new FastByteArrayOutputStream();
+                    DataOutputBuffer decompressed = new DataOutputBuffer();
                     byte[] outBuffer = new byte[1024], inBuffer = new byte[1024];
 
                     Inflater decompressor = new Inflater();
@@ -1185,7 +1180,7 @@ public class CassandraServer implements Cassandra.Iface
 
                         int lenWrite = 0;
                         while ((lenWrite = decompressor.inflate(outBuffer)) !=0)
-                            byteArray.write(outBuffer, 0, lenWrite);
+                            decompressed.write(outBuffer, 0, lenWrite);
 
                         if (decompressor.finished())
                             break;
@@ -1193,7 +1188,7 @@ public class CassandraServer implements Cassandra.Iface
 
                     decompressor.end();
 
-                    queryString = new String(byteArray.toByteArray(), 0, byteArray.size(), "UTF-8");
+                    queryString = new String(decompressed.getData(), 0, decompressed.size(), "UTF-8");
                     break;
                 case NONE:
                     try