Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/16 20:56:57 UTC
svn commit: r955347 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/hadoop/ test/unit/org/apache/cassandra/
test/unit/org/apache/cassandra/hadoop/
Author: jbellis
Date: Wed Jun 16 18:56:57 2010
New Revision: 955347
URL: http://svn.apache.org/viewvc?rev=955347&view=rev
Log:
hadoop outputformat. patch by Karthick Sankarachary; reviewed by Stu Hood and jbellis for CASSANDRA-1101
Added:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java
cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java
cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java
cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/ivy.xml
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=955347&r1=955346&r2=955347&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jun 16 18:56:57 2010
@@ -28,6 +28,7 @@ dev
entries spanning segment boundaries, with SegmentedFile that computes
segments that always contain entire entries/rows (CASSANDRA-1117)
* avoid reading large rows into memory during compaction (CASSANDRA-16)
+ * added hadoop OutputFormat (CASSANDRA-1101)
0.6.3
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=955347&r1=955346&r2=955347&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Wed Jun 16 18:56:57 2010
@@ -14,6 +14,7 @@ Features
ConsistencyLevel.DCQUORUM and DCQUORUMSYNC. See comments in
`cassandra.yaml.`
- row size limit increased from 2GB to 2 billion columns
+ - Hadoop OutputFormat support
Configuration
-------------
Modified: cassandra/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/ivy.xml?rev=955347&r1=955346&r2=955347&view=diff
==============================================================================
--- cassandra/trunk/ivy.xml (original)
+++ cassandra/trunk/ivy.xml Wed Jun 16 18:56:57 2010
@@ -25,6 +25,7 @@
<dependency org="com.thoughtworks.paranamer"
name="paranamer-ant" rev="2.1"/>
<dependency org="junit" name="junit" rev="4.6" />
+ <dependency org="commons-logging" name="commons-logging" rev="1.1.1"/>
<dependency org="org.apache.rat" name="apache-rat" rev="0.6" />
<dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"/>
</dependencies>
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=955347&r1=955346&r2=955347&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Wed Jun 16 18:56:57 2010
@@ -62,7 +62,6 @@ import org.apache.thrift.transport.TTran
*/
public class ColumnFamilyInputFormat extends InputFormat<byte[], SortedMap<byte[], IColumn>>
{
-
private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
private int splitsize;
Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,252 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.auth.AllowAllAuthenticator;
+import org.apache.cassandra.auth.SimpleAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.AuthorizationException;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+
+/**
+ * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
+ * OutputFormat that allows reduce tasks to store keys (and corresponding
+ * values) as Cassandra rows (and respective columns) in a given
+ * {@link ColumnFamily}.
+ *
+ * <p>
+ * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
+ * CF and predicate (description of columns to extract from each row) in your
+ * Hadoop job Configuration. The {@link ConfigHelper} class, through its
+ * {@link ConfigHelper#setColumnFamily} and
+ * {@link ConfigHelper#setSlicePredicate} methods, is provided to make this
+ * simple.
+ * </p>
+ *
+ * <p>
+ * By default, it prevents overwriting existing rows in the column family by
+ * ensuring at initialization time that the column family contains no rows
+ * matching the given slice predicate. For the sake of performance, it
+ * employs a lazy write-back caching mechanism, where its record writer
+ * batches the mutations created from the reduce's inputs (in a
+ * task-specific map). When the writer is closed, it makes the changes
+ * official by sending a batch mutate request to Cassandra.
+ * </p>
+ *
+ * @author Karthick Sankarachary
+ */
+public class ColumnFamilyOutputFormat extends OutputFormat<byte[],List<IColumn>>
+{
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
+
+ public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
+
+ /**
+ * Check for validity of the output-specification for the job.
+ *
+ * <p>
+ * This validates the output specification when the job is submitted. By
+ * default, it will prevent writes to the given column family if it already
+ * contains one or more rows in the given slice predicate. If you wish to
+ * relax that restriction, you may override this method in a subclass of
+ * your choosing.
+ * </p>
+ *
+ * @param context
+ * information about the job
+ * @throws IOException
+ * when output should not be attempted
+ */
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
+ {
+ validateConfiguration(context.getConfiguration());
+ String keyspace = ConfigHelper.getKeyspace(context.getConfiguration());
+ String columnFamily = ConfigHelper.getColumnFamily(context.getConfiguration());
+ SlicePredicate slicePredicate = ConfigHelper.getSlicePredicate(context.getConfiguration());
+ assert slicePredicate != null;
+ if (slicePredicate.column_names == null && slicePredicate.slice_range == null)
+ slicePredicate = slicePredicate.setColumn_names(new ArrayList<byte[]>());
+
+ List<KeySlice> keySlices;
+ try
+ {
+ TSocket socket = new TSocket(DatabaseDescriptor.getListenAddress().getHostName(), DatabaseDescriptor.getRpcPort());
+ Cassandra.Client client = createAuthenticatedClient(socket, context);
+ ColumnParent parent = new ColumnParent().setColumn_family(columnFamily);
+ KeyRange range = new KeyRange().setStart_key("".getBytes()).setEnd_key("".getBytes());
+ keySlices = client.get_range_slices(parent, slicePredicate, range, ConsistencyLevel.ONE);
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e);
+ }
+ if (keySlices.size() > 0)
+ {
+ throw new IOException("The column family " + columnFamily
+ + " in the keyspace " + keyspace + " already has "
+ + keySlices.size() + " keys in the slice predicate "
+ + slicePredicate);
+ }
+ }
+
+ /**
+ * Get the output committer for this output format. This is responsible for
+ * ensuring the output is committed correctly.
+ *
+ * <p>
+ * This output format employs a lazy write-back caching mechanism, where the
+ * {@link RecordWriter} batches mutations in memory and sends them to
+ * Cassandra when it is flushed or closed; the {@link OutputCommitter}
+ * returned here therefore has nothing left to commit and is a no-op.
+ * </p>
+ *
+ * @param context
+ * the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new NullOutputCommitter();
+ }
+
+ /**
+ * Get the {@link RecordWriter} for the given task.
+ *
+ * <p>
+ * As stated above, this {@link RecordWriter} merely batches the mutations
+ * that it defines in an in-memory cache. In other words, it doesn't
+ * immediately cause any changes on the Cassandra server.
+ * </p>
+ *
+ * @param context
+ * the information about the current task.
+ * @return a {@link RecordWriter} to write the output for the job.
+ * @throws IOException
+ */
+ @Override
+ public RecordWriter<byte[],List<IColumn>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new ColumnFamilyRecordWriter(context);
+ }
+
+ /**
+ * Ensure that this output format has been configured correctly, with a
+ * valid keyspace, column family and slice predicate.
+ *
+ * @param conf
+ */
+ public void validateConfiguration(Configuration conf)
+ {
+ if (ConfigHelper.getKeyspace(conf) == null || ConfigHelper.getColumnFamily(conf) == null)
+ {
+ throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
+ }
+ if (ConfigHelper.getSlicePredicate(conf) == null)
+ {
+ logger.warn("Since no slice predicate was specified, all columns in "
+ + ConfigHelper.getColumnFamily(conf)
+ + " will be overwritten");
+ }
+ }
+
+ /**
+ * Return a client based on the given socket that points to the configured
+ * keyspace, and is logged in with the configured credentials.
+ *
+ * @param socket a socket pointing to a particular node, seed or otherwise
+ * @param context a job context
+ * @return a cassandra client
+ * @throws InvalidRequestException
+ * @throws TException
+ * @throws AuthenticationException
+ * @throws AuthorizationException
+ */
+ public static Cassandra.Client createAuthenticatedClient(TSocket socket, JobContext context)
+ throws InvalidRequestException, TException, AuthenticationException, AuthorizationException
+ {
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+ Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ socket.open();
+ client.set_keyspace(ConfigHelper.getKeyspace(context.getConfiguration()));
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getKeyspaceUserName(context.getConfiguration()));
+ creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getKeyspacePassword(context.getConfiguration()));
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+ if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator))
+ client.login(authRequest);
+ return client;
+ }
+
+ /**
+ * An {@link OutputCommitter} that does nothing.
+ */
+ public class NullOutputCommitter extends OutputCommitter
+ {
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+ }
+}
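
A quick usage sketch (not part of the patch): the job wiring the class
javadoc above describes, with a hypothetical keyspace "Keyspace1" and column
family "Standard1"; only ConfigHelper, ColumnFamilyOutputFormat and the
standard Hadoop Job API are assumed.

package org.apache.cassandra.hadoop;

import java.util.List;

import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class OutputFormatSetupSketch
{
    public static Job configureJob() throws Exception
    {
        Job job = new Job(new Configuration());
        // Point the output format at the target keyspace and column family.
        ConfigHelper.setColumnFamily(job.getConfiguration(), "Keyspace1", "Standard1");
        // An empty predicate selects all columns; checkOutputSpecs() will
        // refuse to run if the column family already has rows in it.
        ConfigHelper.setSlicePredicate(job.getConfiguration(), new SlicePredicate());
        job.setOutputKeyClass(byte[].class);
        job.setOutputValueClass(List.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        return job;
    }
}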
Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,65 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.IColumn;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * The <code>ColumnFamilyOutputReducer</code> reduces a <key, values>
+ * pair, where the values are of a generic iterable type, into a list of
+ * columns that need to be mutated for that key, with each column
+ * corresponding to an element of the values.
+ *
+ * <p>
+ * The default implementation treats the VALUEIN type to be a
+ * {@link ColumnWritable}, in which case this reducer acts as an identity
+ * function.
+ *
+ * @author Karthick Sankarachary
+ *
+ * @param <KEYIN> the type of the input key
+ * @param <VALUEIN> the type of the input values
+ */
+public class ColumnFamilyOutputReducer<KEYIN, VALUEIN> extends Reducer<KEYIN, VALUEIN, byte[], List<IColumn>>
+{
+ public void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException
+ {
+ byte[] cfKey = key.toString().getBytes();
+ List<IColumn> cfColumns = new ArrayList<IColumn>();
+ for (VALUEIN value : values)
+ {
+ ColumnWritable columnValue = map(value);
+ cfColumns.add(new Column(columnValue.getName(), columnValue.getValue()));
+ }
+ context.write(cfKey, cfColumns);
+ }
+
+ protected ColumnWritable map(VALUEIN value)
+ {
+ return (ColumnWritable) value;
+ }
+}
\ No newline at end of file
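
A hedged sketch (not part of the patch) of the extension point mentioned in
the javadoc: when VALUEIN is not a ColumnWritable, override map(VALUEIN) to
convert each value. The "name:value" Text encoding here is hypothetical.

package org.apache.cassandra.hadoop;

import org.apache.hadoop.io.Text;

public class TextColumnFamilyOutputReducer extends ColumnFamilyOutputReducer<Text, Text>
{
    @Override
    protected ColumnWritable map(Text value)
    {
        // Split a "name:value" pair into the column name and column value.
        String[] parts = value.toString().split(":", 2);
        return new ColumnWritable(parts[0].getBytes(), parts[1].getBytes());
    }
}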
Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,343 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.client.RingCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Clock;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.transport.TSocket;
+
+/**
+ * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
+ * pairs to a Cassandra column family. In particular, it creates mutations for
+ * each column in the value, which it associates with the key, and in turn the
+ * responsible endpoint.
+ *
+ * <p>
+ * Note that, given that round trips to the server are fairly expensive, it
+ * merely batches the mutations in memory (specifically, in its map of
+ * mutations grouped by endpoint), and sends the batched mutations to the
+ * server in one shot when the writer is flushed or closed.
+ * </p>
+ *
+ * <p>
+ * Furthermore, this writer groups the mutations by the endpoint responsible
+ * for the rows being affected. This allows the mutations to be executed in
+ * parallel, on an endpoint-by-endpoint basis.
+ * </p>
+ *
+ * @author Karthick Sankarachary
+ * @see ColumnFamilyOutputFormat
+ * @see OutputFormat
+ *
+ */
+final class ColumnFamilyRecordWriter extends RecordWriter<byte[],List<IColumn>>
+{
+ // The task attempt context this writer is associated with.
+ private final TaskAttemptContext context;
+
+ // The batched set of mutations grouped by endpoints.
+ private Map<InetAddress,Map<byte[],Map<String,List<Mutation>>>> mutationsByEndpoint;
+
+ // The ring cache that describes the token ranges each node in the ring is
+ // responsible for. This is what allows us to group the mutations by the
+ // endpoints they should be targeted at. The targeted endpoint essentially
+ // acts as the primary replica for the rows being affected by the mutations.
+ private RingCache ringCache;
+
+ // The number of mutations currently held in the mutations cache.
+ private long batchSize = 0L;
+ // The maximum number of mutations to hold in the mutations cache.
+ private long batchThreshold = Long.MAX_VALUE;
+
+ /**
+ * Upon construction, obtain the map that this writer will use to collect
+ * mutations, and the ring cache for the given keyspace.
+ *
+ * @param context the task attempt context
+ * @throws IOException
+ */
+ ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException
+ {
+ this.context = context;
+ this.mutationsByEndpoint = new HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
+ this.ringCache = new RingCache(ConfigHelper.getKeyspace(context.getConfiguration()));
+ this.batchThreshold = context.getConfiguration().getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, Long.MAX_VALUE);
+ }
+
+ /**
+ * Return the endpoint responsible for the given key. The selected endpoint
+ * is the one whose token range contains the given key.
+ *
+ * @param key
+ * the key being mutated
+ * @return the endpoint responsible for that key
+ */
+ protected InetAddress getEndpoint(byte[] key)
+ {
+ List<InetAddress> endpoints = ringCache.getEndpoint(key);
+ return endpoints != null && endpoints.size() > 0
+ ? endpoints.get(0)
+ : null;
+ }
+
+ /**
+ * Writes a key/value pair, not to the Cassandra server, but into an
+ * in-memory cache (viz. {@link #mutationsByEndpoint}).
+ *
+ * <p>
+ * If the key is to be associated with a valid value, a mutation is created
+ * for it with the given column family and columns. In the event the value
+ * in the column is missing (i.e., null), then it is marked for
+ * {@link Deletion}. Similarly, if the entire value for a key is missing
+ * (i.e., null), then the entire key is marked for {@link Deletion}.
+ * </p>
+ *
+ * @param key
+ * the key to write.
+ * @param value
+ * the value to write.
+ * @throws IOException
+ */
+ @Override
+ public synchronized void write(byte[] key, List<IColumn> value) throws IOException, InterruptedException
+ {
+ maybeFlush();
+ InetAddress endpoint = getEndpoint(key);
+ Map<byte[], Map<String, List<Mutation>>> mutationsByKey = mutationsByEndpoint.get(endpoint);
+ if (mutationsByKey == null)
+ {
+ mutationsByKey = new HashMap<byte[], Map<String, List<Mutation>>>();
+ mutationsByEndpoint.put(endpoint, mutationsByKey);
+ }
+
+ Map<String, List<Mutation>> cfMutation = new HashMap<String, List<Mutation>>();
+ mutationsByKey.put(key, cfMutation);
+
+ Clock clock = new Clock(System.currentTimeMillis());
+ List<Mutation> mutationList = new ArrayList<Mutation>();
+ cfMutation.put(ConfigHelper.getColumnFamily(context.getConfiguration()), mutationList);
+
+ if (value == null)
+ {
+ Mutation mutation = new Mutation();
+ Deletion deletion = new Deletion(clock);
+ mutation.setDeletion(deletion);
+ mutationList.add(mutation);
+ }
+ else
+ {
+ // Collect the names of columns marked for deletion (i.e., with null
+ // values); a single null entry means "delete all columns for this key".
+ List<byte[]> columnsToDelete = new ArrayList<byte[]>();
+ for (IColumn column : value)
+ {
+ if (column.value() == null)
+ {
+ // Record the deletion, unless a whole-row delete (a single null
+ // entry) has already been recorded.
+ if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
+ {
+ if (column.name() == null)
+ columnsToDelete.clear();
+ columnsToDelete.add(column.name());
+ }
+ }
+ else
+ {
+ // Only non-null values produce an insert mutation here; deletions
+ // are batched into columnsToDelete and handled after the loop.
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ cosc.setColumn(new Column(column.name(), column.value(), clock));
+ mutation.setColumn_or_supercolumn(cosc);
+ mutationList.add(mutation);
+ }
+ }
+
+ if (columnsToDelete.size() > 0)
+ {
+ Mutation mutation = new Mutation();
+ Deletion deletion = new Deletion(clock);
+
+ // A list of names means targeted column deletions; a single null
+ // entry means delete the key's entire column range.
+ if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
+ {
+ deletion.setPredicate(new SlicePredicate().setColumn_names(columnsToDelete));
+ }
+ else
+ {
+ SliceRange range = new SliceRange(new byte[]{ }, new byte[]{ }, false, Integer.MAX_VALUE);
+ deletion.setPredicate(new SlicePredicate().setSlice_range(range));
+ }
+
+ mutation.setDeletion(deletion);
+ mutationList.add(mutation);
+ }
+ }
+ }
+
+ /**
+ * Close this <code>RecordWriter</code> to future operations, but not before
+ * flushing out the batched mutations.
+ *
+ * @param context the context of the task
+ * @throws IOException
+ */
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ flush();
+ }
+
+ /**
+ * Flush the mutations cache if more mutations have been cached than the
+ * {@link #batchThreshold} allows.
+ *
+ * @throws IOException
+ */
+ private void maybeFlush() throws IOException
+ {
+ if (++batchSize > batchThreshold)
+ {
+ flush();
+ batchSize = 0L;
+ }
+ }
+
+ /**
+ * Send the batched mutations over to Cassandra, and then clear the
+ * mutations cache.
+ *
+ * @throws IOException
+ */
+ protected synchronized void flush() throws IOException
+ {
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ try
+ {
+ List<Future<?>> mutationFutures = new ArrayList<Future<?>>();
+ for (Map.Entry<InetAddress, Map<byte[], Map<String, List<Mutation>>>> entry : mutationsByEndpoint.entrySet())
+ {
+ mutationFutures.add(executor.submit(new EndpointCallable(context, entry.getKey(), entry.getValue())));
+ }
+ // wait until we have all the results back
+ for (Future<?> mutationFuture : mutationFutures)
+ {
+ try
+ {
+ mutationFuture.get();
+ }
+ catch (ExecutionException e)
+ {
+ throw new IOException("Could not perform endpoint mutations", e.getCause());
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ }
+ finally
+ {
+ executor.shutdownNow();
+ mutationsByEndpoint.clear();
+ }
+ }
+
+ /**
+ * The <code>EndpointCallable</code> facilitates an asynchronous call to a
+ * specific node in the ring that commands it to perform a batched set of
+ * mutations. Needless to say, the given mutations are targeted at rows that
+ * the selected endpoint is responsible for (i.e., is the primary replica
+ * for).
+ */
+ public class EndpointCallable implements Callable<Void>
+ {
+ // The task attempt context associated with this callable.
+ private TaskAttemptContext taskContext;
+ // The endpoint of the primary replica for the rows being mutated
+ private InetAddress endpoint;
+ // The mutations to be performed on the node referenced by {@link #endpoint}.
+ private Map<byte[], Map<String, List<Mutation>>> mutations;
+
+ /**
+ * Constructs an {@link EndpointCallable} for the given endpoint and set
+ * of mutations.
+ *
+ * @param taskContext the task attempt context associated with this callable
+ * @param endpoint the endpoint wherein to execute the mutations
+ * @param mutations the mutation map expected by
+ * {@link Cassandra.Client#batch_mutate(Map, ConsistencyLevel)}
+ */
+ public EndpointCallable(TaskAttemptContext taskContext, InetAddress endpoint, Map<byte[], Map<String, List<Mutation>>> mutations)
+ {
+ this.taskContext = taskContext;
+ this.endpoint = endpoint;
+ this.mutations = mutations;
+ }
+
+ /**
+ * Perform the call to
+ * {@link Cassandra.Client#batch_mutate(Map, ConsistencyLevel)}.
+ */
+ public Void call() throws Exception
+ {
+ TSocket socket = null;
+ try
+ {
+ socket = new TSocket(endpoint.getHostName(), DatabaseDescriptor.getRpcPort());
+ Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(socket, taskContext);
+ client.batch_mutate(mutations, ConsistencyLevel.ONE);
+ return null;
+ }
+ finally
+ {
+ if (socket != null)
+ socket.close();
+ }
+ }
+ }
+
+}
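
A small sketch (not part of the patch) of tuning the write-back cache
described above: by default the writer holds every mutation until close(),
but the BATCH_THRESHOLD property read in the constructor caps the cache
size. The threshold of 64 below is an arbitrary example value.

package org.apache.cassandra.hadoop;

import org.apache.hadoop.conf.Configuration;

public class BatchThresholdSketch
{
    public static void configure(Configuration conf)
    {
        // Flush batched mutations to Cassandra every 64 writes rather than
        // only when the writer is closed.
        conf.setLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 64L);
    }
}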
Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,113 @@
+package org.apache.cassandra.hadoop;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The <code>ColumnWritable</code> is a {@link WritableComparable} that denotes
+ * a column name and value.
+ */
+public class ColumnWritable implements WritableComparable<ColumnWritable>
+{
+ // A comparator that orders two byte arrays lexicographically.
+ public static final Comparator<byte[]> BYTE_ARRAY_COMPARATOR = new Comparator<byte[]>()
+ {
+ public int compare(byte[] o1, byte[] o2)
+ {
+ return FBUtilities.compareByteArrays(o1, o2);
+ }
+ };
+
+ // The name and value of the column this writable denotes.
+ private byte[] name, value;
+
+ // Writables are instantiated reflectively during the shuffle, so a no-arg
+ // constructor is needed before readFields() can populate the instance.
+ public ColumnWritable()
+ {
+ }
+
+ public ColumnWritable(byte[] name, byte[] value)
+ {
+ setName(name);
+ setValue(value);
+ }
+
+ public byte[] getValue()
+ {
+ return value;
+ }
+
+ public void setValue(byte[] value)
+ {
+ this.value = value;
+ }
+
+ public byte[] getName()
+ {
+ return name;
+ }
+
+ public void setName(byte[] name)
+ {
+ this.name = name;
+ }
+
+ public void readFields(DataInput in) throws IOException
+ {
+ name = FBUtilities.readByteArray(in);
+ value = FBUtilities.readByteArray(in);
+ }
+
+ public void write(DataOutput out) throws IOException
+ {
+ FBUtilities.writeByteArray(name, out);
+ FBUtilities.writeByteArray(value, out);
+ }
+
+ /** Returns true iff <code>o</code> is a ColumnWritable with the same name and value. */
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ColumnWritable))
+ return false;
+ ColumnWritable that = (ColumnWritable) o;
+ return compareTo(that) == 0;
+ }
+
+ public int hashCode()
+ {
+ // byte[].hashCode() is identity-based, so hash the array contents to
+ // stay consistent with equals().
+ return 31 * Arrays.hashCode(name) + Arrays.hashCode(value);
+ }
+
+ /** Compares two ColumnWritables. */
+ public int compareTo(ColumnWritable o)
+ {
+ int nameComparison = BYTE_ARRAY_COMPARATOR.compare(this.name, o.name);
+ if (nameComparison != 0)
+ return nameComparison;
+ return BYTE_ARRAY_COMPARATOR.compare(this.value, o.value);
+ }
+
+ public String toString()
+ {
+ // Render the bytes as text; calling toString() on a byte[] would only
+ // print the array's identity.
+ return "{ " + new String(name) + " : " + new String(value) + " }";
+ }
+}
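
For illustration (not part of the patch), a round trip through the Writable
contract, relying on the no-arg constructor noted above; only standard
java.io stream classes are assumed.

package org.apache.cassandra.hadoop;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ColumnWritableRoundTripSketch
{
    public static ColumnWritable roundTrip(ColumnWritable in) throws IOException
    {
        // Serialize the column through write()...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));
        // ...and read it back into a fresh instance via readFields().
        ColumnWritable out = new ColumnWritable();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        assert in.equals(out);
        return out;
    }
}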
Added: cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.service.CassandraDaemon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class EmbeddedServer extends CleanupHelper
+{
+ protected static CassandraDaemon daemon = null;
+
+ enum GatewayService
+ {
+ Thrift, Avro
+ }
+
+ public static GatewayService getDaemonGatewayService()
+ {
+ return GatewayService.Thrift;
+ }
+
+ static ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ @BeforeClass
+ public static void startCassandra() throws IOException
+ {
+ executor.submit(new Runnable()
+ {
+ public void run()
+ {
+ switch (getDaemonGatewayService())
+ {
+ case Avro:
+ daemon = new org.apache.cassandra.avro.CassandraDaemon();
+ break;
+ case Thrift:
+ default:
+ daemon = new org.apache.cassandra.thrift.CassandraDaemon();
+ }
+ daemon.activate();
+ }
+ });
+ // Give the daemon a few seconds to start before tests proceed.
+ try
+ {
+ TimeUnit.SECONDS.sleep(3);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ @AfterClass
+ public static void stopCassandra() throws Exception
+ {
+ if (daemon != null)
+ {
+ daemon.deactivate();
+ }
+ executor.shutdown();
+ executor.shutdownNow();
+ }
+
+}
Added: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,222 @@
+package org.apache.cassandra.hadoop;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.EmbeddedServer;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A test case for the {@link ColumnFamilyOutputFormat}, which reads each
+ * <key, value> pair from a sequence file, maps them to a <key,
+ * column> pair, and then reduces it by aggregating the columns that
+ * correspond to the same key. Finally, the output <key, columns> pairs
+ * are written into the (Cassandra) column family associated with this
+ * {@link OutputFormat}.
+ *
+ * @author Karthick Sankarachary
+ *
+ */
+
+public class ColumnFamilyOutputFormatTest extends EmbeddedServer
+{
+ static final String KEYSPACE = "ColumnFamilyOutputFormatTestKeyspace";
+ static final String COLUMN_FAMILY = "outputColumnFamily";
+
+ private static final String INPUT_FOLDER = "columnfamily.outputtest";
+
+ private static final String INPUT_FILE = "rows.txt";
+
+ private static final int NUMBER_OF_ROWS = 3;
+ private static final int NUMBER_OF_COLUMNS = 4;
+
+ List<Integer> rowKeys;
+ List<Integer> columnValues;
+
+ private static FileSystem fs;
+
+ static
+ {
+ try
+ {
+ fs = FileSystem.getLocal(new Configuration());
+ }
+ catch (IOException ioe)
+ {
+ throw new IOError(ioe);
+ }
+ }
+
+ private Cassandra.Client thriftClient;
+
+ @Before
+ public void setup() throws Exception
+ {
+ setupCassandra();
+ defineRows();
+ createInputFile();
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ deleteInputFile();
+ deleteRows();
+ }
+
+ private void deleteRows() throws InvalidRequestException, TException
+ {
+ thriftClient.system_drop_column_family(COLUMN_FAMILY);
+ }
+
+ private void deleteInputFile() throws IOException
+ {
+ inputdir = new Path(INPUT_FOLDER);
+ if (!fs.delete(inputdir, true))
+ throw new IOException("Delete failed to remove " + inputdir.toString());
+ }
+
+ private void setupCassandra() throws TException, InvalidRequestException
+ {
+ /* Establish a thrift connection to the cassandra instance */
+ TSocket socket = new TSocket(DatabaseDescriptor.getListenAddress().getHostName(), DatabaseDescriptor.getRpcPort());
+ TTransport transport = socket;
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, false, false);
+ Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol);
+ transport.open();
+ thriftClient = cassandraClient;
+ Set<String> keyspaces = thriftClient.describe_keyspaces();
+ if (!keyspaces.contains(KEYSPACE))
+ {
+ List<CfDef> cfDefs = new ArrayList<CfDef>();
+ thriftClient.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.RackUnawareStrategy", 1, cfDefs));
+ }
+ thriftClient.set_keyspace(KEYSPACE);
+
+ CfDef cfDef = new CfDef(KEYSPACE, COLUMN_FAMILY);
+ try
+ {
+ thriftClient.system_add_column_family(cfDef);
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void defineRows()
+ {
+ rowKeys = new ArrayList<Integer>();
+ columnValues = new ArrayList<Integer>();
+ for (int key = 0; key < NUMBER_OF_ROWS; key++)
+ {
+ for (int columnValue = 0; columnValue < NUMBER_OF_COLUMNS; columnValue++)
+ {
+ rowKeys.add(key);
+ columnValues.add(columnValue);
+ }
+ }
+ }
+
+ Path inputdir;
+
+ private void createInputFile() throws IOException
+ {
+ inputdir = new Path(INPUT_FOLDER);
+ if (!fs.mkdirs(inputdir))
+ {
+ throw new IOException("Mkdirs failed to create " + inputdir.toString());
+ }
+
+ Path inputRows = new Path(inputdir, INPUT_FILE);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, fs.getConf(), inputRows, IntWritable.class, IntWritable.class);
+ for (int keyValuePair = 0; keyValuePair < NUMBER_OF_ROWS * NUMBER_OF_COLUMNS; keyValuePair++)
+ writer.append(new IntWritable(rowKeys.get(keyValuePair)), new IntWritable(columnValues.get(keyValuePair)));
+ writer.close();
+ }
+
+ @Test
+ public void testCopyCF() throws Exception
+ {
+ ToolRunner.run(new Configuration(), new SampleColumnFamilyOutputTool(inputdir, COLUMN_FAMILY), new String[] {});
+ verifyOutput();
+ }
+
+ public void verifyOutput() throws Exception
+ {
+ List<KeySlice> keySlices = thriftClient.get_range_slices(new ColumnParent().setColumn_family(COLUMN_FAMILY),
+ new SlicePredicate().setColumn_names(new ArrayList<byte[]>()),
+ new KeyRange().setStart_key("".getBytes()).setEnd_key("".getBytes()),
+ ConsistencyLevel.ONE);
+ for (KeySlice keySlice : keySlices)
+ {
+ Integer key = Integer.parseInt(new String(keySlice.getKey()));
+ List<Integer> columnValues = new ArrayList<Integer>();
+ for (ColumnOrSuperColumn cosc : keySlice.getColumns())
+ columnValues.add(Integer.parseInt(new String(cosc.getColumn().getValue())));
+ verifyKeyValues(key, columnValues);
+ }
+ }
+
+ private void verifyKeyValues(Integer key, List<Integer> columnValues) throws Exception
+ {
+ List<Integer> outputColumnValues = new ArrayList<Integer>();
+ for (int i = 0; i < rowKeys.size(); i++)
+ {
+ if (rowKeys.get(i).equals(key))
+ outputColumnValues.add(this.columnValues.get(i));
+ }
+ columnValues.removeAll(outputColumnValues);
+ assert columnValues.isEmpty() : "Some of the input columns could not be found in the column family";
+ }
+}
Added: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,80 @@
+package org.apache.cassandra.hadoop;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.thrift.TException;
+
+/**
+ * The <code>SampleColumnFamilyOutputTool</code> provides a tool interface which
+ * runs a {@link SampleColumnMapper} on the <key, value> pairs obtained
+ * from a sequence file, and then reduces it through the default
+ * {@link ColumnFamilyOutputReducer}.
+ *
+ * @author Karthick Sankarachary
+ *
+ */
+public class SampleColumnFamilyOutputTool extends Configured implements Tool
+{
+ private Path inputdir;
+ private String columnFamily;
+
+ public SampleColumnFamilyOutputTool(Path inputdir, String columnFamily)
+ {
+ this.inputdir = inputdir;
+ this.columnFamily = columnFamily;
+ }
+
+ public int run(String[] args)
+ throws InvalidRequestException, TException, IOException, InterruptedException, ClassNotFoundException
+ {
+ Job job = new Job(new Configuration());
+
+ // In case your job runs out of memory, use this setting
+ // (provided you're on Hadoop 0.20.1 or later)
+ // job.getConfiguration().setInt(JobContext.IO_SORT_MB, 1);
+ ConfigHelper.setColumnFamily(job.getConfiguration(),
+ ColumnFamilyOutputFormatTest.KEYSPACE,
+ columnFamily);
+ ConfigHelper.setSlicePredicate(job.getConfiguration(), new SlicePredicate());
+
+ SequenceFileInputFormat.addInputPath(job, inputdir);
+
+ job.setMapperClass(SampleColumnMapper.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(ColumnWritable.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+
+ job.setReducerClass(ColumnFamilyOutputReducer.class);
+ job.setOutputKeyClass(byte[].class);
+ job.setOutputValueClass(List.class);
+ job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
+
+ job.waitForCompletion(true);
+ return 0;
+ }
+}
\ No newline at end of file
Added: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,37 @@
+package org.apache.cassandra.hadoop;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * A sample mapper that takes a pair of input <key, value> (writable)
+ * integers, and writes them as <key, column> writables.
+ */
+public class SampleColumnMapper extends Mapper<IntWritable,IntWritable,IntWritable,ColumnWritable>
+{
+ protected void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException
+ {
+ byte[] columnNameAndValue = String.valueOf(value.get()).getBytes();
+ context.write(key, new ColumnWritable(columnNameAndValue, columnNameAndValue));
+ }
+}
\ No newline at end of file