You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC

svn commit: r749205 [1/16] - in /incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/ config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/ cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/ ...

Author: pmalik
Date: Mon Mar  2 06:12:46 2009
New Revision: 749205

URL: http://svn.apache.org/viewvc?rev=749205&view=rev
Log:
Added Cassandra sources

Added:
    incubator/cassandra/src/org/apache/cassandra/analytics/
    incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java
    incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java
    incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java
    incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java
    incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java
    incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java
    incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java
    incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java
    incubator/cassandra/src/org/apache/cassandra/cli/
    incubator/cassandra/src/org/apache/cassandra/cli/Cli.g
    incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens
    incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java
    incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java
    incubator/cassandra/src/org/apache/cassandra/cli/CliLexer.java
    incubator/cassandra/src/org/apache/cassandra/cli/CliMain.java
    incubator/cassandra/src/org/apache/cassandra/cli/CliOptions.java
    incubator/cassandra/src/org/apache/cassandra/cli/CliParser.java
    incubator/cassandra/src/org/apache/cassandra/cli/CliSessionState.java
    incubator/cassandra/src/org/apache/cassandra/cli/Cli__.g
    incubator/cassandra/src/org/apache/cassandra/concurrent/
    incubator/cassandra/src/org/apache/cassandra/concurrent/AIOExecutorService.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/Context.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/ContinuationContext.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/ContinuationStage.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java
    incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java
    incubator/cassandra/src/org/apache/cassandra/config/
    incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java
    incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/src/org/apache/cassandra/continuations/
    incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java
    incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java
    incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java
    incubator/cassandra/src/org/apache/cassandra/cql/
    incubator/cassandra/src/org/apache/cassandra/cql/common/
    incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/CqlResult.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/DMLPlan.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/ExplainPlan.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/OperandDef.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/Pair.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/Plan.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/QueryPlan.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/RowSourceDef.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/SetColumnMap.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/SetSuperColumnMap.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/SetUniqueKey.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/SuperColumnMapExpr.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
    incubator/cassandra/src/org/apache/cassandra/cql/common/Utils.java
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/common/
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/common/CompilerErrorMsg.java
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/common/CqlCompiler.java
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql.g
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql.tokens
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/CqlLexer.java
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/CqlParser.java
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql__.g
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseError.java
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseException.java
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java
    incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java
    incubator/cassandra/src/org/apache/cassandra/cql/driver/
    incubator/cassandra/src/org/apache/cassandra/cql/driver/CqlDriver.java
    incubator/cassandra/src/org/apache/cassandra/cql/execution/
    incubator/cassandra/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java
    incubator/cassandra/src/org/apache/cassandra/dht/
    incubator/cassandra/src/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java
    incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadata.java
    incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java
    incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
    incubator/cassandra/src/org/apache/cassandra/dht/BootstrapSourceTarget.java
    incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
    incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
    incubator/cassandra/src/org/apache/cassandra/dht/Range.java
    incubator/cassandra/src/org/apache/cassandra/gms/
    incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java
    incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java
    incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java
    incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java
    incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java
    incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java
    incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java
    incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java
    incubator/cassandra/src/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/src/org/apache/cassandra/gms/HeartBeatState.java
    incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java
    incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
    incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java
    incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetector.java
    incubator/cassandra/src/org/apache/cassandra/gms/IFailureNotification.java
    incubator/cassandra/src/org/apache/cassandra/gms/JoinMessage.java
    incubator/cassandra/src/org/apache/cassandra/gms/PureRandom.java
    incubator/cassandra/src/org/apache/cassandra/gms/VersionGenerator.java
    incubator/cassandra/src/org/apache/cassandra/io/
    incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java
    incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java
    incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java
    incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java
    incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java
    incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java
    incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java
    incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java
    incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java
    incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java
    incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java
    incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java
    incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java
    incubator/cassandra/src/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/src/org/apache/cassandra/io/SequenceFile.java

