Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/03 17:14:50 UTC
svn commit: r810988 - in /incubator/cassandra/branches/cassandra-0.4:
CHANGES.txt contrib/ contrib/bmt_example/
contrib/bmt_example/CassandraBulkLoader.java
Author: jbellis
Date: Thu Sep 3 15:14:50 2009
New Revision: 810988
URL: http://svn.apache.org/viewvc?rev=810988&view=rev
Log:
add contrib/bmt_example/CassandraBulkLoader.java. Patch by Chris Goffinet; reviewed by jbellis for CASSANDRA-398
Added:
incubator/cassandra/branches/cassandra-0.4/contrib/
incubator/cassandra/branches/cassandra-0.4/contrib/bmt_example/
incubator/cassandra/branches/cassandra-0.4/contrib/bmt_example/CassandraBulkLoader.java
Modified:
incubator/cassandra/branches/cassandra-0.4/CHANGES.txt
Modified: incubator/cassandra/branches/cassandra-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/CHANGES.txt?rev=810988&r1=810987&r2=810988&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.4/CHANGES.txt Thu Sep 3 15:14:50 2009
@@ -8,7 +8,7 @@
disagree on what data was present
* Snapshot support via JMX
* BinaryMemtable support for bulk load via hadoop; see
- https://github.com/lenn0x/Cassandra-Hadoop-BMT/tree
+ contrib/bmt_example for an example of using it.
* Thrift API has changed a _lot_:
- removed time-sorted CFs; instead, user-defined comparators
may be defined on the column names, which are now byte arrays.
Added: incubator/cassandra/branches/cassandra-0.4/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/contrib/bmt_example/CassandraBulkLoader.java?rev=810988&view=auto
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/contrib/bmt_example/CassandraBulkLoader.java (added)
+++ incubator/cassandra/branches/cassandra-0.4/contrib/bmt_example/CassandraBulkLoader.java Thu Sep 3 15:14:50 2009
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /**
+ * Cassandra has a backdoor called the Binary Memtable. Its purpose is to mass-import large
+ * amounts of data without going through the Thrift interface.
+ *
+ * Inserting data through the Binary Memtable lets you skip both the commit log overhead and the
+ * per-insert acknowledgment that Thrift requires. The example below uses Hadoop to generate all the
+ * data to send to Cassandra, and sends it via the Binary Memtable interface. Hadoop ends up
+ * creating the same data that would be put into an SSTable if you were using Thrift. With enough
+ * Hadoop nodes inserting data, the bottleneck should become the network.
+ *
+ * We recommend setting the compaction threshold to 0 while the import is running. After the
+ * import, run `nodeprobe -host <IP> flush_binary <Keyspace>` on every node to flush the remaining
+ * in-memory data to disk, then restore the compaction threshold to its original value.
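+ *
+ * For example, with the two illustrative nodes used in the token list further below, flushing
+ * Keyspace1 after the import might look like this (IPs are placeholders):
+ *
+ *   nodeprobe -host 192.168.0.1 flush_binary Keyspace1
+ *   nodeprobe -host 192.168.0.2 flush_binary Keyspace1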
+ *
+ * The example below is a sample Hadoop job that inserts SuperColumns; it can be tweaked to work
+ * with normal Columns, as the sketch just below shows.
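+ *
+ * A minimal sketch of that tweak, assuming the usual QueryPath convention that a null super
+ * column name addresses a standard column:
+ *
+ *   columnFamily.addColumn(new QueryPath(CFName, null, ColumnName.getBytes("UTF-8")),
+ *                          ColumnValue.getBytes("UTF-8"), timestamp);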
+ *
+ * Construct the data you want to import as newline-delimited rows. The mapper emits each row under
+ * its <Key>, so the shuffle delivers the data to each reducer grouped as a column-oriented subset.
+ * In the reduce phase you build the ColumnFamilies you want inserted and send them to your nodes.
+ * You need to know your tokens and IP addresses ahead of time.
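+ *
+ * As a concrete (hypothetical) illustration of the layout the reducer below expects, one input
+ * line could look like:
+ *
+ *   row1<TAB>ignored\1SuperColumn1\1Column1\1Value1
+ *
+ * KeyValueTextInputFormat splits each line at the first tab into row key and value; the reducer
+ * splits the value on \1 and reads fields 1-3 as super column name, column name, and column value
+ * (field 0 is unused).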
+ *
+ * Author : Chris Goffinet (goffinet@digg.com)
+ */
+
+package org.apache.cassandra.bulkloader;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.BigIntegerToken;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SelectorManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+
+public class CassandraBulkLoader {
+ public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
+
+ public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ // This is a simple key/value mapper.
+ output.collect(key, value);
+ }
+ }
+ public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
+ private Path[] localFiles;
+ private ArrayList<String> tokens = new ArrayList<String>();
+ private JobConf jobconf;
+
+ public void configure(JobConf job) {
+ this.jobconf = job;
+ String cassConfig;
+
+ // Get the cached files
+ try
+ {
+ localFiles = DistributedCache.getLocalCacheFiles(job);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ cassConfig = localFiles[0].getParent().toString();
+
+ System.setProperty("storage-config",cassConfig);
+
+ startMessagingService();
+ /*
+ Populate tokens
+
+ Specify your tokens and ips below.
+
+ tokens.add("0:192.168.0.1")
+ tokens.add("14178431955039102644307275309657008810:192.168.0.2")
+ */
+
+ for (String token : this.tokens)
+ {
+ String[] values = token.split(":");
+ StorageService.instance().updateTokenMetadata(new BigIntegerToken(new BigInteger(values[0])),new EndPoint(values[1], 7000));
+ }
+ }
+ public void close()
+ {
+ try
+ {
+ // release the cache
+ DistributedCache.releaseCache(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), this.jobconf);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (URISyntaxException e)
+ {
+ throw new RuntimeException(e);
+ }
+ shutdownMessagingService();
+ }
+ public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+ {
+ ColumnFamily columnFamily;
+ String Keyspace = "Keyspace1";
+ String CFName = "Super1";
+ Message message;
+ List<ColumnFamily> columnFamilies;
+ columnFamilies = new LinkedList<ColumnFamily>();
+ String line;
+
+ /* Create a column family */
+ columnFamily = ColumnFamily.create(Keyspace, CFName);
+ while (values.hasNext()) {
+ // Split the value (line based on your own delimiter)
+ line = values.next().toString();
+ String[] fields = line.split("\1");
+ String SuperColumnName = fields[1];
+ String ColumnName = fields[2];
+ String ColumnValue = fields[3];
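+ // All columns in this example share timestamp 0; use real timestamps if later writes
+ // must supersede these imported values.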
+ int timestamp = 0;
+ columnFamily.addColumn(new QueryPath(CFName, SuperColumnName.getBytes("UTF-8"), ColumnName.getBytes("UTF-8")), ColumnValue.getBytes("UTF-8"), timestamp);
+ }
+
+ columnFamilies.add(columnFamily);
+
+ /* Get serialized message to send to cluster */
+ message = createMessage(Keyspace, key.toString(), CFName, columnFamilies);
+ for (EndPoint endpoint: StorageService.instance().getNStorageEndPoint(key.toString()))
+ {
+ /* Send message to end point */
+ MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
+ }
+
+ output.collect(key, new Text(" inserted into Cassandra node(s)"));
+ }
+ }
+
+ public static void runJob(String[] args)
+ {
+ JobConf conf = new JobConf(CassandraBulkLoader.class);
+
+ if(args.length >= 4)
+ {
+ conf.setNumReduceTasks(Integer.parseInt(args[3]));
+ }
+
+ try
+ {
+ // We store the cassandra storage-conf.xml on the HDFS cluster
+ DistributedCache.addCacheFile(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), conf);
+ }
+ catch (URISyntaxException e)
+ {
+ throw new RuntimeException(e);
+ }
+ conf.setInputFormat(KeyValueTextInputFormat.class);
+ conf.setJobName("CassandraBulkLoader_v2");
+ conf.setMapperClass(Map.class);
+ conf.setReducerClass(Reduce.class);
+
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(Text.class);
+
+ FileInputFormat.setInputPaths(conf, new Path(args[1]));
+ FileOutputFormat.setOutputPath(conf, new Path(args[2]));
+ try
+ {
+ JobClient.runJob(conf);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ public static Message createMessage(String Keyspace, String Key, String CFName, List<ColumnFamily> ColumnFamilies)
+ {
+ ColumnFamily baseColumnFamily;
+ DataOutputBuffer bufOut = new org.apache.cassandra.io.DataOutputBuffer();
+ RowMutation rm;
+ Message message;
+ Column column;
+
+ /* Build a base "Standard" column family; each serialized column family below is carried in it as a binary column */
+ baseColumnFamily = new ColumnFamily(CFName, "Standard", DatabaseDescriptor.getComparator(Keyspace, CFName), DatabaseDescriptor.getSubComparator(Keyspace, CFName));
+
+ for(ColumnFamily cf : ColumnFamilies) {
+ bufOut.reset();
+ try
+ {
+ ColumnFamily.serializer().serializeWithIndexes(cf, bufOut);
+ byte[] data = new byte[bufOut.getLength()];
+ System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());
+
+ column = new Column(cf.name().getBytes("UTF-8"), data, 0, false);
+ baseColumnFamily.addColumn(column);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ rm = new RowMutation(Keyspace, StorageService.getPartitioner().decorateKey(Key));
+ rm.add(baseColumnFamily);
+
+ try
+ {
+ /* Make message */
+ message = rm.makeRowMutationMessage(StorageService.binaryVerbHandler_);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ return message;
+ }
+ public static void startMessagingService()
+ {
+ SelectorManager.getSelectorManager().start();
+ }
+ public static void shutdownMessagingService()
+ {
+ try
+ {
+ // Sleep just in case the number of keys we send over is small
+ Thread.sleep(3*1000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ MessagingService.flushAndshutdown();
+ }
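+ /**
+ * A hypothetical invocation (jar name is illustrative): args[0] is unused, args[1] is the HDFS
+ * input path, args[2] is the output path, and the optional args[3] sets the number of reduce
+ * tasks, e.g.:
+ *
+ *   hadoop jar cassandra-bulkloader.jar org.apache.cassandra.bulkloader.CassandraBulkLoader \
+ *       bulkload /cassandra/input /cassandra/output 16
+ */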
+ public static void main(String[] args) throws Exception
+ {
+ runJob(args);
+ }
+}
\ No newline at end of file