You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [27/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraException.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,119 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class CassandraException extends Exception implements TBase, java.io.Serializable {
+ public String error;
+
+ public final Isset __isset = new Isset();
+ public static final class Isset implements java.io.Serializable {
+ public boolean error = false;
+ }
+
+ public CassandraException() {
+ }
+
+ public CassandraException(
+ String error)
+ {
+ this();
+ this.error = error;
+ this.__isset.error = true;
+ }
+
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof CassandraException)
+ return this.equals((CassandraException)that);
+ return false;
+ }
+
+ public boolean equals(CassandraException that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_error = true && (this.error != null);
+ boolean that_present_error = true && (that.error != null);
+ if (this_present_error || that_present_error) {
+ if (!(this_present_error && that_present_error))
+ return false;
+ if (!this.error.equals(that.error))
+ return false;
+ }
+
+ return true;
+ }
+
+ public int hashCode() {
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id)
+ {
+ case 1:
+ if (field.type == TType.STRING) {
+ this.error = iprot.readString();
+ this.__isset.error = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ TStruct struct = new TStruct("CassandraException");
+ oprot.writeStructBegin(struct);
+ TField field = new TField();
+ if (this.error != null) {
+ field.name = "error";
+ field.type = TType.STRING;
+ field.id = 1;
+ oprot.writeFieldBegin(field);
+ oprot.writeString(this.error);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("CassandraException(");
+ sb.append("error:");
+ sb.append(this.error);
+ sb.append(")");
+ return sb.toString();
+ }
+
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,1072 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import com.facebook.thrift.*;
+import com.facebook.thrift.server.*;
+import com.facebook.thrift.server.TThreadPoolServer.Options;
+import com.facebook.thrift.transport.*;
+import com.facebook.thrift.protocol.*;
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.fb_status;
+import java.io.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql.common.CqlResult;
+import org.apache.cassandra.cql.driver.CqlDriver;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CassandraServer extends FacebookBase implements
+ Cassandra.Iface
+{
+
+ private static Logger logger_ = Logger.getLogger(CassandraServer.class);
+ /*
+ * Handle to the storage service to interact with the other machines in the
+ * cluster.
+ */
+ protected StorageService storageService;
+
+ /**
+  * Passes the given name to the FacebookBase constructor and picks up the
+  * storage service handle from StorageService.instance().
+  */
+ protected CassandraServer(String name)
+ {
+     super(name);
+     this.storageService = StorageService.instance();
+ }
+
+ /**
+  * Default constructor: uses the fixed name "CassandraServer" and the same
+  * storage service set-up as the named constructor.
+  */
+ public CassandraServer() throws Throwable
+ {
+     this("CassandraServer");
+ }
+
+ /*
+  * Initializes logging through LogUtil and then starts the storage service.
+  * Anything thrown by either step is propagated to the caller.
+  */
+ public void start() throws Throwable
+ {
+     // Logging is set up first, before the storage service is started.
+     LogUtil.init();
+     this.storageService.start();
+ }
+
+ /**
+  * Rejects table names that are not listed by DatabaseDescriptor.getTables().
+  *
+  * @param table name of the table to check
+  * @throws CassandraException if the table is not part of the schema
+  */
+ private void validateTable(String table) throws CassandraException
+ {
+     boolean known = DatabaseDescriptor.getTables().contains(table);
+     if (known)
+     {
+         return;
+     }
+     throw new CassandraException("Table " + table + " does not exist in this schema.");
+ }
+
+ /**
+  * Reads the row for a key at WEAK consistency, restricted to the given column
+  * names, and returns the column family named by the first component of the
+  * column family spec.
+  *
+  * Never returns null: every failure, including the CassandraExceptions
+  * raised inside the try block, is caught, logged, and rethrown as a
+  * CassandraException whose text is the LogUtil.throwableToString output.
+  *
+  * @param tablename    table to read from; must exist in the schema
+  * @param key          row key
+  * @param columnFamily column family spec, split by RowMutation.getColumnAndColumnFamily
+  * @param columNames   column names handed to StorageProxy.readProtocol
+  * @return the matching column family
+  * @throws CassandraException on any validation or read failure
+  */
+ protected ColumnFamily get_cf(String tablename, String key, String columnFamily, List<String> columNames) throws CassandraException, TException
+ {
+     try
+     {
+         validateTable(tablename);
+
+         String[] parts = RowMutation.getColumnAndColumnFamily(columnFamily);
+         if (parts.length == 0)
+         {
+             throw new CassandraException("Column Family " + columnFamily + " is invalid.");
+         }
+
+         Row fetched = StorageProxy.readProtocol(tablename, key, columnFamily, columNames, StorageService.ConsistencyLevel.WEAK);
+         if (fetched == null)
+         {
+             throw new CassandraException("No row exists for key " + key);
+         }
+
+         Map<String, ColumnFamily> families = fetched.getColumnFamilies();
+         if (families == null || families.isEmpty())
+         {
+             logger_.info("ERROR ColumnFamily " + columnFamily + " map is missing.....: " + " key:" + key );
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+         }
+
+         ColumnFamily found = families.get(parts[0]);
+         if (found == null)
+         {
+             logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: " + " key:" + key + " ColumnFamily:" + parts[0]);
+             throw new CassandraException("Either the key " + key + " is not present or the column family " + parts[0] + " is not present.");
+         }
+         return found;
+     }
+     catch (Throwable failure)
+     {
+         // Everything, including the exceptions thrown above, is reported as a stack-trace string.
+         String trace = LogUtil.throwableToString(failure);
+         logger_.info( trace );
+         throw new CassandraException(trace);
+     }
+ }
+
+ /**
+  * Returns the columns StorageProxy.readProtocol yields for the given
+  * timestamp, converted to Thrift column_t objects. The read is done at
+  * WEAK consistency.
+  *
+  * When the spec has more than one component, the second names a super
+  * column and its sub columns are returned; otherwise all columns of the
+  * column family are returned.
+  *
+  * @param tablename           table to read from; must exist in the schema
+  * @param key                 row key
+  * @param columnFamily_column column family spec, split by RowMutation.getColumnAndColumnFamily
+  * @param timeStamp           timestamp handed to StorageProxy.readProtocol
+  * @return a non-empty list of columns
+  * @throws CassandraException if validation fails, nothing is found, or the read fails;
+  *         the text is the LogUtil.throwableToString output of the underlying failure
+  */
+ public ArrayList<column_t> get_columns_since(String tablename, String key, String columnFamily_column, long timeStamp) throws CassandraException,TException
+ {
+     ArrayList<column_t> retlist = new ArrayList<column_t>();
+     long startTime = System.currentTimeMillis();
+     try
+     {
+         validateTable(tablename);
+         String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+         // check for values
+         if( values.length < 1 )
+         {
+             throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
+         }
+         Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, timeStamp, StorageService.ConsistencyLevel.WEAK);
+         if (row == null)
+         {
+             logger_.info("ERROR No row for this key .....: " + key);
+             throw new CassandraException("ERROR No row for this key .....: " + key);
+         }
+
+         Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+         if (cfMap == null || cfMap.size() == 0)
+         {
+             logger_ .info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + " key:" + key);
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+         }
+         ColumnFamily cfamily = cfMap.get(values[0]);
+         if (cfamily == null)
+         {
+             logger_.info("ERROR ColumnFamily " + columnFamily_column + " is missing.....: "+" key:" + key + " ColumnFamily:" + values[0]);
+             // Spaces added around the spec so the message is readable.
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+         }
+         Collection<IColumn> columns = null;
+         if( values.length > 1 )
+         {
+             // this is the super column case
+             IColumn column = cfamily.getColumn(values[1]);
+             if(column != null)
+                 columns = column.getSubColumns();
+         }
+         else
+         {
+             columns = cfamily.getAllColumns();
+         }
+         if (columns == null || columns.size() == 0)
+         {
+             logger_ .info("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
+             throw new CassandraException("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
+         }
+         for(IColumn column : columns)
+         {
+             column_t thrift_column = new column_t();
+             thrift_column.columnName = column.name();
+             // Decoded with the platform charset, matching getBytes() on the write path.
+             thrift_column.value = new String(column.value()); // This needs to be Utf8ed
+             thrift_column.timestamp = column.timestamp();
+             retlist.add(thrift_column);
+         }
+     }
+     catch (Exception ex)
+     {
+         String exception = LogUtil.throwableToString(ex);
+         logger_.info( exception );
+         throw new CassandraException(exception);
+     }
+     // Label now names this method instead of the copy-pasted "get_slice2".
+     logger_.debug("get_columns_since: " + (System.currentTimeMillis() - startTime) + " ms.");
+     return retlist;
+ }
+
+
+ /**
+  * Returns the named columns of a column family for one row, converted to
+  * Thrift column_t objects. The lookup itself is delegated to get_cf.
+  *
+  * @param tablename    table to read from; must exist in the schema
+  * @param key          row key
+  * @param columnFamily column family spec passed to get_cf
+  * @param columnNames  names of the columns to fetch
+  * @return a non-empty list of columns
+  * @throws CassandraException if validation fails, nothing is found, or the read fails;
+  *         the text is the LogUtil.throwableToString output of the underlying failure
+  */
+ public List<column_t> get_slice_by_names(String tablename, String key, String columnFamily, List<String> columnNames) throws CassandraException, TException
+ {
+     ArrayList<column_t> retlist = new ArrayList<column_t>();
+     long startTime = System.currentTimeMillis();
+     try
+     {
+         validateTable(tablename);
+         ColumnFamily cfamily = get_cf(tablename, key, columnFamily, columnNames);
+         if (cfamily == null)
+         {
+             logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "
+                     +" key:" + key
+                     + " ColumnFamily:" + columnFamily);
+             // Spaces added around the column family name so the message is readable.
+             throw new CassandraException("Either the key " + key + " is not present or the columnFamily requested " + columnFamily + " is not present.");
+         }
+         Collection<IColumn> columns = cfamily.getAllColumns();
+         if (columns == null || columns.size() == 0)
+         {
+             logger_ .info("ERROR Columns are missing.....: "
+                     + " key:" + key
+                     + " ColumnFamily:" + columnFamily);
+             throw new CassandraException("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + columnFamily);
+         }
+
+         for(IColumn column : columns)
+         {
+             column_t thrift_column = new column_t();
+             thrift_column.columnName = column.name();
+             // Decoded with the platform charset, matching getBytes() on the write path.
+             thrift_column.value = new String(column.value()); // This needs to be Utf8ed
+             thrift_column.timestamp = column.timestamp();
+             retlist.add(thrift_column);
+         }
+     }
+     catch (Exception ex)
+     {
+         String exception = LogUtil.throwableToString(ex);
+         logger_.info( exception );
+         throw new CassandraException(exception);
+     }
+
+     // Label now names this method instead of the copy-pasted "get_slice2".
+     logger_.debug("get_slice_by_names: " + (System.currentTimeMillis() - startTime)
+             + " ms.");
+     return retlist;
+ }
+
+ /**
+  * Returns a slice of columns for one row, converted to Thrift column_t
+  * objects. The start and count arguments are passed through to
+  * StorageProxy.readProtocol; the read is done at WEAK consistency.
+  *
+  * When the spec has more than one component, the second names a super
+  * column and its sub columns are returned; otherwise all columns of the
+  * column family are returned.
+  *
+  * @param tablename           table to read from; must exist in the schema
+  * @param key                 row key
+  * @param columnFamily_column column family spec, split by RowMutation.getColumnAndColumnFamily
+  * @param start               start position handed to StorageProxy.readProtocol
+  * @param count               count handed to StorageProxy.readProtocol
+  * @return a non-empty list of columns
+  * @throws CassandraException if validation fails, nothing is found, or the read fails;
+  *         the text is the LogUtil.throwableToString output of the underlying failure
+  */
+ public ArrayList<column_t> get_slice(String tablename, String key, String columnFamily_column, int start, int count) throws CassandraException,TException
+ {
+     ArrayList<column_t> retlist = new ArrayList<column_t>();
+     long startTime = System.currentTimeMillis();
+     try
+     {
+         validateTable(tablename);
+         String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+         // check for values
+         if( values.length < 1 )
+         {
+             throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
+         }
+         Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, start, count, StorageService.ConsistencyLevel.WEAK);
+         if (row == null)
+         {
+             logger_.info("ERROR No row for this key .....: " + key);
+             throw new CassandraException("ERROR No row for this key .....: " + key);
+         }
+
+         Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+         if (cfMap == null || cfMap.size() == 0)
+         {
+             logger_ .info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + " key:" + key);
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+         }
+         ColumnFamily cfamily = cfMap.get(values[0]);
+         if (cfamily == null)
+         {
+             logger_.info("ERROR ColumnFamily " + columnFamily_column + " is missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
+             // Spaces added around the spec so the message is readable.
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+         }
+         Collection<IColumn> columns = null;
+         if( values.length > 1 )
+         {
+             // this is the super column case
+             IColumn column = cfamily.getColumn(values[1]);
+             if(column != null)
+                 columns = column.getSubColumns();
+         }
+         else
+         {
+             columns = cfamily.getAllColumns();
+         }
+         if (columns == null || columns.size() == 0)
+         {
+             logger_ .info("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
+             throw new CassandraException("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
+         }
+         for(IColumn column : columns)
+         {
+             column_t thrift_column = new column_t();
+             thrift_column.columnName = column.name();
+             // Decoded with the platform charset, matching getBytes() on the write path.
+             thrift_column.value = new String(column.value()); // This needs to be Utf8ed
+             thrift_column.timestamp = column.timestamp();
+             retlist.add(thrift_column);
+         }
+     }
+     catch (Exception ex)
+     {
+         String exception = LogUtil.throwableToString(ex);
+         logger_.info( exception );
+         throw new CassandraException(exception);
+     }
+     // Label now matches the method name (was "get_slice2").
+     logger_.debug("get_slice: " + (System.currentTimeMillis() - startTime)
+             + " ms.");
+     return retlist;
+ }
+
+ /**
+  * Returns a single column of a row as a Thrift column_t. The read is done
+  * at WEAK consistency with start -1 and count Integer.MAX_VALUE.
+  *
+  * The spec must split into at least two components. With more than two,
+  * the second names a super column whose sub columns are examined;
+  * otherwise all columns of the column family are examined.
+  *
+  * @param tablename           table to read from; must exist in the schema
+  * @param key                 row key
+  * @param columnFamily_column column spec, split by RowMutation.getColumnAndColumnFamily
+  * @return the column that was found
+  * @throws CassandraException if validation fails, nothing is found, or the read fails;
+  *         the text is the LogUtil.throwableToString output of the underlying failure
+  */
+ public column_t get_column(String tablename, String key, String columnFamily_column) throws CassandraException,TException
+ {
+     column_t ret = null;
+     try
+     {
+         validateTable(tablename);
+         String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+         // check for values
+         if( values.length < 2 )
+         {
+             throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
+         }
+         Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
+         if (row == null)
+         {
+             logger_.info("ERROR No row for this key .....: " + key);
+             throw new CassandraException("ERROR No row for this key .....: " + key);
+         }
+
+         Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+         if (cfMap == null || cfMap.size() == 0)
+         {
+             logger_ .info("ERROR ColumnFamily map is missing.....: "
+                     + " key:" + key
+                     );
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+         }
+         ColumnFamily cfamily = cfMap.get(values[0]);
+         if (cfamily == null)
+         {
+             logger_.info("ERROR ColumnFamily is missing.....: "
+                     +" key:" + key
+                     + " ColumnFamily:" + values[0]);
+             // Spaces added around the spec so the message is readable.
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+         }
+         Collection<IColumn> columns = null;
+         if( values.length > 2 )
+         {
+             // this is the super column case
+             IColumn column = cfamily.getColumn(values[1]);
+             if(column != null)
+                 columns = column.getSubColumns();
+         }
+         else
+         {
+             columns = cfamily.getAllColumns();
+         }
+         if (columns == null || columns.size() == 0)
+         {
+             logger_ .info("ERROR Columns are missing.....: "
+                     + " key:" + key
+                     + " ColumnFamily:" + values[0]);
+             throw new CassandraException("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
+         }
+         ret = new column_t();
+         // Each iteration overwrites ret, so the last column in iteration order wins.
+         // NOTE(review): presumably the read returns only the requested column - confirm.
+         for(IColumn column : columns)
+         {
+             ret.columnName = column.name();
+             ret.value = new String(column.value());
+             ret.timestamp = column.timestamp();
+         }
+     }
+     catch (Exception ex)
+     {
+         String exception = LogUtil.throwableToString(ex);
+         logger_.info( exception );
+         throw new CassandraException(exception);
+     }
+     return ret;
+ }
+
+
+ /**
+  * Counts the columns of a column family, or the sub columns of a super
+  * column when the spec has more than one component. The read is done at
+  * WEAK consistency with start -1 and count Integer.MAX_VALUE.
+  *
+  * @param tablename           table to read from; must exist in the schema
+  * @param key                 row key
+  * @param columnFamily_column column family spec, split by RowMutation.getColumnAndColumnFamily
+  * @return the number of columns found; always at least 1, because an empty
+  *         result is reported as an exception
+  * @throws CassandraException if validation fails, nothing is found, or the read fails;
+  *         the text is the LogUtil.throwableToString output of the underlying failure
+  */
+ public int get_column_count(String tablename, String key, String columnFamily_column) throws CassandraException
+ {
+     try
+     {
+         validateTable(tablename);
+         String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+         // check for values
+         if( values.length < 1 )
+         {
+             throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
+         }
+         Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
+         if (row == null)
+         {
+             logger_.info("ERROR No row for this key .....: " + key);
+             throw new CassandraException("ERROR No row for this key .....: " + key);
+         }
+
+         Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+         if (cfMap == null || cfMap.size() == 0)
+         {
+             logger_ .info("ERROR ColumnFamily map is missing.....: "
+                     + " key:" + key
+                     );
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+         }
+         ColumnFamily cfamily = cfMap.get(values[0]);
+         if (cfamily == null)
+         {
+             logger_.info("ERROR ColumnFamily is missing.....: "
+                     +" key:" + key
+                     + " ColumnFamily:" + values[0]);
+             // Spaces added around the spec so the message is readable.
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+         }
+         Collection<IColumn> columns = null;
+         if( values.length > 1 )
+         {
+             // this is the super column case
+             IColumn column = cfamily.getColumn(values[1]);
+             if(column != null)
+                 columns = column.getSubColumns();
+         }
+         else
+         {
+             columns = cfamily.getAllColumns();
+         }
+         if (columns == null || columns.size() == 0)
+         {
+             logger_ .info("ERROR Columns are missing.....: "
+                     + " key:" + key
+                     + " ColumnFamily:" + values[0]);
+             throw new CassandraException("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
+         }
+         // The old -1 placeholder could never be returned: every failure path throws.
+         return columns.size();
+     }
+     catch (Exception ex)
+     {
+         String exception = LogUtil.throwableToString(ex);
+         logger_.info( exception );
+         throw new CassandraException(exception);
+     }
+ }
+
+ /**
+  * Writes one cell by building a RowMutation for the trimmed key and handing
+  * it to StorageProxy.insert.
+  *
+  * The signature declares no exceptions, so failures (including an unknown
+  * table) cannot reach the caller. They are logged at info level, as the
+  * batch mutation methods do, rather than at debug where a dropped write
+  * would normally go unnoticed.
+  *
+  * @param tablename           table to write to; must exist in the schema
+  * @param key                 row key; surrounding whitespace is trimmed
+  * @param columnFamily_column column spec passed to RowMutation.add
+  * @param cellData            cell value; encoded with the platform default charset
+  * @param timestamp           timestamp stored with the cell
+  */
+ public void insert(String tablename, String key, String columnFamily_column, String cellData, long timestamp)
+ {
+     try
+     {
+         validateTable(tablename);
+         RowMutation rm = new RowMutation(tablename, key.trim());
+         rm.add(columnFamily_column, cellData.getBytes(), timestamp);
+         StorageProxy.insert(rm);
+     }
+     catch (Exception e)
+     {
+         // The write is dropped here; keep the failure visible in the log.
+         logger_.info( LogUtil.throwableToString(e) );
+     }
+ }
+
+ /**
+  * Applies a batch of column writes to one row and waits for the replicas
+  * to answer.
+  *
+  * Steps: validate the table, look up the storage endpoints for the key,
+  * build a RowMutation from every entry of cfmap, send it to those
+  * endpoints with sendRR, then return what the QuorumResponseHandler
+  * reports.
+  *
+  * The endpoint lookup uses the same trimmed key as the mutation. It
+  * previously used the untrimmed key, so a key with surrounding whitespace
+  * could be routed by a different key than the one stored in the mutation.
+  *
+  * @param batchMutation table, key and column-family map to apply
+  * @return the value returned by the quorum handler; false when any
+  *         exception occurred (the exception is logged, not rethrown)
+  */
+ public boolean batch_insert_blocking(batch_mutation_t batchMutation)
+ {
+     boolean result = false;
+     try
+     {
+         // Routine entry trace: debug, like batch_insert, not warn.
+         logger_.debug(" batch_insert_blocking");
+         validateTable(batchMutation.table);
+         String rowKey = batchMutation.key.trim();
+         IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
+         QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
+                 DatabaseDescriptor.getReplicationFactor(),
+                 writeResponseResolver);
+         EndPoint[] endpoints = storageService.getNStorageEndPoint(rowKey);
+         // TODO: throw a thrift exception if we do not have N nodes
+
+         logger_.debug(" Creating the row mutation");
+         RowMutation rm = new RowMutation(batchMutation.table, rowKey);
+         for (Object cfName : batchMutation.cfmap.keySet())
+         {
+             List<column_t> list = batchMutation.cfmap.get(cfName);
+             for (column_t columnData : list)
+             {
+                 rm.add(cfName.toString() + ":" + columnData.columnName,
+                         columnData.value.getBytes(), columnData.timestamp);
+             }
+         }
+
+         RowMutationMessage rmMsg = new RowMutationMessage(rm);
+         Message message = new Message(StorageService.getLocalStorageEndPoint(),
+                 StorageService.mutationStage_,
+                 StorageService.mutationVerbHandler_,
+                 new Object[]{ rmMsg }
+                 );
+         MessagingService.getMessagingInstance().sendRR(message, endpoints,
+                 quorumResponseHandler);
+         logger_.debug(" Calling quorum response handler's get");
+         result = quorumResponseHandler.get();
+
+         // TODO: if the result is false that means the writes to all the
+         // servers failed hence we need to throw an exception or return an
+         // error back to the client so that it can take appropriate action.
+     }
+     catch (Exception e)
+     {
+         logger_.info( LogUtil.throwableToString(e) );
+     }
+     return result;
+ }
+ /**
+  * Applies a batch of column writes and deletes to one row without
+  * reporting a result.
+  *
+  * Every column listed in cfmap is added to a RowMutation and every
+  * column listed in cfmapdel is deleted from it; either map may be null.
+  * The mutation is then handed to StorageProxy.insert. Exceptions are
+  * logged at info level and not rethrown.
+  *
+  * @param batchMutation table, key and the add/delete maps to apply
+  */
+ public void batch_insert(batch_mutation_t batchMutation)
+ {
+     try
+     {
+         logger_.debug(" batch_insert");
+         logger_.debug(" Creating the row mutation");
+         validateTable(batchMutation.table);
+         RowMutation mutation = new RowMutation(batchMutation.table,
+                 batchMutation.key.trim());
+
+         // Additions: one add per column, addressed as "<map key>:<column name>".
+         if (batchMutation.cfmap != null)
+         {
+             for (Object cfName : batchMutation.cfmap.keySet())
+             {
+                 List<column_t> additions = batchMutation.cfmap.get(cfName);
+                 for (column_t added : additions)
+                 {
+                     String path = cfName.toString() + ":" + added.columnName;
+                     mutation.add(path, added.value.getBytes(), added.timestamp);
+                 }
+             }
+         }
+
+         // Deletions: same addressing, no value or timestamp involved.
+         if (batchMutation.cfmapdel != null)
+         {
+             for (Object cfName : batchMutation.cfmapdel.keySet())
+             {
+                 List<column_t> removals = batchMutation.cfmapdel.get(cfName);
+                 for (column_t removed : removals)
+                 {
+                     mutation.delete(cfName.toString() + ":" + removed.columnName);
+                 }
+             }
+         }
+
+         StorageProxy.insert(mutation);
+     }
+     catch (Exception e)
+     {
+         logger_.info( LogUtil.throwableToString(e) );
+     }
+ }
+
+ /**
+  * Deletes the given column spec from a row by building a RowMutation for
+  * the trimmed key and handing it to StorageProxy.insert.
+  *
+  * The signature declares no exceptions, so failures (including an unknown
+  * table) cannot reach the caller. They are logged at info level, as the
+  * batch mutation methods do, rather than at debug where a dropped delete
+  * would normally go unnoticed.
+  *
+  * @param tablename           table to modify; must exist in the schema
+  * @param key                 row key; surrounding whitespace is trimmed
+  * @param columnFamily_column column spec passed to RowMutation.delete
+  */
+ public void remove(String tablename, String key, String columnFamily_column)
+ {
+     try
+     {
+         validateTable(tablename);
+         RowMutation rm = new RowMutation(tablename, key.trim());
+         rm.delete(columnFamily_column);
+         StorageProxy.insert(rm);
+     }
+     catch (Exception e)
+     {
+         // The delete is dropped here; keep the failure visible in the log.
+         logger_.info( LogUtil.throwableToString(e) );
+     }
+ }
+
+ /**
+  * Returns the named super columns of a column family for one row,
+  * converted to Thrift superColumn_t objects. The lookup itself is
+  * delegated to get_cf.
+  *
+  * A super column without sub columns is returned with its columns field
+  * left unset (null).
+  *
+  * @param tablename        table to read from; must exist in the schema
+  * @param key              row key
+  * @param columnFamily     column family spec passed to get_cf
+  * @param superColumnNames names of the super columns to fetch
+  * @return a non-empty list of super columns
+  * @throws CassandraException if validation fails, nothing is found, or the read fails;
+  *         the text is the LogUtil.throwableToString output of the underlying failure
+  */
+ public List<superColumn_t> get_slice_super_by_names(String tablename, String key, String columnFamily, List<String> superColumnNames) throws CassandraException, TException
+ {
+     ArrayList<superColumn_t> retlist = new ArrayList<superColumn_t>();
+     long startTime = System.currentTimeMillis();
+
+     try
+     {
+         validateTable(tablename);
+         ColumnFamily cfamily = get_cf(tablename, key, columnFamily, superColumnNames);
+         if (cfamily == null)
+         {
+             logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "+" key:" + key
+                     + " ColumnFamily:" + columnFamily);
+             // Spaces added around the column family name so the message is readable.
+             throw new CassandraException("Either the key " + key + " is not present or the column family requested " + columnFamily + " is not present.");
+         }
+         Collection<IColumn> columns = cfamily.getAllColumns();
+         if (columns == null || columns.size() == 0)
+         {
+             logger_ .info("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + columnFamily);
+             throw new CassandraException("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + columnFamily);
+         }
+
+         for(IColumn column : columns)
+         {
+             superColumn_t thrift_superColumn = new superColumn_t();
+             thrift_superColumn.name = column.name();
+             Collection<IColumn> subColumns = column.getSubColumns();
+             if(subColumns.size() != 0 )
+             {
+                 thrift_superColumn.columns = new ArrayList<column_t>();
+                 for( IColumn subColumn : subColumns )
+                 {
+                     column_t thrift_column = new column_t();
+                     thrift_column.columnName = subColumn.name();
+                     thrift_column.value = new String(subColumn.value());
+                     thrift_column.timestamp = subColumn.timestamp();
+                     thrift_superColumn.columns.add(thrift_column);
+                 }
+             }
+             retlist.add(thrift_superColumn);
+         }
+     }
+     catch (Exception ex)
+     {
+         String exception = LogUtil.throwableToString(ex);
+         logger_.info( exception );
+         throw new CassandraException(exception);
+     }
+     // Label now names this method instead of the copy-pasted "get_slice2".
+     logger_.debug("get_slice_super_by_names: " + (System.currentTimeMillis() - startTime)
+             + " ms.");
+     return retlist;
+ }
+
+
+ /**
+  * Returns a slice of super columns for one row, converted to Thrift
+  * superColumn_t objects. The start and count arguments are passed through
+  * to StorageProxy.readProtocol; the read is done at WEAK consistency.
+  *
+  * A super column without sub columns is returned with its columns field
+  * left unset (null).
+  *
+  * @param tablename                    table to read from; must exist in the schema
+  * @param key                          row key
+  * @param columnFamily_superColumnName column family spec, split by RowMutation.getColumnAndColumnFamily
+  * @param start                        start position handed to StorageProxy.readProtocol
+  * @param count                        count handed to StorageProxy.readProtocol
+  * @return a non-empty list of super columns
+  * @throws CassandraException if validation fails, nothing is found, or the read fails;
+  *         the text is the LogUtil.throwableToString output of the underlying failure
+  */
+ public ArrayList<superColumn_t> get_slice_super(String tablename, String key, String columnFamily_superColumnName, int start, int count) throws CassandraException
+ {
+     ArrayList<superColumn_t> retlist = new ArrayList<superColumn_t>();
+     try
+     {
+         validateTable(tablename);
+         String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_superColumnName);
+         // check for values
+         if( values.length < 1 )
+         {
+             throw new CassandraException("Column Family " + columnFamily_superColumnName + " is invalid.");
+         }
+         Row row = StorageProxy.readProtocol(tablename, key, columnFamily_superColumnName, start, count, StorageService.ConsistencyLevel.WEAK);
+         if (row == null)
+         {
+             logger_.info("ERROR No row for this key .....: " + key);
+             throw new CassandraException("ERROR No row for this key .....: " + key);
+         }
+
+         Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+         if (cfMap == null || cfMap.size() == 0)
+         {
+             logger_ .info("ERROR ColumnFamily map is missing.....: "
+                     + " key:" + key
+                     );
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+         }
+         ColumnFamily cfamily = cfMap.get(values[0]);
+         if (cfamily == null)
+         {
+             logger_.info("ERROR ColumnFamily is missing.....: "
+                     +" key:" + key
+                     + " ColumnFamily:" + values[0]);
+             // Spaces added around the spec so the message is readable.
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_superColumnName + " are not present.");
+         }
+         Collection<IColumn> columns = cfamily.getAllColumns();
+         if (columns == null || columns.size() == 0)
+         {
+             logger_ .info("ERROR Columns are missing.....: "
+                     + " key:" + key
+                     + " ColumnFamily:" + values[0]);
+             throw new CassandraException("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
+         }
+
+         for(IColumn column : columns)
+         {
+             superColumn_t thrift_superColumn = new superColumn_t();
+             thrift_superColumn.name = column.name();
+             Collection<IColumn> subColumns = column.getSubColumns();
+             if(subColumns.size() != 0 )
+             {
+                 thrift_superColumn.columns = new ArrayList<column_t>();
+                 for( IColumn subColumn : subColumns )
+                 {
+                     column_t thrift_column = new column_t();
+                     thrift_column.columnName = subColumn.name();
+                     thrift_column.value = new String(subColumn.value());
+                     thrift_column.timestamp = subColumn.timestamp();
+                     thrift_superColumn.columns.add(thrift_column);
+                 }
+             }
+             retlist.add(thrift_superColumn);
+         }
+     }
+     catch (Exception ex)
+     {
+         String exception = LogUtil.throwableToString(ex);
+         logger_.info( exception );
+         throw new CassandraException(exception);
+     }
+     return retlist;
+ }
+
+ /**
+  * Returns a single super column of a row as a Thrift superColumn_t. The
+  * read is done at WEAK consistency with start -1 and count
+  * Integer.MAX_VALUE.
+  *
+  * The spec must split into at least two components. A super column
+  * without sub columns is returned with its columns field left unset (null).
+  *
+  * @param tablename           table to read from; must exist in the schema
+  * @param key                 row key
+  * @param columnFamily_column super column spec, split by RowMutation.getColumnAndColumnFamily
+  * @return the super column that was found
+  * @throws CassandraException if validation fails, nothing is found, or the read fails;
+  *         the text is the LogUtil.throwableToString output of the underlying failure
+  */
+ public superColumn_t get_superColumn(String tablename, String key, String columnFamily_column) throws CassandraException
+ {
+     superColumn_t ret = null;
+     try
+     {
+         validateTable(tablename);
+         String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+         // check for values
+         if( values.length < 2 )
+         {
+             throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");
+         }
+
+         Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
+         if (row == null)
+         {
+             logger_.info("ERROR No row for this key .....: " + key);
+             throw new CassandraException("ERROR No row for this key .....: " + key);
+         }
+
+         Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+         if (cfMap == null || cfMap.size() == 0)
+         {
+             logger_ .info("ERROR ColumnFamily map is missing.....: "
+                     + " key:" + key
+                     );
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
+         }
+         ColumnFamily cfamily = cfMap.get(values[0]);
+         if (cfamily == null)
+         {
+             logger_.info("ERROR ColumnFamily is missing.....: "
+                     +" key:" + key
+                     + " ColumnFamily:" + values[0]);
+             // Spaces added around the spec so the message is readable.
+             throw new CassandraException("Either the key " + key + " is not present or the columns requested " + columnFamily_column + " are not present.");
+         }
+         Collection<IColumn> columns = cfamily.getAllColumns();
+         if (columns == null || columns.size() == 0)
+         {
+             logger_ .info("ERROR Columns are missing.....: "
+                     + " key:" + key
+                     + " ColumnFamily:" + values[0]);
+             throw new CassandraException("ERROR Columns are missing.....: " + " key:" + key + " ColumnFamily:" + values[0]);
+         }
+
+         // A fresh ret is built per iteration, so the last super column in iteration order wins.
+         // NOTE(review): presumably the read returns only the requested super column - confirm.
+         for(IColumn column : columns)
+         {
+             ret = new superColumn_t();
+             ret.name = column.name();
+             Collection<IColumn> subColumns = column.getSubColumns();
+             if(subColumns.size() != 0 )
+             {
+                 ret.columns = new ArrayList<column_t>();
+                 for(IColumn subColumn : subColumns)
+                 {
+                     column_t thrift_column = new column_t();
+                     thrift_column.columnName = subColumn.name();
+                     thrift_column.value = new String(subColumn.value());
+                     thrift_column.timestamp = subColumn.timestamp();
+                     ret.columns.add(thrift_column);
+                 }
+             }
+         }
+     }
+     catch (Exception ex)
+     {
+         String exception = LogUtil.throwableToString(ex);
+         logger_.info( exception );
+         throw new CassandraException(exception);
+     }
+     return ret;
+ }
+
+ /**
+  * Applies a batch of super-column writes to one row and reports whether
+  * the mutation was handed off successfully.
+  *
+  * For each super column, every sub column is added as
+  * "<map key>:<super column>:<column>". A super column whose columns list
+  * is empty or unset is written as an empty placeholder value with
+  * timestamp 0. An unset list is what the read methods of this class
+  * produce for super columns without sub columns; it used to cause a
+  * NullPointerException here that dropped the whole mutation.
+  *
+  * The result was previously never assigned, so the method returned false
+  * even when the write went through.
+  *
+  * NOTE(review): unlike batch_insert_blocking, this method does not use a
+  * QuorumResponseHandler; true only means StorageProxy.insert returned
+  * without throwing. Confirm whether a quorum wait is intended here.
+  *
+  * @param batchMutationSuper table, key and super-column map to apply
+  * @return true if the mutation was passed to StorageProxy.insert without
+  *         an exception; false otherwise (the exception is logged, not rethrown)
+  */
+ public boolean batch_insert_superColumn_blocking(batch_mutation_super_t batchMutationSuper)
+ {
+     boolean result = false;
+     try
+     {
+         // Routine entry trace: debug, like batch_insert_superColumn, not warn.
+         logger_.debug(" batch_insert_SuperColumn_blocking");
+         logger_.debug(" Creating the row mutation");
+         validateTable(batchMutationSuper.table);
+         RowMutation rm = new RowMutation(batchMutationSuper.table,
+                 batchMutationSuper.key.trim());
+         for (Object cfName : batchMutationSuper.cfmap.keySet())
+         {
+             List<superColumn_t> list = batchMutationSuper.cfmap.get(cfName);
+             for (superColumn_t superColumnData : list)
+             {
+                 boolean hasSubColumns = superColumnData.columns != null
+                         && superColumnData.columns.size() != 0;
+                 if (hasSubColumns)
+                 {
+                     for (column_t columnData : superColumnData.columns)
+                     {
+                         rm.add(cfName.toString() + ":" + superColumnData.name +":" + columnData.columnName,
+                                 columnData.value.getBytes(), columnData.timestamp);
+                     }
+                 }
+                 else
+                 {
+                     rm.add(cfName.toString() + ":" + superColumnData.name, new byte[0], 0);
+                 }
+             }
+         }
+         StorageProxy.insert(rm);
+         result = true;
+     }
+     catch (Exception e)
+     {
+         logger_.info( LogUtil.throwableToString(e) );
+     }
+     return result;
+ }
+ /**
+  * Applies a super-column batch (inserts from cfmap, deletes from cfmapdel)
+  * as a single RowMutation. Any exception is logged and swallowed.
+  *
+  * @param batchMutationSuper table, row key and the insert/delete maps
+  */
+ public void batch_insert_superColumn(batch_mutation_super_t batchMutationSuper)
+ {
+     try
+     {
+         logger_.debug(" batch_insert");
+         logger_.debug(" Creating the row mutation");
+         validateTable(batchMutationSuper.table);
+         RowMutation mutation = new RowMutation(batchMutationSuper.table,
+                 batchMutationSuper.key.trim());
+
+         // Inserts: one entry per sub-column, or an empty marker for a bare super column.
+         if (batchMutationSuper.cfmap != null)
+         {
+             for (Object cfName : batchMutationSuper.cfmap.keySet())
+             {
+                 List<superColumn_t> superColumns = batchMutationSuper.cfmap.get(cfName);
+                 for (superColumn_t superColumn : superColumns)
+                 {
+                     if (superColumn.columns.size() == 0)
+                     {
+                         mutation.add(cfName.toString() + ":" + superColumn.name, new byte[0], 0);
+                         continue;
+                     }
+                     for (column_t column : superColumn.columns)
+                     {
+                         mutation.add(cfName.toString() + ":" + superColumn.name + ":" + column.columnName,
+                                 column.value.getBytes(), column.timestamp);
+                     }
+                 }
+             }
+         }
+
+         // Deletes: same path layout as the inserts above.
+         if (batchMutationSuper.cfmapdel != null)
+         {
+             for (Object cfName : batchMutationSuper.cfmapdel.keySet())
+             {
+                 List<superColumn_t> superColumns = batchMutationSuper.cfmapdel.get(cfName);
+                 for (superColumn_t superColumn : superColumns)
+                 {
+                     if (superColumn.columns.size() == 0)
+                     {
+                         mutation.delete(cfName.toString() + ":" + superColumn.name);
+                         continue;
+                     }
+                     for (column_t column : superColumn.columns)
+                     {
+                         mutation.delete(cfName.toString() + ":" + superColumn.name + ":" + column.columnName);
+                     }
+                 }
+             }
+         }
+         StorageProxy.insert(mutation);
+     }
+     catch (Exception e)
+     {
+         logger_.info( LogUtil.throwableToString(e) );
+     }
+ }
+
+ /**
+  * Looks up a string-valued server property by name.
+  *
+  * Recognised names: "cluster name", "config file" (returns the contents of
+  * the configuration file) and "version". Any other name, including null,
+  * yields "?".
+  *
+  * @param propertyName name of the property to read
+  * @return the property value, "file not found!" if the config file cannot be
+  *         read, or "?" for an unknown property
+  */
+ public String getStringProperty(String propertyName) throws TException
+ {
+     // Literal-first equals so a null name falls through to "?" instead of throwing.
+     if ("cluster name".equals(propertyName))
+     {
+         return DatabaseDescriptor.getClusterName();
+     }
+     else if ("config file".equals(propertyName))
+     {
+         return readFileAsString(DatabaseDescriptor.getConfigFileName());
+     }
+     else if ("version".equals(propertyName))
+     {
+         return getVersion();
+     }
+     else
+     {
+         return "?";
+     }
+ }
+
+ /*
+  * Reads the whole file and decodes it with the platform default charset.
+  * Bytes are accumulated first and decoded once, so a multi-byte character
+  * that straddles a read boundary is not corrupted. The stream is always closed.
+  */
+ private String readFileAsString(String filename)
+ {
+     BufferedInputStream stream = null;
+     try
+     {
+         stream = new BufferedInputStream(new FileInputStream(filename));
+         java.io.ByteArrayOutputStream fileData = new java.io.ByteArrayOutputStream(8192);
+         byte[] buf = new byte[1024];
+         int numRead;
+         while ((numRead = stream.read(buf)) != -1)
+         {
+             fileData.write(buf, 0, numRead);
+         }
+         return fileData.toString();
+     }
+     catch (IOException e)
+     {
+         // Keep the historical return value, but leave a trace of the real cause.
+         logger_.info( LogUtil.throwableToString(e) );
+         return "file not found!";
+     }
+     finally
+     {
+         if (stream != null)
+         {
+             try
+             {
+                 stream.close();
+             }
+             catch (IOException ignored)
+             {
+                 // Nothing useful can be done if close fails after reading.
+             }
+         }
+     }
+ }
+
+ public List<String> getStringListProperty(String propertyName) throws TException
+ {
+ if (propertyName.equals("tables"))
+ {
+ return DatabaseDescriptor.getTables();
+ }
+ else
+ {
+ return new ArrayList<String>();
+ }
+ }
+
+ /**
+  * Renders a textual description of every column family in a table.
+  *
+  * @param tableName table to describe
+  * @return each column family's pretty() output followed by "-----\n", or
+  *         "Table <name> not found." when no metadata exists for the table
+  */
+ public String describeTable(String tableName) throws TException
+ {
+     Map<String, CFMetaData> tableMetaData = DatabaseDescriptor.getTableMetaData(tableName);
+
+     if (tableMetaData == null)
+     {
+         return "Table " + tableName + " not found.";
+     }
+
+     // Typed iteration over the values replaces the raw Iterator and unchecked cast;
+     // StringBuilder avoids re-copying the description on every column family.
+     StringBuilder desc = new StringBuilder();
+     for (CFMetaData cfMetaData : tableMetaData.values())
+     {
+         desc.append(cfMetaData.pretty()).append("-----\n");
+     }
+     return desc.toString();
+ }
+
+ /**
+  * Runs a CQL query through CqlDriver and copies the outcome into the
+  * Thrift result type.
+  *
+  * @param query CQL text to execute
+  * @return a CqlResult_t mirroring the driver result; left at its defaults
+  *         when the driver returns null
+  */
+ public CqlResult_t executeQuery(String query) throws TException
+ {
+     CqlResult_t thriftResult = new CqlResult_t();
+     CqlResult driverResult = CqlDriver.executeQuery(query);
+     if (driverResult == null)
+     {
+         return thriftResult;
+     }
+
+     // Field-by-field copy from the CQL result type to the Thrift one.
+     thriftResult.errorTxt = driverResult.errorTxt;
+     thriftResult.resultSet = driverResult.resultSet;
+     thriftResult.errorCode = driverResult.errorCode;
+     return thriftResult;
+ }
+
+ /*
+ * This method is used to ensure that all keys
+ * prior to the specified key, as determined by
+ * the SSTable index bucket it falls in, are in
+ * buffer cache.
+ */
+ public void touch (String key , boolean fData)
+ {
+     try
+     {
+         // Only the first configured table is touched.
+         String firstTable = DatabaseDescriptor.getTables().get(0);
+         StorageProxy.touchProtocol(firstTable, key, fData, StorageService.ConsistencyLevel.WEAK);
+     }
+     catch (Exception touchFailure)
+     {
+         // Best effort: failures are logged and never reach the caller.
+         logger_.info( LogUtil.throwableToString(touchFailure) );
+     }
+ }
+
+
+ public String getVersion() // Version string for this service; hard-coded to "1" (also served by getStringProperty("version")).
+ {
+ return "1";
+ }
+
+ public int getStatus() // Status code for this service; unconditionally reports fb_status.ALIVE.
+ {
+ return fb_status.ALIVE;
+ }
+
+ public String getStatusDetails() // No status detail text is provided; callers always receive null.
+ {
+ return null;
+ }
+
+ public static void main(String[] args) throws Throwable // Starts a CassandraServer and serves the Thrift API on a fixed port.
+ {
+ int port = 9160; // Thrift listen port; not configurable from here.
+ try
+ {
+ CassandraServer peerStorageServer = new CassandraServer();
+ peerStorageServer.start();
+ Cassandra.Processor processor = new Cassandra.Processor(
+ peerStorageServer);
+ // Transport
+ TServerSocket tServerSocket = new TServerSocket(port);
+ // Protocol factory
+ TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory();
+ // ThreadPool Server
+ Options options = new Options();
+ options.minWorkerThreads = 64;
+ TThreadPoolServer serverEngine = new TThreadPoolServer(processor, tServerSocket, tProtocolFactory); // NOTE(review): 'options' is not passed to this constructor, so minWorkerThreads = 64 appears to have no effect - confirm.
+ serverEngine.serve();
+
+ }
+ catch (Exception x)
+ {
+ System.err.println("UNCAUGHT EXCEPTION IN main()");
+ x.printStackTrace();
+ System.exit(1); // Any startup or serve failure terminates the JVM with exit code 1.
+ }
+
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadResponseMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.ICacheExpungeHook;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Background consistency check for a single row.
+ *
+ * run() sends a digest-only read for the row to the replicas. Once every
+ * replica has answered, the digests are compared with the digest of the row
+ * held here; on the first mismatch a full read is sent to the replicas (plus
+ * the local endpoint) and the responses are handed to a ReadResponseResolver.
+ */
+class ConsistencyManager implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(ConsistencyManager.class);
+
+    /** Collects digest responses and triggers a read repair if any digest differs. */
+    class DigestResponseHandler implements IAsyncCallback
+    {
+        List<Message> responses_ = new ArrayList<Message>();
+
+        public void response(Message msg)
+        {
+            logger_.debug("Received reponse : " + msg.toString());
+            responses_.add(msg);
+            // Digests are only compared once every replica has answered.
+            if ( responses_.size() == ConsistencyManager.this.replicas_.size() )
+                handleDigestResponses();
+        }
+
+        /* Compares each replica digest with the row's digest; repairs on the first mismatch. */
+        private void handleDigestResponses()
+        {
+            DataInputBuffer bufIn = new DataInputBuffer();
+            logger_.debug("Handle Digest reponses");
+            for( Message response : responses_ )
+            {
+                byte[] body = (byte[])response.getMessageBody()[0];
+                bufIn.reset(body, body.length);
+                try
+                {
+                    ReadResponseMessage result = ReadResponseMessage.serializer().deserialize(bufIn);
+                    byte[] digest = result.digest();
+                    if( !Arrays.equals(row_.digest(), digest) )
+                    {
+                        // One repair covers all replicas, so stop at the first mismatch.
+                        doReadRepair();
+                        break;
+                    }
+                }
+                catch( IOException ex )
+                {
+                    logger_.info(LogUtil.throwableToString(ex));
+                }
+            }
+        }
+
+        /* Sends a full (non-digest) read to the replicas and the local endpoint. */
+        private void doReadRepair() throws IOException
+        {
+            IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+            /* Add the local storage endpoint to the replicas_ list */
+            replicas_.add(StorageService.getLocalStorageEndPoint());
+            IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);
+            String table = DatabaseDescriptor.getTables().get(0);
+            ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
+            Message message = ReadMessage.makeReadMessage(readMessage);
+            MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray( new EndPoint[0] ), responseHandler);
+        }
+    }
+
+    /**
+     * Collects full-read responses. When a majority has arrived the handler
+     * registers itself in readRepairTable_; resolution happens later, when
+     * callMe is invoked for that entry.
+     */
+    class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
+    {
+        private List<Message> responses_ = new ArrayList<Message>();
+        private IResponseResolver<Row> readResponseResolver_;
+        private int majority_;
+
+        DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver)
+        {
+            readResponseResolver_ = readResponseResolver;
+            // floor(n / 2) + 1
+            majority_ = (responseCount >> 1) + 1;
+        }
+
+        public void response(Message message)
+        {
+            logger_.debug("Received responses in DataRepairHandler : " + message.toString());
+            responses_.add(message);
+            if ( responses_.size() == majority_ )
+            {
+                String messageId = message.getMessageId();
+                // NOTE(review): presumably the cache table calls callMe when the entry
+                // expires, giving slower replicas time to answer - confirm in Cachetable.
+                readRepairTable_.put(messageId, messageId, this);
+            }
+        }
+
+        public void callMe(String key, String value)
+        {
+            handleResponses();
+        }
+
+        /* Resolves over a snapshot of the responses received so far. */
+        private void handleResponses()
+        {
+            try
+            {
+                readResponseResolver_.resolve(new ArrayList<Message>(responses_));
+            }
+            catch ( DigestMismatchException ex )
+            {
+                logger_.info("We should not be coming here under any circumstances ...");
+                logger_.info(LogUtil.throwableToString(ex));
+            }
+        }
+    }
+
+    private static long scheduledTimeMillis_ = 600;
+    private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);
+    private Row row_;
+    protected List<EndPoint> replicas_;
+    private String columnFamily_;
+    /*
+     * Defaults mean "no slice requested". With the former implicit defaults of 0,
+     * run() always took the start/count branch, so the sinceTimestamp and
+     * whole-column-family digest reads could never be issued.
+     */
+    private int start_ = -1;
+    private int count_ = Integer.MAX_VALUE;
+    private long sinceTimestamp_;
+    private List<String> columnNames_ = new ArrayList<String>();
+
+    /** Checks consistency of a read of specific columns. */
+    ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, List<String> columns)
+    {
+        row_ = row;
+        replicas_ = replicas;
+        columnFamily_ = columnFamily;
+        // A null list is treated as "no specific columns" rather than failing later in run().
+        if (columns != null)
+            columnNames_ = columns;
+    }
+
+    /** Checks consistency of a slice read (start offset and column count). */
+    ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, int start, int count)
+    {
+        row_ = row;
+        replicas_ = replicas;
+        columnFamily_ = columnFamily;
+        start_ = start;
+        count_ = count;
+    }
+
+    /** Checks consistency of a read of columns newer than a timestamp. */
+    ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, long sinceTimestamp)
+    {
+        row_ = row;
+        replicas_ = replicas;
+        columnFamily_ = columnFamily;
+        sinceTimestamp_ = sinceTimestamp;
+    }
+
+    /**
+     * Builds the digest-only read matching the way this manager was constructed
+     * and sends it to all replicas. I/O failures are logged, not rethrown.
+     */
+    public void run()
+    {
+        logger_.debug(" Run the consistency checks for " + columnFamily_);
+        String table = DatabaseDescriptor.getTables().get(0);
+        ReadMessage readMessageDigestOnly = null;
+        if(columnNames_.size() == 0)
+        {
+            if( start_ >= 0 && count_ < Integer.MAX_VALUE)
+            {
+                readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, start_, count_);
+            }
+            else if(sinceTimestamp_ > 0)
+            {
+                readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, sinceTimestamp_);
+            }
+            else
+            {
+                readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_);
+            }
+        }
+        else
+        {
+            readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, columnNames_);
+        }
+        readMessageDigestOnly.setIsDigestQuery(true);
+        try
+        {
+            Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigestOnly);
+            IAsyncCallback digestResponseHandler = new DigestResponseHandler();
+            MessagingService.getMessagingInstance().sendRR(messageDigestOnly, replicas_.toArray(new EndPoint[0]), digestResponseHandler);
+        }
+        catch ( IOException ex )
+        {
+            logger_.info(LogUtil.throwableToString(ex));
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/Constants.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/Constants.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/Constants.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/Constants.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,18 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+public class Constants { // Empty holder: no constants are defined in this generated class.
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/CqlResult_t.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CqlResult_t.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CqlResult_t.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CqlResult_t.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,216 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class CqlResult_t implements TBase, java.io.Serializable { // Thrift struct with three fields: errorCode (id 1), errorTxt (id 2), resultSet (id 3).
+ public int errorCode;
+ public String errorTxt;
+ public List<Map<String,String>> resultSet;
+
+ public final Isset __isset = new Isset(); // Flags set to true by the all-args constructor and by read() for each field populated.
+ public static final class Isset implements java.io.Serializable {
+ public boolean errorCode = false;
+ public boolean errorTxt = false;
+ public boolean resultSet = false;
+ }
+
+ public CqlResult_t() {
+ }
+
+ public CqlResult_t(
+ int errorCode,
+ String errorTxt,
+ List<Map<String,String>> resultSet)
+ {
+ this();
+ this.errorCode = errorCode;
+ this.__isset.errorCode = true;
+ this.errorTxt = errorTxt;
+ this.__isset.errorTxt = true;
+ this.resultSet = resultSet;
+ this.__isset.resultSet = true;
+ }
+
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof CqlResult_t)
+ return this.equals((CqlResult_t)that);
+ return false;
+ }
+
+ public boolean equals(CqlResult_t that) { // Field-wise equality; a null errorTxt/resultSet only equals another null.
+ if (that == null)
+ return false;
+
+ boolean this_present_errorCode = true;
+ boolean that_present_errorCode = true;
+ if (this_present_errorCode || that_present_errorCode) {
+ if (!(this_present_errorCode && that_present_errorCode))
+ return false;
+ if (this.errorCode != that.errorCode)
+ return false;
+ }
+
+ boolean this_present_errorTxt = true && (this.errorTxt != null);
+ boolean that_present_errorTxt = true && (that.errorTxt != null);
+ if (this_present_errorTxt || that_present_errorTxt) {
+ if (!(this_present_errorTxt && that_present_errorTxt))
+ return false;
+ if (!this.errorTxt.equals(that.errorTxt))
+ return false;
+ }
+
+ boolean this_present_resultSet = true && (this.resultSet != null);
+ boolean that_present_resultSet = true && (that.resultSet != null);
+ if (this_present_resultSet || that_present_resultSet) {
+ if (!(this_present_resultSet && that_present_resultSet))
+ return false;
+ if (!this.resultSet.equals(that.resultSet))
+ return false;
+ }
+
+ return true;
+ }
+
+ public int hashCode() { // Constant hash: consistent with equals(), but every instance lands in the same hash bucket.
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException { // Reads fields until a STOP field; unknown ids and unexpected types are skipped.
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id)
+ {
+ case 1:
+ if (field.type == TType.I32) {
+ this.errorCode = iprot.readI32();
+ this.__isset.errorCode = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2:
+ if (field.type == TType.STRING) {
+ this.errorTxt = iprot.readString();
+ this.__isset.errorTxt = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 3:
+ if (field.type == TType.LIST) {
+ {
+ TList _list40 = iprot.readListBegin();
+ this.resultSet = new ArrayList<Map<String,String>>(_list40.size);
+ for (int _i41 = 0; _i41 < _list40.size; ++_i41)
+ {
+ Map<String,String> _elem42 = new HashMap<String,String>(); // Replaced by the pre-sized map a few lines below before any entry is added.
+ {
+ TMap _map43 = iprot.readMapBegin();
+ _elem42 = new HashMap<String,String>(2*_map43.size);
+ for (int _i44 = 0; _i44 < _map43.size; ++_i44)
+ {
+ String _key45;
+ String _val46;
+ _key45 = iprot.readString();
+ _val46 = iprot.readString();
+ _elem42.put(_key45, _val46);
+ }
+ iprot.readMapEnd();
+ }
+ this.resultSet.add(_elem42);
+ }
+ iprot.readListEnd();
+ }
+ this.__isset.resultSet = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ }
+
+ public void write(TProtocol oprot) throws TException { // errorCode is always written; errorTxt and resultSet only when non-null.
+ TStruct struct = new TStruct("CqlResult_t");
+ oprot.writeStructBegin(struct);
+ TField field = new TField(); // One TField instance is reused for every field header.
+ field.name = "errorCode";
+ field.type = TType.I32;
+ field.id = 1;
+ oprot.writeFieldBegin(field);
+ oprot.writeI32(this.errorCode);
+ oprot.writeFieldEnd();
+ if (this.errorTxt != null) {
+ field.name = "errorTxt";
+ field.type = TType.STRING;
+ field.id = 2;
+ oprot.writeFieldBegin(field);
+ oprot.writeString(this.errorTxt);
+ oprot.writeFieldEnd();
+ }
+ if (this.resultSet != null) {
+ field.name = "resultSet";
+ field.type = TType.LIST;
+ field.id = 3;
+ oprot.writeFieldBegin(field);
+ {
+ oprot.writeListBegin(new TList(TType.MAP, this.resultSet.size()));
+ for (Map<String,String> _iter47 : this.resultSet) {
+ {
+ oprot.writeMapBegin(new TMap(TType.STRING, TType.STRING, _iter47.size()));
+ for (String _iter48 : _iter47.keySet()) {
+ oprot.writeString(_iter48);
+ oprot.writeString(_iter47.get(_iter48));
+ }
+ oprot.writeMapEnd();
+ }
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("CqlResult_t(");
+ sb.append("errorCode:");
+ sb.append(this.errorCode);
+ sb.append(",errorTxt:");
+ sb.append(this.errorTxt);
+ sb.append(",resultSet:");
+ sb.append(this.resultSet);
+ sb.append(")");
+ return sb.toString();
+ }
+
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/DigestMismatchException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/DigestMismatchException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/DigestMismatchException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/DigestMismatchException.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DigestMismatchException extends Exception // Checked exception that carries only a message.
+{
+ public DigestMismatchException(String message) // message: description forwarded to Exception.
+ {
+ super(message);
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,729 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.RuntimeMXBean;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CalloutDeployMessage;
+import org.apache.cassandra.db.CalloutManager;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.http.ColumnFamilyFormatter;
+import org.apache.cassandra.net.http.HTMLFormatter;
+import org.apache.cassandra.net.http.HttpConnection;
+import org.apache.cassandra.net.http.HttpRequest;
+import org.apache.cassandra.net.http.HttpWriteResponse;
+import org.apache.cassandra.procedures.GroovyScriptRunner;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
+
+/*
+ * This class handles the incoming HTTP request after
+ * it has been parsed.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class HttpRequestVerbHandler implements IVerbHandler
+{
+ private static final Logger logger_ = Logger.getLogger(HttpRequestVerbHandler.class);
+ /* These are the list of actions supported */
+ private static final String DETAILS = "details"; // GET actions are matched by substring search in the query string (see doGet).
+ private static final String LOADME = "loadme";
+ private static final String KILLME = "killme";
+ private static final String COMPACTME = "compactme";
+ private static final String LB_HEALTH_CHECK = "lb_health_check"; // Matched against the request path, not the query.
+ private static final String LB_HEALTH_CHECK_RESPONSE = "I-AM-ALIVE";
+ private static final String QUERY = "query"; // POST actions (see doPost).
+ private static final String INSERT = "insert";
+ private static final String SCRIPT = "script";
+ private static final String QUERYRESULTSDIV = "queryResultsDiv";
+ private static final String INSERTRESULTSDIV = "insertResultsDiv";
+ private static final String SCRIPTRESULTSDIV = "insertResultsDiv"; // NOTE(review): same value as INSERTRESULTSDIV - possibly a copy/paste slip; confirm the intended element id.
+ private static final String JS_UPDATE_QUERY_FUNCTION = "updateQueryResults";
+ private static final String JS_UPDATE_INSERT_FUNCTION = "updateInsertResults";
+
+ private StorageService storageService_;
+
+ public HttpRequestVerbHandler(StorageService storageService) // Keeps the storage service used later for token lookups.
+ {
+ storageService_ = storageService;
+ }
+
+ /**
+  * Entry point for a parsed HTTP request: dispatches GET and POST to their
+  * handlers and writes the resulting response back on the connection.
+  * Other methods produce whatever an untouched HttpWriteResponse flushes.
+  * Any exception is logged at warn level and not propagated.
+  *
+  * @param message message whose first body element is the HttpRequestMessage
+  */
+ public void doVerb(Message message)
+ {
+     HttpConnection.HttpRequestMessage requestMessage = (HttpConnection.HttpRequestMessage)message.getMessageBody()[0];
+     try
+     {
+         HttpRequest request = requestMessage.getHttpRequest();
+         HttpWriteResponse response = new HttpWriteResponse(request);
+
+         String method = request.getMethod().toUpperCase();
+         if ("GET".equals(method))
+         {
+             doGet(request, response);
+         }
+         else if ("POST".equals(method))
+         {
+             doPost(request, response);
+         }
+
+         // Push whatever the handler produced onto the socket.
+         ByteBuffer payload = response.flush();
+         requestMessage.getHttpConnection().write(payload);
+     }
+     catch (Exception e)
+     {
+         logger_.warn(LogUtil.throwableToString(e));
+     }
+ }
+
+ /**
+  * Handles a GET request.
+  *
+  * A path containing the load-balancer health-check marker gets only the
+  * health-check string. Otherwise the first action found in the query
+  * string (details, loadme, killme, compactme - in that order) is run and
+  * its output appended to the page; the summary page is included unless
+  * the action was "details".
+  *
+  * @param httpRequest  parsed request
+  * @param httpResponse response to write the page into
+  */
+ private void doGet(HttpRequest httpRequest, HttpWriteResponse httpResponse)
+ {
+     boolean fServeSummary = true;
+     HTMLFormatter formatter = new HTMLFormatter();
+     // A request without a query string is treated as "no action" instead of failing.
+     String query = httpRequest.getQuery();
+     if (query == null)
+     {
+         query = "";
+     }
+     /*
+      * we do not care about the path for most requests except those
+      * from the load balancer
+      */
+     String path = httpRequest.getPath();
+     /* for the health checks, just return the string only */
+     if (path.indexOf(LB_HEALTH_CHECK) != -1)
+     {
+         httpResponse.println(handleLBHealthCheck());
+         return;
+     }
+
+     formatter.startBody(true, getJSFunctions(), true, true);
+     formatter.appendLine("<h1><font color=\"white\"> Cluster map </font></h1>");
+
+     // Only the first matching action runs.
+     StringBuilder sbResult = new StringBuilder();
+     if (query.indexOf(DETAILS) != -1)
+     {
+         fServeSummary = false;
+         sbResult.append(handleNodeDetails());
+     }
+     else if (query.indexOf(LOADME) != -1)
+     {
+         sbResult.append(handleLoadMe());
+     }
+     else if (query.indexOf(KILLME) != -1)
+     {
+         sbResult.append(handleKillMe());
+     }
+     else if (query.indexOf(COMPACTME) != -1)
+     {
+         sbResult.append(handleCompactMe());
+     }
+
+     if (fServeSummary)
+     {
+         formatter.appendLine(handlePageDisplay(null, null, null));
+     }
+
+     formatter.appendLine("<br>");
+
+     // StringBuilder.toString() is never null, so the result line is always appended.
+     formatter.appendLine(sbResult.toString());
+
+     formatter.endBody();
+     httpResponse.println(formatter.toString());
+ }
+
+ /*
+ * As a result of the POST query, we currently only send back some
+ * javascript that updates the data in some place on the browser.
+ */
+ /**
+  * Handles a POST request: runs the first action found in the query string
+  * (query, insert, script - in that order) and appends its output to the page.
+  *
+  * @param httpRequest  parsed request
+  * @param httpResponse response to write the page into
+  */
+ private void doPost(HttpRequest httpRequest, HttpWriteResponse httpResponse)
+ {
+     // A request without a query string is treated as "no action" instead of failing.
+     String query = httpRequest.getQuery();
+     if (query == null)
+     {
+         query = "";
+     }
+
+     HTMLFormatter formatter = new HTMLFormatter();
+     formatter.startBody(true, getJSFunctions(), true, true);
+     formatter.appendLine("<h1><font color=\"white\"> Cluster map </font></h1>");
+
+     // write a shell for adding some javascript to do in-place updates
+     StringBuilder sbResult = new StringBuilder();
+     if (query.indexOf(QUERY) != -1)
+     {
+         sbResult.append(handleQuery(httpRequest));
+     }
+     else if (query.indexOf(INSERT) != -1)
+     {
+         sbResult.append(handleInsert(httpRequest));
+     }
+     else if (query.indexOf(SCRIPT) != -1)
+     {
+         sbResult.append(handleScript(httpRequest));
+     }
+
+     // StringBuilder.toString() is never null, so the result line is always appended.
+     formatter.appendLine(sbResult.toString());
+
+     formatter.endBody();
+
+     httpResponse.println(formatter.toString());
+ }
+
+ private String handleNodeDetails()
+ {
+ HTMLFormatter formatter = new HTMLFormatter();
+
+ formatter.appendLine("Token: " + storageService_.getToken());
+ RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
+ formatter.appendLine("Up time (in seconds): " + (runtimeMxBean.getUptime()/1000));
+
+ MemoryMXBean memoryMxBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memUsage = memoryMxBean.getHeapMemoryUsage();
+ java.text.DecimalFormat df = new java.text.DecimalFormat("#0.00");
+ String smemUsed = df.format((double)memUsage.getUsed()/(1024 * 1024));
+ String smemMax = df.format((double)memUsage.getMax()/(1024 * 1024));
+ formatter.appendLine("Heap memory usage (in MB): " + smemUsed + "/" + smemMax);
+
+ formatter.appendLine("<br>");
+ formatter.appendLine("<br>");
+
+ /*
+ * Display DB statatics if we have something to show.
+ */
+ displayDBStatistics(formatter, df);
+
+ formatter.appendLine("<button onClick=\"window.location='" + StorageService.getHostUrl() + "?" + LOADME + "=T'\">Load Me</button>");
+ formatter.appendLine("<button onClick=\"window.location='" + StorageService.getHostUrl() + "?" + COMPACTME + "=T'\">Compact Me</button>");
+ formatter.appendLine("<button onClick=\"window.location='" + StorageService.getHostUrl() + "?" + KILLME + "=T'\">Kill Me</button>");
+
+ formatter.appendLine("<br>");
+ formatter.appendLine("<br><a href='" + StorageService.getHostUrl() + "'>Back to live nodes list" + "</a>");
+
+ return formatter.toString();
+ }
+
+ /**
+  * Appends the statistics of the first configured table to the formatter;
+  * appends nothing when the statistics text is empty.
+  *
+  * @param formatter page being built
+  * @param df        number format passed through to Table.tableStats
+  */
+ private void displayDBStatistics(HTMLFormatter formatter, java.text.DecimalFormat df)
+ {
+     String firstTable = DatabaseDescriptor.getTables().get(0);
+     String tableStats = Table.open(firstTable).tableStats("\n<br>\n", df);
+
+     if (tableStats.length() != 0)
+     {
+         formatter.appendLine("DB statistics:");
+         formatter.appendLine("<br>");
+         formatter.appendLine("<br>");
+
+         formatter.appendLine(tableStats);
+         formatter.appendLine("<br>");
+         formatter.appendLine("<br>");
+     }
+ }
+
+ /**
+  * Builds the three-tab page (Cluster, SQL, Ring) and the script that selects
+  * the initial tab: the SQL tab when any form data is supplied, otherwise the
+  * Cluster tab.
+  *
+  * @param queryFormData  previous query form content, or null
+  * @param insertFormData previous insert form content, or null
+  * @param scriptFormData previous script form content, or null
+  * @return the page HTML
+  */
+ private String handlePageDisplay(String queryFormData, String insertFormData, String scriptFormData)
+ {
+     StringBuilder html = new StringBuilder();
+
+     // Tab strip.
+     html.append("\n<div id=\"header\"> \n");
+     html.append("<ul>\n");
+     html.append(" <li name=\"one\" onclick=\"javascript:selectTab('one')\"><a href=\"#\">Cluster</a></li>\n");
+     html.append(" <li name=\"two\" onclick=\"javascript:selectTab('two')\"><a href=\"#\">SQL</a></li>\n");
+     html.append(" <li name=\"three\" onclick=\"javascript:selectTab('three')\"><a href=\"#\">Ring</a></li>\n");
+     html.append("</ul>\n");
+     html.append("</div>\n\n");
+
+     // Tab one: cluster summary.
+     html.append("<div name=\"one\" id=\"content\"> <!-- start tab one -->\n\n");
+     html.append(serveSummary());
+     html.append("</div> <!-- finish tab one -->\n\n");
+
+     // Tab two: insert, query and script forms.
+     html.append("<div name=\"two\" id=\"content\"> <!-- start tab two -->\n\n");
+     html.append(serveInsertForm(insertFormData));
+     html.append(serveQueryForm(queryFormData));
+     html.append(serveGroovyForm(scriptFormData));
+     html.append("</div> <!-- finish tab two -->\n\n");
+
+     // Tab three: ring view.
+     html.append("<div name=\"three\" id=\"content\"> <!-- start tab three -->\n\n");
+     html.append(serveRingView());
+     html.append("</div> <!-- finish tab three -->\n\n");
+
+     boolean noFormData = queryFormData == null && insertFormData == null && scriptFormData == null;
+     html.append("\n<script type=\"text/javascript\">\n");
+     html.append(noFormData ? "selectTab(\"one\");\n" : "selectTab(\"two\");\n");
+     html.append("</script>\n");
+
+     return html.toString();
+ }
+
+ /*
+  * Serve the summary table of the cluster: one row per member known to the
+  * gossiper, showing its address, up/down status (as reported by the failure
+  * detector), leadership, load info, token and generation number.
+  */
+ private String serveSummary()
+ {
+     HTMLFormatter formatter = new HTMLFormatter();
+
+     // getAllMembers() is used here, so the rows include members that are
+     // currently down; liveness is shown per row in the Status column.
+     Set<EndPoint> members = Gossiper.instance().getAllMembers();
+     // sort by EndPoint's natural ordering so the table order is stable
+     // NOTE(review): the original comment says this orders by hostname - confirm in EndPoint
+     EndPoint[] nodes = members.toArray(new EndPoint[0]);
+     Arrays.sort(nodes);
+
+     String[] sHeaders = {"Node No.", "Host:Port", "Status", "Leader", "Load Info", "Token", "Generation No."};
+     formatter.startTable();
+     formatter.addHeaders(sHeaders);
+     int iNodeNumber = 0;
+     for( EndPoint curNode : nodes )
+     {
+         formatter.startRow();
+         ++iNodeNumber;
+
+         // Node No.
+         formatter.addCol(Integer.toString(iNodeNumber));
+         // Host:Port - links to the details page served on that node's HTTP port
+         formatter.addCol("<a href='http://" + curNode.getHost() + ":" + DatabaseDescriptor.getHttpPort() + "/home?" + DETAILS + "=T'>" + curNode.getHost() + ":" + curNode.getPort() + "</a>");
+         // Status
+         String status = ( FailureDetector.instance().isAlive(curNode) ) ? "Up" : "Down";
+         formatter.addCol(status);
+         // Leader
+         boolean isLeader = StorageService.instance().isLeader(curNode);
+         formatter.addCol(Boolean.toString(isLeader));
+         // Load Info
+         formatter.addCol(getLoadInfo(curNode));
+         // Token - curNode was already dereferenced above, so it cannot be
+         // null here and the former "NULL!" branch was unreachable
+         formatter.addCol(storageService_.getToken(curNode));
+         // Generation Number
+         formatter.addCol(Integer.toString(Gossiper.instance().getCurrentGenerationNumber(curNode)));
+
+         formatter.endRow();
+     }
+
+     formatter.endTable();
+
+     return formatter.toString();
+ }
+
+ /**
+  * Renders the ring as a table: one row per range in the storage service's
+  * range-to-endpoint map, followed by one column for each endpoint mapped
+  * to that range.
+  *
+  * @return the HTML for the ring table
+  */
+ private String serveRingView()
+ {
+     HTMLFormatter html = new HTMLFormatter();
+     html.startTable();
+     html.addHeaders(new String[] {"Range No.", "Range", "N1", "N2", "N3"});
+
+     Map<Range, List<EndPoint>> rangeToEndPoints = StorageService.instance().getRangeToEndPointMap();
+
+     int rangeNo = 0;
+     for ( Map.Entry<Range, List<EndPoint>> entry : rangeToEndPoints.entrySet() )
+     {
+         Range current = entry.getKey();
+         rangeNo++;
+
+         html.startRow();
+         // Range No.
+         html.addCol(Integer.toString(rangeNo));
+         // Range, shown as a half-open interval (left, right]
+         html.addCol("(" + current.left() + ",<br>" + current.right() + "]");
+
+         // N1 N2 N3 - one column per endpoint listed for the range
+         for ( EndPoint endPoint : entry.getValue() )
+         {
+             html.addCol(endPoint.toString());
+         }
+         html.endRow();
+     }
+
+     html.endTable();
+
+     return html.toString();
+ }
+
+ /**
+  * Looks up the load info string for an endpoint. The no-argument lookup is
+  * used when the endpoint equals the local control endpoint, the
+  * per-endpoint lookup otherwise.
+  *
+  * @param ep endpoint whose load info is wanted
+  * @return the load info reported by the storage service
+  */
+ private String getLoadInfo(EndPoint ep)
+ {
+     boolean isLocal = StorageService.getLocalControlEndPoint().equals(ep);
+     return isLocal
+          ? StorageService.instance().getLoadInfo()
+          : StorageService.instance().getLoadInfo(ep);
+ }
+
+ /*
+  * Returns the HTML code for a form to query data from the db cluster.
+  *
+  * queryResult is the outcome text of a previous query and is placed into
+  * the results div as-is.
+  */
+ private String serveQueryForm(String queryResult)
+ {
+     HTMLFormatter formatter = new HTMLFormatter();
+     formatter.appendLine("<BR><fieldset><legend>Query the cluster</legend>");
+     formatter.appendLine("<FORM action=\"" + StorageService.getHostUrl() + "/home?" + QUERY + "=T\" method=\"post\">");
+
+     // List the column families of the table that handleQuery() actually
+     // reads from (the first configured table) instead of a hard-coded
+     // table name, so every offered choice can be queried.
+     Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+     Set<String> columnFamilyComboBoxSet = table.getColumnFamilies();
+
+     formatter.append("select from ");
+     formatter.addCombobox(columnFamilyComboBoxSet, "columnfamily", 0);
+     formatter.append(" : <INPUT name=columnName>");
+     formatter.appendLine(" where key = <INPUT name=key>");
+     formatter.appendLine("<BR>");
+     formatter.appendLine("<INPUT type=\"submit\" value=\"Send\"> <INPUT type=\"reset\">");
+
+     formatter.appendLine("</FORM>");
+     formatter.addDivElement(QUERYRESULTSDIV, queryResult);
+     formatter.appendLine("</fieldset><BR>");
+
+     return formatter.toString();
+ }
+
+ /*
+ * Returns the HTML code for a form to to run custom code on the cluster.
+ */
+ private String serveGroovyForm(String scriptResult)
+ {
+ HTMLFormatter formatter = new HTMLFormatter();
+ formatter.appendLine("<BR><fieldset><legend>Run custom code on the cluster</legend>");
+ formatter.appendLine("<FORM action=\"" + StorageService.getHostUrl() + "/home?" + SCRIPT + "=T\" method=\"post\">");
+ formatter.append(" Callout name : <INPUT name=calloutName>");
+ formatter.appendLine("<BR>");
+ formatter.append("Groovy code to run on the server:<br>");
+ formatter.append("<textarea name=scriptTextArea rows=\"10\" cols=\"100\"></textarea>");
+ formatter.appendLine("<BR>");
+ formatter.appendLine("<INPUT name=deploy type=\"submit\" value=\"Deploy\"> <INPUT name=execute type=\"submit\" value=\"Execute\"> <INPUT name=reset type=\"reset\">");
+
+ formatter.appendLine("</FORM>");
+ formatter.addDivElement(SCRIPTRESULTSDIV, scriptResult);
+ formatter.appendLine("</fieldset><BR>");
+
+ return formatter.toString();
+ }
+
+ /*
+  * Returns the HTML code for a form to insert data into the db cluster.
+  *
+  * insertResult is the outcome text of a previous insert and is placed into
+  * the results div as-is.
+  */
+ private String serveInsertForm(String insertResult)
+ {
+     HTMLFormatter formatter = new HTMLFormatter();
+     formatter.appendLine("<BR><fieldset>\n<legend>Insert data into the cluster</legend>\n");
+     formatter.appendLine("<FORM action=\"" + StorageService.getHostUrl() + "/home?" + INSERT + "=T\" method=\"post\">");
+
+     // List the column families of the table that handleInsert() actually
+     // writes to (the first configured table) instead of a hard-coded
+     // table name, so every offered choice is a valid insert target.
+     Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+     Set<String> columnFamilyComboBoxSet = table.getColumnFamilies();
+
+     formatter.append("insert into ");
+     formatter.addCombobox(columnFamilyComboBoxSet, "columnfamily", 0);
+     formatter.append(" : <INPUT name=columnName>");
+     formatter.append(" data = <INPUT name=data>");
+     formatter.appendLine(" where key = <INPUT name=key>\n");
+     formatter.appendLine("<BR>\n");
+     formatter.appendLine("<INPUT type=\"submit\" value=\"Send\"> <INPUT type=\"reset\">\n");
+
+     formatter.appendLine("</FORM>\n");
+     formatter.addDivElement(INSERTRESULTSDIV, insertResult);
+     formatter.appendLine("</fieldset>\n<BR>\n");
+
+     return formatter.toString();
+ }
+
+ /*
+  * Handle the query of some data from the client.
+  *
+  * Reads the "columnfamily", "columnName" and "key" request parameters,
+  * looks the key up in the first configured table and renders the page with
+  * either the formatted column family ("Success: ...") or the reason the
+  * lookup produced nothing ("Error: ..."). A column name of "*" (or no
+  * column name at all) selects the whole column family.
+  */
+ private String handleQuery(HttpRequest httpRequest)
+ {
+     boolean fQuerySuccess = false;
+     String sRetVal;
+
+     // get the various values for this HTTP request
+     String sColumnFamily = httpRequest.getParameter("columnfamily");
+     String sColumn = httpRequest.getParameter("columnName");
+     String sKey = httpRequest.getParameter("key");
+
+     if (sColumnFamily == null || sKey == null)
+     {
+         // without these the lookup below would be built from the text "null"
+         sRetVal = "Both a column family and a key are required.";
+     }
+     else
+     {
+         try
+         {
+             // get the table name
+             String sTableName = DatabaseDescriptor.getTables().get(0);
+             Table table = Table.open(sTableName);
+
+             // "cf" reads the whole column family, "cf:column" a single column
+             String queryFor = sColumnFamily;
+             if (sColumn != null && !"*".equals(sColumn))
+             {
+                 queryFor += ":" + sColumn;
+             }
+             ColumnFamily cf = table.get(sKey, queryFor);
+
+             if (cf == null)
+             {
+                 sRetVal = "Key [" + sKey + "], column family [" + sColumnFamily + "] not found.";
+             }
+             else
+             {
+                 StringBuilder sb = new StringBuilder();
+                 ColumnFamilyFormatter cformatter = new ColumnFamilyFormatter(sb);
+                 cformatter.printKeyColumnFamily(sb, sKey, cf);
+                 fQuerySuccess = true;
+                 sRetVal = sb.toString();
+             }
+         }
+         catch (Exception e)
+         {
+             // read failed - log it and return the reason; getMessage() may be
+             // null (e.g. for a NullPointerException), so fall back to toString()
+             logger_.warn(LogUtil.throwableToString(e));
+             sRetVal = (e.getMessage() != null) ? e.getMessage() : e.toString();
+         }
+     }
+
+     if (fQuerySuccess)
+         sRetVal = "Success: " + sRetVal;
+     else
+         sRetVal = "Error: " + sRetVal;
+
+     return handlePageDisplay(sRetVal, null, null);
+ }
+
+ /*
+  * Handle the insert of some data from the client.
+  *
+  * Reads the "columnfamily", "columnName", "key" and "data" request
+  * parameters, applies a single-column row mutation to the first configured
+  * table and renders the page with the outcome shown under the insert form.
+  */
+ private String handleInsert(HttpRequest httpRequest)
+ {
+     boolean fInsertSuccess = false;
+     String sRetVal;
+
+     // get the various values for this HTTP request
+     String sColumnFamily = httpRequest.getParameter("columnfamily");
+     String sColumn = httpRequest.getParameter("columnName");
+     String sKey = httpRequest.getParameter("key");
+     String sDataToInsert = httpRequest.getParameter("data");
+
+     if (sColumnFamily == null || sColumn == null || sKey == null || sDataToInsert == null)
+     {
+         // a missing value would otherwise be written as the text "null"
+         // (column name) or fail with a message-less NullPointerException (data)
+         sRetVal = "columnfamily, columnName, key and data are all required.";
+     }
+     else
+     {
+         try
+         {
+             // get the table name
+             String sTableName = DatabaseDescriptor.getTables().get(0);
+
+             // do the insert first
+             // NOTE(review): getBytes() uses the platform default charset and the
+             // timestamp is always 0 - confirm both are intended
+             RowMutation rm = new RowMutation(sTableName, sKey);
+             rm.add(sColumnFamily + ":" + sColumn, sDataToInsert.getBytes(), 0);
+             rm.apply();
+
+             fInsertSuccess = true;
+             sRetVal = "columnfamily=" + sColumnFamily + " key=" + sKey + " data=" + sDataToInsert;
+             logger_.debug("Write done ...");
+         }
+         catch (Exception e)
+         {
+             // write failed - log it and return the reason; getMessage() may be
+             // null, so fall back to toString()
+             logger_.warn(LogUtil.throwableToString(e));
+             sRetVal = (e.getMessage() != null) ? e.getMessage() : e.toString();
+         }
+     }
+
+     if (fInsertSuccess)
+         sRetVal = "The insert was successful : " + sRetVal;
+     else
+         sRetVal = "The insert was failed : " + sRetVal;
+
+     return handlePageDisplay(null, sRetVal, null);
+ }
+
+ /*
+  * Handle the script to be run on the server.
+  *
+  * When the "deploy" parameter is present, the callout named "calloutName"
+  * with body "scriptTextArea" is deployed via doDeploy(). When "execute" is
+  * present, the script text is evaluated and its result becomes the return
+  * value shown on the page. The outcome is reported as success as long as
+  * nothing was thrown, including when neither action actually ran.
+  *
+  * NOTE(security): this evaluates Groovy code taken straight from the HTTP
+  * request, i.e. anyone who can reach this endpoint can run code on the
+  * server. Left as is because that is the feature; access to the endpoint
+  * has to be restricted elsewhere.
+  */
+ private String handleScript(HttpRequest httpRequest)
+ {
+     boolean fQuerySuccess = false;
+     String sRetVal = "";
+
+     // get the various values for this HTTP request
+     String callout = httpRequest.getParameter("calloutName");
+     String script = httpRequest.getParameter("scriptTextArea");
+     String deploy = httpRequest.getParameter("deploy");
+     String execute = httpRequest.getParameter("execute");
+     try
+     {
+         if ( deploy != null && callout != null && script != null )
+         {
+             doDeploy(callout, script);
+             sRetVal = "Finished deployment of callouts ...";
+         }
+         if ( execute != null && script != null )
+         {
+             sRetVal = GroovyScriptRunner.evaluateString(script);
+         }
+         fQuerySuccess = true;
+     }
+     catch(Throwable t)
+     {
+         // getMessage() may be null; fall back to toString() so the page
+         // shows the throwable's type instead of the text "null"
+         sRetVal = (t.getMessage() != null) ? t.getMessage() : t.toString();
+         logger_.warn(LogUtil.throwableToString(t));
+     }
+
+     if(fQuerySuccess)
+         sRetVal = "Result: Success<br>\nReturn value: <br>\n" + sRetVal;
+     else
+         sRetVal = "Result: Error<br>\nError: <br>\n" + sRetVal;
+
+     return handlePageDisplay(null, null, sRetVal);
+ }
+
+ /**
+  * Deploys a callout on every member known to the gossiper. The member that
+  * equals the local control endpoint gets the callout registered directly
+  * with the CalloutManager; every other member is sent a one-way deploy
+  * message on the storage port. An IOException is logged and ends the
+  * deployment; it is not propagated to the caller.
+  *
+  * @param callout name of the callout
+  * @param script script body of the callout
+  */
+ private void doDeploy(String callout, String script)
+ {
+     Set<EndPoint> members = Gossiper.instance().getAllMembers();
+     /* The same message goes to every remote member. */
+     CalloutDeployMessage payload = new CalloutDeployMessage(callout, script);
+     try
+     {
+         Message wireMessage = CalloutDeployMessage.getCalloutDeployMessage(payload);
+         for ( EndPoint member : members )
+         {
+             boolean isRemote = !member.equals( StorageService.getLocalControlEndPoint() );
+             if ( isRemote )
+             {
+                 /* Address the member on the storage port. */
+                 EndPoint target = new EndPoint(member.getHost(), DatabaseDescriptor.getStoragePort());
+                 logger_.debug("Deploying the script to " + member);
+                 MessagingService.getMessagingInstance().sendOneWay(wireMessage, target);
+                 continue;
+             }
+
+             /* Deploy locally */
+             CalloutManager.instance().addCallout(callout, script);
+         }
+     }
+     catch ( IOException failure )
+     {
+         logger_.warn( LogUtil.throwableToString(failure) );
+     }
+ }
+
+ /**
+  * Generates two JavaScript functions, one for the query results div and
+  * one for the insert results div. Each takes a text argument and, when its
+  * div exists in the document, assigns the text to the div's innerHTML.
+  *
+  * @return the JavaScript source of both functions
+  */
+ private String getJSFunctions()
+ {
+     StringBuilder js = new StringBuilder();
+
+     // updater for the query results div
+     js.append("function ").append(JS_UPDATE_QUERY_FUNCTION).append("(text)\n")
+       .append("{\n")
+       .append(" obj = document.getElementById(\"").append(QUERYRESULTSDIV).append("\");\n")
+       .append(" if(obj)\n")
+       .append(" obj.innerHTML = text;\n")
+       .append("}\n")
+       .append("\n");
+
+     // updater for the insert results div
+     js.append("function ").append(JS_UPDATE_INSERT_FUNCTION).append("(text)\n")
+       .append("{\n")
+       .append(" obj = document.getElementById(\"").append(INSERTRESULTSDIV).append("\");\n")
+       .append(" if(obj)\n")
+       .append(" obj.innerHTML = text;\n")
+       .append("}\n")
+       .append("\n");
+
+     return js.toString();
+ }
+
+ /*
+ * Load the current node with data.
+ */
+ private String handleLoadMe()
+ {
+ return "Loading...";
+ }
+
+ /**
+  * Forces a compaction of the first configured table.
+  *
+  * @return "Compacting ..." when the compaction request went through, or a
+  *         failure text when it raised an IOException
+  */
+ private String handleCompactMe()
+ {
+     Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+     String status = "Compacting ...";
+     try
+     {
+         table.forceCompaction();
+     }
+     catch (IOException ex)
+     {
+         // Report the failure instead of claiming a compaction is running,
+         // and log at warn like the other handlers in this class do.
+         logger_.warn(LogUtil.throwableToString(ex));
+         status = "Failed to compact.";
+     }
+     return status;
+ }
+
+ /**
+  * Answers a health-check request.
+  *
+  * @return an empty string once the storage service reports that it is shut
+  *         down, LB_HEALTH_CHECK_RESPONSE otherwise
+  */
+ private String handleLBHealthCheck()
+ {
+     boolean shuttingDown = StorageService.instance().isShutdown();
+     return shuttingDown ? "" : LB_HEALTH_CHECK_RESPONSE;
+ }
+
+ /*
+  * Kill the current node.
+  *
+  * Returns a status text: one when the storage service already reports
+  * being shut down, one when the kill request went through, and one when
+  * the kill request threw.
+  */
+ private String handleKillMe()
+ {
+     if( !StorageService.instance().isShutdown() )
+     {
+         /*
+          * The storage service will wait for a period of time to let the
+          * VIP know that we are shutting down, then will perform an actual
+          * shutdown on a separate thread.
+          */
+         try
+         {
+             StorageService.instance().killMe();
+             return "Service has been killed";
+         }
+         catch( Throwable failure )
+         {
+             logger_.warn(LogUtil.throwableToString(failure));
+             return "Failed to kill service.";
+         }
+     }
+
+     return "Already scheduled for being shutdown";
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/IComponentShutdown.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/IComponentShutdown.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/IComponentShutdown.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/IComponentShutdown.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+/**
+ * Callback interface with a single no-argument {@link #shutdown()} method.
+ * What a shutdown involves is left to the implementing class.
+ */
+public interface IComponentShutdown
+{
+    /**
+     * Invoked to shut the implementing component down.
+     */
+    void shutdown();
+}