You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/31 18:28:47 UTC
[2/3] cassandra git commit: Constrain internode message buffer sizes,
and improve IO class hierarchy
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
new file mode 100644
index 0000000..31abfa8
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+import com.google.common.base.Function;
+
+/**
+ * Base class for DataOutput implementations that do not have optimized implementations of the Plus methods
+ * and does no buffering.
+ * <p/>
+ * Unlike BufferedDataOutputStreamPlus this is capable of operating as an unbuffered output stream.
+ * Currently necessary because SequentialWriter implements its own buffering along with mark/reset/truncate.
+ */
+public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlus
+{
+ // Construct without a channel; subclasses that never use applyToChannel/write(ByteBuffer)
+ // against the channel may use this form.
+ protected UnbufferedDataOutputStreamPlus()
+ {
+ super();
+ }
+
+ protected UnbufferedDataOutputStreamPlus(WritableByteChannel channel)
+ {
+ super(channel);
+ }
+
+ /*
+ !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile,
+ which is why the javadoc below still speaks in terms of a RandomAccessFile/file pointer.
+ */
+
+ /**
+ * Writes the entire contents of the byte array <code>buffer</code> to
+ * this RandomAccessFile starting at the current file pointer.
+ *
+ * @param buffer the buffer to be written.
+ * @throws IOException If an error occurs trying to write to this RandomAccessFile.
+ */
+ public void write(byte[] buffer) throws IOException
+ {
+ write(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Writes <code>count</code> bytes from the byte array <code>buffer</code>
+ * starting at <code>offset</code> to this RandomAccessFile starting at
+ * the current file pointer.
+ *
+ * @param buffer the bytes to be written
+ * @param offset offset in buffer to get bytes
+ * @param count number of bytes in buffer to write
+ * @throws IOException If an error occurs attempting to write to this
+ * RandomAccessFile.
+ * @throws IndexOutOfBoundsException If offset or count are outside of bounds.
+ */
+ public abstract void write(byte[] buffer, int offset, int count) throws IOException;
+
+ /**
+ * Writes the specified byte <code>oneByte</code> to this RandomAccessFile
+ * starting at the current file pointer. Only the low order byte of
+ * <code>oneByte</code> is written.
+ *
+ * @param oneByte the byte to be written
+ * @throws IOException If an error occurs attempting to write to this
+ * RandomAccessFile.
+ */
+ public abstract void write(int oneByte) throws IOException;
+
+ /**
+ * Writes a boolean to this output stream as a single byte: 1 for true, 0 for false.
+ *
+ * @param val the boolean value to write to the OutputStream
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeBoolean(boolean val) throws IOException
+ {
+ write(val ? 1 : 0);
+ }
+
+ /**
+ * Writes a 8-bit byte to this output stream.
+ *
+ * @param val the byte value to write to the OutputStream
+ * @throws java.io.IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeByte(int val) throws IOException
+ {
+ write(val & 0xFF);
+ }
+
+ /**
+ * Writes the low order 8-bit bytes from a String to this output stream.
+ * Note: the high byte of each char is silently discarded (DataOutput.writeBytes contract).
+ *
+ * @param str the String containing the bytes to write to the OutputStream
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeBytes(String str) throws IOException
+ {
+ byte bytes[] = new byte[str.length()];
+ for (int index = 0; index < str.length(); index++)
+ {
+ bytes[index] = (byte) (str.charAt(index) & 0xFF);
+ }
+ // single bulk write of the collected low bytes
+ write(bytes);
+ }
+
+ /**
+ * Writes the specified 16-bit character to the OutputStream. Only the lower
+ * 2 bytes are written with the higher of the 2 bytes written first. This
+ * represents the Unicode value of val.
+ *
+ * @param val the character to be written
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeChar(int val) throws IOException
+ {
+ // big-endian: high byte first
+ write((val >>> 8) & 0xFF);
+ write((val >>> 0) & 0xFF);
+ }
+
+ /**
+ * Writes the specified 16-bit characters contained in str to the
+ * OutputStream. Only the lower 2 bytes of each character are written with
+ * the higher of the 2 bytes written first. This represents the Unicode
+ * value of each character in str.
+ *
+ * @param str the String whose characters are to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeChars(String str) throws IOException
+ {
+ byte newBytes[] = new byte[str.length() * 2];
+ for (int index = 0; index < str.length(); index++)
+ {
+ // newIndex == index * 2 in every case (the ternary is redundant: 0 * 2 == 0)
+ int newIndex = index == 0 ? index : index * 2;
+ newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF);
+ newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF);
+ }
+ write(newBytes);
+ }
+
+ /**
+ * Writes a 64-bit double to this output stream. The resulting output is the
+ * 8 bytes resulting from calling Double.doubleToLongBits().
+ *
+ * @param val the double to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeDouble(double val) throws IOException
+ {
+ writeLong(Double.doubleToLongBits(val));
+ }
+
+ /**
+ * Writes a 32-bit float to this output stream. The resulting output is the
+ * 4 bytes resulting from calling Float.floatToIntBits().
+ *
+ * @param val the float to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeFloat(float val) throws IOException
+ {
+ writeInt(Float.floatToIntBits(val));
+ }
+
+ /**
+ * Writes a 32-bit int to this output stream. The resulting output is the 4
+ * bytes, highest order first, of val.
+ *
+ * @param val the int to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public void writeInt(int val) throws IOException
+ {
+ write((val >>> 24) & 0xFF);
+ write((val >>> 16) & 0xFF);
+ write((val >>> 8) & 0xFF);
+ write((val >>> 0) & 0xFF);
+ }
+
+ /**
+ * Writes a 64-bit long to this output stream. The resulting output is the 8
+ * bytes, highest order first, of val.
+ *
+ * @param val the long to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public void writeLong(long val) throws IOException
+ {
+ write((int) (val >>> 56) & 0xFF);
+ write((int) (val >>> 48) & 0xFF);
+ write((int) (val >>> 40) & 0xFF);
+ write((int) (val >>> 32) & 0xFF);
+ write((int) (val >>> 24) & 0xFF);
+ write((int) (val >>> 16) & 0xFF);
+ write((int) (val >>> 8) & 0xFF);
+ write((int) (val >>> 0) & 0xFF);
+ }
+
+ /**
+ * Writes the specified 16-bit short to the OutputStream. Only the lower 2
+ * bytes are written with the higher of the 2 bytes written first.
+ * Same wire format as writeChar: 2 bytes, big-endian.
+ *
+ * @param val the short to be written
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public void writeShort(int val) throws IOException
+ {
+ writeChar(val);
+ }
+
+ /**
+ * Writes the specified String out in UTF format to the provided DataOutput:
+ * a 2-byte big-endian length prefix (in encoded bytes, max 65535) followed by
+ * the modified-UTF-8 bytes of the string.
+ *
+ * @param str the String to be written in UTF format.
+ * @param out the DataOutput to write the UTF encoded string to
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public static void writeUTF(String str, DataOutput out) throws IOException
+ {
+ int length = str.length();
+ int utfCount = calculateUTFLength(str, length);
+
+ // the 2-byte length prefix cannot represent more than 65535 encoded bytes
+ if (utfCount > 65535)
+ throw new UTFDataFormatException(); //$NON-NLS-1$
+
+ // scratch buffer obtained from elsewhere in the class hierarchy;
+ // NOTE(review): it appears it may be smaller than requested — the flush logic
+ // below copes with that. TODO confirm retrieveTemporaryBuffer's contract.
+ byte[] utfBytes = retrieveTemporaryBuffer(utfCount + 2);
+
+ int utfIndex = 2;
+ utfBytes[0] = (byte) (utfCount >> 8);
+ utfBytes[1] = (byte) utfCount;
+ int bufferLength = utfBytes.length;
+
+ // fast path: every char encodes to exactly one byte (pure ASCII, no NULs)
+ // and the whole payload fits in the buffer
+ if (utfCount == length && utfCount + utfIndex < bufferLength)
+ {
+ for (int charIndex = 0 ; charIndex < length ; charIndex++)
+ utfBytes[utfIndex++] = (byte) str.charAt(charIndex);
+ }
+ else
+ {
+ // slow path: encode char by char, draining the buffer to out whenever
+ // the next encoded char would not fit
+ int charIndex = 0;
+ while (charIndex < length)
+ {
+ char ch = str.charAt(charIndex);
+ int sizeOfChar = sizeOfChar(ch);
+ if (utfIndex + sizeOfChar > bufferLength)
+ {
+ out.write(utfBytes, 0, utfIndex);
+ utfIndex = 0;
+ }
+
+ // standard modified-UTF-8 encodings for 3/2/1-byte chars
+ switch (sizeOfChar)
+ {
+ case 3:
+ utfBytes[utfIndex] = (byte) (0xe0 | (0x0f & (ch >> 12)));
+ utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & (ch >> 6)));
+ utfBytes[utfIndex + 2] = (byte) (0x80 | (0x3f & ch));
+ break;
+ case 2:
+ utfBytes[utfIndex] = (byte) (0xc0 | (0x1f & (ch >> 6)));
+ utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & ch));
+ break;
+ case 1:
+ utfBytes[utfIndex] = (byte) ch;
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ utfIndex += sizeOfChar;
+ charIndex++;
+ }
+ }
+ // flush whatever remains (fast path: everything; slow path: the tail)
+ out.write(utfBytes, 0, utfIndex);
+ }
+
+ /*
+ * Factored out into separate method to create more flexibility around inlining.
+ * Returns the number of bytes str occupies in modified-UTF-8 (no length prefix).
+ */
+ private static int calculateUTFLength(String str, int length)
+ {
+ int utfCount = 0;
+ for (int i = 0; i < length; i++)
+ utfCount += sizeOfChar(str.charAt(i));
+ return utfCount;
+ }
+
+ // branch-free modified-UTF-8 size of one char: 1, 2 or 3 bytes
+ private static int sizeOfChar(int ch)
+ {
+ // wrap 0 around to max, because it requires 3 bytes
+ return 1
+ // if >= 128, we need an extra byte, so we divide by 128 and check the value is > 0
+ // (by negating it and taking the sign bit)
+ + (-(ch / 128) >>> 31)
+ // if >= 2048, or == 0, we need another extra byte; we subtract one and wrap around,
+ // so we only then need to confirm it is greater than 2047
+ + (-(((ch - 1) & 0xffff) / 2047) >>> 31);
+ }
+
+ /**
+ * Writes the specified String out in UTF format.
+ *
+ * @param str the String to be written in UTF format.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeUTF(String str) throws IOException
+ {
+ writeUTF(str, this);
+ }
+
+ // ByteBuffer to use for defensive copies
+ private final ByteBuffer hollowBufferD = MemoryUtil.getHollowDirectByteBuffer();
+
+ @Override
+ public void write(ByteBuffer buf) throws IOException
+ {
+ if (buf.hasArray())
+ {
+ // heap buffer: hand the backing array straight to the byte[] write path
+ write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+ }
+ else
+ {
+ assert buf.isDirect();
+ // point the reusable hollow buffer at buf's memory so the caller's
+ // position/limit are left untouched while we advance the duplicate
+ MemoryUtil.duplicateDirectByteBuffer(buf, hollowBufferD);
+ // WritableByteChannel.write may write fewer bytes than remaining, so loop
+ while (hollowBufferD.hasRemaining())
+ channel.write(hollowBufferD);
+ }
+ }
+
+ // Writes [offset, offset+length) of the Memory region, one ByteBuffer view at a time.
+ public void write(Memory memory, long offset, long length) throws IOException
+ {
+ for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
+ write(buffer);
+ }
+
+ @Override
+ public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+ {
+ return f.apply(channel);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
new file mode 100644
index 0000000..d8c8f0c
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * When possible use {@link BufferedDataOutputStreamPlus} instead of this class, as it will
+ * be more efficient when using Plus methods. This class is only for situations where it cannot be used.
+ *
+ * The channel provided by this class is just a wrapper around the output stream.
+ */
+public class WrappedDataOutputStreamPlus extends UnbufferedDataOutputStreamPlus
+{
+ protected final OutputStream out;
+ // No channel supplied: applyToChannel/channel-based writes will see a null channel.
+ public WrappedDataOutputStreamPlus(OutputStream out)
+ {
+ super();
+ this.out = out;
+ }
+
+ public WrappedDataOutputStreamPlus(OutputStream out, WritableByteChannel channel)
+ {
+ super(channel);
+ this.out = out;
+ }
+
+ // All writes are delegated directly to the wrapped stream (no buffering here).
+ @Override
+ public void write(byte[] buffer, int offset, int count) throws IOException
+ {
+ out.write(buffer, offset, count);
+ }
+
+ @Override
+ public void write(int oneByte) throws IOException
+ {
+ out.write(oneByte);
+ }
+
+ // Closes the wrapped stream only; the channel (if one was supplied) is not
+ // closed here — presumably it wraps the same stream. NOTE(review): confirm callers.
+ @Override
+ public void close() throws IOException
+ {
+ out.close();
+ }
+
+ @Override
+ public void flush() throws IOException
+ {
+ out.flush();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index e7d434b..e94f15f 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -30,12 +30,13 @@ import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
-import org.xerial.snappy.SnappyInputStream;
import org.apache.cassandra.config.Config;
+import org.xerial.snappy.SnappyInputStream;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.NIODataInputStream;
public class IncomingTcpConnection extends Thread
{
@@ -109,7 +110,7 @@ public class IncomingTcpConnection extends Thread
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
out.writeInt(MessagingService.current_version);
out.flush();
- DataInputStream in = new DataInputStream(socket.getInputStream());
+ DataInput in = new DataInputStream(socket.getInputStream());
int maxVersion = in.readInt();
from = CompactEndpointSerializationHelper.deserialize(in);
@@ -135,7 +136,7 @@ public class IncomingTcpConnection extends Thread
}
else
{
- in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
+ in = new NIODataInputStream(socket.getChannel(), BUFFER_SIZE);
}
if (version > MessagingService.current_version)
@@ -154,7 +155,7 @@ public class IncomingTcpConnection extends Thread
}
}
- private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
+ private InetAddress receiveMessage(DataInput input, int version) throws IOException
{
int id;
if (version < MessagingService.VERSION_20)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index dc43106..18ad6c1 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.net;
-import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
@@ -43,6 +42,8 @@ import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.CoalescingStrategies;
@@ -398,7 +399,8 @@ public class OutboundTcpConnection extends Thread
logger.warn("Failed to set send buffer size on internode socket.", se);
}
}
- out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE));
+
+ out = new BufferedDataOutputStreamPlus(socket.getChannel(), BUFFER_SIZE);
out.writeInt(MessagingService.PROTOCOL_MAGIC);
writeHeader(out, targetVersion, shouldCompressConnection());
@@ -445,14 +447,14 @@ public class OutboundTcpConnection extends Thread
if (targetVersion < MessagingService.VERSION_21)
{
// Snappy is buffered, so no need for extra buffering output stream
- out = new DataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream()));
+ out = new WrappedDataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream()));
}
else
{
// TODO: custom LZ4 OS that supports BB write methods
LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
- out = new DataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
+ out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
1 << 14, // 16k block size
compressor,
checksum,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/GCInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index c4bffac..ef7f1e2 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -19,11 +19,14 @@ package org.apache.cassandra.service;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationListener;
@@ -34,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.management.GarbageCollectionNotificationInfo;
+
import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.utils.StatusLogger;
@@ -43,6 +47,29 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
private static final Logger logger = LoggerFactory.getLogger(GCInspector.class);
final static long MIN_LOG_DURATION = 200;
final static long MIN_LOG_DURATION_TPSTATS = 1000;
+ /*
+ * The field from java.nio.Bits that tracks the total number of allocated
+ * bytes of direct memory required via ByteBuffer.allocateDirect that have not been GCed.
+ */
+ final static Field BITS_TOTAL_CAPACITY;
+
+ static
+ {
+ Field temp = null;
+ try
+ {
+ Class<?> bitsClass = Class.forName("java.nio.Bits");
+ Field f = bitsClass.getDeclaredField("totalCapacity");
+ f.setAccessible(true);
+ temp = f;
+ }
+ catch (Throwable t)
+ {
+ logger.debug("Error accessing field of java.nio.Bits", t);
+ //Don't care, will just return the dummy value -1 if we can't get at the field in this JVM
+ }
+ BITS_TOTAL_CAPACITY = temp;
+ }
static final class State
{
@@ -160,13 +187,30 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
public double[] getAndResetStats()
{
State state = getTotalSinceLastCheck();
- double[] r = new double[6];
+ double[] r = new double[7];
r[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - state.startNanos);
r[1] = state.maxRealTimeElapsed;
r[2] = state.totalRealTimeElapsed;
r[3] = state.sumSquaresRealTimeElapsed;
r[4] = state.totalBytesReclaimed;
r[5] = state.count;
+ r[6] = getAllocatedDirectMemory();
+
return r;
}
+
+ /**
+ * Reads java.nio.Bits.totalCapacity via the reflectively-obtained field to report
+ * the bytes of live direct ByteBuffer memory.
+ *
+ * @return the allocated direct memory in bytes, or -1 if the field is unavailable
+ * or the read fails in this JVM
+ */
+ private static long getAllocatedDirectMemory()
+ {
+ // field lookup failed at class-init time in this JVM; dummy value signals "unknown"
+ if (BITS_TOTAL_CAPACITY == null) return -1;
+ try
+ {
+ // static field, so no receiver instance is needed
+ return BITS_TOTAL_CAPACITY.getLong(null);
+ }
+ catch (Throwable t)
+ {
+ logger.trace("Error accessing field of java.nio.Bits", t);
+ //Don't care how or why we failed to get the value in this JVM. Return -1 to indicate failure
+ return -1;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index bbae921..43d3cb8 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -65,7 +65,7 @@ public class PagingState
ByteBufferUtil.writeWithShortLength(partitionKey, out);
ByteBufferUtil.writeWithShortLength(cellName, out);
out.writeInt(remaining);
- return out.asByteBuffer();
+ return out.buffer();
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 7a7ccbf..780018c 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.streaming;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
@@ -33,10 +34,12 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
@@ -154,13 +157,13 @@ public class ConnectionHandler
protected abstract String name();
- protected static DataOutputStreamAndChannel getWriteChannel(Socket socket) throws IOException
+ protected static DataOutputStreamPlus getWriteChannel(Socket socket) throws IOException
{
WritableByteChannel out = socket.getChannel();
// socket channel is null when encrypted(SSL)
if (out == null)
- out = Channels.newChannel(socket.getOutputStream());
- return new DataOutputStreamAndChannel(socket.getOutputStream(), out);
+ return new WrappedDataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream()));
+ return new BufferedDataOutputStreamPlus(out);
}
protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
@@ -182,7 +185,9 @@ public class ConnectionHandler
isForOutgoing,
session.keepSSTableLevel());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
- getWriteChannel(socket).write(messageBuf);
+ DataOutputStreamPlus out = getWriteChannel(socket);
+ out.write(messageBuf);
+ out.flush();
}
public void start(Socket socket, int protocolVersion)
@@ -308,7 +313,7 @@ public class ConnectionHandler
{
try
{
- DataOutputStreamAndChannel out = getWriteChannel(socket);
+ DataOutputStreamPlus out = getWriteChannel(socket);
StreamMessage next;
while (!isClosed())
@@ -340,11 +345,12 @@ public class ConnectionHandler
}
}
- private void sendMessage(DataOutputStreamAndChannel out, StreamMessage message)
+ private void sendMessage(DataOutputStreamPlus out, StreamMessage message)
{
try
{
StreamMessage.serialize(message, out, protocolVersion, session);
+ out.flush();
}
catch (SocketException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 93903a7..392dccd 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.streaming;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import com.ning.compress.lzf.LZFOutputStream;
@@ -30,6 +28,7 @@ import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
@@ -65,10 +64,10 @@ public class StreamWriter
*
* StreamWriter uses LZF compression on wire to decrease size to transfer.
*
- * @param channel where this writes data to
+ * @param output where this writes data to
* @throws IOException on any I/O error
*/
- public void write(WritableByteChannel channel) throws IOException
+ public void write(DataOutputStreamPlus output) throws IOException
{
long totalSize = totalSize();
RandomAccessReader file = sstable.openDataReader();
@@ -78,7 +77,7 @@ public class StreamWriter
transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
// setting up data compression stream
- compressedOutput = new LZFOutputStream(Channels.newOutputStream(channel));
+ compressedOutput = new LZFOutputStream(output);
long progress = 0L;
try
@@ -106,7 +105,7 @@ public class StreamWriter
readOffset = 0;
}
- // make sure that current section is send
+ // make sure that current section is sent
compressedOutput.flush();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 786ff23..063a49a 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -24,8 +24,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import com.google.common.base.Function;
+
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.streaming.ProgressInfo;
@@ -49,11 +53,11 @@ public class CompressedStreamWriter extends StreamWriter
}
@Override
- public void write(WritableByteChannel channel) throws IOException
+ public void write(DataOutputStreamPlus out) throws IOException
{
long totalSize = totalSize();
RandomAccessReader file = sstable.openDataReader();
- FileChannel fc = file.getChannel();
+ final FileChannel fc = file.getChannel();
long progress = 0L;
// calculate chunks to transfer. we want to send continuous chunks altogether.
@@ -61,7 +65,7 @@ public class CompressedStreamWriter extends StreamWriter
try
{
// stream each of the required sections of the file
- for (Pair<Long, Long> section : sections)
+ for (final Pair<Long, Long> section : sections)
{
// length of the section to stream
long length = section.right - section.left;
@@ -69,9 +73,23 @@ public class CompressedStreamWriter extends StreamWriter
long bytesTransferred = 0;
while (bytesTransferred < length)
{
- int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+ final long bytesTransferredFinal = bytesTransferred;
+ final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
limiter.acquire(toTransfer);
- long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+ long lastWrite = out.applyToChannel( new Function<WritableByteChannel, Long>()
+ {
+ public Long apply(WritableByteChannel wbc)
+ {
+ try
+ {
+ return fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, sstable.getFilename());
+ }
+ }
+ });
bytesTransferred += lastWrite;
progress += lastWrite;
session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
index ec9c66c..b555f64 100644
--- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
public class CompleteMessage extends StreamMessage
@@ -32,7 +32,7 @@ public class CompleteMessage extends StreamMessage
return new CompleteMessage();
}
- public void serialize(CompleteMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {}
+ public void serialize(CompleteMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {}
};
public CompleteMessage()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 237fb70..33298bf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -23,7 +23,7 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamReader;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.compress.CompressedStreamReader;
@@ -55,7 +55,7 @@ public class IncomingFileMessage extends StreamMessage
}
}
- public void serialize(IncomingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(IncomingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 7047c84..bfa02fa 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -23,13 +23,12 @@ import java.util.List;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamWriter;
import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
import org.apache.cassandra.streaming.compress.CompressionInfo;
import org.apache.cassandra.utils.Pair;
-
import org.apache.cassandra.utils.concurrent.Ref;
/**
@@ -44,7 +43,7 @@ public class OutgoingFileMessage extends StreamMessage
throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file");
}
- public void serialize(OutgoingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
FileMessageHeader.serializer.serialize(message.header, out, version);
@@ -54,7 +53,7 @@ public class OutgoingFileMessage extends StreamMessage
new CompressedStreamWriter(reader,
message.header.sections,
message.header.compressionInfo, session);
- writer.write(out.getChannel());
+ writer.write(out);
session.fileSent(message.header);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
index 7efe075..004df18 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
@@ -23,7 +23,7 @@ import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamRequest;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamSummary;
@@ -47,7 +47,7 @@ public class PrepareMessage extends StreamMessage
return message;
}
- public void serialize(PrepareMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(PrepareMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
// requests
out.writeInt(message.requests.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index f206d0d..1255947 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -22,7 +22,7 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.UUID;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -37,7 +37,7 @@ public class ReceivedMessage extends StreamMessage
return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
}
- public void serialize(ReceivedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(ReceivedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
out.writeInt(message.sequenceNumber);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
index 8d5707a..29e84bf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -22,7 +22,7 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.UUID;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -37,7 +37,7 @@ public class RetryMessage extends StreamMessage
return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
}
- public void serialize(RetryMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(RetryMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
out.writeInt(message.sequenceNumber);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
index ae15620..46f49d6 100644
--- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
public class SessionFailedMessage extends StreamMessage
@@ -32,7 +32,7 @@ public class SessionFailedMessage extends StreamMessage
return new SessionFailedMessage();
}
- public void serialize(SessionFailedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {}
+ public void serialize(SessionFailedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {}
};
public SessionFailedMessage()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 20490db..8e3eeef 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
/**
@@ -36,7 +36,7 @@ public abstract class StreamMessage
public static final int VERSION_30 = 3;
public static final int CURRENT_VERSION = VERSION_30;
- public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
ByteBuffer buff = ByteBuffer.allocate(1);
// message type
@@ -67,7 +67,7 @@ public abstract class StreamMessage
public static interface Serializer<V extends StreamMessage>
{
V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException;
- void serialize(V message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException;
+ void serialize(V message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException;
}
/** StreamMessage types */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 5b49ae3..e92e0c6 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1867,6 +1867,10 @@ public class CassandraServer implements Cassandra.Iface
{
throw new InvalidRequestException("Error deflating query string.");
}
+ catch (IOException e)
+ {
+ throw new AssertionError(e);
+ }
return queryString;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 5a1d6b4..2805c52 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1186,7 +1186,7 @@ public class NodeProbe implements AutoCloseable
}
catch (Exception e)
{
- throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". Please check logback configuration and ensure to have <jmxConfigurator /> set", e);
+ throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". Please check logback configuration and ensure to have <jmxConfigurator /> set", e);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index e6d4df6..fa6966c 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -2690,8 +2690,8 @@ public class NodeTool
double[] stats = probe.getAndResetGCStats();
double mean = stats[2] / stats[5];
double stdev = Math.sqrt((stats[3] / stats[5]) - (mean * mean));
- System.out.printf("%20s%20s%20s%20s%20s%20s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections");
- System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5]);
+ System.out.printf("%20s%20s%20s%20s%20s%20s%25s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections", "Direct Memory Bytes");
+ System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%25d%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5], (long)stats[6]);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index b37e0da..8f0dee0 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -36,6 +36,7 @@ import java.util.UUID;
import io.netty.buffer.*;
import io.netty.util.CharsetUtil;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.utils.Pair;
@@ -51,7 +52,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
*/
public abstract class CBUtil
{
- public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true);
+ public static final boolean USE_HEAP_ALLOCATOR = Boolean.getBoolean(Config.PROPERTY_PREFIX + "netty_use_heap_allocator");
+ public static final ByteBufAllocator allocator = USE_HEAP_ALLOCATOR ? new UnpooledByteBufAllocator(false) : new PooledByteBufAllocator(true);
private CBUtil() {}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
index 8304bd5..d2b2879 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
@@ -35,6 +35,10 @@ public abstract class MemoryUtil
private static final long DIRECT_BYTE_BUFFER_ADDRESS_OFFSET;
private static final long DIRECT_BYTE_BUFFER_CAPACITY_OFFSET;
private static final long DIRECT_BYTE_BUFFER_LIMIT_OFFSET;
+ private static final long DIRECT_BYTE_BUFFER_POSITION_OFFSET;
+ private static final Class<?> BYTE_BUFFER_CLASS;
+ private static final long BYTE_BUFFER_OFFSET_OFFSET;
+ private static final long BYTE_BUFFER_HB_OFFSET;
private static final long BYTE_ARRAY_BASE_OFFSET;
private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
@@ -57,7 +61,14 @@ public abstract class MemoryUtil
DIRECT_BYTE_BUFFER_ADDRESS_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("address"));
DIRECT_BYTE_BUFFER_CAPACITY_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity"));
DIRECT_BYTE_BUFFER_LIMIT_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("limit"));
+ DIRECT_BYTE_BUFFER_POSITION_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("position"));
DIRECT_BYTE_BUFFER_CLASS = clazz;
+
+ clazz = ByteBuffer.allocate(0).getClass();
+ BYTE_BUFFER_OFFSET_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("offset"));
+ BYTE_BUFFER_HB_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("hb"));
+ BYTE_BUFFER_CLASS = clazz;
+
BYTE_ARRAY_BASE_OFFSET = unsafe.arrayBaseOffset(byte[].class);
}
catch (Exception e)
@@ -144,6 +155,21 @@ public abstract class MemoryUtil
return instance;
}
+ public static ByteBuffer getHollowByteBuffer()
+ {
+ ByteBuffer instance;
+ try
+ {
+ instance = (ByteBuffer) unsafe.allocateInstance(BYTE_BUFFER_CLASS);
+ }
+ catch (InstantiationException e)
+ {
+ throw new AssertionError(e);
+ }
+ instance.order(ByteOrder.nativeOrder());
+ return instance;
+ }
+
public static void setByteBuffer(ByteBuffer instance, long address, int length)
{
unsafe.putLong(instance, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, address);
@@ -151,6 +177,27 @@ public abstract class MemoryUtil
unsafe.putInt(instance, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, length);
}
+ public static ByteBuffer duplicateDirectByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer)
+ {
+ assert(source.isDirect());
+ unsafe.putLong(hollowBuffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, unsafe.getLong(source, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
+ return hollowBuffer;
+ }
+
+ public static ByteBuffer duplicateByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer)
+ {
+ assert(!source.isDirect());
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
+ unsafe.putInt(hollowBuffer, BYTE_BUFFER_OFFSET_OFFSET, unsafe.getInt(source, BYTE_BUFFER_OFFSET_OFFSET));
+ unsafe.putObject(hollowBuffer, BYTE_BUFFER_HB_OFFSET, unsafe.getObject(source, BYTE_BUFFER_HB_OFFSET));
+ return hollowBuffer;
+ }
+
public static long getLongByByte(long address)
{
if (BIG_ENDIAN)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
index 92612b6..fe43ff2 100644
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.utils.vint;
import java.io.IOException;
import java.io.OutputStream;
-import org.apache.cassandra.io.util.AbstractDataOutput;
+import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
/**
* Borrows idea from
* https://developers.google.com/protocol-buffers/docs/encoding#varints
*/
-public class EncodedDataOutputStream extends AbstractDataOutput
+public class EncodedDataOutputStream extends UnbufferedDataOutputStreamPlus
{
private OutputStream out;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 15e5d34..ebfa79d 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -21,7 +21,8 @@ package org.apache.cassandra;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.net.MessagingService;
import java.io.DataInputStream;
@@ -65,10 +66,11 @@ public class AbstractSerializationsTester
return new DataInputStream(new FileInputStream(f));
}
- protected static DataOutputStreamAndChannel getOutput(String name) throws IOException
+ @SuppressWarnings("resource")
+ protected static DataOutputStreamPlus getOutput(String name) throws IOException
{
File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
f.getParentFile().mkdirs();
- return new DataOutputStreamAndChannel(new FileOutputStream(f));
+ return new BufferedDataOutputStreamPlus(new FileOutputStream(f).getChannel());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index f8e757a..a280448 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.net.CallbackInfo;
import org.apache.cassandra.net.MessageIn;
@@ -40,7 +40,6 @@ import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-
import org.junit.BeforeClass;
import org.junit.Test;
@@ -92,7 +91,7 @@ public class SerializationsTest extends AbstractSerializationsTester
RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, nonEmptyRangeSCPred, bounds, 100);
MessageOut<RangeSliceCommand> regRangeCmdSupMsg = regRangeCmdSup.createMessage();
- DataOutputStreamAndChannel out = getOutput("db.RangeSliceCommand.bin");
+ DataOutputStreamPlus out = getOutput("db.RangeSliceCommand.bin");
namesCmdMsg.serialize(out, getVersion());
emptyRangeCmdMsg.serialize(out, getVersion());
regRangeCmdMsg.serialize(out, getVersion());
@@ -127,7 +126,7 @@ public class SerializationsTest extends AbstractSerializationsTester
SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, namesPred);
SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, namesSCPred);
- DataOutputStreamAndChannel out = getOutput("db.SliceByNamesReadCommand.bin");
+ DataOutputStreamPlus out = getOutput("db.SliceByNamesReadCommand.bin");
SliceByNamesReadCommand.serializer.serialize(standardCmd, out, getVersion());
SliceByNamesReadCommand.serializer.serialize(superCmd, out, getVersion());
ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -161,8 +160,8 @@ public class SerializationsTest extends AbstractSerializationsTester
{
SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, nonEmptyRangePred);
SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, nonEmptyRangeSCPred);
-
- DataOutputStreamAndChannel out = getOutput("db.SliceFromReadCommand.bin");
+
+ DataOutputStreamPlus out = getOutput("db.SliceFromReadCommand.bin");
SliceFromReadCommand.serializer.serialize(standardCmd, out, getVersion());
SliceFromReadCommand.serializer.serialize(superCmd, out, getVersion());
ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -195,7 +194,7 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testRowWrite() throws IOException
{
- DataOutputStreamAndChannel out = getOutput("db.Row.bin");
+ DataOutputStreamPlus out = getOutput("db.Row.bin");
Row.serializer.serialize(statics.StandardRow, out, getVersion());
Row.serializer.serialize(statics.SuperRow, out, getVersion());
Row.serializer.serialize(statics.NullRow, out, getVersion());
@@ -232,7 +231,7 @@ public class SerializationsTest extends AbstractSerializationsTester
mods.put(statics.SuperCf.metadata().cfId, statics.SuperCf);
Mutation mixedRm = new Mutation(statics.KS, statics.Key, mods);
- DataOutputStreamAndChannel out = getOutput("db.RowMutation.bin");
+ DataOutputStreamPlus out = getOutput("db.RowMutation.bin");
Mutation.serializer.serialize(standardRowRm, out, getVersion());
Mutation.serializer.serialize(superRowRm, out, getVersion());
Mutation.serializer.serialize(standardRm, out, getVersion());
@@ -281,7 +280,7 @@ public class SerializationsTest extends AbstractSerializationsTester
Truncation tr = new Truncation(statics.KS, "Doesn't Really Matter");
TruncateResponse aff = new TruncateResponse(statics.KS, "Doesn't Matter Either", true);
TruncateResponse neg = new TruncateResponse(statics.KS, "Still Doesn't Matter", false);
- DataOutputStreamAndChannel out = getOutput("db.Truncation.bin");
+ DataOutputStreamPlus out = getOutput("db.Truncation.bin");
Truncation.serializer.serialize(tr, out, getVersion());
TruncateResponse.serializer.serialize(aff, out, getVersion());
TruncateResponse.serializer.serialize(neg, out, getVersion());
@@ -323,7 +322,7 @@ public class SerializationsTest extends AbstractSerializationsTester
{
WriteResponse aff = new WriteResponse();
WriteResponse neg = new WriteResponse();
- DataOutputStreamAndChannel out = getOutput("db.WriteResponse.bin");
+ DataOutputStreamPlus out = getOutput("db.WriteResponse.bin");
WriteResponse.serializer.serialize(aff, out, getVersion());
WriteResponse.serializer.serialize(neg, out, getVersion());
out.close();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index a773ccf..080ae53 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.gms;
import org.apache.cassandra.AbstractSerializationsTester;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.Test;
@@ -38,7 +38,7 @@ public class SerializationsTest extends AbstractSerializationsTester
{
private void testEndpointStateWrite() throws IOException
{
- DataOutputStreamAndChannel out = getOutput("gms.EndpointState.bin");
+ DataOutputStreamPlus out = getOutput("gms.EndpointState.bin");
HeartBeatState.serializer.serialize(Statics.HeartbeatSt, out, getVersion());
EndpointState.serializer.serialize(Statics.EndpointSt, out, getVersion());
VersionedValue.serializer.serialize(Statics.vv0, out, getVersion());
@@ -75,7 +75,7 @@ public class SerializationsTest extends AbstractSerializationsTester
GossipDigestAck2 ack2 = new GossipDigestAck2(states);
GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name", StorageService.getPartitioner().getClass().getCanonicalName(), Statics.Digests);
- DataOutputStreamAndChannel out = getOutput("gms.Gossip.bin");
+ DataOutputStreamPlus out = getOutput("gms.Gossip.bin");
for (GossipDigest gd : Statics.Digests)
GossipDigest.serializer.serialize(gd, out, getVersion());
GossipDigestAck.serializer.serialize(ack, out, getVersion());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index a7010ae..6471558 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -134,6 +134,10 @@ public class IndexSummaryTest
IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
return Pair.create(list, summary);
}
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 9cc2d23..eda4f17 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -25,15 +25,16 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.Sets;
-import org.junit.Test;
+import org.junit.Test;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.EstimatedHistogram;
@@ -70,7 +71,7 @@ public class MetadataSerializerTest
MetadataSerializer serializer = new MetadataSerializer();
// Serialize to tmp file
File statsFile = File.createTempFile(Component.STATS.name, null);
- try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(statsFile)))
+ try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(statsFile)))
{
serializer.serialize(originalMetadata, out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
new file mode 100644
index 0000000..8ac6d92
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -0,0 +1,391 @@
package org.apache.cassandra.io.util;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Random;

import org.junit.Test;

import static org.junit.Assert.*;

/**
 * Tests for BufferedDataOutputStreamPlus.
 *
 * Covers constructor/argument validation, plus a randomized fuzz test that drives
 * every write method on both the buffered implementation under test (which flushes
 * through {@link #adapter} into {@link #generated}) and the canonical unbuffered
 * WrappedDataOutputStreamPlus (which writes into {@link #canonical}), then asserts
 * the two output byte streams are identical.
 */
public class BufferedDataOutputStreamTest
{
    // Channel adapter that funnels everything the buffered stream flushes into
    // 'generated', so the buffered path's bytes can be diffed against the
    // canonical path's bytes.
    WritableByteChannel adapter = new WritableByteChannel()
    {
        @Override
        public boolean isOpen() { return true; }

        @Override
        public void close() throws IOException {}

        @Override
        public int write(ByteBuffer src) throws IOException
        {
            int written = src.remaining();
            while (src.hasRemaining())
                generated.write(src.get());
            return written;
        }
    };

    // Stream with the minimum legal buffer size; used only by the argument
    // validation tests below. Invalid arguments must throw before any byte is
    // written, so the (still null) 'generated' field is never touched.
    BufferedDataOutputStreamPlus fakeStream = new BufferedDataOutputStreamPlus(adapter, 8);

    @SuppressWarnings("resource")
    @Test(expected = NullPointerException.class)
    public void testNullChannel()
    {
        new BufferedDataOutputStreamPlus((WritableByteChannel) null, 8);
    }

    @SuppressWarnings("resource")
    @Test(expected = IllegalArgumentException.class)
    public void testTooSmallBuffer()
    {
        // 8 is accepted (see 'fakeStream'), so 7 is the first rejected size
        new BufferedDataOutputStreamPlus(adapter, 7);
    }

    @Test(expected = NullPointerException.class)
    public void testNullBuffer() throws Exception
    {
        byte type[] = null;
        fakeStream.write(type, 0, 1);
    }

    @Test(expected = IndexOutOfBoundsException.class)
    public void testNegativeOffset() throws Exception
    {
        byte type[] = new byte[10];
        fakeStream.write(type, -1, 1);
    }

    @Test(expected = IndexOutOfBoundsException.class)
    public void testNegativeLength() throws Exception
    {
        byte type[] = new byte[10];
        fakeStream.write(type, 0, -1);
    }

    @Test(expected = IndexOutOfBoundsException.class)
    public void testTooBigLength() throws Exception
    {
        byte type[] = new byte[10];
        fakeStream.write(type, 0, 11);
    }

    @Test(expected = IndexOutOfBoundsException.class)
    public void testTooBigLengthWithOffset() throws Exception
    {
        byte type[] = new byte[10];
        fakeStream.write(type, 8, 3);
    }

    @Test(expected = IndexOutOfBoundsException.class)
    public void testTooBigOffset() throws Exception
    {
        byte type[] = new byte[10];
        fakeStream.write(type, 11, 1);
    }

    static final Random r;

    // Reflective handle on ByteArrayOutputStream's internal buffer, so output can
    // be compared without copying. NOTE(review): on JDK 9+ this may require
    // --add-opens java.base/java.io=ALL-UNNAMED -- confirm against the build.
    static Field baos_bytes;
    static
    {
        // The seed is printed so a failing run can be reproduced by hard-coding
        // it on the commented-out line below.
        long seed = System.nanoTime();
        //seed = 210187780999648L;
        System.out.println("Seed " + seed);
        r = new Random(seed);
        try
        {
            baos_bytes = ByteArrayOutputStream.class.getDeclaredField("buf");
            baos_bytes.setAccessible(true);
        }
        catch (Throwable t)
        {
            throw new RuntimeException(t);
        }
    }

    // Output of the buffered stream under test (filled via 'adapter')
    private ByteArrayOutputStream generated;
    private BufferedDataOutputStreamPlus ndosp;

    // Reference output of the canonical unbuffered implementation
    private ByteArrayOutputStream canonical;
    private DataOutputStreamPlus dosp;

    void setUp()
    {
        generated = new ByteArrayOutputStream();
        canonical = new ByteArrayOutputStream();
        dosp = new WrappedDataOutputStreamPlus(canonical);
        ndosp = new BufferedDataOutputStreamPlus(adapter, 4096);
    }

    @Test
    public void testFuzz() throws Exception
    {
        for (int ii = 0; ii < 30; ii++)
            fuzzOnce();
    }

    // Sample strings covering 1-, 2-, 3-byte UTF-8 sequences and a surrogate pair
    String simple = "foobar42";
    String twoByte = "\u0180";
    String threeByte = "\u34a8";
    String fourByte = "\ud841\udf79";

    /**
     * Drives a random sequence of write operations against both streams until
     * 8MB have been produced, then asserts the two outputs match byte for byte.
     */
    @SuppressWarnings("unused")
    private void fuzzOnce() throws Exception
    {
        setUp();
        int iteration = 0;
        int bytesChecked = 0;
        int action = 0;
        while (generated.size() < 1024 * 1024 * 8)
        {
            action = r.nextInt(18);

            //System.out.println("Action " + action + " iteration " + iteration);
            iteration++;

            switch (action)
            {
                case 0:
                {
                    // Flush both streams; for the buffered stream this forces the
                    // internal buffer out through the channel. (Fixed: previously
                    // called generated.flush(), a documented no-op on
                    // ByteArrayOutputStream, so the buffered flush path was never
                    // exercised mid-run.)
                    ndosp.flush();
                    dosp.flush();
                    break;
                }
                case 1:
                {
                    int val = r.nextInt();
                    dosp.write(val);
                    ndosp.write(val);
                    break;
                }
                case 2:
                {
                    // Array sizes straddle the 4096-byte buffer to hit both the
                    // buffered and pass-through write paths
                    byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
                    r.nextBytes(randomBytes);
                    dosp.write(randomBytes);
                    ndosp.write(randomBytes);
                    break;
                }
                case 3:
                {
                    byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
                    r.nextBytes(randomBytes);
                    int offset = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length);
                    int length = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length - offset);
                    dosp.write(randomBytes, offset, length);
                    ndosp.write(randomBytes, offset, length);
                    break;
                }
                case 4:
                {
                    boolean val = r.nextInt(2) == 0;
                    dosp.writeBoolean(val);
                    ndosp.writeBoolean(val);
                    break;
                }
                case 5:
                {
                    int val = r.nextInt();
                    dosp.writeByte(val);
                    ndosp.writeByte(val);
                    break;
                }
                case 6:
                {
                    int val = r.nextInt();
                    dosp.writeShort(val);
                    ndosp.writeShort(val);
                    break;
                }
                case 7:
                {
                    int val = r.nextInt();
                    dosp.writeChar(val);
                    ndosp.writeChar(val);
                    break;
                }
                case 8:
                {
                    int val = r.nextInt();
                    dosp.writeInt(val);
                    ndosp.writeInt(val);
                    break;
                }
                case 9:
                {
                    // Fixed: use nextLong() so all 64 bits are exercised;
                    // previously an int was widened, leaving the upper 32 bits
                    // of writeLong untested (beyond sign extension).
                    long val = r.nextLong();
                    dosp.writeLong(val);
                    ndosp.writeLong(val);
                    break;
                }
                case 10:
                {
                    float val = r.nextFloat();
                    dosp.writeFloat(val);
                    ndosp.writeFloat(val);
                    break;
                }
                case 11:
                {
                    double val = r.nextDouble();
                    dosp.writeDouble(val);
                    ndosp.writeDouble(val);
                    break;
                }
                case 12:
                {
                    dosp.writeBytes(simple);
                    ndosp.writeBytes(simple);
                    break;
                }
                case 13:
                {
                    dosp.writeChars(twoByte);
                    ndosp.writeChars(twoByte);
                    break;
                }
                case 14:
                {
                    // Random string (plus known multi-byte samples) compared
                    // against the legacy writeUTF encoder below
                    StringBuilder sb = new StringBuilder();
                    int length = r.nextInt(500);
                    sb.append(simple + twoByte + threeByte + fourByte);
                    for (int ii = 0; ii < length; ii++)
                    {
                        sb.append((char) (r.nextInt() & 0xffff));
                    }
                    String str = sb.toString();
                    writeUTFLegacy(str, dosp);
                    ndosp.writeUTF(str);
                    break;
                }
                case 15:
                {
                    // Heap ByteBuffer with random position/limit; the write must
                    // not move the caller's position or limit
                    ByteBuffer buf = ByteBuffer.allocate(r.nextInt(1024 * 8 + 1));
                    r.nextBytes(buf.array());
                    buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
                    buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
                    ByteBuffer dup = buf.duplicate();
                    ndosp.write(buf.duplicate());
                    assertEquals(dup.position(), buf.position());
                    assertEquals(dup.limit(), buf.limit());
                    dosp.write(buf.duplicate());
                    break;
                }
                case 16:
                {
                    // Same as case 15 but for a direct buffer (separate code path)
                    ByteBuffer buf = ByteBuffer.allocateDirect(r.nextInt(1024 * 8 + 1));
                    while (buf.hasRemaining())
                        buf.put((byte) r.nextInt());
                    buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
                    buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
                    ByteBuffer dup = buf.duplicate();
                    ndosp.write(buf.duplicate());
                    assertEquals(dup.position(), buf.position());
                    assertEquals(dup.limit(), buf.limit());
                    dosp.write(buf.duplicate());
                    break;
                }
                case 17:
                {
                    // Off-heap Memory region write path
                    try (Memory buf = Memory.allocate(r.nextInt(1024 * 8 - 1) + 1);)
                    {
                        for (int ii = 0; ii < buf.size(); ii++)
                            buf.setByte(ii, (byte) r.nextInt());
                        long offset = buf.size() == 0 ? 0 : r.nextInt((int) buf.size());
                        long length = (buf.size() - offset == 0 ? 0 : r.nextInt((int) (buf.size() - offset)));
                        ndosp.write(buf, offset, length);
                        dosp.write(buf, offset, length);
                    }
                    break;
                }
                default:
                    fail("Shouldn't reach here");
            }
            //bytesChecked = assertSameOutput(bytesChecked, action, iteration);
        }

        assertSameOutput(0, -1, iteration);
    }

    /**
     * Reference implementation of DataOutput.writeUTF (modified UTF-8: 2-byte
     * length prefix; NUL and chars above 127 encoded in 2 or 3 bytes), used as
     * the canonical encoding to compare the stream under test against.
     */
    static void writeUTFLegacy(String str, DataOutput out) throws IOException
    {
        int utfCount = 0, length = str.length();
        for (int i = 0; i < length; i++)
        {
            int charValue = str.charAt(i);
            if (charValue > 0 && charValue <= 127)
            {
                utfCount++;
            }
            else if (charValue <= 2047)
            {
                utfCount += 2;
            }
            else
            {
                utfCount += 3;
            }
        }
        // The 2-byte length prefix caps the encoded size at 65535 bytes
        if (utfCount > 65535)
        {
            throw new UTFDataFormatException(); //$NON-NLS-1$
        }
        byte utfBytes[] = new byte[utfCount + 2];
        int utfIndex = 2;
        for (int i = 0; i < length; i++)
        {
            int charValue = str.charAt(i);
            if (charValue > 0 && charValue <= 127)
            {
                utfBytes[utfIndex++] = (byte) charValue;
            }
            else if (charValue <= 2047)
            {
                utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
            }
            else
            {
                utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
            }
        }
        utfBytes[0] = (byte) (utfCount >> 8);
        utfBytes[1] = (byte) utfCount;
        out.write(utfBytes);
    }

    /**
     * Flushes both streams and asserts their outputs are identical from
     * 'bytesChecked' onward; prints last action/iteration on mismatch to help
     * reproduce a failing seed. Returns the total byte count checked so far.
     */
    private int assertSameOutput(int bytesChecked, int lastAction, int iteration) throws Exception
    {
        ndosp.flush();
        dosp.flush();

        byte generatedBytes[] = (byte[]) baos_bytes.get(generated);
        byte canonicalBytes[] = (byte[]) baos_bytes.get(canonical);

        int count = generated.size();
        if (count != canonical.size())
            System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration);
        assertEquals(count, canonical.size());
        for (; bytesChecked < count; bytesChecked++)
        {
            byte generatedByte = generatedBytes[bytesChecked];
            byte canonicalByte = canonicalBytes[bytesChecked];
            if (generatedByte != canonicalByte)
                System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration);
            assertEquals(generatedByte, canonicalByte);
        }
        return count;
    }
}