You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/25 04:37:26 UTC
svn commit: r1139483 - /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Author: jbellis
Date: Sat Jun 25 02:37:25 2011
New Revision: 1139483
URL: http://svn.apache.org/viewvc?rev=1139483&view=rev
Log:
cleanup
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1139483&r1=1139482&r2=1139483&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Sat Jun 25 02:37:25 2011
@@ -51,28 +51,19 @@ import org.slf4j.LoggerFactory;
public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
{
- private static final Logger logger = LoggerFactory
- .getLogger(ColumnFamilyRecordReader.class);
- private ColumnFamilySplit split;
- private RowIterator iter;
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
+
+ private ColumnFamilySplit split;
+ private RowIterator iter;
private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
- private SlicePredicate predicate;
- private int totalRowCount; // total
- // number
- // of
- // rows
- // to
- // fetch
- private int batchRowCount; // fetch
- // this
- // many
- // per
- // batch
- private String cfName;
- private String keyspace;
- private TSocket socket;
- private Cassandra.Client client;
- private ConsistencyLevel consistencyLevel;
+ private SlicePredicate predicate;
+ private int totalRowCount; // total number of rows to fetch
+ private int batchRowCount; // fetch this many per batch
+ private String cfName;
+ private String keyspace;
+ private TSocket socket;
+ private Cassandra.Client client;
+ private ConsistencyLevel consistencyLevel;
public void close()
{
@@ -121,7 +112,7 @@ public class ColumnFamilyRecordReader ex
// create connection using thrift
List<String> locationsAttempted = new ArrayList<String>();
- for (Iterator<String> it = getLocations(conf); it.hasNext();)
+ for (Iterator<String> it = getLocations(conf); it.hasNext(); )
{
String location = it.next();
try
@@ -139,12 +130,13 @@ public class ColumnFamilyRecordReader ex
client = null;
}
}
- if (null == client)
+ if (client == null)
{
- throw new RuntimeException("For the split " + split + " there were no locations "
- + (ConfigHelper.getInputSplitUseOnlySameDCReplica(conf) ? "(from same DC) " : "") + "alive: "
- + StringUtils.join(locationsAttempted, ", "));
+ String message = String.format("For the split %s there were no locations %salive: %s",
+ split, (ConfigHelper.getInputSplitUseOnlySameDCReplica(conf) ? "(from same DC) " : ""), StringUtils.join(locationsAttempted, ", "));
+ throw new RuntimeException(message);
}
+
// log in
client.set_keyspace(keyspace);
if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
@@ -177,48 +169,37 @@ public class ColumnFamilyRecordReader ex
// single-DC clusters, at least.
private Iterator<String> getLocations(final Configuration conf) throws IOException
{
- try
+ for (InetAddress address : InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()))
{
- for (InetAddress address : InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()))
+ for (final String location : split.getLocations())
{
- for (final String location : split.getLocations())
+ InetAddress locationAddress;
+ try
{
- InetAddress locationAddress = getInetAddressByName(location);
- if (address.equals(locationAddress))
- {
- // add fall back replicas from same DC via the following
- // Iterator
- return new SplitEndpointIterator(location, conf);
- }
+ locationAddress = InetAddress.getByName(location);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ if (address.equals(locationAddress))
+ {
+ // add fall back replicas from same DC via the following Iterator
+ return new SplitEndpointIterator(location, conf);
}
}
}
- catch (UnknownHostException e)
- {
- throw new AssertionError(e);
- }
-
- return Arrays.asList(split.getLocations()).iterator();
- }
- private static InetAddress getInetAddressByName(String name)
- {
- try
- {
- return InetAddress.getByName(name);
- }
- catch (UnknownHostException e)
- {
- throw new AssertionError(e);
- }
+ return Arrays.asList(split.getLocations()).iterator();
}
private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
{
- private List<KeySlice> rows;
- private String startToken;
- private int totalRead = 0;
- private int i = 0;
+ private List<KeySlice> rows;
+ private String startToken;
+ private int totalRead = 0;
+ private int i = 0;
private final AbstractType comparator;
private final AbstractType subComparator;
private final IPartitioner partitioner;
@@ -274,8 +255,7 @@ public class ColumnFamilyRecordReader ex
return;
}
- KeyRange keyRange = new KeyRange(batchRowCount).setStart_token(startToken)
- .setEnd_token(split.getEndToken());
+ KeyRange keyRange = new KeyRange(batchRowCount).setStart_token(startToken).setEnd_token(split.getEndToken());
try
{
rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, consistencyLevel);
@@ -335,8 +315,7 @@ public class ColumnFamilyRecordReader ex
private IColumn unthriftifySuper(SuperColumn super_column)
{
- org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name,
- subComparator);
+ org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
for (Column column : super_column.columns)
{
sc.addColumn(unthriftifySimple(column));
@@ -352,12 +331,12 @@ public class ColumnFamilyRecordReader ex
private class SplitEndpointIterator extends AbstractIterator<String>
{
- private final boolean restrictToSameDC;
- private final String location;
+ private final boolean restrictToSameDC;
+ private final String location;
private final Configuration conf;
- private Cassandra.Client client;
- private List<String> endpoints;
- private int endpointsIdx = -1;
+ private Cassandra.Client client;
+ private List<String> endpoints;
+ private int endpointsIdx = -1;
SplitEndpointIterator(final String location, final Configuration conf)
{
@@ -384,8 +363,7 @@ public class ColumnFamilyRecordReader ex
{
try
{
- endpoints = sortEndpointsByProximity(nextLocation, Arrays.asList(split.getLocations()),
- restrictToSameDC);
+ endpoints = sortEndpointsByProximity(nextLocation, Arrays.asList(split.getLocations()), restrictToSameDC);
if (location.equals(endpoints.get(0)))
{
++endpointsIdx;
@@ -394,17 +372,13 @@ public class ColumnFamilyRecordReader ex
}
catch (TException e)
{
- logger.info(
- "failed to sortEndpointsByProximity(" + location + ", ["
- + StringUtils.join(split.getLocations(), ',') + "], "
- + restrictToSameDC + ")", e);
+ logger.info(String.format("failed to sortEndpointsByProximity(%s, [%s], %s)",
+ location, StringUtils.join(split.getLocations(), ','), restrictToSameDC), e);
}
catch (IOException e)
{
- logger.info(
- "failed to sortEndpointsByProximity(" + location + ", ["
- + StringUtils.join(split.getLocations(), ',') + "], "
- + restrictToSameDC + ")", e);
+ logger.info(String.format("failed to sortEndpointsByProximity(%s, [%s], %s)",
+ location, StringUtils.join(split.getLocations(), ','), restrictToSameDC), e);
}
}
}
@@ -414,8 +388,8 @@ public class ColumnFamilyRecordReader ex
}
if (null == endpoints)
{
- throw new AssertionError("failed to find any fallback replica endpoints from "
- + StringUtils.join(split.getLocations(), ','));
+ throw new AssertionError(String.format("failed to find any fallback replica endpoints from %s",
+ StringUtils.join(split.getLocations(), ',')));
}
}
if (endpoints.size() > endpointsIdx)
@@ -432,8 +406,7 @@ public class ColumnFamilyRecordReader ex
try
{
// try first our configured initialAddress
- return getClient(ConfigHelper.getInitialAddress(conf)).sort_endpoints_by_proximity(location, endpoints,
- restrictToSameDC);
+ return getClient(ConfigHelper.getInitialAddress(conf)).sort_endpoints_by_proximity(location, endpoints, restrictToSameDC);
}
catch (IOException ex)
{