You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/07 15:21:05 UTC
svn commit: r961366 - in /cassandra/trunk:
src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
src/java/org/apache/cassandra/hadoop/ConfigHelper.java
test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
Author: jbellis
Date: Wed Jul 7 13:21:05 2010
New Revision: 961366
URL: http://svn.apache.org/viewvc?rev=961366&view=rev
Log:
Remove the Hadoop outputSlicePredicate. Patch by jbellis; reviewed by jhanna for CASSANDRA-1246
Modified:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=961366&r1=961365&r2=961366&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Wed Jul 7 13:21:05 2010
@@ -22,7 +22,6 @@ package org.apache.cassandra.hadoop;
*/
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -36,12 +35,7 @@ import org.apache.cassandra.thrift.Authe
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.AuthorizationException;
import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.KeyRange;
-import org.apache.cassandra.thrift.KeySlice;
-import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -64,15 +58,12 @@ import org.slf4j.LoggerFactory;
* As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
* CF and predicate (description of columns to extract from each row) in your
* Hadoop job Configuration. The {@link ConfigHelper} class, through its
- * {@link ConfigHelper#setColumnFamily} and
- * {@link ConfigHelper#setSlicePredicate} methods, is provided to make this
+ * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
* simple.
* </p>
*
* <p>
- * By default, it prevents overwriting existing rows in the column family, by
- * ensuring at initialization time that it contains no rows in the given slice
- * predicate. For the sake of performance, it employs a lazy write-back caching
+ * For the sake of performance, this class employs a lazy write-back caching
* mechanism, where its record writer batches mutations created based on the
* reduce's inputs (in a task-specific map). When the writer is closed, then it
* makes the changes official by sending a batch mutate request to Cassandra.
@@ -89,47 +80,18 @@ public class ColumnFamilyOutputFormat ex
/**
* Check for validity of the output-specification for the job.
*
- * <p>
- * This is to validate the output specification for the job when it is a job
- * is submitted. By default, it will prevent writes to the given column
- * family, if it already contains one or more rows in the given slice
- * predicate. If you wish to relax that restriction, you may override this
- * method is a sub-class of your choosing.
- * </p>
- *
* @param context
* information about the job
* @throws IOException
* when output should not be attempted
*/
@Override
- public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
+ public void checkOutputSpecs(JobContext context)
{
- validateConfiguration(context.getConfiguration());
- String keyspace = ConfigHelper.getOutputKeyspace(context.getConfiguration());
- String columnFamily = ConfigHelper.getOutputColumnFamily(context.getConfiguration());
- SlicePredicate slicePredicate = ConfigHelper.getOutputSlicePredicate(context.getConfiguration());
- assert slicePredicate != null;
- if (slicePredicate.column_names == null && slicePredicate.slice_range == null)
- slicePredicate = slicePredicate.setColumn_names(new ArrayList<byte[]>());
-
- List<KeySlice> keySlices;
- try
- {
- TSocket socket = new TSocket(DatabaseDescriptor.getListenAddress().getHostName(), DatabaseDescriptor.getRpcPort());
- Cassandra.Client client = createAuthenticatedClient(socket, context);
- ColumnParent parent = new ColumnParent().setColumn_family(columnFamily);
- KeyRange range = new KeyRange().setStart_key("".getBytes()).setEnd_key("".getBytes());
- keySlices = client.get_range_slices(parent, slicePredicate, range, ConsistencyLevel.ONE);
- }
- catch (Exception e)
- {
- throw new IOException(e);
- }
- if (keySlices.size() > 0)
+ Configuration conf = context.getConfiguration();
+ if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
{
- throw new IOException(String.format("The column family %s in the keyspace %s already has %d keys in the slice predicate %s",
- columnFamily, keyspace, keySlices.size(), slicePredicate));
+ throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
}
}
@@ -175,25 +137,6 @@ public class ColumnFamilyOutputFormat ex
{
return new ColumnFamilyRecordWriter(context);
}
-
- /**
- * Ensure that this output format has been configured correctly, with a
- * valid keyspace, column family and slice predicate.
- *
- * @param conf
- */
- public void validateConfiguration(Configuration conf)
- {
- if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
- {
- throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
- }
- if (ConfigHelper.getOutputSlicePredicate(conf) == null)
- {
- System.err.printf("Since no slice predicate was specified, all columns in %s will be overwritten%n",
- ConfigHelper.getOutputColumnFamily(conf));
- }
- }
/**
* Return a client based on the given socket that points to the configured
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=961366&r1=961365&r2=961366&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Wed Jul 7 13:21:05 2010
@@ -152,22 +152,6 @@ public class ConfigHelper
return predicateFromString(conf.get(INPUT_PREDICATE_CONFIG));
}
- /**
- * Set the predicate that determines what columns will be selected from each row.
- *
- * @param conf Job configuration you are about to run
- * @param predicate
- */
- public static void setOutputSlicePredicate(Configuration conf, SlicePredicate predicate)
- {
- conf.set(OUTPUT_PREDICATE_CONFIG, predicateToString(predicate));
- }
-
- public static SlicePredicate getOutputSlicePredicate(Configuration conf)
- {
- return predicateFromString(conf.get(OUTPUT_PREDICATE_CONFIG));
- }
-
private static String predicateToString(SlicePredicate predicate)
{
assert predicate != null;
Modified: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java?rev=961366&r1=961365&r2=961366&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java Wed Jul 7 13:21:05 2010
@@ -60,7 +60,6 @@ public class SampleColumnFamilyOutputToo
ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
ColumnFamilyOutputFormatTest.KEYSPACE,
ColumnFamilyOutputFormatTest.COLUMN_FAMILY);
- ConfigHelper.setOutputSlicePredicate(job.getConfiguration(), new SlicePredicate());
SequenceFileInputFormat.addInputPath(job, inputdir);