You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/03 20:43:11 UTC
svn commit: r1054720 - in /cassandra/branches/cassandra-0.7: CHANGES.txt
src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
Author: jbellis
Date: Mon Jan 3 19:43:11 2011
New Revision: 1054720
URL: http://svn.apache.org/viewvc?rev=1054720&view=rev
Log:
retry hadoop split requests on connection failure
patch by mck; reviewed by jbellis for CASSANDRA-1927
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1054720&r1=1054719&r2=1054720&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Jan 3 19:43:11 2011
@@ -6,6 +6,7 @@ dev
and seq scan operations (CASSANDRA-1470)
* add RMI authentication options to nodetool (CASSANDRA-1921)
* Make snitches configurable at runtime (CASSANDRA-1374)
+ * retry hadoop split requests on connection failure (CASSANDRA-1927)
0.7.0-rc4
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1054720&r1=1054719&r2=1054720&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Mon Jan 3 19:43:11 2011
@@ -1,6 +1,6 @@
package org.apache.cassandra.hadoop;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.hadoop;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +67,7 @@ import org.apache.thrift.transport.TTran
public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
-
+
private String keyspace;
private String cfName;
@@ -93,8 +94,8 @@ public class ColumnFamilyInputFormat ext
keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration());
cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration());
-
- // cannonical ranges, split into pieces, fetching the splits in parallel
+
+ // cannonical ranges, split into pieces, fetching the splits in parallel
ExecutorService executor = Executors.newCachedThreadPool();
List<InputSplit> splits = new ArrayList<InputSplit>();
@@ -103,23 +104,23 @@ public class ColumnFamilyInputFormat ext
List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
for (TokenRange range : masterRangeNodes)
{
- // for each range, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+ // for each range, pick a live owner and ask it to compute bite-sized splits
+ splitfutures.add(executor.submit(new SplitCallable(range, conf)));
}
-
+
// wait until we have all the results back
for (Future<List<InputSplit>> futureInputSplits : splitfutures)
{
try
{
splits.addAll(futureInputSplits.get());
- }
+ }
catch (Exception e)
{
throw new IOException("Could not get input splits", e);
- }
+ }
}
- }
+ }
finally
{
executor.shutdownNow();
@@ -132,7 +133,7 @@ public class ColumnFamilyInputFormat ext
/**
* Gets a token range and splits it up according to the suggested
- * size into input splits that Hadoop can use.
+ * size into input splits that Hadoop can use.
*/
class SplitCallable implements Callable<List<InputSplit>>
{
@@ -158,7 +159,7 @@ public class ColumnFamilyInputFormat ext
{
endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
}
-
+
for (int i = 1; i < tokens.size(); i++)
{
ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i - 1), tokens.get(i), endpoints);
@@ -171,24 +172,29 @@ public class ColumnFamilyInputFormat ext
private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
{
- // TODO handle failure of range replicas & retry
- List<String> splits;
int splitsize = ConfigHelper.getInputSplitSize(conf);
- try
+ for (String host : range.endpoints)
{
- Cassandra.Client client = createConnection(range.endpoints.get(0), ConfigHelper.getRpcPort(conf), true);
- client.set_keyspace(keyspace);
- splits = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
+ try
+ {
+ Cassandra.Client client = createConnection(host, ConfigHelper.getRpcPort(conf), true);
+ client.set_keyspace(keyspace);
+ return client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
+ }
+ catch (IOException e)
+ {
+ logger.debug("failed connect to endpoint " + host, e);
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- return splits;
+ throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
}
private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException