You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/04/28 04:00:17 UTC
[1/2] git commit: Add varint encoding to Serializing Cache patch by
Vijay; reviewed by jbellis,xedin for CASSANDRA-4138
Updated Branches:
refs/heads/trunk c6f95117d -> cb25a8fcd
Add varint encoding to Serializing Cache
patch by Vijay; reviewed by jbellis,xedin for CASSANDRA-4138
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cb25a8fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cb25a8fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cb25a8fc
Branch: refs/heads/trunk
Commit: cb25a8fcdd9626c6b29a3b1a94dfda5fd0375928
Parents: c6f9511
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Fri Apr 27 18:56:41 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Fri Apr 27 18:56:41 2012 -0700
----------------------------------------------------------------------
.../org/apache/cassandra/cache/KeyCacheKey.java | 4 +-
.../org/apache/cassandra/cache/RowCacheKey.java | 4 +-
.../apache/cassandra/cache/SerializingCache.java | 12 +-
.../cassandra/cache/SerializingCacheProvider.java | 14 +-
src/java/org/apache/cassandra/db/Column.java | 10 +-
src/java/org/apache/cassandra/db/ColumnFamily.java | 23 +-
.../cassandra/db/ColumnFamilySerializer.java | 24 +-
src/java/org/apache/cassandra/db/ColumnIndex.java | 13 +-
.../org/apache/cassandra/db/ColumnSerializer.java | 4 +-
.../org/apache/cassandra/db/CounterColumn.java | 4 +-
.../org/apache/cassandra/db/CounterMutation.java | 3 +-
src/java/org/apache/cassandra/db/DBConstants.java | 27 --
src/java/org/apache/cassandra/db/DBTypeSizes.java | 106 +++++
.../org/apache/cassandra/db/ExpiringColumn.java | 4 +-
src/java/org/apache/cassandra/db/IColumn.java | 4 +-
src/java/org/apache/cassandra/db/Memtable.java | 2 +-
.../org/apache/cassandra/db/RangeSliceReply.java | 5 +-
src/java/org/apache/cassandra/db/ReadResponse.java | 10 +-
src/java/org/apache/cassandra/db/Row.java | 3 +-
.../org/apache/cassandra/db/RowIndexEntry.java | 7 +-
src/java/org/apache/cassandra/db/RowMutation.java | 13 +-
src/java/org/apache/cassandra/db/RowPosition.java | 16 +-
.../cassandra/db/SliceByNamesReadCommand.java | 19 +-
.../apache/cassandra/db/SliceFromReadCommand.java | 22 +-
src/java/org/apache/cassandra/db/SuperColumn.java | 24 +-
.../org/apache/cassandra/db/WriteResponse.java | 9 +-
.../cassandra/db/commitlog/ReplayPosition.java | 3 +-
.../db/compaction/LazilyCompactedRow.java | 2 +-
.../cassandra/db/context/CounterContext.java | 10 +-
.../org/apache/cassandra/db/filter/QueryPath.java | 42 ++-
src/java/org/apache/cassandra/dht/Token.java | 3 +-
src/java/org/apache/cassandra/io/ISerializer.java | 4 +-
.../apache/cassandra/io/sstable/IndexHelper.java | 9 +-
.../io/sstable/SSTableSimpleUnsortedWriter.java | 4 +-
.../apache/cassandra/io/sstable/SSTableWriter.java | 4 +-
.../cassandra/io/util/AbstractDataInput.java | 8 +-
.../cassandra/io/util/AbstractDataOutput.java | 301 +++++++++++++++
.../cassandra/utils/BloomFilterSerializer.java | 22 +-
.../org/apache/cassandra/utils/ByteBufferUtil.java | 6 +-
.../apache/cassandra/utils/EstimatedHistogram.java | 3 +-
.../org/apache/cassandra/utils/FilterFactory.java | 8 +-
.../apache/cassandra/utils/StreamingHistogram.java | 4 +-
.../utils/vint/EncodedDataInputStream.java | 115 ++++++
.../utils/vint/EncodedDataOutputStream.java | 98 +++++
test/unit/org/apache/cassandra/Util.java | 5 +
.../apache/cassandra/utils/EncodedStreamsTest.java | 160 ++++++++
46 files changed, 1031 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/cache/KeyCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/KeyCacheKey.java b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
index df23928..830fcdb 100644
--- a/src/java/org/apache/cassandra/cache/KeyCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Arrays;
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
@@ -52,7 +52,7 @@ public class KeyCacheKey implements CacheKey
public int serializedSize()
{
- return key.length + DBConstants.INT_SIZE;
+ return DBTypeSizes.NATIVE.sizeof(key.length) + key.length;
}
public String toString()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java
index 2c15faf..07daeb2 100644
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -58,7 +58,7 @@ public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
public int serializedSize()
{
- return key.length + DBConstants.INT_SIZE;
+ return key.length + DBTypeSizes.NATIVE.sizeof(key.length);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 1dac92e..3b641e8 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.cache;
-import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.util.Set;
@@ -27,9 +26,12 @@ import com.googlecode.concurrentlinkedhashmap.EvictionListener;
import com.googlecode.concurrentlinkedhashmap.Weigher;
import com.googlecode.concurrentlinkedhashmap.Weighers;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.util.MemoryInputStream;
import org.apache.cassandra.io.util.MemoryOutputStream;
+import org.apache.cassandra.utils.vint.EncodedDataInputStream;
+import org.apache.cassandra.utils.vint.EncodedDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +42,8 @@ import org.slf4j.LoggerFactory;
public class SerializingCache<K, V> implements ICache<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(SerializingCache.class);
+ private static final DBTypeSizes ENCODED_TYPE_SIZES = DBTypeSizes.VINT;
+
private static final int DEFAULT_CONCURENCY_LEVEL = 64;
private final ConcurrentLinkedHashMap<K, FreeableMemory> map;
@@ -82,7 +86,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
{
try
{
- return serializer.deserialize(new MemoryInputStream(mem));
+ return serializer.deserialize(new EncodedDataInputStream(new MemoryInputStream(mem)));
}
catch (IOException e)
{
@@ -93,7 +97,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
private FreeableMemory serialize(V value)
{
- long serializedSize = serializer.serializedSize(value);
+ long serializedSize = serializer.serializedSize(value, ENCODED_TYPE_SIZES);
if (serializedSize > Integer.MAX_VALUE)
throw new IllegalArgumentException("Unable to allocate " + serializedSize + " bytes");
@@ -109,7 +113,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
try
{
- serializer.serialize(value, new DataOutputStream(new MemoryOutputStream(freeableMemory)));
+ serializer.serialize(value, new EncodedDataOutputStream(new MemoryOutputStream(freeableMemory)));
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index d3e8744..4ddc819 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -23,7 +23,7 @@ import java.io.IOError;
import java.io.IOException;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.io.ISerializer;
public class SerializingCacheProvider implements IRowCacheProvider
@@ -60,12 +60,14 @@ public class SerializingCacheProvider implements IRowCacheProvider
return ColumnFamily.serializer.deserialize(in);
}
- public long serializedSize(IRowCacheEntry cf)
+ public long serializedSize(IRowCacheEntry cf, DBTypeSizes typeSizes)
{
- return DBConstants.BOOL_SIZE
- + (cf instanceof RowCacheSentinel
- ? DBConstants.INT_SIZE + DBConstants.LONG_SIZE
- : ColumnFamily.serializer().serializedSize((ColumnFamily) cf));
+ int size = typeSizes.sizeof(true);
+ if (cf instanceof RowCacheSentinel)
+ size += typeSizes.sizeof(((RowCacheSentinel) cf).sentinelId);
+ else
+ size += ColumnFamily.serializer().serializedSize((ColumnFamily) cf, typeSizes);
+ return size;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index 5f57af5..a1d89d8 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -124,7 +124,7 @@ public class Column implements IColumn
return timestamp;
}
- public int size()
+ public int size(DBTypeSizes typeSizes)
{
/*
* Size of a column is =
@@ -134,16 +134,18 @@ public class Column implements IColumn
* + 4 bytes which basically indicates the size of the byte array
* + entire byte array.
*/
- return DBConstants.SHORT_SIZE + name.remaining() + 1 + DBConstants.TIMESTAMP_SIZE + DBConstants.INT_SIZE + value.remaining();
+ int nameSize = name.remaining();
+ int valueSize = value.remaining();
+ return typeSizes.sizeof((short) nameSize) + nameSize + 1 + typeSizes.sizeof(timestamp) + typeSizes.sizeof(valueSize) + valueSize;
}
/*
* This returns the size of the column when serialized.
* @see com.facebook.infrastructure.db.IColumn#serializedSize()
*/
- public int serializedSize()
+ public int serializedSize(DBTypeSizes typeSizes)
{
- return size();
+ return size(typeSizes);
}
public int serializationFlags()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index b00f168..1c5fd53 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.db;
-import static org.apache.cassandra.db.DBConstants.*;
-
import java.nio.ByteBuffer;
import java.security.MessageDigest;
@@ -256,12 +254,12 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
return null;
}
- int size()
+ int size(DBTypeSizes typeSizes)
{
int size = 0;
for (IColumn column : columns)
{
- size += column.size();
+ size += column.size(typeSizes);
}
return size;
}
@@ -353,23 +351,6 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
addAll(cf, allocator);
}
- public long serializedSize()
- {
- return BOOL_SIZE // nullness bool
- + INT_SIZE // id
- + serializedSizeForSSTable();
- }
-
- public long serializedSizeForSSTable()
- {
- int size = INT_SIZE // local deletion time
- + LONG_SIZE // client deletion time
- + INT_SIZE; // column count
- for (IColumn column : columns)
- size += column.serializedSize();
- return size;
- }
-
/**
* Goes over all columns and check the fields are valid (as far as we can
* tell).
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index 53b2183..927a8f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.ColumnStats;
public class ColumnFamilySerializer implements ISerializer<ColumnFamily>
{
@@ -140,8 +139,27 @@ public class ColumnFamilySerializer implements ISerializer<ColumnFamily>
return cf;
}
- public long serializedSize(ColumnFamily cf)
+ public long serializedSize(ColumnFamily cf, DBTypeSizes type)
{
- return cf == null ? DBConstants.BOOL_SIZE : cf.serializedSize();
+ if (cf == null)
+ {
+ return type.sizeof(false);
+ }
+ else
+ {
+ return type.sizeof(true) + /* nullness bool */
+ type.sizeof(cf.id()) + /* id */
+ serializedSizeForSSTable(cf, type);
+ }
+ }
+
+ public long serializedSizeForSSTable(ColumnFamily cf, DBTypeSizes typeSizes)
+ {
+ int size = typeSizes.sizeof(cf.getLocalDeletionTime()) // local deletion time
+ + typeSizes.sizeof(cf.getMarkedForDeleteAt()) // client deletion time
+ + typeSizes.sizeof(cf.getColumnCount()); // column count
+ for (IColumn column : cf.columns)
+ size += column.serializedSize(typeSizes);
+ return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index b4c2f90..98a9a4a 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -73,10 +73,13 @@ public class ColumnIndex
*/
private static long rowHeaderSize(ByteBuffer key)
{
- return DBConstants.SHORT_SIZE + key.remaining() // Row key
- + DBConstants.LONG_SIZE // Row data size
- + DBConstants.INT_SIZE + DBConstants.LONG_SIZE // Deletion info
- + DBConstants.INT_SIZE; // Column count
+ DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+ // TODO fix the constant-size placeholder arguments (0, 0L) below when changing the native constants.
+ int keysize = key.remaining();
+ return typeSizes.sizeof((short) keysize) + keysize // Row key (short length prefix + key bytes)
+ + typeSizes.sizeof(0L) // Row data size
+ + typeSizes.sizeof(0) + typeSizes.sizeof(0L) // Deletion info
+ + typeSizes.sizeof(0); // Column count
}
/**
@@ -110,7 +113,7 @@ public class ColumnIndex
startPosition = endPosition;
}
- endPosition += column.serializedSize();
+ endPosition += column.serializedSize(DBTypeSizes.NATIVE);
// if we hit the column index size that we have to index after, go ahead and index it.
if (endPosition - startPosition >= DatabaseDescriptor.getColumnIndexSize())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 53680e4..1be3eb2 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -121,9 +121,9 @@ public class ColumnSerializer implements IColumnSerializer
}
}
- public long serializedSize(IColumn object)
+ public long serializedSize(IColumn object, DBTypeSizes type)
{
- return object.serializedSize();
+ return object.serializedSize(type);
}
private static class CorruptColumnException extends IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 369e183..6390f25 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -93,13 +93,13 @@ public class CounterColumn extends Column
}
@Override
- public int size()
+ public int size(DBTypeSizes typeSizes)
{
/*
* A counter column adds to a Column :
* + 8 bytes for timestampOfLastDelete
*/
- return super.size() + DBConstants.TIMESTAMP_SIZE;
+ return super.size(typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 7ee9d80..ccc0e0a 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -186,7 +186,8 @@ class CounterMutationSerializer implements IVersionedSerializer<CounterMutation>
public long serializedSize(CounterMutation cm, int version)
{
+ int consistencySize = FBUtilities.encodedUTF8Length(cm.consistency().name()); // encoded length of the consistency level name
return RowMutation.serializer().serializedSize(cm.rowMutation(), version)
- + DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(cm.consistency().name());
+ + DBTypeSizes.NATIVE.sizeof((short) consistencySize) + consistencySize; // short length prefix + name bytes
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/DBConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DBConstants.java b/src/java/org/apache/cassandra/db/DBConstants.java
deleted file mode 100644
index cc8245c..0000000
--- a/src/java/org/apache/cassandra/db/DBConstants.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-public class DBConstants
-{
- public static final int BOOL_SIZE = 1;
- public static final int INT_SIZE = 4;
- public static final int LONG_SIZE = 8;
- public static final int SHORT_SIZE = 2;
- public static final int TIMESTAMP_SIZE = 8;
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/DBTypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DBTypeSizes.java b/src/java/org/apache/cassandra/db/DBTypeSizes.java
new file mode 100644
index 0000000..3bf982e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DBTypeSizes.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+public abstract class DBTypeSizes
+{
+ public static final DBTypeSizes NATIVE = new NativeDBTypeSizes();
+ public static final DBTypeSizes VINT = new VIntEncodedDBTypeSizes();
+
+ private static final int BOOL_SIZE = 1;
+ private static final int SHORT_SIZE = 2;
+ private static final int INT_SIZE = 4;
+ private static final int LONG_SIZE = 8;
+
+ public abstract int sizeof(boolean value);
+ public abstract int sizeof(short value);
+ public abstract int sizeof(int value);
+ public abstract int sizeof(long value);
+
+ public static class NativeDBTypeSizes extends DBTypeSizes
+ {
+ public int sizeof(boolean value)
+ {
+ return BOOL_SIZE;
+ }
+
+ public int sizeof(short value)
+ {
+ return SHORT_SIZE;
+ }
+
+ public int sizeof(int value)
+ {
+ return INT_SIZE;
+ }
+
+ public int sizeof(long value)
+ {
+ return LONG_SIZE;
+ }
+ }
+
+ public static class VIntEncodedDBTypeSizes extends DBTypeSizes
+ {
+ private static final int BOOL_SIZE = 1;
+
+ public int sizeofVInt(long i) // number of bytes this class reports for i in variable-length form
+ {
+ if (i >= -112 && i <= 127)
+ return 1; // values in [-112, 127] are reported as a single byte
+
+ int size = 0;
+ int len = -112;
+ if (i < 0)
+ {
+ i ^= -1L; // take one's complement
+ len = -120;
+ }
+ long tmp = i;
+ while (tmp != 0)
+ {
+ tmp = tmp >> 8;
+ len--; // one decrement per byte shifted out until the remaining value is zero
+ }
+ size++; // one byte in addition to the value bytes counted below
+ len = (len < -120) ? -(len + 120) : -(len + 112); // turn the marker back into the number of decrements made above
+ size += len;
+ return size;
+ }
+
+ public int sizeof(long i)
+ {
+ return sizeofVInt(i);
+ }
+
+ public int sizeof(boolean i)
+ {
+ return BOOL_SIZE;
+ }
+
+ public int sizeof(short i)
+ {
+ return sizeofVInt(i);
+ }
+
+ public int sizeof(int i)
+ {
+ return sizeofVInt(i);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index 03e41d2..0272d5c 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -74,14 +74,14 @@ public class ExpiringColumn extends Column
}
@Override
- public int size()
+ public int size(DBTypeSizes typeSizes)
{
/*
* An expired column adds to a Column :
* 4 bytes for the localExpirationTime
* + 4 bytes for the timeToLive
*/
- return super.size() + DBConstants.INT_SIZE + DBConstants.INT_SIZE;
+ return super.size(typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/IColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IColumn.java b/src/java/org/apache/cassandra/db/IColumn.java
index eb2e270..18c2335 100644
--- a/src/java/org/apache/cassandra/db/IColumn.java
+++ b/src/java/org/apache/cassandra/db/IColumn.java
@@ -42,8 +42,8 @@ public interface IColumn
public long mostRecentLiveChangeAt();
public long mostRecentNonGCableChangeAt(int gcbefore);
public ByteBuffer name();
- public int size();
- public int serializedSize();
+ public int size(DBTypeSizes typeSizes);
+ public int serializedSize(DBTypeSizes typeSizes);
public int serializationFlags();
public long timestamp();
public ByteBuffer value();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 06d191f..3a3fc31 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -225,7 +225,7 @@ public class Memtable
private void resolve(DecoratedKey key, ColumnFamily cf)
{
- currentThroughput.addAndGet(cf.size());
+ currentThroughput.addAndGet(cf.size(DBTypeSizes.NATIVE));
currentOperations.addAndGet((cf.getColumnCount() == 0)
? cf.isMarkedForDelete() ? 1 : 0
: cf.getColumnCount());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/RangeSliceReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceReply.java b/src/java/org/apache/cassandra/db/RangeSliceReply.java
index 8976f0e..c563c9c 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceReply.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceReply.java
@@ -40,12 +40,13 @@ public class RangeSliceReply
public Message getReply(Message originalMessage) throws IOException
{
- int size = DBConstants.INT_SIZE;
+ int rowCount = rows.size();
+ int size = DBTypeSizes.NATIVE.sizeof(rowCount);
for (Row row : rows)
size += Row.serializer().serializedSize(row, originalMessage.getVersion());
DataOutputBuffer buffer = new DataOutputBuffer(size);
- buffer.writeInt(rows.size());
+ buffer.writeInt(rowCount);
for (Row row : rows)
Row.serializer().serialize(row, buffer, originalMessage.getVersion());
assert buffer.getLength() == buffer.getData().length;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 9480380..77c9209 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -113,11 +113,11 @@ class ReadResponseSerializer implements IVersionedSerializer<ReadResponse>
public long serializedSize(ReadResponse response, int version)
{
- int size = DBConstants.INT_SIZE;
- size += DBConstants.BOOL_SIZE;
- if (response.isDigestQuery())
- size += response.digest().remaining();
- else
+ DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+ ByteBuffer buffer = response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ int size = typeSizes.sizeof(buffer.remaining()) + buffer.remaining(); // length prefix plus the digest bytes themselves (empty for data responses)
+ size += typeSizes.sizeof(response.isDigestQuery());
+ if (!response.isDigestQuery())
size += Row.serializer().serializedSize(response.row(), version);
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index dc003f9..16e50d4 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -79,7 +79,8 @@ public class Row
public long serializedSize(Row row, int version)
{
- return DBConstants.SHORT_SIZE + row.key.key.remaining() + ColumnFamily.serializer().serializedSize(row.cf);
+ int keySize = row.key.key.remaining();
+ return DBTypeSizes.NATIVE.sizeof((short) keySize) + keySize + ColumnFamily.serializer().serializedSize(row.cf, DBTypeSizes.NATIVE);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index aea3c2f..307ce8b 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -183,10 +183,11 @@ public class RowIndexEntry
public int serializedSize()
{
- int size = DBConstants.LONG_SIZE + DBConstants.INT_SIZE; // deletion info
- size += DBConstants.INT_SIZE; // number of entries
+ DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+ int size = typeSizes.sizeof(deletionInfo.localDeletionTime) + typeSizes.sizeof(deletionInfo.markedForDeleteAt); // deletion info
+ size += typeSizes.sizeof(columnsIndex.size()); // number of entries
for (IndexHelper.IndexInfo info : columnsIndex)
- size += info.serializedSize();
+ size += info.serializedSize(typeSizes);
return size + (int) FilterFactory.serializedSize(bloomFilter);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index fb84e39..f731b98 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -446,14 +446,17 @@ public class RowMutation implements IMutation, MessageProducer
public long serializedSize(RowMutation rm, int version)
{
- int size = DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(rm.getTable());
- size += DBConstants.SHORT_SIZE + rm.key().remaining();
+ DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+ int tableSize = FBUtilities.encodedUTF8Length(rm.getTable());
+ int keySize = rm.key().remaining();
+ int size = typeSizes.sizeof((short) tableSize) + tableSize;
+ size += typeSizes.sizeof((short) keySize) + keySize;
- size += DBConstants.INT_SIZE;
+ size += typeSizes.sizeof(rm.modifications.size());
for (Map.Entry<Integer,ColumnFamily> entry : rm.modifications.entrySet())
{
- size += DBConstants.INT_SIZE;
- size += entry.getValue().serializedSize();
+ size += typeSizes.sizeof(entry.getKey());
+ size += ColumnFamily.serializer.serializedSize(entry.getValue(), DBTypeSizes.NATIVE);
}
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/RowPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowPosition.java b/src/java/org/apache/cassandra/db/RowPosition.java
index e7a45ca..fd80f6b 100644
--- a/src/java/org/apache/cassandra/db/RowPosition.java
+++ b/src/java/org/apache/cassandra/db/RowPosition.java
@@ -100,12 +100,20 @@ public abstract class RowPosition implements RingPosition<RowPosition>
}
}
- public long serializedSize(RowPosition pos)
+ public long serializedSize(RowPosition pos, DBTypeSizes typeSizes)
{
Kind kind = pos.kind();
- return DBConstants.BOOL_SIZE
- + (kind == Kind.ROW_KEY ? DBConstants.SHORT_SIZE + ((DecoratedKey)pos).key.remaining()
- : Token.serializer().serializedSize(pos.getToken()));
+ int size = 1; // 1 byte for enum
+ if (kind == Kind.ROW_KEY)
+ {
+ int keySize = ((DecoratedKey)pos).key.remaining();
+ size += (typeSizes.sizeof((short) keySize) + keySize);
+ }
+ else
+ {
+ size += Token.serializer().serializedSize(pos.getToken(), typeSizes); // token size must be accumulated, not discarded
+ }
+ return size;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 13d4174..13d2995 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -110,16 +110,23 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
public long serializedSize(ReadCommand cmd, int version)
{
+ DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+ // Sums the sizes of: digest flag, table name, key, query path,
+ // column-name count, and each length-prefixed column name.
SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
- int size = DBConstants.BOOL_SIZE;
- size += DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(command.table);
- size += DBConstants.SHORT_SIZE + command.key.remaining();
- size += command.queryPath.serializedSize();
- size += DBConstants.INT_SIZE;
+ int size = typeSizes.sizeof(command.isDigestQuery());
+ int tableSize = FBUtilities.encodedUTF8Length(command.table);
+ int keySize = command.key.remaining();
+
+ size += typeSizes.sizeof((short) tableSize) + tableSize;
+ // The key carries a short length prefix (the removed line used SHORT_SIZE),
+ // so it must be sized as a short rather than as an int.
+ size += typeSizes.sizeof((short) keySize) + keySize;
+ size += command.queryPath.serializedSize(typeSizes);
+ size += typeSizes.sizeof(command.columnNames.size());
if (!command.columnNames.isEmpty())
{
for (ByteBuffer cName : command.columnNames)
- size += DBConstants.SHORT_SIZE + cName.remaining();
+ {
+ int cNameSize = cName.remaining();
+ size += typeSizes.sizeof((short) cNameSize) + cNameSize;
+ }
}
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 47f0b3a..6f6b809 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -184,15 +184,21 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
public long serializedSize(ReadCommand cmd, int version)
{
+ DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
SliceFromReadCommand command = (SliceFromReadCommand) cmd;
- int size = DBConstants.BOOL_SIZE;
- size += DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(command.table);
- size += DBConstants.SHORT_SIZE + command.key.remaining();
- size += command.queryPath.serializedSize();
- size += DBConstants.SHORT_SIZE + command.start.remaining();
- size += DBConstants.SHORT_SIZE + command.finish.remaining();
- size += DBConstants.BOOL_SIZE;
- size += DBConstants.INT_SIZE;
+ int tableSize = FBUtilities.encodedUTF8Length(command.table);
+ int keySize = command.key.remaining();
+ int startSize = command.start.remaining();
+ int finishSize = command.finish.remaining();
+
+ int size = typeSizes.sizeof(cmd.isDigestQuery()); // boolean
+ size += typeSizes.sizeof((short) tableSize) + tableSize;
+ size += typeSizes.sizeof((short) keySize) + keySize;
+ size += command.queryPath.serializedSize(typeSizes);
+ size += typeSizes.sizeof((short) startSize) + startSize;
+ size += typeSizes.sizeof((short) finishSize) + finishSize;
+ size += typeSizes.sizeof(command.reversed);
+ size += typeSizes.sizeof(command.count);
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/SuperColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumn.java b/src/java/org/apache/cassandra/db/SuperColumn.java
index 70ccca7..30ab43a 100644
--- a/src/java/org/apache/cassandra/db/SuperColumn.java
+++ b/src/java/org/apache/cassandra/db/SuperColumn.java
@@ -102,12 +102,12 @@ public class SuperColumn extends AbstractColumnContainer implements IColumn
/**
* This calculates the exact size of the sub columns on the fly
*/
- public int size()
+ public int size(DBTypeSizes typeSizes)
{
int size = 0;
for (IColumn subColumn : getSubColumns())
{
- size += subColumn.serializedSize();
+ size += subColumn.serializedSize(typeSizes);
}
return size;
}
@@ -116,13 +116,25 @@ public class SuperColumn extends AbstractColumnContainer implements IColumn
* This returns the size of the super-column when serialized.
* @see org.apache.cassandra.db.IColumn#serializedSize()
*/
- public int serializedSize()
+ public int serializedSize(DBTypeSizes typeSizes)
{
/*
* We need to keep the way we are calculating the column size in sync with the
* way we are calculating the size for the column family serializer.
+ *
+ * 2 bytes for name size
+ * n bytes for the name
+ * 4 bytes for getLocalDeletionTime
+ * 8 bytes for getMarkedForDeleteAt
+ * 4 bytes for the subcolumns size
+ * size(typeSizes) bytes for the serialized subcolumns.
*/
- return DBConstants.SHORT_SIZE + name.remaining() + DBConstants.INT_SIZE + DBConstants.LONG_SIZE + DBConstants.INT_SIZE + size();
+ int nameSize = name.remaining();
+ int subColumnsSize = size(typeSizes);
+ return typeSizes.sizeof((short) nameSize) + nameSize
+ + typeSizes.sizeof(getLocalDeletionTime())
+ + typeSizes.sizeof(getMarkedForDeleteAt())
+ + typeSizes.sizeof(subColumnsSize) + subColumnsSize;
}
public long timestamp()
@@ -400,8 +412,8 @@ class SuperColumnSerializer implements IColumnSerializer
return superColumn;
}
- public long serializedSize(IColumn object)
+ public long serializedSize(IColumn object, DBTypeSizes typeSizes)
{
- return object.serializedSize();
+ return object.serializedSize(typeSizes);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/WriteResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java
index 2814038..f9bba85 100644
--- a/src/java/org/apache/cassandra/db/WriteResponse.java
+++ b/src/java/org/apache/cassandra/db/WriteResponse.java
@@ -93,9 +93,12 @@ public class WriteResponse
public long serializedSize(WriteResponse response, int version)
{
- int size = DBConstants.SHORT_SIZE + FBUtilities.encodedUTF8Length(response.table());
- size += DBConstants.SHORT_SIZE + response.key().remaining();
- size += DBConstants.BOOL_SIZE;
+ DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
+ int utfSize = FBUtilities.encodedUTF8Length(response.table());
+ int keySize = response.key().remaining();
+ int size = typeSizes.sizeof((short) utfSize) + utfSize;
+ size += typeSizes.sizeof((short) keySize) + keySize;
+ size += typeSizes.sizeof(response.isSuccess());
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 93b7283..b4f709e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -26,6 +26,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
@@ -129,7 +130,7 @@ public class ReplayPosition implements Comparable<ReplayPosition>
return new ReplayPosition(dis.readLong(), dis.readInt());
}
- public long serializedSize(ReplayPosition object)
+ public long serializedSize(ReplayPosition object, DBTypeSizes typeSizes)
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 6eae79c..9990851 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -238,7 +238,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
IColumn reduced = purged.iterator().next();
container.clear();
- serializedSize += reduced.serializedSize();
+ serializedSize += reduced.serializedSize(DBTypeSizes.NATIVE);
columns++;
maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
int deletionTime = reduced.getLocalDeletionTime();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index 40c93b4..3f18351 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -26,7 +26,7 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -70,10 +70,10 @@ import org.apache.cassandra.utils.NodeId;
*/
public class CounterContext implements IContext
{
- private static final int HEADER_SIZE_LENGTH = DBConstants.SHORT_SIZE;
- private static final int HEADER_ELT_LENGTH = DBConstants.SHORT_SIZE;
- private static final int CLOCK_LENGTH = DBConstants.LONG_SIZE;
- private static final int COUNT_LENGTH = DBConstants.LONG_SIZE;
+ private static final int HEADER_SIZE_LENGTH = DBTypeSizes.NATIVE.sizeof(Short.MAX_VALUE);
+ private static final int HEADER_ELT_LENGTH = DBTypeSizes.NATIVE.sizeof(Short.MAX_VALUE);
+ private static final int CLOCK_LENGTH = DBTypeSizes.NATIVE.sizeof(Long.MAX_VALUE);
+ private static final int COUNT_LENGTH = DBTypeSizes.NATIVE.sizeof(Long.MAX_VALUE);
private static final int STEP_LENGTH = NodeId.LENGTH + CLOCK_LENGTH + COUNT_LENGTH;
private static final Logger logger = LoggerFactory.getLogger(CounterContext.class);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/db/filter/QueryPath.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryPath.java b/src/java/org/apache/cassandra/db/filter/QueryPath.java
index 31b39d0..b47e9f1 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryPath.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryPath.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.filter;
import java.io.*;
import java.nio.ByteBuffer;
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -94,11 +94,43 @@ public class QueryPath
cName.remaining() == 0 ? null : cName);
}
- public int serializedSize()
+ public int serializedSize(DBTypeSizes typeSizes)
{
- int size = DBConstants.SHORT_SIZE + (columnFamilyName == null ? 0 : FBUtilities.encodedUTF8Length(columnFamilyName));
- size += DBConstants.SHORT_SIZE + (superColumnName == null ? 0 : superColumnName.remaining());
- size += DBConstants.SHORT_SIZE + (columnName == null ? 0 : columnName.remaining());
+ int size = 0;
+
+ if (columnFamilyName == null)
+ {
+ size += typeSizes.sizeof((short) 0);
+ }
+ else
+ {
+ int cfNameSize = FBUtilities.encodedUTF8Length(columnFamilyName);
+ size += typeSizes.sizeof((short) cfNameSize);
+ size += cfNameSize;
+ }
+
+ if (superColumnName == null)
+ {
+ size += typeSizes.sizeof((short) 0);
+ }
+ else
+ {
+ int scNameSize = superColumnName.remaining();
+ size += typeSizes.sizeof((short) scNameSize);
+ size += scNameSize;
+ }
+
+ if (columnName == null)
+ {
+ size += typeSizes.sizeof((short) 0);
+ }
+ else
+ {
+ int cNameSize = columnName.remaining();
+ size += typeSizes.sizeof((short) cNameSize);
+ size += cNameSize;
+ }
+
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/dht/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 045d8a2..bf80b3c 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.service.StorageService;
@@ -102,7 +103,7 @@ public abstract class Token<T> implements RingPosition<Token<T>>, Serializable
return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes));
}
- public long serializedSize(Token object)
+ public long serializedSize(Token object, DBTypeSizes typeSizes)
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/ISerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ISerializer.java b/src/java/org/apache/cassandra/io/ISerializer.java
index fc67dec..08ec20f 100644
--- a/src/java/org/apache/cassandra/io/ISerializer.java
+++ b/src/java/org/apache/cassandra/io/ISerializer.java
@@ -21,6 +21,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.cassandra.db.DBTypeSizes;
+
public interface ISerializer<T>
{
/**
@@ -39,5 +41,5 @@ public interface ISerializer<T>
*/
public T deserialize(DataInput dis) throws IOException;
- public long serializedSize(T t);
+ public long serializedSize(T t, DBTypeSizes type);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index 5c7d645..2965cbf 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileMark;
@@ -199,9 +200,13 @@ public class IndexHelper
dos.writeLong(width);
}
- public int serializedSize()
+ public int serializedSize(DBTypeSizes typeSizes)
{
- return 2 + firstName.remaining() + 2 + lastName.remaining() + 8 + 8;
+ int firstNameSize = firstName.remaining();
+ int lastNameSize = lastName.remaining();
+ return typeSizes.sizeof((short) firstNameSize) + firstNameSize +
+ typeSizes.sizeof((short) lastNameSize) + lastNameSize +
+ typeSizes.sizeof(offset) + typeSizes.sizeof(width);
}
public static IndexInfo deserialize(DataInput dis) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 5c00f5d..b3f08e1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -88,7 +88,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
{
- currentSize += key.key.remaining() + columnFamily.serializedSize() * 1.2;
+ currentSize += key.key.remaining() + ColumnFamily.serializer.serializedSize(columnFamily, DBTypeSizes.NATIVE) * 1.2;
if (currentSize > bufferSize)
sync();
@@ -107,7 +107,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
{
// We will reuse a CF that we have counted already. But because it will be easier to add the full size
// of the CF in the next writeRow call than to find out the delta, we just remove the size until that next call
- currentSize -= currentKey.key.remaining() + previous.serializedSize() * 1.2;
+ currentSize -= currentKey.key.remaining() + ColumnFamily.serializer.serializedSize(previous, DBTypeSizes.NATIVE) * 1.2;
}
return previous;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index d505151..522fb3b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.io.sstable;
import java.io.*;
-import java.nio.ByteBuffer;
import java.util.*;
import java.util.regex.Pattern;
@@ -49,6 +48,7 @@ public class SSTableWriter extends SSTable
private DecoratedKey lastWrittenKey;
private FileMark dataMark;
private final SSTableMetadata.Collector sstableMetadataCollector;
+ private final DBTypeSizes typeSizes = DBTypeSizes.NATIVE;
public SSTableWriter(String filename, long keyCount) throws IOException
{
@@ -175,7 +175,7 @@ public class SSTableWriter extends SSTable
ColumnIndex index = new ColumnIndex.Builder(cf.getComparator(), decoratedKey.key, cf.getColumnCount()).build(cf);
// write out row size + data
- dataFile.stream.writeLong(cf.serializedSizeForSSTable());
+ dataFile.stream.writeLong(ColumnFamily.serializer().serializedSizeForSSTable(cf, typeSizes));
ColumnFamily.serializer().serializeForSSTable(cf, dataFile.stream);
afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
index b42eb81..b0766a4 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
@@ -187,7 +187,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
* @throws IOException
* if this file is closed or another I/O error occurs.
*/
- public final int readInt() throws IOException {
+ public int readInt() throws IOException {
byte[] buffer = new byte[4];
if (read(buffer, 0, buffer.length) != buffer.length) {
throw new EOFException();
@@ -251,7 +251,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
* @throws IOException
* if this file is closed or another I/O error occurs.
*/
- public final long readLong() throws IOException {
+ public long readLong() throws IOException {
byte[] buffer = new byte[8];
int n = read(buffer, 0, buffer.length);
if (n != buffer.length) {
@@ -276,7 +276,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
* @throws IOException
* if this file is closed or another I/O error occurs.
*/
- public final short readShort() throws IOException {
+ public short readShort() throws IOException {
byte[] buffer = new byte[2];
if (read(buffer, 0, buffer.length) != buffer.length) {
throw new EOFException();
@@ -314,7 +314,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
* @throws IOException
* if this file is closed or another I/O error occurs.
*/
- public final int readUnsignedShort() throws IOException {
+ public int readUnsignedShort() throws IOException {
byte[] buffer = new byte[2];
if (read(buffer, 0, buffer.length) != buffer.length) {
throw new EOFException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
new file mode 100644
index 0000000..be26094
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+
+public abstract class AbstractDataOutput extends OutputStream implements DataOutput
+{
+ /*
+ !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
+ */
+
+ /**
+ * Writes the entire contents of the byte array <code>buffer</code> to
+ * this RandomAccessFile starting at the current file pointer.
+ *
+ * @param buffer
+ * the buffer to be written.
+ *
+ * @throws IOException
+ * If an error occurs trying to write to this RandomAccessFile.
+ */
+ public void write(byte[] buffer) throws IOException {
+ write(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Writes <code>count</code> bytes from the byte array <code>buffer</code>
+ * starting at <code>offset</code> to this RandomAccessFile starting at
+ * the current file pointer.
+ *
+ * @param buffer
+ * the bytes to be written
+ * @param offset
+ * offset in buffer to get bytes
+ * @param count
+ * number of bytes in buffer to write
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * RandomAccessFile.
+ * @throws IndexOutOfBoundsException
+ * If offset or count are outside of bounds.
+ */
+ public abstract void write(byte[] buffer, int offset, int count) throws IOException;
+
+ /**
+ * Writes the specified byte <code>oneByte</code> to this RandomAccessFile
+ * starting at the current file pointer. Only the low order byte of
+ * <code>oneByte</code> is written.
+ *
+ * @param oneByte
+ * the byte to be written
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * RandomAccessFile.
+ */
+ public abstract void write(int oneByte) throws IOException;
+
+ /**
+ * Writes a boolean to this output stream.
+ *
+ * @param val
+ * the boolean value to write to the OutputStream
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeBoolean(boolean val) throws IOException {
+ write(val ? 1 : 0);
+ }
+
+ /**
+ * Writes an 8-bit byte to this output stream.
+ *
+ * @param val
+ * the byte value to write to the OutputStream
+ *
+ * @throws java.io.IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeByte(int val) throws IOException {
+ write(val & 0xFF);
+ }
+
+ /**
+ * Writes the low order 8-bit bytes from a String to this output stream.
+ *
+ * @param str
+ * the String containing the bytes to write to the OutputStream
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeBytes(String str) throws IOException {
+ byte bytes[] = new byte[str.length()];
+ for (int index = 0; index < str.length(); index++) {
+ bytes[index] = (byte) (str.charAt(index) & 0xFF);
+ }
+ write(bytes);
+ }
+
+ /**
+ * Writes the specified 16-bit character to the OutputStream. Only the lower
+ * 2 bytes are written with the higher of the 2 bytes written first. This
+ * represents the Unicode value of val.
+ *
+ * @param val
+ * the character to be written
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeChar(int val) throws IOException {
+ byte[] buffer = new byte[2];
+ buffer[0] = (byte) (val >> 8);
+ buffer[1] = (byte) val;
+ write(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Writes the specified 16-bit characters contained in str to the
+ * OutputStream. Only the lower 2 bytes of each character are written with
+ * the higher of the 2 bytes written first. This represents the Unicode
+ * value of each character in str.
+ *
+ * @param str
+ * the String whose characters are to be written.
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeChars(String str) throws IOException {
+ byte newBytes[] = new byte[str.length() * 2];
+ for (int index = 0; index < str.length(); index++) {
+ int newIndex = index == 0 ? index : index * 2;
+ newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF);
+ newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF);
+ }
+ write(newBytes);
+ }
+
+ /**
+ * Writes a 64-bit double to this output stream. The resulting output is the
+ * 8 bytes resulting from calling Double.doubleToLongBits().
+ *
+ * @param val
+ * the double to be written.
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeDouble(double val) throws IOException {
+ writeLong(Double.doubleToLongBits(val));
+ }
+
+ /**
+ * Writes a 32-bit float to this output stream. The resulting output is the
+ * 4 bytes resulting from calling Float.floatToIntBits().
+ *
+ * @param val
+ * the float to be written.
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeFloat(float val) throws IOException {
+ writeInt(Float.floatToIntBits(val));
+ }
+
+ /**
+ * Writes a 32-bit int to this output stream. The resulting output is the 4
+ * bytes, highest order first, of val.
+ *
+ * @param val
+ * the int to be written.
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public void writeInt(int val) throws IOException {
+ byte[] buffer = new byte[4];
+ buffer[0] = (byte) (val >> 24);
+ buffer[1] = (byte) (val >> 16);
+ buffer[2] = (byte) (val >> 8);
+ buffer[3] = (byte) val;
+ write(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Writes a 64-bit long to this output stream. The resulting output is the 8
+ * bytes, highest order first, of val.
+ *
+ * @param val
+ * the long to be written.
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public void writeLong(long val) throws IOException {
+ byte[] buffer = new byte[8];
+ int t = (int) (val >> 32);
+ buffer[0] = (byte) (t >> 24);
+ buffer[1] = (byte) (t >> 16);
+ buffer[2] = (byte) (t >> 8);
+ buffer[3] = (byte) t;
+ buffer[4] = (byte) (val >> 24);
+ buffer[5] = (byte) (val >> 16);
+ buffer[6] = (byte) (val >> 8);
+ buffer[7] = (byte) val;
+ write(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Writes the specified 16-bit short to the OutputStream. Only the lower 2
+ * bytes are written with the higher of the 2 bytes written first.
+ *
+ * @param val
+ * the short to be written
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public void writeShort(int val) throws IOException {
+ writeChar(val);
+ }
+
+ /**
+ * Writes the specified String out in UTF format.
+ *
+ * @param str
+ * the String to be written in UTF format.
+ *
+ * @throws IOException
+ * If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeUTF(String str) throws IOException {
+ int utfCount = 0, length = str.length();
+ for (int i = 0; i < length; i++) {
+ int charValue = str.charAt(i);
+ if (charValue > 0 && charValue <= 127) {
+ utfCount++;
+ } else if (charValue <= 2047) {
+ utfCount += 2;
+ } else {
+ utfCount += 3;
+ }
+ }
+ if (utfCount > 65535) {
+ throw new UTFDataFormatException(); //$NON-NLS-1$
+ }
+ byte utfBytes[] = new byte[utfCount + 2];
+ int utfIndex = 2;
+ for (int i = 0; i < length; i++) {
+ int charValue = str.charAt(i);
+ if (charValue > 0 && charValue <= 127) {
+ utfBytes[utfIndex++] = (byte) charValue;
+ } else if (charValue <= 2047) {
+ utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
+ utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+ } else {
+ utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
+ utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
+ utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+ }
+ }
+ utfBytes[0] = (byte) (utfCount >> 8);
+ utfBytes[1] = (byte) utfCount;
+ write(utfBytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
index 4d3c4af..7eb7020 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
@@ -21,7 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.utils.obs.OpenBitSet;
@@ -72,10 +72,22 @@ abstract class BloomFilterSerializer implements ISerializer<BloomFilter>
*
* @return serialized size of the given bloom filter
*/
- public long serializedSize(BloomFilter bf)
+ public long serializedSize(BloomFilter bf, DBTypeSizes typeSizes)
{
- return DBConstants.INT_SIZE // hash count
- + DBConstants.INT_SIZE // length
- + bf.bitset.getNumWords() * DBConstants.LONG_SIZE; // buckets
+ int bitLength = bf.bitset.getNumWords();
+ int pageSize = bf.bitset.getPageSize();
+ int pageCount = bf.bitset.getPageCount();
+
+ int size = 0;
+ size += typeSizes.sizeof(bf.getHashCount()); // hash count
+ size += typeSizes.sizeof(bitLength); // length
+
+ for (int p = 0; p < pageCount; p++)
+ {
+ long[] bits = bf.bitset.getPage(p);
+ for (int i = 0; i < pageSize && bitLength-- > 0; i++)
+ size += typeSizes.sizeof(bits[i]); // bucket
+ }
+ return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index b5bb859..027bc99 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -341,8 +341,7 @@ public class ByteBufferUtil
assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length;
try
{
- out.writeByte((length >> 8) & 0xFF);
- out.writeByte(length & 0xFF);
+ out.writeShort(length);
write(buffer, out); // writing data bytes to output source
}
catch (IOException e)
@@ -365,8 +364,7 @@ public class ByteBufferUtil
/* @return An unsigned short in an integer. */
public static int readShortLength(DataInput in) throws IOException
{
- int length = (in.readByte() & 0xFF) << 8;
- return length | (in.readByte() & 0xFF);
+ return in.readUnsignedShort(); // delegates to the DataInput, so an implementation that overrides readUnsignedShort (EncodedDataInputStream in this commit) applies its own decoding
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
index 7a184d8..9cabb9f 100644
--- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
+++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLongArray;
import com.google.common.base.Objects;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.io.ISerializer;
public class EstimatedHistogram
@@ -272,7 +273,7 @@ public class EstimatedHistogram
return new EstimatedHistogram(offsets, buckets);
}
- public long serializedSize(EstimatedHistogram object)
+ public long serializedSize(EstimatedHistogram object, DBTypeSizes typeSizes)
{
throw new UnsupportedOperationException(); // size computation is not implemented here; both arguments are ignored
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index 93c642f..ab453f8 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -18,16 +18,18 @@
package org.apache.cassandra.utils;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.cassandra.db.DBTypeSizes;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FilterFactory
{
private static final Logger logger = LoggerFactory.getLogger(FilterFactory.class);
+ private static final DBTypeSizes TYPE_SIZES = DBTypeSizes.NATIVE;
public enum Type
{
@@ -80,9 +82,9 @@ public class FilterFactory
case SHA:
return LegacyBloomFilter.serializer.serializedSize((LegacyBloomFilter) bf);
case MURMUR2:
- return Murmur2BloomFilter.serializer.serializedSize((Murmur2BloomFilter) bf);
+ return Murmur2BloomFilter.serializer.serializedSize((Murmur2BloomFilter) bf, TYPE_SIZES);
default:
- return Murmur3BloomFilter.serializer.serializedSize((Murmur3BloomFilter) bf);
+ return Murmur3BloomFilter.serializer.serializedSize((Murmur3BloomFilter) bf, TYPE_SIZES);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index 98f1ce9..df675e6 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.utils;
-import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.db.DBTypeSizes;
import org.apache.cassandra.io.ISerializer;
import java.io.DataInput;
@@ -193,7 +193,7 @@ public class StreamingHistogram
return new StreamingHistogram(maxBinSize, tmp);
}
- public long serializedSize(StreamingHistogram histogram)
+ public long serializedSize(StreamingHistogram histogram, DBTypeSizes typeSizes)
{
throw new UnsupportedOperationException(); // size computation is not implemented here; both arguments are ignored
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
new file mode 100644
index 0000000..b35d180
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.vint;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.AbstractDataInput;
+
+/**
+ * Data input that decodes short, int and long values stored in a
+ * variable-length integer (vint) format. Borrows idea from
+ * https://developers.google.com/protocol-buffers/docs/encoding#varints
+ *
+ * Format as decoded by this class: a first byte in [-112, 127] is the value
+ * itself; any other first byte is a marker carrying the sign and the number
+ * of big-endian magnitude bytes that follow (negative values are stored as
+ * their one's complement).
+ *
+ * Should be used with EncodedDataOutputStream
+ */
+public class EncodedDataInputStream extends AbstractDataInput
+{
+    private final DataInput input;
+
+    /**
+     * @param input the underlying source of encoded bytes
+     */
+    public EncodedDataInputStream(DataInput input)
+    {
+        this.input = input;
+    }
+
+    public int skipBytes(int n) throws IOException
+    {
+        return input.skipBytes(n);
+    }
+
+    /**
+     * Reads a single raw byte (no vint decoding).
+     *
+     * @return the byte as an unsigned value in [0, 255]
+     */
+    public int read() throws IOException
+    {
+        return input.readByte() & 0xFF;
+    }
+
+    protected void seekInternal(int position)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    protected int getPosition()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /* as all of the integer types could be decoded using VInt we can use single method vintDecode */
+
+    public int readInt() throws IOException
+    {
+        return (int) vintDecode();
+    }
+
+    public long readLong() throws IOException
+    {
+        return vintDecode();
+    }
+
+    /**
+     * @return the decoded value reduced to its low 16 bits, in [0, 65535]
+     */
+    public int readUnsignedShort() throws IOException
+    {
+        // Mask instead of casting to short: a (short) cast sign-extends, so lengths
+        // of 32768..65535 would come back negative, breaking the DataInput contract.
+        return ((int) vintDecode()) & 0xFFFF;
+    }
+
+    public short readShort() throws IOException
+    {
+        return (short) vintDecode();
+    }
+
+    /**
+     * Decodes one vint: reads the first byte, then as many magnitude bytes as
+     * that byte announces.
+     *
+     * @return the decoded value
+     * @throws IOException if the underlying input fails
+     */
+    private long vintDecode() throws IOException
+    {
+        byte firstByte = input.readByte();
+        int len = vintDecodeSize(firstByte);
+        if (len == 1)
+            return firstByte; // small values are stored directly in the first byte
+        long i = 0;
+        for (int idx = 0; idx < len - 1; idx++)
+        {
+            // magnitude bytes arrive most-significant first
+            byte b = input.readByte();
+            i = i << 8;
+            i = i | (b & 0xFF);
+        }
+        // negative values were written as their one's complement; flip the bits back
+        return (vintIsNegative(firstByte) ? (i ^ -1L) : i);
+    }
+
+    /**
+     * @param value the first byte of an encoded vint
+     * @return the total encoded length in bytes, including the first byte
+     */
+    private int vintDecodeSize(byte value)
+    {
+        if (value >= -112)
+        {
+            return 1;
+        }
+        else if (value < -120)
+        {
+            // markers -121..-128: negative value with 1..8 magnitude bytes
+            return -119 - value;
+        }
+        // markers -113..-120: non-negative value with 1..8 magnitude bytes
+        return -111 - value;
+    }
+
+    /**
+     * @param value the first byte of an encoded vint
+     * @return true if the encoded value is negative
+     */
+    private boolean vintIsNegative(byte value)
+    {
+        return value < -120 || (value >= -112 && value < 0);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
new file mode 100644
index 0000000..92612b6
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.vint;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.cassandra.io.util.AbstractDataOutput;
+
+/**
+ * Data output that writes short, int and long values in a variable-length
+ * integer (vint) format; raw byte writes are passed straight through.
+ * Borrows idea from
+ * https://developers.google.com/protocol-buffers/docs/encoding#varints
+ */
+public class EncodedDataOutputStream extends AbstractDataOutput
+{
+    private OutputStream out;
+
+    public EncodedDataOutputStream(OutputStream out)
+    {
+        this.out = out;
+    }
+
+    public void write(int b) throws IOException
+    {
+        out.write(b);
+    }
+
+    public void write(byte[] b) throws IOException
+    {
+        out.write(b);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        out.write(b, off, len);
+    }
+
+    /* every integer width goes through the same vint encoder */
+
+    public void writeInt(int v) throws IOException
+    {
+        vintEncode(v);
+    }
+
+    public void writeLong(long v) throws IOException
+    {
+        vintEncode(v);
+    }
+
+    public void writeShort(int v) throws IOException
+    {
+        vintEncode(v);
+    }
+
+    /**
+     * Writes one value as a vint: values in [-112, 127] take a single byte;
+     * anything else is a marker byte followed by 1 to 8 magnitude bytes,
+     * most significant first.
+     */
+    private void vintEncode(long value) throws IOException
+    {
+        if (value >= -112 && value <= 127)
+        {
+            writeByte((byte) value);
+            return;
+        }
+
+        // negative values are stored as their one's complement
+        boolean negative = value < 0;
+        long magnitude = negative ? ~value : value;
+
+        // count the bytes needed to hold the magnitude
+        int payloadBytes = 0;
+        for (long rest = magnitude; rest != 0; rest = rest >> 8)
+            payloadBytes++;
+
+        // marker byte: -113..-120 for non-negative values, -121..-128 for negative ones
+        writeByte((byte) ((negative ? -120 : -112) - payloadBytes));
+
+        for (int shift = (payloadBytes - 1) * 8; shift >= 0; shift -= 8)
+            writeByte((byte) (magnitude >> shift));
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb25a8fc/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 1f6a799..fd5259a 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -75,6 +75,11 @@ public class Util
return new Column(ByteBufferUtil.bytes(name), ByteBufferUtil.bytes(value), timestamp);
}
+ public static Column counterColumn(String name, long value, long timestamp)
+ {
+ return new CounterUpdateColumn(ByteBufferUtil.bytes(name), value, timestamp); // builds a CounterUpdateColumn (returned as Column); the String name is converted with ByteBufferUtil.bytes
+ }
+
public static SuperColumn superColumn(ColumnFamily cf, String name, Column... columns)
{
SuperColumn sc = new SuperColumn(ByteBufferUtil.bytes(name), cf.metadata().comparator);