Added: incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,788 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Context for sending metrics to Ganglia. This class drives the entire metric collection process.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class AnalyticsContext implements IComponentShutdown
+{
+	private static Logger logger_ = Logger.getLogger(AnalyticsContext.class);
+
+	private static final String PERIOD_PROPERTY = "period";
+	private static final String SERVERS_PROPERTY = "servers";
+	private static final String UNITS_PROPERTY = "units";
+	private static final String SLOPE_PROPERTY = "slope";
+	private static final String TMAX_PROPERTY = "tmax";
+	private static final String DMAX_PROPERTY = "dmax";
+
+	private static final String DEFAULT_UNITS = "";
+	private static final String DEFAULT_SLOPE = "both";
+	private static final int DEFAULT_TMAX = 60;
+	private static final int DEFAULT_DMAX = 0;
+	private static final int DEFAULT_PORT = 8649;
+	private static final int BUFFER_SIZE = 1500;			 // as per libgmond.c
+
+	private static final Map<Class,String> typeTable_ = new HashMap<Class,String>(5);
+
+	private Map<String,RecordMap> bufferedData_ = new HashMap<String,RecordMap>();
+    /* Keeps the MetricRecord for each abstraction that implements IAnalyticsSource */
+    private Map<String, MetricsRecord> recordMap_ = new HashMap<String, MetricsRecord>();
+	private Map<String,Object> attributeMap_ = new HashMap<String,Object>();
+	private Set<IAnalyticsSource> updaters = new HashSet<IAnalyticsSource>(1);
+	private List<InetSocketAddress> metricsServers_;
+
+	private Map<String, String> unitsTable_;
+	private Map<String, String> slopeTable_;
+	private Map<String, String> tmaxTable_;
+	private Map<String, String> dmaxTable_;
+
+	/* singleton instance */
+	private static AnalyticsContext instance_;
+    /* Used to lock the factory for creation of StorageService instance */
+    private static Lock createLock_ = new ReentrantLock();
+
+	/**
+	 * Default period in seconds at which data is sent to the metrics system.
+	*/
+	private static final int DEFAULT_PERIOD = 5;
+
+	/**
+	 * Port to which we should write the data.
+	 */
+	private int port_ = DEFAULT_PORT;
+
+	private Timer timer = null;
+	private int period_ = DEFAULT_PERIOD;
+	private volatile boolean isMonitoring = false;
+	private byte[] buffer_ = new byte[BUFFER_SIZE];
+	private int offset_;
+
+	private DatagramSocket datagramSocket_;
+
+	static class TagMap extends TreeMap<String,Object>
+	{
+		private static final long serialVersionUID = 3546309335061952993L;
+		TagMap()
+		{
+			super();
+		}
+		TagMap(TagMap orig)
+		{
+			super(orig);
+		}
+	}
+
+	static class MetricMap extends TreeMap<String,Number>
+	{
+		private static final long serialVersionUID = -7495051861141631609L;
+	}
+
+	static class RecordMap extends HashMap<TagMap,MetricMap>
+	{
+		private static final long serialVersionUID = 259835619700264611L;
+	}
+
+	static
+	{
+		typeTable_.put(String.class, "string");
+		typeTable_.put(Byte.class, "int8");
+		typeTable_.put(Short.class, "int16");
+		typeTable_.put(Integer.class, "int32");
+		typeTable_.put(Float.class, "float");
+	}
+
+
+	/**
+	 * Creates a new instance of AnalyticsReporter
+	 */
+	public AnalyticsContext()
+	{
+		StorageService.instance().registerComponentForShutdown(this);
+	}
+
+	/**
+	* Initializes the context.
+	*/
+	public void init(String contextName, String serverSpecList)
+	{
+		String periodStr = getAttribute(PERIOD_PROPERTY);
+
+		if (periodStr != null)
+		{
+			int period = 0;
+			try
+			{
+				period = Integer.parseInt(periodStr);
+			}
+			catch (NumberFormatException nfe)
+			{
+			}
+
+			if (period <= 0)
+			{
+				throw new AnalyticsException("Invalid period: " + periodStr);
+			}
+
+			setPeriod(period);
+		}
+
+		metricsServers_ = parse(serverSpecList, port_);
+		unitsTable_ = getAttributeTable(UNITS_PROPERTY);
+		slopeTable_ = getAttributeTable(SLOPE_PROPERTY);
+		tmaxTable_ = getAttributeTable(TMAX_PROPERTY);
+		dmaxTable_ = getAttributeTable(DMAX_PROPERTY);
+
+		try
+		{
+			datagramSocket_ = new DatagramSocket();
+		}
+		catch (SocketException se)
+		{
+			se.printStackTrace();
+		}
+	}
+
+	/**
+	 * Sends a record to the metrics system.
+	 */
+	public void emitRecord(String recordName, OutputRecord outRec) throws IOException
+	{
+		// emit each metric in turn
+		for (String metricName : outRec.getMetricNames())
+		{
+			Object metric = outRec.getMetric(metricName);
+			String type = (String) typeTable_.get(metric.getClass());
+			emitMetric(metricName, type, metric.toString());
+		}
+	}
+
+	/**
+	 * Helper which actually writes the metric in XDR format.
+	 *
+	 * @param name
+	 * @param type
+	 * @param value
+	 * @throws IOException
+	 */
+	private void emitMetric(String name, String type, String value) throws IOException
+	{
+		String units = getUnits(name);
+		int slope = getSlope(name);
+		int tmax = getTmax(name);
+		int dmax = getDmax(name);
+		offset_ = 0;
+
+		xdr_int(0); // metric_user_defined
+		xdr_string(type);
+		xdr_string(name);
+		xdr_string(value);
+		xdr_string(units);
+		xdr_int(slope);
+		xdr_int(tmax);
+		xdr_int(dmax);
+
+		for (InetSocketAddress socketAddress : metricsServers_)
+		{
+			DatagramPacket packet = new DatagramPacket(buffer_, offset_, socketAddress);
+			datagramSocket_.send(packet);
+		}
+	}
+
+	private String getUnits(String metricName)
+	{
+		String result = (String) unitsTable_.get(metricName);
+		if (result == null)
+		{
+			result = DEFAULT_UNITS;
+		}
+
+		return result;
+	}
+
+	private int getSlope(String metricName)
+	{
+		String slopeString = (String) slopeTable_.get(metricName);
+		if (slopeString == null)
+		{
+			slopeString = DEFAULT_SLOPE;
+		}
+
+		return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
+	}
+
+	private int getTmax(String metricName)
+	{
+		String tmaxString = (String) tmaxTable_.get(metricName);
+		if (tmaxString == null)
+		{
+			return DEFAULT_TMAX;
+		}
+		else
+		{
+			return Integer.parseInt(tmaxString);
+		}
+	}
+
+	private int getDmax(String metricName)
+	{
+		String dmaxString = (String) dmaxTable_.get(metricName);
+		if (dmaxString == null)
+		{
+			return DEFAULT_DMAX;
+		}
+		else
+		{
+			return Integer.parseInt(dmaxString);
+		}
+	}
+
+	/**
+	 * Puts a string into the buffer by first writing the size of the string
+	 * as an int, followed by the bytes of the string, padded if necessary to
+	 * a multiple of 4.
+	 */
+	private void xdr_string(String s)
+	{
+		byte[] bytes = s.getBytes();
+		int len = bytes.length;
+		xdr_int(len);
+		System.arraycopy(bytes, 0, buffer_, offset_, len);
+		offset_ += len;
+		pad();
+	}
+
+	/**
+	 * Pads the buffer with zero bytes up to the nearest multiple of 4.
+	 */
+	private void pad()
+	{
+		int newOffset = ((offset_ + 3) / 4) * 4;
+		while (offset_ < newOffset)
+		{
+			buffer_[offset_++] = 0;
+		}
+	}
+
+	/**
+	 * Puts an integer into the buffer as 4 bytes, big-endian.
+	 */
+	private void xdr_int(int i)
+	{
+		buffer_[offset_++] = (byte) ((i >> 24) & 0xff);
+		buffer_[offset_++] = (byte) ((i >> 16) & 0xff);
+		buffer_[offset_++] = (byte) ((i >> 8) & 0xff);
+		buffer_[offset_++] = (byte) (i & 0xff);
+	}
+
+
+
+	/**
+	 * Returns the names of all the factory's attributes.
+	 *
+	 * @return the attribute names
+	 */
+	public String[] getAttributeNames()
+	{
+		String[] result = new String[attributeMap_.size()];
+		int i = 0;
+		// for (String attributeName : attributeMap.keySet()) {
+		Iterator<String> it = attributeMap_.keySet().iterator();
+		while (it.hasNext())
+		{
+			result[i++] = it.next();
+		}
+		return result;
+	}
+
+	/**
+	 * Sets the named factory attribute to the specified value, creating it
+	 * if it did not already exist.	If the value is null, this is the same as
+	 * calling removeAttribute.
+	 *
+	 * @param attributeName the attribute name
+	 * @param value the new attribute value
+	 */
+	public void setAttribute(String attributeName, Object value)
+	{
+		attributeMap_.put(attributeName, value);
+	}
+
+	/**
+	 * Removes the named attribute if it exists.
+	 *
+	 * @param attributeName the attribute name
+	 */
+	public void removeAttribute(String attributeName)
+	{
+		attributeMap_.remove(attributeName);
+	}
+
+	/**
+	 * Returns the value of the named attribute, or null if there is no
+	 * attribute of that name.
+	 *
+	 * @param attributeName the attribute name
+	 * @return the attribute value
+	 */
+	public String getAttribute(String attributeName)
+	{
+		return (String)attributeMap_.get(attributeName);
+	}
+
+
+	/**
+	 * Returns an attribute-value map derived from the factory attributes
+	 * by finding all factory attributes that begin with
+	 * <i>contextName</i>.<i>tableName</i>.	The returned map consists of
+	 * those attributes with the contextName and tableName stripped off.
+	 */
+	protected Map<String,String> getAttributeTable(String tableName)
+	{
+		String prefix = tableName + ".";
+		Map<String,String> result = new HashMap<String,String>();
+		for (String attributeName : getAttributeNames())
+		{
+			if (attributeName.startsWith(prefix))
+			{
+				String name = attributeName.substring(prefix.length());
+				String value = (String) getAttribute(attributeName);
+				result.put(name, value);
+			}
+		}
+		return result;
+	}
+
+	/**
+	 * Starts or restarts monitoring, the emitting of metrics records.
+	 */
+	public void startMonitoring() throws IOException {
+		if (!isMonitoring)
+		{
+			startTimer();
+			isMonitoring = true;
+		}
+	}
+
+	/**
+	 * Stops monitoring.	This does not free buffered data.
+	 * @see #close()
+	 */
+	public void stopMonitoring() {
+		if (isMonitoring)
+		{
+			shutdown();
+			isMonitoring = false;
+		}
+	}
+
+	/**
+	 * Returns true if monitoring is currently in progress.
+	 */
+	public boolean isMonitoring() {
+		return isMonitoring;
+	}
+
+	/**
+	 * Stops monitoring and frees buffered data, returning this
+	 * object to its initial state.
+	 */
+	public void close()
+	{
+		stopMonitoring();
+		clearUpdaters();
+	}
+
+	/**
+	 * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
+	 * Throws an exception if the metrics implementation is configured with a fixed
+	 * set of record names and <code>recordName</code> is not in that set.
+	 *
+	 * @param recordName the name of the record
+	 * @throws AnalyticsException if recordName conflicts with configuration data
+	 */
+	public final void createRecord(String recordName)
+	{
+		if (bufferedData_.get(recordName) == null)
+		{
+			bufferedData_.put(recordName, new RecordMap());
+		}
+        recordMap_.put(recordName, new MetricsRecord(recordName, this));
+	}
+
+	/**
+	 * Return the MetricsRecord associated with this record name.
+	 * @param recordName the name of the record
+	 * @return newly created instance of MetricsRecordImpl or subclass
+	 */
+	public MetricsRecord getMetricsRecord(String recordName)
+	{
+		return recordMap_.get(recordName);
+	}
+
+	/**
+	 * Registers a callback to be called at time intervals determined by
+	 * the configuration.
+	 *
+	 * @param updater object to be run periodically; it should update
+	 * some metrics records
+	 */
+	public void registerUpdater(final IAnalyticsSource updater)
+	{
+		if (!updaters.contains(updater)) {
+			updaters.add(updater);
+		}
+	}
+
+	/**
+	 * Removes a callback, if it exists.
+	 *
+	 * @param updater object to be removed from the callback list
+	 */
+	public void unregisterUpdater(IAnalyticsSource updater)
+	{
+		updaters.remove(updater);
+	}
+
+	private void clearUpdaters()
+	{
+		updaters.clear();
+	}
+
+	/**
+	 * Starts timer if it is not already started
+	 */
+	private void startTimer()
+	{
+		if (timer == null)
+		{
+			timer = new Timer("Timer thread for monitoring AnalyticsContext", true);
+			TimerTask task = new TimerTask()
+			{
+				public void run()
+				{
+					try
+					{
+						timerEvent();
+					}
+					catch (IOException ioe)
+					{
+						ioe.printStackTrace();
+					}
+				}
+			};
+			long millis = period_ * 1000;
+			timer.scheduleAtFixedRate(task, millis, millis);
+		}
+	}
+
+	/**
+	 * Stops timer if it is running
+	 */
+	public void shutdown()
+	{
+		if (timer != null)
+		{
+			timer.cancel();
+			timer = null;
+		}
+	}
+
+	/**
+	 * Timer callback.
+	 */
+	private void timerEvent() throws IOException
+	{
+		if (isMonitoring)
+		{
+			Collection<IAnalyticsSource> myUpdaters;
+
+			// we dont need to synchronize as there will not be any
+			// addition or removal of listeners
+			myUpdaters = new ArrayList<IAnalyticsSource>(updaters);
+
+			// Run all the registered updates without holding a lock
+			// on this context
+			for (IAnalyticsSource updater : myUpdaters)
+			{
+				try
+				{
+					updater.doUpdates(this);
+				}
+				catch (Throwable throwable)
+				{
+					throwable.printStackTrace();
+				}
+			}
+			emitRecords();
+		}
+	}
+
+	/**
+	 *	Emits the records.
+	 */
+	private void emitRecords() throws IOException
+	{
+		for (String recordName : bufferedData_.keySet())
+		{
+			RecordMap recordMap = bufferedData_.get(recordName);
+			synchronized (recordMap)
+			{
+				for (TagMap tagMap : recordMap.keySet())
+				{
+					MetricMap metricMap = recordMap.get(tagMap);
+					OutputRecord outRec = new OutputRecord(tagMap, metricMap);
+					emitRecord(recordName, outRec);
+				}
+			}
+		}
+		flush();
+	}
+
+	/**
+	 * Called each period after all records have been emitted, this method does nothing.
+	 * Subclasses may override it in order to perform some kind of flush.
+	 */
+	protected void flush() throws IOException
+	{
+	}
+
+	/**
+	 * Called by MetricsRecordImpl.update().	Creates or updates a row in
+	 * the internal table of metric data.
+	 */
+	protected void update(MetricsRecord record)
+	{
+		String recordName = record.getRecordName();
+		TagMap tagTable = record.getTagTable();
+		Map<String,MetricValue> metricUpdates = record.getMetricTable();
+
+		RecordMap recordMap = getRecordMap(recordName);
+		synchronized (recordMap)
+		{
+			MetricMap metricMap = recordMap.get(tagTable);
+			if (metricMap == null)
+			{
+				metricMap = new MetricMap();
+				TagMap tagMap = new TagMap(tagTable); // clone tags
+				recordMap.put(tagMap, metricMap);
+			}
+			for (String metricName : metricUpdates.keySet())
+			{
+				MetricValue updateValue = metricUpdates.get(metricName);
+				Number updateNumber = updateValue.getNumber();
+				Number currentNumber = metricMap.get(metricName);
+				if (currentNumber == null || updateValue.isAbsolute())
+				{
+					metricMap.put(metricName, updateNumber);
+				}
+				else
+				{
+					Number newNumber = sum(updateNumber, currentNumber);
+					metricMap.put(metricName, newNumber);
+				}
+			}
+		}
+	}
+
+	private RecordMap getRecordMap(String recordName)
+	{
+		return bufferedData_.get(recordName);
+	}
+
+	/**
+	 * Adds two numbers, coercing the second to the type of the first.
+	 *
+	 */
+	private Number sum(Number a, Number b)
+	{
+		if (a instanceof Integer)
+		{
+			return new Integer(a.intValue() + b.intValue());
+		}
+		else if (a instanceof Float)
+		{
+			return new Float(a.floatValue() + b.floatValue());
+		}
+		else if (a instanceof Short)
+		{
+			return new Short((short)(a.shortValue() + b.shortValue()));
+		}
+		else if (a instanceof Byte)
+		{
+			return new Byte((byte)(a.byteValue() + b.byteValue()));
+		}
+		else
+		{
+			// should never happen
+			throw new AnalyticsException("Invalid number type");
+		}
+	}
+
+	/**
+	 * Called by MetricsRecordImpl.remove().	Removes any matching row in
+	 * the internal table of metric data.	A row matches if it has the same
+	 * tag names and tag values.
+	 */
+	protected void remove(MetricsRecord record)
+	{
+		String recordName = record.getRecordName();
+		TagMap tagTable = record.getTagTable();
+
+		RecordMap recordMap = getRecordMap(recordName);
+
+		recordMap.remove(tagTable);
+	}
+
+	/**
+	 * Returns the timer period.
+	 */
+	public int getPeriod()
+	{
+		return period_;
+	}
+
+	/**
+	 * Sets the timer period
+	 */
+	protected void setPeriod(int period)
+	{
+		this.period_ = period;
+	}
+
+	/**
+	 * Sets the default port to listen on
+	 */
+	public void setPort(int port)
+	{
+		port_ = port;
+	}
+
+	/**
+	 * Parses a space and/or comma separated sequence of server specifications
+	 * of the form <i>hostname</i> or <i>hostname:port</i>.	If
+	 * the specs string is null, defaults to localhost:defaultPort.
+	 *
+	 * @return a list of InetSocketAddress objects.
+	 */
+	private static List<InetSocketAddress> parse(String specs, int defaultPort)
+	{
+		List<InetSocketAddress> result = new ArrayList<InetSocketAddress>(1);
+		if (specs == null) {
+			result.add(new InetSocketAddress("localhost", defaultPort));
+		}
+		else {
+			String[] specStrings = specs.split("[ ,]+");
+			for (String specString : specStrings) {
+				int colon = specString.indexOf(':');
+				if (colon < 0 || colon == specString.length() - 1)
+				{
+					result.add(new InetSocketAddress(specString, defaultPort));
+				} else
+				{
+					String hostname = specString.substring(0, colon);
+					int port = Integer.parseInt(specString.substring(colon+1));
+					result.add(new InetSocketAddress(hostname, port));
+				}
+			}
+		}
+		return result;
+	}
+
+	/**
+	 * Starts up the analytics context and registers the VM metrics.
+	 */
+	public void start()
+	{
+		// register the vm analytics object with the analytics context to update the data
+		registerUpdater(new VMAnalyticsSource());
+
+
+        init("analyticsContext", DatabaseDescriptor.getGangliaServers());
+
+		try
+		{
+			startMonitoring();
+		}
+		catch(IOException e)
+		{
+			logger_.error(LogUtil.throwableToString(e));
+		}
+	}
+
+	public void stop()
+	{
+		close();
+	}
+
+    /**
+     * Factory method that gets an instance of the StorageService
+     * class.
+     */
+    public static AnalyticsContext instance()
+    {
+        if ( instance_ == null )
+        {
+        	AnalyticsContext.createLock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                {
+                    instance_ = new AnalyticsContext();
+                }
+            }
+            finally
+            {
+                createLock_.unlock();
+            }
+        }
+        return instance_;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+/**
+ * General-purpose, unchecked metrics exception.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class AnalyticsException extends RuntimeException
+{
+
+	  private static final long serialVersionUID = -1643257498540498497L;
+
+	  /**
+	   * Creates a new instance of MetricsException
+	   */
+	  public AnalyticsException()
+	  {
+	  }
+
+	  /** Creates a new instance of MetricsException
+	   *
+	   * @param message an error message
+	   */
+	  public AnalyticsException(String message)
+	  {
+	    super(message);
+	  }
+
+	}

