You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/05/02 10:54:52 UTC

git commit: IOException related cleanups

Updated Branches:
  refs/heads/trunk 2bc79a074 -> e2506f1d0


IOException related cleanups


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2506f1d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2506f1d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2506f1d

Branch: refs/heads/trunk
Commit: e2506f1d0f14d8719e5f4fde8020b0c5a31383fd
Parents: 2bc79a0
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 2 10:54:42 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 2 10:54:42 2013 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/db/CounterColumn.java     |    4 +-
 .../org/apache/cassandra/db/CounterMutation.java   |    2 +-
 .../cassandra/db/CounterMutationVerbHandler.java   |    4 -
 .../apache/cassandra/db/HintedHandOffManager.java  |    2 +-
 .../org/apache/cassandra/service/StorageProxy.java |   79 ++++++---------
 .../org/apache/cassandra/utils/FBUtilities.java    |   26 +++--
 6 files changed, 53 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index f94672d..15da1df 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -344,7 +344,7 @@ public class CounterColumn extends Column
         return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
     }
 
-    private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException, IOException
+    private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
     {
         RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
 
@@ -354,7 +354,7 @@ public class CounterColumn extends Column
         StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
         {
             public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
-            throws IOException, OverloadedException
+            throws OverloadedException
             {
                 // We should only send to the remote replica, not the local one
                 Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index d60c5eb..65ca22a 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -108,7 +108,7 @@ public class CounterMutation implements IMutation
         commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, new NamesQueryFilter(s)));
     }
 
-    public MessageOut<CounterMutation> makeMutationMessage() throws IOException
+    public MessageOut<CounterMutation> makeMutationMessage()
     {
         return new MessageOut<CounterMutation>(MessagingService.Verb.COUNTER_MUTATION, this, serializer);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index 38ee66f..3286c9a 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -63,9 +63,5 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
             // The coordinator will timeout on it's own so ignore
             logger.debug("counter error", e);
         }
