You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/28 19:59:00 UTC

svn commit: r1140758 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassand...

Author: jbellis
Date: Tue Jun 28 17:58:59 2011
New Revision: 1140758

URL: http://svn.apache.org/viewvc?rev=1140758&view=rev
Log:
merge from 0.8

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/build.xml
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7:1026516-1140567
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1140755
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jun 28 17:58:59 2011
@@ -16,9 +16,12 @@
  * Expose number of threads blocked on submitting memtable to flush
    (CASSANDRA-2817)
  * add ability to return "endpoints" to nodetool (CASSANDRA-2776)
+ * Add support for multiple (comma-delimited) coordinator addresses
+   to ColumnFamilyInputFormat (CASSANDRA-2807)
  * fix potential NPE while scheduling read repair for range slice
    (CASSANDRA-2823)
  * Fix race in SystemTable.getCurrentLocalNodeId (CASSANDRA-2824)
+ * Correctly set default for replicate_on_write (CASSANDRA-2835)
 
 
 0.8.1
@@ -96,6 +99,8 @@
  * fix repair hanging if a neighbor has nothing to send (CASSANDRA-2797)
  * purge tombstone even if row is in only one sstable (CASSANDRA-2801)
  * Fix wrong purge of deleted cf during compaction (CASSANDRA-2786)
+ * fix race that could result in Hadoop writer failing to throw an
+   exception encountered after close() (CASSANDRA-2755)
 
 
 0.8.0-final
@@ -214,9 +219,6 @@
  * reduce contention on Table.flusherLock (CASSANDRA-1954)
  * try harder to detect failures during streaming, cleaning up temporary
    files more reliably (CASSANDRA-2088)
-
-
-0.6.13
  * shut down server for OOM on a Thrift thread (CASSANDRA-2269)
  * fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
  * preserve version when streaming data from old sstables (CASSANDRA-2283)

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Tue Jun 28 17:58:59 2011
@@ -7,6 +7,20 @@ Upgrading
       sstableloader tool instead.
 
 
+0.8.2
+=====
+
+Upgrading
+---------
+    - 0.8.0 and 0.8.1 shipped with a bug that was setting the
+      replicate_on_write option for counter column families to false (this
+      option has no effect on non-counter column families). This is an unsafe
+      default and 0.8.2 corrects this: the default for replicate_on_write is
+      now true. It is advised to update your counter column family definitions
+      if replicate_on_write was incorrectly set to false (before or after
+      upgrade).
+
+
 0.8.1
 =====
 

Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Tue Jun 28 17:58:59 2011
@@ -25,7 +25,7 @@
     <property name="debuglevel" value="source,lines,vars"/>
 
     <!-- default version and SCM information (we need the default SCM info as people may checkout with git-svn) -->
-    <property name="base.version" value="0.8.1"/>
+    <property name="base.version" value="0.8.2-dev"/>
     <property name="scm.default.path" value="cassandra/branches/cassandra-0.8"/>
     <property name="scm.default.connection" value="scm:svn:http://svn.apache.org/repos/asf/${scm.default.path}"/>
     <property name="scm.default.developerConnection" value="scm:svn:https://svn.apache.org/repos/asf/${scm.default.path}"/>

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/contrib:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1140755
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1140755
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1140755
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1140755
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1140755
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1140755
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Tue Jun 28 17:58:59 2011
@@ -57,9 +57,9 @@ public class DebuggableThreadPoolExecuto
         super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
         allowCoreThreadTimeOut(true);
 
