You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/28 19:59:00 UTC
svn commit: r1140758 - in /cassandra/trunk: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassand...
Author: jbellis
Date: Tue Jun 28 17:58:59 2011
New Revision: 1140758
URL: http://svn.apache.org/viewvc?rev=1140758&view=rev
Log:
merge from 0.8
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/build.xml
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7:1026516-1140567
/cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1140755
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jun 28 17:58:59 2011
@@ -16,9 +16,12 @@
* Expose number of threads blocked on submitting memtable to flush
(CASSANDRA-2817)
* add ability to return "endpoints" to nodetool (CASSANDRA-2776)
+ * Add support for multiple (comma-delimited) coordinator addresses
+ to ColumnFamilyInputFormat (CASSANDRA-2807)
* fix potential NPE while scheduling read repair for range slice
(CASSANDRA-2823)
* Fix race in SystemTable.getCurrentLocalNodeId (CASSANDRA-2824)
+ * Correctly set default for replicate_on_write (CASSANDRA-2835)
0.8.1
@@ -96,6 +99,8 @@
* fix repair hanging if a neighbor has nothing to send (CASSANDRA-2797)
* purge tombstone even if row is in only one sstable (CASSANDRA-2801)
* Fix wrong purge of deleted cf during compaction (CASSANDRA-2786)
+ * fix race that could result in Hadoop writer failing to throw an
+ exception encountered after close() (CASSANDRA-2755)
0.8.0-final
@@ -214,9 +219,6 @@
* reduce contention on Table.flusherLock (CASSANDRA-1954)
* try harder to detect failures during streaming, cleaning up temporary
files more reliably (CASSANDRA-2088)
-
-
-0.6.13
* shut down server for OOM on a Thrift thread (CASSANDRA-2269)
* fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
* preserve version when streaming data from old sstables (CASSANDRA-2283)
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Tue Jun 28 17:58:59 2011
@@ -7,6 +7,20 @@ Upgrading
sstableloader tool instead.
+0.8.2
+=====
+
+Upgrading
+---------
+ - 0.8.0 and 0.8.1 shipped with a bug that was setting the
+ replicate_on_write option for counter column families to false (this
+ option has no effect on non-counter column families). This is an unsafe
+ default and 0.8.2 corrects this: the default for replicate_on_write is
+ now true. It is advised to update your counter column family definitions
+ if replicate_on_write was incorrectly set to false (before or after
+ upgrade).
+
+
0.8.1
=====
Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Tue Jun 28 17:58:59 2011
@@ -25,7 +25,7 @@
<property name="debuglevel" value="source,lines,vars"/>
<!-- default version and SCM information (we need the default SCM info as people may checkout with git-svn) -->
- <property name="base.version" value="0.8.1"/>
+ <property name="base.version" value="0.8.2-dev"/>
<property name="scm.default.path" value="cassandra/branches/cassandra-0.8"/>
<property name="scm.default.connection" value="scm:svn:http://svn.apache.org/repos/asf/${scm.default.path}"/>
<property name="scm.default.developerConnection" value="scm:svn:https://svn.apache.org/repos/asf/${scm.default.path}"/>
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/contrib:1026516-1140567
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1140755
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1140755
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1140755
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1140755
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1140755
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:58:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1138710,1138996
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1139360,1140470,1140472
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1140755
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Tue Jun 28 17:58:59 2011
@@ -57,9 +57,9 @@ public class DebuggableThreadPoolExecuto
super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
allowCoreThreadTimeOut(true);
- // preserve task serialization. this is more complicated than it needs to be,
- // since TPE rejects if queue.offer reports a full queue. we'll just
- // override this with a handler that retries until it gets in. ugly, but effective.
+ // block task submissions until queue has room.
+ // this is fighting TPE's design a bit because TPE rejects if queue.offer reports a full queue.
+ // we'll just override this with a handler that retries until it gets in. ugly, but effective.
// (there is an extensive analysis of the options here at
// http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
this.setRejectedExecutionHandler(new RejectedExecutionHandler()
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Jun 28 17:58:59 2011
@@ -656,6 +656,8 @@ public final class CFMetaData
{
if (!cf_def.isSetComment())
cf_def.setComment("");
+ if (!cf_def.isSetReplicate_on_write())
+ cf_def.setReplicate_on_write(CFMetaData.DEFAULT_REPLICATE_ON_WRITE);
if (!cf_def.isSetMin_compaction_threshold())
cf_def.setMin_compaction_threshold(CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
if (!cf_def.isSetMax_compaction_threshold())
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Jun 28 17:58:59 2011
@@ -92,7 +92,7 @@ public class HintedHandOffManager implem
public static final String HINTS_CF = "HintsColumnFamily";
private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
- private static final int PAGE_SIZE = 10000;
+ private static final int PAGE_SIZE = 1024;
private static final String SEPARATOR = "-";
private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Jun 28 17:58:59 2011
@@ -540,7 +540,7 @@ public class CompactionManager implement
goodRows++;
}
if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
- logger.warn("Row scrubbed successfully but index file contains a different key or row size; consider rebuilding the index as described in http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
+ logger.warn("Index file contained a different key or row size; using key from data file");
}
catch (Throwable th)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Jun 28 17:58:59 2011
@@ -871,17 +871,13 @@ public class Gossiper implements IFailur
*/
public void addSavedEndpoint(InetAddress ep)
{
- EndpointState epState = endpointStateMap.get(ep);
- if (epState == null)
- {
- epState = new EndpointState(new HeartBeatState(0));
- epState.markDead();
- epState.setHasToken(true);
- endpointStateMap.put(ep, epState);
- unreachableEndpoints.put(ep, System.currentTimeMillis());
- if (logger.isTraceEnabled())
- logger.trace("Adding saved endpoint " + ep + " " + epState.getHeartBeatState().getGeneration());
- }
+ EndpointState epState = new EndpointState(new HeartBeatState(0));
+ epState.markDead();
+ epState.setHasToken(true);
+ endpointStateMap.put(ep, epState);
+ unreachableEndpoints.put(ep, System.currentTimeMillis());
+ if (logger.isTraceEnabled())
+ logger.trace("Adding saved endpoint " + ep + " " + epState.getHeartBeatState().getGeneration());
}
public void addLocalApplicationState(ApplicationState state, VersionedValue value)
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Jun 28 17:58:59 2011
@@ -214,7 +214,30 @@ public class ColumnFamilyInputFormat ext
private List<TokenRange> getRangeMap(Configuration conf) throws IOException
{
- Cassandra.Client client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+ String[] addresses = ConfigHelper.getInitialAddress(conf).split(",");
+ Cassandra.Client client = null;
+ List<IOException> exceptions = new ArrayList<IOException>();
+ for (String address : addresses)
+ {
+ try
+ {
+ client = createConnection(address, ConfigHelper.getRpcPort(conf), true);
+ break;
+ }
+ catch (IOException ioe)
+ {
+ exceptions.add(ioe);
+ }
+ }
+ if (client == null)
+ {
+ logger.error("failed to connect to any initial addresses");
+ for (IOException ioe : exceptions)
+ {
+ logger.error("", ioe);
+ }
+ throw exceptions.get(exceptions.size() - 1);
+ }
List<TokenRange> map;
try
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Jun 28 17:58:59 2011
@@ -51,28 +51,19 @@ import org.slf4j.LoggerFactory;
public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
{
- private static final Logger logger = LoggerFactory
- .getLogger(ColumnFamilyRecordReader.class);
- private ColumnFamilySplit split;
- private RowIterator iter;
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
+
+ private ColumnFamilySplit split;
+ private RowIterator iter;
private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
- private SlicePredicate predicate;
- private int totalRowCount; // total
- // number
- // of
- // rows
- // to
- // fetch
- private int batchRowCount; // fetch
- // this
- // many
- // per
- // batch
- private String cfName;
- private String keyspace;
- private TSocket socket;
- private Cassandra.Client client;
- private ConsistencyLevel consistencyLevel;
+ private SlicePredicate predicate;
+ private int totalRowCount; // total number of rows to fetch
+ private int batchRowCount; // fetch this many per batch
+ private String cfName;
+ private String keyspace;
+ private TSocket socket;
+ private Cassandra.Client client;
+ private ConsistencyLevel consistencyLevel;
public void close()
{
@@ -121,7 +112,7 @@ public class ColumnFamilyRecordReader ex
// create connection using thrift
List<String> locationsAttempted = new ArrayList<String>();
- for (Iterator<String> it = getLocations(conf); it.hasNext();)
+ for (Iterator<String> it = getLocations(conf); it.hasNext(); )
{
String location = it.next();
try
@@ -139,12 +130,13 @@ public class ColumnFamilyRecordReader ex
client = null;
}
}
- if (null == client)
+ if (client == null)
{
- throw new RuntimeException("For the split " + split + " there were no locations "
- + (ConfigHelper.getInputSplitUseOnlySameDCReplica(conf) ? "(from same DC) " : "") + "alive: "
- + StringUtils.join(locationsAttempted, ", "));
+ String message = String.format("For the split %s there were no locations %salive: %s",
+ split, (ConfigHelper.getInputSplitUseOnlySameDCReplica(conf) ? "(from same DC) " : ""), StringUtils.join(locationsAttempted, ", "));
+ throw new RuntimeException(message);
}
+
// log in
client.set_keyspace(keyspace);
if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
@@ -177,48 +169,37 @@ public class ColumnFamilyRecordReader ex
// single-DC clusters, at least.
private Iterator<String> getLocations(final Configuration conf) throws IOException
{
- try
+ for (InetAddress address : InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()))
{
- for (InetAddress address : InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()))
+ for (final String location : split.getLocations())
{
- for (final String location : split.getLocations())
+ InetAddress locationAddress;
+ try
{
- InetAddress locationAddress = getInetAddressByName(location);
- if (address.equals(locationAddress))
- {
- // add fall back replicas from same DC via the following
- // Iterator
- return new SplitEndpointIterator(location, conf);
- }
+ locationAddress = InetAddress.getByName(location);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ if (address.equals(locationAddress))
+ {
+ // add fall back replicas from same DC via the following Iterator
+ return new SplitEndpointIterator(location, conf);
}
}
}
- catch (UnknownHostException e)
- {
- throw new AssertionError(e);
- }
-
- return Arrays.asList(split.getLocations()).iterator();
- }
- private static InetAddress getInetAddressByName(String name)
- {
- try
- {
- return InetAddress.getByName(name);
- }
- catch (UnknownHostException e)
- {
- throw new AssertionError(e);
- }
+ return Arrays.asList(split.getLocations()).iterator();
}
private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
{
- private List<KeySlice> rows;
- private String startToken;
- private int totalRead = 0;
- private int i = 0;
+ private List<KeySlice> rows;
+ private String startToken;
+ private int totalRead = 0;
+ private int i = 0;
private final AbstractType comparator;
private final AbstractType subComparator;
private final IPartitioner partitioner;
@@ -274,8 +255,7 @@ public class ColumnFamilyRecordReader ex
return;
}
- KeyRange keyRange = new KeyRange(batchRowCount).setStart_token(startToken)
- .setEnd_token(split.getEndToken());
+ KeyRange keyRange = new KeyRange(batchRowCount).setStart_token(startToken).setEnd_token(split.getEndToken());
try
{
rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, consistencyLevel);
@@ -335,8 +315,7 @@ public class ColumnFamilyRecordReader ex
private IColumn unthriftifySuper(SuperColumn super_column)
{
- org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name,
- subComparator);
+ org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
for (Column column : super_column.columns)
{
sc.addColumn(unthriftifySimple(column));
@@ -352,12 +331,12 @@ public class ColumnFamilyRecordReader ex
private class SplitEndpointIterator extends AbstractIterator<String>
{
- private final boolean restrictToSameDC;
- private final String location;
+ private final boolean restrictToSameDC;
+ private final String location;
private final Configuration conf;
- private Cassandra.Client client;
- private List<String> endpoints;
- private int endpointsIdx = -1;
+ private Cassandra.Client client;
+ private List<String> endpoints;
+ private int endpointsIdx = -1;
SplitEndpointIterator(final String location, final Configuration conf)
{
@@ -384,8 +363,7 @@ public class ColumnFamilyRecordReader ex
{
try
{
- endpoints = sortEndpointsByProximity(nextLocation, Arrays.asList(split.getLocations()),
- restrictToSameDC);
+ endpoints = sortEndpointsByProximity(nextLocation, Arrays.asList(split.getLocations()), restrictToSameDC);
if (location.equals(endpoints.get(0)))
{
++endpointsIdx;
@@ -394,17 +372,13 @@ public class ColumnFamilyRecordReader ex
}
catch (TException e)
{
- logger.info(
- "failed to sortEndpointsByProximity(" + location + ", ["
- + StringUtils.join(split.getLocations(), ',') + "], "
- + restrictToSameDC + ")", e);
+ logger.info(String.format("failed to sortEndpointsByProximity(%s, [%s], %s)",
+ location, StringUtils.join(split.getLocations(), ','), restrictToSameDC), e);
}
catch (IOException e)
{
- logger.info(
- "failed to sortEndpointsByProximity(" + location + ", ["
- + StringUtils.join(split.getLocations(), ',') + "], "
- + restrictToSameDC + ")", e);
+ logger.info(String.format("failed to sortEndpointsByProximity(%s, [%s], %s)",
+ location, StringUtils.join(split.getLocations(), ','), restrictToSameDC), e);
}
}
}
@@ -414,8 +388,8 @@ public class ColumnFamilyRecordReader ex
}
if (null == endpoints)
{
- throw new AssertionError("failed to find any fallback replica endpoints from "
- + StringUtils.join(split.getLocations(), ','));
+ throw new AssertionError(String.format("failed to find any fallback replica endpoints from %s",
+ StringUtils.join(split.getLocations(), ',')));
}
}
if (endpoints.size() > endpointsIdx)
@@ -432,8 +406,7 @@ public class ColumnFamilyRecordReader ex
try
{
// try first our configured initialAddress
- return getClient(ConfigHelper.getInitialAddress(conf)).sort_endpoints_by_proximity(location, endpoints,
- restrictToSameDC);
+ return getClient(ConfigHelper.getInitialAddress(conf)).sort_endpoints_by_proximity(location, endpoints, restrictToSameDC);
}
catch (IOException ex)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Jun 28 17:58:59 2011
@@ -32,15 +32,14 @@ import java.util.concurrent.TimeUnit;
import org.apache.cassandra.client.RingCache;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TSocket;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
* The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
@@ -150,27 +149,33 @@ implements org.apache.hadoop.mapred.Reco
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
- close((org.apache.hadoop.mapred.Reporter)null);
+ close();
}
/** Fills the deprecated RecordWriter interface for streaming. */
@Deprecated
public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
{
+ close();
+ }
+
+ private void close() throws IOException
+ {
+ // close all the clients before throwing anything
+ IOException clientException = null;
for (RangeClient client : clients.values())
- client.stopNicely();
- try
{
- for (RangeClient client : clients.values())
+ try
{
- client.join();
client.close();
}
+ catch (IOException e)
+ {
+ clientException = e;
+ }
}
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ if (clientException != null)
+ throw clientException;
}
/**
@@ -186,6 +191,9 @@ implements org.apache.hadoop.mapred.Reco
private final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<Pair<ByteBuffer,Mutation>>(queueSize);
private volatile boolean run = true;
+ // we want the caller to know if something went wrong, so we record any unrecoverable exception raised while
+ // writing and rethrow it on the caller's stack the next time put() is called, or, if there are no more put()
+ // calls, when the client is closed.
private volatile IOException lastException;
private Cassandra.Client thriftClient;
@@ -222,15 +230,25 @@ implements org.apache.hadoop.mapred.Reco
}
}
- public void stopNicely() throws IOException
+ public void close() throws IOException
{
- if (lastException != null)
- throw lastException;
+ // stop the run loop. this will result in closeInternal being called by the time join() finishes.
run = false;
interrupt();
+ try
+ {
+ this.join();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ if (lastException != null)
+ throw lastException;
}
- public void close()
+ private void closeInternal()
{
if (thriftSocket != null)
{
@@ -287,7 +305,7 @@ implements org.apache.hadoop.mapred.Reco
}
catch (Exception e)
{
- close();
+ closeInternal();
if (!iter.hasNext())
{
lastException = new IOException(e);
@@ -304,7 +322,7 @@ implements org.apache.hadoop.mapred.Reco
}
catch (Exception e)
{
- close();
+ closeInternal();
// TException means something unexpected went wrong to that endpoint, so
// we should try again to another. Other exceptions (auth or invalid request) are fatal.
if ((!(e instanceof TException)) || !iter.hasNext())
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jun 28 17:58:59 2011
@@ -70,25 +70,43 @@ public class IncomingTcpConnection exten
// we should buffer
input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
version = MessagingService.getBits(header, 15, 8);
- Gossiper.instance.setVersion(socket.getInetAddress(), version);
+ if (logger.isDebugEnabled())
+ logger.debug("Version for " + socket.getInetAddress() + " is " + version);
}
catch (IOException e)
{
close();
throw new IOError(e);
}
+
+ if (version > MessagingService.version_)
+ {
+ // save the endpoint so gossip will reconnect to it
+ Gossiper.instance.addSavedEndpoint(socket.getInetAddress());
+ logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignorning");
+
+ // streaming connections are per-session and have a fixed version. we can't do anything with a new-version
+ // stream connection, so drop it.
+ if (isStream)
+ {
+ close();
+ return;
+ }
+ // for non-streaming connections, continue to read the messages (and ignore them) until sender
+ // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
+ }
+ else
+ {
+ // only set the version when it is <= ours; otherwise it's the responsibility of the other end to mimic us
+ Gossiper.instance.setVersion(socket.getInetAddress(), version);
+ }
+
while (true)
{
try
{
if (isStream)
{
- if (version > MessagingService.version_)
- {
- logger.error("Received untranslated stream from newer protcol version. Terminating connection!");
- close();
- return;
- }
int size = input.readInt();
byte[] headerBytes = new byte[size];
input.readFully(headerBytes);
@@ -106,11 +124,8 @@ public class IncomingTcpConnection exten
input.readFully(contentBytes, offset, CHUNK_SIZE);
input.readFully(contentBytes, size - remainder, remainder);
- if (version > MessagingService.version_)
- logger.info("Received connection from newer protocol version. Ignorning message.");
- else
+ if (version <= MessagingService.version_)
{
- // todo: need to be aware of message version.
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
String id = dis.readUTF();
Message message = Message.serializer().deserialize(dis, version);
@@ -120,9 +135,8 @@ public class IncomingTcpConnection exten
// prepare to read the next message
MessagingService.validateMagic(input.readInt());
int header = input.readInt();
- version = MessagingService.getBits(header, 15, 8);
assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
- assert version == MessagingService.getBits(header, 15, 8) : "Protocol version shouldn't change during a session";
+ version = MessagingService.getBits(header, 15, 8);
}
catch (EOFException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1140758&r1=1140757&r2=1140758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Tue Jun 28 17:58:59 2011
@@ -165,9 +165,10 @@ public class OutboundTcpConnection exten
try
{
// zero means 'bind on any available port.'
- if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
+ EncryptionOptions options = DatabaseDescriptor.getEncryptionOptions();
+ if (options != null && options.internode_encryption == EncryptionOptions.InternodeEncryption.all)
{
- socket = SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+ socket = SSLFactory.getSocket(options, endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
}
else {
socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);