You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC
svn commit: r749207 [1/12] - in
/incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/
net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/
Author: alakshman
Date: Mon Mar 2 06:13:14 2009
New Revision: 749207
URL: http://svn.apache.org/viewvc?rev=749207&view=rev
Log:
Adding the remaining sources into Apache.
Added:
incubator/cassandra/src/org/apache/cassandra/loader/
incubator/cassandra/src/org/apache/cassandra/loader/ColumnFamilyType.java
incubator/cassandra/src/org/apache/cassandra/loader/ColumnType.java
incubator/cassandra/src/org/apache/cassandra/loader/CustomLoader.java
incubator/cassandra/src/org/apache/cassandra/loader/FieldCollection.java
incubator/cassandra/src/org/apache/cassandra/loader/Importer.java
incubator/cassandra/src/org/apache/cassandra/loader/KeyType.java
incubator/cassandra/src/org/apache/cassandra/loader/Loader.java
incubator/cassandra/src/org/apache/cassandra/loader/ObjectFactory.java
incubator/cassandra/src/org/apache/cassandra/loader/PreLoad.java
incubator/cassandra/src/org/apache/cassandra/loader/SuperColumnType.java
incubator/cassandra/src/org/apache/cassandra/loader/TimestampType.java
incubator/cassandra/src/org/apache/cassandra/loader/ValueType.java
incubator/cassandra/src/org/apache/cassandra/locator/
incubator/cassandra/src/org/apache/cassandra/locator/AbstractStrategy.java
incubator/cassandra/src/org/apache/cassandra/locator/EndPointSnitch.java
incubator/cassandra/src/org/apache/cassandra/locator/IEndPointSnitch.java
incubator/cassandra/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
incubator/cassandra/src/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/src/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/src/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/src/org/apache/cassandra/net/
incubator/cassandra/src/org/apache/cassandra/net/AsyncResult.java
incubator/cassandra/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java
incubator/cassandra/src/org/apache/cassandra/net/ConnectionStatistics.java
incubator/cassandra/src/org/apache/cassandra/net/EndPoint.java
incubator/cassandra/src/org/apache/cassandra/net/FileStreamTask.java
incubator/cassandra/src/org/apache/cassandra/net/Header.java
incubator/cassandra/src/org/apache/cassandra/net/HeaderTypes.java
incubator/cassandra/src/org/apache/cassandra/net/IAsyncCallback.java
incubator/cassandra/src/org/apache/cassandra/net/IAsyncResult.java
incubator/cassandra/src/org/apache/cassandra/net/IMessagingService.java
incubator/cassandra/src/org/apache/cassandra/net/IVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/net/Message.java
incubator/cassandra/src/org/apache/cassandra/net/MessageDeliveryTask.java
incubator/cassandra/src/org/apache/cassandra/net/MessageDeserializationTask.java
incubator/cassandra/src/org/apache/cassandra/net/MessageSerializationTask.java
incubator/cassandra/src/org/apache/cassandra/net/MessagingConfig.java
incubator/cassandra/src/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/src/org/apache/cassandra/net/MessagingServiceMBean.java
incubator/cassandra/src/org/apache/cassandra/net/ProtocolHeader.java
incubator/cassandra/src/org/apache/cassandra/net/ResponseVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/net/SelectionKeyHandler.java
incubator/cassandra/src/org/apache/cassandra/net/SelectorManager.java
incubator/cassandra/src/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionHandler.java
incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionManager.java
incubator/cassandra/src/org/apache/cassandra/net/UdpConnection.java
incubator/cassandra/src/org/apache/cassandra/net/http/
incubator/cassandra/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java
incubator/cassandra/src/org/apache/cassandra/net/http/HTMLFormatter.java
incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnection.java
incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnectionHandler.java
incubator/cassandra/src/org/apache/cassandra/net/http/HttpHeaderParser.java
incubator/cassandra/src/org/apache/cassandra/net/http/HttpParsingException.java
incubator/cassandra/src/org/apache/cassandra/net/http/HttpProtocolConstants.java
incubator/cassandra/src/org/apache/cassandra/net/http/HttpRequest.java
incubator/cassandra/src/org/apache/cassandra/net/http/HttpResponse.java
incubator/cassandra/src/org/apache/cassandra/net/http/HttpStartLineParser.java
incubator/cassandra/src/org/apache/cassandra/net/http/HttpWriteResponse.java
incubator/cassandra/src/org/apache/cassandra/net/io/
incubator/cassandra/src/org/apache/cassandra/net/io/ContentLengthState.java
incubator/cassandra/src/org/apache/cassandra/net/io/ContentState.java
incubator/cassandra/src/org/apache/cassandra/net/io/ContentStreamState.java
incubator/cassandra/src/org/apache/cassandra/net/io/DoneState.java
incubator/cassandra/src/org/apache/cassandra/net/io/FastSerializer.java
incubator/cassandra/src/org/apache/cassandra/net/io/ISerializer.java
incubator/cassandra/src/org/apache/cassandra/net/io/IStreamComplete.java
incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolHeaderState.java
incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolState.java
incubator/cassandra/src/org/apache/cassandra/net/io/ReadNotCompleteException.java
incubator/cassandra/src/org/apache/cassandra/net/io/SerializerAttribute.java
incubator/cassandra/src/org/apache/cassandra/net/io/SerializerType.java
incubator/cassandra/src/org/apache/cassandra/net/io/StartState.java
incubator/cassandra/src/org/apache/cassandra/net/io/StreamContextManager.java
incubator/cassandra/src/org/apache/cassandra/net/io/TcpReader.java
incubator/cassandra/src/org/apache/cassandra/net/sink/
incubator/cassandra/src/org/apache/cassandra/net/sink/IMessageSink.java
incubator/cassandra/src/org/apache/cassandra/net/sink/SinkManager.java
incubator/cassandra/src/org/apache/cassandra/procedures/
incubator/cassandra/src/org/apache/cassandra/procedures/GroovyScriptRunner.java
incubator/cassandra/src/org/apache/cassandra/service/
incubator/cassandra/src/org/apache/cassandra/service/BootstrapAndLbHelper.java
incubator/cassandra/src/org/apache/cassandra/service/Cassandra.java
incubator/cassandra/src/org/apache/cassandra/service/CassandraException.java
incubator/cassandra/src/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/src/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/src/org/apache/cassandra/service/Constants.java
incubator/cassandra/src/org/apache/cassandra/service/CqlResult_t.java
incubator/cassandra/src/org/apache/cassandra/service/DigestMismatchException.java
incubator/cassandra/src/org/apache/cassandra/service/HttpRequestVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/service/IComponentShutdown.java
incubator/cassandra/src/org/apache/cassandra/service/IPartitioner.java
incubator/cassandra/src/org/apache/cassandra/service/IResponseResolver.java
incubator/cassandra/src/org/apache/cassandra/service/LeaderElector.java
incubator/cassandra/src/org/apache/cassandra/service/LoadDisseminator.java
incubator/cassandra/src/org/apache/cassandra/service/LoadInfo.java
incubator/cassandra/src/org/apache/cassandra/service/LocationInfoVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java
incubator/cassandra/src/org/apache/cassandra/service/PartitionerType.java
incubator/cassandra/src/org/apache/cassandra/service/QuorumResponseHandler.java
incubator/cassandra/src/org/apache/cassandra/service/RandomPartitioner.java
incubator/cassandra/src/org/apache/cassandra/service/ReadRepairManager.java
incubator/cassandra/src/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/src/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/src/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/src/org/apache/cassandra/service/StorageService.java
incubator/cassandra/src/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/src/org/apache/cassandra/service/StreamManager.java
incubator/cassandra/src/org/apache/cassandra/service/TokenInfoVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/service/WriteResponseResolver.java
incubator/cassandra/src/org/apache/cassandra/service/ZookeeperWatcher.java
incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_super_t.java
incubator/cassandra/src/org/apache/cassandra/service/batch_mutation_t.java
incubator/cassandra/src/org/apache/cassandra/service/column_t.java
incubator/cassandra/src/org/apache/cassandra/service/superColumn_t.java
incubator/cassandra/src/org/apache/cassandra/test/
incubator/cassandra/src/org/apache/cassandra/test/DBTest.java
incubator/cassandra/src/org/apache/cassandra/test/DataImporter.java
incubator/cassandra/src/org/apache/cassandra/test/SSTableTest.java
incubator/cassandra/src/org/apache/cassandra/test/StressTest.java
incubator/cassandra/src/org/apache/cassandra/test/TestChoice.java
incubator/cassandra/src/org/apache/cassandra/test/TestRunner.java
incubator/cassandra/src/org/apache/cassandra/test/UtilsTest.java
incubator/cassandra/src/org/apache/cassandra/tools/
incubator/cassandra/src/org/apache/cassandra/tools/AdminTool.java
incubator/cassandra/src/org/apache/cassandra/tools/ClusterTool.java
incubator/cassandra/src/org/apache/cassandra/tools/FileSizeTokenGenerator.java
incubator/cassandra/src/org/apache/cassandra/tools/IndexBuilder.java
incubator/cassandra/src/org/apache/cassandra/tools/KeyChecker.java
incubator/cassandra/src/org/apache/cassandra/tools/KeyExtracter.java
incubator/cassandra/src/org/apache/cassandra/tools/MembershipCleaner.java
incubator/cassandra/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/tools/ThreadListBuilder.java
incubator/cassandra/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/tools/TokenUpdater.java
Added: incubator/cassandra/src/org/apache/cassandra/loader/ColumnFamilyType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/ColumnFamilyType.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/ColumnFamilyType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/ColumnFamilyType.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,188 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for ColumnFamilyType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * &lt;complexType name="ColumnFamilyType"&gt;
+ *   &lt;complexContent&gt;
+ *     &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType"&gt;
+ *       &lt;sequence&gt;
+ *         &lt;element name="Column" type="{}ColumnType" maxOccurs="unbounded"/&gt;
+ *         &lt;element name="SuperColumn" type="{}SuperColumnType"/&gt;
+ *         &lt;element name="Directory" type="{http://www.w3.org/2001/XMLSchema}string"/&gt;
+ *         &lt;element name="Delimiter" type="{http://www.w3.org/2001/XMLSchema}string"/&gt;
+ *       &lt;/sequence&gt;
+ *       &lt;attribute name="Name" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
+ *     &lt;/restriction&gt;
+ *   &lt;/complexContent&gt;
+ * &lt;/complexType&gt;
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "ColumnFamilyType", propOrder = { "column", "superColumn", "directory", "delimiter" })
+public class ColumnFamilyType {
+
+    @XmlElement(name = "Column", required = true, nillable = true)
+    protected List<ColumnType> column;
+    @XmlElement(name = "SuperColumn", required = true, nillable = true)
+    protected SuperColumnType superColumn;
+    @XmlElement(name = "Directory", required = true)
+    protected String directory;
+    @XmlElement(name = "Delimiter", required = true)
+    protected String delimiter;
+    @XmlAttribute(name = "Name")
+    protected String name;
+
+    /**
+     * Returns the list bound to the {@code Column} elements.
+     *
+     * <p>
+     * The returned list is the live backing list rather than a copy, so
+     * changes made through it are changes to this object; that is why no
+     * <CODE>set</CODE> method exists for this property. The list is created
+     * on first access, so this method does not return {@code null}.
+     *
+     * <p>
+     * Adding an entry therefore looks like this:
+     * <pre>
+     *    getColumn().add(newItem);
+     * </pre>
+     *
+     * @return
+     *     the live list of {@link ColumnType } entries
+     */
+    public List<ColumnType> getColumn() {
+        if (this.column != null) {
+            return this.column;
+        }
+        this.column = new ArrayList<ColumnType>();
+        return this.column;
+    }
+
+    /**
+     * Returns the object bound to the {@code SuperColumn} element.
+     *
+     * @return
+     *     the current {@link SuperColumnType }, or {@code null} if unset
+     */
+    public SuperColumnType getSuperColumn() {
+        return this.superColumn;
+    }
+
+    /**
+     * Replaces the object bound to the {@code SuperColumn} element.
+     *
+     * @param value
+     *     the new {@link SuperColumnType }; stored as given
+     */
+    public void setSuperColumn(SuperColumnType value) {
+        superColumn = value;
+    }
+
+    /**
+     * Returns the text bound to the {@code Directory} element.
+     *
+     * @return
+     *     the current {@link String }, or {@code null} if unset
+     */
+    public String getDirectory() {
+        return this.directory;
+    }
+
+    /**
+     * Replaces the text bound to the {@code Directory} element.
+     *
+     * @param value
+     *     the new {@link String }; stored as given
+     */
+    public void setDirectory(String value) {
+        directory = value;
+    }
+
+    /**
+     * Returns the text bound to the {@code Delimiter} element.
+     *
+     * @return
+     *     the current {@link String }, or {@code null} if unset
+     */
+    public String getDelimiter() {
+        return this.delimiter;
+    }
+
+    /**
+     * Replaces the text bound to the {@code Delimiter} element.
+     *
+     * @param value
+     *     the new {@link String }; stored as given
+     */
+    public void setDelimiter(String value) {
+        delimiter = value;
+    }
+
+    /**
+     * Returns the text bound to the {@code Name} attribute.
+     *
+     * @return
+     *     the current {@link String }, or {@code null} if unset
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Replaces the text bound to the {@code Name} attribute.
+     *
+     * @param value
+     *     the new {@link String }; stored as given
+     */
+    public void setName(String value) {
+        name = value;
+    }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/ColumnType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/ColumnType.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/ColumnType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/ColumnType.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,152 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for ColumnType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * &lt;complexType name="ColumnType"&gt;
+ *   &lt;complexContent&gt;
+ *     &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType"&gt;
+ *       &lt;sequence&gt;
+ *         &lt;element name="Value" type="{}ValueType"/&gt;
+ *         &lt;element name="Timestamp" type="{}TimestampType"/&gt;
+ *       &lt;/sequence&gt;
+ *       &lt;attribute name="Name" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
+ *       &lt;attribute name="Field" type="{http://www.w3.org/2001/XMLSchema}int" /&gt;
+ *     &lt;/restriction&gt;
+ *   &lt;/complexContent&gt;
+ * &lt;/complexType&gt;
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "ColumnType", propOrder = { "value", "timestamp" })
+public class ColumnType {
+
+    @XmlElement(name = "Value", required = true)
+    protected ValueType value;
+    @XmlElement(name = "Timestamp", required = true)
+    protected TimestampType timestamp;
+    @XmlAttribute(name = "Name")
+    protected String name;
+    @XmlAttribute(name = "Field")
+    protected Integer field;
+
+    /**
+     * Returns the object bound to the {@code Value} element.
+     *
+     * @return
+     *     the current {@link ValueType }, or {@code null} if unset
+     */
+    public ValueType getValue() {
+        return this.value;
+    }
+
+    /**
+     * Replaces the object bound to the {@code Value} element.
+     *
+     * @param value
+     *     the new {@link ValueType }; stored as given
+     */
+    public void setValue(ValueType value) {
+        // The parameter shadows the field, so the qualifier is required here.
+        this.value = value;
+    }
+
+    /**
+     * Returns the object bound to the {@code Timestamp} element.
+     *
+     * @return
+     *     the current {@link TimestampType }, or {@code null} if unset
+     */
+    public TimestampType getTimestamp() {
+        return this.timestamp;
+    }
+
+    /**
+     * Replaces the object bound to the {@code Timestamp} element.
+     *
+     * @param value
+     *     the new {@link TimestampType }; stored as given
+     */
+    public void setTimestamp(TimestampType value) {
+        timestamp = value;
+    }
+
+    /**
+     * Returns the text bound to the {@code Name} attribute.
+     *
+     * @return
+     *     the current {@link String }, or {@code null} if unset
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Replaces the text bound to the {@code Name} attribute.
+     *
+     * @param value
+     *     the new {@link String }; stored as given
+     */
+    public void setName(String value) {
+        name = value;
+    }
+
+    /**
+     * Returns the number bound to the {@code Field} attribute.
+     *
+     * @return
+     *     the current {@link Integer }, or {@code null} if unset
+     */
+    public Integer getField() {
+        return this.field;
+    }
+
+    /**
+     * Replaces the number bound to the {@code Field} attribute.
+     *
+     * @param value
+     *     the new {@link Integer }; stored as given
+     */
+    public void setField(Integer value) {
+        field = value;
+    }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/CustomLoader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/CustomLoader.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/CustomLoader.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/CustomLoader.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.loader;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Token;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+
+public class CustomLoader
+{
+ private static Logger logger_ = Logger.getLogger( CustomLoader.class );
+ private StorageService storageService_;
+ private String path_;
+
+ public CustomLoader(StorageService storageService, String path)
+ {
+ storageService_ = storageService;
+ path_ = path;
+ }
+ /*
+ * This function checks if the local storage endpoint
+ * is reponsible for storing this key .
+ */
+ boolean checkIfProcessKey(String key)
+ {
+ EndPoint[] endPoints = storageService_.getNStorageEndPoint(key);
+ EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
+ for(EndPoint endPoint : endPoints)
+ {
+ if(endPoint.equals(localEndPoint))
+ return true;
+ }
+ return false;
+ }
+
+ boolean checkUser(String user, String[] list)
+ {
+ boolean bFound = false;
+ for(String l:list)
+ {
+ if(user.equals(l))
+ {
+ bFound = true;
+ }
+ }
+ return bFound;
+ }
+
+
+ void parse(String filepath) throws Throwable
+ {
+ try
+ {
+ BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(filepath)), 16 * 1024 * 1024);
+ String line = null;
+ RowMutation rm = null;
+ while ((line = bufReader.readLine()) != null)
+ {
+ // userid threadid folder date part-list author-list subject body
+ String columns[] = line.split("\t");
+ if(columns.length < 7)
+ continue;
+ if( rm == null)
+ {
+ rm = new RowMutation("Mailbox", columns[0]);
+ }
+ Analyzer analyzer = new StandardAnalyzer();
+ String body = null;
+ if(columns.length > 7 )
+ body = columns[6]+" "+columns[7];
+ else
+ body = columns[6];
+
+ TokenStream ts = analyzer.tokenStream("superColumn", new StringReader(body));
+ Token token = null;
+ token = ts.next();
+ while(token != null)
+ {
+ if(token.termText() != "")
+ {
+ rm.add("MailboxThreadList0:"+token.termText()+":"+columns[1], columns[2].getBytes(), Integer.parseInt(columns[3]) );
+ }
+ token = ts.next();
+ }
+ rm.add("MailboxMailList0:"+columns[1], columns[2].getBytes(), Integer.parseInt(columns[3]));
+ String authors = columns[5];
+ String participants = columns[4];
+ if( authors == null)
+ authors = "";
+ if(participants == null)
+ participants = "";
+ String[] authorList = authors.split(":");
+ String[] partList = participants.split(":");
+ String[] mailersList = null;
+ if(checkUser(columns[0], authorList))
+ mailersList = partList;
+ else
+ mailersList = authorList;
+ for(String mailer : mailersList)
+ {
+ if(!mailer.equals(columns[0]))
+ {
+ rm.add("MailboxUserList0:"+ mailer + ":" +columns[1], columns[2].getBytes(), Integer.parseInt(columns[3]) );
+ }
+ }
+ }
+ if(rm != null)
+ {
+ rm.apply();
+ }
+ }
+ catch ( Throwable ex )
+ {
+ logger_.error( LogUtil.throwableToString(ex) );
+ }
+ }
+
+ void parseFileList(File dir)
+ {
+ int fileCount = dir.list().length;
+ String[] dirList = dir.list();
+ File[] fileList = dir.listFiles();
+ for ( int i = 0 ; i < fileCount ; i++ )
+ {
+ File file = new File(fileList[i].getAbsolutePath());
+ if ( file.isDirectory())
+ {
+ parseFileList(file);
+ }
+ else
+ {
+ try
+ {
+ if(checkIfProcessKey(dirList[i]))
+ {
+ parse(fileList[i].getAbsolutePath());
+ }
+ }
+ catch ( Throwable ex )
+ {
+ logger_.error( LogUtil.throwableToString(ex) );
+ }
+ }
+ }
+ }
+
+
+
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Throwable
+ {
+ if(args.length != 1)
+ {
+ System.out.println("Usage: CustomLoader <root path to the data files>");
+ }
+ LogUtil.init();
+ StorageService s = StorageService.instance();
+ s.start();
+ CustomLoader loader = new CustomLoader(s, args[0]);
+ File rootDirectory = new File(args[0]);
+ long start = System.currentTimeMillis();
+ loader.parseFileList(rootDirectory);
+ logger_.info("Done Loading: " + (System.currentTimeMillis() - start)
+ + " ms.");
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/FieldCollection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/FieldCollection.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/FieldCollection.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/FieldCollection.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,76 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for FieldCollection complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * &lt;complexType name="FieldCollection"&gt;
+ *   &lt;complexContent&gt;
+ *     &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType"&gt;
+ *       &lt;sequence&gt;
+ *         &lt;element name="Field" type="{http://www.w3.org/2001/XMLSchema}int" maxOccurs="unbounded"/&gt;
+ *       &lt;/sequence&gt;
+ *     &lt;/restriction&gt;
+ *   &lt;/complexContent&gt;
+ * &lt;/complexType&gt;
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "FieldCollection", propOrder = { "field" })
+public class FieldCollection {
+
+    @XmlElement(name = "Field", type = Integer.class)
+    protected List<Integer> field;
+
+    /**
+     * Returns the list bound to the {@code Field} elements.
+     *
+     * <p>
+     * The returned list is the live backing list rather than a copy, so
+     * changes made through it are changes to this object; that is why no
+     * <CODE>set</CODE> method exists for this property. The list is created
+     * on first access, so this method does not return {@code null}.
+     *
+     * <p>
+     * Adding an entry therefore looks like this:
+     * <pre>
+     *    getField().add(newItem);
+     * </pre>
+     *
+     * @return
+     *     the live list of {@link Integer } entries
+     */
+    public List<Integer> getField() {
+        if (this.field != null) {
+            return this.field;
+        }
+        this.field = new ArrayList<Integer>();
+        return this.field;
+    }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/Importer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/Importer.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/Importer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/Importer.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,127 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for anonymous complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * &lt;complexType&gt;
+ *   &lt;complexContent&gt;
+ *     &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType"&gt;
+ *       &lt;sequence&gt;
+ *         &lt;element name="Table" type="{http://www.w3.org/2001/XMLSchema}string"/&gt;
+ *         &lt;element name="Key" type="{}KeyType"/&gt;
+ *         &lt;element name="ColumnFamily" type="{}ColumnFamilyType"/&gt;
+ *       &lt;/sequence&gt;
+ *     &lt;/restriction&gt;
+ *   &lt;/complexContent&gt;
+ * &lt;/complexType&gt;
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "", propOrder = { "table", "key", "columnFamily" })
+@XmlRootElement(name = "Importer")
+public class Importer {
+
+    @XmlElement(name = "Table", required = true)
+    protected String table;
+    @XmlElement(name = "Key", required = true)
+    protected KeyType key;
+    @XmlElement(name = "ColumnFamily", required = true)
+    protected ColumnFamilyType columnFamily;
+
+    /**
+     * Returns the text bound to the {@code Table} element.
+     *
+     * @return
+     *     the current {@link String }, or {@code null} if unset
+     */
+    public String getTable() {
+        return this.table;
+    }
+
+    /**
+     * Replaces the text bound to the {@code Table} element.
+     *
+     * @param value
+     *     the new {@link String }; stored as given
+     */
+    public void setTable(String value) {
+        table = value;
+    }
+
+    /**
+     * Returns the object bound to the {@code Key} element.
+     *
+     * @return
+     *     the current {@link KeyType }, or {@code null} if unset
+     */
+    public KeyType getKey() {
+        return this.key;
+    }
+
+    /**
+     * Replaces the object bound to the {@code Key} element.
+     *
+     * @param value
+     *     the new {@link KeyType }; stored as given
+     */
+    public void setKey(KeyType value) {
+        key = value;
+    }
+
+    /**
+     * Returns the object bound to the {@code ColumnFamily} element.
+     *
+     * @return
+     *     the current {@link ColumnFamilyType }, or {@code null} if unset
+     */
+    public ColumnFamilyType getColumnFamily() {
+        return this.columnFamily;
+    }
+
+    /**
+     * Replaces the object bound to the {@code ColumnFamily} element.
+     *
+     * @param value
+     *     the new {@link ColumnFamilyType }; stored as given
+     */
+    public void setColumnFamily(ColumnFamilyType value) {
+        columnFamily = value;
+    }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/KeyType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/KeyType.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/KeyType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/KeyType.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,125 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for KeyType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * <complexType name="KeyType">
+ * <complexContent>
+ * <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ * <sequence>
+ * <element name="OptimizeIt" type="{http://www.w3.org/2001/XMLSchema}boolean"/>
+ * <element name="Combiner" type="{http://www.w3.org/2001/XMLSchema}string"/>
+ * <element name="Fields" type="{}FieldCollection"/>
+ * </sequence>
+ * </restriction>
+ * </complexContent>
+ * </complexType>
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "KeyType", propOrder = {"optimizeIt", "combiner", "fields"})
+public class KeyType {
+
+    /** Content of the required, nillable {@code OptimizeIt} element. */
+    @XmlElement(name = "OptimizeIt", required = true, type = Boolean.class, nillable = true)
+    protected Boolean optimizeIt;
+
+    /** Content of the required {@code Combiner} element. */
+    @XmlElement(name = "Combiner", required = true)
+    protected String combiner;
+
+    /** Content of the required {@code Fields} element. */
+    @XmlElement(name = "Fields", required = true)
+    protected FieldCollection fields;
+
+    /**
+     * Returns the optimizeIt property.
+     *
+     * @return the current {@link Boolean} value; {@code null} until one is bound or set
+     */
+    public Boolean isOptimizeIt() {
+        return this.optimizeIt;
+    }
+
+    /**
+     * Replaces the optimizeIt property.
+     *
+     * @param value the new {@link Boolean} value
+     */
+    public void setOptimizeIt(Boolean value) {
+        optimizeIt = value;
+    }
+
+    /**
+     * Returns the combiner property.
+     *
+     * @return the current {@link String} value; {@code null} until one is bound or set
+     */
+    public String getCombiner() {
+        return this.combiner;
+    }
+
+    /**
+     * Replaces the combiner property.
+     *
+     * @param value the new {@link String} value
+     */
+    public void setCombiner(String value) {
+        combiner = value;
+    }
+
+    /**
+     * Returns the fields property.
+     *
+     * @return the current {@link FieldCollection} value; {@code null} until one is bound or set
+     */
+    public FieldCollection getFields() {
+        return this.fields;
+    }
+
+    /**
+     * Replaces the fields property.
+     *
+     * @param value the new {@link FieldCollection} value
+     */
+    public void setFields(FieldCollection value) {
+        fields = value;
+    }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/Loader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/Loader.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/Loader.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/Loader.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,356 @@
+/**
+ *
+ */
+package org.apache.cassandra.loader;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.locator.EndPointSnitch;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.Token;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.cassandra.utils.*;
+
+
+/**
+ * This class is used to load the storage endpoints with the relevant data
+ * The data should be both what they are responsible for and what should be replicated on the specific
+ * endpoints.
+ * Population is done based on a xml file which should adhere to a schema.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class Loader
+{
+ private static long siesta_ = 60*1000;
+ private static Logger logger_ = Logger.getLogger( Loader.class );
+ private Importer importer_;
+ private StorageService storageService_;
+
+ /*
+  * Creates a loader bound to the given storage service. The service is
+  * later used to decide key ownership and to relocate this node.
+  */
+ public Loader(StorageService storageService)
+ {
+     this.storageService_ = storageService;
+ }
+
+ /*
+  * This method loads all the keys into a special column family
+  * called "RecycleBin". This column family is used for temporary
+  * processing of data and can then be recycled. The idea is that
+  * after the load is complete we have all the keys in the system.
+  * Now we force a compaction and examine the single Index file
+  * that is generated to determine how the nodes need to relocate
+  * to be perfectly load balanced.
+  *
+  * Directories are walked recursively; for every regular file one
+  * RowMutation is applied whose key and column value are the file name.
+  *
+  * param @ rootDirectory - rootDirectory at which the parsing begins.
+  * param @ table - table that will be populated.
+  * param @ cfName - name of the column that will be populated. This is
+  * passed in so that we do not unnecessarily allocate temporary String objects.
+  */
+ private void preParse(File rootDirectory, String table, String cfName) throws Throwable
+ {
+     File[] files = rootDirectory.listFiles();
+     if ( files == null )
+     {
+         /* listFiles() yields null when the path is not a directory or cannot be read. */
+         logger_.warn("Unable to list the contents of " + rootDirectory + " ; skipping it.");
+         return;
+     }
+
+     for ( File file : files )
+     {
+         if ( file.isDirectory() )
+         {
+             preParse(file, table, cfName);
+         }
+         else
+         {
+             String fileName = file.getName();
+             RowMutation rm = new RowMutation(table, fileName);
+             rm.add(cfName, fileName.getBytes(), 0);
+             rm.apply();
+         }
+     }
+ }
+
+ /*
+  * Joins the given fields into a single string, placing the combiner
+  * between consecutive fields. An empty list yields null and a single
+  * element list yields that element unchanged.
+  */
+ String merge( List<String> listFields, String combiner)
+ {
+     String joined = null;
+     for ( String part : listFields )
+     {
+         /* Nothing accumulated yet: the current field starts the result. */
+         joined = ( joined == null ) ? part : joined + combiner + part;
+     }
+     return joined;
+ }
+
+ /*
+  * Tells whether the local storage endpoint is one of the endpoints
+  * the storage service reports for this key.
+  */
+ boolean checkIfProcessKey(String key)
+ {
+     EndPoint[] replicas = storageService_.getNStorageEndPoint(key);
+     EndPoint self = StorageService.getLocalStorageEndPoint();
+     boolean responsible = false;
+     /* Stop scanning as soon as the local endpoint is found. */
+     for ( int i = 0; i < replicas.length && !responsible; ++i )
+     {
+         responsible = replicas[i].equals(self);
+     }
+     return responsible;
+ }
+
+ /*
+  * This function parses one data file based on the delimiter specified in
+  * the xml file ("," when none is given). It also looks at all the
+  * parameters specified in the xml and based on that populates one
+  * RowMutation per key. All mutations are applied once the whole file
+  * has been read.
+  *
+  * The reader is always closed, even when a line cannot be parsed.
+  *
+  * param @ filepath - path of the data file to parse.
+  */
+ void parse(String filepath) throws Throwable
+ {
+     String delimiter = ",";
+     if ( importer_.columnFamily.delimiter != null )
+     {
+         delimiter = importer_.columnFamily.delimiter;
+     }
+     Map<String, RowMutation> rms = new HashMap<String, RowMutation>();
+     BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+             new FileInputStream(filepath)), 16 * 1024 * 1024);
+     try
+     {
+         String line = null;
+         while ( (line = bufReader.readLine()) != null )
+         {
+             StringTokenizer st = new StringTokenizer(line, delimiter);
+             List<String> tokenList = new ArrayList<String>();
+             while ( st.hasMoreTokens() )
+             {
+                 tokenList.add(st.nextToken());
+             }
+
+             /* Construct the Key */
+             List<String> keyFields = new ArrayList<String>();
+             for ( int fieldId : importer_.key.fields.field )
+             {
+                 keyFields.add(tokenList.get(fieldId));
+             }
+             String key = merge(keyFields, importer_.key.combiner);
+
+             /*
+              * When OptimizeIt is explicitly false the ownership check is
+              * done per key here; when it is true parseFileList() has
+              * already filtered on the file name.
+              */
+             if ( importer_.key.optimizeIt != null && !importer_.key.optimizeIt )
+             {
+                 if ( !checkIfProcessKey(key) )
+                 {
+                     continue;
+                 }
+             }
+
+             RowMutation rm = rms.get(key);
+             if ( rm == null )
+             {
+                 rm = new RowMutation(importer_.table, key);
+                 rms.put(key, rm);
+             }
+
+             if ( importer_.columnFamily.superColumn != null )
+             {
+                 List<String> superColumnList = new ArrayList<String>();
+                 for ( int fieldId : importer_.columnFamily.superColumn.fields.field )
+                 {
+                     superColumnList.add(tokenList.get(fieldId));
+                 }
+                 String superColumnName = merge(superColumnList, " ");
+                 superColumnList.clear();
+                 /* Tokenize is an optional attribute, so it may be null: treat that as false. */
+                 if ( Boolean.TRUE.equals(importer_.columnFamily.superColumn.tokenize) )
+                 {
+                     Analyzer analyzer = new StandardAnalyzer();
+                     TokenStream ts = analyzer.tokenStream("superColumn", new StringReader(superColumnName));
+                     for ( Token token = ts.next(); token != null; token = ts.next() )
+                     {
+                         superColumnList.add(token.termText());
+                     }
+                 }
+                 else
+                 {
+                     superColumnList.add(superColumnName);
+                 }
+                 for ( String sName : superColumnList )
+                 {
+                     addColumns(rm, importer_.columnFamily.name + ":" + sName, tokenList);
+                 }
+             }
+             else
+             {
+                 addColumns(rm, importer_.columnFamily.name, tokenList);
+             }
+         }
+     }
+     finally
+     {
+         bufReader.close();
+     }
+
+     // Now apply the data for all keys
+     // TODO : Add checks for large data
+     // size maybe we want to check the
+     // data size and then apply.
+     for ( RowMutation mutation : rms.values() )
+     {
+         if ( mutation != null )
+         {
+             mutation.apply();
+         }
+     }
+ }
+
+ /*
+  * Adds one column per configured ColumnType to the mutation. The column
+  * path is the prefix, a ":" and either the fixed column name or the
+  * token found at the column's field index; value and timestamp are read
+  * from the token list at their configured indexes.
+  */
+ private void addColumns(RowMutation rm, String cfPrefix, List<String> tokenList) throws Throwable
+ {
+     if ( importer_.columnFamily.column == null )
+     {
+         return;
+     }
+     for ( ColumnType column : importer_.columnFamily.column )
+     {
+         String cfColumn = cfPrefix + ":" + (column.name == null ? tokenList.get(column.field) : column.name);
+         rm.add(cfColumn, tokenList.get(column.value.field).getBytes(), Integer.parseInt(tokenList.get(column.timestamp.field)));
+     }
+ }
+
+
+ /*
+  * Walks the directory recursively and parses every regular file in it.
+  * When OptimizeIt is true a file is parsed only if this node is
+  * responsible for the key formed by the file name; otherwise every
+  * file is parsed. A failure on one file is logged and does not stop
+  * the remaining files from being processed.
+  *
+  * param @ dir - directory whose contents should be parsed.
+  */
+ void parseFileList(File dir)
+ {
+     /*
+      * Take one snapshot of the children. The entries carry their full
+      * path, so isDirectory() and the recursion look at the real child
+      * and not at a same-named path under the working directory.
+      */
+     File[] files = dir.listFiles();
+     if ( files == null )
+     {
+         /* listFiles() yields null when the path is not a directory or cannot be read. */
+         logger_.warn("Unable to list the contents of " + dir + " ; skipping it.");
+         return;
+     }
+
+     for ( File file : files )
+     {
+         if ( file.isDirectory() )
+         {
+             parseFileList(file);
+             continue;
+         }
+         try
+         {
+             boolean filterByFileName = importer_.key.optimizeIt != null && importer_.key.optimizeIt;
+             if ( !filterByFileName || checkIfProcessKey(file.getName()) )
+             {
+                 parse(file.getAbsolutePath());
+             }
+         }
+         catch ( Throwable ex )
+         {
+             /* Keep the stack trace so that a bad input file can be diagnosed. */
+             logger_.error(LogUtil.throwableToString(ex));
+         }
+     }
+ }
+
+ /*
+  * Runs the pre-load phase: writes only the keys found under the root
+  * directory into the "Keys" column of the recycle bin column family of
+  * the first configured table, flushes and compacts that table, hands
+  * the indexed keys to the storage service so it can relocate, and then
+  * cleans up. The statements are order dependent.
+  *
+  * param @ rootDirectory - directory that is walked by preParse().
+  */
+ void preLoad(File rootDirectory) throws Throwable
+ {
+ String table = DatabaseDescriptor.getTables().get(0);
+ String cfName = Table.recycleBin_ + ":" + "Keys";
+ /* populate just the keys. */
+ preParse(rootDirectory, table, cfName);
+ /* dump the memtables */
+ Table.open(table).flush(false);
+ /* force a compaction of the files. */
+ Table.open(table).forceCompaction(null,null,null);
+
+ /*
+ * This is a hack to let everyone finish. Just sleep for
+ * siesta_ milliseconds (60*1000, i.e. one minute).
+ */
+ logger_.info("Taking a nap after forcing a compaction ...");
+ Thread.sleep(Loader.siesta_);
+
+ /* Figure out the keys in the index file to relocate the node */
+ List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
+ /*
+ * Load the indexes into memory. NOTE(review): this relies on the
+ * SSTable constructor doing that as a side effect - confirm.
+ */
+ for ( String df : ssTables )
+ {
+ SSTable ssTable = new SSTable(df);
+ ssTable.close();
+ }
+ /* We should have only one file since we just compacted. */
+ List<String> indexedKeys = SSTable.getIndexedKeys();
+ storageService_.relocate(indexedKeys.toArray( new String[0]) );
+
+ /*
+ * This is a hack to let everyone relocate and learn about
+ * each other. Just sleep for siesta_ milliseconds again.
+ */
+ logger_.info("Taking a nap after relocating ...");
+ Thread.sleep(Loader.siesta_);
+
+ /*
+ * Do the cleanup necessary: delete the first SSTable in the list and
+ * reset the load state in the StorageService. Deleting the commit
+ * logs is currently commented out below.
+ * NOTE(review): ssTables.get(0) assumes the list is not empty - confirm.
+ */
+ SSTable.delete(ssTables.get(0));
+// File commitLogDirectory = new File( DatabaseDescriptor.getLogFileLocation() );
+// FileUtils.delete(commitLogDirectory.listFiles());
+ storageService_.resetLoadState();
+ logger_.info("Finished all the requisite clean up ...");
+ }
+
+ /*
+  * Reads the importer description from the given xml file, runs the
+  * pre-load phase on the configured directory and then parses all the
+  * files below it. Any Exception is logged and not rethrown.
+  *
+  * param @ xmlFile - path of the xml file describing the import.
+  */
+ void load(String xmlFile) throws Throwable
+ {
+     try
+     {
+         JAXBContext jc = JAXBContext.newInstance(this.getClass().getPackage().getName());
+         Unmarshaller u = jc.createUnmarshaller();
+         FileInputStream xmlStream = new FileInputStream( xmlFile );
+         try
+         {
+             importer_ = (Importer)u.unmarshal( xmlStream );
+         }
+         finally
+         {
+             /* The descriptor is fully read at this point; do not leak the handle. */
+             xmlStream.close();
+         }
+         String directory = importer_.columnFamily.directory;
+         File rootDirectory = new File(directory);
+         preLoad(rootDirectory);
+         parseFileList(rootDirectory);
+     }
+     catch (Exception e)
+     {
+         /* A failed load is an error, not an informational event. */
+         logger_.error(LogUtil.throwableToString(e));
+     }
+ }
+
+ /**
+  * Starts the storage service and runs the loader.
+  *
+  * @param args optional; args[0] is the path of the importer xml file.
+  *             When absent, "mbox_importer.xml" is used.
+  */
+ public static void main(String[] args) throws Throwable
+ {
+     String xmlFile = ( args != null && args.length > 0 ) ? args[0] : "mbox_importer.xml";
+     LogUtil.init();
+     StorageService s = StorageService.instance();
+     s.start();
+     Loader loader = new Loader(s);
+     loader.load(xmlFile);
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/ObjectFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/ObjectFactory.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/ObjectFactory.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/ObjectFactory.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,136 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.annotation.XmlElementDecl;
+import javax.xml.bind.annotation.XmlRegistry;
+import javax.xml.namespace.QName;
+
+
+/**
+ * This object contains factory methods for each
+ * Java content interface and Java element interface
+ * generated in the com.facebook.infrastructure.loader package.
+ * <p>An ObjectFactory allows you to programatically
+ * construct new instances of the Java representation
+ * for XML content. The Java representation of XML
+ * content can consist of schema derived interfaces
+ * and classes representing the binding of schema
+ * type definitions, element declarations and model
+ * groups. Factory methods for each of these are
+ * provided in this class.
+ *
+ */
+@XmlRegistry
+public class ObjectFactory {
+
+ private final static QName _SuperColumn_QNAME = new QName("", "SuperColumn");
+ private final static QName _Table_QNAME = new QName("", "Table");
+ private final static QName _Column_QNAME = new QName("", "Column");
+
+ /**
+ * Create a new ObjectFactory that can be used to create new instances of schema derived classes for package: com.facebook.infrastructure.loader
+ *
+ */
+ public ObjectFactory() {
+ }
+
+ /**
+ * Create an instance of {@link KeyType }
+ *
+ */
+ public KeyType createKeyType() {
+ return new KeyType();
+ }
+
+ /**
+ * Create an instance of {@link SuperColumnType }
+ *
+ */
+ public SuperColumnType createSuperColumnType() {
+ return new SuperColumnType();
+ }
+
+ /**
+ * Create an instance of {@link ColumnType }
+ *
+ */
+ public ColumnType createColumnType() {
+ return new ColumnType();
+ }
+
+ /**
+ * Create an instance of {@link Importer }
+ *
+ */
+ public Importer createImporter() {
+ return new Importer();
+ }
+
+ /**
+ * Create an instance of {@link ColumnFamilyType }
+ *
+ */
+ public ColumnFamilyType createColumnFamilyType() {
+ return new ColumnFamilyType();
+ }
+
+ /**
+ * Create an instance of {@link TimestampType }
+ *
+ */
+ public TimestampType createTimestampType() {
+ return new TimestampType();
+ }
+
+ /**
+ * Create an instance of {@link FieldCollection }
+ *
+ */
+ public FieldCollection createFieldCollection() {
+ return new FieldCollection();
+ }
+
+ /**
+ * Create an instance of {@link ValueType }
+ *
+ */
+ public ValueType createValueType() {
+ return new ValueType();
+ }
+
+ /**
+ * Create an instance of {@link JAXBElement }{@code <}{@link SuperColumnType }{@code >}}
+ *
+ */
+ @XmlElementDecl(namespace = "", name = "SuperColumn")
+ public JAXBElement<SuperColumnType> createSuperColumn(SuperColumnType value) {
+ return new JAXBElement<SuperColumnType>(_SuperColumn_QNAME, SuperColumnType.class, null, value);
+ }
+
+ /**
+ * Create an instance of {@link JAXBElement }{@code <}{@link String }{@code >}}
+ *
+ */
+ @XmlElementDecl(namespace = "", name = "Table")
+ public JAXBElement<String> createTable(String value) {
+ return new JAXBElement<String>(_Table_QNAME, String.class, null, value);
+ }
+
+ /**
+ * Create an instance of {@link JAXBElement }{@code <}{@link ColumnType }{@code >}}
+ *
+ */
+ @XmlElementDecl(namespace = "", name = "Column")
+ public JAXBElement<ColumnType> createColumn(ColumnType value) {
+ return new JAXBElement<ColumnType>(_Column_QNAME, ColumnType.class, null, value);
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/PreLoad.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/PreLoad.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/PreLoad.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/PreLoad.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.loader;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class PreLoad
+{
+
+ private static long siesta_ = 2*60*1000;
+ private static Logger logger_ = Logger.getLogger( Loader.class );
+ private StorageService storageService_;
+
+ public PreLoad(StorageService storageService)
+ {
+ storageService_ = storageService;
+ }
+ /*
+ * This method loads all the keys into a special column family
+ * called "RecycleBin". This column family is used for temporary
+ * processing of data and then can be recycled. The idea is that
+ * after the load is complete we have all the keys in the system.
+ * Now we force a compaction and examine the single Index file
+ * that is generated to determine how the nodes need to relocate
+ * to be perfectly load balanced.
+ *
+ * param @ rootDirectory - rootDirectory at which the parsing begins.
+ * param @ table - table that will be populated.
+ * param @ cfName - name of the column that will be populated. This is
+ * passed in so that we do not unncessary allocate temporary String objects.
+ */
+ private void preParse(String rootDirectory, String table, String cfName) throws Throwable
+ {
+ BufferedReader bufReader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(rootDirectory)), 16 * 1024 * 1024);
+ String line = null;
+ while ((line = bufReader.readLine()) != null)
+ {
+ String userId = line;
+ RowMutation rm = new RowMutation(table, userId);
+ rm.add(cfName, userId.getBytes(), 0);
+ rm.apply();
+ }
+ }
+
+ void run(String userFile) throws Throwable
+ {
+ String table = DatabaseDescriptor.getTables().get(0);
+ String cfName = Table.recycleBin_ + ":" + "Keys";
+ /* populate just the keys. */
+ preParse(userFile, table, cfName);
+ /* dump the memtables */
+ Table.open(table).flush(false);
+ /* force a compaction of the files. */
+ Table.open(table).forceCompaction(null, null,null);
+
+ /*
+ * This is a hack to let everyone finish. Just sleep for
+ * a couple of minutes.
+ */
+ logger_.info("Taking a nap after forcing a compaction ...");
+ Thread.sleep(PreLoad.siesta_);
+
+ /* Figure out the keys in the index file to relocate the node */
+ List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
+ /* Load the indexes into memory */
+ for ( String df : ssTables )
+ {
+ SSTable ssTable = new SSTable(df);
+ ssTable.close();
+ }
+ /* We should have only one file since we just compacted. */
+ List<String> indexedKeys = SSTable.getIndexedKeys();
+ storageService_.relocate(indexedKeys.toArray( new String[0]) );
+
+ /*
+ * This is a hack to let everyone relocate and learn about
+ * each other. Just sleep for a couple of minutes.
+ */
+ logger_.info("Taking a nap after relocating ...");
+ Thread.sleep(PreLoad.siesta_);
+
+ /*
+ * Do the cleanup necessary. Delete all commit logs and
+ * the SSTables and reset the load state in the StorageService.
+ */
+ SSTable.delete(ssTables.get(0));
+ storageService_.resetLoadState();
+ logger_.info("Finished all the requisite clean up ...");
+ }
+
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Throwable
+ {
+ if(args.length != 1)
+ {
+ System.out.println("Usage: PreLoad <Fully qualified path of the file containing user names>");
+ }
+ // TODO Auto-generated method stub
+ LogUtil.init();
+ StorageService s = StorageService.instance();
+ s.start();
+ PreLoad preLoad = new PreLoad(s);
+ preLoad.run(args[0]);
+ }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/SuperColumnType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/SuperColumnType.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/SuperColumnType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/SuperColumnType.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,97 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for SuperColumnType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * <complexType name="SuperColumnType">
+ * <complexContent>
+ * <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ * <sequence>
+ * <element name="Fields" type="{}FieldCollection"/>
+ * </sequence>
+ * <attribute name="Tokenize" type="{http://www.w3.org/2001/XMLSchema}boolean" />
+ * </restriction>
+ * </complexContent>
+ * </complexType>
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "SuperColumnType", propOrder = {"fields"})
+public class SuperColumnType {
+
+    /** Content of the required {@code Fields} element. */
+    @XmlElement(name = "Fields", required = true)
+    protected FieldCollection fields;
+
+    /** Value of the {@code Tokenize} attribute; {@code null} when the attribute is absent. */
+    @XmlAttribute(name = "Tokenize")
+    protected Boolean tokenize;
+
+    /**
+     * Returns the fields property.
+     *
+     * @return the current {@link FieldCollection} value; {@code null} until one is bound or set
+     */
+    public FieldCollection getFields() {
+        return this.fields;
+    }
+
+    /**
+     * Replaces the fields property.
+     *
+     * @param value the new {@link FieldCollection} value
+     */
+    public void setFields(FieldCollection value) {
+        fields = value;
+    }
+
+    /**
+     * Returns the tokenize property.
+     *
+     * @return the current {@link Boolean} value; {@code null} until one is bound or set
+     */
+    public Boolean isTokenize() {
+        return this.tokenize;
+    }
+
+    /**
+     * Replaces the tokenize property.
+     *
+     * @param value the new {@link Boolean} value
+     */
+    public void setTokenize(Boolean value) {
+        tokenize = value;
+    }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/TimestampType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/TimestampType.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/TimestampType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/TimestampType.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,65 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for TimestampType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * <complexType name="TimestampType">
+ * <complexContent>
+ * <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ * <attribute name="Field" type="{http://www.w3.org/2001/XMLSchema}int" />
+ * </restriction>
+ * </complexContent>
+ * </complexType>
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "TimestampType")
+public class TimestampType {
+
+    /** Value of the {@code Field} attribute; {@code null} when the attribute is absent. */
+    @XmlAttribute(name = "Field")
+    protected Integer field;
+
+    /**
+     * Returns the field property.
+     *
+     * @return the current {@link Integer} value; {@code null} until one is bound or set
+     */
+    public Integer getField() {
+        return this.field;
+    }
+
+    /**
+     * Replaces the field property.
+     *
+     * @param value the new {@link Integer} value
+     */
+    public void setField(Integer value) {
+        field = value;
+    }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/loader/ValueType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/loader/ValueType.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/loader/ValueType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/loader/ValueType.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,65 @@
+//
+// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, vJAXB 2.1.3 in JDK 1.6
+// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
+// Any modifications to this file will be lost upon recompilation of the source schema.
+// Generated on: 2007.10.19 at 01:36:41 PM PDT
+//
+
+
+package org.apache.cassandra.loader;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlType;
+
+
+/**
+ * <p>Java class for ValueType complex type.
+ *
+ * <p>The following schema fragment specifies the expected content contained within this class.
+ *
+ * <pre>
+ * <complexType name="ValueType">
+ * <complexContent>
+ * <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
+ * <attribute name="Field" type="{http://www.w3.org/2001/XMLSchema}int" />
+ * </restriction>
+ * </complexContent>
+ * </complexType>
+ * </pre>
+ *
+ *
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(name = "ValueType")
+public class ValueType {
+
+    /** Value of the {@code Field} attribute; {@code null} when the attribute is absent. */
+    @XmlAttribute(name = "Field")
+    protected Integer field;
+
+    /**
+     * Returns the field property.
+     *
+     * @return the current {@link Integer} value; {@code null} until one is bound or set
+     */
+    public Integer getField() {
+        return this.field;
+    }
+
+    /**
+     * Replaces the field property.
+     *
+     * @param value the new {@link Integer} value
+     */
+    public void setField(Integer value) {
+        field = value;
+    }
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/locator/AbstractStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/AbstractStrategy.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/AbstractStrategy.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/AbstractStrategy.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,112 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
+
+/**
+ * This class contains helper methods that will be used by
+ * all abstractions that implement the IReplicaPlacementStrategy
+ * interface: rewriting endpoint ports, finding a live stand-in for a
+ * dead endpoint, and building the hint map for a token.
+*/
+public abstract class AbstractStrategy implements IReplicaPlacementStrategy
+{
+    protected static Logger logger_ = Logger.getLogger(AbstractStrategy.class);
+
+    protected TokenMetadata tokenMetadata_;
+
+    AbstractStrategy(TokenMetadata tokenMetadata)
+    {
+        tokenMetadata_ = tokenMetadata;
+    }
+
+    /*
+     * This method changes the ports of the endpoints from
+     * the control port to the storage ports. The endpoints in
+     * the list are modified in place.
+     */
+    protected void retrofitPorts(List<EndPoint> eps)
+    {
+        for ( EndPoint ep : eps )
+        {
+            ep.setPort(DatabaseDescriptor.getStoragePort());
+        }
+    }
+
+    /*
+     * Walks the sorted token ring, starting with the position after the
+     * one of startPoint, and returns the first endpoint that is alive and
+     * is neither in topN nor in liveNodes. Returns null when no such
+     * endpoint exists, including when the ring is empty.
+     *
+     * NOTE(review): when the token of startPoint is not in the ring the
+     * search index is its insertion point, so the entry at that position
+     * is never examined - confirm this is intended.
+     */
+    protected EndPoint getNextAvailableEndPoint(EndPoint startPoint, List<EndPoint> topN, List<EndPoint> liveNodes)
+    {
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        int totalNodes = tokens.size();
+        if ( totalNodes == 0 )
+        {
+            /* An empty ring has no candidates; it would also make the modulo below divide by zero. */
+            return null;
+        }
+        Collections.sort(tokens);
+        BigInteger token = tokenMetadata_.getToken(startPoint);
+        int index = Collections.binarySearch(tokens, token);
+        if(index < 0)
+        {
+            /* Not found: convert the encoded result into the insertion point and wrap around. */
+            index = (index + 1) * (-1);
+            if (index >= totalNodes)
+                index = 0;
+        }
+        EndPoint endPoint = null;
+        int startIndex = (index+1)%totalNodes;
+        /* Visit the other totalNodes - 1 ring positions at most once each. */
+        for (int i = startIndex, count = 1; count < totalNodes ; ++count, i = (i+1)%totalNodes)
+        {
+            EndPoint tmpEndPoint = tokenToEndPointMap.get(tokens.get(i));
+            if(FailureDetector.instance().isAlive(tmpEndPoint) && !topN.contains(tmpEndPoint) && !liveNodes.contains(tmpEndPoint))
+            {
+                endPoint = tmpEndPoint;
+                break;
+            }
+        }
+        return endPoint;
+    }
+
+    /*
+     * This method returns the hint map. The key is the endpoint
+     * on which the data is being placed and the value is the
+     * endpoint which is in the top N. A live top N endpoint maps to
+     * itself; a dead one is replaced by the next available endpoint.
+     * When no replacement can be found a warning is logged and the
+     * dead endpoint gets no entry.
+     */
+    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token)
+    {
+        List<EndPoint> liveList = new ArrayList<EndPoint>();
+        Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
+        EndPoint[] topN = getStorageEndPoints( token );
+        /* One list view of the array is enough for all iterations. */
+        List<EndPoint> topNList = Arrays.asList(topN);
+
+        for( int i = 0 ; i < topN.length ; i++)
+        {
+            if( FailureDetector.instance().isAlive(topN[i]))
+            {
+                map.put(topN[i], topN[i]);
+                liveList.add(topN[i]) ;
+            }
+            else
+            {
+                EndPoint endPoint = getNextAvailableEndPoint(topN[i], topNList, liveList);
+                if(endPoint != null)
+                {
+                    map.put(endPoint, topN[i]);
+                    liveList.add(endPoint) ;
+                }
+                else
+                {
+                    // log a warning , maybe throw an exception
+                    logger_.warn("Unable to find a live Endpoint we might be out of live nodes , This is dangerous !!!!");
+                }
+            }
+        }
+        return map;
+    }
+
+    public abstract EndPoint[] getStorageEndPoints(BigInteger token);
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/locator/EndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/EndPointSnitch.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/EndPointSnitch.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/EndPointSnitch.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.*;
+
+import org.apache.cassandra.net.EndPoint;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+ public class EndPointSnitch implements IEndPointSnitch
+ {
+     /* Zero-based index of the address byte compared for rack membership (the 3rd octet). */
+     private static final int RACK_OCTET = 2;
+
+     /* Zero-based index of the address byte compared for data center membership (the 2nd octet). */
+     private static final int DATACENTER_OCTET = 1;
+
+     /**
+      * Decides rack membership from the IP addresses of the two hosts:
+      * if the 3rd octet is the same the hosts are considered to be on the
+      * same rack, otherwise on different racks.
+      *
+      * @param host a specified endpoint
+      * @param host2 another specified endpoint
+      * @return true if both addresses have the same 3rd octet
+      * @throws UnknownHostException if the address lookup for either host fails
+      */
+     public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException
+     {
+         return octetsMatch(host, host2, RACK_OCTET);
+     }
+
+     /**
+      * Decides data center membership from the IP addresses of the two hosts:
+      * if the 2nd octet is the same the hosts are considered to be in the
+      * same data center, otherwise in different data centers.
+      *
+      * @param host a specified endpoint
+      * @param host2 another specified endpoint
+      * @return true if both addresses have the same 2nd octet
+      * @throws UnknownHostException if the address lookup for either host fails
+      */
+     public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException
+     {
+         return octetsMatch(host, host2, DATACENTER_OCTET);
+     }
+
+     /*
+      * Looks up the raw address bytes of both endpoints (first, then second)
+      * and compares the byte found at the given position.
+      */
+     private static boolean octetsMatch(EndPoint first, EndPoint second, int position) throws UnknownHostException
+     {
+         byte[] firstAddress = InetAddress.getByName(first.getHost()).getAddress();
+         byte[] secondAddress = InetAddress.getByName(second.getHost()).getAddress();
+
+         return firstAddress[position] == secondAddress[position];
+     }
+ }
Added: incubator/cassandra/src/org/apache/cassandra/locator/IEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/IEndPointSnitch.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/IEndPointSnitch.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/IEndPointSnitch.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.UnknownHostException;
+
+import org.apache.cassandra.net.EndPoint;
+
+
+/**
+ * This interface helps determine location of node in the data center relative to another node.
+ * Give a node A and another node B it can tell if A and B are on the same rack or in the same
+ * data center.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IEndPointSnitch
+{
+ /**
+ * Helps determine if 2 nodes are in the same rack in the data center.
+ * @param host a specified endpoint
+ * @param host2 another specified endpoint
+ * @return true if on the same rack false otherwise
+ * @throws UnknownHostException
+ */
+ public boolean isOnSameRack(EndPoint host, EndPoint host2) throws UnknownHostException;
+
+ /**
+ * Helps determine if 2 nodes are in the same data center.
+ * @param host a specified endpoint
+ * @param host2 another specified endpoint
+ * @return true if in the same data center false otherwise
+ * @throws UnknownHostException
+ */
+ public boolean isInSameDataCenter(EndPoint host, EndPoint host2) throws UnknownHostException;
+}
Added: incubator/cassandra/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import org.apache.cassandra.net.EndPoint;
+
+
+/*
+ * This interface has two implementations. One which
+ * does not respect rack or datacenter awareness and
+ * the other which does.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IReplicaPlacementStrategy
+{
+ public EndPoint[] getStorageEndPoints(BigInteger token);
+ public Map<String, EndPoint[]> getStorageEndPoints(String[] keys);
+ public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap);
+ public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token);
+}