-        catch (IOException e)
-        {
-            logger.error("Error in counter mutation", e);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 6f2ecb4..0939abb 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -116,7 +116,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
      * Returns a mutation representing a Hint to be sent to <code>targetId</code>
      * as soon as it becomes available again.
      */
-    public static RowMutation hintFor(RowMutation mutation, UUID targetId) throws IOException
+    public static RowMutation hintFor(RowMutation mutation, UUID targetId)
     {
         UUID hintId = UUIDGen.getTimeUUID();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e187841..8d2dc60 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
@@ -54,7 +53,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -115,7 +114,7 @@ public class StorageProxy implements StorageProxyMBean
                               AbstractWriteResponseHandler responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistency_level)
-            throws IOException, OverloadedException
+            throws OverloadedException
             {
                 assert mutation instanceof RowMutation;
                 sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
@@ -135,7 +134,6 @@ public class StorageProxy implements StorageProxyMBean
                               AbstractWriteResponseHandler responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistency_level)
-            throws IOException
             {
                 if (logger.isTraceEnabled())
                     logger.trace("insert writing local & replicate " + mutation.toString(true));
@@ -152,7 +150,6 @@ public class StorageProxy implements StorageProxyMBean
                               AbstractWriteResponseHandler responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistency_level)
-            throws IOException
             {
                 if (logger.isTraceEnabled())
                     logger.trace("insert writing local & replicate " + mutation.toString(true));
@@ -395,12 +392,10 @@ public class StorageProxy implements StorageProxyMBean
         long startTime = System.nanoTime();
         List<AbstractWriteResponseHandler> responseHandlers = new ArrayList<AbstractWriteResponseHandler>(mutations.size());
 
-        IMutation mostRecentMutation = null;
         try
         {
             for (IMutation mutation : mutations)
             {
-                mostRecentMutation = mutation;
                 if (mutation instanceof CounterMutation)
                 {
                     responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
@@ -446,11 +441,6 @@ public class StorageProxy implements StorageProxyMBean
             Tracing.trace("Overloaded");
             throw e;
         }
-        catch (IOException e)
-        {
-            assert mostRecentMutation != null;
-            throw new RuntimeException("error writing key " + ByteBufferUtil.bytesToHex(mostRecentMutation.key()), e);
-        }
         finally
         {
             writeMetrics.addNano(System.nanoTime() - startTime);
@@ -596,7 +586,7 @@ public class StorageProxy implements StorageProxyMBean
                                                             WritePerformer performer,
                                                             Runnable callback,
                                                             WriteType writeType)
-    throws UnavailableException, OverloadedException, IOException
+    throws UnavailableException, OverloadedException
     {
         String table = mutation.getTable();
         AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
@@ -763,7 +753,7 @@ public class StorageProxy implements StorageProxyMBean
 
         HintRunnable runnable = new HintRunnable(target)
         {
-            public void runMayThrow() throws IOException
+            public void runMayThrow()
             {
                 logger.debug("Adding hint for {}", target);
 
@@ -784,7 +774,7 @@ public class StorageProxy implements StorageProxyMBean
         return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
     }
 
-    public static void writeHintForMutation(RowMutation mutation, InetAddress target) throws IOException
+    public static void writeHintForMutation(RowMutation mutation, InetAddress target)
     {
         UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
         assert hostId != null : "Missing host ID for " + target.getHostAddress();
@@ -816,18 +806,6 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler)
     {
-        try
-        {
-            sendMessagesToOneDCInternal(message, targets, localDC, handler);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static void sendMessagesToOneDCInternal(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) throws IOException
-    {
         Iterator<InetAddress> iter = targets.iterator();
         InetAddress target = iter.next();
 
@@ -847,21 +825,28 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         // Add all the other destinations of the same message as a FORWARD_HEADER entry
-        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(bos);
-        out.writeInt(targets.size() - 1);
-        while (iter.hasNext())
-        {
-            InetAddress destination = iter.next();
-            CompactEndpointSerializationHelper.serialize(destination, out);
-            int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
-            out.writeInt(id);
-            logger.trace("Adding FWD message to {}@{}", id, destination);
+        DataOutputBuffer out = new DataOutputBuffer();
+        try
+        {
+            out.writeInt(targets.size() - 1);
+            while (iter.hasNext())
+            {
+                InetAddress destination = iter.next();
+                CompactEndpointSerializationHelper.serialize(destination, out);
+                int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
+                out.writeInt(id);
+                logger.trace("Adding FWD message to {}@{}", id, destination);
+            }
+            message = message.withParameter(RowMutation.FORWARD_TO, out.getData());
+            // send the combined message + forward headers
+            int id = MessagingService.instance().sendRR(message, target, handler);
+            logger.trace("Sending message to {}@{}", id, target);
+        }
+        catch (IOException e)
+        {
+            // DataOutputBuffer is in-memory, doesn't throw IOException
+            throw new AssertionError(e);
         }
-        message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
-        // send the combined message + forward headers
-        int id = MessagingService.instance().sendRR(message, target, handler);
-        logger.trace("Sending message to {}@{}", id, target);
     }
 
     private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
@@ -871,7 +856,7 @@ public class StorageProxy implements StorageProxyMBean
 
         Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION)
         {
-            public void runMayThrow() throws IOException
+            public void runMayThrow()
             {
                 rm.apply();
                 responseHandler.response(null);
@@ -894,7 +879,7 @@ public class StorageProxy implements StorageProxyMBean
      * quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather
      * the write latencies at the coordinator node to make gathering point similar to the case of standard writes.
      */
-    public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException, IOException
+    public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException
     {
         InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter, cm.consistency());
 
@@ -963,7 +948,7 @@ public class StorageProxy implements StorageProxyMBean
     // Must be called on a replica of the mutation. This replica becomes the
     // leader of this mutation.
     public static AbstractWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback)
-    throws UnavailableException, IOException, OverloadedException
+    throws UnavailableException, OverloadedException
     {
         return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER);
     }
@@ -971,7 +956,7 @@ public class StorageProxy implements StorageProxyMBean
     // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
     // applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
     public static AbstractWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
-    throws UnavailableException, IOException, OverloadedException
+    throws UnavailableException, OverloadedException
     {
         return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER);
     }
@@ -984,7 +969,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         return new LocalMutationRunnable()
         {
-            public void runMayThrow() throws IOException
+            public void runMayThrow()
             {
                 assert mutation instanceof CounterMutation;
                 final CounterMutation cm = (CounterMutation) mutation;
@@ -1738,7 +1723,7 @@ public class StorageProxy implements StorageProxyMBean
 
     public interface WritePerformer
     {
-        public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, OverloadedException;
+        public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws OverloadedException;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index ec8241c..e606e06 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -607,14 +607,22 @@ public class FBUtilities
         public void close() {}
     }
 
-    public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version) throws IOException
-    {
-        int size = (int) serializer.serializedSize(object, version);
-        DataOutputBuffer buffer = new DataOutputBuffer(size);
-        serializer.serialize(object, buffer, version);
-        assert buffer.getLength() == size && buffer.getData().length == size
-               : String.format("Final buffer length %s to accommodate data size of %s (predicted %s) for %s",
-                               buffer.getData().length, buffer.getLength(), size, object);
-        return buffer.getData();
+    public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version)
+    {
+        try
+        {
+            int size = (int) serializer.serializedSize(object, version);
+            DataOutputBuffer buffer = new DataOutputBuffer(size);
+            serializer.serialize(object, buffer, version);
+            assert buffer.getLength() == size && buffer.getData().length == size
+                : String.format("Final buffer length %s to accommodate data size of %s (predicted %s) for %s",
+                        buffer.getData().length, buffer.getLength(), size, object);
+            return buffer.getData();
+        }
+        catch (IOException e)
+        {
+            // We're doing in-memory serialization...
+            throw new AssertionError(e);
+        }
     }
 }