You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC
svn commit: r749205 [1/16] - in
/incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/
config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/
cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/ ...
Author: pmalik
Date: Mon Mar 2 06:12:46 2009
New Revision: 749205
URL: http://svn.apache.org/viewvc?rev=749205&view=rev
Log:
Added Cassandra sources
Added:
incubator/cassandra/src/org/apache/cassandra/analytics/
incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java
incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java
incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java
incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java
incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java
incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java
incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java
incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java
incubator/cassandra/src/org/apache/cassandra/cli/
incubator/cassandra/src/org/apache/cassandra/cli/Cli.g
incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens
incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java
incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java
incubator/cassandra/src/org/apache/cassandra/cli/CliLexer.java
incubator/cassandra/src/org/apache/cassandra/cli/CliMain.java
incubator/cassandra/src/org/apache/cassandra/cli/CliOptions.java
incubator/cassandra/src/org/apache/cassandra/cli/CliParser.java
incubator/cassandra/src/org/apache/cassandra/cli/CliSessionState.java
incubator/cassandra/src/org/apache/cassandra/cli/Cli__.g
incubator/cassandra/src/org/apache/cassandra/concurrent/
incubator/cassandra/src/org/apache/cassandra/concurrent/AIOExecutorService.java
incubator/cassandra/src/org/apache/cassandra/concurrent/Context.java
incubator/cassandra/src/org/apache/cassandra/concurrent/ContinuationContext.java
incubator/cassandra/src/org/apache/cassandra/concurrent/ContinuationStage.java
incubator/cassandra/src/org/apache/cassandra/concurrent/ContinuationsExecutor.java
incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java
incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java
incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java
incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java
incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java
incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java
incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java
incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java
incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java
incubator/cassandra/src/org/apache/cassandra/config/
incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java
incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/src/org/apache/cassandra/continuations/
incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java
incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java
incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java
incubator/cassandra/src/org/apache/cassandra/cql/
incubator/cassandra/src/org/apache/cassandra/cql/common/
incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java
incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java
incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java
incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java
incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java
incubator/cassandra/src/org/apache/cassandra/cql/common/CqlResult.java
incubator/cassandra/src/org/apache/cassandra/cql/common/DMLPlan.java
incubator/cassandra/src/org/apache/cassandra/cql/common/ExplainPlan.java
incubator/cassandra/src/org/apache/cassandra/cql/common/OperandDef.java
incubator/cassandra/src/org/apache/cassandra/cql/common/Pair.java
incubator/cassandra/src/org/apache/cassandra/cql/common/Plan.java
incubator/cassandra/src/org/apache/cassandra/cql/common/QueryPlan.java
incubator/cassandra/src/org/apache/cassandra/cql/common/RowSourceDef.java
incubator/cassandra/src/org/apache/cassandra/cql/common/SetColumnMap.java
incubator/cassandra/src/org/apache/cassandra/cql/common/SetSuperColumnMap.java
incubator/cassandra/src/org/apache/cassandra/cql/common/SetUniqueKey.java
incubator/cassandra/src/org/apache/cassandra/cql/common/SuperColumnMapExpr.java
incubator/cassandra/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
incubator/cassandra/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
incubator/cassandra/src/org/apache/cassandra/cql/common/Utils.java
incubator/cassandra/src/org/apache/cassandra/cql/compiler/
incubator/cassandra/src/org/apache/cassandra/cql/compiler/common/
incubator/cassandra/src/org/apache/cassandra/cql/compiler/common/CompilerErrorMsg.java
incubator/cassandra/src/org/apache/cassandra/cql/compiler/common/CqlCompiler.java
incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/
incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql.g
incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql.tokens
incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/CqlLexer.java
incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/CqlParser.java
incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/Cql__.g
incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseError.java
incubator/cassandra/src/org/apache/cassandra/cql/compiler/parse/ParseException.java
incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/
incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticException.java
incubator/cassandra/src/org/apache/cassandra/cql/compiler/sem/SemanticPhase.java
incubator/cassandra/src/org/apache/cassandra/cql/driver/
incubator/cassandra/src/org/apache/cassandra/cql/driver/CqlDriver.java
incubator/cassandra/src/org/apache/cassandra/cql/execution/
incubator/cassandra/src/org/apache/cassandra/cql/execution/RuntimeErrorMsg.java
incubator/cassandra/src/org/apache/cassandra/dht/
incubator/cassandra/src/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/src/org/apache/cassandra/dht/BootstrapInitiateMessage.java
incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadata.java
incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataMessage.java
incubator/cassandra/src/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
incubator/cassandra/src/org/apache/cassandra/dht/BootstrapSourceTarget.java
incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
incubator/cassandra/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
incubator/cassandra/src/org/apache/cassandra/dht/Range.java
incubator/cassandra/src/org/apache/cassandra/gms/
incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java
incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java
incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java
incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java
incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java
incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java
incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java
incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java
incubator/cassandra/src/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/src/org/apache/cassandra/gms/HeartBeatState.java
incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java
incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java
incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetector.java
incubator/cassandra/src/org/apache/cassandra/gms/IFailureNotification.java
incubator/cassandra/src/org/apache/cassandra/gms/JoinMessage.java
incubator/cassandra/src/org/apache/cassandra/gms/PureRandom.java
incubator/cassandra/src/org/apache/cassandra/gms/VersionGenerator.java
incubator/cassandra/src/org/apache/cassandra/io/
incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java
incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java
incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java
incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java
incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java
incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java
incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java
incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java
incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java
incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java
incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java
incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java
incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java
incubator/cassandra/src/org/apache/cassandra/io/SSTable.java
incubator/cassandra/src/org/apache/cassandra/io/SequenceFile.java
Added: incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsContext.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,788 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Context for sending metrics to Ganglia. This class drives the entire metric collection process.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class AnalyticsContext implements IComponentShutdown
+{
+ private static Logger logger_ = Logger.getLogger(AnalyticsContext.class);
+
+ private static final String PERIOD_PROPERTY = "period";
+ private static final String SERVERS_PROPERTY = "servers";
+ private static final String UNITS_PROPERTY = "units";
+ private static final String SLOPE_PROPERTY = "slope";
+ private static final String TMAX_PROPERTY = "tmax";
+ private static final String DMAX_PROPERTY = "dmax";
+
+ private static final String DEFAULT_UNITS = "";
+ private static final String DEFAULT_SLOPE = "both";
+ private static final int DEFAULT_TMAX = 60;
+ private static final int DEFAULT_DMAX = 0;
+ private static final int DEFAULT_PORT = 8649;
+ private static final int BUFFER_SIZE = 1500; // as per libgmond.c
+
+ private static final Map<Class,String> typeTable_ = new HashMap<Class,String>(5);
+
+ private Map<String,RecordMap> bufferedData_ = new HashMap<String,RecordMap>();
+ /* Keeps the MetricRecord for each abstraction that implements IAnalyticsSource */
+ private Map<String, MetricsRecord> recordMap_ = new HashMap<String, MetricsRecord>();
+ private Map<String,Object> attributeMap_ = new HashMap<String,Object>();
+ private Set<IAnalyticsSource> updaters = new HashSet<IAnalyticsSource>(1);
+ private List<InetSocketAddress> metricsServers_;
+
+ private Map<String, String> unitsTable_;
+ private Map<String, String> slopeTable_;
+ private Map<String, String> tmaxTable_;
+ private Map<String, String> dmaxTable_;
+
+ /* singleton instance */
+ private static AnalyticsContext instance_;
+ /* Used to lock the factory for creation of StorageService instance */
+ private static Lock createLock_ = new ReentrantLock();
+
+ /**
+ * Default period in seconds at which data is sent to the metrics system.
+ */
+ private static final int DEFAULT_PERIOD = 5;
+
+ /**
+ * Port to which we should write the data.
+ */
+ private int port_ = DEFAULT_PORT;
+
+ private Timer timer = null;
+ private int period_ = DEFAULT_PERIOD;
+ private volatile boolean isMonitoring = false;
+ private byte[] buffer_ = new byte[BUFFER_SIZE];
+ private int offset_;
+
+ private DatagramSocket datagramSocket_;
+
+ static class TagMap extends TreeMap<String,Object>
+ {
+ private static final long serialVersionUID = 3546309335061952993L;
+ TagMap()
+ {
+ super();
+ }
+ TagMap(TagMap orig)
+ {
+ super(orig);
+ }
+ }
+
+ static class MetricMap extends TreeMap<String,Number>
+ {
+ private static final long serialVersionUID = -7495051861141631609L;
+ }
+
+ static class RecordMap extends HashMap<TagMap,MetricMap>
+ {
+ private static final long serialVersionUID = 259835619700264611L;
+ }
+
+ static
+ {
+ typeTable_.put(String.class, "string");
+ typeTable_.put(Byte.class, "int8");
+ typeTable_.put(Short.class, "int16");
+ typeTable_.put(Integer.class, "int32");
+ typeTable_.put(Float.class, "float");
+ }
+
+
+ /**
+ * Creates a new instance of AnalyticsReporter
+ */
+ public AnalyticsContext()
+ {
+ StorageService.instance().registerComponentForShutdown(this);
+ }
+
+ /**
+ * Initializes the context.
+ */
+ public void init(String contextName, String serverSpecList)
+ {
+ String periodStr = getAttribute(PERIOD_PROPERTY);
+
+ if (periodStr != null)
+ {
+ int period = 0;
+ try
+ {
+ period = Integer.parseInt(periodStr);
+ }
+ catch (NumberFormatException nfe)
+ {
+ }
+
+ if (period <= 0)
+ {
+ throw new AnalyticsException("Invalid period: " + periodStr);
+ }
+
+ setPeriod(period);
+ }
+
+ metricsServers_ = parse(serverSpecList, port_);
+ unitsTable_ = getAttributeTable(UNITS_PROPERTY);
+ slopeTable_ = getAttributeTable(SLOPE_PROPERTY);
+ tmaxTable_ = getAttributeTable(TMAX_PROPERTY);
+ dmaxTable_ = getAttributeTable(DMAX_PROPERTY);
+
+ try
+ {
+ datagramSocket_ = new DatagramSocket();
+ }
+ catch (SocketException se)
+ {
+ se.printStackTrace();
+ }
+ }
+
+ /**
+ * Sends a record to the metrics system.
+ */
+ public void emitRecord(String recordName, OutputRecord outRec) throws IOException
+ {
+ // emit each metric in turn
+ for (String metricName : outRec.getMetricNames())
+ {
+ Object metric = outRec.getMetric(metricName);
+ String type = (String) typeTable_.get(metric.getClass());
+ emitMetric(metricName, type, metric.toString());
+ }
+ }
+
+ /**
+ * Helper which actually writes the metric in XDR format.
+ *
+ * @param name
+ * @param type
+ * @param value
+ * @throws IOException
+ */
+ private void emitMetric(String name, String type, String value) throws IOException
+ {
+ String units = getUnits(name);
+ int slope = getSlope(name);
+ int tmax = getTmax(name);
+ int dmax = getDmax(name);
+ offset_ = 0;
+
+ xdr_int(0); // metric_user_defined
+ xdr_string(type);
+ xdr_string(name);
+ xdr_string(value);
+ xdr_string(units);
+ xdr_int(slope);
+ xdr_int(tmax);
+ xdr_int(dmax);
+
+ for (InetSocketAddress socketAddress : metricsServers_)
+ {
+ DatagramPacket packet = new DatagramPacket(buffer_, offset_, socketAddress);
+ datagramSocket_.send(packet);
+ }
+ }
+
+ private String getUnits(String metricName)
+ {
+ String result = (String) unitsTable_.get(metricName);
+ if (result == null)
+ {
+ result = DEFAULT_UNITS;
+ }
+
+ return result;
+ }
+
+ private int getSlope(String metricName)
+ {
+ String slopeString = (String) slopeTable_.get(metricName);
+ if (slopeString == null)
+ {
+ slopeString = DEFAULT_SLOPE;
+ }
+
+ return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
+ }
+
+ private int getTmax(String metricName)
+ {
+ String tmaxString = (String) tmaxTable_.get(metricName);
+ if (tmaxString == null)
+ {
+ return DEFAULT_TMAX;
+ }
+ else
+ {
+ return Integer.parseInt(tmaxString);
+ }
+ }
+
+ private int getDmax(String metricName)
+ {
+ String dmaxString = (String) dmaxTable_.get(metricName);
+ if (dmaxString == null)
+ {
+ return DEFAULT_DMAX;
+ }
+ else
+ {
+ return Integer.parseInt(dmaxString);
+ }
+ }
+
+ /**
+ * Puts a string into the buffer by first writing the size of the string
+ * as an int, followed by the bytes of the string, padded if necessary to
+ * a multiple of 4.
+ */
+ private void xdr_string(String s)
+ {
+ byte[] bytes = s.getBytes();
+ int len = bytes.length;
+ xdr_int(len);
+ System.arraycopy(bytes, 0, buffer_, offset_, len);
+ offset_ += len;
+ pad();
+ }
+
+ /**
+ * Pads the buffer with zero bytes up to the nearest multiple of 4.
+ */
+ private void pad()
+ {
+ int newOffset = ((offset_ + 3) / 4) * 4;
+ while (offset_ < newOffset)
+ {
+ buffer_[offset_++] = 0;
+ }
+ }
+
+ /**
+ * Puts an integer into the buffer as 4 bytes, big-endian.
+ */
+ private void xdr_int(int i)
+ {
+ buffer_[offset_++] = (byte) ((i >> 24) & 0xff);
+ buffer_[offset_++] = (byte) ((i >> 16) & 0xff);
+ buffer_[offset_++] = (byte) ((i >> 8) & 0xff);
+ buffer_[offset_++] = (byte) (i & 0xff);
+ }
+
+
+
+ /**
+ * Returns the names of all the factory's attributes.
+ *
+ * @return the attribute names
+ */
+ public String[] getAttributeNames()
+ {
+ String[] result = new String[attributeMap_.size()];
+ int i = 0;
+ // for (String attributeName : attributeMap.keySet()) {
+ Iterator<String> it = attributeMap_.keySet().iterator();
+ while (it.hasNext())
+ {
+ result[i++] = it.next();
+ }
+ return result;
+ }
+
+ /**
+ * Sets the named factory attribute to the specified value, creating it
+ * if it did not already exist. If the value is null, this is the same as
+ * calling removeAttribute.
+ *
+ * @param attributeName the attribute name
+ * @param value the new attribute value
+ */
+ public void setAttribute(String attributeName, Object value)
+ {
+ attributeMap_.put(attributeName, value);
+ }
+
+ /**
+ * Removes the named attribute if it exists.
+ *
+ * @param attributeName the attribute name
+ */
+ public void removeAttribute(String attributeName)
+ {
+ attributeMap_.remove(attributeName);
+ }
+
+ /**
+ * Returns the value of the named attribute, or null if there is no
+ * attribute of that name.
+ *
+ * @param attributeName the attribute name
+ * @return the attribute value
+ */
+ public String getAttribute(String attributeName)
+ {
+ return (String)attributeMap_.get(attributeName);
+ }
+
+
+ /**
+ * Returns an attribute-value map derived from the factory attributes
+ * by finding all factory attributes that begin with
+ * <i>contextName</i>.<i>tableName</i>. The returned map consists of
+ * those attributes with the contextName and tableName stripped off.
+ */
+ protected Map<String,String> getAttributeTable(String tableName)
+ {
+ String prefix = tableName + ".";
+ Map<String,String> result = new HashMap<String,String>();
+ for (String attributeName : getAttributeNames())
+ {
+ if (attributeName.startsWith(prefix))
+ {
+ String name = attributeName.substring(prefix.length());
+ String value = (String) getAttribute(attributeName);
+ result.put(name, value);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Starts or restarts monitoring, the emitting of metrics records.
+ */
+ public void startMonitoring() throws IOException {
+ if (!isMonitoring)
+ {
+ startTimer();
+ isMonitoring = true;
+ }
+ }
+
+ /**
+ * Stops monitoring. This does not free buffered data.
+ * @see #close()
+ */
+ public void stopMonitoring() {
+ if (isMonitoring)
+ {
+ shutdown();
+ isMonitoring = false;
+ }
+ }
+
+ /**
+ * Returns true if monitoring is currently in progress.
+ */
+ public boolean isMonitoring() {
+ return isMonitoring;
+ }
+
+ /**
+ * Stops monitoring and frees buffered data, returning this
+ * object to its initial state.
+ */
+ public void close()
+ {
+ stopMonitoring();
+ clearUpdaters();
+ }
+
+ /**
+ * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
+ * Throws an exception if the metrics implementation is configured with a fixed
+ * set of record names and <code>recordName</code> is not in that set.
+ *
+ * @param recordName the name of the record
+ * @throws AnalyticsException if recordName conflicts with configuration data
+ */
+ public final void createRecord(String recordName)
+ {
+ if (bufferedData_.get(recordName) == null)
+ {
+ bufferedData_.put(recordName, new RecordMap());
+ }
+ recordMap_.put(recordName, new MetricsRecord(recordName, this));
+ }
+
+ /**
+ * Return the MetricsRecord associated with this record name.
+ * @param recordName the name of the record
+ * @return newly created instance of MetricsRecordImpl or subclass
+ */
+ public MetricsRecord getMetricsRecord(String recordName)
+ {
+ return recordMap_.get(recordName);
+ }
+
+ /**
+ * Registers a callback to be called at time intervals determined by
+ * the configuration.
+ *
+ * @param updater object to be run periodically; it should update
+ * some metrics records
+ */
+ public void registerUpdater(final IAnalyticsSource updater)
+ {
+ if (!updaters.contains(updater)) {
+ updaters.add(updater);
+ }
+ }
+
+ /**
+ * Removes a callback, if it exists.
+ *
+ * @param updater object to be removed from the callback list
+ */
+ public void unregisterUpdater(IAnalyticsSource updater)
+ {
+ updaters.remove(updater);
+ }
+
+ private void clearUpdaters()
+ {
+ updaters.clear();
+ }
+
+ /**
+ * Starts timer if it is not already started
+ */
+ private void startTimer()
+ {
+ if (timer == null)
+ {
+ timer = new Timer("Timer thread for monitoring AnalyticsContext", true);
+ TimerTask task = new TimerTask()
+ {
+ public void run()
+ {
+ try
+ {
+ timerEvent();
+ }
+ catch (IOException ioe)
+ {
+ ioe.printStackTrace();
+ }
+ }
+ };
+ long millis = period_ * 1000;
+ timer.scheduleAtFixedRate(task, millis, millis);
+ }
+ }
+
+ /**
+ * Stops timer if it is running
+ */
+ public void shutdown()
+ {
+ if (timer != null)
+ {
+ timer.cancel();
+ timer = null;
+ }
+ }
+
+ /**
+ * Timer callback.
+ */
+ private void timerEvent() throws IOException
+ {
+ if (isMonitoring)
+ {
+ Collection<IAnalyticsSource> myUpdaters;
+
+ // we dont need to synchronize as there will not be any
+ // addition or removal of listeners
+ myUpdaters = new ArrayList<IAnalyticsSource>(updaters);
+
+ // Run all the registered updates without holding a lock
+ // on this context
+ for (IAnalyticsSource updater : myUpdaters)
+ {
+ try
+ {
+ updater.doUpdates(this);
+ }
+ catch (Throwable throwable)
+ {
+ throwable.printStackTrace();
+ }
+ }
+ emitRecords();
+ }
+ }
+
+ /**
+ * Emits the records.
+ */
+ private void emitRecords() throws IOException
+ {
+ for (String recordName : bufferedData_.keySet())
+ {
+ RecordMap recordMap = bufferedData_.get(recordName);
+ synchronized (recordMap)
+ {
+ for (TagMap tagMap : recordMap.keySet())
+ {
+ MetricMap metricMap = recordMap.get(tagMap);
+ OutputRecord outRec = new OutputRecord(tagMap, metricMap);
+ emitRecord(recordName, outRec);
+ }
+ }
+ }
+ flush();
+ }
+
+ /**
+ * Called each period after all records have been emitted, this method does nothing.
+ * Subclasses may override it in order to perform some kind of flush.
+ */
+ protected void flush() throws IOException
+ {
+ }
+
+ /**
+ * Called by MetricsRecordImpl.update(). Creates or updates a row in
+ * the internal table of metric data.
+ */
+ protected void update(MetricsRecord record)
+ {
+ String recordName = record.getRecordName();
+ TagMap tagTable = record.getTagTable();
+ Map<String,MetricValue> metricUpdates = record.getMetricTable();
+
+ RecordMap recordMap = getRecordMap(recordName);
+ synchronized (recordMap)
+ {
+ MetricMap metricMap = recordMap.get(tagTable);
+ if (metricMap == null)
+ {
+ metricMap = new MetricMap();
+ TagMap tagMap = new TagMap(tagTable); // clone tags
+ recordMap.put(tagMap, metricMap);
+ }
+ for (String metricName : metricUpdates.keySet())
+ {
+ MetricValue updateValue = metricUpdates.get(metricName);
+ Number updateNumber = updateValue.getNumber();
+ Number currentNumber = metricMap.get(metricName);
+ if (currentNumber == null || updateValue.isAbsolute())
+ {
+ metricMap.put(metricName, updateNumber);
+ }
+ else
+ {
+ Number newNumber = sum(updateNumber, currentNumber);
+ metricMap.put(metricName, newNumber);
+ }
+ }
+ }
+ }
+
+ private RecordMap getRecordMap(String recordName)
+ {
+ return bufferedData_.get(recordName);
+ }
+
+ /**
+ * Adds two numbers, coercing the second to the type of the first.
+ *
+ */
+ private Number sum(Number a, Number b)
+ {
+ if (a instanceof Integer)
+ {
+ return new Integer(a.intValue() + b.intValue());
+ }
+ else if (a instanceof Float)
+ {
+ return new Float(a.floatValue() + b.floatValue());
+ }
+ else if (a instanceof Short)
+ {
+ return new Short((short)(a.shortValue() + b.shortValue()));
+ }
+ else if (a instanceof Byte)
+ {
+ return new Byte((byte)(a.byteValue() + b.byteValue()));
+ }
+ else
+ {
+ // should never happen
+ throw new AnalyticsException("Invalid number type");
+ }
+ }
+
+ /**
+ * Called by MetricsRecordImpl.remove(). Removes any matching row in
+ * the internal table of metric data. A row matches if it has the same
+ * tag names and tag values.
+ */
+ protected void remove(MetricsRecord record)
+ {
+ String recordName = record.getRecordName();
+ TagMap tagTable = record.getTagTable();
+
+ RecordMap recordMap = getRecordMap(recordName);
+
+ recordMap.remove(tagTable);
+ }
+
+ /**
+ * Returns the timer period.
+ */
+ public int getPeriod()
+ {
+ return period_;
+ }
+
+ /**
+ * Sets the timer period
+ */
+ protected void setPeriod(int period)
+ {
+ this.period_ = period;
+ }
+
+ /**
+ * Sets the default port to listen on
+ */
+ public void setPort(int port)
+ {
+ port_ = port;
+ }
+
+ /**
+ * Parses a space and/or comma separated sequence of server specifications
+ * of the form <i>hostname</i> or <i>hostname:port</i>. If
+ * the specs string is null, defaults to localhost:defaultPort.
+ *
+ * @return a list of InetSocketAddress objects.
+ */
+ private static List<InetSocketAddress> parse(String specs, int defaultPort)
+ {
+ List<InetSocketAddress> result = new ArrayList<InetSocketAddress>(1);
+ if (specs == null) {
+ result.add(new InetSocketAddress("localhost", defaultPort));
+ }
+ else {
+ String[] specStrings = specs.split("[ ,]+");
+ for (String specString : specStrings) {
+ int colon = specString.indexOf(':');
+ if (colon < 0 || colon == specString.length() - 1)
+ {
+ result.add(new InetSocketAddress(specString, defaultPort));
+ } else
+ {
+ String hostname = specString.substring(0, colon);
+ int port = Integer.parseInt(specString.substring(colon+1));
+ result.add(new InetSocketAddress(hostname, port));
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Starts up the analytics context and registers the VM metrics.
+ */
+ public void start()
+ {
+ // register the vm analytics object with the analytics context to update the data
+ registerUpdater(new VMAnalyticsSource());
+
+
+ init("analyticsContext", DatabaseDescriptor.getGangliaServers());
+
+ try
+ {
+ startMonitoring();
+ }
+ catch(IOException e)
+ {
+ logger_.error(LogUtil.throwableToString(e));
+ }
+ }
+
+ public void stop()
+ {
+ close();
+ }
+
+ /**
+ * Factory method that gets an instance of the StorageService
+ * class.
+ */
+ public static AnalyticsContext instance()
+ {
+ if ( instance_ == null )
+ {
+ AnalyticsContext.createLock_.lock();
+ try
+ {
+ if ( instance_ == null )
+ {
+ instance_ = new AnalyticsContext();
+ }
+ }
+ finally
+ {
+ createLock_.unlock();
+ }
+ }
+ return instance_;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/AnalyticsException.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+/**
+ * General-purpose, unchecked metrics exception.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class AnalyticsException extends RuntimeException
+{
+
+ private static final long serialVersionUID = -1643257498540498497L;
+
+ /**
+ * Creates a new instance of MetricsException
+ */
+ public AnalyticsException()
+ {
+ }
+
+ /** Creates a new instance of MetricsException
+ *
+ * @param message an error message
+ */
+ public AnalyticsException(String message)
+ {
+ super(message);
+ }
+
+ }
Added: incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/DBAnalyticsSource.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class sets up the analytics package to report metrics into
+ * Ganglia for the various DB operations such as: reads per second,
+ * average read latency, writes per second, average write latency.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class DBAnalyticsSource implements IAnalyticsSource
+{
+ private static final String METRIC_READ_OPS = "Read Operations";
+ private static final String RECORD_READ_OPS = "ReadOperationsRecord";
+ private static final String TAG_READOPS = "ReadOperationsTag";
+ private static final String TAG_READ_OPS = "ReadOperationsTagValue";
+
+ private static final String METRIC_READ_AVG = "Average Read Latency";
+ private static final String RECORD_READ_AVG = "ReadLatencyRecord";
+ private static final String TAG_READAVG = "AverageReadLatencyTag";
+ private static final String TAG_READ_AVG = "ReadLatencyTagValue";
+
+ private static final String METRIC_WRITE_OPS = "Write Operations";
+ private static final String RECORD_WRITE_OPS = "WriteOperationsRecord";
+ private static final String TAG_WRITEOPS = "WriteOperationsTag";
+ private static final String TAG_WRITE_OPS = "WriteOperationsTagValue";
+
+ private static final String METRIC_WRITE_AVG = "Average Write Latency";
+ private static final String RECORD_WRITE_AVG = "WriteLatencyRecord";
+ private static final String TAG_WRITEAVG = "AverageWriteLatencyTag";
+ private static final String TAG_WRITE_AVG = "WriteLatencyTagValue";
+
+ /* keep track of the number of read operations */
+ private AtomicInteger readOperations_ = new AtomicInteger(0);
+
+ /* keep track of the number of read latencies */
+ private AtomicLong readLatencies_ = new AtomicLong(0);
+
+ /* keep track of the number of write operations */
+ private AtomicInteger writeOperations_ = new AtomicInteger(0);
+
+ /* keep track of the number of write latencies */
+ private AtomicLong writeLatencies_ = new AtomicLong(0);
+
+ /**
+ * Create all the required records we intend to display, and
+ * register with the AnalyticsContext.
+ */
+ public DBAnalyticsSource()
+ {
+ /* register with the AnalyticsContext */
+ AnalyticsContext.instance().registerUpdater(this);
+ /* set the units for the metric type */
+ AnalyticsContext.instance().setAttribute("units." + METRIC_READ_OPS, "r/s");
+ /* create the record */
+ AnalyticsContext.instance().createRecord(RECORD_READ_OPS);
+
+ /* set the units for the metric type */
+ AnalyticsContext.instance().setAttribute("units." + METRIC_READ_AVG, "ms");
+ /* create the record */
+ AnalyticsContext.instance().createRecord(RECORD_READ_AVG);
+
+ /* set the units for the metric type */
+ AnalyticsContext.instance().setAttribute("units." + METRIC_WRITE_OPS, "w/s");
+ /* create the record */
+ AnalyticsContext.instance().createRecord(RECORD_WRITE_OPS);
+
+ /* set the units for the metric type */
+ AnalyticsContext.instance().setAttribute("units." + METRIC_WRITE_AVG, "ms");
+ /* create the record */
+ AnalyticsContext.instance().createRecord(RECORD_WRITE_AVG);
+ }
+
+ /**
+ * Update each of the records with the relevant data
+ *
+ * @param context the reference to the context which has called this callback
+ */
+ public void doUpdates(AnalyticsContext context)
+ {
+ // update the read operations record
+ MetricsRecord readUsageRecord = context.getMetricsRecord(RECORD_READ_OPS);
+ int period = context.getPeriod();
+
+ if(readUsageRecord != null)
+ {
+ if ( readOperations_.get() > 0 )
+ {
+ readUsageRecord.setTag(TAG_READOPS, TAG_READ_OPS);
+ readUsageRecord.setMetric(METRIC_READ_OPS, readOperations_.get() / period);
+ readUsageRecord.update();
+ }
+ }
+
+ // update the read latency record
+ MetricsRecord readLatencyRecord = context.getMetricsRecord(RECORD_READ_AVG);
+ if(readLatencyRecord != null)
+ {
+ if ( readOperations_.get() > 0 )
+ {
+ readLatencyRecord.setTag(TAG_READAVG, TAG_READ_AVG);
+ readLatencyRecord.setMetric(METRIC_READ_AVG, readLatencies_.get() / readOperations_.get() );
+ readLatencyRecord.update();
+ }
+ }
+
+ // update the write operations record
+ MetricsRecord writeUsageRecord = context.getMetricsRecord(RECORD_WRITE_OPS);
+ if(writeUsageRecord != null)
+ {
+ if ( writeOperations_.get() > 0 )
+ {
+ writeUsageRecord.setTag(TAG_WRITEOPS, TAG_WRITE_OPS);
+ writeUsageRecord.setMetric(METRIC_WRITE_OPS, writeOperations_.get() / period);
+ writeUsageRecord.update();
+ }
+ }
+
+ // update the write latency record
+ MetricsRecord writeLatencyRecord = context.getMetricsRecord(RECORD_WRITE_AVG);
+ if(writeLatencyRecord != null)
+ {
+ if ( writeOperations_.get() > 0 )
+ {
+ writeLatencyRecord.setTag(TAG_WRITEAVG, TAG_WRITE_AVG);
+ writeLatencyRecord.setMetric(METRIC_WRITE_AVG, writeLatencies_.get() / writeOperations_.get() );
+ writeLatencyRecord.update();
+ }
+ }
+
+ clear();
+ }
+
+ /**
+ * Reset all the metric records
+ */
+ private void clear()
+ {
+ readOperations_.set(0);
+ readLatencies_.set(0);
+ writeOperations_.set(0);
+ writeLatencies_.set(0);
+ }
+
+ /**
+ * Update the read statistics.
+ */
+ public void updateReadStatistics(long latency)
+ {
+ readOperations_.incrementAndGet();
+ readLatencies_.addAndGet(latency);
+ }
+
+ /**
+ * Update the write statistics.
+ */
+ public void updateWriteStatistics(long latency)
+ {
+ writeOperations_.incrementAndGet();
+ writeLatencies_.addAndGet(latency);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/IAnalyticsSource.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+/**
+ * Call-back interface. See <code>AnalyticsContext.registerUpdater()</code>.
+ * This callback is called at a regular (pre-registered time interval) in
+ * order to update the metric values.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public interface IAnalyticsSource
+{
+ /**
+ * Timer-based call-back from the metric library.
+ */
+ public abstract void doUpdates(AnalyticsContext context);
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/MetricValue.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+
+/**
+ * A Number that is either an absolute or an incremental amount.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class MetricValue
+{
+ public static final boolean ABSOLUTE = false;
+ public static final boolean INCREMENT = true;
+
+ private boolean isIncrement;
+ private Number number;
+
+ /**
+ * Creates a new instance of MetricValue
+ *
+ * @param number this initializes the initial value of this metric
+ * @param isIncrement sets if the metric can be incremented or only set
+ */
+ public MetricValue(Number number, boolean isIncrement)
+ {
+ this.number = number;
+ this.isIncrement = isIncrement;
+ }
+
+ /**
+ * Checks if this metric can be incremented.
+ *
+ * @return true if the value of this metric can be incremented, false otherwise
+ */
+ public boolean isIncrement()
+ {
+ return isIncrement;
+ }
+
+ /**
+ * Checks if the value of this metric is always an absolute value. This is the
+ * inverse of isIncrement.
+ *
+ * @return true if the
+ */
+ public boolean isAbsolute()
+ {
+ return !isIncrement;
+ }
+
+ /**
+ * Returns the current number value of the metric.
+ *
+ * @return the Number value of this metric
+ */
+ public Number getNumber()
+ {
+ return number;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/MetricsRecord.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+
+/**
+ * This class keeps a back-pointer to the AnalyticsContext
+ * and delegates back to it on <code>update</code> and <code>remove()</code>.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class MetricsRecord {
+
+ private AnalyticsContext.TagMap tagTable = new AnalyticsContext.TagMap();
+ private Map<String,MetricValue> metricTable = new LinkedHashMap<String,MetricValue>();
+
+ private String recordName;
+ private AnalyticsContext context;
+
+
+ /**
+ * Creates a new instance of MetricsRecord
+ *
+ * @param recordName name of this record
+ * @param context the context which this record is a part of
+ */
+ protected MetricsRecord(String recordName, AnalyticsContext context)
+ {
+ this.recordName = recordName;
+ this.context = context;
+ }
+
+ /**
+ * Returns the record name.
+ *
+ * @return the record name
+ */
+ public String getRecordName() {
+ return recordName;
+ }
+
+ /**
+ * Sets the named tag to the specified value.
+ *
+ * @param tagName name of the tag
+ * @param tagValue new value of the tag
+ * @throws MetricsException if the tagName conflicts with the configuration
+ */
+ public void setTag(String tagName, String tagValue) {
+ if (tagValue == null) {
+ tagValue = "";
+ }
+ tagTable.put(tagName, tagValue);
+ }
+
+ /**
+ * Sets the named tag to the specified value.
+ *
+ * @param tagName name of the tag
+ * @param tagValue new value of the tag
+ * @throws MetricsException if the tagName conflicts with the configuration
+ */
+ public void setTag(String tagName, int tagValue) {
+ tagTable.put(tagName, new Integer(tagValue));
+ }
+
+ /**
+ * Sets the named tag to the specified value.
+ *
+ * @param tagName name of the tag
+ * @param tagValue new value of the tag
+ * @throws MetricsException if the tagName conflicts with the configuration
+ */
+ public void setTag(String tagName, short tagValue) {
+ tagTable.put(tagName, new Short(tagValue));
+ }
+
+ /**
+ * Sets the named tag to the specified value.
+ *
+ * @param tagName name of the tag
+ * @param tagValue new value of the tag
+ * @throws MetricsException if the tagName conflicts with the configuration
+ */
+ public void setTag(String tagName, byte tagValue)
+ {
+ tagTable.put(tagName, new Byte(tagValue));
+ }
+
+ /**
+ * Sets the named metric to the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue new value of the metric
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public void setMetric(String metricName, int metricValue)
+ {
+ setAbsolute(metricName, new Integer(metricValue));
+ }
+
+ /**
+ * Sets the named metric to the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue new value of the metric
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public void setMetric(String metricName, short metricValue)
+ {
+ setAbsolute(metricName, new Short(metricValue));
+ }
+
+ /**
+ * Sets the named metric to the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue new value of the metric
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public void setMetric(String metricName, byte metricValue)
+ {
+ setAbsolute(metricName, new Byte(metricValue));
+ }
+
+ /**
+ * Sets the named metric to the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue new value of the metric
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public void setMetric(String metricName, float metricValue)
+ {
+ setAbsolute(metricName, new Float(metricValue));
+ }
+
+ /**
+ * Increments the named metric by the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue incremental value
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public void incrMetric(String metricName, int metricValue)
+ {
+ setIncrement(metricName, new Integer(metricValue));
+ }
+
+ /**
+ * Increments the named metric by the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue incremental value
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public void incrMetric(String metricName, short metricValue)
+ {
+ setIncrement(metricName, new Short(metricValue));
+ }
+
+ /**
+ * Increments the named metric by the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue incremental value
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public void incrMetric(String metricName, byte metricValue)
+ {
+ setIncrement(metricName, new Byte(metricValue));
+ }
+
+ /**
+ * Increments the named metric by the specified value.
+ *
+ * @param metricName name of the metric
+ * @param metricValue incremental value
+ * @throws MetricsException if the metricName or the type of the metricValue
+ * conflicts with the configuration
+ */
+ public void incrMetric(String metricName, float metricValue)
+ {
+ setIncrement(metricName, new Float(metricValue));
+ }
+
+ /**
+ * Sets the value of the metric identified by metricName with the
+ * number metricValue.
+ *
+ * @param metricName name of the metric
+ * @param metricValue number value to which it should be updated
+ */
+ private void setAbsolute(String metricName, Number metricValue)
+ {
+ metricTable.put(metricName, new MetricValue(metricValue, MetricValue.ABSOLUTE));
+ }
+
+ /**
+ * Increments the value of the metric identified by metricName with the
+ * number metricValue.
+ *
+ * @param metricName name of the metric
+ * @param metricValue number value by which it should be incremented
+ */
+ private void setIncrement(String metricName, Number metricValue)
+ {
+ metricTable.put(metricName, new MetricValue(metricValue, MetricValue.INCREMENT));
+ }
+
+ /**
+ * Updates the table of buffered data which is to be sent periodically.
+ * If the tag values match an existing row, that row is updated;
+ * otherwise, a new row is added.
+ */
+ public void update()
+ {
+ context.update(this);
+ }
+
+ /**
+ * Removes the row, if it exists, in the buffered data table having tags
+ * that equal the tags that have been set on this record.
+ */
+ public void remove()
+ {
+ context.remove(this);
+ }
+
+ AnalyticsContext.TagMap getTagTable()
+ {
+ return tagTable;
+ }
+
+ Map<String, MetricValue> getMetricTable()
+ {
+ return metricTable;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/OutputRecord.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Represents a record of metric data to be sent to a metrics system.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+public class OutputRecord
+{
+ private AnalyticsContext.TagMap tagMap;
+ private AnalyticsContext.MetricMap metricMap;
+
+ /**
+ * Creates a new instance of OutputRecord
+ */
+ OutputRecord(AnalyticsContext.TagMap tagMap, AnalyticsContext.MetricMap metricMap)
+ {
+ this.tagMap = tagMap;
+ this.metricMap = metricMap;
+ }
+
+ /**
+ * Returns the set of tag names.
+ */
+ public Set<String> getTagNames()
+ {
+ return Collections.unmodifiableSet(tagMap.keySet());
+ }
+
+ /**
+ * Returns a tag object which is can be a String, Integer, Short or Byte.
+ *
+ * @return the tag value, or null if there is no such tag
+ */
+ public Object getTag(String name)
+ {
+ return tagMap.get(name);
+ }
+
+ /**
+ * Returns the set of metric names.
+ *
+ * @return the set of metric names
+ */
+ public Set<String> getMetricNames()
+ {
+ return Collections.unmodifiableSet(metricMap.keySet());
+ }
+
+ /**
+ * Returns the metric object which can be a Float, Integer, Short or Byte.
+ *
+ * @param name name of the metric for which the value is being requested
+ * @return return the tag value, or null if there is no such tag
+ */
+ public Number getMetric(String name)
+ {
+ return (Number) metricMap.get(name);
+ }
+
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/analytics/VMAnalyticsSource.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class sets up the analytics package to report metrics into
+ * Ganglia for VM heap utilization.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+
+public class VMAnalyticsSource implements IAnalyticsSource
+{
+ private static final String METRIC_MEMUSAGE = "VM Heap Utilization";
+ private static final String RECORD_MEMUSAGE = "MemoryUsageRecord";
+ private static final String TAG_MEMUSAGE = "MemoryUsageTag";
+ private static final String TAG_MEMUSAGE_MEMUSED = "MemoryUsedTagValue";
+
+ /**
+ * Setup the Ganglia record to display the VM heap utilization.
+ */
+ public VMAnalyticsSource()
+ {
+ // set the units for the metric type
+ AnalyticsContext.instance().setAttribute("units." + METRIC_MEMUSAGE, "MB");
+ // create the record
+ AnalyticsContext.instance().createRecord(RECORD_MEMUSAGE);
+ }
+
+ /**
+ * Update the VM heap utilization record with the relevant data.
+ *
+ * @param context the reference to the context which has called this callback
+ */
+ public void doUpdates(AnalyticsContext context)
+ {
+ // update the memory used record
+ MetricsRecord memUsageRecord = context.getMetricsRecord(RECORD_MEMUSAGE);
+ if(memUsageRecord != null)
+ {
+ updateUsedMemory(memUsageRecord);
+ }
+ }
+
+ private void updateUsedMemory(MetricsRecord memUsageRecord)
+ {
+ memUsageRecord.setTag(TAG_MEMUSAGE, TAG_MEMUSAGE_MEMUSED);
+ memUsageRecord.setMetric(METRIC_MEMUSAGE, getMemoryUsed());
+ memUsageRecord.update();
+ }
+
+ private float getMemoryUsed()
+ {
+ MemoryMXBean memoryMxBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memUsage = memoryMxBean.getHeapMemoryUsage();
+ return (float)memUsage.getUsed()/(1024 * 1024);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/cli/Cli.g
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/Cli.g?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cli/Cli.g (added)
+++ incubator/cassandra/src/org/apache/cassandra/cli/Cli.g Mon Mar 2 06:12:46 2009
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// ANTLR Grammar for the Cassandra Command Line Interface (CLI).
+//
+// Note: This grammar handles all but the CQL statements. CQL
+// statements are detected separately (based on the first token)
+// and directly sent to server-side for processing.
+//
+
+grammar Cli;
+
+options {
+ output=AST;
+ ASTLabelType=CommonTree;
+ backtrack=true;
+}
+
+//
+// Nodes in the AST
+//
+tokens {
+ //
+ // Top-level nodes. These typically correspond to
+ // various top-level CLI statements.
+ //
+ NODE_CONNECT;
+ NODE_DESCRIBE_TABLE;
+ NODE_EXIT;
+ NODE_HELP;
+ NODE_NO_OP;
+ NODE_SHOW_CLUSTER_NAME;
+ NODE_SHOW_CONFIG_FILE;
+ NODE_SHOW_VERSION;
+ NODE_SHOW_TABLES;
+ NODE_THRIFT_GET;
+ NODE_THRIFT_SET;
+
+ // Internal Nodes.
+ NODE_COLUMN_ACCESS;
+ NODE_ID_LIST;
+}
+
+@parser::header {
+package com.facebook.infrastructure.cli;
+}
+
+@lexer::header {
+package com.facebook.infrastructure.cli;
+}
+
+//
+// Parser Section
+//
+
+// the root node
+root: stmt SEMICOLON? EOF -> stmt;
+
+stmt
+ : connectStmt
+ | exitStmt
+ | describeTable
+ | getStmt
+ | helpStmt
+ | setStmt
+ | showStmt
+ | -> ^(NODE_NO_OP)
+ ;
+
+connectStmt
+ : K_CONNECT host SLASH port -> ^(NODE_CONNECT host port)
+ ;
+
+helpStmt
+ : K_HELP -> ^(NODE_HELP)
+ | '?' -> ^(NODE_HELP)
+ ;
+
+exitStmt
+ : K_QUIT -> ^(NODE_EXIT)
+ | K_EXIT -> ^(NODE_EXIT)
+ ;
+
+getStmt
+ : K_THRIFT K_GET columnFamilyExpr -> ^(NODE_THRIFT_GET columnFamilyExpr)
+ ;
+
+setStmt
+ : K_THRIFT K_SET columnFamilyExpr '=' value -> ^(NODE_THRIFT_SET columnFamilyExpr value)
+ ;
+
+showStmt
+ : showClusterName
+ | showVersion
+ | showConfigFile
+ | showTables
+ ;
+
+showClusterName
+ : K_SHOW K_CLUSTER K_NAME -> ^(NODE_SHOW_CLUSTER_NAME)
+ ;
+
+showConfigFile
+ : K_SHOW K_CONFIG K_FILE -> ^(NODE_SHOW_CONFIG_FILE)
+ ;
+
+showVersion
+ : K_SHOW K_VERSION -> ^(NODE_SHOW_VERSION)
+ ;
+
+showTables
+ : K_SHOW K_TABLES -> ^(NODE_SHOW_TABLES)
+ ;
+
+describeTable
+ : K_DESCRIBE K_TABLE table -> ^(NODE_DESCRIBE_TABLE table);
+
+columnFamilyExpr
+ : table DOT columnFamily '[' rowKey ']'
+ ( '[' a+=columnOrSuperColumn ']'
+ ('[' a+=columnOrSuperColumn ']')?
+ )?
+ -> ^(NODE_COLUMN_ACCESS table columnFamily rowKey ($a+)?)
+ ;
+
+table: Identifier;
+
+columnFamily: Identifier;
+
+rowKey: StringLiteral;
+
+value: StringLiteral;
+
+columnOrSuperColumn: StringLiteral;
+
+host: id+=Identifier (id+=DOT id+=Identifier)* -> ^(NODE_ID_LIST $id+);
+
+port: IntegerLiteral;
+
+//
+// Lexer Section
+//
+
+//
+// Keywords (in alphabetical order for convenience)
+//
+// CLI is case-insensitive with respect to these keywords.
+// However, they MUST be listed in upper case here.
+//
+K_CONFIG: 'CONFIG';
+K_CONNECT: 'CONNECT';
+K_CLUSTER: 'CLUSTER';
+K_DESCRIBE: 'DESCRIBE';
+K_GET: 'GET';
+K_HELP: 'HELP';
+K_EXIT: 'EXIT';
+K_FILE: 'FILE';
+K_NAME: 'NAME';
+K_QUIT: 'QUIT';
+K_SET: 'SET';
+K_SHOW: 'SHOW';
+K_TABLE: 'TABLE';
+K_TABLES: 'TABLES';
+K_THRIFT: 'THRIFT';
+K_VERSION: 'VERSION';
+
+// private syntactic rules
+fragment
+Letter
+ : 'a'..'z'
+ | 'A'..'Z'
+ ;
+
+fragment
+Digit
+ : '0'..'9'
+ ;
+
+// syntactic Elements
+Identifier
+ : Letter ( Letter | Digit | '_')*
+ ;
+
+
+// literals
+StringLiteral
+ :
+ '\'' (~'\'')* '\'' ( '\'' (~'\'')* '\'' )*
+ ;
+
+IntegerLiteral
+ : Digit+;
+
+
+//
+// syntactic elements
+//
+
+DOT
+ : '.'
+ ;
+
+SLASH
+ : '/'
+ ;
+
+SEMICOLON
+ : ';'
+ ;
+
+WS
+ : (' '|'\r'|'\t'|'\n') {$channel=HIDDEN;} // whitepace
+ ;
+
+COMMENT
+ : '--' (~('\n'|'\r'))* { $channel=HIDDEN; }
+ | '/*' (options {greedy=false;} : .)* '*/' { $channel=HIDDEN; }
+ ;
Added: incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens (added)
+++ incubator/cassandra/src/org/apache/cassandra/cli/Cli.tokens Mon Mar 2 06:12:46 2009
@@ -0,0 +1,43 @@
+NODE_SHOW_CONFIG_FILE=10
+K_TABLES=32
+K_VERSION=31
+K_EXIT=22
+NODE_EXIT=6
+K_FILE=30
+K_GET=24
+K_CONNECT=18
+K_CONFIG=29
+SEMICOLON=17
+Digit=40
+Identifier=36
+NODE_THRIFT_GET=13
+K_SET=25
+StringLiteral=37
+NODE_HELP=7
+NODE_NO_OP=8
+NODE_THRIFT_SET=14
+K_DESCRIBE=33
+NODE_SHOW_VERSION=11
+NODE_ID_LIST=16
+WS=41
+NODE_CONNECT=4
+SLASH=19
+K_THRIFT=23
+NODE_SHOW_TABLES=12
+K_CLUSTER=27
+K_HELP=20
+K_SHOW=26
+NODE_DESCRIBE_TABLE=5
+K_TABLE=34
+IntegerLiteral=38
+NODE_SHOW_CLUSTER_NAME=9
+COMMENT=42
+DOT=35
+K_NAME=28
+Letter=39
+NODE_COLUMN_ACCESS=15
+K_QUIT=21
+'?'=43
+'='=44
+'['=45
+']'=46
Added: incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cli/CliClient.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cli;
+
+import com.facebook.thrift.*;
+
+import org.antlr.runtime.tree.*;
+import org.apache.cassandra.cql.common.Utils;
+import org.apache.cassandra.service.Cassandra;
+import org.apache.cassandra.service.CassandraException;
+import org.apache.cassandra.service.CqlResult_t;
+import org.apache.cassandra.service.column_t;
+import org.apache.cassandra.service.Cassandra.Client;
+import org.apache.cassandra.utils.LogUtil;
+
+import java.util.*;
+
+// Cli Client Side Library
+public class CliClient
+{
+ private Cassandra.Client thriftClient_ = null;
+ private CliSessionState css_ = null;
+
+ public CliClient(CliSessionState css, Cassandra.Client thriftClient)
+ {
+ css_ = css;
+ thriftClient_ = thriftClient;
+ }
+
+ // Execute a CLI Statement
+ public void executeCLIStmt(String stmt) throws TException
+ {
+ CommonTree ast = null;
+
+ ast = CliCompiler.compileQuery(stmt);
+
+ switch (ast.getType()) {
+ case CliParser.NODE_EXIT:
+ cleanupAndExit();
+ break;
+ case CliParser.NODE_THRIFT_GET:
+ executeGet(ast);
+ break;
+ case CliParser.NODE_HELP:
+ printCmdHelp();
+ break;
+ case CliParser.NODE_THRIFT_SET:
+ executeSet(ast);
+ break;
+ case CliParser.NODE_SHOW_CLUSTER_NAME:
+ executeShowProperty(ast, "cluster name");
+ break;
+ case CliParser.NODE_SHOW_CONFIG_FILE:
+ executeShowProperty(ast, "config file");
+ break;
+ case CliParser.NODE_SHOW_VERSION:
+ executeShowProperty(ast, "version");
+ break;
+ case CliParser.NODE_SHOW_TABLES:
+ executeShowTables(ast);
+ break;
+ case CliParser.NODE_DESCRIBE_TABLE:
+ executeDescribeTable(ast);
+ break;
+ case CliParser.NODE_CONNECT:
+ executeConnect(ast);
+ break;
+ case CliParser.NODE_NO_OP:
+ // comment lines come here; they are treated as no ops.
+ break;
+ default:
+ css_.err.println("Invalid Statement (Type: " + ast.getType() + ")");
+ break;
+ }
+ }
+
+ private void printCmdHelp()
+ {
+ css_.out.println("List of all CLI commands:");
+ css_.out.println("? Same as help.");
+ css_.out.println("connect <hostname>/<port> Connect to Cassandra's thrift service.");
+ css_.out.println("describe table <tbl> Describe table.");
+ css_.out.println("exit Exit CLI.");
+ css_.out.println("explain plan [<set stmt>|<get stmt>|<select stmt>] Explains the PLAN for specified stmt.");
+ css_.out.println("help Display this help.");
+ css_.out.println("quit Exit CLI.");
+ css_.out.println("show config file Display contents of config file");
+ css_.out.println("show cluster name Display cassandra server version");
+ css_.out.println("show tables Show list of tables.");
+ css_.out.println("show version Show server version.");
+ css_.out.println("select ... CQL select statement (TBD).");
+ css_.out.println("get ... CQL data retrieval statement.");
+ css_.out.println("set ... CQL DML statement.");
+ css_.out.println("thrift get <tbl>.<cf>['<rowKey>'] (will be deprecated)");
+ css_.out.println("thrift get <tbl>.<cf>['<rowKey>']['<colKey>'] (will be deprecated)");
+ css_.out.println("thrift set <tbl>.<cf>['<rowKey>']['<colKey>'] = '<value>' (will be deprecated)");
+ }
+
+ private void cleanupAndExit()
+ {
+ CliMain.disconnect();
+ System.exit(0);
+ }
+
+ // Execute GET statement
+ private void executeGet(CommonTree ast) throws TException
+ {
+ if (!CliMain.isConnected())
+ return;
+
+ int childCount = ast.getChildCount();
+ assert(childCount == 1);
+
+ CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+ assert(columnFamilySpec.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+ String tableName = CliCompiler.getTableName(columnFamilySpec);
+ String key = CliCompiler.getKey(columnFamilySpec);
+ String columnFamily = CliCompiler.getColumnFamily(columnFamilySpec);
+ int columnSpecCnt = CliCompiler.numColumnSpecifiers(columnFamilySpec);
+
+ // assume simple columnFamily for now
+ if (columnSpecCnt == 0)
+ {
+ // table.cf['key']
+ List<column_t> columns = new ArrayList<column_t>();
+ try
+ {
+ columns = thriftClient_.get_slice(tableName, key, columnFamily, -1, 1000000);
+ }
+ catch(CassandraException cex)
+ {
+ css_.out.println(LogUtil.throwableToString(cex));
+ }
+ int size = columns.size();
+ for (Iterator<column_t> colIter = columns.iterator(); colIter.hasNext(); )
+ {
+ column_t col = colIter.next();
+ css_.out.printf(" (column=%s, value=%s; timestamp=%d)\n",
+ col.columnName, col.value, col.timestamp);
+ }
+ css_.out.println("Returned " + size + " rows.");
+ }
+ else if (columnSpecCnt == 1)
+ {
+ // table.cf['key']['column']
+ String columnName = CliCompiler.getColumn(columnFamilySpec, 0);
+ column_t col = new column_t();
+ try
+ {
+ col = thriftClient_.get_column(tableName, key, columnFamily + ":" + columnName);
+ }
+ catch(CassandraException cex)
+ {
+ css_.out.println(LogUtil.throwableToString(cex));
+ }
+
+ css_.out.printf("==> (name=%s, value=%s; timestamp=%d)\n",
+ col.columnName, col.value, col.timestamp);
+ }
+ else
+ {
+ assert(false);
+ }
+ }
+
+ // Execute SET statement
+ private void executeSet(CommonTree ast) throws TException
+ {
+ if (!CliMain.isConnected())
+ return;
+
+ int childCount = ast.getChildCount();
+ assert(childCount == 2);
+
+ CommonTree columnFamilySpec = (CommonTree)ast.getChild(0);
+ assert(columnFamilySpec.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+ String tableName = CliCompiler.getTableName(columnFamilySpec);
+ String key = CliCompiler.getKey(columnFamilySpec);
+ String columnFamily = CliCompiler.getColumnFamily(columnFamilySpec);
+ int columnSpecCnt = CliCompiler.numColumnSpecifiers(columnFamilySpec);
+ String value = Utils.unescapeSQLString(ast.getChild(1).getText());
+
+ // assume simple columnFamily for now
+ if (columnSpecCnt == 1)
+ {
+ // We have the table.cf['key']['column'] = 'value' case.
+
+ // get the column name
+ String columnName = CliCompiler.getColumn(columnFamilySpec, 0);
+
+ // do the insert
+ thriftClient_.insert(tableName, key, columnFamily + ":" + columnName,
+ value, System.currentTimeMillis());
+
+ css_.out.println("Value inserted.");
+ }
+ else
+ {
+ /* for now (until we support batch sets) */
+ assert(false);
+ }
+ }
+
+ private void executeShowProperty(CommonTree ast, String propertyName) throws TException
+ {
+ if (!CliMain.isConnected())
+ return;
+
+ String propertyValue = thriftClient_.getStringProperty(propertyName);
+ css_.out.println(propertyValue);
+ return;
+ }
+
+ // process "show tables" statement
+ private void executeShowTables(CommonTree ast) throws TException
+ {
+ if (!CliMain.isConnected())
+ return;
+
+ List<String> tables = thriftClient_.getStringListProperty("tables");
+ for (String table : tables)
+ {
+ css_.out.println(table);
+ }
+ }
+
+ // process a statement of the form: describe table <tablename>
+ private void executeDescribeTable(CommonTree ast) throws TException
+ {
+ if (!CliMain.isConnected())
+ return;
+
+ // Get table name
+ int childCount = ast.getChildCount();
+ assert(childCount == 1);
+ String tableName = ast.getChild(0).getText();
+
+ // Describe and display
+ String describe = thriftClient_.describeTable(tableName);
+ css_.out.println(describe);
+ return;
+ }
+
+ // process a statement of the form: connect hostname/port
+ private void executeConnect(CommonTree ast) throws TException
+ {
+ int portNumber = Integer.parseInt(ast.getChild(1).getText());
+ Tree idList = ast.getChild(0);
+
+ StringBuffer hostName = new StringBuffer();
+ int idCount = idList.getChildCount();
+ for (int idx = 0; idx < idCount; idx++)
+ {
+ hostName.append(idList.getChild(idx).getText());
+ }
+
+ // disconnect current connection, if any.
+ // This is a no-op, if you aren't currently connected.
+ CliMain.disconnect();
+
+ // now, connect to the newly specified host name and port
+ css_.hostName = hostName.toString();
+ css_.thriftPort = portNumber;
+ CliMain.connect(css_.hostName, css_.thriftPort);
+ }
+
+ // execute CQL query on server
+ public void executeQueryOnServer(String query) throws TException
+ {
+ if (!CliMain.isConnected())
+ return;
+
+ CqlResult_t result = thriftClient_.executeQuery(query);
+
+ if (result == null)
+ {
+ css_.out.println("Unexpected error. Received null result from server.");
+ return;
+ }
+
+ if ((result.errorTxt != null) || (result.errorCode != 0))
+ {
+ css_.out.println("Error: " + result.errorTxt);
+ }
+ else
+ {
+ List<Map<String, String>> rows = result.resultSet;
+
+ if (rows != null)
+ {
+ for (Map<String, String> row : rows)
+ {
+ for (Iterator<Map.Entry<String, String>> it = row.entrySet().iterator(); it.hasNext(); )
+ {
+ Map.Entry<String, String> entry = it.next();
+ String key = entry.getKey();
+ String value = entry.getValue();
+ css_.out.print(key + " = " + value + "; ");
+ }
+ css_.out.println();
+ }
+ }
+ css_.out.println("Statement processed.");
+ }
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cli/CliCompiler.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cli;
+
+import org.antlr.runtime.*;
+import org.antlr.runtime.tree.*;
+import org.apache.cassandra.cql.common.Utils;
+
+
+public class CliCompiler
+{
+
+ // ANTLR does not provide case-insensitive tokenization support
+ // out of the box. So we override the LA (lookahead) function
+ // of the ANTLRStringStream class. Note: This doesn't change the
+ // token text-- but just relaxes the matching rules to match
+ // in upper case. [Logic borrowed from Hive code.]
+ //
+ // Also see discussion on this topic in:
+ // http://www.antlr.org/wiki/pages/viewpage.action?pageId=1782.
+ public static class ANTLRNoCaseStringStream extends ANTLRStringStream
+ {
+ public ANTLRNoCaseStringStream(String input)
+ {
+ super(input);
+ }
+
+ public int LA(int i)
+ {
+ int returnChar = super.LA(i);
+ if (returnChar == CharStream.EOF)
+ {
+ return returnChar;
+ }
+ else if (returnChar == 0)
+ {
+ return returnChar;
+ }
+
+ return Character.toUpperCase((char)returnChar);
+ }
+ }
+
+ public static CommonTree compileQuery(String query)
+ {
+ CommonTree queryTree = null;
+ try
+ {
+ ANTLRStringStream input = new ANTLRNoCaseStringStream(query);
+
+ CliLexer lexer = new CliLexer(input);
+ CommonTokenStream tokens = new CommonTokenStream(lexer);
+
+ CliParser parser = new CliParser(tokens);
+
+ // start parsing...
+ queryTree = (CommonTree)(parser.root().getTree());
+
+ // semantic analysis if any...
+ // [tbd]
+
+ }
+ catch(Exception e)
+ {
+ System.err.println("Exception " + e.getMessage());
+ e.printStackTrace(System.err);
+ }
+ return queryTree;
+ }
+ /*
+ * NODE_COLUMN_ACCESS related functions.
+ */
+ public static String getTableName(CommonTree astNode)
+ {
+ assert(astNode.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+ return astNode.getChild(0).getText();
+ }
+
+ public static String getColumnFamily(CommonTree astNode)
+ {
+ assert(astNode.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+ return astNode.getChild(1).getText();
+ }
+
+ public static String getKey(CommonTree astNode)
+ {
+ assert(astNode.getType() == CliParser.NODE_COLUMN_ACCESS);
+
+ return Utils.unescapeSQLString(astNode.getChild(2).getText());
+ }
+
+ public static int numColumnSpecifiers(CommonTree astNode)
+ {
+ // Skip over table, column family and rowKey
+ return astNode.getChildCount() - 3;
+ }
+
+ // Returns the pos'th (0-based index) column specifier in the astNode
+ public static String getColumn(CommonTree astNode, int pos)
+ {
+ // Skip over table, column family and rowKey
+ return Utils.unescapeSQLString(astNode.getChild(pos + 3).getText());
+ }
+
+}