-        // preserve task serialization.  this is more complicated than it needs to be,
-        // since TPE rejects if queue.offer reports a full queue.  we'll just
-        // override this with a handler that retries until it gets in.  ugly, but effective.
+        // block task submissions until queue has room.
+        // this is fighting TPE's design a bit because TPE rejects if queue.offer reports a full queue.
+        // we'll just override this with a handler that retries until it gets in.  ugly, but effective.
         // (there is an extensive analysis of the options here at
         //  http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
         this.setRejectedExecutionHandler(new RejectedExecutionHandler()

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Jun 28 17:58:59 2011
@@ -656,6 +656,8 @@ public final class CFMetaData
     {
         if (!cf_def.isSetComment())
             cf_def.setComment("");
+        if (!cf_def.isSetReplicate_on_write())
+            cf_def.setReplicate_on_write(CFMetaData.DEFAULT_REPLICATE_ON_WRITE);
         if (!cf_def.isSetMin_compaction_threshold())
             cf_def.setMin_compaction_threshold(CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
         if (!cf_def.isSetMax_compaction_threshold())

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Jun 28 17:58:59 2011
@@ -92,7 +92,7 @@ public class HintedHandOffManager implem
     public static final String HINTS_CF = "HintsColumnFamily";
 
     private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
-    private static final int PAGE_SIZE = 10000;
+    private static final int PAGE_SIZE = 1024;
     private static final String SEPARATOR = "-";
     private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Jun 28 17:58:59 2011
@@ -540,7 +540,7 @@ public class CompactionManager implement
                             goodRows++;
                         }
                         if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
-                            logger.warn("Row scrubbed successfully but index file contains a different key or row size; consider rebuilding the index as described in http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
+                            logger.warn("Index file contained a different key or row size; using key from data file");
                     }
                     catch (Throwable th)
                     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Jun 28 17:58:59 2011
@@ -871,17 +871,13 @@ public class Gossiper implements IFailur
      */
     public void addSavedEndpoint(InetAddress ep)
     {
-        EndpointState epState = endpointStateMap.get(ep);
-        if (epState == null)
-        {
-            epState = new EndpointState(new HeartBeatState(0));
-            epState.markDead();
-            epState.setHasToken(true);
-            endpointStateMap.put(ep, epState);
-            unreachableEndpoints.put(ep, System.currentTimeMillis());
-            if (logger.isTraceEnabled())
-                logger.trace("Adding saved endpoint " + ep + " " + epState.getHeartBeatState().getGeneration());
-        }
+        EndpointState epState = new EndpointState(new HeartBeatState(0));
+        epState.markDead();
+        epState.setHasToken(true);
+        endpointStateMap.put(ep, epState);
+        unreachableEndpoints.put(ep, System.currentTimeMillis());
+        if (logger.isTraceEnabled())
+            logger.trace("Adding saved endpoint " + ep + " " + epState.getHeartBeatState().getGeneration());
     }
 
     public void addLocalApplicationState(ApplicationState state, VersionedValue value)

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Jun 28 17:58:59 2011
@@ -214,7 +214,30 @@ public class ColumnFamilyInputFormat ext
 
     private List<TokenRange> getRangeMap(Configuration conf) throws IOException
     {
-        Cassandra.Client client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+        String[] addresses = ConfigHelper.getInitialAddress(conf).split(",");
+        Cassandra.Client client = null;
+        List<IOException> exceptions = new ArrayList<IOException>();
+        for (String address : addresses)
+        {
+            try
+            {
+                client = createConnection(address, ConfigHelper.getRpcPort(conf), true);
+                break;
+            }
+            catch (IOException ioe)
+            {
+                exceptions.add(ioe);
+            }
+        }
+        if (client == null)
+        {
+            logger.error("failed to connect to any initial addresses");
+            for (IOException ioe : exceptions)
+            {
+                logger.error("", ioe);
+            }
+            throw exceptions.get(exceptions.size() - 1);
+        }
 
         List<TokenRange> map;
         try

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Jun 28 17:58:59 2011
@@ -51,28 +51,19 @@ import org.slf4j.LoggerFactory;
 
 public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
 {
-    private static final Logger                              logger = LoggerFactory
-                                                                            .getLogger(ColumnFamilyRecordReader.class);
-    private ColumnFamilySplit                                split;
-    private RowIterator                                      iter;
+    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
+
+    private ColumnFamilySplit split;
+    private RowIterator iter;
     private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
-    private SlicePredicate                                   predicate;
-    private int                                              totalRowCount;                                            // total
-                                                                                                                        // number
-                                                                                                                        // of
-                                                                                                                        // rows
-                                                                                                                        // to
-                                                                                                                        // fetch
-    private int                                              batchRowCount;                                            // fetch
-                                                                                                                        // this
-                                                                                                                        // many
-                                                                                                                        // per
-                                                                                                                        // batch
-    private String                                           cfName;
-    private String                                           keyspace;
-    private TSocket                                          socket;
-    private Cassandra.Client                                 client;
-    private ConsistencyLevel                                 consistencyLevel;
+    private SlicePredicate predicate;
+    private int totalRowCount; // total number of rows to fetch
+    private int batchRowCount; // fetch this many per batch
+    private String cfName;
+    private String keyspace;
+    private TSocket socket;
+    private Cassandra.Client client;
+    private ConsistencyLevel consistencyLevel;
 
     public void close()
     {
@@ -121,7 +112,7 @@ public class ColumnFamilyRecordReader ex
 
             // create connection using thrift
             List<String> locationsAttempted = new ArrayList<String>();
-            for (Iterator<String> it = getLocations(conf); it.hasNext();)
+            for (Iterator<String> it = getLocations(conf); it.hasNext(); )
             {
                 String location = it.next();
                 try
@@ -139,12 +130,13 @@ public class ColumnFamilyRecordReader ex
                     client = null;
                 }
             }
-            if (null == client)
+            if (client == null)
             {
-                throw new RuntimeException("For the split " + split + " there were no locations "
-                        + (ConfigHelper.getInputSplitUseOnlySameDCReplica(conf) ? "(from same DC) " : "") + "alive: "
-                        + StringUtils.join(locationsAttempted, ", "));
+                String message = String.format("For the split %s there were no locations %salive: %s",
+                                               split, (ConfigHelper.getInputSplitUseOnlySameDCReplica(conf) ? "(from same DC) " : ""), StringUtils.join(locationsAttempted, ", "));
+                throw new RuntimeException(message);
             }
+
             // log in
             client.set_keyspace(keyspace);
             if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
@@ -177,48 +169,37 @@ public class ColumnFamilyRecordReader ex
     // single-DC clusters, at least.
     private Iterator<String> getLocations(final Configuration conf) throws IOException
     {
-        try
+        for (InetAddress address : InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()))
         {
-            for (InetAddress address : InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()))
+            for (final String location : split.getLocations())
             {
-                for (final String location : split.getLocations())
+                InetAddress locationAddress;
+                try
                 {
-                    InetAddress locationAddress = getInetAddressByName(location);
-                    if (address.equals(locationAddress))
-                    {
-                        // add fall back replicas from same DC via the following
-                        // Iterator
-                        return new SplitEndpointIterator(location, conf);
-                    }
+                    locationAddress = InetAddress.getByName(location);
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new AssertionError(e);
+                }
+
+                if (address.equals(locationAddress))
+                {
+                    // add fall back replicas from same DC via the following Iterator
+                    return new SplitEndpointIterator(location, conf);
                 }
             }
         }
-        catch (UnknownHostException e)
-        {
-            throw new AssertionError(e);
-        }
-        
-        return Arrays.asList(split.getLocations()).iterator();
-    }
 
-    private static InetAddress getInetAddressByName(String name)
-    {
-        try
-        {
-            return InetAddress.getByName(name);
-        }
-        catch (UnknownHostException e)
-        {
-            throw new AssertionError(e);
-        }
+        return Arrays.asList(split.getLocations()).iterator();
     }
 
     private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
     {
-        private List<KeySlice>     rows;
-        private String             startToken;
-        private int                totalRead = 0;
-        private int                i         = 0;
+        private List<KeySlice> rows;
+        private String startToken;
+        private int totalRead = 0;
+        private int i = 0;
         private final AbstractType comparator;
         private final AbstractType subComparator;
         private final IPartitioner partitioner;
@@ -274,8 +255,7 @@ public class ColumnFamilyRecordReader ex
                 return;
             }
 
-            KeyRange keyRange = new KeyRange(batchRowCount).setStart_token(startToken)
-                    .setEnd_token(split.getEndToken());
+            KeyRange keyRange = new KeyRange(batchRowCount).setStart_token(startToken).setEnd_token(split.getEndToken());
             try
             {
                 rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, consistencyLevel);
@@ -335,8 +315,7 @@ public class ColumnFamilyRecordReader ex
 
         private IColumn unthriftifySuper(SuperColumn super_column)
         {
-            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name,
-                    subComparator);
+            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
             for (Column column : super_column.columns)
             {
                 sc.addColumn(unthriftifySimple(column));
@@ -352,12 +331,12 @@ public class ColumnFamilyRecordReader ex
 
     private class SplitEndpointIterator extends AbstractIterator<String>
     {
-        private final boolean       restrictToSameDC;
-        private final String        location;
+        private final boolean restrictToSameDC;
+        private final String location;
         private final Configuration conf;
-        private Cassandra.Client    client;
-        private List<String>        endpoints;
-        private int                 endpointsIdx = -1;
+        private Cassandra.Client client;
+        private List<String> endpoints;
+        private int endpointsIdx = -1;
 
         SplitEndpointIterator(final String location, final Configuration conf)
         {
@@ -384,8 +363,7 @@ public class ColumnFamilyRecordReader ex
                         {
                             try
                             {
-                                endpoints = sortEndpointsByProximity(nextLocation, Arrays.asList(split.getLocations()),
-                                        restrictToSameDC);
+                                endpoints = sortEndpointsByProximity(nextLocation, Arrays.asList(split.getLocations()), restrictToSameDC);
                                 if (location.equals(endpoints.get(0)))
                                 {
                                     ++endpointsIdx;
@@ -394,17 +372,13 @@ public class ColumnFamilyRecordReader ex
                             }
                             catch (TException e)
                             {
-                                logger.info(
-                                        "failed to sortEndpointsByProximity(" + location + ", ["
-                                                + StringUtils.join(split.getLocations(), ',') + "], "
-                                                + restrictToSameDC + ")", e);
+                                logger.info(String.format("failed to sortEndpointsByProximity(%s, [%s], %s)",
+                                                          location, StringUtils.join(split.getLocations(), ','), restrictToSameDC), e);
                             }
                             catch (IOException e)
                             {
-                                logger.info(
-                                        "failed to sortEndpointsByProximity(" + location + ", ["
-                                                + StringUtils.join(split.getLocations(), ',') + "], "
-                                                + restrictToSameDC + ")", e);
+                                logger.info(String.format("failed to sortEndpointsByProximity(%s, [%s], %s)",
+                                                          location, StringUtils.join(split.getLocations(), ','), restrictToSameDC), e);
                             }
                         }
                     }
@@ -414,8 +388,8 @@ public class ColumnFamilyRecordReader ex
                     }
                     if (null == endpoints)
                     {
-                        throw new AssertionError("failed to find any fallback replica endpoints from "
-                                + StringUtils.join(split.getLocations(), ','));
+                        throw new AssertionError(String.format("failed to find any fallback replica endpoints from %s",
+                                                               StringUtils.join(split.getLocations(), ',')));
                     }
                 }
                 if (endpoints.size() > endpointsIdx)
@@ -432,8 +406,7 @@ public class ColumnFamilyRecordReader ex
             try
             {
                 // try first our configured initialAddress
-                return getClient(ConfigHelper.getInitialAddress(conf)).sort_endpoints_by_proximity(location, endpoints,
-                        restrictToSameDC);
+                return getClient(ConfigHelper.getInitialAddress(conf)).sort_endpoints_by_proximity(location, endpoints, restrictToSameDC);
             }
             catch (IOException ex)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Jun 28 17:58:59 2011
@@ -32,15 +32,14 @@ import java.util.concurrent.TimeUnit;
 import org.apache.cassandra.client.RingCache;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TSocket;
 
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
@@ -150,27 +149,33 @@ implements org.apache.hadoop.mapred.Reco
     @Override
     public void close(TaskAttemptContext context) throws IOException, InterruptedException
     {
-        close((org.apache.hadoop.mapred.Reporter)null);
+        close();
     }
 
     /** Fills the deprecated RecordWriter interface for streaming. */
     @Deprecated
     public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
     {
+        close();
+    }
+
+    private void close() throws IOException
+    {
+        // close all the clients before throwing anything
+        IOException clientException = null;
         for (RangeClient client : clients.values())
-            client.stopNicely();
-        try
         {
-            for (RangeClient client : clients.values())
+            try
             {
-                client.join();
                 client.close();
             }
+            catch (IOException e)
+            {
+                clientException = e;
+            }
         }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        if (clientException != null)
+            throw clientException;
     }
 
     /**
@@ -186,6 +191,9 @@ implements org.apache.hadoop.mapred.Reco
         private final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<Pair<ByteBuffer,Mutation>>(queueSize);
 
         private volatile boolean run = true;
+        // We want the caller to know if something went wrong, so we record any unrecoverable exception hit while
+        // writing and rethrow it on the caller's stack the next time put() is called or, if there are no more put
+        // calls, when the client is closed.
         private volatile IOException lastException;
 
         private Cassandra.Client thriftClient;
@@ -222,15 +230,25 @@ implements org.apache.hadoop.mapred.Reco
             }
         }
 
-        public void stopNicely() throws IOException
+        public void close() throws IOException
         {
-            if (lastException != null)
-                throw lastException;
+            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
             run = false;
             interrupt();
+            try
+            {
+                this.join();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            if (lastException != null)
+                throw lastException;
         }
 
-        public void close()
+        private void closeInternal()
         {
             if (thriftSocket != null)
             {
@@ -287,7 +305,7 @@ implements org.apache.hadoop.mapred.Reco
                     }
                     catch (Exception e)
                     {
-                        close();
+                        closeInternal();
                         if (!iter.hasNext())
                         {
                             lastException = new IOException(e);
@@ -304,7 +322,7 @@ implements org.apache.hadoop.mapred.Reco
                     }
                     catch (Exception e)
                     {
-                        close();
+                        closeInternal();
                         // TException means something unexpected went wrong to that endpoint, so
                         // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
                         if ((!(e instanceof TException)) || !iter.hasNext())

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jun 28 17:58:59 2011
@@ -70,25 +70,43 @@ public class IncomingTcpConnection exten
                 // we should buffer
                 input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
             version = MessagingService.getBits(header, 15, 8);
-            Gossiper.instance.setVersion(socket.getInetAddress(), version);
+            if (logger.isDebugEnabled())
+                logger.debug("Version for " + socket.getInetAddress() + " is " + version);
         }
         catch (IOException e)
         {
             close();
             throw new IOError(e);
         }
+
+        if (version > MessagingService.version_)
+        {
+            // save the endpoint so gossip will reconnect to it
+            Gossiper.instance.addSavedEndpoint(socket.getInetAddress());
+            logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignorning");
+
+            // streaming connections are per-session and have a fixed version.  we can't do anything with a new-version
+            // stream connection, so drop it.
+            if (isStream)
+            {
+                close();
+                return;
+            }
+            // for non-streaming connections, continue to read the messages (and ignore them) until sender
+            // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
+        }
+        else
+        {
+            // only set the version when it is <= ours; otherwise it's the responsibility of the other end to mimic us
+            Gossiper.instance.setVersion(socket.getInetAddress(), version);
+        }
+
         while (true)
         {
             try
             {
                 if (isStream)
                 {
-                    if (version > MessagingService.version_)
-                    {
-                        logger.error("Received untranslated stream from newer protcol version. Terminating connection!");
-                        close();
-                        return;
-                    }
                     int size = input.readInt();
                     byte[] headerBytes = new byte[size];
                     input.readFully(headerBytes);
@@ -106,11 +124,8 @@ public class IncomingTcpConnection exten
                         input.readFully(contentBytes, offset, CHUNK_SIZE);
                     input.readFully(contentBytes, size - remainder, remainder);
 
-                    if (version > MessagingService.version_)
-                        logger.info("Received connection from newer protocol version. Ignorning message.");
-                    else
+                    if (version <= MessagingService.version_)
                     {
-                        // todo: need to be aware of message version.
                         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
                         String id = dis.readUTF();
                         Message message = Message.serializer().deserialize(dis, version);
@@ -120,9 +135,8 @@ public class IncomingTcpConnection exten
                 // prepare to read the next message
                 MessagingService.validateMagic(input.readInt());
                 int header = input.readInt();
-                version = MessagingService.getBits(header, 15, 8);
                 assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
-                assert version == MessagingService.getBits(header, 15, 8) : "Protocol version shouldn't change during a session";
+                version = MessagingService.getBits(header, 15, 8);
             }
             catch (EOFException e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Tue Jun 28 17:58:59 2011
@@ -165,9 +165,10 @@ public class OutboundTcpConnection exten
             try
             {
                 // zero means 'bind on any available port.'
-                if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+                EncryptionOptions options = DatabaseDescriptor.getEncryptionOptions();
+                if (options != null && options.internode_encryption == EncryptionOptions.InternodeEncryption.all)
                 {
-                    socket = SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+                    socket = SSLFactory.getSocket(options, endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
                 }
                 else {
                     socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);