You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by "Paul Loy (JIRA)" <ji...@apache.org> on 2011/06/24 12:34:47 UTC

[jira] [Updated] (CASSANDRA-2820) Re-introduce FastByteArrayInputStream (and Output equivalent)

     [ https://issues.apache.org/jira/browse/CASSANDRA-2820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Paul Loy updated CASSANDRA-2820:
--------------------------------

    Comment: was deleted

(was: Index: org/apache/cassandra/db/BinaryVerbHandler.java
===================================================================
--- org/apache/cassandra/db/BinaryVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/db/BinaryVerbHandler.java	(working copy)
@@ -18,15 +18,14 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BinaryVerbHandler implements IVerbHandler
 {
@@ -35,7 +34,7 @@
     public void doVerb(Message message, String id)
     { 
         byte[] bytes = message.getMessageBody();
-        ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+        FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
 
         try
         {
Index: org/apache/cassandra/db/CounterMutation.java
===================================================================
--- org/apache/cassandra/db/CounterMutation.java	(revision 1134295)
+++ org/apache/cassandra/db/CounterMutation.java	(working copy)
@@ -18,31 +18,26 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.lang.ThreadLocal;
 import java.nio.ByteBuffer;
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Random;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.AbstractCommutativeType;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CounterMutation implements IMutation
 {
@@ -219,7 +214,7 @@
 
     public Message makeMutationMessage(int version) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
Index: org/apache/cassandra/db/CounterMutationVerbHandler.java
===================================================================
--- org/apache/cassandra/db/CounterMutationVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/db/CounterMutationVerbHandler.java	(working copy)
@@ -18,22 +18,20 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CounterMutationVerbHandler implements IVerbHandler
 {
@@ -42,7 +40,7 @@
     public void doVerb(Message message, String id)
     {
         byte[] bytes = message.getMessageBody();
-        ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+        FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
 
         try
         {
Index: org/apache/cassandra/db/IndexScanCommand.java
===================================================================
--- org/apache/cassandra/db/IndexScanCommand.java	(revision 1134295)
+++ org/apache/cassandra/db/IndexScanCommand.java	(working copy)
@@ -19,21 +19,26 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TSerializer;
-import org.apache.cassandra.thrift.TBinaryProtocol;
 
 public class IndexScanCommand implements MessageProducer
 {
@@ -75,7 +80,7 @@
     public static IndexScanCommand read(Message message) throws IOException
     {
         byte[] bytes = message.getMessageBody();
-        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
         return serializer.deserialize(new DataInputStream(bis));
     }
 
Index: org/apache/cassandra/db/RangeSliceCommand.java
===================================================================
--- org/apache/cassandra/db/RangeSliceCommand.java	(revision 1134295)
+++ org/apache/cassandra/db/RangeSliceCommand.java	(working copy)
@@ -36,7 +36,6 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -46,17 +45,18 @@
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TSerializer;
-import org.apache.cassandra.thrift.TBinaryProtocol;
 
 public class RangeSliceCommand implements MessageProducer, IReadCommand
 {
@@ -112,7 +112,7 @@
     public static RangeSliceCommand read(Message message) throws IOException
     {
         byte[] bytes = message.getMessageBody();
-        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
         return serializer.deserialize(new DataInputStream(bis), message.getVersion());
     }
 
Index: org/apache/cassandra/db/RangeSliceReply.java
===================================================================
--- org/apache/cassandra/db/RangeSliceReply.java	(revision 1134295)
+++ org/apache/cassandra/db/RangeSliceReply.java	(working copy)
@@ -18,18 +18,17 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
-
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.StringUtils;
 
 public class RangeSliceReply
 {
@@ -62,7 +61,7 @@
 
     public static RangeSliceReply read(byte[] body, int version) throws IOException
     {
-        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
         DataInputStream dis = new DataInputStream(bufIn);
         int rowCount = dis.readInt();
         List<Row> rows = new ArrayList<Row>(rowCount);
Index: org/apache/cassandra/db/ReadCommand.java
===================================================================
--- org/apache/cassandra/db/ReadCommand.java	(revision 1134295)
+++ org/apache/cassandra/db/ReadCommand.java	(working copy)
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -29,6 +28,7 @@
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.IReadCommand;
@@ -50,7 +50,7 @@
 
     public Message getMessage(Integer version) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         ReadCommand.serializer().serialize(this, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
Index: org/apache/cassandra/db/ReadRepairVerbHandler.java
===================================================================
--- org/apache/cassandra/db/ReadRepairVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/db/ReadRepairVerbHandler.java	(working copy)
@@ -18,11 +18,11 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
 
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 
@@ -31,7 +31,7 @@
     public void doVerb(Message message, String id)
     {          
         byte[] body = message.getMessageBody();
-        ByteArrayInputStream buffer = new ByteArrayInputStream(body);
+        FastByteArrayInputStream buffer = new FastByteArrayInputStream(body);
         
         try
         {
Index: org/apache/cassandra/db/ReadVerbHandler.java
===================================================================
--- org/apache/cassandra/db/ReadVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/db/ReadVerbHandler.java	(working copy)
@@ -18,26 +18,25 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ReadVerbHandler implements IVerbHandler
 {
     protected static class ReadContext
     {
-        protected ByteArrayInputStream bufIn_;
+        protected FastByteArrayInputStream bufIn_;
         protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
     }
 
@@ -55,7 +54,7 @@
             readCtx = new ReadContext();
             tls_.set(readCtx);
         }
-        readCtx.bufIn_ = new ByteArrayInputStream(body);
+        readCtx.bufIn_ = new FastByteArrayInputStream(body);
 
         try
         {
Index: org/apache/cassandra/db/RowMutation.java
===================================================================
--- org/apache/cassandra/db/RowMutation.java	(revision 1134295)
+++ org/apache/cassandra/db/RowMutation.java	(working copy)
@@ -18,27 +18,32 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.cassandra.net.MessageProducer;
-import org.apache.commons.lang.StringUtils;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.Deletion;
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.StringUtils;
 
 public class RowMutation implements IMutation, MessageProducer
 {
@@ -239,7 +244,7 @@
         byte[] preserializedBuffer = preserializedBuffers.get(version);
         if (preserializedBuffer == null)
         {
-            ByteArrayOutputStream bout = new ByteArrayOutputStream();
+            FastByteArrayOutputStream bout = new FastByteArrayOutputStream();
             DataOutputStream dout = new DataOutputStream(bout);
             RowMutation.serializer().serialize(this, dout, version);
             dout.close();
@@ -321,7 +326,7 @@
 
     static RowMutation fromBytes(byte[] raw, int version) throws IOException
     {
-        RowMutation rm = serializer_.deserialize(new DataInputStream(new ByteArrayInputStream(raw)), version);
+        RowMutation rm = serializer_.deserialize(new DataInputStream(new FastByteArrayInputStream(raw)), version);
         rm.preserializedBuffers.put(version, raw);
         return rm;
     }
Index: org/apache/cassandra/db/RowMutationVerbHandler.java
===================================================================
--- org/apache/cassandra/db/RowMutationVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/db/RowMutationVerbHandler.java	(working copy)
@@ -18,21 +18,20 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class RowMutationVerbHandler implements IVerbHandler
@@ -52,7 +51,7 @@
             if (hintedBytes != null)
             {
                 assert hintedBytes.length > 0;
-                DataInputStream dis = new DataInputStream(new ByteArrayInputStream(hintedBytes));
+                DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(hintedBytes));
                 while (dis.available() > 0)
                 {
                     ByteBuffer addressBytes = ByteBufferUtil.readWithShortLength(dis);
Index: org/apache/cassandra/db/TruncateResponse.java
===================================================================
--- org/apache/cassandra/db/TruncateResponse.java	(revision 1134295)
+++ org/apache/cassandra/db/TruncateResponse.java	(working copy)
@@ -18,12 +18,12 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -51,7 +51,7 @@
     public static Message makeTruncateResponseMessage(Message original, TruncateResponse truncateResponseMessage)
             throws IOException
     {
-    	ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    	FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         TruncateResponse.serializer().serialize(truncateResponseMessage, dos, original.getVersion());
         return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
Index: org/apache/cassandra/db/TruncateVerbHandler.java
===================================================================
--- org/apache/cassandra/db/TruncateVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/db/TruncateVerbHandler.java	(working copy)
@@ -18,17 +18,16 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TruncateVerbHandler implements IVerbHandler
 {
@@ -37,7 +36,7 @@
     public void doVerb(Message message, String id)
     {
         byte[] bytes = message.getMessageBody();
-        ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+        FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
 
         try
         {
Index: org/apache/cassandra/db/Truncation.java
===================================================================
--- org/apache/cassandra/db/Truncation.java	(revision 1134295)
+++ org/apache/cassandra/db/Truncation.java	(working copy)
@@ -18,12 +18,12 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
@@ -69,7 +69,7 @@
 
     public Message getMessage(Integer version) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
Index: org/apache/cassandra/db/WriteResponse.java
===================================================================
--- org/apache/cassandra/db/WriteResponse.java	(revision 1134295)
+++ org/apache/cassandra/db/WriteResponse.java	(working copy)
@@ -18,13 +18,13 @@
 
 package org.apache.cassandra.db;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -46,7 +46,7 @@
 
     public static Message makeWriteResponseMessage(Message original, WriteResponse writeResponseMessage) throws IOException
     {
-    	ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    	FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         WriteResponse.serializer().serialize(writeResponseMessage, dos, original.getVersion());
         return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
Index: org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
===================================================================
--- org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java	(working copy)
@@ -21,18 +21,17 @@
  */
 
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Map;
 
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-
 public class GossipDigestAck2VerbHandler implements IVerbHandler
 {
     private static Logger logger_ = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
@@ -44,7 +43,7 @@
             logger_.trace("Received a GossipDigestAck2Message from {}", from);
 
         byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+        DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
         GossipDigestAck2Message gDigestAck2Message;
         try
         {
Index: org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
===================================================================
--- org/apache/cassandra/gms/GossipDigestAckVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/gms/GossipDigestAckVerbHandler.java	(working copy)
@@ -21,7 +21,6 @@
  */
 
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -29,12 +28,12 @@
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GossipDigestAckVerbHandler implements IVerbHandler
 {
@@ -53,7 +52,7 @@
         }
 
         byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+        DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
 
         try
         {
Index: org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
===================================================================
--- org/apache/cassandra/gms/GossipDigestSynVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/gms/GossipDigestSynVerbHandler.java	(working copy)
@@ -21,19 +21,22 @@
  */
 
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GossipDigestSynVerbHandler implements IVerbHandler
 {
@@ -52,7 +55,7 @@
         }
 
         byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+        DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
 
         try
         {
Index: org/apache/cassandra/gms/Gossiper.java
===================================================================
--- org/apache/cassandra/gms/Gossiper.java	(revision 1134295)
+++ org/apache/cassandra/gms/Gossiper.java	(working copy)
@@ -23,24 +23,34 @@
 import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-
 /**
  * This module is responsible for Gossiping information for the local endpoint. This abstraction
  * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
@@ -335,7 +345,7 @@
     Message makeGossipDigestSynMessage(List<GossipDigest> gDigests, int version) throws IOException
     {
         GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
@@ -343,7 +353,7 @@
 
     Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage, int version) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
@@ -351,7 +361,7 @@
 
     Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message, int version) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
Index: org/apache/cassandra/io/util/FastByteArrayInputStream.java
===================================================================
--- org/apache/cassandra/io/util/FastByteArrayInputStream.java	(revision 0)
+++ org/apache/cassandra/io/util/FastByteArrayInputStream.java	(revision 0)
@@ -0,0 +1,243 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link FastByteArrayInputStream} is a carbon copy of the Apache Harmony
+ * implementation of {@link ByteArrayInputStream} with the synchronized methods
+ * unsynchronized. This makes the {@link FastByteArrayInputStream}
+ * <b>non-thread-safe</b>. 
+ * <p />
+ * By not synchronizing, locks do not need to be checked, making each {@link #read()}
+ * operation around 2x faster.
+ * 
+ * @author Paul_Loy
+ * @see ByteArrayInputStream
+ */
+public class FastByteArrayInputStream extends InputStream {
+
+	/**
+     * The {@code byte} array containing the bytes to stream over.
+     */
+	protected byte buf[];
+
+	/**
+     * The current position within the byte array.
+     */
+	protected int pos;
+
+	/**
+     * The current mark position. Initially set to 0 or the <code>offset</code>
+     * parameter within the constructor.
+     */
+	protected int mark;
+
+	/**
+     * The index one past the last byte of {@code buf} that can be read;
+     * the number of bytes still available is {@code count - pos}.
+     */
+	protected int count;
+
+	/**
+     * Constructs a new {@code FastByteArrayInputStream} on the byte array
+     * {@code buf}.
+     *
+     * @param buf
+     *            the byte array to stream over.
+     */
+	public FastByteArrayInputStream(byte buf[]) {
+		this.buf = buf;
+		this.pos = 0;
+		this.count = buf.length;
+	}
+
+	/**
+     * Constructs a new {@code FastByteArrayInputStream} on the byte array
+     * {@code buf} with the initial position set to {@code offset} and the
+     * end of the readable range set to {@code offset} + {@code length}, capped at {@code buf.length}.
+     *
+     * @param buf
+     *            the byte array to stream over.
+     * @param offset
+     *            the initial position in {@code buf} to start streaming from.
+     * @param length
+     *            the number of bytes available for streaming.
+     */
+	public FastByteArrayInputStream(byte buf[], int offset, int length) {
+		this.buf = buf;
+		this.pos = offset;
+		this.count = Math.min(offset + length, buf.length);
+		this.mark = offset;
+	}
+
+	/**
+     * Returns the number of bytes that are available before this stream will
+     * block. This method returns the number of bytes yet to be read from the
+     * source byte array.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     *
+     * @return the number of bytes available before blocking.
+     */
+	public int available() {
+		return count - pos;
+	}
+	
+	/**
+     * Closes this stream. This implementation does nothing.
+    *
+    * @throws IOException
+    *             if an I/O error occurs while closing this stream.
+    */
+   @Override
+   public void close() throws IOException {
+       // Do nothing on close, this matches JDK behaviour.
+   }
+
+   /**
+    * Sets a mark position in this FastByteArrayInputStream. The parameter
+    * {@code readlimit} is ignored. Sending {@code reset()} will reposition the
+    * stream back to the marked position.
+    *
+    * @param readlimit
+    *            ignored.
+    * @see #markSupported()
+    * @see #reset()
+    */
+   @Override
+   public void mark(int readlimit) {
+       mark = pos;
+   }
+
+   /**
+    * Indicates whether this stream supports the {@code mark()} and
+    * {@code reset()} methods. Returns {@code true} since this class supports
+    * these methods.
+    *
+    * @return always {@code true}.
+    * @see #mark(int)
+    * @see #reset()
+    */
+   @Override
+   public boolean markSupported() {
+       return true;
+   }
+
+   /**
+    * Reads a single byte from the source byte array and returns it as an
+    * integer in the range from 0 to 255. Returns -1 if the end of the source
+    * array has been reached.
+     * <p>
+     * {@code synchronized} was removed from this method.
+    *
+    * @return the byte read or -1 if the end of this stream has been reached.
+    */
+   @Override
+   public int read() {
+       return pos < count ? buf[pos++] & 0xFF : -1;
+   }
+
+   /**
+    * Reads at most {@code len} bytes from this stream and stores
+    * them in byte array {@code b} starting at {@code offset}. This
+    * implementation reads bytes from the source byte array.
+     * <p>
+     * {@code synchronized} was removed from this method.
+    *
+    * @param b
+    *            the byte array in which to store the bytes read.
+    * @param offset
+    *            the initial position in {@code b} to store the bytes read from
+    *            this stream.
+    * @param length
+    *            the maximum number of bytes to store in {@code b}.
+    * @return the number of bytes actually read or -1 if no bytes were read and
+    *         the end of the stream was encountered.
+    * @throws IndexOutOfBoundsException
+    *             if {@code offset < 0} or {@code length < 0}, or if
+    *             {@code offset + length} is greater than the size of
+    *             {@code b}.
+    * @throws NullPointerException
+    *             if {@code b} is {@code null}.
+    */
+   @Override
+   public int read(byte b[], int offset, int length) {
+       if (b == null) {
+           throw new NullPointerException();
+       }
+       // avoid int overflow
+       if (offset < 0 || offset > b.length || length < 0
+               || length > b.length - offset) {
+           throw new IndexOutOfBoundsException();
+       }
+       // Are there any bytes available?
+       if (this.pos >= this.count) {
+           return -1;
+       }
+       if (length == 0) {
+           return 0;
+       }
+
+       int copylen = this.count - pos < length ? this.count - pos : length;
+       System.arraycopy(buf, pos, b, offset, copylen);
+       pos += copylen;
+       return copylen;
+   }
+
+   /**
+    * Resets this stream to the last marked location. This implementation
+    * resets the position to either the marked position, the start position
+    * supplied in the constructor or 0 if neither has been provided.
+     * <p>
+     * {@code synchronized} was removed from this method.
+    *
+    * @see #mark(int)
+    */
+   @Override
+   public void reset() {
+       pos = mark;
+   }
+
+   /**
+    * Skips up to {@code n} bytes in this stream. Subsequent
+    * {@code read()}s will not return these bytes unless {@code reset()} is
+    * used. At most the bytes remaining before the end of the stream are
+    * skipped. It does nothing and returns 0 if {@code n} is zero or negative.
+     * <p>
+     * {@code synchronized} was removed from this method.
+    *
+    * @param n
+    *            the number of bytes to skip.
+    * @return the number of bytes actually skipped.
+    */
+   @Override
+   public long skip(long n) {
+       if (n <= 0) {
+           return 0;
+       }
+       int temp = pos;
+       pos = this.count - pos < n ? this.count : (int) (pos + n);
+       return pos - temp;
+   }
+
+}
Index: org/apache/cassandra/io/util/FastByteArrayOutputStream.java
===================================================================
--- org/apache/cassandra/io/util/FastByteArrayOutputStream.java	(revision 0)
+++ org/apache/cassandra/io/util/FastByteArrayOutputStream.java	(revision 0)
@@ -0,0 +1,260 @@
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
+/**
+ * A specialized {@link OutputStream} for writing content to an
+ * (internal) byte array. As bytes are written to this stream, the byte array
+ * may be expanded to hold more bytes. When the writing is considered to be
+ * finished, a copy of the byte array can be requested from the class.
+ * <p>
+ * This version has been adapted from the Harmony {@link ByteArrayOutputStream}
+ * by removing synchronized from all methods. When you're accessing this stream
+ * in a single thread, this can have a significant performance boost.
+ * 
+ * @see ByteArrayInputStream
+ */
+public class FastByteArrayOutputStream extends OutputStream {
+    /**
+     * The byte array containing the bytes written.
+     */
+    protected byte[] buf;
+
+    /**
+     * The number of bytes written.
+     */
+    protected int count;
+
+    /**
+     * Constructs a new {@code FastByteArrayOutputStream} with a default size of 32 bytes.
+     * If more than 32 bytes are written to this instance, the underlying byte
+     * array will expand.
+     */
+    public FastByteArrayOutputStream() {
+        super();
+        buf = new byte[32];
+    }
+
+    /**
+     * Constructs a new {@code FastByteArrayOutputStream} with an initial size of
+     * {@code size} bytes. If more than {@code size} bytes are written to this
+     * instance, the underlying byte array will expand.
+     * 
+     * @param size
+     *            initial size for the underlying byte array, must be
+     *            non-negative.
+     * @throws IllegalArgumentException
+     *             if {@code size} < 0.
+     */
+    public FastByteArrayOutputStream(int size) {
+        super();
+        if (size >= 0) {
+            buf = new byte[size];
+        } else {
+            throw new IllegalArgumentException();
+        }
+    }
+
+    /**
+     * Closes this stream. This releases system resources used for this stream.
+     * 
+     * @throws IOException
+     *             if an error occurs while attempting to close this stream.
+     */
+    @Override
+    public void close() throws IOException {
+        /**
+         * Although the spec claims "A closed stream cannot perform output
+         * operations and cannot be reopened.", this implementation must do
+         * nothing.
+         */
+        super.close();
+    }
+
+    private void expand(int i) {
+        /* Can the buffer handle @i more bytes, if not expand it */
+        if (count + i <= buf.length) {
+            return;
+        }
+
+        byte[] newbuf = new byte[(count + i) * 2];
+        System.arraycopy(buf, 0, newbuf, 0, count);
+        buf = newbuf;
+    }
+
+    /**
+     * Resets this stream to the beginning of the underlying byte array. All
+     * subsequent writes will overwrite any bytes previously stored in this
+     * stream.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     */
+    public void reset() {
+        count = 0;
+    }
+
+    /**
+     * Returns the total number of bytes written to this stream so far.
+     * 
+     * @return the number of bytes written to this stream.
+     */
+    public int size() {
+        return count;
+    }
+
+    /**
+     * Returns the contents of this FastByteArrayOutputStream as a byte array. Any
+     * changes made to the receiver after returning will not be reflected in the
+     * byte array returned to the caller.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     * 
+     * @return this stream's current contents as a byte array.
+     */
+    public byte[] toByteArray() {
+        byte[] newArray = new byte[count];
+        System.arraycopy(buf, 0, newArray, 0, count);
+        return newArray;
+    }
+
+    /**
+     * Returns the contents of this FastByteArrayOutputStream as a string,
+     * decoded with the platform's default charset. Any changes made to the
+     * receiver after returning will not be reflected in the returned string.
+     * 
+     * @return this stream's current contents as a string.
+     */
+
+    @Override
+    public String toString() {
+        return new String(buf, 0, count);
+    }
+
+    /**
+     * Returns the contents of this FastByteArrayOutputStream as a string. Each byte
+     * {@code b} in this stream is converted to a character {@code c} using the
+     * following function:
+     * {@code c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))}. This method is
+     * deprecated and either {@link #toString()} or {@link #toString(String)}
+     * should be used.
+     * 
+     * @param hibyte
+     *            the high byte of each resulting Unicode character.
+     * @return this stream's current contents as a string with the high byte set
+     *         to {@code hibyte}.
+     * @deprecated Use {@link #toString()}.
+     */
+    @Deprecated
+    public String toString(int hibyte) {
+        char[] newBuf = new char[size()];
+        for (int i = 0; i < newBuf.length; i++) {
+            newBuf[i] = (char) (((hibyte & 0xff) << 8) | (buf[i] & 0xff));
+        }
+        return new String(newBuf);
+    }
+
+    /**
+     * Returns the contents of this FastByteArrayOutputStream as a string converted
+     * according to the encoding declared in {@code enc}.
+     * 
+     * @param enc
+     *            a string representing the encoding to use when translating
+     *            this stream to a string.
+     * @return this stream's current contents as an encoded string.
+     * @throws UnsupportedEncodingException
+     *             if the provided encoding is not supported.
+     */
+    public String toString(String enc) throws UnsupportedEncodingException {
+        return new String(buf, 0, count, enc);
+    }
+
+    /**
+     * Writes {@code len} bytes from the byte array {@code buffer} starting at
+     * offset {@code offset} to this stream.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     * 
+     * @param buffer
+     *            the buffer to be written.
+     * @param offset
+     *            the initial position in {@code buffer} to retrieve bytes.
+     * @param len
+     *            the number of bytes of {@code buffer} to write.
+     * @throws NullPointerException
+     *             if {@code buffer} is {@code null}.
+     * @throws IndexOutOfBoundsException
+     *             if {@code offset < 0} or {@code len < 0}, or if
+     *             {@code offset + len} is greater than the length of
+     *             {@code buffer}.
+     */
+    @Override
+    public void write(byte[] buffer, int offset, int len) {
+        // avoid int overflow
+        if (offset < 0 || offset > buffer.length || len < 0
+                || len > buffer.length - offset) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            return;
+        }
+
+        /* Expand if necessary */
+        expand(len);
+        System.arraycopy(buffer, offset, buf, this.count, len);
+        this.count += len;
+    }
+
+    /**
+     * Writes the specified byte {@code oneByte} to the OutputStream. Only the
+     * low order byte of {@code oneByte} is written.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     * 
+     * @param oneByte
+     *            the byte to be written.
+     */
+    @Override
+    public void write(int oneByte) {
+        if (count == buf.length) {
+            expand(1);
+        }
+        buf[count++] = (byte) oneByte;
+    }
+
+    /**
+     * Takes the contents of this stream and writes it to the output stream
+     * {@code out}.
+     * <p>
+     * {@code synchronized} was removed from this method.
+     * 
+     * @param out
+     *            an OutputStream on which to write the contents of this stream.
+     * @throws IOException
+     *             if an error occurs while writing to {@code out}.
+     */
+    public void writeTo(OutputStream out) throws IOException {
+        out.write(buf, 0, count);
+    }
+}
Index: org/apache/cassandra/io/util/OutputBuffer.java
===================================================================
--- org/apache/cassandra/io/util/OutputBuffer.java	(revision 1134295)
+++ org/apache/cassandra/io/util/OutputBuffer.java	(working copy)
@@ -18,14 +18,13 @@
 
 package org.apache.cassandra.io.util;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.IOException;
 
 /**
  * Extends ByteArrayOutputStream to minimize copies.
  */
-public final class OutputBuffer extends ByteArrayOutputStream
+public final class OutputBuffer extends FastByteArrayOutputStream
 {
     public OutputBuffer()
     {
Index: org/apache/cassandra/net/IncomingTcpConnection.java
===================================================================
--- org/apache/cassandra/net/IncomingTcpConnection.java	(revision 1134295)
+++ org/apache/cassandra/net/IncomingTcpConnection.java	(working copy)
@@ -21,18 +21,22 @@
  */
 
 
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOError;
+import java.io.IOException;
 import java.net.Socket;
 
-import org.apache.cassandra.gms.Gossiper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.security.streaming.SSLIncomingStreamReader;
 import org.apache.cassandra.streaming.IncomingStreamReader;
 import org.apache.cassandra.streaming.StreamHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IncomingTcpConnection extends Thread
 {
@@ -92,7 +96,7 @@
                     int size = input.readInt();
                     byte[] headerBytes = new byte[size];
                     input.readFully(headerBytes);
-                    stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes)), version), input);
+                    stream(StreamHeader.serializer().deserialize(new DataInputStream(new FastByteArrayInputStream(headerBytes)), version), input);
                     break;
                 }
                 else
@@ -111,7 +115,7 @@
                     else
                     {
                         // todo: need to be aware of message version.
-                        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
+                        DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(contentBytes));
                         String id = dis.readUTF();
                         Message message = Message.serializer().deserialize(dis, version);
                         MessagingService.instance().receive(message, id);
Index: org/apache/cassandra/service/AbstractRowResolver.java
===================================================================
--- org/apache/cassandra/service/AbstractRowResolver.java	(revision 1134295)
+++ org/apache/cassandra/service/AbstractRowResolver.java	(working copy)
@@ -21,23 +21,22 @@
  */
 
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.commons.lang.ArrayUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.ArrayUtils;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class AbstractRowResolver implements IResponseResolver<Row>
 {
@@ -58,7 +57,7 @@
     public void preprocess(Message message)
     {
         byte[] body = message.getMessageBody();
-        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
         try
         {
             ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
Index: org/apache/cassandra/service/AntiEntropyService.java
===================================================================
--- org/apache/cassandra/service/AntiEntropyService.java	(revision 1134295)
+++ org/apache/cassandra/service/AntiEntropyService.java	(working copy)
@@ -18,33 +18,43 @@
 
 package org.apache.cassandra.service;
 
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.security.MessageDigest;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.gms.Gossiper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
@@ -53,8 +63,17 @@
 import org.apache.cassandra.streaming.StreamIn;
 import org.apache.cassandra.streaming.StreamOut;
 import org.apache.cassandra.streaming.StreamOutSession;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+
 /**
  * AntiEntropyService encapsulates "validating" (hashing) individual column families,
  * exchanging MerkleTrees with remote nodes via a TreeRequest/Response conversation,
@@ -538,7 +557,7 @@
         {
             try
             {
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            	FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(request, dos, version);
                 return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
@@ -580,7 +599,7 @@
         { 
             byte[] bytes = message.getMessageBody();
             
-            DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
+            DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
             try
             {
                 TreeRequest remotereq = this.deserialize(buffer, message.getVersion());
@@ -610,7 +629,7 @@
         {
             try
             {
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            	FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(validator, dos, Gossiper.instance.getVersion(validator.request.endpoint));
                 return new Message(local, 
@@ -647,7 +666,7 @@
         public void doVerb(Message message, String id)
         { 
             byte[] bytes = message.getMessageBody();
-            DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
+            DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
 
             try
             {
Index: org/apache/cassandra/service/MigrationManager.java
===================================================================
--- org/apache/cassandra/service/MigrationManager.java	(revision 1134295)
+++ org/apache/cassandra/service/MigrationManager.java	(working copy)
@@ -18,19 +18,23 @@
 
 package org.apache.cassandra.service;
 
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MapMaker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.ConfigurationException;
@@ -39,14 +43,25 @@
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.gms.*;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.CachingMessageProducer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
+
 public class MigrationManager implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
@@ -224,7 +239,7 @@
     // other half of transformation is in DefinitionsUpdateResponseVerbHandler.
     private static Message makeMigrationMessage(Collection<IColumn> migrations, int version) throws IOException
     {
-        ByteArrayOutputStream bout = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bout = new FastByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(bout);
         dout.writeInt(migrations.size());
         // riddle me this: how do we know that these binary values (which contained serialized row mutations) are compatible
@@ -248,7 +263,7 @@
     public static Collection<Column> makeColumns(Message msg) throws IOException
     {
         Collection<Column> cols = new ArrayList<Column>();
-        DataInputStream in = new DataInputStream(new ByteArrayInputStream(msg.getMessageBody()));
+        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(msg.getMessageBody()));
         int count = in.readInt();
         for (int i = 0; i < count; i++)
         {
Index: org/apache/cassandra/service/StorageProxy.java
===================================================================
--- org/apache/cassandra/service/StorageProxy.java	(revision 1134295)
+++ org/apache/cassandra/service/StorageProxy.java	(working copy)
@@ -18,47 +18,77 @@
 
 package org.apache.cassandra.service;
 
-import java.io.*;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import org.apache.cassandra.net.CachingMessageProducer;
-import org.apache.cassandra.net.MessageProducer;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.IndexScanCommand;
+import org.apache.cassandra.db.RangeSliceCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.ReadVerbHandler;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Truncation;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.CachingMessageProducer;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LatencyTracker;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+
 public class StorageProxy implements StorageProxyMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
@@ -314,7 +344,7 @@
                     {
                         InetAddress destination = iter.next();
                         // group all nodes in this DC as forward headers on the primary message
-                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
                         DataOutputStream dos = new DataOutputStream(bos);
 
                         // append to older addresses
@@ -334,7 +364,7 @@
 
     private static void addHintHeader(Message message, InetAddress target) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         byte[] previousHints = message.getHeader(RowMutation.HINT);
         if (previousHints != null)
Index: org/apache/cassandra/streaming/StreamReply.java
===================================================================
--- org/apache/cassandra/streaming/StreamReply.java	(revision 1134295)
+++ org/apache/cassandra/streaming/StreamReply.java	(working copy)
@@ -21,12 +21,12 @@
  */
 
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
@@ -56,7 +56,7 @@
 
     public Message getMessage(Integer version) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         serializer.serialize(this, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
Index: org/apache/cassandra/streaming/StreamReplyVerbHandler.java
===================================================================
--- org/apache/cassandra/streaming/StreamReplyVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/streaming/StreamReplyVerbHandler.java	(working copy)
@@ -21,17 +21,16 @@
  *
  */
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
 
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-
 public class StreamReplyVerbHandler implements IVerbHandler
 {
     private static Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
@@ -39,7 +38,7 @@
     public void doVerb(Message message, String id)
     {
         byte[] body = message.getMessageBody();
-        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
 
         try
         {
Index: org/apache/cassandra/streaming/StreamRequestMessage.java
===================================================================
--- org/apache/cassandra/streaming/StreamRequestMessage.java	(revision 1134295)
+++ org/apache/cassandra/streaming/StreamRequestMessage.java	(working copy)
@@ -21,18 +21,21 @@
  */
 
 
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
@@ -40,6 +43,8 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
+import com.google.common.collect.Iterables;
+
 /**
 * This class encapsulates the message that needs to be sent to nodes
 * that handoff data. The message contains information about ranges
@@ -96,7 +101,7 @@
     
     public Message getMessage(Integer version)
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         try
         {
Index: org/apache/cassandra/streaming/StreamRequestVerbHandler.java
===================================================================
--- org/apache/cassandra/streaming/StreamRequestVerbHandler.java	(revision 1134295)
+++ org/apache/cassandra/streaming/StreamRequestVerbHandler.java	(working copy)
@@ -18,17 +18,16 @@
 
  package org.apache.cassandra.streaming;
 
- import java.io.ByteArrayInputStream;
  import java.io.DataInputStream;
- import java.io.IOError;
- import java.io.IOException;
+import java.io.IOError;
+import java.io.IOException;
 
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
- import org.apache.cassandra.net.IVerbHandler;
- import org.apache.cassandra.net.Message;
-
  /**
  * This verb handler handles the StreamRequestMessage that is sent by
  * the node requesting range transfer.
@@ -43,7 +42,7 @@
             logger.debug("Received a StreamRequestMessage from {}", message.getFrom());
 
         byte[] body = message.getMessageBody();
-        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
         try
         {
             StreamRequestMessage srm = StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
)

> Re-introduce FastByteArrayInputStream (and Output equivalent)
> -------------------------------------------------------------
>
>                 Key: CASSANDRA-2820
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2820
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 0.8.0
>         Environment: n/a
>            Reporter: Paul Loy
>            Priority: Minor
>              Labels: bytearrayinputstream, bytearrayoutputstream, license, synchronized
>
> In https://issues.apache.org/jira/browse/CASSANDRA-37, FastByteArrayInputStream and FastByteArrayOutputStream were removed because they were code copied from the JDK and subsequently modified. The JDK license is incompatible with the Apache 2 license, so the code had to go.
> I have since compared the performance of the JDK ByteArrayInputStream with that of a FastByteArrayInputStream (i.e. one with synchronized methods made un-synchronized) and found that the difference is significant.
> After a warm-up period of >10000 loops, I get the following for 10000 loops through a 128000-byte array:
> bais : 3513ms
> fbais: 72ms
> This varies depending on the OS, machine and Java version, but it's always in favour of the FastByteArrayInputStream as you might expect.
> Then, at Jonathan Ellis' suggestion, I tried this using a modified Apache Harmony ByteArrayInputStream - i.e. one whose license is compatible - and the results were the same. A significant boost.
> I will attach a patch with changes for the 0.8.0 tag.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira