You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/03 20:43:11 UTC

svn commit: r1054720 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java

Author: jbellis
Date: Mon Jan  3 19:43:11 2011
New Revision: 1054720

URL: http://svn.apache.org/viewvc?rev=1054720&view=rev
Log:
retry hadoop split requests on connection failure
patch by mck; reviewed by jbellis for CASSANDRA-1927

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1054720&r1=1054719&r2=1054720&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Jan  3 19:43:11 2011
@@ -6,6 +6,7 @@ dev
    and seq scan operations (CASSANDRA-1470)
  * add RMI authentication options to nodetool (CASSANDRA-1921)
  * Make snitches configurable at runtime (CASSANDRA-1374)
+ * retry hadoop split requests on connection failure (CASSANDRA-1927)
 
 
 0.7.0-rc4

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1054720&r1=1054719&r2=1054720&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Mon Jan  3 19:43:11 2011
@@ -1,6 +1,6 @@
 package org.apache.cassandra.hadoop;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.hadoop;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +67,7 @@ import org.apache.thrift.transport.TTran
 public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
-    
+
     private String keyspace;
     private String cfName;
 
@@ -93,8 +94,8 @@ public class ColumnFamilyInputFormat ext
 
         keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration());
         cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration());
-        
-        // cannonical ranges, split into pieces, fetching the splits in parallel 
+
+        // cannonical ranges, split into pieces, fetching the splits in parallel
         ExecutorService executor = Executors.newCachedThreadPool();
         List<InputSplit> splits = new ArrayList<InputSplit>();
 
@@ -103,23 +104,23 @@ public class ColumnFamilyInputFormat ext
             List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
             for (TokenRange range : masterRangeNodes)
             {
-                // for each range, pick a live owner and ask it to compute bite-sized splits
-                splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                    // for each range, pick a live owner and ask it to compute bite-sized splits
+                    splitfutures.add(executor.submit(new SplitCallable(range, conf)));
             }
-    
+
             // wait until we have all the results back
             for (Future<List<InputSplit>> futureInputSplits : splitfutures)
             {
                 try
                 {
                     splits.addAll(futureInputSplits.get());
-                } 
+                }
                 catch (Exception e)
                 {
                     throw new IOException("Could not get input splits", e);
-                } 
+                }
             }
-        } 
+        }
         finally
         {
             executor.shutdownNow();
@@ -132,7 +133,7 @@ public class ColumnFamilyInputFormat ext
 
     /**
      * Gets a token range and splits it up according to the suggested
-     * size into input splits that Hadoop can use. 
+     * size into input splits that Hadoop can use.
      */
     class SplitCallable implements Callable<List<InputSplit>>
     {
@@ -158,7 +159,7 @@ public class ColumnFamilyInputFormat ext
             {
                 endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
             }
-            
+
             for (int i = 1; i < tokens.size(); i++)
             {
                 ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i - 1), tokens.get(i), endpoints);
@@ -171,24 +172,29 @@ public class ColumnFamilyInputFormat ext
 
     private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
     {
-        // TODO handle failure of range replicas & retry
-        List<String> splits;
         int splitsize = ConfigHelper.getInputSplitSize(conf);
-        try
+        for (String host : range.endpoints)
         {
-            Cassandra.Client client = createConnection(range.endpoints.get(0), ConfigHelper.getRpcPort(conf), true);
-            client.set_keyspace(keyspace);
-            splits = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException(e);
+            try
+            {
+                Cassandra.Client client = createConnection(host, ConfigHelper.getRpcPort(conf), true);
+                client.set_keyspace(keyspace);
+                return client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
+            }
+            catch (IOException e)
+            {
+                logger.debug("failed connect to endpoint " + host, e);
+            }
+            catch (TException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (InvalidRequestException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
-        return splits;
+        throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
     }
 
     private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException