You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [22/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/KeyType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/KeyType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/KeyType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/KeyType.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,125 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for KeyType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * <complexType name="KeyType">
+ * <complexContent>
+ * <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ * <sequence>
+ * <element name="OptimizeIt" type="{http://www.w3.org/2001/XMLSchema}boolean"/>
+ * <element name="Combiner" type="{http://www.w3.org/2001/XMLSchema}string"/>
+ * <element name="Fields" type="{}FieldCollection"/>
+ * </sequence>
+ * </restriction>
+ * </complexContent>
+ * </complexType>
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "KeyType", propOrder = {
+ "optimizeIt",
+ "combiner",
+ "fields"
+})
+public class KeyType {
+
+ @XmlElement(name = "OptimizeIt", required = true, type = Boolean.class, nillable = true)
+ protected Boolean optimizeIt;
+ @XmlElement(name = "Combiner", required = true)
+ protected String combiner;
+ @XmlElement(name = "Fields", required = true)
+ protected FieldCollection fields;
+
+ /**
+ * Gets the value of the optimizeIt property.
+ *
+ * @return
+ * possible object is
+ * {@link Boolean }
+ *
+ */
+ public Boolean isOptimizeIt() {
+ return optimizeIt;
+ }
+
+ /**
+ * Sets the value of the optimizeIt property.
+ *
+ * @param value
+ * allowed object is
+ * {@link Boolean }
+ *
+ */
+ public void setOptimizeIt(Boolean value) {
+ this.optimizeIt = value;
+ }
+
+ /**
+ * Gets the value of the combiner property.
+ *
+ * @return
+ * possible object is
+ * {@link String }
+ *
+ */
+ public String getCombiner() {
+ return combiner;
+ }
+
+ /**
+ * Sets the value of the combiner property.
+ *
+ * @param value
+ * allowed object is
+ * {@link String }
+ *
+ */
+ public void setCombiner(String value) {
+ this.combiner = value;
+ }
+
+ /**
+ * Gets the value of the fields property.
+ *
+ * @return
+ * possible object is
+ * {@link FieldCollection }
+ *
+ */
+ public FieldCollection getFields() {
+ return fields;
+ }
+
+ /**
+ * Sets the value of the fields property.
+ *
+ * @param value
+ * allowed object is
+ * {@link FieldCollection }
+ *
+ */
+ public void setFields(FieldCollection value) {
+ this.fields = value;
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/Loader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/Loader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/Loader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/Loader.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,356 @@
+/**
+ *
+ */
+package org.apache.cassandra.loader;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.locator.EndPointSnitch;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Token;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.cassandra.utils.*;
+
+
+/**
+ * This class is used to load the storage endpoints with the relevant data
+ * The data should be both what they are responsible for and what should be replicated on the specific
+ * endpoints.
+ * Population is done based on a xml file which should adhere to a schema.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class Loader
+{
+ private static long siesta_ = 60*1000;
+ private static Logger logger_ = Logger.getLogger( Loader.class );
+ private Importer importer_;
+ private StorageService storageService_;
+
+ public Loader(StorageService storageService)
+ {
+ storageService_ = storageService;
+ }
+
+ /*
+ * This method loads all the keys into a special column family
+ * called "RecycleBin". This column family is used for temporary
+ * processing of data and then can be recycled. The idea is that
+ * after the load is complete we have all the keys in the system.
+ * Now we force a compaction and examine the single Index file
+ * that is generated to determine how the nodes need to relocate
+ * to be perfectly load balanced.
+ *
+ * param @ rootDirectory - rootDirectory at which the parsing begins.
+ * param @ table - table that will be populated.
+ * param @ cfName - name of the column that will be populated. This is
+ * passed in so that we do not unncessary allocate temporary String objects.
+ */
+ private void preParse(File rootDirectory, String table, String cfName) throws Throwable
+ {
+ File[] files = rootDirectory.listFiles();
+
+ for ( File file : files )
+ {
+ if ( file.isDirectory() )
+ preParse(file, table, cfName);
+ else
+ {
+ String fileName = file.getName();
+ RowMutation rm = new RowMutation(table, fileName);
+ rm.add(cfName, fileName.getBytes(), 0);
+ rm.apply();
+ }
+ }
+ }
+
+ /*
+ * Merges a list of strings with a particular combiner.
+ */
+ String merge( List<String> listFields, String combiner)
+ {
+ if(listFields.size() == 0 )
+ return null;
+ if(listFields.size() == 1)
+ return listFields.get(0);
+ String mergedKey = null;
+ for(String field: listFields)
+ {
+ if(mergedKey == null)
+ {
+ mergedKey = field;
+ }
+ else
+ {
+ mergedKey = mergedKey + combiner + field;
+ }
+ }
+ return mergedKey;
+
+ }
+
+ /*
+ * This function checks if the local storage endpoint
+ * is reponsible for storing this key .
+ */
+ boolean checkIfProcessKey(String key)
+ {
+ EndPoint[] endPoints = storageService_.getNStorageEndPoint(key);
+ EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
+ for(EndPoint endPoint : endPoints)
+ {
+ if(endPoint.equals(localEndPoint))
+ return true;
+ }
+ return false;
+ }
+
+ /*
+ * This functions parses each file based on delimiters specified in the
+ * xml file. It also looks at all the parameters specified in teh xml and based
+ * on that populates the internal Row structure.
+ */
+ void parse(String filepath) throws Throwable
+ {
+ BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(filepath)), 16 * 1024 * 1024);
+ String line = null;
+ String delimiter_ = new String(",");
+ RowMutation rm = null;
+ Map<String, RowMutation> rms = new HashMap<String, RowMutation>();
+ if(importer_.columnFamily.delimiter != null)
+ {
+ delimiter_ = importer_.columnFamily.delimiter;
+ }
+ while ((line = bufReader.readLine()) != null)
+ {
+ StringTokenizer st = new StringTokenizer(line, delimiter_);
+ List<String> tokenList = new ArrayList<String>();
+ String key = null;
+ while (st.hasMoreElements())
+ {
+ tokenList.add((String)st.nextElement());
+ }
+ /* Construct the Key */
+ List<String> keyFields = new ArrayList<String> ();
+ for(int fieldId: importer_.key.fields.field)
+ {
+ keyFields.add(tokenList.get(fieldId));
+ }
+ key = merge(keyFields, importer_.key.combiner);
+ if(importer_.key.optimizeIt != null && !importer_.key.optimizeIt)
+ {
+ if(!checkIfProcessKey(key))
+ {
+ continue;
+ }
+ }
+ rm = rms.get(key);
+ if( rm == null)
+ {
+ rm = new RowMutation(importer_.table, key);
+ rms.put(key, rm);
+ }
+ if(importer_.columnFamily.superColumn != null)
+ {
+ List<String> superColumnList = new ArrayList<String>();
+ for(int fieldId : importer_.columnFamily.superColumn.fields.field)
+ {
+ superColumnList.add(tokenList.get(fieldId));
+ }
+ String superColumnName = merge(superColumnList, " ");
+ superColumnList.clear();
+ if(importer_.columnFamily.superColumn.tokenize)
+ {
+ Analyzer analyzer = new StandardAnalyzer();
+ TokenStream ts = analyzer.tokenStream("superColumn", new StringReader(superColumnName));
+ Token token = null;
+ token = ts.next();
+ while(token != null)
+ {
+ superColumnList.add(token.termText());
+ token = ts.next();
+ }
+ }
+ else
+ {
+ superColumnList.add(superColumnName);
+ }
+ for(String sName : superColumnList)
+ {
+ String cfName = importer_.columnFamily.name + ":" + sName;
+ if(importer_.columnFamily.column != null)
+ {
+ for(ColumnType column : importer_.columnFamily.column )
+ {
+ String cfColumn = cfName +":" + (column.name == null ? tokenList.get(column.field):column.name);
+ rm.add(cfColumn, tokenList.get(column.value.field).getBytes(), Integer.parseInt(tokenList.get(column.timestamp.field)));
+ }
+ }
+
+ }
+
+ }
+ else
+ {
+ if(importer_.columnFamily.column != null)
+ {
+ for(ColumnType column : importer_.columnFamily.column )
+ {
+ String cfColumn = importer_.columnFamily.name +":" + (column.name == null ? tokenList.get(column.field):column.name);
+ rm.add(cfColumn, tokenList.get(column.value.field).getBytes(), Integer.parseInt(tokenList.get(column.timestamp.field)));
+ }
+ }
+ }
+ }
+ // Now apply the data for all keys
+ // TODO : Add checks for large data
+ // size maybe we want to check the
+ // data size and then apply.
+ Set<String> keys = rms.keySet();
+ for(String pKey : keys)
+ {
+ rm = rms.get(pKey);
+ if( rm != null)
+ {
+ rm.apply();
+ }
+ }
+ }
+
+
+ void parseFileList(File dir)
+ {
+ int fileCount = dir.list().length;
+ for ( int i = 0 ; i < fileCount ; i++ )
+ {
+ File file = new File(dir.list()[i]);
+ if ( file.isDirectory())
+ {
+ parseFileList(file);
+ }
+ else
+ {
+ try
+ {
+ if(importer_.key.optimizeIt != null && importer_.key.optimizeIt)
+ {
+ if(checkIfProcessKey(dir.list()[i]))
+ {
+ parse(dir.listFiles()[i].getAbsolutePath());
+ }
+ }
+ else
+ {
+ parse(dir.listFiles()[i].getAbsolutePath());
+ }
+ }
+ catch ( Throwable ex )
+ {
+ logger_.error(ex.toString());
+ }
+ }
+ }
+ }
+
+ void preLoad(File rootDirectory) throws Throwable
+ {
+ String table = DatabaseDescriptor.getTables().get(0);
+ String cfName = Table.recycleBin_ + ":" + "Keys";
+ /* populate just the keys. */
+ preParse(rootDirectory, table, cfName);
+ /* dump the memtables */
+ Table.open(table).flush(false);
+ /* force a compaction of the files. */
+ Table.open(table).forceCompaction(null,null,null);
+
+ /*
+ * This is a hack to let everyone finish. Just sleep for
+ * a couple of minutes.
+ */
+ logger_.info("Taking a nap after forcing a compaction ...");
+ Thread.sleep(Loader.siesta_);
+
+ /* Figure out the keys in the index file to relocate the node */
+ List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
+ /* Load the indexes into memory */
+ for ( String df : ssTables )
+ {
+ SSTable ssTable = new SSTable(df);
+ ssTable.close();
+ }
+ /* We should have only one file since we just compacted. */
+ List<String> indexedKeys = SSTable.getIndexedKeys();
+ storageService_.relocate(indexedKeys.toArray( new String[0]) );
+
+ /*
+ * This is a hack to let everyone relocate and learn about
+ * each other. Just sleep for a couple of minutes.
+ */
+ logger_.info("Taking a nap after relocating ...");
+ Thread.sleep(Loader.siesta_);
+
+ /*
+ * Do the cleanup necessary. Delete all commit logs and
+ * the SSTables and reset the load state in the StorageService.
+ */
+ SSTable.delete(ssTables.get(0));
+// File commitLogDirectory = new File( DatabaseDescriptor.getLogFileLocation() );
+// FileUtils.delete(commitLogDirectory.listFiles());
+ storageService_.resetLoadState();
+ logger_.info("Finished all the requisite clean up ...");
+ }
+
+ void load(String xmlFile) throws Throwable
+ {
+ try
+ {
+ JAXBContext jc = JAXBContext.newInstance(this.getClass().getPackage().getName());
+ Unmarshaller u = jc.createUnmarshaller();
+ importer_ = (Importer)u.unmarshal(new FileInputStream( xmlFile ) );
+ String directory = importer_.columnFamily.directory;
+ File rootDirectory = new File(directory);
+ preLoad(rootDirectory);
+ parseFileList(rootDirectory);
+ }
+ catch (Exception e)
+ {
+ logger_.info(LogUtil.throwableToString(e));
+ }
+
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Throwable
+ {
+ LogUtil.init();
+ StorageService s = StorageService.instance();
+ s.start();
+ Loader loader = new Loader(s);
+ loader.load("mbox_importer.xml");
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/ObjectFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/ObjectFactory.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/ObjectFactory.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/ObjectFactory.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,136 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.annotation.XmlElementDecl;
+import javax.xml.bind.annotation.XmlRegistry;
+import javax.xml.namespace.QName;
+
+
+/**
+ * This object contains factory methods for each
+ * Java content interface and Java element interface
+ * generated in the com.facebook.infrastructure.loader package.
+ * <p>An ObjectFactory allows you to programatically
+ * construct new instances of the Java representation
+ * for XML content. The Java representation of XML
+ * content can consist of schema derived interfaces
+ * and classes representing the binding of schema
+ * type definitions, element declarations and model
+ * groups. Factory methods for each of these are
+ * provided in this class.
+ *
+ */
+@XmlRegistry
+public class ObjectFactory {
+
+ private final static QName _SuperColumn_QNAME = new QName("", "SuperColumn");
+ private final static QName _Table_QNAME = new QName("", "Table");
+ private final static QName _Column_QNAME = new QName("", "Column");
+
+ /**
+ * Create a new ObjectFactory that can be used to create new instances of schema derived classes for package: com.facebook.infrastructure.loader
+ *
+ */
+ public ObjectFactory() {
+ }
+
+ /**
+ * Create an instance of {@link KeyType }
+ *
+ */
+ public KeyType createKeyType() {
+ return new KeyType();
+ }
+
+ /**
+ * Create an instance of {@link SuperColumnType }
+ *
+ */
+ public SuperColumnType createSuperColumnType() {
+ return new SuperColumnType();
+ }
+
+ /**
+ * Create an instance of {@link ColumnType }
+ *
+ */
+ public ColumnType createColumnType() {
+ return new ColumnType();
+ }
+
+ /**
+ * Create an instance of {@link Importer }
+ *
+ */
+ public Importer createImporter() {
+ return new Importer();
+ }
+
+ /**
+ * Create an instance of {@link ColumnFamilyType }
+ *
+ */
+ public ColumnFamilyType createColumnFamilyType() {
+ return new ColumnFamilyType();
+ }
+
+ /**
+ * Create an instance of {@link TimestampType }
+ *
+ */
+ public TimestampType createTimestampType() {
+ return new TimestampType();
+ }
+
+ /**
+ * Create an instance of {@link FieldCollection }
+ *
+ */
+ public FieldCollection createFieldCollection() {
+ return new FieldCollection();
+ }
+
+ /**
+ * Create an instance of {@link ValueType }
+ *
+ */
+ public ValueType createValueType() {
+ return new ValueType();
+ }
+
+ /**
+ * Create an instance of {@link JAXBElement }{@code <}{@link SuperColumnType }{@code >}}
+ *
+ */
+ @XmlElementDecl(namespace = "", name = "SuperColumn")
+ public JAXBElement<SuperColumnType> createSuperColumn(SuperColumnType value) {
+ return new JAXBElement<SuperColumnType>(_SuperColumn_QNAME, SuperColumnType.class, null, value);
+ }
+
+ /**
+ * Create an instance of {@link JAXBElement }{@code <}{@link String }{@code >}}
+ *
+ */
+ @XmlElementDecl(namespace = "", name = "Table")
+ public JAXBElement<String> createTable(String value) {
+ return new JAXBElement<String>(_Table_QNAME, String.class, null, value);
+ }
+
+ /**
+ * Create an instance of {@link JAXBElement }{@code <}{@link ColumnType }{@code >}}
+ *
+ */
+ @XmlElementDecl(namespace = "", name = "Column")
+ public JAXBElement<ColumnType> createColumn(ColumnType value) {
+ return new JAXBElement<ColumnType>(_Column_QNAME, ColumnType.class, null, value);
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/PreLoad.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/PreLoad.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/PreLoad.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/PreLoad.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.loader;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class PreLoad
+{
+
+ private static long siesta_ = 2*60*1000;
+ private static Logger logger_ = Logger.getLogger( Loader.class );
+ private StorageService storageService_;
+
+ public PreLoad(StorageService storageService)
+ {
+ storageService_ = storageService;
+ }
+ /*
+ * This method loads all the keys into a special column family
+ * called "RecycleBin". This column family is used for temporary
+ * processing of data and then can be recycled. The idea is that
+ * after the load is complete we have all the keys in the system.
+ * Now we force a compaction and examine the single Index file
+ * that is generated to determine how the nodes need to relocate
+ * to be perfectly load balanced.
+ *
+ * param @ rootDirectory - rootDirectory at which the parsing begins.
+ * param @ table - table that will be populated.
+ * param @ cfName - name of the column that will be populated. This is
+ * passed in so that we do not unncessary allocate temporary String objects.
+ */
+ private void preParse(String rootDirectory, String table, String cfName) throws Throwable
+ {
+ BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(rootDirectory)), 16 * 1024 * 1024);
+ String line = null;
+ while ((line = bufReader.readLine()) != null)
+ {
+ String userId = line;
+ RowMutation rm = new RowMutation(table, userId);
+ rm.add(cfName, userId.getBytes(), 0);
+ rm.apply();
+ }
+ }
+
+ void run(String userFile) throws Throwable
+ {
+ String table = DatabaseDescriptor.getTables().get(0);
+ String cfName = Table.recycleBin_ + ":" + "Keys";
+ /* populate just the keys. */
+ preParse(userFile, table, cfName);
+ /* dump the memtables */
+ Table.open(table).flush(false);
+ /* force a compaction of the files. */
+ Table.open(table).forceCompaction(null, null,null);
+
+ /*
+ * This is a hack to let everyone finish. Just sleep for
+ * a couple of minutes.
+ */
+ logger_.info("Taking a nap after forcing a compaction ...");
+ Thread.sleep(PreLoad.siesta_);
+
+ /* Figure out the keys in the index file to relocate the node */
+ List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
+ /* Load the indexes into memory */
+ for ( String df : ssTables )
+ {
+ SSTable ssTable = new SSTable(df);
+ ssTable.close();
+ }
+ /* We should have only one file since we just compacted. */
+ List<String> indexedKeys = SSTable.getIndexedKeys();
+ storageService_.relocate(indexedKeys.toArray( new String[0]) );
+
+ /*
+ * This is a hack to let everyone relocate and learn about
+ * each other. Just sleep for a couple of minutes.
+ */
+ logger_.info("Taking a nap after relocating ...");
+ Thread.sleep(PreLoad.siesta_);
+
+ /*
+ * Do the cleanup necessary. Delete all commit logs and
+ * the SSTables and reset the load state in the StorageService.
+ */
+ SSTable.delete(ssTables.get(0));
+ storageService_.resetLoadState();
+ logger_.info("Finished all the requisite clean up ...");
+ }
+
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Throwable
+ {
+ if(args.length != 1)
+ {
+ System.out.println("Usage: PreLoad <Fully qualified path of the file containing user names>");
+ }
+ // TODO Auto-generated method stub
+ LogUtil.init();
+ StorageService s = StorageService.instance();
+ s.start();
+ PreLoad preLoad = new PreLoad(s);
+ preLoad.run(args[0]);
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/SuperColumnType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/SuperColumnType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/SuperColumnType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/SuperColumnType.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,97 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for SuperColumnType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * <complexType name="SuperColumnType">
+ * <complexContent>
+ * <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ * <sequence>
+ * <element name="Fields" type="{}FieldCollection"/>
+ * </sequence>
+ * <attribute name="Tokenize" type="{http://www.w3.org/2001/XMLSchema}boolean" />
+ * </restriction>
+ * </complexContent>
+ * </complexType>
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "SuperColumnType", propOrder = {
+ "fields"
+})
+public class SuperColumnType {
+
+ @XmlElement(name = "Fields", required = true)
+ protected FieldCollection fields;
+ @XmlAttribute(name = "Tokenize")
+ protected Boolean tokenize;
+
+ /**
+ * Gets the value of the fields property.
+ *
+ * @return
+ * possible object is
+ * {@link FieldCollection }
+ *
+ */
+ public FieldCollection getFields() {
+ return fields;
+ }
+
+ /**
+ * Sets the value of the fields property.
+ *
+ * @param value
+ * allowed object is
+ * {@link FieldCollection }
+ *
+ */
+ public void setFields(FieldCollection value) {
+ this.fields = value;
+ }
+
+ /**
+ * Gets the value of the tokenize property.
+ *
+ * @return
+ * possible object is
+ * {@link Boolean }
+ *
+ */
+ public Boolean isTokenize() {
+ return tokenize;
+ }
+
+ /**
+ * Sets the value of the tokenize property.
+ *
+ * @param value
+ * allowed object is
+ * {@link Boolean }
+ *
+ */
+ public void setTokenize(Boolean value) {
+ this.tokenize = value;
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/TimestampType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/TimestampType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/TimestampType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/TimestampType.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,65 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for TimestampType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * <complexType name="TimestampType">
+ * <complexContent>
+ * <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ * <attribute name="Field" type="{http://www.w3.org/2001/XMLSchema}int" />
+ * </restriction>
+ * </complexContent>
+ * </complexType>
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "TimestampType")
+public class TimestampType {
+
+ @XmlAttribute(name = "Field")
+ protected Integer field;
+
+ /**
+ * Gets the value of the field property.
+ *
+ * @return
+ * possible object is
+ * {@link Integer }
+ *
+ */
+ public Integer getField() {
+ return field;
+ }
+
+ /**
+ * Sets the value of the field property.
+ *
+ * @param value
+ * allowed object is
+ * {@link Integer }
+ *
+ */
+ public void setField(Integer value) {
+ this.field = value;
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/loader/ValueType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/loader/ValueType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/loader/ValueType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/loader/ValueType.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,65 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for ValueType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * <complexType name="ValueType">
+ * <complexContent>
+ * <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ * <attribute name="Field" type="{http://www.w3.org/2001/XMLSchema}int" />
+ * </restriction>
+ * </complexContent>
+ * </complexType>
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "ValueType")
+public class ValueType {
+
+ @XmlAttribute(name = "Field")
+ protected Integer field;
+
+ /**
+ * Gets the value of the field property.
+ *
+ * @return
+ * possible object is
+ * {@link Integer }
+ *
+ */
+ public Integer getField() {
+ return field;
+ }
+
+ /**
+ * Sets the value of the field property.
+ *
+ * @param value
+ * allowed object is
+ * {@link Integer }
+ *
+ */
+ public void setField(Integer value) {
+ this.field = value;
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,112 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
+
+/**
+ * This class contains a helper method that will be used by
+ * all abstraction that implement the IReplicaPlacementStrategy
+ * interface.
+*/
+public abstract class AbstractStrategy implements IReplicaPlacementStrategy
+{
+ protected static Logger logger_ = Logger.getLogger(AbstractStrategy.class);
+
+ protected TokenMetadata tokenMetadata_;
+
+ AbstractStrategy(TokenMetadata tokenMetadata)
+ {
+ tokenMetadata_ = tokenMetadata;
+ }
+
+ /*
+ * This method changes the ports of the endpoints from
+ * the control port to the storage ports.
+ */
+ protected void retrofitPorts(List<EndPoint> eps)
+ {
+ for ( EndPoint ep : eps )
+ {
+ ep.setPort(DatabaseDescriptor.getStoragePort());
+ }
+ }
+
+ protected EndPoint getNextAvailableEndPoint(EndPoint startPoint, List<EndPoint> topN, List<EndPoint> liveNodes)
+ {
+ EndPoint endPoint = null;
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(tokens);
+ BigInteger token = tokenMetadata_.getToken(startPoint);
+ int index = Collections.binarySearch(tokens, token);
+ if(index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ index = 0;
+ }
+ int totalNodes = tokens.size();
+ int startIndex = (index+1)%totalNodes;
+ for (int i = startIndex, count = 1; count < totalNodes ; ++count, i = (i+1)%totalNodes)
+ {
+ EndPoint tmpEndPoint = tokenToEndPointMap.get(tokens.get(i));
+ if(FailureDetector.instance().isAlive(tmpEndPoint) && !topN.contains(tmpEndPoint) && !liveNodes.contains(tmpEndPoint))
+ {
+ endPoint = tmpEndPoint;
+ break;
+ }
+ }
+ return endPoint;
+ }
+
+ /*
+ * This method returns the hint map. The key is the endpoint
+ * on which the data is being placed and the value is the
+ * endpoint which is in the top N.
+ * Get the map of top N to the live nodes currently.
+ */
+ public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token)
+ {
+ List<EndPoint> liveList = new ArrayList<EndPoint>();
+ Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
+ EndPoint[] topN = getStorageEndPoints( token );
+
+ for( int i = 0 ; i < topN.length ; i++)
+ {
+ if( FailureDetector.instance().isAlive(topN[i]))
+ {
+ map.put(topN[i], topN[i]);
+ liveList.add(topN[i]) ;
+ }
+ else
+ {
+ EndPoint endPoint = getNextAvailableEndPoint(topN[i], Arrays.asList(topN), liveList);
+ if(endPoint != null)
+ {
+ map.put(endPoint, topN[i]);
+ liveList.add(endPoint) ;
+ }
+ else
+ {
+ // log a warning , maybe throw an exception
+ logger_.warn("Unable to find a live Endpoint we might be out of live nodes , This is dangerous !!!!");
+ }
+ }
+ }
+ return map;
+ }
+
+ public abstract EndPoint[] getStorageEndPoints(BigInteger token);
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/EndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/EndPointSnitch.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/EndPointSnitch.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/EndPointSnitch.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.*;
+
+import org.apache.cassandra.net.EndPoint;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class EndPointSnitch implements IEndPointSnitch
+{
+ public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException
+ {
+ /*
+ * Look at the IP Address of the two hosts. Compare
+ * the 3rd octet. If they are the same then the hosts
+ * are in the same rack else different racks.
+ */
+ byte[] ip = getIPAddress(host.getHost());
+ byte[] ip2 = getIPAddress(host2.getHost());
+
+ return ( ip[2] == ip2[2] );
+ }
+
+ public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException
+ {
+ /*
+ * Look at the IP Address of the two hosts. Compare
+ * the 2nd octet. If they are the same then the hosts
+ * are in the same datacenter else different datacenter.
+ */
+ byte[] ip = getIPAddress(host.getHost());
+ byte[] ip2 = getIPAddress(host2.getHost());
+
+ return ( ip[1] == ip2[1] );
+ }
+
+ private byte[] getIPAddress(String host) throws UnknownHostException
+ {
+ InetAddress ia = InetAddress.getByName(host);
+ return ia.getAddress();
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/IEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/IEndPointSnitch.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/IEndPointSnitch.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/IEndPointSnitch.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.UnknownHostException;
+
+import org.apache.cassandra.net.EndPoint;
+
+
+/**
+ * This interface helps determine location of node in the data center relative to another node.
+ * Give a node A and another node B it can tell if A and B are on the same rack or in the same
+ * data center.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IEndPointSnitch
+{
+ /**
+ * Helps determine if 2 nodes are in the same rack in the data center.
+ * @param host a specified endpoint
+ * @param host2 another specified endpoint
+ * @return true if on the same rack false otherwise
+ * @throws UnknownHostException
+ */
+ public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException;
+
+ /**
+ * Helps determine if 2 nodes are in the same data center.
+ * @param host a specified endpoint
+ * @param host2 another specified endpoint
+ * @return true if in the same data center false otherwise
+ * @throws UnknownHostException
+ */
+ public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import org.apache.cassandra.net.EndPoint;
+
+
+/*
+ * This interface has two implementations. One which
+ * does not respect rack or datacenter awareness and
+ * the other which does.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IReplicaPlacementStrategy
+{
+ public EndPoint[] getStorageEndPoints(BigInteger token);
+ public Map<String, EndPoint[]> getStorageEndPoints(String[] keys);
+ public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap);
+ public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,205 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+
+
+/*
+ * This class returns the nodes responsible for a given
+ * key but does respects rack awareness. It makes a best
+ * effort to get a node from a different data center and
+ * a node in a different rack in the same datacenter as
+ * the primary.
+ */
+public class RackAwareStrategy extends AbstractStrategy
+{
+ public RackAwareStrategy(TokenMetadata tokenMetadata)
+ {
+ super(tokenMetadata);
+ }
+
+ public EndPoint[] getStorageEndPoints(BigInteger token)
+ {
+ int startIndex = 0 ;
+ List<EndPoint> list = new ArrayList<EndPoint>();
+ boolean bDataCenter = false;
+ boolean bOtherRack = false;
+ int foundCount = 0;
+ int N = DatabaseDescriptor.getReplicationFactor();
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(tokens);
+ int index = Collections.binarySearch(tokens, token);
+ if(index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ index = 0;
+ }
+ int totalNodes = tokens.size();
+ // Add the node at the index by default
+ list.add(tokenToEndPointMap.get(tokens.get(index)));
+ foundCount++;
+ if( N == 1 )
+ {
+ return list.toArray(new EndPoint[0]);
+ }
+ startIndex = (index + 1)%totalNodes;
+ IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
+
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ try
+ {
+ // First try to find one in a different data center
+ if(!endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+ {
+ // If we have already found something in a diff datacenter no need to find another
+ if( !bDataCenter )
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ bDataCenter = true;
+ foundCount++;
+ }
+ continue;
+ }
+ // Now try to find one on a different rack
+ if(!endPointSnitch.isOnSameRack(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))) &&
+ endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+ {
+ // If we have already found something in a diff rack no need to find another
+ if( !bOtherRack )
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ bOtherRack = true;
+ foundCount++;
+ }
+ continue;
+ }
+ }
+ catch (UnknownHostException e)
+ {
+ logger_.debug(LogUtil.throwableToString(e));
+ }
+
+ }
+ // If we found N number of nodes we are good. This loop wil just exit. Otherwise just
+ // loop through the list and add until we have N nodes.
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ foundCount++;
+ continue;
+ }
+ }
+ retrofitPorts(list);
+ return list.toArray(new EndPoint[0]);
+ }
+
+ public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+ {
+ Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+ List<EndPoint> list = new ArrayList<EndPoint>();
+ int startIndex = 0 ;
+ int foundCount = 0;
+ boolean bDataCenter = false;
+ boolean bOtherRack = false;
+
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ int N = DatabaseDescriptor.getReplicationFactor();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(tokens);
+
+ for ( String key : keys )
+ {
+ BigInteger token = StorageService.hash(key);
+ int index = Collections.binarySearch(tokens, token);
+ if(index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ index = 0;
+ }
+ int totalNodes = tokens.size();
+ // Add the node at the index by default
+ list.add(tokenToEndPointMap.get(tokens.get(index)));
+ foundCount++;
+ if( N == 1 )
+ {
+ results.put( key, list.toArray(new EndPoint[0]) );
+ return results;
+ }
+ startIndex = (index + 1)%totalNodes;
+ IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
+
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ try
+ {
+ // First try to find one in a different data center
+ if(!endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+ {
+ // If we have already found something in a diff datacenter no need to find another
+ if( !bDataCenter )
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ bDataCenter = true;
+ foundCount++;
+ }
+ continue;
+ }
+ // Now try to find one on a different rack
+ if(!endPointSnitch.isOnSameRack(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))) &&
+ endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+ {
+ // If we have already found something in a diff rack no need to find another
+ if( !bOtherRack )
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ bOtherRack = true;
+ foundCount++;
+ }
+ continue;
+ }
+ }
+ catch (UnknownHostException e)
+ {
+ logger_.debug(LogUtil.throwableToString(e));
+ }
+
+ }
+ // If we found N number of nodes we are good. This loop wil just exit. Otherwise just
+ // loop through the list and add until we have N nodes.
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ foundCount++;
+ continue;
+ }
+ }
+ retrofitPorts(list);
+ results.put(key, list.toArray(new EndPoint[0]));
+ }
+
+ return results;
+ }
+
+ public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+ {
+ throw new UnsupportedOperationException("This operation is not currently supported");
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,154 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * This class returns the nodes responsible for a given
+ * key but does not respect rack awareness. Basically
+ * returns the 3 nodes that lie right next to each other
+ * on the ring.
+ */
+public class RackUnawareStrategy extends AbstractStrategy
+{
+ /* Use this flag to check if initialization is in order. */
+ private AtomicBoolean initialized_ = new AtomicBoolean(false);
+ private Map<Range, List<EndPoint>> rangeToEndPointMap_;
+
+ public RackUnawareStrategy(TokenMetadata tokenMetadata)
+ {
+ super(tokenMetadata);
+ }
+
+ public EndPoint[] getStorageEndPoints(BigInteger token)
+ {
+ return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
+ }
+
+ public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+ {
+ int startIndex = 0 ;
+ List<EndPoint> list = new ArrayList<EndPoint>();
+ int foundCount = 0;
+ int N = DatabaseDescriptor.getReplicationFactor();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(tokens);
+ int index = Collections.binarySearch(tokens, token);
+ if(index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ index = 0;
+ }
+ int totalNodes = tokens.size();
+ // Add the node at the index by default
+ list.add(tokenToEndPointMap.get(tokens.get(index)));
+ foundCount++;
+ startIndex = (index + 1)%totalNodes;
+ // If we found N number of nodes we are good. This loop will just exit. Otherwise just
+ // loop through the list and add until we have N nodes.
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ foundCount++;
+ continue;
+ }
+ }
+ retrofitPorts(list);
+ return list.toArray(new EndPoint[0]);
+ }
+
+ private void doInitialization()
+ {
+ if ( !initialized_.get() )
+ {
+ /* construct the mapping from the ranges to the replicas responsible for them */
+ rangeToEndPointMap_ = StorageService.instance().getRangeToEndPointMap();
+ initialized_.set(true);
+ }
+ }
+
+ /**
+ * This method determines which range in the array actually contains
+ * the hash of the key
+ * @param ranges
+ * @param key
+ * @return
+ */
+ private int findRangeIndexForKey(Range[] ranges, String key)
+ {
+ int index = 0;
+ BigInteger hash = StorageService.hash(key);
+ for ( int i = 0; i < ranges.length; ++i )
+ {
+ if ( ranges[i].contains(hash) )
+ {
+ index = i;
+ break;
+ }
+ }
+
+ return index;
+ }
+
+ public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+ {
+ Arrays.sort(keys);
+ Range[] ranges = StorageService.instance().getAllRanges();
+
+ Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+ List<EndPoint> list = new ArrayList<EndPoint>();
+ int startIndex = 0 ;
+ int foundCount = 0;
+
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ int N = DatabaseDescriptor.getReplicationFactor();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(tokens);
+ for ( String key : keys )
+ {
+ BigInteger token = StorageService.hash(key);
+ int index = Collections.binarySearch(tokens, token);
+ if(index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ index = 0;
+ }
+ int totalNodes = tokens.size();
+ // Add the node at the index by default
+ list.add(tokenToEndPointMap.get(tokens.get(index)));
+ foundCount++;
+ startIndex = (index + 1)%totalNodes;
+ // If we found N number of nodes we are good. This loop will just exit. Otherwise just
+ // loop through the list and add until we have N nodes.
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+ {
+ if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+ {
+ list.add(tokenToEndPointMap.get(tokens.get(i)));
+ foundCount++;
+ continue;
+ }
+ }
+ retrofitPorts(list);
+ results.put(key, list.toArray(new EndPoint[0]));
+ }
+
+ return results;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TokenMetadata
+{
+ private static ICompactSerializer<TokenMetadata> serializer_ = new TokenMetadataSerializer();
+
+ public static ICompactSerializer<TokenMetadata> serializer()
+ {
+ return serializer_;
+ }
+
+ /* Maintains token to endpoint map of every node in the cluster. */
+ private Map<BigInteger, EndPoint> tokenToEndPointMap_ = new HashMap<BigInteger, EndPoint>();
+ /* Maintains a reverse index of endpoint to token in the cluster. */
+ private Map<EndPoint, BigInteger> endPointToTokenMap_ = new HashMap<EndPoint, BigInteger>();
+
+ /* Use this lock for manipulating the token map */
+ private ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+
+ /*
+ * For JAXB purposes.
+ */
+ public TokenMetadata()
+ {
+ }
+
+ protected TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
+ {
+ tokenToEndPointMap_ = tokenToEndPointMap;
+ endPointToTokenMap_ = endPointToTokenMap;
+ }
+
+ public TokenMetadata cloneMe()
+ {
+ Map<BigInteger, EndPoint> tokenToEndPointMap = cloneTokenEndPointMap();
+ Map<EndPoint, BigInteger> endPointToTokenMap = cloneEndPointTokenMap();
+ return new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+ }
+
+ /**
+ * Update the two maps in an safe mode.
+ */
+ public void update(BigInteger token, EndPoint endpoint)
+ {
+ lock_.writeLock().lock();
+ try
+ {
+ BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+ if ( oldToken != null )
+ tokenToEndPointMap_.remove(oldToken);
+ tokenToEndPointMap_.put(token, endpoint);
+ endPointToTokenMap_.put(endpoint, token);
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Remove the entries in the two maps.
+ * @param endpoint
+ */
+ public void remove(EndPoint endpoint)
+ {
+ lock_.writeLock().lock();
+ try
+ {
+ BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+ if ( oldToken != null )
+ tokenToEndPointMap_.remove(oldToken);
+ endPointToTokenMap_.remove(endpoint);
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ }
+
+ public BigInteger getToken(EndPoint endpoint)
+ {
+ lock_.readLock().lock();
+ try
+ {
+ return endPointToTokenMap_.get(endpoint);
+ }
+ finally
+ {
+ lock_.readLock().unlock();
+ }
+ }
+
+ public boolean isKnownEndPoint(EndPoint ep)
+ {
+ lock_.readLock().lock();
+ try
+ {
+ return endPointToTokenMap_.containsKey(ep);
+ }
+ finally
+ {
+ lock_.readLock().unlock();
+ }
+ }
+
+ /*
+ * Returns a safe clone of tokenToEndPointMap_.
+ */
+ public Map<BigInteger, EndPoint> cloneTokenEndPointMap()
+ {
+ lock_.readLock().lock();
+ try
+ {
+ return new HashMap<BigInteger, EndPoint>( tokenToEndPointMap_ );
+ }
+ finally
+ {
+ lock_.readLock().unlock();
+ }
+ }
+
+ /*
+ * Returns a safe clone of endPointTokenMap_.
+ */
+ public Map<EndPoint, BigInteger> cloneEndPointTokenMap()
+ {
+ lock_.readLock().lock();
+ try
+ {
+ return new HashMap<EndPoint, BigInteger>( endPointToTokenMap_ );
+ }
+ finally
+ {
+ lock_.readLock().unlock();
+ }
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ Set<EndPoint> eps = endPointToTokenMap_.keySet();
+
+ for ( EndPoint ep : eps )
+ {
+ sb.append(ep);
+ sb.append(":");
+ sb.append(endPointToTokenMap_.get(ep));
+ sb.append(System.getProperty("line.separator"));
+ }
+
+ return sb.toString();
+ }
+}
+
+class TokenMetadataSerializer implements ICompactSerializer<TokenMetadata>
+{
+ public void serialize(TokenMetadata tkMetadata, DataOutputStream dos) throws IOException
+ {
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tkMetadata.cloneTokenEndPointMap();
+ Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+ /* write the size */
+ dos.writeInt(tokens.size());
+ for ( BigInteger token : tokens )
+ {
+ byte[] bytes = token.toByteArray();
+ /* Convert the BigInteger to byte[] and persist */
+ dos.writeInt(bytes.length);
+ dos.write(bytes);
+ /* Write the endpoint out */
+ CompactEndPointSerializationHelper.serialize(tokenToEndPointMap.get(token), dos);
+ }
+ }
+
+ public TokenMetadata deserialize(DataInputStream dis) throws IOException
+ {
+ TokenMetadata tkMetadata = null;
+ int size = dis.readInt();
+
+ if ( size > 0 )
+ {
+ Map<BigInteger, EndPoint> tokenToEndPointMap = new HashMap<BigInteger, EndPoint>();
+ Map<EndPoint, BigInteger> endPointToTokenMap = new HashMap<EndPoint, BigInteger>();
+
+ for ( int i = 0; i < size; ++i )
+ {
+ /* Read the byte[] and convert to BigInteger */
+ byte[] bytes = new byte[dis.readInt()];
+ dis.readFully(bytes);
+ BigInteger token = new BigInteger(bytes);
+ /* Read the endpoint out */
+ EndPoint endpoint = CompactEndPointSerializationHelper.deserialize(dis);
+ tokenToEndPointMap.put(token, endpoint);
+ endPointToTokenMap.put(endpoint, token);
+ }
+
+ tkMetadata = new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+ }
+
+ return tkMetadata;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/AsyncResult.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.List;
+import java.util.Hashtable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class AsyncResult implements IAsyncResult
+{
+ private static Logger logger_ = Logger.getLogger( AsyncResult.class );
+ private Object[] result_ = new Object[0];
+ private AtomicBoolean done_ = new AtomicBoolean(false);
+ private Lock lock_ = new ReentrantLock();
+ private Condition condition_;
+
+ public AsyncResult()
+ {
+ condition_ = lock_.newCondition();
+ }
+
+ public Object[] get()
+ {
+ lock_.lock();
+ try
+ {
+ if ( !done_.get() )
+ {
+ condition_.await();
+ }
+ }
+ catch ( InterruptedException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ return result_;
+ }
+
+ public boolean isDone()
+ {
+ return done_.get();
+ }
+
+ public Object[] get(long timeout, TimeUnit tu) throws TimeoutException
+ {
+ lock_.lock();
+ try
+ {
+ boolean bVal = true;
+ try
+ {
+ if ( !done_.get() )
+ {
+ bVal = condition_.await(timeout, tu);
+ }
+ }
+ catch ( InterruptedException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+
+ if ( !bVal && !done_.get() )
+ {
+ throw new TimeoutException("Operation timed out.");
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ return result_;
+ }
+
+ void result(Object[] result)
+ {
+ try
+ {
+ lock_.lock();
+ if ( !done_.get() )
+ {
+ result_ = result;
+ done_.set(true);
+ condition_.signal();
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CompactEndPointSerializationHelper
+{
+ public static void serialize(EndPoint endPoint, DataOutputStream dos) throws IOException
+ {
+ dos.write(EndPoint.toBytes(endPoint));
+ }
+
+ public static EndPoint deserialize(DataInputStream dis) throws IOException
+ {
+ byte[] bytes = new byte[6];
+ dis.readFully(bytes, 0, bytes.length);
+ return EndPoint.fromBytes(bytes);
+ }
+
+ private static byte[] getIPAddress(String host) throws UnknownHostException
+ {
+ InetAddress ia = InetAddress.getByName(host);
+ return ia.getAddress();
+ }
+
+ private static String getHostName(byte[] ipAddr) throws UnknownHostException
+ {
+ InetAddress ia = InetAddress.getByAddress(ipAddr);
+ return ia.getCanonicalHostName();
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ EndPoint ep = new EndPoint(7000);
+ byte[] bytes = EndPoint.toBytes(ep);
+ System.out.println(bytes.length);
+ EndPoint ep2 = EndPoint.fromBytes(bytes);
+ System.out.println(ep2);
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/ConnectionStatistics.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/ConnectionStatistics.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/ConnectionStatistics.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/ConnectionStatistics.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ConnectionStatistics
+{
+ private String localHost;
+ private int localPort;
+ private String remoteHost;
+ private int remotePort;
+ private int totalConnections;
+ private int connectionsInUse;
+
+ ConnectionStatistics(EndPoint localEp, EndPoint remoteEp, int tc, int ciu)
+ {
+ localHost = localEp.getHost();
+ localPort = localEp.getPort();
+ remoteHost = remoteEp.getHost();
+ remotePort = remoteEp.getPort();
+ totalConnections = tc;
+ connectionsInUse = ciu;
+ }
+
+ public String getLocalHost()
+ {
+ return localHost;
+ }
+
+ public int getLocalPort()
+ {
+ return localPort;
+ }
+
+ public String getRemoteHost()
+ {
+ return remoteHost;
+ }
+
+ public int getRemotePort()
+ {
+ return remotePort;
+ }
+
+ public int getTotalConnections()
+ {
+ return totalConnections;
+ }
+
+ public int getConnectionInUse()
+ {
+ return connectionsInUse;
+ }
+
+ public String toString()
+ {
+ return localHost + ":" + localPort + "->" + remoteHost + ":" + remotePort + " Total Connections open : " + totalConnections + " Connections in use : " + connectionsInUse;
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/EndPoint.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/EndPoint.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/EndPoint.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/EndPoint.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1 @@
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.io.IOException;
import java.io.Serializable;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.u
til.HashMap;
import java.util.Map;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
public class EndPoint implements Serializable, Comparable<EndPoint>
{
// logging and profiling.
private static Logger logger_ = Logger.getLogger(EndPoint.class);
private static final long serialVersionUID = -4962625949179835907L;
private static Map<CharBuffer, String> hostNames_ = new HashMap<CharBuffer, String>();
protected static final int randomPort_ = 5555;
public static EndPoint randomLocalEndPoint_;
static
{
try
{
randomLocalEndPoint_ = new EndPoint(FBUtilities.getHostName(), EndPoint.randomPort_);
}
catch ( IOException ex )
{
logger_.warn(LogUtil.throwableToString(ex));
}
}
private String host_;
pri
vate int port_;
private transient InetSocketAddress ia_;
/* Ctor for JAXB. DO NOT DELETE */
private EndPoint()
{
}
public EndPoint(String host, int port)
{
/*
* Attempts to resolve the host, but does not fail if it cannot.
*/
host_ = host;
port_ = port;
}
// create a local endpoint id
public EndPoint(int port)
{
try
{
host_ = FBUtilities.getLocalHostName();
port_ = port;
}
catch (UnknownHostException e)
{
logger_.warn(LogUtil.throwableToString(e));
}
}
public String getHost()
{
return host_;
}
public int getPort()
{
return port_;
}
public void setPort(int port)
{
port_ = port;
}
public InetSocketAddress getInetAddress()
{
if (ia_ == null || ia_.isUnresolved())
{
ia_ = new InetSocketAddress(host_, port_);
}
return ia_;
}
public boolean equals(Object o)
{
if (!(o instanceof EndPoint))
return false;
EndPoint rhs = (EndPoint) o;
return (host_.equals(rhs.host_) && port_ == rhs.port_);
}
public int hashCode()
{
return (host_ + port_).hashCode();
}
public int compareTo(EndPoint rhs)
{
return host_.compareTo(rhs.host_);
}
public String toString()
{
return (host_ + ":" + port_);
}
public static EndPoint fromString(String str)
{
String[] values = str.split(":");
return new EndPoint(values[0], Integer.parseInt(values[1]));
}
public static byte[] toBytes(EndPoint ep)
{
ByteBuffer buffer = ByteBuffer.allocate(6);
byte[] iaBytes = ep.getInetAddress().getAddress().getAddress();
buffer.put(iaBytes);
buffer.put(MessagingService.toByteArray((short) ep.getPort()));
buffer.flip();
return buffer.array();
}
public static EndPoint fromBytes(byte[] bytes)
{
ByteBuffer buffer = ByteBuffer.allocate(4);
System.arraycopy(bytes, 0, buffer.array(), 0, 4);
byte[] portBytes = new byte[2];
System.arraycopy(bytes, 4, portBytes, 0, portBytes.length);
try
{
CharBuffer charBuffer = buffer.asCharBuffer();
String host = hostNa
mes_.get(charBuffer);
if (host == null)
{
host = InetAddress.getByAddress(buffer.array()).getHostName();
hostNames_.put(charBuffer, host);
}
int port = (int) MessagingService.byteArrayToShort(portBytes);
return new EndPoint(host, port);
}
catch (UnknownHostException e)
{
throw new IllegalArgumentException(e);
}
}
}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/FileStreamTask.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/FileStreamTask.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/FileStreamTask.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.SocketException;
+
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class FileStreamTask implements Runnable
+{
+ private static Logger logger_ = Logger.getLogger( FileStreamTask.class );
+
+ private String file_;
+ private long startPosition_;
+ private long total_;
+ private EndPoint from_;
+ private EndPoint to_;
+
+ FileStreamTask(String file, long startPosition, long total, EndPoint from, EndPoint to)
+ {
+ file_ = file;
+ startPosition_ = startPosition;
+ total_ = total;
+ from_ = from;
+ to_ = to;
+ }
+
+ public void run()
+ {
+ TcpConnection connection = null;
+ try
+ {
+ connection = new TcpConnection(from_, to_);
+ File file = new File(file_);
+ connection.stream(file, startPosition_, total_);
+ MessagingService.setStreamingMode(false);
+ logger_.debug("Done streaming " + file);
+ }
+ catch ( SocketException se )
+ {
+ logger_.info(LogUtil.throwableToString(se));
+ }
+ catch ( IOException e )
+ {
+ logConnectAndIOException(e, connection);
+ }
+ catch (Throwable th)
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ }
+
+ private void logConnectAndIOException(IOException ex, TcpConnection connection)
+ {
+ if ( connection != null )
+ {
+ connection.errorClose();
+ }
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+}