You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by "Paul Loy (JIRA)" <ji...@apache.org> on 2011/06/24 12:34:47 UTC
[jira] [Updated] (CASSANDRA-2820) Re-introduce FastByteArrayInputStream (and Output equivalent)
[ https://issues.apache.org/jira/browse/CASSANDRA-2820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Paul Loy updated CASSANDRA-2820:
--------------------------------
Comment: was deleted
(was: Index: org/apache/cassandra/db/BinaryVerbHandler.java
===================================================================
--- org/apache/cassandra/db/BinaryVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/db/BinaryVerbHandler.java (working copy)
@@ -18,15 +18,14 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BinaryVerbHandler implements IVerbHandler
{
@@ -35,7 +34,7 @@
public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
- ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+ FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
try
{
Index: org/apache/cassandra/db/CounterMutation.java
===================================================================
--- org/apache/cassandra/db/CounterMutation.java (revision 1134295)
+++ org/apache/cassandra/db/CounterMutation.java (working copy)
@@ -18,31 +18,26 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.lang.ThreadLocal;
import java.nio.ByteBuffer;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.HashSet;
-import java.util.List;
import java.util.LinkedList;
+import java.util.List;
import java.util.Random;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.AbstractCommutativeType;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CounterMutation implements IMutation
{
@@ -219,7 +214,7 @@
public Message makeMutationMessage(int version) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
Index: org/apache/cassandra/db/CounterMutationVerbHandler.java
===================================================================
--- org/apache/cassandra/db/CounterMutationVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/db/CounterMutationVerbHandler.java (working copy)
@@ -18,22 +18,20 @@
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.util.concurrent.TimeoutException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CounterMutationVerbHandler implements IVerbHandler
{
@@ -42,7 +40,7 @@
public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
- ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+ FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
try
{
Index: org/apache/cassandra/db/IndexScanCommand.java
===================================================================
--- org/apache/cassandra/db/IndexScanCommand.java (revision 1134295)
+++ org/apache/cassandra/db/IndexScanCommand.java (working copy)
@@ -19,21 +19,26 @@
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
import java.util.Arrays;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
-import org.apache.cassandra.thrift.TBinaryProtocol;
public class IndexScanCommand implements MessageProducer
{
@@ -75,7 +80,7 @@
public static IndexScanCommand read(Message message) throws IOException
{
byte[] bytes = message.getMessageBody();
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
return serializer.deserialize(new DataInputStream(bis));
}
Index: org/apache/cassandra/db/RangeSliceCommand.java
===================================================================
--- org/apache/cassandra/db/RangeSliceCommand.java (revision 1134295)
+++ org/apache/cassandra/db/RangeSliceCommand.java (working copy)
@@ -36,7 +36,6 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -46,17 +45,18 @@
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
-import org.apache.cassandra.thrift.TBinaryProtocol;
public class RangeSliceCommand implements MessageProducer, IReadCommand
{
@@ -112,7 +112,7 @@
public static RangeSliceCommand read(Message message) throws IOException
{
byte[] bytes = message.getMessageBody();
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
return serializer.deserialize(new DataInputStream(bis), message.getVersion());
}
Index: org/apache/cassandra/db/RangeSliceReply.java
===================================================================
--- org/apache/cassandra/db/RangeSliceReply.java (revision 1134295)
+++ org/apache/cassandra/db/RangeSliceReply.java (working copy)
@@ -18,18 +18,17 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.commons.lang.StringUtils;
-
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.StringUtils;
public class RangeSliceReply
{
@@ -62,7 +61,7 @@
public static RangeSliceReply read(byte[] body, int version) throws IOException
{
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
DataInputStream dis = new DataInputStream(bufIn);
int rowCount = dis.readInt();
List<Row> rows = new ArrayList<Row>(rowCount);
Index: org/apache/cassandra/db/ReadCommand.java
===================================================================
--- org/apache/cassandra/db/ReadCommand.java (revision 1134295)
+++ org/apache/cassandra/db/ReadCommand.java (working copy)
@@ -18,7 +18,6 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -29,6 +28,7 @@
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.IReadCommand;
@@ -50,7 +50,7 @@
public Message getMessage(Integer version) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
ReadCommand.serializer().serialize(this, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
Index: org/apache/cassandra/db/ReadRepairVerbHandler.java
===================================================================
--- org/apache/cassandra/db/ReadRepairVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/db/ReadRepairVerbHandler.java (working copy)
@@ -18,11 +18,11 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -31,7 +31,7 @@
public void doVerb(Message message, String id)
{
byte[] body = message.getMessageBody();
- ByteArrayInputStream buffer = new ByteArrayInputStream(body);
+ FastByteArrayInputStream buffer = new FastByteArrayInputStream(body);
try
{
Index: org/apache/cassandra/db/ReadVerbHandler.java
===================================================================
--- org/apache/cassandra/db/ReadVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/db/ReadVerbHandler.java (working copy)
@@ -18,26 +18,25 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ReadVerbHandler implements IVerbHandler
{
protected static class ReadContext
{
- protected ByteArrayInputStream bufIn_;
+ protected FastByteArrayInputStream bufIn_;
protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
}
@@ -55,7 +54,7 @@
readCtx = new ReadContext();
tls_.set(readCtx);
}
- readCtx.bufIn_ = new ByteArrayInputStream(body);
+ readCtx.bufIn_ = new FastByteArrayInputStream(body);
try
{
Index: org/apache/cassandra/db/RowMutation.java
===================================================================
--- org/apache/cassandra/db/RowMutation.java (revision 1134295)
+++ org/apache/cassandra/db/RowMutation.java (working copy)
@@ -18,27 +18,32 @@
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
-import org.apache.cassandra.net.MessageProducer;
-import org.apache.commons.lang.StringUtils;
-
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Deletion;
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.StringUtils;
public class RowMutation implements IMutation, MessageProducer
{
@@ -239,7 +244,7 @@
byte[] preserializedBuffer = preserializedBuffers.get(version);
if (preserializedBuffer == null)
{
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bout = new FastByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
RowMutation.serializer().serialize(this, dout, version);
dout.close();
@@ -321,7 +326,7 @@
static RowMutation fromBytes(byte[] raw, int version) throws IOException
{
- RowMutation rm = serializer_.deserialize(new DataInputStream(new ByteArrayInputStream(raw)), version);
+ RowMutation rm = serializer_.deserialize(new DataInputStream(new FastByteArrayInputStream(raw)), version);
rm.preserializedBuffers.put(version, raw);
return rm;
}
Index: org/apache/cassandra/db/RowMutationVerbHandler.java
===================================================================
--- org/apache/cassandra/db/RowMutationVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/db/RowMutationVerbHandler.java (working copy)
@@ -18,21 +18,20 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RowMutationVerbHandler implements IVerbHandler
@@ -52,7 +51,7 @@
if (hintedBytes != null)
{
assert hintedBytes.length > 0;
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(hintedBytes));
+ DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(hintedBytes));
while (dis.available() > 0)
{
ByteBuffer addressBytes = ByteBufferUtil.readWithShortLength(dis);
Index: org/apache/cassandra/db/TruncateResponse.java
===================================================================
--- org/apache/cassandra/db/TruncateResponse.java (revision 1134295)
+++ org/apache/cassandra/db/TruncateResponse.java (working copy)
@@ -18,12 +18,12 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.FBUtilities;
@@ -51,7 +51,7 @@
public static Message makeTruncateResponseMessage(Message original, TruncateResponse truncateResponseMessage)
throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
TruncateResponse.serializer().serialize(truncateResponseMessage, dos, original.getVersion());
return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
Index: org/apache/cassandra/db/TruncateVerbHandler.java
===================================================================
--- org/apache/cassandra/db/TruncateVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/db/TruncateVerbHandler.java (working copy)
@@ -18,17 +18,16 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TruncateVerbHandler implements IVerbHandler
{
@@ -37,7 +36,7 @@
public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
- ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+ FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
try
{
Index: org/apache/cassandra/db/Truncation.java
===================================================================
--- org/apache/cassandra/db/Truncation.java (revision 1134295)
+++ org/apache/cassandra/db/Truncation.java (working copy)
@@ -18,12 +18,12 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
@@ -69,7 +69,7 @@
public Message getMessage(Integer version) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
Index: org/apache/cassandra/db/WriteResponse.java
===================================================================
--- org/apache/cassandra/db/WriteResponse.java (revision 1134295)
+++ org/apache/cassandra/db/WriteResponse.java (working copy)
@@ -18,13 +18,13 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -46,7 +46,7 @@
public static Message makeWriteResponseMessage(Message original, WriteResponse writeResponseMessage) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
WriteResponse.serializer().serialize(writeResponseMessage, dos, original.getVersion());
return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
Index: org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
===================================================================
--- org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (revision 1134295)
+++ org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (working copy)
@@ -21,18 +21,17 @@
*/
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-
public class GossipDigestAck2VerbHandler implements IVerbHandler
{
private static Logger logger_ = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
@@ -44,7 +43,7 @@
logger_.trace("Received a GossipDigestAck2Message from {}", from);
byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
GossipDigestAck2Message gDigestAck2Message;
try
{
Index: org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
===================================================================
--- org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (working copy)
@@ -21,7 +21,6 @@
*/
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
@@ -29,12 +28,12 @@
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GossipDigestAckVerbHandler implements IVerbHandler
{
@@ -53,7 +52,7 @@
}
byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
try
{
Index: org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
===================================================================
--- org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (working copy)
@@ -21,19 +21,22 @@
*/
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GossipDigestSynVerbHandler implements IVerbHandler
{
@@ -52,7 +55,7 @@
}
byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
try
{
Index: org/apache/cassandra/gms/Gossiper.java
===================================================================
--- org/apache/cassandra/gms/Gossiper.java (revision 1134295)
+++ org/apache/cassandra/gms/Gossiper.java (working copy)
@@ -23,24 +23,34 @@
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-
/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
@@ -335,7 +345,7 @@
Message makeGossipDigestSynMessage(List<GossipDigest> gDigests, int version) throws IOException
{
GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
@@ -343,7 +353,7 @@
Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage, int version) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
@@ -351,7 +361,7 @@
Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message, int version) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
Index: org/apache/cassandra/io/util/FastByteArrayInputStream.java
===================================================================
--- org/apache/cassandra/io/util/FastByteArrayInputStream.java (revision 0)
+++ org/apache/cassandra/io/util/FastByteArrayInputStream.java (revision 0)
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link FastByteArrayInputStream} is a carbon copy of the Apache Harmony
+ * implementation of {@link ByteArrayInputStream} with the synchronized methods
+ * unsynchronized. This makes the {@link FastByteArrayInputStream}
+ * <b>non-thread-safe</b>.
+ * <p>
+ * By not synchronizing, locks do not need to be checked, making each {@link #read()}
+ * operation around 2x faster.
+ * <p>
+ * The byte array passed to the constructors is used directly, not copied.
+ *
+ * @author Paul_Loy
+ * @see ByteArrayInputStream
+ */
+public class FastByteArrayInputStream extends InputStream {
+
+    /**
+     * The {@code byte} array containing the bytes to stream over.
+     */
+    protected byte buf[];
+
+    /**
+     * The current position within the byte array.
+     */
+    protected int pos;
+
+    /**
+     * The current mark position. Initially set to 0 or the <code>offset</code>
+     * parameter within the constructor.
+     */
+    protected int mark;
+
+    /**
+     * The index one past the last byte of {@code buf} that may be read.
+     */
+    protected int count;
+
+    /**
+     * Constructs a new {@code FastByteArrayInputStream} on the byte array
+     * {@code buf}.
+     *
+     * @param buf
+     *            the byte array to stream over.
+     * @throws NullPointerException
+     *             if {@code buf} is {@code null}.
+     */
+    public FastByteArrayInputStream(byte buf[]) {
+        this.buf = buf;
+        this.pos = 0;
+        this.count = buf.length;
+    }
+
+    /**
+     * Constructs a new {@code FastByteArrayInputStream} on the byte array
+     * {@code buf} with the initial position and mark set to {@code offset}.
+     * Reading stops at index {@code offset + length} or at the end of
+     * {@code buf}, whichever comes first.
+     *
+     * @param buf
+     *            the byte array to stream over.
+     * @param offset
+     *            the initial position in {@code buf} to start streaming from.
+     * @param length
+     *            the number of bytes available for streaming.
+     * @throws NullPointerException
+     *             if {@code buf} is {@code null}.
+     */
+    public FastByteArrayInputStream(byte buf[], int offset, int length) {
+        this.buf = buf;
+        this.pos = offset;
+        // Sum as long so a large length cannot overflow into a negative end index.
+        this.count = (int) Math.min((long) offset + length, buf.length);
+        this.mark = offset;
+    }
+
+    /**
+     * Returns the number of bytes that are available before this stream will
+     * block. This method returns the number of bytes yet to be read from the
+     * source byte array.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @return the number of bytes available before blocking.
+     */
+    @Override
+    public int available() {
+        return count - pos;
+    }
+
+    /**
+     * Closes this stream. This implementation does nothing; the stream remains
+     * usable after it has been closed.
+     *
+     * @throws IOException
+     *             never thrown by this implementation; declared for
+     *             compatibility with {@link InputStream#close()}.
+     */
+    @Override
+    public void close() throws IOException {
+        // Do nothing on close, this matches JDK behaviour.
+    }
+
+    /**
+     * Sets a mark position in this stream. The parameter {@code readlimit} is
+     * ignored. Calling {@code reset()} will reposition the stream back to the
+     * marked position.
+     *
+     * @param readlimit
+     *            ignored.
+     * @see #markSupported()
+     * @see #reset()
+     */
+    @Override
+    public void mark(int readlimit) {
+        mark = pos;
+    }
+
+    /**
+     * Indicates whether this stream supports the {@code mark()} and
+     * {@code reset()} methods. Returns {@code true} since this class supports
+     * these methods.
+     *
+     * @return always {@code true}.
+     * @see #mark(int)
+     * @see #reset()
+     */
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    /**
+     * Reads a single byte from the source byte array and returns it as an
+     * integer in the range from 0 to 255. Returns -1 if the end of the source
+     * array has been reached.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @return the byte read or -1 if the end of this stream has been reached.
+     */
+    @Override
+    public int read() {
+        return pos < count ? buf[pos++] & 0xFF : -1;
+    }
+
+    /**
+     * Reads at most {@code length} bytes from this stream and stores
+     * them in byte array {@code b} starting at {@code offset}. This
+     * implementation reads bytes from the source byte array.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @param b
+     *            the byte array in which to store the bytes read.
+     * @param offset
+     *            the initial position in {@code b} to store the bytes read from
+     *            this stream.
+     * @param length
+     *            the maximum number of bytes to store in {@code b}.
+     * @return the number of bytes actually read or -1 if no bytes were read and
+     *         the end of the stream was encountered.
+     * @throws IndexOutOfBoundsException
+     *             if {@code offset < 0} or {@code length < 0}, or if
+     *             {@code offset + length} is greater than the size of
+     *             {@code b}.
+     * @throws NullPointerException
+     *             if {@code b} is {@code null}.
+     */
+    @Override
+    public int read(byte b[], int offset, int length) {
+        if (b == null) {
+            throw new NullPointerException();
+        }
+        // Compare against b.length - offset rather than offset + length to avoid int overflow.
+        if (offset < 0 || offset > b.length || length < 0
+                || length > b.length - offset) {
+            throw new IndexOutOfBoundsException();
+        }
+        // End of stream is reported even for a zero-length request.
+        if (pos >= count) {
+            return -1;
+        }
+        if (length == 0) {
+            return 0;
+        }
+
+        int copylen = Math.min(count - pos, length);
+        System.arraycopy(buf, pos, b, offset, copylen);
+        pos += copylen;
+        return copylen;
+    }
+
+    /**
+     * Resets this stream to the last marked location. This implementation
+     * resets the position to either the marked position, the start position
+     * supplied in the constructor or 0 if neither has been provided.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @see #mark(int)
+     */
+    @Override
+    public void reset() {
+        pos = mark;
+    }
+
+    /**
+     * Skips up to {@code n} bytes in this stream. Subsequent {@code read()}s
+     * will not return these bytes unless {@code reset()} is used. Fewer than
+     * {@code n} bytes are skipped if the end of the stream is reached first.
+     * It does nothing and returns 0 if {@code n} is zero or negative.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @param n
+     *            the number of bytes to skip.
+     * @return the number of bytes actually skipped.
+     */
+    @Override
+    public long skip(long n) {
+        if (n <= 0) {
+            return 0;
+        }
+        int start = pos;
+        // count - pos < n guarantees pos + n fits in an int on the other branch.
+        pos = count - pos < n ? count : (int) (pos + n);
+        return pos - start;
+    }
+
+}
Index: org/apache/cassandra/io/util/FastByteArrayOutputStream.java
===================================================================
--- org/apache/cassandra/io/util/FastByteArrayOutputStream.java (revision 0)
+++ org/apache/cassandra/io/util/FastByteArrayOutputStream.java (revision 0)
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * A specialized {@link OutputStream} for writing content to an (internal)
+ * byte array. As bytes are written to this stream, the byte array may be
+ * expanded to hold more bytes. When the writing is considered to be
+ * finished, a copy of the byte array can be requested from the class.
+ * <p>
+ * This version has been adapted from the Harmony {@link ByteArrayOutputStream}
+ * by removing synchronized from all methods. When you're accessing this stream
+ * in a single thread, this can have a significant performance boost. As a
+ * consequence this class is <b>not thread-safe</b>.
+ *
+ * @see ByteArrayInputStream
+ * @see ByteArrayOutputStream
+ */
+public class FastByteArrayOutputStream extends OutputStream {
+    /**
+     * The byte array containing the bytes written.
+     */
+    protected byte[] buf;
+
+    /**
+     * The number of bytes written.
+     */
+    protected int count;
+
+    /**
+     * Constructs a new {@code FastByteArrayOutputStream} with a default size
+     * of 32 bytes. If more than 32 bytes are written to this instance, the
+     * underlying byte array will expand.
+     */
+    public FastByteArrayOutputStream() {
+        super();
+        buf = new byte[32];
+    }
+
+    /**
+     * Constructs a new {@code FastByteArrayOutputStream} with an initial size
+     * of {@code size} bytes. If more than {@code size} bytes are written to
+     * this instance, the underlying byte array will expand.
+     *
+     * @param size
+     *            initial size for the underlying byte array, must be
+     *            non-negative.
+     * @throws IllegalArgumentException
+     *             if {@code size < 0}.
+     */
+    public FastByteArrayOutputStream(int size) {
+        super();
+        if (size < 0) {
+            throw new IllegalArgumentException("Negative initial size: " + size);
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Closes this stream. Closing has no effect on this stream: it can still
+     * be written to and its contents retrieved afterwards.
+     *
+     * @throws IOException
+     *             declared for compatibility with {@link OutputStream#close()};
+     *             not thrown by this implementation.
+     */
+    @Override
+    public void close() throws IOException {
+        /*
+         * Although the spec claims "A closed stream cannot perform output
+         * operations and cannot be reopened.", this implementation must do
+         * nothing.
+         */
+        super.close();
+    }
+
+    /**
+     * Ensures the buffer can hold {@code i} more bytes, growing it to twice
+     * the required capacity when it cannot.
+     *
+     * @param i
+     *            the number of additional bytes about to be written.
+     * @throws OutOfMemoryError
+     *             if the required capacity exceeds the maximum array size.
+     */
+    private void expand(int i) {
+        long required = (long) count + i;
+        if (required <= buf.length) {
+            return;
+        }
+        if (required > Integer.MAX_VALUE) {
+            throw new OutOfMemoryError("Required buffer size exceeds maximum array size");
+        }
+
+        // Double the required size in long arithmetic, clamped to the largest
+        // int, so the new length can never wrap around to a negative value.
+        int newSize = (int) Math.min(required * 2, Integer.MAX_VALUE);
+        byte[] newbuf = new byte[newSize];
+        System.arraycopy(buf, 0, newbuf, 0, count);
+        buf = newbuf;
+    }
+
+    /**
+     * Resets this stream to the beginning of the underlying byte array. All
+     * subsequent writes will overwrite any bytes previously stored in this
+     * stream.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     */
+    public void reset() {
+        count = 0;
+    }
+
+    /**
+     * Returns the total number of bytes written to this stream so far.
+     *
+     * @return the number of bytes written to this stream.
+     */
+    public int size() {
+        return count;
+    }
+
+    /**
+     * Returns the contents of this stream as a byte array. Any changes made
+     * to the receiver after returning will not be reflected in the byte array
+     * returned to the caller.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @return this stream's current contents as a byte array.
+     */
+    public byte[] toByteArray() {
+        byte[] newArray = new byte[count];
+        System.arraycopy(buf, 0, newArray, 0, count);
+        return newArray;
+    }
+
+    /**
+     * Returns the contents of this stream as a string, decoding the bytes with
+     * the platform's default charset. Any changes made to the receiver after
+     * returning will not be reflected in the string returned to the caller.
+     *
+     * @return this stream's current contents as a string.
+     */
+    @Override
+    public String toString() {
+        return new String(buf, 0, count);
+    }
+
+    /**
+     * Returns the contents of this stream as a string. Each byte {@code b} in
+     * this stream is converted to a character {@code c} using the following
+     * function: {@code c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))}. This
+     * method is deprecated and either {@link #toString()} or
+     * {@link #toString(String)} should be used.
+     *
+     * @param hibyte
+     *            the high byte of each resulting Unicode character.
+     * @return this stream's current contents as a string with the high byte set
+     *         to {@code hibyte}.
+     * @deprecated Use {@link #toString()}.
+     */
+    @Deprecated
+    public String toString(int hibyte) {
+        char[] newBuf = new char[size()];
+        for (int i = 0; i < newBuf.length; i++) {
+            newBuf[i] = (char) (((hibyte & 0xff) << 8) | (buf[i] & 0xff));
+        }
+        return new String(newBuf);
+    }
+
+    /**
+     * Returns the contents of this stream as a string converted according to
+     * the encoding declared in {@code enc}.
+     *
+     * @param enc
+     *            a string representing the encoding to use when translating
+     *            this stream to a string.
+     * @return this stream's current contents as an encoded string.
+     * @throws UnsupportedEncodingException
+     *             if the provided encoding is not supported.
+     */
+    public String toString(String enc) throws UnsupportedEncodingException {
+        return new String(buf, 0, count, enc);
+    }
+
+    /**
+     * Writes {@code len} bytes from the byte array {@code buffer} starting at
+     * offset {@code offset} to this stream.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @param buffer
+     *            the buffer to be written.
+     * @param offset
+     *            the initial position in {@code buffer} to retrieve bytes.
+     * @param len
+     *            the number of bytes of {@code buffer} to write.
+     * @throws NullPointerException
+     *             if {@code buffer} is {@code null}.
+     * @throws IndexOutOfBoundsException
+     *             if {@code offset < 0} or {@code len < 0}, or if
+     *             {@code offset + len} is greater than the length of
+     *             {@code buffer}.
+     */
+    @Override
+    public void write(byte[] buffer, int offset, int len) {
+        // Compare against buffer.length - offset rather than offset + len to avoid int overflow.
+        if (offset < 0 || offset > buffer.length || len < 0
+                || len > buffer.length - offset) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            return;
+        }
+
+        /* Expand if necessary */
+        expand(len);
+        System.arraycopy(buffer, offset, buf, this.count, len);
+        this.count += len;
+    }
+
+    /**
+     * Writes the specified byte {@code oneByte} to this stream. Only the
+     * low order byte of {@code oneByte} is written.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @param oneByte
+     *            the byte to be written.
+     */
+    @Override
+    public void write(int oneByte) {
+        if (count == buf.length) {
+            expand(1);
+        }
+        buf[count++] = (byte) oneByte;
+    }
+
+    /**
+     * Takes the contents of this stream and writes it to the output stream
+     * {@code out}.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @param out
+     *            an OutputStream on which to write the contents of this stream.
+     * @throws IOException
+     *             if an error occurs while writing to {@code out}.
+     */
+    public void writeTo(OutputStream out) throws IOException {
+        out.write(buf, 0, count);
+    }
+}
Index: org/apache/cassandra/io/util/OutputBuffer.java
===================================================================
--- org/apache/cassandra/io/util/OutputBuffer.java (revision 1134295)
+++ org/apache/cassandra/io/util/OutputBuffer.java (working copy)
@@ -18,14 +18,13 @@
package org.apache.cassandra.io.util;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.IOException;
/**
- * Extends ByteArrayOutputStream to minimize copies.
+ * Extends FastByteArrayOutputStream to minimize copies.
*/
-public final class OutputBuffer extends ByteArrayOutputStream
+public final class OutputBuffer extends FastByteArrayOutputStream
{
public OutputBuffer()
{
Index: org/apache/cassandra/net/IncomingTcpConnection.java
===================================================================
--- org/apache/cassandra/net/IncomingTcpConnection.java (revision 1134295)
+++ org/apache/cassandra/net/IncomingTcpConnection.java (working copy)
@@ -21,18 +21,22 @@
*/
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOError;
+import java.io.IOException;
import java.net.Socket;
-import org.apache.cassandra.gms.Gossiper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.security.streaming.SSLIncomingStreamReader;
import org.apache.cassandra.streaming.IncomingStreamReader;
import org.apache.cassandra.streaming.StreamHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class IncomingTcpConnection extends Thread
{
@@ -92,7 +96,7 @@
int size = input.readInt();
byte[] headerBytes = new byte[size];
input.readFully(headerBytes);
- stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes)), version), input);
+ stream(StreamHeader.serializer().deserialize(new DataInputStream(new FastByteArrayInputStream(headerBytes)), version), input);
break;
}
else
@@ -111,7 +115,7 @@
else
{
// todo: need to be aware of message version.
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
+ DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(contentBytes));
String id = dis.readUTF();
Message message = Message.serializer().deserialize(dis, version);
MessagingService.instance().receive(message, id);
Index: org/apache/cassandra/service/AbstractRowResolver.java
===================================================================
--- org/apache/cassandra/service/AbstractRowResolver.java (revision 1134295)
+++ org/apache/cassandra/service/AbstractRowResolver.java (working copy)
@@ -21,23 +21,22 @@
*/
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
-import org.apache.commons.lang.ArrayUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.ArrayUtils;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AbstractRowResolver implements IResponseResolver<Row>
{
@@ -58,7 +57,7 @@
public void preprocess(Message message)
{
byte[] body = message.getMessageBody();
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
try
{
ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
Index: org/apache/cassandra/service/AntiEntropyService.java
===================================================================
--- org/apache/cassandra/service/AntiEntropyService.java (revision 1134295)
+++ org/apache/cassandra/service/AntiEntropyService.java (working copy)
@@ -18,33 +18,43 @@
package org.apache.cassandra.service;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
import java.net.InetAddress;
import java.security.MessageDigest;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.gms.Gossiper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -53,8 +63,17 @@
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.streaming.StreamOutSession;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
+
/**
* AntiEntropyService encapsulates "validating" (hashing) individual column families,
* exchanging MerkleTrees with remote nodes via a TreeRequest/Response conversation,
@@ -538,7 +557,7 @@
{
try
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(request, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
@@ -580,7 +599,7 @@
{
byte[] bytes = message.getMessageBody();
- DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
+ DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
try
{
TreeRequest remotereq = this.deserialize(buffer, message.getVersion());
@@ -610,7 +629,7 @@
{
try
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(validator, dos, Gossiper.instance.getVersion(validator.request.endpoint));
return new Message(local,
@@ -647,7 +666,7 @@
public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
- DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
+ DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
try
{
Index: org/apache/cassandra/service/MigrationManager.java
===================================================================
--- org/apache/cassandra/service/MigrationManager.java (revision 1134295)
+++ org/apache/cassandra/service/MigrationManager.java (working copy)
@@ -18,19 +18,23 @@
package org.apache.cassandra.service;
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MapMaker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.ConfigurationException;
@@ -39,14 +43,25 @@
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.gms.*;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.CachingMessageProducer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
+
public class MigrationManager implements IEndpointStateChangeSubscriber
{
private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
@@ -224,7 +239,7 @@
// other half of transformation is in DefinitionsUpdateResponseVerbHandler.
private static Message makeMigrationMessage(Collection<IColumn> migrations, int version) throws IOException
{
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bout = new FastByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
dout.writeInt(migrations.size());
// riddle me this: how do we know that these binary values (which contained serialized row mutations) are compatible
@@ -248,7 +263,7 @@
public static Collection<Column> makeColumns(Message msg) throws IOException
{
Collection<Column> cols = new ArrayList<Column>();
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(msg.getMessageBody()));
+ DataInputStream in = new DataInputStream(new FastByteArrayInputStream(msg.getMessageBody()));
int count = in.readInt();
for (int i = 0; i < count; i++)
{
Index: org/apache/cassandra/service/StorageProxy.java
===================================================================
--- org/apache/cassandra/service/StorageProxy.java (revision 1134295)
+++ org/apache/cassandra/service/StorageProxy.java (working copy)
@@ -18,47 +18,77 @@
package org.apache.cassandra.service;
-import java.io.*;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import org.apache.cassandra.net.CachingMessageProducer;
-import org.apache.cassandra.net.MessageProducer;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.IndexScanCommand;
+import org.apache.cassandra.db.RangeSliceCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.ReadVerbHandler;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Truncation;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.CachingMessageProducer;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LatencyTracker;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+
public class StorageProxy implements StorageProxyMBean
{
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
@@ -314,7 +344,7 @@
{
InetAddress destination = iter.next();
// group all nodes in this DC as forward headers on the primary message
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
// append to older addresses
@@ -334,7 +364,7 @@
private static void addHintHeader(Message message, InetAddress target) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
byte[] previousHints = message.getHeader(RowMutation.HINT);
if (previousHints != null)
Index: org/apache/cassandra/streaming/StreamReply.java
===================================================================
--- org/apache/cassandra/streaming/StreamReply.java (revision 1134295)
+++ org/apache/cassandra/streaming/StreamReply.java (working copy)
@@ -21,12 +21,12 @@
*/
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
@@ -56,7 +56,7 @@
public Message getMessage(Integer version) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
serializer.serialize(this, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
Index: org/apache/cassandra/streaming/StreamReplyVerbHandler.java
===================================================================
--- org/apache/cassandra/streaming/StreamReplyVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/streaming/StreamReplyVerbHandler.java (working copy)
@@ -21,17 +21,16 @@
*
*/
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-
public class StreamReplyVerbHandler implements IVerbHandler
{
private static Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
@@ -39,7 +38,7 @@
public void doVerb(Message message, String id)
{
byte[] body = message.getMessageBody();
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
try
{
Index: org/apache/cassandra/streaming/StreamRequestMessage.java
===================================================================
--- org/apache/cassandra/streaming/StreamRequestMessage.java (revision 1134295)
+++ org/apache/cassandra/streaming/StreamRequestMessage.java (working copy)
@@ -21,18 +21,21 @@
*/
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
@@ -40,6 +43,8 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import com.google.common.collect.Iterables;
+
/**
* This class encapsulates the message that needs to be sent to nodes
* that handoff data. The message contains information about ranges
@@ -96,7 +101,7 @@
public Message getMessage(Integer version)
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
try
{
Index: org/apache/cassandra/streaming/StreamRequestVerbHandler.java
===================================================================
--- org/apache/cassandra/streaming/StreamRequestVerbHandler.java (revision 1134295)
+++ org/apache/cassandra/streaming/StreamRequestVerbHandler.java (working copy)
@@ -18,17 +18,16 @@
package org.apache.cassandra.streaming;
- import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
- import java.io.IOError;
- import java.io.IOException;
+import java.io.IOError;
+import java.io.IOException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- import org.apache.cassandra.net.IVerbHandler;
- import org.apache.cassandra.net.Message;
-
/**
* This verb handler handles the StreamRequestMessage that is sent by
* the node requesting range transfer.
@@ -43,7 +42,7 @@
logger.debug("Received a StreamRequestMessage from {}", message.getFrom());
byte[] body = message.getMessageBody();
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
try
{
StreamRequestMessage srm = StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
)
> Re-introduce FastByteArrayInputStream (and Output equivalent)
> -------------------------------------------------------------
>
> Key: CASSANDRA-2820
> URL: https://issues.apache.org/jira/browse/CASSANDRA-2820
> Project: Cassandra
> Issue Type: Improvement
> Components: Core
> Affects Versions: 0.8.0
> Environment: n/a
> Reporter: Paul Loy
> Priority: Minor
> Labels: bytearrayinputstream, bytearrayoutputstream, license, synchronized
>
> In https://issues.apache.org/jira/browse/CASSANDRA-37 FastByteArrayInputStream and FastByteArrayOutputStream were removed because they were copied from the JDK and subsequently modified. The JDK license is incompatible with the Apache 2 license, so the code had to go.
> I have since compared the performance of the JDK ByteArrayInputStream with that of a FastByteArrayInputStream (i.e. one with the synchronized methods made unsynchronized) and found that the difference is significant.
> After a warm-up period of >10000 loops I get the following timings for 10000 loops through a 128000-byte array:
> bais : 3513ms
> fbais: 72ms
> This varies depending on the OS, machine and Java version, but it's always in favour of the FastByteArrayInputStream as you might expect.
> Then, at Jonathan Ellis' suggestion, I tried this using a modified Apache Harmony ByteArrayInputStream - i.e. one whose license is compatible - and the results were the same: a significant boost.
> I will attach a patch with changes for the 0.8.0 tag.
--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira