You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/28 14:28:00 UTC

svn commit: r1140565 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java

Author: jbellis
Date: Tue Jun 28 12:28:00 2011
New Revision: 1140565

URL: http://svn.apache.org/viewvc?rev=1140565&view=rev
Log:
fix race that could result in Hadoop writer failing to throw an exception for an encountered error
patch by Mck SembWever and jbellis for CASSANDRA-2755

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1140565&r1=1140564&r2=1140565&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jun 28 12:28:00 2011
@@ -23,6 +23,8 @@
  * Expose number of threads blocked on submitting memtable to flush
    (CASSANDRA-2817)
  * Fix potential NPE during read repair (CASSANDRA-2823)
+ * fix race that could result in Hadoop writer failing to throw an
+   exception encountered after close() (CASSANDRA-2755)
 
 
 0.7.6

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1140565&r1=1140564&r2=1140565&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Jun 28 12:28:00 2011
@@ -32,15 +32,14 @@ import java.util.concurrent.TimeUnit;
 import org.apache.cassandra.client.RingCache;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TSocket;
 
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
@@ -219,27 +218,33 @@ implements org.apache.hadoop.mapred.Reco
     @Override
     public void close(TaskAttemptContext context) throws IOException, InterruptedException
     {
-        close((org.apache.hadoop.mapred.Reporter)null);
+        close();
     }
 
     /** Fills the deprecated RecordWriter interface for streaming. */
     @Deprecated
     public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
     {
+        close();
+    }
+
+    private void close() throws IOException
+    {
+        // close all the clients before throwing anything
+        IOException clientException = null;
         for (RangeClient client : clients.values())
-            client.stopNicely();
-        try
         {
-            for (RangeClient client : clients.values())
+            try
             {
-                client.join();
                 client.close();
             }
+            catch (IOException e)
+            {
+                clientException = e;
+            }
         }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        if (clientException != null)
+            throw clientException;
     }
 
     /**
@@ -255,6 +260,9 @@ implements org.apache.hadoop.mapred.Reco
         private final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<Pair<ByteBuffer,Mutation>>(queueSize);
 
         private volatile boolean run = true;
+        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
+        // when the client is closed.
         private volatile IOException lastException;
 
         private Cassandra.Client thriftClient;
@@ -291,15 +299,25 @@ implements org.apache.hadoop.mapred.Reco
             }
         }
 
-        public void stopNicely() throws IOException
+        public void close() throws IOException
         {
-            if (lastException != null)
-                throw lastException;
+            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
             run = false;
             interrupt();
+            try
+            {
+                this.join();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            if (lastException != null)
+                throw lastException;
         }
 
-        public void close()
+        private void closeInternal()
         {
             if (thriftSocket != null)
             {
@@ -356,7 +374,7 @@ implements org.apache.hadoop.mapred.Reco
                     }
                     catch (Exception e)
                     {
-                        close();
+                        closeInternal();
                         if (!iter.hasNext())
                         {
                             lastException = new IOException(e);
@@ -373,7 +391,7 @@ implements org.apache.hadoop.mapred.Reco
                     }
                     catch (Exception e)
                     {
-                        close();
+                        closeInternal();
                         // TException means something unexpected went wrong to that endpoint, so
                         // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
                         if ((!(e instanceof TException)) || !iter.hasNext())