You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/05/02 10:54:52 UTC
git commit: IOException related cleanups
Updated Branches:
refs/heads/trunk 2bc79a074 -> e2506f1d0
IOException related cleanups
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2506f1d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2506f1d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2506f1d
Branch: refs/heads/trunk
Commit: e2506f1d0f14d8719e5f4fde8020b0c5a31383fd
Parents: 2bc79a0
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 2 10:54:42 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 2 10:54:42 2013 +0200
----------------------------------------------------------------------
.../org/apache/cassandra/db/CounterColumn.java | 4 +-
.../org/apache/cassandra/db/CounterMutation.java | 2 +-
.../cassandra/db/CounterMutationVerbHandler.java | 4 -
.../apache/cassandra/db/HintedHandOffManager.java | 2 +-
.../org/apache/cassandra/service/StorageProxy.java | 79 ++++++---------
.../org/apache/cassandra/utils/FBUtilities.java | 26 +++--
6 files changed, 53 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index f94672d..15da1df 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -344,7 +344,7 @@ public class CounterColumn extends Column
return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
}
- private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException, IOException
+ private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
{
RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
@@ -354,7 +354,7 @@ public class CounterColumn extends Column
StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
{
public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
- throws IOException, OverloadedException
+ throws OverloadedException
{
// We should only send to the remote replica, not the local one
Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index d60c5eb..65ca22a 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -108,7 +108,7 @@ public class CounterMutation implements IMutation
commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, new NamesQueryFilter(s)));
}
- public MessageOut<CounterMutation> makeMutationMessage() throws IOException
+ public MessageOut<CounterMutation> makeMutationMessage()
{
return new MessageOut<CounterMutation>(MessagingService.Verb.COUNTER_MUTATION, this, serializer);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index 38ee66f..3286c9a 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -63,9 +63,5 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
// The coordinator will timeout on it's own so ignore
logger.debug("counter error", e);
}
- catch (IOException e)
- {
- logger.error("Error in counter mutation", e);
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 6f2ecb4..0939abb 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -116,7 +116,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
* Returns a mutation representing a Hint to be sent to <code>targetId</code>
* as soon as it becomes available again.
*/
- public static RowMutation hintFor(RowMutation mutation, UUID targetId) throws IOException
+ public static RowMutation hintFor(RowMutation mutation, UUID targetId)
{
UUID hintId = UUIDGen.getTimeUUID();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e187841..8d2dc60 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
@@ -54,7 +53,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.TokenMetadata;
@@ -115,7 +114,7 @@ public class StorageProxy implements StorageProxyMBean
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException, OverloadedException
+ throws OverloadedException
{
assert mutation instanceof RowMutation;
sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
@@ -135,7 +134,6 @@ public class StorageProxy implements StorageProxyMBean
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException
{
if (logger.isTraceEnabled())
logger.trace("insert writing local & replicate " + mutation.toString(true));
@@ -152,7 +150,6 @@ public class StorageProxy implements StorageProxyMBean
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException
{
if (logger.isTraceEnabled())
logger.trace("insert writing local & replicate " + mutation.toString(true));
@@ -395,12 +392,10 @@ public class StorageProxy implements StorageProxyMBean
long startTime = System.nanoTime();
List<AbstractWriteResponseHandler> responseHandlers = new ArrayList<AbstractWriteResponseHandler>(mutations.size());
- IMutation mostRecentMutation = null;
try
{
for (IMutation mutation : mutations)
{
- mostRecentMutation = mutation;
if (mutation instanceof CounterMutation)
{
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
@@ -446,11 +441,6 @@ public class StorageProxy implements StorageProxyMBean
Tracing.trace("Overloaded");
throw e;
}
- catch (IOException e)
- {
- assert mostRecentMutation != null;
- throw new RuntimeException("error writing key " + ByteBufferUtil.bytesToHex(mostRecentMutation.key()), e);
- }
finally
{
writeMetrics.addNano(System.nanoTime() - startTime);
@@ -596,7 +586,7 @@ public class StorageProxy implements StorageProxyMBean
WritePerformer performer,
Runnable callback,
WriteType writeType)
- throws UnavailableException, OverloadedException, IOException
+ throws UnavailableException, OverloadedException
{
String table = mutation.getTable();
AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
@@ -763,7 +753,7 @@ public class StorageProxy implements StorageProxyMBean
HintRunnable runnable = new HintRunnable(target)
{
- public void runMayThrow() throws IOException
+ public void runMayThrow()
{
logger.debug("Adding hint for {}", target);
@@ -784,7 +774,7 @@ public class StorageProxy implements StorageProxyMBean
return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
}
- public static void writeHintForMutation(RowMutation mutation, InetAddress target) throws IOException
+ public static void writeHintForMutation(RowMutation mutation, InetAddress target)
{
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
assert hostId != null : "Missing host ID for " + target.getHostAddress();
@@ -816,18 +806,6 @@ public class StorageProxy implements StorageProxyMBean
private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler)
{
- try
- {
- sendMessagesToOneDCInternal(message, targets, localDC, handler);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- private static void sendMessagesToOneDCInternal(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) throws IOException
- {
Iterator<InetAddress> iter = targets.iterator();
InetAddress target = iter.next();
@@ -847,21 +825,28 @@ public class StorageProxy implements StorageProxyMBean
}
// Add all the other destinations of the same message as a FORWARD_HEADER entry
- FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(bos);
- out.writeInt(targets.size() - 1);
- while (iter.hasNext())
- {
- InetAddress destination = iter.next();
- CompactEndpointSerializationHelper.serialize(destination, out);
- int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
- out.writeInt(id);
- logger.trace("Adding FWD message to {}@{}", id, destination);
+ DataOutputBuffer out = new DataOutputBuffer();
+ try
+ {
+ out.writeInt(targets.size() - 1);
+ while (iter.hasNext())
+ {
+ InetAddress destination = iter.next();
+ CompactEndpointSerializationHelper.serialize(destination, out);
+ int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
+ out.writeInt(id);
+ logger.trace("Adding FWD message to {}@{}", id, destination);
+ }
+ message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
+ // send the combined message + forward headers
+ int id = MessagingService.instance().sendRR(message, target, handler);
+ logger.trace("Sending message to {}@{}", id, target);
+ }
+ catch (IOException e)
+ {
+ // DataOutputBuffer is in-memory, doesn't throw IOException
+ throw new AssertionError(e);
}
- message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
- // send the combined message + forward headers
- int id = MessagingService.instance().sendRR(message, target, handler);
- logger.trace("Sending message to {}@{}", id, target);
}
private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
@@ -871,7 +856,7 @@ public class StorageProxy implements StorageProxyMBean
Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
{
- public void runMayThrow() throws IOException
+ public void runMayThrow()
{
rm.apply();
responseHandler.response(null);
@@ -894,7 +879,7 @@ public class StorageProxy implements StorageProxyMBean
* quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather
* the write latencies at the coordinator node to make gathering point similar to the case of standard writes.
*/
- public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException, IOException
+ public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException
{
InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter, cm.consistency());
@@ -963,7 +948,7 @@ public class StorageProxy implements StorageProxyMBean
// Must be called on a replica of the mutation. This replica becomes the
// leader of this mutation.
public static AbstractWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback)
- throws UnavailableException, IOException, OverloadedException
+ throws UnavailableException, OverloadedException
{
return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER);
}
@@ -971,7 +956,7 @@ public class StorageProxy implements StorageProxyMBean
// Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
// applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
public static AbstractWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
- throws UnavailableException, IOException, OverloadedException
+ throws UnavailableException, OverloadedException
{
return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER);
}
@@ -984,7 +969,7 @@ public class StorageProxy implements StorageProxyMBean
{
return new LocalMutationRunnable()
{
- public void runMayThrow() throws IOException
+ public void runMayThrow()
{
assert mutation instanceof CounterMutation;
final CounterMutation cm = (CounterMutation) mutation;
@@ -1738,7 +1723,7 @@ public class StorageProxy implements StorageProxyMBean
public interface WritePerformer
{
- public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, OverloadedException;
+ public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws OverloadedException;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index ec8241c..e606e06 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -607,14 +607,22 @@ public class FBUtilities
public void close() {}
}
- public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version) throws IOException
- {
- int size = (int) serializer.serializedSize(object, version);
- DataOutputBuffer buffer = new DataOutputBuffer(size);
- serializer.serialize(object, buffer, version);
- assert buffer.getLength() == size && buffer.getData().length == size
- : String.format("Final buffer length %s to accommodate data size of %s (predicted %s) for %s",
- buffer.getData().length, buffer.getLength(), size, object);
- return buffer.getData();
+ public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version)
+ {
+ try
+ {
+ int size = (int) serializer.serializedSize(object, version);
+ DataOutputBuffer buffer = new DataOutputBuffer(size);
+ serializer.serialize(object, buffer, version);
+ assert buffer.getLength() == size && buffer.getData().length == size
+ : String.format("Final buffer length %s to accommodate data size of %s (predicted %s) for %s",
+ buffer.getData().length, buffer.getLength(), size, object);
+ return buffer.getData();
+ }
+ catch (IOException e)
+ {
+ // We're doing in-memory serialization...
+ throw new AssertionError(e);
+ }
}
}