You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/03/27 16:53:08 UTC
[8/12] git commit: clean up DataOutputBuffer
clean up DataOutputBuffer
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9252ee95
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9252ee95
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9252ee95
Branch: refs/heads/trunk
Commit: 9252ee95b9f8e30f94ffb2eaf3a3828dadee3610
Parents: b4af12b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Feb 29 13:59:20 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Mar 27 09:51:53 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/db/marshal/CompositeType.java | 92 ++++++---------
.../apache/cassandra/io/util/DataOutputBuffer.java | 46 +++++---
.../org/apache/cassandra/io/util/OutputBuffer.java | 73 ------------
.../apache/cassandra/thrift/CassandraServer.java | 19 +--
5 files changed, 76 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8355543..052d357 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
1.1.1-dev
+ * clean up and optimize DataOutputBuffer, used by CQL compression and
+ CompositeType (CASSANDRA-4072)
* optimize commitlog checksumming (CASSANDRA-3610)
* identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261)
* Move CfDef and KsDef validation out of thrift (CASSANDRA-4037)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index a3e9e98..67323bf 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -18,17 +18,18 @@
*/
package org.apache.cassandra.db.marshal;
-import java.io.*;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.cql3.Relation;
import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
/*
* The encoding of a CompositeType column name should be:
@@ -171,8 +172,7 @@ public class CompositeType extends AbstractCompositeType
private final CompositeType composite;
private int current;
- private final FastByteArrayOutputStream baos = new FastByteArrayOutputStream();
- private final DataOutput out = new DataOutputStream(baos);
+ private final DataOutputBuffer out = new DataOutputBuffer();
public Builder(CompositeType composite)
{
@@ -183,14 +183,7 @@ public class CompositeType extends AbstractCompositeType
{
this(b.composite);
this.current = b.current;
- try
- {
- out.write(b.baos.toByteArray());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ out.write(b.out.getData(), 0, b.out.getLength());
}
public Builder add(Term t, Relation.Type op, List<ByteBuffer> variables) throws InvalidRequestException
@@ -200,38 +193,31 @@ public class CompositeType extends AbstractCompositeType
AbstractType currentType = composite.types.get(current++);
ByteBuffer buffer = t.getByteBuffer(currentType, variables);
- try
- {
- ByteBufferUtil.writeWithShortLength(buffer, out);
-
- /*
- * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()),
- * We can select:
- * - = 'a' by using <'a'><0>
- * - < 'a' by using <'a'><-1>
- * - <= 'a' by using <'a'><1>
- * - > 'a' by using <'a'><1>
- * - >= 'a' by using <'a'><0>
- */
- switch (op)
- {
- case LT:
- out.write((byte) -1);
- break;
- case GT:
- case LTE:
- out.write((byte) 1);
- break;
- default:
- out.write((byte) 0);
- break;
- }
- return this;
- }
- catch (IOException e)
+ ByteBufferUtil.writeWithShortLength(buffer, out);
+
+ /*
+ * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()),
+ * We can select:
+ * - = 'a' by using <'a'><0>
+ * - < 'a' by using <'a'><-1>
+ * - <= 'a' by using <'a'><1>
+ * - > 'a' by using <'a'><1>
+ * - >= 'a' by using <'a'><0>
+ */
+ switch (op)
{
- throw new RuntimeException(e);
+ case LT:
+ out.write((byte) -1);
+ break;
+ case GT:
+ case LTE:
+ out.write((byte) 1);
+ break;
+ default:
+ out.write((byte) 0);
+ break;
}
+ return this;
}
public Builder add(ByteBuffer bb)
@@ -239,16 +225,9 @@ public class CompositeType extends AbstractCompositeType
if (current >= composite.types.size())
throw new IllegalStateException("Composite column is already fully constructed");
- try
- {
- ByteBufferUtil.writeWithShortLength(bb, out);
- out.write((byte) 0);
- return this;
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ ByteBufferUtil.writeWithShortLength(bb, out);
+ out.write((byte) 0);
+ return this;
}
public int componentCount()
@@ -258,7 +237,8 @@ public class CompositeType extends AbstractCompositeType
public ByteBuffer build()
{
- return ByteBuffer.wrap(baos.toByteArray());
+ // potentially slightly space-wasteful in favor of avoiding a copy
+ return ByteBuffer.wrap(out.getData(), 0, out.getLength());
}
public ByteBuffer buildAsEndOfRange()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index 6248657..93b9bc5 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -19,11 +19,14 @@
package org.apache.cassandra.io.util;
import java.io.DataOutputStream;
+import java.io.IOException;
/**
- * An implementation of the DataOutputStream interface. This class is completely thread
- * unsafe.
+ * A DataOutputStream backed by a FastByteArrayOutputStream that exposes
+ * its internal buffer so callers can avoid copies.
+ *
+ * This class is completely thread unsafe.
*/
public final class DataOutputBuffer extends DataOutputStream
{
@@ -34,12 +37,33 @@ public final class DataOutputBuffer extends DataOutputStream
public DataOutputBuffer(int size)
{
- super(new OutputBuffer(size));
+ super(new FastByteArrayOutputStream(size));
+ }
+
+ @Override
+ public void write(int b)
+ {
+ try
+ {
+ super.write(b);
+ }
+ catch (IOException e)
+ {
+ throw new AssertionError(e); // FastByteArrayOutputStream does not throw IOException
+ }
}
- private OutputBuffer buffer()
+ @Override
+ public void write(byte[] b, int off, int len)
{
- return (OutputBuffer)out;
+ try
+ {
+ super.write(b, off, len);
+ }
+ catch (IOException e)
+ {
+ throw new AssertionError(e); // FastByteArrayOutputStream does not throw IOException
+ }
}
/**
@@ -48,20 +72,12 @@ public final class DataOutputBuffer extends DataOutputStream
*/
public byte[] getData()
{
- return buffer().getData();
+ return ((FastByteArrayOutputStream) out).buf;
}
/** Returns the length of the valid data currently in the buffer. */
public int getLength()
{
- return buffer().getLength();
- }
-
- /** Resets the buffer to empty. */
- public DataOutputBuffer reset()
- {
- this.written = 0;
- buffer().reset();
- return this;
+ return ((FastByteArrayOutputStream) out).count;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/io/util/OutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/OutputBuffer.java b/src/java/org/apache/cassandra/io/util/OutputBuffer.java
deleted file mode 100644
index 2a64430..0000000
--- a/src/java/org/apache/cassandra/io/util/OutputBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/**
- * Extends FastByteArrayOutputStream to minimize copies.
- */
-public final class OutputBuffer extends FastByteArrayOutputStream
-{
- public OutputBuffer()
- {
- this(128);
- }
-
- public OutputBuffer(int size)
- {
- super(size);
- }
-
- public byte[] getData()
- {
- return buf;
- }
-
- public int getLength()
- {
- return count;
- }
-
- public void write(DataInput in, int len) throws IOException
- {
- int newcount = count + len;
- if (newcount > buf.length)
- {
- byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
- in.readFully(buf, count, len);
- count = newcount;
- }
-
- /**
- * @return The valid contents of the buffer, possibly by copying: only safe for one-time-use buffers.
- */
- public byte[] asByteArray()
- {
- if (count == buf.length)
- // no-copy
- return buf;
- // copy
- return this.toByteArray();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 5fd3a45..27c7cb9 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -24,7 +24,6 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
@@ -40,18 +39,14 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.cql.CQLStatement;
import org.apache.cassandra.cql.QueryProcessor;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
-import org.apache.cassandra.locator.*;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.scheduler.IRequestScheduler;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.SocketSessionManagementService;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.thrift.TException;
@@ -1170,7 +1165,7 @@ public class CassandraServer implements Cassandra.Iface
switch (compression)
{
case GZIP:
- FastByteArrayOutputStream byteArray = new FastByteArrayOutputStream();
+ DataOutputBuffer decompressed = new DataOutputBuffer();
byte[] outBuffer = new byte[1024], inBuffer = new byte[1024];
Inflater decompressor = new Inflater();
@@ -1185,7 +1180,7 @@ public class CassandraServer implements Cassandra.Iface
int lenWrite = 0;
while ((lenWrite = decompressor.inflate(outBuffer)) !=0)
- byteArray.write(outBuffer, 0, lenWrite);
+ decompressed.write(outBuffer, 0, lenWrite);
if (decompressor.finished())
break;
@@ -1193,7 +1188,7 @@ public class CassandraServer implements Cassandra.Iface
decompressor.end();
- queryString = new String(byteArray.toByteArray(), 0, byteArray.size(), "UTF-8");
+ queryString = new String(decompressed.getData(), 0, decompressed.size(), "UTF-8");
break;
case NONE:
try