You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/25 04:37:26 UTC

svn commit: r1139483 - /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java

Author: jbellis
Date: Sat Jun 25 02:37:25 2011
New Revision: 1139483

URL: http://svn.apache.org/viewvc?rev=1139483&view=rev
Log:
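cleanup: collapse column-aligned field declarations, join wrapped lines, build messages with String.format, and inline getInetAddressByName into getLocations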

Modified:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1139483&r1=1139482&r2=1139483&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Sat Jun 25 02:37:25 2011
@@ -51,28 +51,19 @@ import org.slf4j.LoggerFactory;
 
 public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
 {
-    private static final Logger                              logger = LoggerFactory
-                                                                            .getLogger(ColumnFamilyRecordReader.class);
-    private ColumnFamilySplit                                split;
-    private RowIterator                                      iter;
+    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
+
+    private ColumnFamilySplit split;
+    private RowIterator iter;
     private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
-    private SlicePredicate                                   predicate;
-    private int                                              totalRowCount;                                            // total
-                                                                                                                        // number
-                                                                                                                        // of
-                                                                                                                        // rows
-                                                                                                                        // to
-                                                                                                                        // fetch
-    private int                                              batchRowCount;                                            // fetch
-                                                                                                                        // this
-                                                                                                                        // many
-                                                                                                                        // per
-                                                                                                                        // batch
-    private String                                           cfName;
-    private String                                           keyspace;
-    private TSocket                                          socket;
-    private Cassandra.Client                                 client;
-    private ConsistencyLevel                                 consistencyLevel;
+    private SlicePredicate predicate;
+    private int totalRowCount; // total number of rows to fetch
+    private int batchRowCount; // fetch this many per batch
+    private String cfName;
+    private String keyspace;
+    private TSocket socket;
+    private Cassandra.Client client;
+    private ConsistencyLevel consistencyLevel;
 
     public void close()
     {
@@ -121,7 +112,7 @@ public class ColumnFamilyRecordReader ex
 
             // create connection using thrift
             List<String> locationsAttempted = new ArrayList<String>();
-            for (Iterator<String> it = getLocations(conf); it.hasNext();)
+            for (Iterator<String> it = getLocations(conf); it.hasNext(); )
             {
                 String location = it.next();
                 try
@@ -139,12 +130,13 @@ public class ColumnFamilyRecordReader ex
                     client = null;
                 }
             }
-            if (null == client)
+            if (client == null)
             {
-                throw new RuntimeException("For the split " + split + " there were no locations "
-                        + (ConfigHelper.getInputSplitUseOnlySameDCReplica(conf) ? "(from same DC) " : "") + "alive: "
-                        + StringUtils.join(locationsAttempted, ", "));
+                String message = String.format("For the split %s there were no locations %salive: %s",
+                                               split, (ConfigHelper.getInputSplitUseOnlySameDCReplica(conf) ? "(from same DC) " : ""), StringUtils.join(locationsAttempted, ", "));
+                throw new RuntimeException(message);
             }
+
             // log in
             client.set_keyspace(keyspace);
             if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
@@ -177,48 +169,37 @@ public class ColumnFamilyRecordReader ex
     // single-DC clusters, at least.
     private Iterator<String> getLocations(final Configuration conf) throws IOException
     {
-        try
+        for (InetAddress address : InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()))
         {
-            for (InetAddress address : InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()))
+            for (final String location : split.getLocations())
             {
-                for (final String location : split.getLocations())
+                InetAddress locationAddress;
+                try
                 {
-                    InetAddress locationAddress = getInetAddressByName(location);
-                    if (address.equals(locationAddress))
-                    {
-                        // add fall back replicas from same DC via the following
-                        // Iterator
-                        return new SplitEndpointIterator(location, conf);
-                    }
+                    locationAddress = InetAddress.getByName(location);
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new AssertionError(e);
+                }
+
+                if (address.equals(locationAddress))
+                {
+                    // add fall back replicas from same DC via the following Iterator
+                    return new SplitEndpointIterator(location, conf);
                 }
             }
         }
-        catch (UnknownHostException e)
-        {
-            throw new AssertionError(e);
-        }
-        
-        return Arrays.asList(split.getLocations()).iterator();
-    }
 
-    private static InetAddress getInetAddressByName(String name)
-    {
-        try
-        {
-            return InetAddress.getByName(name);
-        }
-        catch (UnknownHostException e)
-        {
-            throw new AssertionError(e);
-        }
+        return Arrays.asList(split.getLocations()).iterator();
     }
 
     private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
     {
-        private List<KeySlice>     rows;
-        private String             startToken;
-        private int                totalRead = 0;
-        private int                i         = 0;
+        private List<KeySlice> rows;
+        private String startToken;
+        private int totalRead = 0;
+        private int i = 0;
         private final AbstractType comparator;
         private final AbstractType subComparator;
         private final IPartitioner partitioner;
@@ -274,8 +255,7 @@ public class ColumnFamilyRecordReader ex
                 return;
             }
 
-            KeyRange keyRange = new KeyRange(batchRowCount).setStart_token(startToken)
-                    .setEnd_token(split.getEndToken());
+            KeyRange keyRange = new KeyRange(batchRowCount).setStart_token(startToken).setEnd_token(split.getEndToken());
             try
             {
                 rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, consistencyLevel);
@@ -335,8 +315,7 @@ public class ColumnFamilyRecordReader ex
 
         private IColumn unthriftifySuper(SuperColumn super_column)
         {
-            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name,
-                    subComparator);
+            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
             for (Column column : super_column.columns)
             {
                 sc.addColumn(unthriftifySimple(column));
@@ -352,12 +331,12 @@ public class ColumnFamilyRecordReader ex
 
     private class SplitEndpointIterator extends AbstractIterator<String>
     {
-        private final boolean       restrictToSameDC;
-        private final String        location;
+        private final boolean restrictToSameDC;
+        private final String location;
         private final Configuration conf;
-        private Cassandra.Client    client;
-        private List<String>        endpoints;
-        private int                 endpointsIdx = -1;
+        private Cassandra.Client client;
+        private List<String> endpoints;
+        private int endpointsIdx = -1;
 
         SplitEndpointIterator(final String location, final Configuration conf)
         {
@@ -384,8 +363,7 @@ public class ColumnFamilyRecordReader ex
                         {
                             try
                             {
-                                endpoints = sortEndpointsByProximity(nextLocation, Arrays.asList(split.getLocations()),
-                                        restrictToSameDC);
+                                endpoints = sortEndpointsByProximity(nextLocation, Arrays.asList(split.getLocations()), restrictToSameDC);
                                 if (location.equals(endpoints.get(0)))
                                 {
                                     ++endpointsIdx;
@@ -394,17 +372,13 @@ public class ColumnFamilyRecordReader ex
                             }
                             catch (TException e)
                             {
-                                logger.info(
-                                        "failed to sortEndpointsByProximity(" + location + ", ["
-                                                + StringUtils.join(split.getLocations(), ',') + "], "
-                                                + restrictToSameDC + ")", e);
+                                logger.info(String.format("failed to sortEndpointsByProximity(%s, [%s], %s)",
+                                                          location, StringUtils.join(split.getLocations(), ','), restrictToSameDC), e);
                             }
                             catch (IOException e)
                             {
-                                logger.info(
-                                        "failed to sortEndpointsByProximity(" + location + ", ["
-                                                + StringUtils.join(split.getLocations(), ',') + "], "
-                                                + restrictToSameDC + ")", e);
+                                logger.info(String.format("failed to sortEndpointsByProximity(%s, [%s], %s)",
+                                                          location, StringUtils.join(split.getLocations(), ','), restrictToSameDC), e);
                             }
                         }
                     }
@@ -414,8 +388,8 @@ public class ColumnFamilyRecordReader ex
                     }
                     if (null == endpoints)
                     {
-                        throw new AssertionError("failed to find any fallback replica endpoints from "
-                                + StringUtils.join(split.getLocations(), ','));
+                        throw new AssertionError(String.format("failed to find any fallback replica endpoints from %s",
+                                                               StringUtils.join(split.getLocations(), ',')));
                     }
                 }
                 if (endpoints.size() > endpointsIdx)
@@ -432,8 +406,7 @@ public class ColumnFamilyRecordReader ex
             try
             {
                 // try first our configured initialAddress
-                return getClient(ConfigHelper.getInitialAddress(conf)).sort_endpoints_by_proximity(location, endpoints,
-                        restrictToSameDC);
+                return getClient(ConfigHelper.getInitialAddress(conf)).sort_endpoints_by_proximity(location, endpoints, restrictToSameDC);
             }
             catch (IOException ex)
             {