You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/28 14:28:00 UTC
svn commit: r1140565 - in /cassandra/branches/cassandra-0.7: CHANGES.txt
src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
Author: jbellis
Date: Tue Jun 28 12:28:00 2011
New Revision: 1140565
URL: http://svn.apache.org/viewvc?rev=1140565&view=rev
Log:
fix race that could result in Hadoop writer failing to throw an exception for an encountered error
patch by Mck SembWever and jbellis for CASSANDRA-2755
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1140565&r1=1140564&r2=1140565&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jun 28 12:28:00 2011
@@ -23,6 +23,8 @@
* Expose number of threads blocked on submitting memtable to flush
(CASSANDRA-2817)
* Fix potential NPE during read repair (CASSANDRA-2823)
+ * fix race that could result in Hadoop writer failing to throw an
+ exception encountered after close() (CASSANDRA-2755)
0.7.6
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1140565&r1=1140564&r2=1140565&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Jun 28 12:28:00 2011
@@ -32,15 +32,14 @@ import java.util.concurrent.TimeUnit;
import org.apache.cassandra.client.RingCache;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TSocket;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
* The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
@@ -219,27 +218,33 @@ implements org.apache.hadoop.mapred.Reco
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
- close((org.apache.hadoop.mapred.Reporter)null);
+ close();
}
/** Fills the deprecated RecordWriter interface for streaming. */
@Deprecated
public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
{
+ close();
+ }
+
+ private void close() throws IOException
+ {
+ // close all the clients before throwing anything
+ IOException clientException = null;
for (RangeClient client : clients.values())
- client.stopNicely();
- try
{
- for (RangeClient client : clients.values())
+ try
{
- client.join();
client.close();
}
+ catch (IOException e)
+ {
+ clientException = e;
+ }
}
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ if (clientException != null)
+ throw clientException;
}
/**
@@ -255,6 +260,9 @@ implements org.apache.hadoop.mapred.Reco
private final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<Pair<ByteBuffer,Mutation>>(queueSize);
private volatile boolean run = true;
+ // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+ // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
+ // when the client is closed.
private volatile IOException lastException;
private Cassandra.Client thriftClient;
@@ -291,15 +299,25 @@ implements org.apache.hadoop.mapred.Reco
}
}
- public void stopNicely() throws IOException
+ public void close() throws IOException
{
- if (lastException != null)
- throw lastException;
+ // stop the run loop. this will result in closeInternal being called by the time join() finishes.
run = false;
interrupt();
+ try
+ {
+ this.join();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ if (lastException != null)
+ throw lastException;
}
- public void close()
+ private void closeInternal()
{
if (thriftSocket != null)
{
@@ -356,7 +374,7 @@ implements org.apache.hadoop.mapred.Reco
}
catch (Exception e)
{
- close();
+ closeInternal();
if (!iter.hasNext())
{
lastException = new IOException(e);
@@ -373,7 +391,7 @@ implements org.apache.hadoop.mapred.Reco
}
catch (Exception e)
{
- close();
+ closeInternal();
// TException means something unexpected went wrong to that endpoint, so
// we should try again to another. Other exceptions (auth or invalid request) are fatal.
if ((!(e instanceof TException)) || !iter.hasNext())