You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC
svn commit: r749207 [11/12] - in
/incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/
net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/
Added: incubator/cassandra/src/org/apache/cassandra/test/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/SSTableTest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/SSTableTest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/SSTableTest.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.PrimaryKey;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+public class SSTableTest
+{
+ private static void rawSSTableWrite() throws Throwable
+ {
+ SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra", "Table-Test-1");
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ BloomFilter bf = new BloomFilter(1000, 8);
+ byte[] bytes = new byte[64*1024];
+ Random random = new Random();
+ for ( int i = 100; i < 1000; ++i )
+ {
+ String key = Integer.toString(i);
+ ColumnFamily cf = new ColumnFamily("Test", "Standard");
+ bufOut.reset();
+ // random.nextBytes(bytes);
+ cf.createColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
+ ColumnFamily.serializer2().serialize(cf, bufOut);
+ ssTable.append(key, bufOut);
+ bf.fill(key);
+ }
+ ssTable.close(bf);
+ }
+
+ private static void hashSSTableWrite() throws Throwable
+ {
+ Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>();
+ byte[] bytes = new byte[64*1024];
+ Random random = new Random();
+ for ( int i = 100; i < 1000; ++i )
+ {
+ String key = Integer.toString(i);
+ ColumnFamily cf = new ColumnFamily("Test", "Standard");
+ // random.nextBytes(bytes);
+ cf.createColumn("C", "Avinash Lakshman is a good man".getBytes(), i);
+ columnFamilies.put(key, cf);
+ }
+ flushForRandomPartitioner(columnFamilies);
+ }
+
+ private static void flushForRandomPartitioner(Map<String, ColumnFamily> columnFamilies) throws Throwable
+ {
+ SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra", "Table-Test-1", PartitionerType.RANDOM);
+ /* List of primary keys in sorted order */
+ List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies.keySet() );
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ /* Use this BloomFilter to decide if a key exists in a SSTable */
+ BloomFilter bf = new BloomFilter(pKeys.size(), 15);
+ for ( PrimaryKey pKey : pKeys )
+ {
+ buffer.reset();
+ ColumnFamily columnFamily = columnFamilies.get(pKey.key());
+ if ( columnFamily != null )
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializer2().serialize( columnFamily, buffer );
+ /* Now write the key and value to disk */
+ ssTable.append(pKey.key(), pKey.hash(), buffer);
+ bf.fill(pKey.key());
+ }
+ }
+ ssTable.close(bf);
+ }
+
+ private static void readSSTable() throws Throwable
+ {
+ SSTable ssTable = new SSTable("C:\\Engagements\\Cassandra\\Table-Test-1-Data.db");
+ for ( int i = 100; i < 1000; ++i )
+ {
+ String key = Integer.toString(i);
+ DataInputBuffer bufIn = ssTable.next(key, "Test:C");
+ ColumnFamily cf = ColumnFamily.serializer().deserialize(bufIn);
+ if ( cf != null )
+ {
+ System.out.println("KEY:" + key);
+ System.out.println(cf.name());
+ Collection<IColumn> columns = cf.getAllColumns();
+ for ( IColumn column : columns )
+ {
+ System.out.println(column.name());
+ }
+ }
+ else
+ {
+ System.out.println("CF doesn't exist for key " + key);
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ BloomFilter bf = new BloomFilter(1024*1024, 15);
+ for ( int i = 0; i < 1024*1024; ++i )
+ {
+ bf.fill(Integer.toString(i));
+ }
+
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ BloomFilter.serializer().serialize(bf, bufOut);
+ FileOutputStream fos = new FileOutputStream("C:\\Engagements\\bf.dat", true);
+ fos.write(bufOut.getData(), 0, bufOut.getLength());
+ fos.close();
+
+ FileInputStream fis = new FileInputStream("C:\\Engagements\\bf.dat");
+ byte[] bytes = new byte[fis.available()];
+ fis.read(bytes);
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bytes, bytes.length );
+ BloomFilter bf2 = BloomFilter.serializer().deserialize(bufIn);
+
+ int count = 0;
+ for ( int i = 0; i < 1024*1024; ++i )
+ {
+ if ( bf.isPresent(Integer.toString(i)) )
+ ++count;
+ }
+ System.out.println(count);
+
+ //DatabaseDescriptor.init();
+ //hashSSTableWrite();
+ //rawSSTableWrite();
+ //readSSTable();
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/test/StressTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/StressTest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/StressTest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/StressTest.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,883 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.analytics.AnalyticsContext;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.Cassandra;
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.service.ReadResponseResolver;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.batch_mutation_super_t;
+import org.apache.cassandra.service.batch_mutation_t;
+import org.apache.cassandra.service.column_t;
+import org.apache.cassandra.service.superColumn_t;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.TTransport;
+import com.martiansoftware.jsap.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StressTest
+{
+ private static Logger logger_ = Logger.getLogger(DataImporter.class);
+
+ private static final String tablename_ = new String("Test");
+
+ public static EndPoint from_ = new EndPoint("172.24.24.209", 10001);
+
+ public static EndPoint to_ = new EndPoint("hadoop071.sf2p.facebook.com", 7000);
+ private static String server_ = new String("hadoop071.sf2p.facebook.com");
+ private static final String columnFamilyColumn_ = new String("ColumnList");
+ private static final String columnFamilySuperColumn_ = new String("SuperColumnList");
+ private static final String keyFix_ = new String("");
+ private static final String columnFix_ = new String("Column-");
+ private static final String superColumnFix_ = new String("SuperColumn-");
+
+ private Cassandra.Client peerstorageClient_ = null;
+ TTransport transport_ = null;
+ private int requestsPerSecond_ = 1000;
+ private ExecutorService runner_ = null;
+
+
+ class LoadManager implements Runnable
+ {
+ private RowMutationMessage rmsg_ = null;
+ private batch_mutation_t bt_ = null;
+ private batch_mutation_super_t bts_ = null;
+
+ LoadManager(RowMutationMessage rmsg)
+ {
+ rmsg_ = rmsg;
+ }
+ LoadManager(batch_mutation_t bt)
+ {
+ bt_ = bt;
+ }
+ LoadManager(batch_mutation_super_t bts)
+ {
+ bts_ = bts;
+ }
+
+ public void run()
+ {
+ if( rmsg_ != null )
+ {
+ Message message = new Message(from_ , StorageService.mutationStage_,
+ StorageService.loadVerbHandler_, new Object[] { rmsg_ });
+ MessagingService.getMessagingInstance().sendOneWay(message, to_);
+ }
+
+ }
+ }
+
+
+ /*
+ * This function will apply the given task . It is based on a requests per
+ * second member variable which can be set to teh required ammount , it will
+ * generate only those many requests and if thos emany requests have already
+ * been entered then it will sleep . This function assumes that there is no
+ * waiting in any other part of the code so the requests are being generated
+ * instantaniously .
+ */
+ public void applyLoad(RowMutation rm) throws IOException {
+ try
+ {
+ long t = System.currentTimeMillis();
+ RowMutationMessage rmMsg = new RowMutationMessage(rm);
+ Message message = new Message(from_,
+ StorageService.mutationStage_,
+ StorageService.mutationVerbHandler_,
+ new Object[]{ rmMsg }
+ );
+ MessagingService.getMessagingInstance().sendOneWay(message, to_);
+ Thread.sleep(1, 1000000000/requestsPerSecond_);
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ public void readLoad(ReadMessage readMessage)
+ {
+ IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
+ 1,
+ readResponseResolver);
+ Message message = new Message(from_, StorageService.readStage_,
+ StorageService.readVerbHandler_,
+ new Object[] { readMessage });
+ MessagingService.getMessagingInstance().sendOneWay(message, to_);
+ /*IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, to_);
+ try
+ {
+ long t = System.currentTimeMillis();
+ iar.get(2000, TimeUnit.MILLISECONDS );
+ logger_.debug("Time taken for read..."
+ + (System.currentTimeMillis() - t));
+
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }*/
+ }
+
+
+
+
+
+ public void randomReadColumn (int keys, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ try
+ {
+ while(true)
+ {
+ int key = random.nextInt(keys) + 1;
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ int j = random.nextInt(columns) + 1;
+ ReadMessage rm = new ReadMessage(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
+ readLoad(rm);
+ if ( requestsPerSecond_ > 1000)
+ Thread.sleep(0, 1000000000/requestsPerSecond_);
+ else
+ Thread.sleep(1000/requestsPerSecond_);
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ public void randomWriteColumn(int keys, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ byte[] bytes = new byte[size];
+ int ts = 1;
+ try
+ {
+ while(true)
+ {
+ int key = random.nextInt(keys) + 1;
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ RowMutation rm = new RowMutation(tablename_, stringKey);
+ int j = random.nextInt(columns) + 1;
+ random.nextBytes(bytes);
+ rm.add( columnFamilyColumn_ + ":" + columnFix_ + j, bytes, ts);
+ if ( ts == Integer.MAX_VALUE)
+ {
+ ts = 0 ;
+ }
+ ts++;
+ for(int k = 0 ; k < requestsPerSecond_/1000 +1 ; k++ )
+ {
+ runner_.submit(new LoadManager(new RowMutationMessage(rm)));
+ }
+ try
+ {
+ if ( requestsPerSecond_ > 1000)
+ Thread.sleep(1);
+ else
+ Thread.sleep(1000/requestsPerSecond_);
+ }
+ catch ( Exception ex)
+ {
+
+ }
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ public void randomReadSuperColumn(int keys, int superColumns, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ try
+ {
+ while(true)
+ {
+ int key = random.nextInt(keys) + 1;
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ int i = random.nextInt(superColumns) + 1;
+ int j = random.nextInt(columns) + 1;
+ ReadMessage rm = new ReadMessage(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+ readLoad(rm);
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+
+ public void randomWriteSuperColumn(int keys, int superColumns,int columns, int size, int tps)
+ {
+ Random random = new Random();
+ byte[] bytes = new byte[size];
+ int ts = 1;
+ try
+ {
+ while(true)
+ {
+ int key = random.nextInt(keys) + 1;
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ RowMutation rm = new RowMutation(tablename_, stringKey);
+ int i = random.nextInt(superColumns) + 1;
+ int j = random.nextInt(columns) + 1;
+ random.nextBytes(bytes);
+ rm.add( columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j, bytes, ts);
+ if ( ts == Integer.MAX_VALUE )
+ {
+ ts = 0 ;
+ }
+ ts++;
+ for(int k = 0 ; k < requestsPerSecond_/1000 +1 ; k++ )
+ {
+ runner_.submit(new LoadManager(new RowMutationMessage(rm)));
+ }
+ try
+ {
+ if ( requestsPerSecond_ > 1000)
+ Thread.sleep(1);
+ else
+ Thread.sleep(1000/requestsPerSecond_);
+ }
+ catch ( Exception ex)
+ {
+
+ }
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ public void bulkWriteColumn(int keys, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ byte[] bytes = new byte[size];
+ int ts = 1;
+ long time = System.currentTimeMillis();
+ try
+ {
+ for(int key = 1; key <= keys ; key++)
+ {
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ RowMutation rm = new RowMutation(tablename_, stringKey);
+ for( int j = 1; j <= columns ; j++)
+ {
+ random.nextBytes(bytes);
+ rm.add( columnFamilyColumn_ + ":" + columnFix_ + j, bytes, ts);
+ }
+ RowMutationMessage rmMsg = new RowMutationMessage(rm);
+
+ for(int k = 0 ; k < requestsPerSecond_/1000 +1 ; k++ )
+ {
+ runner_.submit(new LoadManager(rmMsg));
+ }
+ try
+ {
+ if ( requestsPerSecond_ > 1000)
+ Thread.sleep(1);
+ else
+ Thread.sleep(1000/requestsPerSecond_);
+ }
+ catch ( Exception ex)
+ {
+
+ }
+
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ System.out.println(System.currentTimeMillis() - time);
+ }
+
+ public void bulkWriteSuperColumn(int keys, int superColumns, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ byte[] bytes = new byte[size];
+ int ts = 1;
+ try
+ {
+ for(int key = 1; key <= keys ; key++)
+ {
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ RowMutation rm = new RowMutation(tablename_, stringKey);
+ for( int i = 1; i <= superColumns ; i++)
+ {
+ for( int j = 1; j <= columns ; j++)
+ {
+ random.nextBytes(bytes);
+ rm.add( columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j, bytes, ts);
+ }
+ }
+ RowMutationMessage rmMsg = new RowMutationMessage(rm);
+ for(int k = 0 ; k < requestsPerSecond_/1000 +1 ; k++ )
+ {
+ runner_.submit(new LoadManager(rmMsg));
+ }
+ try
+ {
+ if ( requestsPerSecond_ > 1000)
+ Thread.sleep(1);
+ else
+ Thread.sleep(1000/requestsPerSecond_);
+ }
+ catch ( Exception ex)
+ {
+
+ }
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ // Stress the server using the thrift API
+
+ public Cassandra.Client connect() {
+ int port = 9160;
+ TSocket socket = new TSocket(server_, port);
+ if(transport_ != null)
+ transport_.close();
+ transport_ = socket;
+
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport_, false,
+ false);
+ Cassandra.Client peerstorageClient = new Cassandra.Client(
+ binaryProtocol);
+ try
+ {
+ transport_.open();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ return peerstorageClient;
+ }
+
+ public void applyThrift(String table, String key, String columnFamily, byte[] bytes, long ts ) {
+
+ try {
+ if ( requestsPerSecond_ > 1000)
+ Thread.sleep(0, 1000000000/requestsPerSecond_);
+ else
+ Thread.sleep(1000/requestsPerSecond_);
+ peerstorageClient_.insert(table, key, columnFamily, new String(bytes), ts);
+ } catch (Exception e) {
+ try {
+ peerstorageClient_ = connect();
+ peerstorageClient_.insert(table, key, columnFamily, new String(bytes), ts);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+
+
+ public void apply(batch_mutation_t batchMutation) {
+
+ try {
+ if ( requestsPerSecond_ > 1000)
+ Thread.sleep(0, 1000000000/requestsPerSecond_);
+ else
+ Thread.sleep(1000/requestsPerSecond_);
+ peerstorageClient_.batch_insert(batchMutation);
+ } catch (Exception e) {
+ try {
+ peerstorageClient_ = connect();
+ peerstorageClient_.batch_insert(batchMutation);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+
+ public void apply(batch_mutation_super_t batchMutation) {
+
+ try {
+ if ( requestsPerSecond_ > 1000)
+ Thread.sleep(0, 1000000000/requestsPerSecond_);
+ else
+ Thread.sleep(1000/requestsPerSecond_);
+ long t = System.currentTimeMillis();
+ peerstorageClient_.batch_insert_superColumn(batchMutation);
+ logger_.debug("Time taken for thrift..."
+ + (System.currentTimeMillis() - t));
+ } catch (Exception e) {
+ try {
+ peerstorageClient_ = connect();
+ peerstorageClient_.batch_insert_superColumn(batchMutation);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+
+ public void readLoadColumn(String tableName, String key, String cf)
+ {
+ try
+ {
+ column_t column = peerstorageClient_.get_column(tableName, key, cf);
+ }
+ catch(Exception ex)
+ {
+ peerstorageClient_ = connect();
+ ex.printStackTrace();
+ }
+ }
+
+ public void randomReadColumnThrift(int keys, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ try
+ {
+ while(true)
+ {
+ int key = random.nextInt(keys) + 1;
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ int j = random.nextInt(columns) + 1;
+ readLoadColumn(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
+ if ( requestsPerSecond_ > 1000)
+ Thread.sleep(0, 1000000000/requestsPerSecond_);
+ else
+ Thread.sleep(1000/requestsPerSecond_);
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ public void randomWriteColumnThrift(int keys, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ byte[] bytes = new byte[size];
+ int ts = 1;
+ try
+ {
+ while(true)
+ {
+ int key = random.nextInt(keys) + 1;
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ int j = random.nextInt(columns) + 1;
+ random.nextBytes(bytes);
+ if ( ts == Integer.MAX_VALUE)
+ {
+ ts = 0 ;
+ }
+ ts++;
+ applyThrift(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j, bytes, ts);
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ public void randomReadSuperColumnThrift(int keys, int superColumns, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ try
+ {
+ while(true)
+ {
+ int key = random.nextInt(keys) + 1;
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ int i = random.nextInt(superColumns) + 1;
+ int j = random.nextInt(columns) + 1;
+ readLoadColumn(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+
+ public void randomWriteSuperColumnThrift(int keys, int superColumns,int columns, int size, int tps)
+ {
+ Random random = new Random();
+ byte[] bytes = new byte[size];
+ int ts = 1;
+ try
+ {
+ while(true)
+ {
+ int key = random.nextInt(keys) + 1;
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ int i = random.nextInt(superColumns) + 1;
+ int j = random.nextInt(columns) + 1;
+ random.nextBytes(bytes);
+ if ( ts == Integer.MAX_VALUE)
+ {
+ ts = 0 ;
+ }
+ ts++;
+ applyThrift(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j, bytes, ts);
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+
+
+ }
+
+ public void bulkWriteColumnThrift(int keys, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ byte[] bytes = new byte[size];
+ int ts = 1;
+ long time = System.currentTimeMillis();
+ try
+ {
+ for(int key = 1; key <= keys ; key++)
+ {
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ batch_mutation_t bt = new batch_mutation_t();
+ bt.key = stringKey;
+ bt.table = tablename_;
+ bt.cfmap = new HashMap<String,List<column_t>>();
+ ArrayList<column_t> column_arr = new ArrayList<column_t>();
+ for( int j = 1; j <= columns ; j++)
+ {
+ random.nextBytes(bytes);
+ column_arr.add(new column_t(columnFix_ + j, bytes.toString(), ts));
+ }
+ bt.cfmap.put(columnFamilyColumn_, column_arr);
+ apply(bt);
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ System.out.println(System.currentTimeMillis() - time);
+ }
+
+ public void bulkWriteSuperColumnThrift(int keys, int supercolumns, int columns, int size, int tps)
+ {
+ Random random = new Random();
+ byte[] bytes = new byte[size];
+ int ts = 1;
+ long time = System.currentTimeMillis();
+ try
+ {
+ for(int key = 1; key <= keys ; key++)
+ {
+ String stringKey = new Integer(key).toString();
+ stringKey = stringKey + keyFix_ ;
+ batch_mutation_super_t bt = new batch_mutation_super_t();
+ bt.key = stringKey;
+ bt.table = tablename_;
+ bt.cfmap = new HashMap<String,List<superColumn_t>>();
+ ArrayList<superColumn_t> superColumn_arr = new ArrayList<superColumn_t>();
+
+ for( int i = 1; i <= supercolumns; i++ )
+ {
+ ArrayList<column_t> column_arr = new ArrayList<column_t>();
+ for( int j = 1; j <= columns ; j++)
+ {
+ random.nextBytes(bytes);
+ column_arr.add(new column_t(columnFix_ + j, bytes.toString(), ts));
+ }
+ superColumn_arr.add(new superColumn_t(superColumnFix_ + i, column_arr));
+ }
+ bt.cfmap.put(columnFamilySuperColumn_, superColumn_arr);
+ apply(bt);
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ System.out.println(System.currentTimeMillis() - time);
+ }
+
+ public void testCommitLog() throws Throwable
+ {
+ Random random = new Random(System.currentTimeMillis());
+ byte[] bytes = new byte[4096];
+ random.nextBytes(bytes);
+ byte[] bytes1 = new byte[64];
+ random.nextBytes(bytes1);
+ peerstorageClient_ = connect();
+ int t = 0 ;
+ while( true )
+ {
+ int key = random.nextInt();
+ int threadId = random.nextInt();
+ int word = random.nextInt();
+ peerstorageClient_.insert("Mailbox", Integer.toString(key), "MailboxMailList0:" + Integer.toString(threadId), new String(bytes1), t++);
+ peerstorageClient_.insert("Mailbox", Integer.toString(key), "MailboxThreadList0:" + Integer.toString(word) + ":" + Integer.toString(threadId), new String(bytes), t++);
+ peerstorageClient_.insert("Mailbox", Integer.toString(key), "MailboxUserList0:"+ Integer.toString(word) + ":" + Integer.toString(threadId), new String(bytes), t++);
+ }
+ }
+
+ JSAPResult ParseArguments(String[] args)
+ {
+ JSAPResult config = null;
+ try
+ {
+
+ SimpleJSAP jsap = new SimpleJSAP(
+ "StressTest",
+ "Runs stress test for Cassandra",
+ new Parameter[] {
+ new FlaggedOption( "keys", JSAP.INTEGER_PARSER, "10000", JSAP.REQUIRED, 'k', JSAP.NO_LONGFLAG,
+ "The number of keys from 1 to this number" ),
+ new FlaggedOption( "columns", JSAP.INTEGER_PARSER, "1000", JSAP.REQUIRED, 'c', JSAP.NO_LONGFLAG,
+ "The number of columns from 1 to this number" ),
+ new FlaggedOption( "supercolumns", JSAP.INTEGER_PARSER, "0", JSAP.NOT_REQUIRED, 'u', JSAP.NO_LONGFLAG,
+ "The number of super columns from 1 to this number" ),
+ new FlaggedOption( "size", JSAP.INTEGER_PARSER, "1000", JSAP.REQUIRED, 's', JSAP.NO_LONGFLAG,
+ "The Size in bytes of each column" ),
+ new FlaggedOption( "tps", JSAP.INTEGER_PARSER, "1000", JSAP.REQUIRED, 't', JSAP.NO_LONGFLAG,
+ "Requests per second" ),
+ new FlaggedOption( "thrift", JSAP.INTEGER_PARSER, "0", JSAP.REQUIRED, 'h', JSAP.NO_LONGFLAG,
+ "Use Thrift - 1 , use messaging - 0" ),
+ new FlaggedOption( "mailboxstress", JSAP.INTEGER_PARSER, "0", JSAP.REQUIRED, 'M', JSAP.NO_LONGFLAG,
+ "Run mailbox stress - 1 , hmm default - 0" ),
+ new FlaggedOption( "commitLogTest", JSAP.INTEGER_PARSER, "0", JSAP.REQUIRED, 'C', JSAP.NO_LONGFLAG,
+ "Run mailbox stress - 1 , hmm default - 0" ),
+ new QualifiedSwitch( "randomize", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'z', "randomize",
+ "Random reads or writes" ).setList( true ).setListSeparator( ',' ),
+ new QualifiedSwitch( "reads", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'r', "reads",
+ "Read data" ).setList( true ).setListSeparator( ',' ),
+ new QualifiedSwitch( "writes", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'w', "writes",
+ "Write Data" ).setList( false ).setListSeparator( ',' ),
+ new QualifiedSwitch( "bulkwrites", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, 'b', "bulkwrites",
+ "Bulk Write Data" ).setList( false ).setListSeparator( ',' ),
+ new UnflaggedOption( "Server", JSAP.STRING_PARSER, JSAP.REQUIRED, "Name of the server the request needs to be sent to." ) }
+ ) ;
+
+
+ config = jsap.parse(args);
+ if( !config.success())
+ {
+ System.err.println();
+ System.err.println("Usage: java "
+ + StressTest.class.getName());
+ System.err.println(" "
+ + jsap.getUsage());
+ System.err.println();
+ // show full help as well
+ System.err.println(jsap.getHelp());
+ System.err.println("**********Errors*************");
+ }
+ if ( jsap.messagePrinted() ) return null;
+ String hostName = FBUtilities.getHostName();
+ from_ = new EndPoint(hostName,10001);
+ MessagingService.getMessagingInstance().listen(from_, false);
+ }
+ catch ( Exception ex)
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ return config;
+ }
+
+ void run( JSAPResult config ) throws Throwable
+ {
+ requestsPerSecond_ = config.getInt("tps");
+ int numThreads = requestsPerSecond_/1000 + 1;
+ if(config.getString("Server") != null)
+ {
+ server_ = config.getString("Server");
+ to_ = new EndPoint(config.getString("Server"), 7000);
+ }
+ runner_ = new DebuggableThreadPoolExecutor( numThreads,
+ numThreads,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL")
+ );
+ if(config.getInt("mailboxstress") == 1)
+ {
+// stressMailboxWrites();
+ return;
+ }
+ if(config.getInt("commitLogTest") == 1)
+ {
+ testCommitLog();
+ return;
+ }
+ if(config.getInt("thrift") == 0)
+ {
+ if(config.getInt("supercolumns") == 0)
+ {
+ if(config.getBoolean("reads"))
+ {
+ randomReadColumn(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ if(config.getBoolean("bulkwrites"))
+ {
+ bulkWriteColumn(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ if(config.getBoolean("writes"))
+ {
+ randomWriteColumn(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ }
+ else
+ {
+ if(config.getBoolean("reads"))
+ {
+ randomReadSuperColumn(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ if(config.getBoolean("bulkwrites"))
+ {
+ bulkWriteSuperColumn(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ if(config.getBoolean("writes"))
+ {
+ randomWriteSuperColumn(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+
+ }
+ }
+ else
+ {
+ peerstorageClient_ = connect();
+ if(config.getInt("supercolumns") == 0)
+ {
+ if(config.getBoolean("reads"))
+ {
+ randomReadColumnThrift(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ if(config.getBoolean("bulkwrites"))
+ {
+ bulkWriteColumnThrift(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ if(config.getBoolean("writes"))
+ {
+ randomWriteColumnThrift(config.getInt("keys"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ }
+ else
+ {
+ if(config.getBoolean("reads"))
+ {
+ randomReadSuperColumnThrift(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ if(config.getBoolean("bulkwrites"))
+ {
+ bulkWriteSuperColumnThrift(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+ if(config.getBoolean("writes"))
+ {
+ randomWriteSuperColumnThrift(config.getInt("keys"), config.getInt("supercolumns"), config.getInt("columns"), config.getInt("size"), config.getInt("tps"));
+ return;
+ }
+
+ }
+
+ }
+ System.out.println(" StressTest : Done !!!!!!");
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Throwable
+ {
+ LogUtil.init();
+ StressTest stressTest = new StressTest();
+ JSAPResult config = stressTest.ParseArguments( args );
+ if( config == null ) System.exit(-1);
+ stressTest.run(config);
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/test/TestChoice.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/TestChoice.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/TestChoice.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/TestChoice.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.EndPointSnitch;
+import org.apache.cassandra.locator.IEndPointSnitch;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+public class TestChoice
+{
+ private static final Logger logger_ = Logger.getLogger(TestChoice.class);
+ private Set<EndPoint> allNodes_;
+ private Map<EndPoint, List<EndPoint>> nodeToReplicaMap_ = new HashMap<EndPoint, List<EndPoint>>();
+
+ public TestChoice(Set<EndPoint> allNodes)
+ {
+ allNodes_ = new HashSet<EndPoint>(allNodes);
+ }
+
+ public void assignReplicas()
+ {
+ IEndPointSnitch snitch = new EndPointSnitch();
+ Set<EndPoint> allNodes = new HashSet<EndPoint>(allNodes_);
+ Map<EndPoint, Integer> nOccurences = new HashMap<EndPoint, Integer>();
+
+ for ( EndPoint node : allNodes_ )
+ {
+ nOccurences.put(node, 1);
+ }
+
+ for ( EndPoint node : allNodes_ )
+ {
+ allNodes.remove(node);
+ for ( EndPoint choice : allNodes )
+ {
+ List<EndPoint> replicasChosen = nodeToReplicaMap_.get(node);
+ if ( replicasChosen == null || replicasChosen.size() < DatabaseDescriptor.getReplicationFactor() - 1 )
+ {
+ try
+ {
+ if ( !snitch.isInSameDataCenter(node, choice) )
+ {
+ if ( replicasChosen == null )
+ {
+ replicasChosen = new ArrayList<EndPoint>();
+ nodeToReplicaMap_.put(node, replicasChosen);
+ }
+ int nOccurence = nOccurences.get(choice);
+ if ( nOccurence < DatabaseDescriptor.getReplicationFactor() )
+ {
+ nOccurences.put(choice, ++nOccurence);
+ replicasChosen.add(choice);
+ }
+ }
+ }
+ catch ( UnknownHostException ex )
+ {
+ ex.printStackTrace();
+ }
+ }
+ else
+ {
+ allNodes.add(node);
+ break;
+ }
+ }
+ }
+
+
+ Set<EndPoint> nodes = nodeToReplicaMap_.keySet();
+ for ( EndPoint node : nodes )
+ {
+ List<EndPoint> replicas = nodeToReplicaMap_.get(node);
+ StringBuilder sb = new StringBuilder("");
+ for ( EndPoint replica : replicas )
+ {
+ sb.append(replica);
+ sb.append(", ");
+ }
+ System.out.println(node + " ---> " + sb.toString() );
+ }
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/test/TestRunner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/TestRunner.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/TestRunner.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/TestRunner.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKind;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.ContinuationContext;
+import org.apache.cassandra.concurrent.ContinuationStage;
+import org.apache.cassandra.concurrent.ContinuationsExecutor;
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.mapreduce.SequentialScanner;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageDeliveryTask;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.javaflow.Continuation;
+import org.apache.log4j.Logger;
+
+
+public class TestRunner
+{
+ private static EndPoint to_ = new EndPoint("tdsearch001.sf2p.facebook.com", 7000);
+
+ private static void doWrite() throws Throwable
+ {
+
+ Table table = Table.open("Mailbox");
+ Random random = new Random();
+ byte[] bytes = new byte[1024];
+ for (int i = 1001; i <= 1130; ++i)
+ {
+ String key = Integer.toString(i);
+ RowMutation rm = new RowMutation("Mailbox", key);
+ random.nextBytes(bytes);
+ for ( int j = 0; j < 1; ++j )
+ {
+ for ( int k = 0; k < 1; ++k )
+ {
+ rm.add("MailboxMailData0:SuperColumn-" + j + ":Column-" + k, bytes, k);
+ }
+ }
+ rm.apply();
+ }
+ System.out.println("Write done");
+ }
+
+ private static void doRead() throws Throwable
+ {
+ Table table = Table.open("Mailbox");
+ String key = "511055962";
+
+ /*
+ List<String> list = new ArrayList<String>();
+ list.add("SuperColumn-0");
+ Row row = table.getRow(key, "MailboxMailList0", list);
+ System.out.println(row);
+ */
+
+ ColumnFamily cf = table.get(key, "MailboxMailData0");
+ try
+ {
+ Collection<IColumn> columns = cf.getAllColumns();
+ for ( IColumn column : columns )
+ {
+ System.out.println(column.name());
+ Collection<IColumn> subColumns = column.getSubColumns();
+ for ( IColumn subColumn : subColumns )
+ {
+ System.out.println(subColumn);
+ }
+ }
+ }
+ catch ( Throwable th )
+ {
+ th.printStackTrace();
+ }
+ }
+
+ private static void doDeletes()
+ {
+
+ }
+
+
+ public static void main(String[] args) throws Throwable
+ {
+ /*
+ String name = "/var/cassandra/test.dat";
+ FileInputStream f = new FileInputStream(name);
+ File file = new File("/var/cassandra");
+ Path path = file.toPath();
+ WatchService watcher = FileSystems.getDefault().newWatchService();
+ Thread thread = new Thread( new WatchKeyMonitor(watcher) );
+ thread.start();
+
+ WatchKey wKey = path.register( watcher, StandardWatchEventKind.ENTRY_DELETE );
+ file = new File(name);
+ file.delete();
+
+ Thread.sleep(3000);
+ System.out.println("Closing the stream ...");
+ f.close();
+ */
+
+ /*
+ LogUtil.init();
+ StorageService s = StorageService.instance();
+ s.start();
+ doRead();
+ */
+ /*
+ FileOutputStream fos = new FileOutputStream("C:\\Engagements\\Test.dat", true);
+ SequentialScanner scanner = new SequentialScanner("Mailbox");
+ int count = 0;
+ while ( scanner.hasNext() )
+ {
+ Row row = scanner.next();
+ String value = row.key() + System.getProperty("line.separator");
+ fos.write( value.getBytes() );
+
+ Map<String, ColumnFamily> cfs = row.getColumnFamilies();
+ Set<String> keys = cfs.keySet();
+
+ for ( String key : keys )
+ {
+ System.out.println(row.getColumnFamily(key));
+ }
+ }
+ fos.close();
+ System.out.println("Done ...");
+ */
+ /*
+ ExecutorService es = new DebuggableThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("TEST"));
+ es.execute(new TestImpl());
+ */
+ /*
+ LogUtil.init();
+ StorageService s = StorageService.instance();
+ s.start();
+ */
+ /*
+ ReadMessage readMessage = new ReadMessage("Mailbox", args[1], "Test");
+ Message message = ReadMessage.makeReadMessage(readMessage);
+ Runnable task = new MessageDeliveryTask(message);
+
+ ExecutorService es = new ContinuationsExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("TEST"));
+ int end = Integer.parseInt(args[0]);
+ for ( int i = 0; i < end; ++i )
+ {
+ es.execute(task);
+ }
+ */
+
+ /*
+ if ( args[0].equals("S") )
+ {
+ ExecutorService es = new ContinuationsExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
+ es.execute( new Scanner() );
+ }
+ */
+ /*
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ String value = "Avinash Lakshman";
+ for ( int i = 0; i < 100; ++i )
+ {
+ bufOut.writeUTF(Integer.toString(i));
+ bufOut.writeInt(value.length());
+ bufOut.write(value.getBytes());
+ }
+
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ IFileWriter writer = SequenceFile.aioWriter("C:\\Engagements\\test.dat", 64*1024);
+ SortedMap<String, Integer> offsets = getOffsets(bufIn);
+ Set<String> keys = offsets.keySet();
+ for ( String key : keys )
+ {
+ bufIn.setPosition(offsets.get(key));
+ buffer.reset();
+ buffer.write(bufIn, bufIn.readInt());
+ writer.append(key, buffer);
+ }
+ writer.close();
+ */
+ }
+}
+
+@Suspendable
+class Scanner implements Runnable
+{
+ private static final Logger logger_ = Logger.getLogger(Scanner.class);
+
+ public void run()
+ {
+ try
+ {
+ SequentialScanner scanner = new SequentialScanner("Mailbox");
+
+ while ( scanner.hasNext() )
+ {
+ Row row = scanner.next();
+ logger_.debug(row.key());
+ }
+ }
+ catch ( IOException ex )
+ {
+ ex.printStackTrace();
+ }
+ }
+}
+
+class Test
+{
+ public static void goo()
+ {
+ System.out.println("I am goo()");
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/test/UtilsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/test/UtilsTest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/test/UtilsTest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/test/UtilsTest.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,49 @@
+package org.apache.cassandra.test;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FastHashMap;
+import org.apache.cassandra.utils.GuidGenerator;
+
+public class UtilsTest
+{
+ private static void doHashPerf() throws Throwable
+ {
+ List<BigInteger> list = new ArrayList<BigInteger>();
+ for ( int i = 0; i < 100; ++i )
+ {
+ String guid = GuidGenerator.guid();
+ list.add( FBUtilities.hash(guid) );
+ }
+ Collections.sort(list);
+
+ int startValue = 1000000;
+
+ while ( true )
+ {
+ long start = System.currentTimeMillis();
+ for ( int i = 0; i < 1024; ++i )
+ {
+ String key = Integer.toString(startValue + i);
+ BigInteger hash = FBUtilities.hash(key);
+ Collections.binarySearch(list, hash);
+ }
+ System.out.println("TIME TAKEN: " + (System.currentTimeMillis() - start));
+ Thread.sleep(100);
+ }
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/tools/AdminTool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/AdminTool.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/AdminTool.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/AdminTool.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class AdminTool
+{
+
+ String server_ = null;
+ String tableName_ = "Mailbox";
+ String key_ = "Random";
+ String cf1_ = "MailboxThreadList0";
+ String cf2_ = "MailboxUserList0";
+ String cf3_ = "MailboxMailList0";
+ String cf4_ = "MailboxMailData0";
+// String cf5_ = "MailboxUserList";
+ public static EndPoint from_ = new EndPoint("hadoop071.sf2p.facebook.com", 10001);
+ private static final String[] servers_ =
+ {
+ "insearch001.sf2p.facebook.com",
+ "insearch002.sf2p.facebook.com",
+ "insearch003.sf2p.facebook.com",
+ "insearch004.sf2p.facebook.com",
+ "insearch005.sf2p.facebook.com",
+ "insearch016.sf2p.facebook.com",
+ "insearch007.sf2p.facebook.com",
+ "insearch008.sf2p.facebook.com",
+ "insearch009.sf2p.facebook.com",
+ "insearch010.sf2p.facebook.com",
+ "insearch011.sf2p.facebook.com",
+ "insearch012.sf2p.facebook.com",
+ "insearch013.sf2p.facebook.com",
+ "insearch014.sf2p.facebook.com",
+ "insearch015.sf2p.facebook.com",
+ "insearch016.sf2p.facebook.com",
+ "insearch017.sf2p.facebook.com",
+ "insearch018.sf2p.facebook.com",
+ "insearch019.sf2p.facebook.com",
+ "insearch020.sf2p.facebook.com",
+ "insearch021.sf2p.facebook.com",
+ "insearch022.sf2p.facebook.com",
+ "insearch023.sf2p.facebook.com",
+ "insearch024.sf2p.facebook.com",
+ "insearch025.sf2p.facebook.com",
+ "insearch026.sf2p.facebook.com",
+ "insearch027.sf2p.facebook.com",
+ "insearch028.sf2p.facebook.com",
+ "insearch029.sf2p.facebook.com",
+ "insearch030.sf2p.facebook.com",
+ "insearch031.sf2p.facebook.com",
+ "insearch032.sf2p.facebook.com",
+ "insearch033.sf2p.facebook.com",
+ "insearch034.sf2p.facebook.com",
+ "insearch035.sf2p.facebook.com",
+ "insearch036.sf2p.facebook.com",
+ "insearch037.sf2p.facebook.com",
+ "insearch038.sf2p.facebook.com",
+ "insearch039.sf2p.facebook.com",
+ "insearch040.sf2p.facebook.com",
+
+ "insearch001.ash1.facebook.com",
+ "insearch002.ash1.facebook.com",
+ "insearch003.ash1.facebook.com",
+ "insearch004.ash1.facebook.com",
+ "insearch005.ash1.facebook.com",
+ "insearch016.ash1.facebook.com",
+ "insearch007.ash1.facebook.com",
+ "insearch008.ash1.facebook.com",
+ "insearch009.ash1.facebook.com",
+ "insearch010.ash1.facebook.com",
+ "insearch011.ash1.facebook.com",
+ "insearch012.ash1.facebook.com",
+ "insearch013.ash1.facebook.com",
+ "insearch014.ash1.facebook.com",
+ "insearch015.ash1.facebook.com",
+ "insearch016.ash1.facebook.com",
+ "insearch017.ash1.facebook.com",
+ "insearch018.ash1.facebook.com",
+ "insearch019.ash1.facebook.com",
+ "insearch020.ash1.facebook.com",
+ "insearch021.ash1.facebook.com",
+ "insearch022.ash1.facebook.com",
+ "insearch023.ash1.facebook.com",
+ "insearch024.ash1.facebook.com",
+ "insearch025.ash1.facebook.com",
+ "insearch026.ash1.facebook.com",
+ "insearch027.ash1.facebook.com",
+ "insearch028.ash1.facebook.com",
+ "insearch029.ash1.facebook.com",
+ "insearch030.ash1.facebook.com",
+ "insearch031.ash1.facebook.com",
+ "insearch032.ash1.facebook.com",
+ "insearch033.ash1.facebook.com",
+ "insearch034.ash1.facebook.com",
+ "insearch035.ash1.facebook.com",
+ "insearch036.ash1.facebook.com",
+ "insearch037.ash1.facebook.com",
+ "insearch038.ash1.facebook.com",
+ "insearch039.ash1.facebook.com",
+ "insearch040.ash1.facebook.com",
+ };
+
+ AdminTool()
+ {
+ server_ = null;
+ }
+
+ AdminTool(String server)
+ {
+ server_ = server;
+ }
+
+ public void run(int operation, String columnFamilyName, long skip) throws Throwable
+ {
+ byte[] bytes = BasicUtilities.longToByteArray( skip );
+ RowMutation rm = new RowMutation(tableName_, key_);
+ if( columnFamilyName == null )
+ {
+ rm.add(Table.recycleBin_ + ":" + cf1_, bytes, operation);
+ rm.add(Table.recycleBin_ + ":" + cf2_, bytes, operation);
+ rm.add(Table.recycleBin_ + ":" + cf3_, bytes, operation);
+ rm.add(Table.recycleBin_ + ":" + cf4_, bytes, operation);
+// rm.add(Table.recycleBin_ + ":" + cf5_, bytes, operation);
+ }
+ else
+ {
+ rm.add(Table.recycleBin_ + ":" + columnFamilyName, bytes, operation);
+ }
+ RowMutationMessage rmMsg = new RowMutationMessage(rm);
+ if( server_ != null)
+ {
+ Message message = RowMutationMessage.makeRowMutationMessage(rmMsg, StorageService.binaryVerbHandler_);
+ EndPoint to = new EndPoint(server_, 7000);
+ MessagingService.getMessagingInstance().sendOneWay(message, to);
+ }
+ else
+ {
+ for( String server : servers_ )
+ {
+ Message message = RowMutationMessage.makeRowMutationMessage(rmMsg, StorageService.binaryVerbHandler_);
+ EndPoint to = new EndPoint(server, 7000);
+ MessagingService.getMessagingInstance().sendOneWay(message, to);
+ }
+ }
+ }
+
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Throwable
+ {
+ LogUtil.init();
+ AdminTool postLoad = null;
+ int operation = 1;
+ String columnFamilyName = null;
+ long skip = 0L;
+ if(args.length < 1 )
+ {
+ System.out.println("Usage: PostLoad <serverName> < operation 1- flushBinary 2 - compactions 3- flush> <ColumnFamilyName> <skip factor for compactions> or PostLoad <-all> <operation> <ColumnFamilyName> <skip factor for compactions>");
+ }
+ if(args[0].equals("-all"))
+ {
+ postLoad = new AdminTool();
+ }
+ else
+ {
+ postLoad = new AdminTool(args[0]);
+ }
+ if(args.length > 1 )
+ operation = Integer.parseInt(args[1]);
+ if(args.length > 2 )
+ columnFamilyName = args[2];
+ if(args.length > 3 )
+ skip = Long.parseLong(args[3]);
+ postLoad.run(operation, columnFamilyName, skip);
+
+ Thread.sleep(10000);
+ System.out.println("Exiting app...");
+ System.exit(0);
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/tools/ClusterTool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/ClusterTool.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/ClusterTool.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/ClusterTool.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,107 @@
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+
+public class ClusterTool
+{
+ public static final String SET_TOKEN = "settoken";
+ public static final String HASH_KEY = "hash";
+ public static final String BUILD_INDEX = "build_index";
+ public static final String READ_TEST = "read_test";
+ public static final String WRITE_TEST = "write_test";
+
+ public static void applyToken(String serverName, BigInteger token) throws IOException
+ {
+ try
+ {
+ EndPoint from = new EndPoint(InetAddress.getLocalHost().getHostName(), 7000);
+ System.out.println("Updating token of server " + serverName + " with token " + token);
+ Message message = new Message(from, "", StorageService.tokenVerbHandler_, new Object[]{ token.toByteArray() });
+ EndPoint ep = new EndPoint(serverName, 7000);
+ MessagingService.getMessagingInstance().sendOneWay(message, ep);
+ Thread.sleep(1000);
+ System.out.println("Successfully calibrated " + serverName);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ public static void printUsage()
+ {
+ System.out.println("Usage: java -jar <cassandra-tools.jar> <command> <options>");
+ System.out.println("Commands:");
+ System.out.println("\t" + SET_TOKEN + " <server> <token>");
+ System.out.println("\t" + HASH_KEY + " <key>");
+ System.out.println("\t" + BUILD_INDEX + " <full path to the data file>");
+ System.out.println("\t" + READ_TEST + " <number of threads> <requests per sec per thread> <machine(s) to read (':' separated list)>");
+ System.out.println("\t" + WRITE_TEST + " <number of threads> <requests per sec per thread> <machine(s) to write (':' separated list)>");
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ if(args.length < 2)
+ {
+ printUsage();
+ return;
+ }
+
+ int argc = 0;
+ try
+ {
+ /* set the token for a particular node in the Cassandra cluster */
+ if(SET_TOKEN.equals(args[argc]))
+ {
+ String serverName = args[argc + 1];
+ BigInteger token = new BigInteger(args[argc + 2]);
+ //System.out.println("Calibrating " + serverName + " with token " + token);
+ applyToken(serverName, token);
+ }
+ /* Print the hash of a given key */
+ else if(HASH_KEY.equals(args[argc]))
+ {
+ System.out.println("Hash = [" + StorageService.hash(args[argc + 1]) + "]");
+ }
+ /* build indexes given the data file */
+ else if(BUILD_INDEX.equals(args[argc]))
+ {
+ IndexBuilder.main(args);
+ }
+ /* test reads */
+ else if(READ_TEST.equals(args[argc]))
+ {
+ System.out.println("Testing reads...");
+ int numThreads = Integer.parseInt(args[argc + 1]);
+ int rpsPerThread = Integer.parseInt(args[argc + 2]);
+ String machinesToRead = args[argc + 3];
+// ReadTest.runReadTest(numThreads, rpsPerThread, machinesToRead);
+ }
+ /* test writes */
+ else if(WRITE_TEST.equals(args[argc]))
+ {
+ System.out.println("Testing writes...");
+ int numThreads = Integer.parseInt(args[argc + 1]);
+ int rpsPerThread = Integer.parseInt(args[argc + 2]);
+ String machinesToWrite = args[argc + 3];
+// WriteTest.runWriteTest(numThreads, rpsPerThread, machinesToWrite);
+ }
+ } catch(Exception e)
+ {
+ System.err.println("Exception " + e.getMessage());
+ e.printStackTrace(System.err);
+ printUsage();
+ }
+
+ System.exit(0);
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+
+public class FileSizeTokenGenerator
+{
+ private static Logger logger_ = Logger.getLogger(IndexBuilder.class);
+
+ public static void main(String[] args)
+ {
+ if ( args.length != 4 )
+ {
+ System.out.println("Usage : java com.facebook.infrastructure.tools.IndexBuilder <full path to the data file> < split factor>");
+ System.exit(1);
+ }
+
+ try
+ {
+ int splitCount = Integer.parseInt(args[3]);
+ BigInteger l = new BigInteger(args[1]);
+ BigInteger h = new BigInteger(args[2]);
+ long totalSize = getTotalSize(args[0], l, h);
+ System.out.println(" Total Size : " + totalSize);
+ BigInteger[] tokens = generateTokens(args[0], l, h, totalSize, splitCount);
+ int i = 0 ;
+ for( BigInteger token : tokens)
+ {
+ System.out.println(i++ + " th Token " + token);
+ }
+ }
+ catch( Throwable th )
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ }
+
+ private static long getTotalSize(String dataFile, BigInteger l , BigInteger h) throws IOException
+ {
+ final int bufferSize = 64*1024;
+
+ IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize);
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ long totalSize = 0;
+ try
+ {
+ while ( !dataReader.isEOF() )
+ {
+ bufOut.reset();
+ /* Record the position of the key. */
+ dataReader.next(bufOut);
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /* Key just read */
+ String key = bufIn.readUTF();
+ if ( !key.equals(SSTable.blockIndexKey_) && l.compareTo(StorageService.hash(key)) < 0 && h.compareTo(StorageService.hash(key)) > 0 )
+ {
+ int sz = bufIn.readInt();
+ byte[] keyData = new byte[sz];
+ bufIn.read(keyData, 0, sz);
+ totalSize= totalSize + sz;
+ }
+ }
+ }
+ finally
+ {
+ dataReader.close();
+ }
+ return totalSize;
+ }
+
+
+ private static BigInteger[] generateTokens(String dataFile,BigInteger l , BigInteger h, long totalSize, int splitCount) throws IOException
+ {
+ final int bufferSize = 64*1024;
+
+ IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize);
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ long splitFactor = totalSize/(splitCount+1);
+ long curSize = 0;
+ BigInteger[] tokens = new BigInteger[splitCount];
+ int k = 0 ;
+ try
+ {
+ while ( !dataReader.isEOF())
+ {
+ bufOut.reset();
+ /* Record the position of the key. */
+ dataReader.next(bufOut);
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /* Key just read */
+ String key = bufIn.readUTF();
+ if ( !key.equals(SSTable.blockIndexKey_) && l.compareTo(StorageService.hash(key)) < 0 && h.compareTo(StorageService.hash(key)) > 0 )
+ {
+ int sz = bufIn.readInt();
+ curSize = curSize + sz;
+ byte[] keyData = new byte[sz];
+ bufIn.read(keyData, 0, sz);
+
+ if( curSize > splitFactor)
+ {
+ tokens[k++] = StorageService.hash(key);
+ curSize = 0 ;
+ if( k == splitCount)
+ {
+ break;
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ dataReader.close();
+ }
+ return tokens;
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/tools/IndexBuilder.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/IndexBuilder.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/IndexBuilder.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/IndexBuilder.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.BloomFilter;
+
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+
+public class IndexBuilder
+{
+ private static final int bufferSize_ = 64*1024;
+
+ public static void main(String[] args)
+ {
+ if ( args.length != 1 )
+ {
+ System.out.println("Usage : java com.facebook.infrastructure.tools.IndexBuilder <full path to the data file>");
+ System.exit(1);
+ }
+
+ try
+ {
+ int blockCount = getBlockCount(args[0]);
+ System.out.println("Number of keys in the data file : " + (blockCount + 1)*SSTable.indexInterval());
+ buildIndex(args[0], blockCount);
+ }
+ catch(Throwable t)
+ {
+ System.err.println("Exception: " + t.getMessage());
+ t.printStackTrace(System.err);
+ }
+ }
+
+ private static int getBlockCount(String dataFile) throws IOException
+ {
+ IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize_);
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ int blockCount = 0;
+
+ try
+ {
+ while ( !dataReader.isEOF() )
+ {
+ bufOut.reset();
+ dataReader.next(bufOut);
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /* Key just read */
+ String key = bufIn.readUTF();
+ if ( key.equals(SSTable.blockIndexKey_) )
+ {
+ ++blockCount;
+ }
+ }
+ }
+ finally
+ {
+ dataReader.close();
+ }
+ return blockCount;
+ }
+
+ private static void buildIndex(String dataFile, int blockCount) throws IOException
+ {
+ String indexFile = dataFile.replace("-Data.", "-Index.");
+ final int bufferSize = 64*1024;
+
+ IFileWriter indexWriter = SequenceFile.bufferedWriter(indexFile, bufferSize);
+ IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize);
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ /* BloomFilter of all data in the data file */
+ BloomFilter bf = new BloomFilter((SSTable.indexInterval() + 1)*blockCount, 8);
+
+ try
+ {
+ while ( !dataReader.isEOF() )
+ {
+ bufOut.reset();
+ /* Record the position of the key. */
+ long blockIndexOffset = dataReader.getCurrentPosition();
+ dataReader.next(bufOut);
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /* Key just read */
+ String key = bufIn.readUTF();
+ if ( key.equals(SSTable.blockIndexKey_) )
+ {
+ /* Ignore the size of the data associated with the block index */
+ bufIn.readInt();
+ /* Number of keys in the block. */
+ int blockSize = bufIn.readInt();
+ /* Largest key in the block */
+ String largestKey = null;
+
+ /*
+ * Read the keys in this block and find the largest key in
+ * this block. This is the key that gets written into the
+ * index file.
+ */
+ for ( int i = 0; i < blockSize; ++i )
+ {
+ String currentKey = bufIn.readUTF();
+ bf.fill(currentKey);
+ if ( largestKey == null )
+ {
+ largestKey = currentKey;
+ }
+ else
+ {
+ if ( currentKey.compareTo(largestKey) > 0 )
+ {
+ /* record this key */
+ largestKey = currentKey;
+ }
+ }
+ /* read the position of the key and the size of key data and throws it away. */
+ bufIn.readLong();
+ bufIn.readLong();
+ }
+
+ /*
+ * Write into the index file the largest key in the block
+ * and the offset of the block index in the data file.
+ */
+ indexWriter.append(largestKey, BasicUtilities.longToByteArray(blockIndexOffset));
+ }
+ }
+ }
+ finally
+ {
+ dataReader.close();
+ /* Cache the bloom filter */
+ SSTable.storeBloomFilter(dataFile, bf);
+ /* Write the bloom filter into the index file */
+ bufOut.reset();
+ BloomFilter.serializer().serialize(bf, bufOut);
+ byte[] bytes = new byte[bufOut.getLength()];
+ System.arraycopy(bufOut.getData(), 0, bytes, 0, bytes.length);
+ indexWriter.close(bytes, bytes.length);
+ bufOut.close();
+ }
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/tools/KeyChecker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/KeyChecker.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/KeyChecker.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/KeyChecker.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
+
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+
+
+public class KeyChecker
+{
+ private static final int bufSize_ = 128*1024*1024;
+ /*
+ * This function checks if the local storage endpoint
+ * is reponsible for storing this key .
+ */
+ private static boolean checkIfProcessKey(String key)
+ {
+ EndPoint[] endPoints = StorageService.instance().getNStorageEndPoint(key);
+ EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
+ for(EndPoint endPoint : endPoints)
+ {
+ if(endPoint.equals(localEndPoint))
+ return true;
+ }
+ return false;
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ if ( args.length != 1 )
+ {
+ System.out.println("Usage : java com.facebook.infrastructure.tools.KeyChecker <file containing all keys>");
+ System.exit(1);
+ }
+
+ LogUtil.init();
+ StorageService s = StorageService.instance();
+ s.start();
+
+ /* Sleep for proper discovery */
+ Thread.sleep(240000);
+ /* Create the file for the missing keys */
+ RandomAccessFile raf = new RandomAccessFile( "Missing-" + FBUtilities.getHostName() + ".dat", "rw");
+
+ /* Start reading the file that contains the keys */
+ BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(args[0]) ), KeyChecker.bufSize_ );
+ String key = null;
+ boolean bStarted = false;
+
+ while ( ( key = bufReader.readLine() ) != null )
+ {
+ if ( !bStarted )
+ {
+ bStarted = true;
+ System.out.println("Started the processing of the file ...");
+ }
+
+ key = key.trim();
+ if ( StorageService.instance().isPrimary(key) )
+ {
+ System.out.println("Processing key " + key);
+ Row row = Table.open("Mailbox").getRow(key, "MailboxMailList0");
+ if ( row.isEmpty() )
+ {
+ System.out.println("MISSING KEY : " + key);
+ raf.write(key.getBytes());
+ raf.write(System.getProperty("line.separator").getBytes());
+ }
+ }
+ }
+ System.out.println("DONE checking keys ...");
+ raf.close();
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/tools/KeyExtracter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/tools/KeyExtracter.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/tools/KeyExtracter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/tools/KeyExtracter.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.io.SSTable.KeyPositionInfo;
+import org.apache.cassandra.utils.BasicUtilities;
+
+
+public class KeyExtracter
+{
+ private static final int bufferSize_ = 64*1024;
+
+ public static void main(String[] args) throws Throwable
+ {
+ if ( args.length != 3 )
+ {
+ System.out.println("Usage : java com.facebook.infrastructure.tools.IndexBuilder <key to extract> <data file> <output file>");
+ System.exit(1);
+ }
+ String keyToExtract = args[0];
+ String dataFile = args[1];
+ String outputFile = args[2];
+
+ extractKeyIntoFile(keyToExtract, dataFile, outputFile);
+ }
+
+ public static boolean extractKeyIntoFile(String keyToExtract, String dataFile, String outputFile) throws IOException
+ {
+ IFileReader dataReader = SequenceFile.bufferedReader(dataFile, bufferSize_);
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
+
+ try
+ {
+ while ( !dataReader.isEOF() )
+ {
+ bufOut.reset();
+ dataReader.next(bufOut);
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /* Key just read */
+ String key = bufIn.readUTF();
+ /* check if we want this key */
+ if ( key.equals(keyToExtract) )
+ {
+ int keySize = bufIn.readInt();
+ byte[] keyData = new byte[keySize];
+ bufIn.read(keyData, 0, keySize);
+
+ /* write the key data into a file */
+ RandomAccessFile raf = new RandomAccessFile(outputFile, "rw");
+ raf.writeUTF(key);
+ raf.writeInt(keySize);
+ raf.write(keyData);
+ dumpBlockIndex(keyToExtract, 0L, keySize, raf);
+ raf.close();
+ return true;
+ }
+ }
+ }
+ finally
+ {
+ dataReader.close();
+ }
+
+ return false;
+ }
+
+ private static void dumpBlockIndex(String key, long position, long size, RandomAccessFile raf) throws IOException
+ {
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ /* Number of keys in this block */
+ bufOut.writeInt(1);
+ bufOut.writeUTF(key);
+ bufOut.writeLong(position);
+ bufOut.writeLong(size);
+
+ /* Write out the block index. */
+ raf.writeUTF(SSTable.blockIndexKey_);
+ raf.writeInt(bufOut.getLength());
+ raf.write(bufOut.getData(), 0, bufOut.getLength());
+ }
+}