Added: incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class sets up the analytics package to report metrics into
+ * Ganglia for the various DB operations such as: reads per second,
+ * average read latency, writes per second, average write latency.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class DBAnalyticsSource implements IAnalyticsSource
+{
+    private static final String METRIC_READ_OPS = "Read Operations";
+    private static final String RECORD_READ_OPS = "ReadOperationsRecord";
+    private static final String TAG_READOPS = "ReadOperationsTag";
+    private static final String TAG_READ_OPS = "ReadOperationsTagValue";
+
+    private static final String METRIC_READ_AVG = "Average Read Latency";
+    private static final String RECORD_READ_AVG = "ReadLatencyRecord";
+    private static final String TAG_READAVG = "AverageReadLatencyTag";
+    private static final String TAG_READ_AVG = "ReadLatencyTagValue";
+
+    private static final String METRIC_WRITE_OPS = "Write Operations";
+    private static final String RECORD_WRITE_OPS = "WriteOperationsRecord";
+    private static final String TAG_WRITEOPS = "WriteOperationsTag";
+    private static final String TAG_WRITE_OPS = "WriteOperationsTagValue";
+
+    private static final String METRIC_WRITE_AVG = "Average Write Latency";
+    private static final String RECORD_WRITE_AVG = "WriteLatencyRecord";
+    private static final String TAG_WRITEAVG = "AverageWriteLatencyTag";
+    private static final String TAG_WRITE_AVG = "WriteLatencyTagValue";
+
+    /* keep track of the number of read operations */
+    private AtomicInteger readOperations_ = new AtomicInteger(0);
+
+    /* keep track of the number of read latencies */
+    private AtomicLong readLatencies_ = new AtomicLong(0);
+
+    /* keep track of the number of write operations */
+    private AtomicInteger writeOperations_ = new AtomicInteger(0);
+
+    /* keep track of the number of write latencies */
+    private AtomicLong writeLatencies_ = new AtomicLong(0);
+
+    /**
+     * Create all the required records we intend to display, and
+     * register with the AnalyticsContext.
+     */
+    public DBAnalyticsSource()
+    {
+        /* register with the AnalyticsContext */
+        AnalyticsContext.instance().registerUpdater(this);
+        /* set the units for the metric type */
+        AnalyticsContext.instance().setAttribute("units." + METRIC_READ_OPS, "r/s");
+        /* create the record */
+        AnalyticsContext.instance().createRecord(RECORD_READ_OPS);
+
+        /* set the units for the metric type */
+        AnalyticsContext.instance().setAttribute("units." + METRIC_READ_AVG, "ms");
+        /* create the record */
+        AnalyticsContext.instance().createRecord(RECORD_READ_AVG);
+
+        /* set the units for the metric type */
+        AnalyticsContext.instance().setAttribute("units." + METRIC_WRITE_OPS, "w/s");
+        /* create the record */
+        AnalyticsContext.instance().createRecord(RECORD_WRITE_OPS);
+
+        /* set the units for the metric type */
+        AnalyticsContext.instance().setAttribute("units." + METRIC_WRITE_AVG, "ms");
+        /* create the record */
+        AnalyticsContext.instance().createRecord(RECORD_WRITE_AVG);
+    }
+
+    /**
+     * Update each of the records with the relevant data
+     *
+     * @param context the reference to the context which has called this callback
+     */
+    public void doUpdates(AnalyticsContext context)
+    {
+        // update the read operations record
+        MetricsRecord readUsageRecord = context.getMetricsRecord(RECORD_READ_OPS);
+        int period = context.getPeriod();
+
+        if(readUsageRecord != null)
+        {
+            if ( readOperations_.get() > 0 )
+            {
+                readUsageRecord.setTag(TAG_READOPS, TAG_READ_OPS);
+                readUsageRecord.setMetric(METRIC_READ_OPS, readOperations_.get() / period);
+                readUsageRecord.update();
+            }
+        }
+
+        // update the read latency record
+        MetricsRecord readLatencyRecord = context.getMetricsRecord(RECORD_READ_AVG);
+        if(readLatencyRecord != null)
+        {
+            if ( readOperations_.get() > 0 )
+            {
+                readLatencyRecord.setTag(TAG_READAVG, TAG_READ_AVG);
+                readLatencyRecord.setMetric(METRIC_READ_AVG, readLatencies_.get() / readOperations_.get() );
+                readLatencyRecord.update();
+            }
+        }
+
+        // update the write operations record
+        MetricsRecord writeUsageRecord = context.getMetricsRecord(RECORD_WRITE_OPS);
+        if(writeUsageRecord != null)
+        {
+            if ( writeOperations_.get() > 0 )
+            {
+                writeUsageRecord.setTag(TAG_WRITEOPS, TAG_WRITE_OPS);
+                writeUsageRecord.setMetric(METRIC_WRITE_OPS, writeOperations_.get() / period);
+                writeUsageRecord.update();
+            }
+        }
+
+        // update the write latency record
+        MetricsRecord writeLatencyRecord = context.getMetricsRecord(RECORD_WRITE_AVG);
+        if(writeLatencyRecord != null)
+        {
+            if ( writeOperations_.get() > 0 )
+            {
+                writeLatencyRecord.setTag(TAG_WRITEAVG, TAG_WRITE_AVG);
+                writeLatencyRecord.setMetric(METRIC_WRITE_AVG, writeLatencies_.get() / writeOperations_.get() );
+                writeLatencyRecord.update();
+            }
+        }
+
+        clear();
+    }
+
+    /**
+     * Reset all the metric records
+     */
+    private void clear()
+    {
+        readOperations_.set(0);
+        readLatencies_.set(0);
+        writeOperations_.set(0);
+        writeLatencies_.set(0);
+    }
+
+    /**
+     * Update the read statistics.
+     */
+    public void updateReadStatistics(long latency)
+    {
+        readOperations_.incrementAndGet();
+        readLatencies_.addAndGet(latency);
+    }
+
+    /**
+     * Update the write statistics.
+     */
+    public void updateWriteStatistics(long latency)
+    {
+        writeOperations_.incrementAndGet();
+        writeLatencies_.addAndGet(latency);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+/**
+ * Call-back interface.  See <code>AnalyticsContext.registerUpdater()</code>.
+ * This callback is called at a regular (pre-registered time interval) in
+ * order to update the metric values.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public interface IAnalyticsSource
+{
+  /**
+   * Timer-based call-back from the metric library.
+   */
+  public abstract void doUpdates(AnalyticsContext context);
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+
+/**
+ * A Number that is either an absolute or an incremental amount.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class MetricValue
+{
+	public static final boolean ABSOLUTE = false;
+	public static final boolean INCREMENT = true;
+
+	private boolean isIncrement;
+	private Number number;
+
+	/**
+	 * Creates a new instance of MetricValue
+	 *
+	 *  @param number this initializes the initial value of this metric
+	 *  @param isIncrement sets if the metric can be incremented or only set
+	 */
+	public MetricValue(Number number, boolean isIncrement)
+	{
+		this.number = number;
+		this.isIncrement = isIncrement;
+	}
+
+	/**
+	 * Checks if this metric can be incremented.
+	 *
+	 * @return true if the value of this metric can be incremented, false otherwise
+	 */
+	public boolean isIncrement()
+	{
+		return isIncrement;
+	}
+
+	/**
+	 * Checks if the value of this metric is always an absolute value. This is the
+	 * inverse of isIncrement.
+	 *
+	 * @return true if the
+	 */
+	public boolean isAbsolute()
+	{
+		return !isIncrement;
+	}
+
+	/**
+	 * Returns the current number value of the metric.
+	 *
+	 * @return the Number value of this metric
+	 */
+	public Number getNumber()
+	{
+		return number;
+	}
+}

Added: incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+
+/**
+ * This class keeps a back-pointer to the AnalyticsContext
+ * and delegates back to it on <code>update</code> and <code>remove()</code>.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class MetricsRecord {
+
+	private AnalyticsContext.TagMap tagTable = new AnalyticsContext.TagMap();
+	private Map<String,MetricValue> metricTable = new LinkedHashMap<String,MetricValue>();
+
+	private String recordName;
+	private AnalyticsContext context;
+
+
+	/**
+	 * Creates a new instance of MetricsRecord
+	 *
+	 *  @param recordName name of this record
+	 *  @param context the context which this record is a part of
+	 */
+	protected MetricsRecord(String recordName, AnalyticsContext context)
+	{
+		this.recordName = recordName;
+		this.context = context;
+	}
+
+	/**
+	 * Returns the record name.
+	 *
+	 * @return the record name
+	 */
+	public String getRecordName() {
+		return recordName;
+	}
+
+	/**
+	 * Sets the named tag to the specified value.
+	 *
+	 * @param tagName name of the tag
+	 * @param tagValue new value of the tag
+	 * @throws MetricsException if the tagName conflicts with the configuration
+	 */
+	public void setTag(String tagName, String tagValue) {
+		if (tagValue == null) {
+			tagValue = "";
+		}
+		tagTable.put(tagName, tagValue);
+	}
+
+	/**
+	 * Sets the named tag to the specified value.
+	 *
+	 * @param tagName name of the tag
+	 * @param tagValue new value of the tag
+	 * @throws MetricsException if the tagName conflicts with the configuration
+	 */
+	public void setTag(String tagName, int tagValue) {
+		tagTable.put(tagName, new Integer(tagValue));
+	}
+
+	/**
+	 * Sets the named tag to the specified value.
+	 *
+	 * @param tagName name of the tag
+	 * @param tagValue new value of the tag
+	 * @throws MetricsException if the tagName conflicts with the configuration
+	 */
+	public void setTag(String tagName, short tagValue) {
+		tagTable.put(tagName, new Short(tagValue));
+	}
+
+	/**
+	 * Sets the named tag to the specified value.
+	 *
+	 * @param tagName name of the tag
+	 * @param tagValue new value of the tag
+	 * @throws MetricsException if the tagName conflicts with the configuration
+	 */
+	public void setTag(String tagName, byte tagValue)
+	{
+		tagTable.put(tagName, new Byte(tagValue));
+	}
+
+	/**
+	 * Sets the named metric to the specified value.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue new value of the metric
+	 * @throws MetricsException if the metricName or the type of the metricValue
+	 * conflicts with the configuration
+	 */
+	public void setMetric(String metricName, int metricValue)
+	{
+		setAbsolute(metricName, new Integer(metricValue));
+	}
+
+	/**
+	 * Sets the named metric to the specified value.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue new value of the metric
+	 * @throws MetricsException if the metricName or the type of the metricValue
+	 * conflicts with the configuration
+	 */
+	public void setMetric(String metricName, short metricValue)
+	{
+		setAbsolute(metricName, new Short(metricValue));
+	}
+
+	/**
+	 * Sets the named metric to the specified value.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue new value of the metric
+	 * @throws MetricsException if the metricName or the type of the metricValue
+	 * conflicts with the configuration
+	 */
+	public void setMetric(String metricName, byte metricValue)
+	{
+		setAbsolute(metricName, new Byte(metricValue));
+	}
+
+	/**
+	 * Sets the named metric to the specified value.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue new value of the metric
+	 * @throws MetricsException if the metricName or the type of the metricValue
+	 * conflicts with the configuration
+	 */
+	public void setMetric(String metricName, float metricValue)
+	{
+		setAbsolute(metricName, new Float(metricValue));
+	}
+
+	/**
+	 * Increments the named metric by the specified value.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue incremental value
+	 * @throws MetricsException if the metricName or the type of the metricValue
+	 * conflicts with the configuration
+	 */
+	public void incrMetric(String metricName, int metricValue)
+	{
+		setIncrement(metricName, new Integer(metricValue));
+	}
+
+	/**
+	 * Increments the named metric by the specified value.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue incremental value
+	 * @throws MetricsException if the metricName or the type of the metricValue
+	 * conflicts with the configuration
+	 */
+	public void incrMetric(String metricName, short metricValue)
+	{
+		setIncrement(metricName, new Short(metricValue));
+	}
+
+	/**
+	 * Increments the named metric by the specified value.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue incremental value
+	 * @throws MetricsException if the metricName or the type of the metricValue
+	 * conflicts with the configuration
+	 */
+	public void incrMetric(String metricName, byte metricValue)
+	{
+		setIncrement(metricName, new Byte(metricValue));
+	}
+
+	/**
+	 * Increments the named metric by the specified value.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue incremental value
+	 * @throws MetricsException if the metricName or the type of the metricValue
+	 * conflicts with the configuration
+	 */
+	public void incrMetric(String metricName, float metricValue)
+	{
+		setIncrement(metricName, new Float(metricValue));
+	}
+
+	/**
+	 * Sets the value of the metric identified by metricName with the
+	 * number metricValue.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue number value to which it should be updated
+	 */
+	private void setAbsolute(String metricName, Number metricValue)
+	{
+		metricTable.put(metricName, new MetricValue(metricValue, MetricValue.ABSOLUTE));
+	}
+
+	/**
+	 * Increments the value of the metric identified by metricName with the
+	 * number metricValue.
+	 *
+	 * @param metricName name of the metric
+	 * @param metricValue number value by which it should be incremented
+	 */
+	private void setIncrement(String metricName, Number metricValue)
+	{
+		metricTable.put(metricName, new MetricValue(metricValue, MetricValue.INCREMENT));
+	}
+
+	/**
+	 * Updates the table of buffered data which is to be sent periodically.
+	 * If the tag values match an existing row, that row is updated;
+	 * otherwise, a new row is added.
+	 */
+	public void update()
+	{
+		context.update(this);
+	}
+
+	/**
+	 * Removes the row, if it exists, in the buffered data table having tags
+	 * that equal the tags that have been set on this record.
+	 */
+	public void remove()
+	{
+		context.remove(this);
+	}
+
+	AnalyticsContext.TagMap getTagTable()
+	{
+		return tagTable;
+	}
+
+	Map<String, MetricValue> getMetricTable()
+	{
+		return metricTable;
+	}
+}

Added: incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Represents a record of metric data to be sent to a metrics system.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class OutputRecord
+{
+	private AnalyticsContext.TagMap tagMap;
+	private AnalyticsContext.MetricMap metricMap;
+
+	/**
+	 * Creates a new instance of OutputRecord
+	 */
+	OutputRecord(AnalyticsContext.TagMap tagMap, AnalyticsContext.MetricMap metricMap)
+	{
+		this.tagMap = tagMap;
+		this.metricMap = metricMap;
+	}
+
+	/**
+	 * Returns the set of tag names.
+	 */
+	public Set<String> getTagNames()
+	{
+		return Collections.unmodifiableSet(tagMap.keySet());
+	}
+
+	/**
+	 * Returns a tag object which is can be a String, Integer, Short or Byte.
+	 *
+	 * @return the tag value, or null if there is no such tag
+	 */
+	public Object getTag(String name)
+	{
+		return tagMap.get(name);
+	}
+
+	/**
+	 * Returns the set of metric names.
+	 *
+	 * @return the set of metric names
+	 */
+	public Set<String> getMetricNames()
+	{
+		return Collections.unmodifiableSet(metricMap.keySet());
+	}
+
+	/**
+	 * Returns the metric object which can be a Float, Integer, Short or Byte.
+	 *
+	 * @param name name of the metric for which the value is being requested
+	 * @return return the tag value, or null if there is no such tag
+	 */
+	public Number getMetric(String name)
+	{
+		return (Number) metricMap.get(name);
+	}
+
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class sets up the analytics package to report metrics into
+ * Ganglia for VM heap utilization.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+
+public class VMAnalyticsSource implements IAnalyticsSource
+{
+	private static final String METRIC_MEMUSAGE = "VM Heap Utilization";
+	private static final String RECORD_MEMUSAGE = "MemoryUsageRecord";
+	private static final String TAG_MEMUSAGE = "MemoryUsageTag";
+	private static final String TAG_MEMUSAGE_MEMUSED = "MemoryUsedTagValue";
+
+	/**
+	 * Setup the Ganglia record to display the VM heap utilization.
+	 */
+	public VMAnalyticsSource()
+	{
+		// set the units for the metric type
+		AnalyticsContext.instance().setAttribute("units." + METRIC_MEMUSAGE, "MB");
+		// create the record
+        AnalyticsContext.instance().createRecord(RECORD_MEMUSAGE);
+  	}
+
+	/**
+	 * Update the VM heap utilization record with the relevant data.
+	 *
+	 * @param context the reference to the context which has called this callback
+	 */
+	public void doUpdates(AnalyticsContext context)
+	{
+        // update the memory used record
+		MetricsRecord memUsageRecord = context.getMetricsRecord(RECORD_MEMUSAGE);
+		if(memUsageRecord != null)
+		{
+			updateUsedMemory(memUsageRecord);
+		}
+	}
+
+	private void updateUsedMemory(MetricsRecord memUsageRecord)
+	{
+		memUsageRecord.setTag(TAG_MEMUSAGE, TAG_MEMUSAGE_MEMUSED);
+		memUsageRecord.setMetric(METRIC_MEMUSAGE, getMemoryUsed());
+		memUsageRecord.update();
+	}
+
+	private float getMemoryUsed()
+	{
+        MemoryMXBean memoryMxBean = ManagementFactory.getMemoryMXBean();
+        MemoryUsage memUsage = memoryMxBean.getHeapMemoryUsage();
+        return (float)memUsage.getUsed()/(1024 * 1024);
+	}
+}

Added: incubator/cassandra/src/org/apache/cassandra/cli/Cli.g
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/Cli.g?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cli/Cli.g (added)
+++ incubator/cassandra/src/org/apache/cassandra/cli/Cli.g Mon Mar  2 06:12:46 2009
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// ANTLR Grammar for the Cassandra Command Line Interface (CLI).
+//
+// Note: This grammar handles all but the CQL statements. CQL
+// statements are detected separately (based on the first token)
+// and directly sent to server-side for processing.
+//
+
+grammar Cli;
+
+options {
+    output=AST;
+    ASTLabelType=CommonTree;
+    backtrack=true;
+}
+
+//
+// Nodes in the AST
+//
+tokens {
+    //
+    // Top-level nodes. These typically correspond to
+    // various top-level CLI statements.
+    //
+    NODE_CONNECT;
+    NODE_DESCRIBE_TABLE;
+    NODE_EXIT;
+    NODE_HELP;
+    NODE_NO_OP;
+    NODE_SHOW_CLUSTER_NAME;
+    NODE_SHOW_CONFIG_FILE;
+    NODE_SHOW_VERSION;
+    NODE_SHOW_TABLES;
+    NODE_THRIFT_GET;
+    NODE_THRIFT_SET;
+
+    // Internal Nodes.
+    NODE_COLUMN_ACCESS;
+    NODE_ID_LIST;
+}
+
+@parser::header {
+package com.facebook.infrastructure.cli;
+}
+
+@lexer::header {
+package com.facebook.infrastructure.cli;
+}
+
+//
+// Parser Section
+//
+
+// the root node
+root: stmt SEMICOLON? EOF -> stmt;
+
+stmt
+    : connectStmt
+    | exitStmt
+    | describeTable
+    | getStmt
+    | helpStmt
+    | setStmt
+    | showStmt
+    | -> ^(NODE_NO_OP)
+    ;
+
+connectStmt
+    : K_CONNECT host SLASH port -> ^(NODE_CONNECT host port)
+    ;
+
+helpStmt
+    : K_HELP -> ^(NODE_HELP)
+    | '?'    -> ^(NODE_HELP)
+    ;
+
+exitStmt
+    : K_QUIT -> ^(NODE_EXIT)
+    | K_EXIT -> ^(NODE_EXIT)
+    ;
+
+getStmt
+    : K_THRIFT K_GET columnFamilyExpr -> ^(NODE_THRIFT_GET columnFamilyExpr)
+    ;
+
+setStmt
+    : K_THRIFT K_SET columnFamilyExpr '=' value -> ^(NODE_THRIFT_SET columnFamilyExpr value)
+    ;
+
+showStmt
+    : showClusterName
+    | showVersion
+    | showConfigFile
+    | showTables
+    ;
+
+showClusterName
+    : K_SHOW K_CLUSTER K_NAME -> ^(NODE_SHOW_CLUSTER_NAME)
+    ;
+
+showConfigFile
+    : K_SHOW K_CONFIG K_FILE -> ^(NODE_SHOW_CONFIG_FILE)
+    ;
+
+showVersion
+    : K_SHOW K_VERSION -> ^(NODE_SHOW_VERSION)
+    ;
+
+showTables
+    : K_SHOW K_TABLES -> ^(NODE_SHOW_TABLES)
+    ;
+
+describeTable
+    : K_DESCRIBE K_TABLE table -> ^(NODE_DESCRIBE_TABLE table);
+
+columnFamilyExpr
+    : table DOT columnFamily '[' rowKey ']' 
+        ( '[' a+=columnOrSuperColumn ']' 
+            ('[' a+=columnOrSuperColumn ']')? 
+        )?
+      -> ^(NODE_COLUMN_ACCESS table columnFamily rowKey ($a+)?)
+    ;
+
+table: Identifier;
+
+columnFamily: Identifier;
+
+rowKey:   StringLiteral;
+
+value: StringLiteral;
+
+columnOrSuperColumn: StringLiteral;
+
+host: id+=Identifier (id+=DOT id+=Identifier)* -> ^(NODE_ID_LIST $id+);
+
+port: IntegerLiteral;
+
+//
+// Lexer Section
+//
+
+//
+// Keywords (in alphabetical order for convenience)
+//
+// CLI is case-insensitive with respect to these keywords.
+// However, they MUST be listed in upper case here.
+// 
+K_CONFIG:     'CONFIG';
+K_CONNECT:    'CONNECT';
+K_CLUSTER:    'CLUSTER';
+K_DESCRIBE:   'DESCRIBE';
+K_GET:        'GET';
+K_HELP:       'HELP';
+K_EXIT:       'EXIT';
+K_FILE:       'FILE';
+K_NAME:       'NAME';
+K_QUIT:       'QUIT';
+K_SET:        'SET';
+K_SHOW:       'SHOW';
+K_TABLE:      'TABLE';
+K_TABLES:     'TABLES';
+K_THRIFT:     'THRIFT';
+K_VERSION:    'VERSION';
+
+// private syntactic rules
+fragment
+Letter
+    : 'a'..'z' 
+    | 'A'..'Z'
+    ;
+
+fragment
+Digit
+    : '0'..'9'
+    ;
+
+// syntactic Elements
+Identifier
+    : Letter ( Letter | Digit | '_')*
+    ;
+
+
+// literals
+StringLiteral
+    :
+    '\'' (~'\'')* '\'' ( '\'' (~'\'')* '\'' )* 
+    ;
+
+IntegerLiteral
+   : Digit+;
+
+
+//
+// syntactic elements
+//
+
+DOT
+    : '.'
+    ;
+
+SLASH
+    : '/'
+    ;
+
+SEMICOLON
+    : ';'
+    ;
+
+WS
+    :  (' '|'\r'|'\t'|'\n') {$channel=HIDDEN;}  // whitepace
+    ;
+
+COMMENT 
+    : '--' (~('\n'|'\r'))*                     { $channel=HIDDEN; }
+    | '/*' (options {greedy=false;} : .)* '*/' { $channel=HIDDEN; }
+    ;

Added: incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens (added)
+++ incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens Mon Mar  2 06:12:46 2009
@@ -0,0 +1,43 @@
+NODE_SHOW_CONFIG_FILE=10
+K_TABLES=32
+K_VERSION=31
+K_EXIT=22
+NODE_EXIT=6
+K_FILE=30
+K_GET=24
+K_CONNECT=18
+K_CONFIG=29
+SEMICOLON=17
+Digit=40
+Identifier=36
+NODE_THRIFT_GET=13
+K_SET=25
+StringLiteral=37
+NODE_HELP=7
+NODE_NO_OP=8
+NODE_THRIFT_SET=14
+K_DESCRIBE=33
+NODE_SHOW_VERSION=11
+NODE_ID_LIST=16
+WS=41
+NODE_CONNECT=4
+SLASH=19
+K_THRIFT=23
+NODE_SHOW_TABLES=12
+K_CLUSTER=27
+K_HELP=20
+K_SHOW=26
+NODE_DESCRIBE_TABLE=5
+K_TABLE=34
+IntegerLiteral=38
+NODE_SHOW_CLUSTER_NAME=9
+COMMENT=42
+DOT=35
+K_NAME=28
+Letter=39
+NODE_COLUMN_ACCESS=15
+K_QUIT=21
+'?'=43
+'='=44
+'['=45
+']'=46

Added: incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cli;
+
+import com.facebook.thrift.*;
+
+import org.antlr.runtime.tree.*;
+import org.apache.cassandra.cql.common.Utils;
+import org.apache.cassandra.service.Cassandra;
+import org.apache.cassandra.service.CassandraException;
+import org.apache.cassandra.service.CqlResult_t;
+import org.apache.cassandra.service.column_t;
+import org.apache.cassandra.service.Cassandra.Client;
+import org.apache.cassandra.utils.LogUtil;
+
+import java.util.*;
+
+// Cli Client Side Library
+public class CliClient 
+{
+    private Cassandra.Client thriftClient_ = null;
+    private CliSessionState css_ = null;
+
+    public CliClient(CliSessionState css, Cassandra.Client thriftClient)
+    {
+        css_ = css;
+        thriftClient_ = thriftClient;
+    }
+
+    // Execute a CLI Statement 
+    public void executeCLIStmt(String stmt) throws TException 
+    {
+        CommonTree ast = null;
+
+        ast = CliCompiler.compileQuery(stmt);
+
+        switch (ast.getType()) {
+        case CliParser.NODE_EXIT:
+            cleanupAndExit();
+            break;
+        case CliParser.NODE_THRIFT_GET:
+            executeGet(ast);
+            break;
+        case CliParser.NODE_HELP:
+            printCmdHelp();
+            break;
+        case CliParser.NODE_THRIFT_SET:
+            executeSet(ast);
+            break;
+        case CliParser.NODE_SHOW_CLUSTER_NAME:
+            executeShowProperty(ast, "cluster name");
+            break;
+        case CliParser.NODE_SHOW_CONFIG_FILE:
+            executeShowProperty(ast, "config file");
+            break;
+        case CliParser.NODE_SHOW_VERSION:
+            executeShowProperty(ast, "version");
+            break;
+        case CliParser.NODE_SHOW_TABLES:
+            executeShowTables(ast);
+            break;
+        case CliParser.NODE_DESCRIBE_TABLE:
+            executeDescribeTable(ast);
+            break;
+        case CliParser.NODE_CONNECT:
+            executeConnect(ast);
+            break;
+        case CliParser.NODE_NO_OP:
+            // comment lines come here; they are treated as no ops.
+            break;
+        default:
+            css_.err.println("Invalid Statement (Type: " + ast.getType() + ")");
+            break;
+        }
+    }
+    
+    private void printCmdHelp()
+    {
+       css_.out.println("List of all CLI commands:");
+       css_.out.println("?                                                         Same as help.");
+       css_.out.println("connect <hostname>/<port>                                 Connect to Cassandra's thrift service.");
+       css_.out.println("describe table <tbl>                                      Describe table.");
+       css_.out.println("exit                                                      Exit CLI.");
+       css_.out.println("explain plan [<set stmt>|<get stmt>|<select stmt>]        Explains the PLAN for specified stmt.");
+       css_.out.println("help                                                      Display this help.");
+       css_.out.println("quit                                                      Exit CLI.");
+       css_.out.println("show config file                                          Display contents of config file");
+       css_.out.println("show cluster name                                         Display cassandra server version");
+       css_.out.println("show tables                                               Show list of tables.");
+       css_.out.println("show version                                              Show server version.");
+       css_.out.println("select ...                                                CQL select statement (TBD).");
+       css_.out.println("get ...                                                   CQL data retrieval statement.");
+       css_.out.println("set ...                                                   CQL DML statement.");
+       css_.out.println("thrift get <tbl>.<cf>['<rowKey>']                         (will be deprecated)");            
+       css_.out.println("thrift get <tbl>.<cf>['<rowKey>']['<colKey>']             (will be deprecated)");            
+       css_.out.println("thrift set <tbl>.<cf>['<rowKey>']['<colKey>'] = '<value>' (will be deprecated)");    
+    }
+
+    private void cleanupAndExit()
+    {
+        CliMain.disconnect();
+        System.exit(0);
+    }
+
+    // Execute GET statement
+    private void executeGet(CommonTree ast) throws TException
+    {
+        if (!CliMain.isConnected())
+            return;
+
+        int childCount = ast.getChildCount();
+        assert(childCount == 1);
+
+        CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+        assert(columnFamilySpec.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+        String tableName     = CliCompiler.getTableName(columnFamilySpec);
+        String key           = CliCompiler.getKey(columnFamilySpec);
+        String columnFamily  = CliCompiler.getColumnFamily(columnFamilySpec);
+        int    columnSpecCnt = CliCompiler.numColumnSpecifiers(columnFamilySpec);
+
+        // assume simple columnFamily for now
+        if (columnSpecCnt == 0)
+        {
+            // table.cf['key']
+        	List<column_t> columns = new ArrayList<column_t>();
+        	try
+        	{
+        		columns = thriftClient_.get_slice(tableName, key, columnFamily, -1, 1000000);
+        	}
+        	catch(CassandraException cex)
+        	{
+        		css_.out.println(LogUtil.throwableToString(cex));
+        	}
+            int size = columns.size();
+            for (Iterator<column_t> colIter = columns.iterator(); colIter.hasNext(); )
+            {
+                column_t col = colIter.next();
+                css_.out.printf("  (column=%s, value=%s; timestamp=%d)\n",
+                                 col.columnName, col.value, col.timestamp);
+            }
+            css_.out.println("Returned " + size + " rows.");
+        }
+        else if (columnSpecCnt == 1)
+        {
+            // table.cf['key']['column']
+            String columnName = CliCompiler.getColumn(columnFamilySpec, 0);
+            column_t col = new column_t();
+            try
+            {
+            	col = thriftClient_.get_column(tableName, key, columnFamily + ":" + columnName);
+	    	}
+	    	catch(CassandraException cex)
+	    	{
+	    		css_.out.println(LogUtil.throwableToString(cex));
+	    	}
+            
+            css_.out.printf("==> (name=%s, value=%s; timestamp=%d)\n",
+                            col.columnName, col.value, col.timestamp);
+        }
+        else
+        {
+            assert(false);
+        }
+    }
+
+    // Execute SET statement
+    private void executeSet(CommonTree ast) throws TException
+    {
+        if (!CliMain.isConnected())
+            return;
+
+        int childCount = ast.getChildCount();
+        assert(childCount == 2);
+
+        CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+        assert(columnFamilySpec.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+        String tableName     = CliCompiler.getTableName(columnFamilySpec);
+        String key           = CliCompiler.getKey(columnFamilySpec);
+        String columnFamily  = CliCompiler.getColumnFamily(columnFamilySpec);
+        int    columnSpecCnt = CliCompiler.numColumnSpecifiers(columnFamilySpec);
+        String value         = Utils.unescapeSQLString(ast.getChild(1).getText());
+
+        // assume simple columnFamily for now
+        if (columnSpecCnt == 1)
+        {
+            // We have the table.cf['key']['column'] = 'value' case.
+
+            // get the column name
+            String columnName = CliCompiler.getColumn(columnFamilySpec, 0);
+
+            // do the insert
+            thriftClient_.insert(tableName, key, columnFamily + ":" + columnName,
+                                 value, System.currentTimeMillis());
+
+            css_.out.println("Value inserted.");
+        }
+        else
+        {
+            /* for now (until we support batch sets) */
+            assert(false);
+        }
+    }
+
+    private void executeShowProperty(CommonTree ast, String propertyName) throws TException
+    {
+        if (!CliMain.isConnected())
+            return;
+
+        String propertyValue = thriftClient_.getStringProperty(propertyName);
+        css_.out.println(propertyValue);
+        return;
+    }
+
+    // process "show tables" statement
+    private void executeShowTables(CommonTree ast) throws TException
+    {
+        if (!CliMain.isConnected())
+            return;
+        
+        List<String> tables = thriftClient_.getStringListProperty("tables");
+        for (String table : tables)
+        {
+            css_.out.println(table);
+        }
+    }
+
+    // process a statement of the form: describe table <tablename> 
+    private void executeDescribeTable(CommonTree ast) throws TException
+    {
+        if (!CliMain.isConnected())
+            return;
+
+        // Get table name
+        int childCount = ast.getChildCount();
+        assert(childCount == 1);
+        String tableName = ast.getChild(0).getText();
+        
+        // Describe and display
+        String describe = thriftClient_.describeTable(tableName);
+        css_.out.println(describe);
+        return;
+    }
+
+    // process a statement of the form: connect hostname/port
+    private void executeConnect(CommonTree ast) throws TException
+    {
+        int portNumber = Integer.parseInt(ast.getChild(1).getText());
+        Tree idList = ast.getChild(0);
+        
+        StringBuffer hostName = new StringBuffer();
+        int idCount = idList.getChildCount(); 
+        for (int idx = 0; idx < idCount; idx++)
+        {
+            hostName.append(idList.getChild(idx).getText());
+        }
+        
+        // disconnect current connection, if any.
+        // This is a no-op, if you aren't currently connected.
+        CliMain.disconnect();
+
+        // now, connect to the newly specified host name and port
+        css_.hostName = hostName.toString();
+        css_.thriftPort = portNumber;
+        CliMain.connect(css_.hostName, css_.thriftPort);
+    }
+
+    // execute CQL query on server
+    public void executeQueryOnServer(String query) throws TException
+    {
+        if (!CliMain.isConnected())
+            return;
+        
+        CqlResult_t result = thriftClient_.executeQuery(query);
+        
+        if (result == null)
+        {
+            css_.out.println("Unexpected error. Received null result from server.");
+            return;
+        }
+
+        if ((result.errorTxt != null) || (result.errorCode != 0))
+        {
+            css_.out.println("Error: " + result.errorTxt);
+        }
+        else
+        {
+            List<Map<String, String>> rows = result.resultSet;
+            
+            if (rows != null)
+            {
+                for (Map<String, String> row : rows)
+                {
+                    for (Iterator<Map.Entry<String, String>> it = row.entrySet().iterator(); it.hasNext(); )
+                    {
+                        Map.Entry<String, String> entry = it.next();
+                        String key = entry.getKey();
+                        String value = entry.getValue();
+                        css_.out.print(key + " = " + value + "; ");
+                    }
+                    css_.out.println();
+                }
+            }
+            css_.out.println("Statement processed.");
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cli;
+
+import org.antlr.runtime.*;
+import org.antlr.runtime.tree.*;
+import org.apache.cassandra.cql.common.Utils;
+
+
+public class CliCompiler
+{
+
+    // ANTLR does not provide case-insensitive tokenization support
+    // out of the box. So we override the LA (lookahead) function
+    // of the ANTLRStringStream class. Note: This doesn't change the
+    // token text-- but just relaxes the matching rules to match
+    // in upper case. [Logic borrowed from Hive code.]
+    // 
+    // Also see discussion on this topic in:
+    // http://www.antlr.org/wiki/pages/viewpage.action?pageId=1782.
+    public static class ANTLRNoCaseStringStream  extends ANTLRStringStream
+    {
+        public ANTLRNoCaseStringStream(String input)
+        {
+            super(input);
+        }
+    
+        public int LA(int i)
+        {
+            int returnChar = super.LA(i);
+            if (returnChar == CharStream.EOF)
+            {
+                return returnChar; 
+            }
+            else if (returnChar == 0) 
+            {
+                return returnChar;
+            }
+
+            return Character.toUpperCase((char)returnChar);
+        }
+    }
+
+    public static CommonTree compileQuery(String query)
+    {
+        CommonTree queryTree = null;
+        try
+        {
+            ANTLRStringStream input = new ANTLRNoCaseStringStream(query);
+
+            CliLexer lexer = new CliLexer(input);
+            CommonTokenStream tokens = new CommonTokenStream(lexer);
+
+            CliParser parser = new CliParser(tokens);
+
+            // start parsing...
+            queryTree = (CommonTree)(parser.root().getTree());
+
+            // semantic analysis if any...
+            //  [tbd]
+
+        }
+        catch(Exception e)
+        {
+            System.err.println("Exception " + e.getMessage());
+            e.printStackTrace(System.err);
+        }
+        return queryTree;
+    }
+    /*
+     * NODE_COLUMN_ACCESS related functions.
+     */
+    public static String getTableName(CommonTree astNode)
+    {
+        assert(astNode.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+        return astNode.getChild(0).getText();
+    }
+
+    public static String getColumnFamily(CommonTree astNode)
+    {
+        assert(astNode.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+        return astNode.getChild(1).getText();
+    }
+
+    public static String getKey(CommonTree astNode)
+    {
+        assert(astNode.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+        return Utils.unescapeSQLString(astNode.getChild(2).getText());
+    }
+
+    public static int numColumnSpecifiers(CommonTree astNode)
+    {
+        // Skip over table, column family and rowKey
+        return astNode.getChildCount() - 3;
+    }
+
+    // Returns the pos'th (0-based index) column specifier in the astNode
+    public static String getColumn(CommonTree astNode, int pos)
+    {
+        // Skip over table, column family and rowKey
+        return Utils.unescapeSQLString(astNode.getChild(pos + 3).getText()); 
+    }
+ 
+}