Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/08/13 00:35:23 UTC

svn commit: r685353 [7/13] - in /hadoop/core/trunk: ./ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/build/ src/contrib/chukwa/conf/ src/contrib/chukwa/dist/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/ha...

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChukwaArchiveKey.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChukwaArchiveKey.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChukwaArchiveKey.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChukwaArchiveKey.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,286 @@
+// File generated by hadoop record compiler. Do not edit.
+package org.apache.hadoop.chukwa;
+
+public class ChukwaArchiveKey extends org.apache.hadoop.record.Record {
+  private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo;
+  private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter;
+  private static int[] _rio_rtiFilterFields;
+  static {
+    _rio_recTypeInfo = new org.apache.hadoop.record.meta.RecordTypeInfo("ChukwaArchiveKey");
+    _rio_recTypeInfo.addField("timePartition", org.apache.hadoop.record.meta.TypeID.LongTypeID);
+    _rio_recTypeInfo.addField("dataType", org.apache.hadoop.record.meta.TypeID.StringTypeID);
+    _rio_recTypeInfo.addField("streamName", org.apache.hadoop.record.meta.TypeID.StringTypeID);
+    _rio_recTypeInfo.addField("seqId", org.apache.hadoop.record.meta.TypeID.LongTypeID);
+  }
+  
+  private long timePartition;
+  private String dataType;
+  private String streamName;
+  private long seqId;
+  public ChukwaArchiveKey() { }
+  public ChukwaArchiveKey(
+    final long timePartition,
+    final String dataType,
+    final String streamName,
+    final long seqId) {
+    this.timePartition = timePartition;
+    this.dataType = dataType;
+    this.streamName = streamName;
+    this.seqId = seqId;
+  }
+  public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() {
+    return _rio_recTypeInfo;
+  }
+  public static void setTypeFilter(org.apache.hadoop.record.meta.RecordTypeInfo rti) {
+    if (null == rti) return;
+    _rio_rtiFilter = rti;
+    _rio_rtiFilterFields = null;
+  }
+  private static void setupRtiFields()
+  {
+    if (null == _rio_rtiFilter) return;
+    // we may already have done this
+    if (null != _rio_rtiFilterFields) return;
+    int _rio_i, _rio_j;
+    _rio_rtiFilterFields = new int [_rio_rtiFilter.getFieldTypeInfos().size()];
+    for (_rio_i=0; _rio_i<_rio_rtiFilterFields.length; _rio_i++) {
+      _rio_rtiFilterFields[_rio_i] = 0;
+    }
+    java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_itFilter = _rio_rtiFilter.getFieldTypeInfos().iterator();
+    _rio_i=0;
+    while (_rio_itFilter.hasNext()) {
+      org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfoFilter = _rio_itFilter.next();
+      java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_it = _rio_recTypeInfo.getFieldTypeInfos().iterator();
+      _rio_j=1;
+      while (_rio_it.hasNext()) {
+        org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfo = _rio_it.next();
+        if (_rio_tInfo.equals(_rio_tInfoFilter)) {
+          _rio_rtiFilterFields[_rio_i] = _rio_j;
+          break;
+        }
+        _rio_j++;
+      }
+      _rio_i++;
+    }
+  }
+  public long getTimePartition() {
+    return timePartition;
+  }
+  public void setTimePartition(final long timePartition) {
+    this.timePartition=timePartition;
+  }
+  public String getDataType() {
+    return dataType;
+  }
+  public void setDataType(final String dataType) {
+    this.dataType=dataType;
+  }
+  public String getStreamName() {
+    return streamName;
+  }
+  public void setStreamName(final String streamName) {
+    this.streamName=streamName;
+  }
+  public long getSeqId() {
+    return seqId;
+  }
+  public void setSeqId(final long seqId) {
+    this.seqId=seqId;
+  }
+  public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    _rio_a.startRecord(this,_rio_tag);
+    _rio_a.writeLong(timePartition,"timePartition");
+    _rio_a.writeString(dataType,"dataType");
+    _rio_a.writeString(streamName,"streamName");
+    _rio_a.writeLong(seqId,"seqId");
+    _rio_a.endRecord(this,_rio_tag);
+  }
+  private void deserializeWithoutFilter(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    _rio_a.startRecord(_rio_tag);
+    timePartition=_rio_a.readLong("timePartition");
+    dataType=_rio_a.readString("dataType");
+    streamName=_rio_a.readString("streamName");
+    seqId=_rio_a.readLong("seqId");
+    _rio_a.endRecord(_rio_tag);
+  }
+  public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    if (null == _rio_rtiFilter) {
+      deserializeWithoutFilter(_rio_a, _rio_tag);
+      return;
+    }
+    // if we're here, we need to read based on version info
+    _rio_a.startRecord(_rio_tag);
+    setupRtiFields();
+    for (int _rio_i=0; _rio_i<_rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) {
+      if (1 == _rio_rtiFilterFields[_rio_i]) {
+        timePartition=_rio_a.readLong("timePartition");
+      }
+      else if (2 == _rio_rtiFilterFields[_rio_i]) {
+        dataType=_rio_a.readString("dataType");
+      }
+      else if (3 == _rio_rtiFilterFields[_rio_i]) {
+        streamName=_rio_a.readString("streamName");
+      }
+      else if (4 == _rio_rtiFilterFields[_rio_i]) {
+        seqId=_rio_a.readLong("seqId");
+      }
+      else {
+        java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo> typeInfos = (java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo>)(_rio_rtiFilter.getFieldTypeInfos());
+        org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i).getFieldID(), typeInfos.get(_rio_i).getTypeID());
+      }
+    }
+    _rio_a.endRecord(_rio_tag);
+  }
+  public int compareTo (final Object _rio_peer_) throws ClassCastException {
+    if (!(_rio_peer_ instanceof ChukwaArchiveKey)) {
+      throw new ClassCastException("Comparing different types of records.");
+    }
+    ChukwaArchiveKey _rio_peer = (ChukwaArchiveKey) _rio_peer_;
+    int _rio_ret = 0;
+    _rio_ret = (timePartition == _rio_peer.timePartition)? 0 :((timePartition<_rio_peer.timePartition)?-1:1);
+    if (_rio_ret != 0) return _rio_ret;
+    _rio_ret = dataType.compareTo(_rio_peer.dataType);
+    if (_rio_ret != 0) return _rio_ret;
+    _rio_ret = streamName.compareTo(_rio_peer.streamName);
+    if (_rio_ret != 0) return _rio_ret;
+    _rio_ret = (seqId == _rio_peer.seqId)? 0 :((seqId<_rio_peer.seqId)?-1:1);
+    if (_rio_ret != 0) return _rio_ret;
+    return _rio_ret;
+  }
+  public boolean equals(final Object _rio_peer_) {
+    if (!(_rio_peer_ instanceof ChukwaArchiveKey)) {
+      return false;
+    }
+    if (_rio_peer_ == this) {
+      return true;
+    }
+    ChukwaArchiveKey _rio_peer = (ChukwaArchiveKey) _rio_peer_;
+    boolean _rio_ret = false;
+    _rio_ret = (timePartition==_rio_peer.timePartition);
+    if (!_rio_ret) return _rio_ret;
+    _rio_ret = dataType.equals(_rio_peer.dataType);
+    if (!_rio_ret) return _rio_ret;
+    _rio_ret = streamName.equals(_rio_peer.streamName);
+    if (!_rio_ret) return _rio_ret;
+    _rio_ret = (seqId==_rio_peer.seqId);
+    if (!_rio_ret) return _rio_ret;
+    return _rio_ret;
+  }
+  public Object clone() throws CloneNotSupportedException {
+    ChukwaArchiveKey _rio_other = new ChukwaArchiveKey();
+    _rio_other.timePartition = this.timePartition;
+    _rio_other.dataType = this.dataType;
+    _rio_other.streamName = this.streamName;
+    _rio_other.seqId = this.seqId;
+    return _rio_other;
+  }
+  public int hashCode() {
+    int _rio_result = 17;
+    int _rio_ret;
+    _rio_ret = (int) (timePartition^(timePartition>>>32));
+    _rio_result = 37*_rio_result + _rio_ret;
+    _rio_ret = dataType.hashCode();
+    _rio_result = 37*_rio_result + _rio_ret;
+    _rio_ret = streamName.hashCode();
+    _rio_result = 37*_rio_result + _rio_ret;
+    _rio_ret = (int) (seqId^(seqId>>>32));
+    _rio_result = 37*_rio_result + _rio_ret;
+    return _rio_result;
+  }
+  public static String signature() {
+    return "LChukwaArchiveKey(lssl)";
+  }
+  public static class Comparator extends org.apache.hadoop.record.RecordComparator {
+    public Comparator() {
+      super(ChukwaArchiveKey.class);
+    }
+    static public int slurpRaw(byte[] b, int s, int l) {
+      try {
+        int os = s;
+        {
+          long i = org.apache.hadoop.record.Utils.readVLong(b, s);
+          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+          s+=z; l-=z;
+        }
+        {
+          int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+          s+=(z+i); l-= (z+i);
+        }
+        {
+          int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+          s+=(z+i); l-= (z+i);
+        }
+        {
+          long i = org.apache.hadoop.record.Utils.readVLong(b, s);
+          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+          s+=z; l-=z;
+        }
+        return (os - s);
+      } catch(java.io.IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    static public int compareRaw(byte[] b1, int s1, int l1,
+                                   byte[] b2, int s2, int l2) {
+      try {
+        int os1 = s1;
+        {
+          long i1 = org.apache.hadoop.record.Utils.readVLong(b1, s1);
+          long i2 = org.apache.hadoop.record.Utils.readVLong(b2, s2);
+          if (i1 != i2) {
+            return ((i1-i2) < 0) ? -1 : 0;
+          }
+          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+          s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+        }
+        {
+          int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+          int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+          s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+          int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+          if (r1 != 0) { return (r1<0)?-1:0; }
+          s1+=i1; s2+=i2; l1-=i1; l1-=i2;
+        }
+        {
+          int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+          int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+          s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+          int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+          if (r1 != 0) { return (r1<0)?-1:0; }
+          s1+=i1; s2+=i2; l1-=i1; l1-=i2;
+        }
+        {
+          long i1 = org.apache.hadoop.record.Utils.readVLong(b1, s1);
+          long i2 = org.apache.hadoop.record.Utils.readVLong(b2, s2);
+          if (i1 != i2) {
+            return ((i1-i2) < 0) ? -1 : 0;
+          }
+          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+          s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+        }
+        return (os1 - s1);
+      } catch(java.io.IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    public int compare(byte[] b1, int s1, int l1,
+                         byte[] b2, int s2, int l2) {
+      int ret = compareRaw(b1,s1,l1,b2,s2,l2);
+      return (ret == -1)? -1 : ((ret==0)? 1 : 0);}
+  }
+  
+  static {
+    org.apache.hadoop.record.RecordComparator.define(ChukwaArchiveKey.class, new Comparator());
+  }
+}
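
For readers skimming the generated record above: it is an ordered four-field key, and its comparison methods follow field declaration order (timePartition, dataType, streamName, seqId). A minimal sketch, illustrative only and with hypothetical field values, of constructing and comparing two keys:

    import org.apache.hadoop.chukwa.ChukwaArchiveKey;

    public class ArchiveKeyExample {
      public static void main(String[] args) {
        // All field values here are hypothetical.
        ChukwaArchiveKey a = new ChukwaArchiveKey(
            3600000L,                      // timePartition
            "SysLog",                      // dataType
            "host1:/var/log/messages",     // streamName
            1024L);                        // seqId
        ChukwaArchiveKey b = new ChukwaArchiveKey(
            3600000L, "SysLog", "host1:/var/log/messages", 2048L);

        System.out.println(a.compareTo(b));                // -1: first three fields tie, a.seqId < b.seqId
        System.out.println(a.equals(b));                   // false
        System.out.println(ChukwaArchiveKey.signature());  // LChukwaArchiveKey(lssl)
      }
    }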

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/Chunk.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/Chunk.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/Chunk.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/Chunk.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
+
+/**
+ * A chunk is a sequence of bytes at a particular logical offset in a stream,
+ * containing one or more "records".
+ *  Chunks have various metadata, such as source, format,
+ * and pointers to record boundaries within the chunk.
+ * 
+ */
+public interface Chunk {
+	
+//these conceptually are really network addresses
+	public String getSource();
+	public void setSource(String logSource);
+	
+	/**
+	 * Get the name of the stream that this Chunk is a chunk of
+	 * @return the name of this stream; e.g. file name
+	 */
+	public String getStreamName();
+	public void setStreamName(String streamName);
+	
+	public String getApplication();  
+  public void setApplication(String a);
+	
+  //These describe the format of the data buffer
+  public String getDataType();
+  public void setDataType(String t);
+
+  /**
+   * @return the user data in the chunk
+   */
+	public byte[] getData();
+	/**
+	 * @param logEvent the user data in the chunk
+	 */
+	public void setData(byte[] logEvent);
+	
+	/**
+	 * get/set the <b>end</b> offsets of records in the buffer.
+	 * 
+	 * We use end, rather than start offsets, since the first start
+	 * offset is always 0, but the last end offset specifies how much of the buffer is valid.
+	 * 
+	 * More precisely, offsets[i] is the offset in the Chunk of the last byte of record i
+	 *  in this chunk.
+	 * @return a list of record end offsets
+	 */
+	public int[] getRecordOffsets();
+	public void setRecordOffsets(int[] offsets);
+	
+	/**
+	 * @return  the byte offset of the first byte not in this chunk.
+	 * 
+	 * We pick this convention so that subtracting sequence IDs yields length.
+	 */
+	public long getSeqID();
+	public void setSeqID(long l);
+
+	/**
+	 * Retrieve a reference to the adaptor that sent this event.
+	 * Used by LocalAgent and Connectors to deliver acks to the appropriate place.
+	 */
+	public Adaptor getInitiator();
+	
+  /**
+   * Estimate the size of this Chunk on the wire, assuming each char of metadata takes two bytes
+   * to serialize.  This is pessimistic.
+   * @return size in bytes that this Chunk might take once serialized.
+   */
+  public int getSerializedSizeEstimate();
+  
+  public void write(DataOutput data) throws IOException;
+}
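
The end-offset convention documented above is easiest to see as code. A minimal sketch (illustrative only, not part of the committed interface) that splits a Chunk's buffer into its records:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.chukwa.Chunk;

    class ChunkRecords {
      // offsets[i] is the offset of the LAST byte of record i, so record 0 spans
      // 0..offsets[0], record i spans (offsets[i-1] + 1)..offsets[i], and any
      // bytes past offsets[last] in the buffer are not valid record data.
      static List<byte[]> split(Chunk chunk) {
        byte[] data = chunk.getData();
        List<byte[]> records = new ArrayList<byte[]>();
        int start = 0;
        for (int end : chunk.getRecordOffsets()) {
          byte[] rec = new byte[end - start + 1];
          System.arraycopy(data, start, rec, 0, rec.length);
          records.add(rec);
          start = end + 1;
        }
        return records;
      }
    }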

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkBuilder.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkBuilder.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkBuilder.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.util.*;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import java.io.*;
+
+/**
+ * Right now, just handles record collection.
+ *
+ */
+public class ChunkBuilder {
+  
+  ArrayList<Integer> recOffsets = new ArrayList<Integer>();
+  int lastRecOffset = -1;
+  DataOutputBuffer buf = new DataOutputBuffer();
+  /**
+   * Adds the data in rec to an internal buffer; rec can be reused immediately.
+   * @param rec
+   */
+  public void addRecord(byte[] rec)  {
+    lastRecOffset = lastRecOffset + rec.length;
+    recOffsets.add(lastRecOffset);
+    try {
+    buf.write(rec);
+    } catch(IOException e) {
+      throw new RuntimeException("buffer write failed.  Out of memory?", e);
+    }
+  }
+  
+  public Chunk getChunk() {
+    ChunkImpl c = new ChunkImpl();
+    c.setData(buf.getData());
+    c.setSeqID(buf.getLength());
+    int[] offsets = new int[recOffsets.size()];
+    for(int i = 0; i < offsets.length; ++i)
+      offsets[i] = recOffsets.get(i);
+    c.setRecordOffsets(offsets);
+    
+    return c;
+  }
+  
+  
+
+}
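
A quick usage sketch of the builder above (illustrative only); two five-byte records yield end offsets 4 and 9:

    ChunkBuilder cb = new ChunkBuilder();
    cb.addRecord("hello".getBytes());   // internal end offset becomes 4
    cb.addRecord("world".getBytes());   // internal end offset becomes 9
    Chunk c = cb.getChunk();
    // c.getRecordOffsets() is {4, 9}; c.getData() returns the DataOutputBuffer's
    // backing array, so only bytes 0..9 ("helloworld") are meaningful record data.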

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkImpl.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkImpl.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkImpl.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+
+public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk 
+{
+  
+  private String source = "";
+  private String application = "";
+  private String dataType = "";
+  private byte[] data = null;
+  private int[] recordEndOffsets;
+  
+  private String debuggingInfo="";
+  
+  private transient Adaptor initiator;
+  long seqID;
+  
+  ChunkImpl() {
+  }
+  
+  public static ChunkImpl getBlankChunk() {
+    return new ChunkImpl();
+  }
+  
+  public ChunkImpl(String dataType, String streamName, long seq, byte[] data, Adaptor source) {
+    this.seqID = seq;
+    this.application = streamName;
+    this.dataType = dataType;
+    this.data = data;
+    this.initiator = source;
+    this.source = localHostAddr;
+  }
+  
+  /**
+   *  @see org.apache.hadoop.chukwa.Chunk#getData()
+   */
+  public byte[] getData()	{
+  	return data;
+  }
+  
+  /**
+   *  @see org.apache.hadoop.chukwa.Chunk#setData(byte[])
+   */
+  public void setData(byte[] logEvent) {
+  	this.data = logEvent;
+  }
+  
+  /**
+   * @see org.apache.hadoop.chukwa.Chunk#getStreamName()
+   */
+  public String getStreamName() {
+  	return application;
+  }
+  
+  public void setStreamName(String logApplication)	{
+  	this.application = logApplication;
+  }
+   
+  public String getSource() {
+    return source;
+  }
+  
+  public void setSource(String logSource)	{
+  	this.source = logSource;
+  }
+  
+  public String getDebugInfo() {
+  	return debuggingInfo;
+  }
+  
+  public void setDebugInfo(String a) {
+  	this.debuggingInfo = a;
+  }
+  
+  /**
+   * @see org.apache.hadoop.chukwa.Chunk#getSeqID()
+   */
+  public long getSeqID()  {
+    return seqID;
+  }
+  
+  public void setSeqID(long l) {
+    seqID=l;
+  }
+  
+  public String getApplication(){
+    return application;
+  }
+  
+  public void setApplication(String a){
+    application = a;
+  }
+  
+  public Adaptor getInitiator() {
+    return initiator;
+  }
+  
+  public void setInitiator(Adaptor a) {
+    initiator = a;
+  }
+  
+  
+  public void setLogSource() {
+    source = localHostAddr;
+  }
+  
+  public int[] getRecordOffsets() {
+
+    if(recordEndOffsets == null)
+      recordEndOffsets = new int[] {data.length -1};
+    return recordEndOffsets;
+  }
+  
+  public void setRecordOffsets(int[] offsets) {
+    recordEndOffsets = offsets;
+  }
+  
+  public String getDataType() {
+    return dataType;
+  }
+  
+  public void setDataType(String t) {
+    dataType = t;
+  }
+  
+  /**
+   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+   */
+  public void readFields(DataInput in) throws IOException {
+    setSeqID(in.readLong());
+    setSource(in.readUTF());
+    setApplication(in.readUTF());
+    setDataType(in.readUTF());
+    setDebugInfo(in.readUTF());
+    
+    int numRecords = in.readInt();
+    recordEndOffsets = new int[numRecords];
+    for(int i=0; i < numRecords; ++i)
+      recordEndOffsets[i] = in.readInt();
+    data = new byte[recordEndOffsets[recordEndOffsets.length -1]+1 ] ;
+    in.readFully(data);
+    
+  }
+  /**
+   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(seqID);
+    out.writeUTF(source);
+    out.writeUTF(application);
+    out.writeUTF(dataType);
+    out.writeUTF(debuggingInfo);
+    
+    if(recordEndOffsets == null)
+      recordEndOffsets = new int[] {data.length -1};
+      
+    out.writeInt(recordEndOffsets.length);
+    for(int i =0; i < recordEndOffsets.length; ++i)
+      out.writeInt(recordEndOffsets[i]);
+    
+    out.write(data, 0, recordEndOffsets[recordEndOffsets.length -1] + 1); //byte at last offset is valid
+  }
+  
+  public static ChunkImpl read(DataInput in) throws IOException {
+    ChunkImpl w = new ChunkImpl();
+    w.readFields(in);
+    return w;
+  }
+  
+    //FIXME: should do something better here, but this is OK for debugging
+  public String toString() {
+    return source+":" + application +":"+ new String(data)+ "/"+seqID;
+  }
+  
+  private static String localHostAddr;
+  static
+  {
+    try {
+      localHostAddr = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      localHostAddr = "localhost";
+    }
+  }
+  
+  /**
+   * @see org.apache.hadoop.chukwa.Chunk#getSerializedSizeEstimate()
+   */
+  public int getSerializedSizeEstimate() {
+    int size= 2 * (source.length() + application.length() + 
+        dataType.length() + debuggingInfo.length()); //length of strings (pessimistic)
+    size += data.length + 4;
+    if(recordEndOffsets == null)
+      size+=8;
+    else
+      size += 4 * (recordEndOffsets.length + 1); //+1 for length of array
+    size += 8; //uuid
+    return size;
+  }
+
+  public void setRecordOffsets(java.util.Collection<Integer> carriageReturns)
+  {
+    recordEndOffsets = new int [carriageReturns.size()];
+    int i = 0;
+    for(Integer offset:carriageReturns )
+      recordEndOffsets[i++] = offset;
+  }
+	
+}
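
A minimal round-trip sketch for the Writable implementation above (illustrative only); the stream name and payload are hypothetical, and it relies on org.apache.hadoop.io.DataOutputBuffer and DataInputBuffer:

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.chukwa.ChunkImpl;

    class ChunkRoundTrip {
      static ChunkImpl copyOf(ChunkImpl original) throws java.io.IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);                       // seqID, source, stream, type, offsets, data

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        return ChunkImpl.read(in);                 // new ChunkImpl with the same fields
      }

      public static void main(String[] args) throws java.io.IOException {
        // Field values are hypothetical; the initiator Adaptor is not serialized.
        ChunkImpl c = new ChunkImpl("SysLog", "/var/log/messages", 42L,
            "a log line\n".getBytes(), null);
        ChunkImpl copy = copyOf(c);
        System.out.println(copy.getSeqID() + " " + copy.getStreamName());  // 42 /var/log/messages
      }
    }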

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+public class ChukwaConfiguration extends Configuration {
+	static Logger log = Logger.getLogger(ChukwaConfiguration.class);
+
+	public ChukwaConfiguration() {
+		this(true);
+	}
+
+	public ChukwaConfiguration(boolean loadDefaults) {
+		super();
+		if (loadDefaults) {
+		  String chukwaHome = System.getenv("CHUKWA_HOME");
+		  if (chukwaHome == null)
+		    chukwaHome = ".";
+		  log.info("chukwaHome is " + chukwaHome);
+		  
+			super.addResource(new Path(chukwaHome + "/conf/chukwa-collector-conf.xml"));
+			log.debug("added chukwa-collector-conf.xml to ChukwaConfiguration");
+			
+			super.addResource(new Path(chukwaHome + "/conf/chukwa-agent-conf.xml"));
+			log.debug("added chukwa-agent-conf.xml to ChukwaConfiguration");
+		}
+	}
+
+}
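
A brief usage sketch (illustrative only; the property name and default are hypothetical). The constructor reads $CHUKWA_HOME/conf/chukwa-collector-conf.xml and chukwa-agent-conf.xml, falling back to "." when CHUKWA_HOME is unset:

    ChukwaConfiguration conf = new ChukwaConfiguration();       // loadDefaults = true
    String port = conf.get("chukwa.collector.port", "8080");    // hypothetical property and default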

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection;
+
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+
+/**
+ * A generic interface for queues of Chunks.
+ * 
+ * Differs from a normal queue interface primarily by having collect().
+ */
+public interface ChunkQueue extends ChunkReceiver
+{
+  /**
+   *  Add a chunk to the queue, blocking if queue is full.
+   * @param event
+   * @throws InterruptedException if thread is interrupted while blocking
+   */
+	public void add(Chunk event) throws InterruptedException;
+	
+	/**
+	 * Return at least one, and no more than count, Chunks into events.
+	 * Blocks if queue is empty.
+	 */
+	public void collect(List<Chunk> events,int count) throws InterruptedException;
+	
+	/**
+	 * Return an approximation of the number of chunks in the queue currently.
+	 * No guarantees are made about the accuracy of this number. 
+	 */
+	public int size();
+}
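
The interface is intended for a producer/consumer pattern; a minimal sketch (illustrative only), assuming a ChunkQueue implementation obtained elsewhere in this commit:

    // Producer side (e.g. an adaptor) and consumer side (e.g. a connector) in one sketch.
    void pump(ChunkQueue queue, Chunk incoming) throws InterruptedException {
      queue.add(incoming);                           // blocks while the queue is full

      java.util.List<Chunk> batch = new java.util.ArrayList<Chunk>();
      queue.collect(batch, 100);                     // returns 1..100 chunks, blocks while empty
      for (Chunk c : batch) {
        // ... ship c onward
      }
    }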

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,15 @@
+package org.apache.hadoop.chukwa.datacollection;
+
+import org.apache.hadoop.chukwa.Chunk;
+
+public interface ChunkReceiver {
+  
+  /**
+   *  Add a chunk to the queue, potentially blocking.
+   * @param event
+   * @throws InterruptedException if thread is interrupted while blocking
+   */
+  public void add(Chunk event) throws java.lang.InterruptedException;
+  
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
+
+import org.apache.hadoop.chukwa.datacollection.agent.*;
+import org.apache.log4j.Logger;
+
+public class DataFactory
+{
+  static Logger log = Logger.getLogger(DataFactory.class);
+	static final int QUEUE_SIZE_KB = 10 * 1024;
+	private static DataFactory dataFactory = null;
+	private ChunkQueue chunkQueue = new MemLimitQueue(QUEUE_SIZE_KB * 1024);
+
+	static 
+	{
+		dataFactory = new DataFactory();
+	}
+
+	private DataFactory()
+	{}
+
+	public static DataFactory getInstance() {
+		return dataFactory;
+	}
+
+	public ChunkQueue getEventQueue() {
+		return chunkQueue;
+	}
+
+	/**
+	 * @return empty list if file does not exist
+	 * @throws IOException on other error
+	 */
+	public Iterator<String> getCollectors() throws IOException
+	{
+	  String chukwaHome = System.getenv("CHUKWA_HOME");
+	  if (chukwaHome == null){
+	    chukwaHome = ".";
+	  }
+	  log.info("setting up collectors file: " + chukwaHome + "/conf/collectors");
+		File collectors = new File(chukwaHome + "/conf/collectors");
+		try{
+		  return new RetryListOfCollectors(collectors, 1000 * 15);//time is ms between tries
+		} catch(java.io.IOException e) {
+			log.error("failed to read collectors file: ", e);
+			throw e;
+		}
+	}
+}
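
A short sketch of how the singleton above is used (illustrative only):

    void listCollectors() throws java.io.IOException {
      DataFactory df = DataFactory.getInstance();
      ChunkQueue queue = df.getEventQueue();                        // the shared MemLimitQueue
      java.util.Iterator<String> collectors = df.getCollectors();   // reads $CHUKWA_HOME/conf/collectors
      while (collectors.hasNext())
        System.out.println("collector: " + collectors.next());
      // chunks produced by adaptors go into 'queue' and are drained by a connector
    }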

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+
+/**
+ * An adaptor is a component that runs within the Local Agent, producing 
+ * chunks of monitoring data.
+ * 
+ * An adaptor can, but need not, have an associated thread. If an adaptor
+ * lacks a thread, it needs to arrange some mechanism (such as registering a
+ * callback somewhere) to periodically get control and send reports.
+ * 
+ * Adaptors must be able to stop and resume without losing data, using
+ * a byte offset in the stream.
+ * 
+ * If an adaptor crashes at byte offset n, and is restarted at byte offset k,
+ * with k < n, it is allowed to send different values for bytes k through n the 
+ * second time around.  However, the stream must still be parseable, assuming that
+ * bytes 0-k come from the first run, and bytes k-n come from the second.
+ */
+public interface Adaptor
+{
+  /**
+   * Start this adaptor
+   * @param type the application type, i.e. which application is starting this adaptor
+   * @param status the status string to use for configuration.
+   * @param offset the stream offset of the first byte sent by this adaptor
+   * @throws AdaptorException
+   */
+	public void start(String type, String status, long offset, ChunkReceiver dest) throws AdaptorException;
+	
+	/**
+	 * Return the adaptor's state
+	 * Should not include class name, datatype or byte offset, which are written by caller.
+	 * @return the adaptor state as a string
+	 * @throws AdaptorException
+	 */
+	public String getCurrentStatus() throws AdaptorException;
+	public String getType();
+	/**
+	 * Signals this adaptor to come to an orderly stop.
+	 * The adaptor ought to push out all the data it can
+	 * before exiting.
+	 * 
+	 * This method is synchronous:
+	 * In other words, after shutdown() returns, no new data should be written.
+	 * 
+	 * @return the logical offset at which the adaptor stops
+	 * @throws AdaptorException
+	 */
+	public long shutdown() throws AdaptorException;
+	
+	/**
+	 * Signals this adaptor to come to an abrupt stop, as quickly as it can.
+	 * The use case here is "Whups, I didn't mean to start that adaptor tailing
+	 * a gigabyte file, stop it now".
+	 * 
+	 * Adaptors might need to do something nontrivial here, e.g., in the case in which  
+	 * they have registered periodic timer interrupts, or use a shared worker thread
+	 * from which they need to disengage.
+	 * 
+	 * This method is synchronous:
+   * In other words, after shutdown() returns, no new data should be written.
+   *
+	 * @throws AdaptorException
+	 */
+	public void hardStop() throws AdaptorException;
+
+}
\ No newline at end of file
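
To make the lifecycle contract above concrete, a minimal do-nothing adaptor (illustrative only, not a committed class) that emits one fixed chunk on start() and reports its offset on shutdown():

    package org.apache.hadoop.chukwa.datacollection.adaptor;

    import org.apache.hadoop.chukwa.ChunkImpl;
    import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;

    public class ConstantAdaptor implements Adaptor {
      private String type;
      private long offset;

      public void start(String type, String status, long offset, ChunkReceiver dest)
          throws AdaptorException {
        this.type = type;
        this.offset = offset;
        byte[] data = "hello from a constant adaptor\n".getBytes();
        this.offset += data.length;
        try {
          // seqID is the offset of the first byte NOT in this chunk, per Chunk's contract
          dest.add(new ChunkImpl(type, "constant-stream", this.offset, data, this));
        } catch (InterruptedException e) {
          throw new AdaptorException(e);
        }
      }

      public String getCurrentStatus() { return ""; }      // no state beyond type and offset
      public String getType()          { return type; }
      public long shutdown()           { return offset; }  // nothing buffered; just report the offset
      public void hardStop()           { }
    }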

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+public class AdaptorException extends Exception
+{
+
+	private static final long serialVersionUID = -8490279345367308690L;
+
+	public AdaptorException()
+	{
+		super();
+	}
+
+	public AdaptorException(String arg0, Throwable arg1)
+	{
+		super(arg0, arg1);
+	}
+
+	public AdaptorException(String arg0)
+	{
+		super(arg0);
+	}
+
+	public AdaptorException(Throwable arg0)
+	{
+		super(arg0);
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
+import org.apache.log4j.helpers.ISO8601DateFormat;
+import org.json.JSONException;
+import org.json.JSONObject;
+import java.util.*;
+
+public class ExecAdaptor extends ExecPlugin implements Adaptor {
+
+  static final boolean FAKE_LOG4J_HEADER = true;
+   
+  class RunToolTask extends TimerTask {
+    public void run() {
+      JSONObject o = execute();
+      try {
+        
+        if(o.getInt("status") == statusKO)
+          hardStop();
+        
+         //FIXME: downstream customers would like timestamps here.
+         //Doing that efficiently probably means cutting out all the
+         //excess buffer copies here, and appending into an OutputBuffer. 
+        byte[] data;
+        if(FAKE_LOG4J_HEADER) {
+          StringBuilder result = new StringBuilder();
+          ISO8601DateFormat dateFormat = new org.apache.log4j.helpers.ISO8601DateFormat();
+          result.append(dateFormat.format(new java.util.Date()));
+          result.append(" INFO org.apache.hadoop.chukwa.");
+          result.append(type);
+          result.append(": ");  
+          result.append(o.getString("stdout"));
+          data = result.toString().getBytes();
+        } else {
+          String stdout = o.getString("stdout");
+          data = stdout.getBytes();
+        }
+ 
+        ArrayList<Integer> carriageReturns = new  ArrayList<Integer>();
+        for(int i = 0; i < data.length ; ++i)
+          if(data[i] == '\n')
+            carriageReturns.add(i);
+        
+        sendOffset += data.length;
+        ChunkImpl c = new ChunkImpl(ExecAdaptor.this.type,
+            "results from " + cmd, sendOffset , data, ExecAdaptor.this);
+        c.setRecordOffsets(carriageReturns);
+        dest.add(c);
+      } catch(JSONException e ) {
+        //FIXME: log this somewhere
+      } catch (InterruptedException e)  {
+        // TODO Auto-generated catch block
+      }catch(AdaptorException e ) {
+        //FIXME: log this somewhere
+      }
+    }
+  };
+  
+  String cmd;
+  String type;
+  ChunkReceiver dest;
+  final java.util.Timer timer;
+  long period = 5 * 1000;
+  volatile long sendOffset = 0;
+  
+  public ExecAdaptor() {
+    timer = new java.util.Timer();
+  }
+  
+  @Override
+  public String getCurrentStatus() throws AdaptorException {
+    return cmd;
+  }
+
+  @Override
+  public void hardStop() throws AdaptorException {
+    super.stop();
+    timer.cancel();
+  }
+
+  @Override
+  public long shutdown() throws AdaptorException   { 
+    try {
+      timer.cancel();
+      super.waitFor(); //wait for last data to get pushed out
+    } catch(InterruptedException e) {
+     return sendOffset; 
+    }
+    return sendOffset;
+  }
+
+  @Override
+  public void start(String type, String status, long offset, ChunkReceiver dest)
+      throws AdaptorException
+  {
+    cmd = status;
+    this.type = type;
+    this.dest = dest;
+    this.sendOffset = offset;
+    
+    TimerTask exec = new RunToolTask();
+    timer.schedule(exec, 0L, period);
+  }
+
+  @Override
+  public String getCmde() {
+    return cmd;
+  }
+  
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+}
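
A hedged usage sketch for the adaptor above; the data type and command are hypothetical, and the destination queue comes from DataFactory as elsewhere in this commit:

    void startDfAdaptor() throws AdaptorException {
      ExecAdaptor adaptor = new ExecAdaptor();
      adaptor.start("Df",                     // dataType (hypothetical)
                    "df -kh",                 // command to exec every 'period' ms (hypothetical)
                    0L,                       // starting stream offset
                    DataFactory.getInstance().getEventQueue());
      // ... later, cancel the timer and note how many bytes were sent:
      long stoppedAt = adaptor.shutdown();
    }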

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import java.util.ArrayList;
+
+/**
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii
+ * files and splits records at carriage returns.
+ *
+ */
+public class CharFileTailingAdaptorUTF8 extends FileTailingAdaptor {
+  
+
+
+  private static final char SEPARATOR = '\n';
+  
+  private ArrayList<Integer> offsets = new ArrayList<Integer>();
+  
+  /**
+   * 
+   * Note: this method uses a temporary ArrayList (shared across calls on this instance).
+   * This means we're copying ints each time. This could be a performance issue.
+   * Also, 'offsets' never shrinks, and will be of size proportional to the 
+   * largest number of lines ever seen in an event.
+   */
+  @Override
+    protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
+    throws InterruptedException
+  {
+      for(int i = 0; i < buf.length; ++i) {
+        if(buf[i] == SEPARATOR) {
+          offsets.add(i);
+        }
+      }
+
+      if(offsets.size() > 0)  {
+        int[] offsets_i = new int[offsets.size()];
+        for(int i = 0; i < offsets_i.length ; ++i)
+          offsets_i[i] = offsets.get(i);
+      
+        int bytesUsed = offsets_i[offsets_i.length-1]  + 1; //char at last offset uses a byte
+        assert bytesUsed > 0: " shouldn't send empty events";
+        ChunkImpl event = new ChunkImpl(type, toWatch.getAbsolutePath(),buffOffsetInFile + bytesUsed, buf, this );
+
+        event.setRecordOffsets(offsets_i);
+        eq.add(event);
+        
+        offsets.clear();
+        return bytesUsed;
+      }
+      else
+        return 0;
+      
+  }
+
+  
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.util.RecordConstants; 
+
+import java.util.ArrayList;
+
+
+/**
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii
+ * files and splits records at non-escaped carriage returns
+ *
+ */
+public class CharFileTailingAdaptorUTF8NewLineEscaped extends FileTailingAdaptor {
+  
+
+  private static final char SEPARATOR = '\n';
+  
+  private ArrayList<Integer> offsets = new ArrayList<Integer>();
+  
+  /**
+   * 
+   * Note: this method uses a temporary ArrayList (shared across calls on this instance).
+   * This means we're copying ints each time. This could be a performance issue.
+   * Also, 'offsets' never shrinks, and will be of size proportional to the 
+   * largest number of lines ever seen in an event.
+   */
+  @Override
+    protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
+    throws InterruptedException
+  {
+    String es = RecordConstants.RECORD_SEPARATOR_ESCAPE_SEQ;
+    for(int i = 0; i < buf.length; ++i) {
+        // if this is a separator
+      if(buf[i] == SEPARATOR){
+        // if possibly preceded by escape sequence (avoid outOfBounds here)
+        boolean escaped = false; // was it escaped?
+        if (i-es.length() >= 0){
+          escaped = true; // maybe (at least there was room for an escape sequence, so let's check for the e.s.)
+          for (int j = 0; j < es.length(); j++){
+            if (buf[i-es.length()+j] != es.charAt(j)){
+              escaped = false;
+            }
+          }
+        }
+        if (!escaped){
+          offsets.add(i);
+        }
+      }
+    }
+
+    if(offsets.size() > 0)  {
+      int[] offsets_i = new int[offsets.size()];
+      for(int i = 0; i < offsets_i.length ; ++i)
+        offsets_i[i] = offsets.get(i);
+      //make the stream unique to this adaptor 
+      int bytesUsed = offsets_i[offsets_i.length-1]  + 1; //char at last offset uses a byte
+      assert bytesUsed > 0: " shouldn't send empty events";
+      ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
+           buffOffsetInFile + bytesUsed, buf,this);
+      
+      chunk.setSeqID(buffOffsetInFile + bytesUsed);
+      chunk.setRecordOffsets(offsets_i);
+      eq.add(chunk);
+      
+      offsets.clear();
+      return bytesUsed;
+    }
+    else
+      return 0;
+  }
+
+  public String toString() {
+    return "escaped newline CFTA-UTF8";
+  }
+
+  
+}
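
The escape check above is easier to follow in isolation. A standalone sketch (illustrative only); the real escape string is RecordConstants.RECORD_SEPARATOR_ESCAPE_SEQ, defined elsewhere in this commit, so 'es' here stands in for whatever that constant holds:

    // A '\n' is a record boundary unless the bytes immediately before it
    // spell out the escape sequence 'es'.
    static boolean isRecordBoundary(byte[] buf, int i, String es) {
      if (buf[i] != '\n')
        return false;
      if (i - es.length() < 0)
        return true;                       // no room for an escape sequence before position i
      for (int j = 0; j < es.length(); j++)
        if (buf[i - es.length() + j] != es.charAt(j))
          return true;                     // some byte differs, so the newline was not escaped
      return false;                        // full match: escaped newline, not a boundary
    }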

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+
+/**
+ * A shared thread used by all FileTailingAdaptors. 
+ * 
+ * For now, it tries each file in succession. If it gets through every
+ * file within two seconds, and no more data remains, it will sleep.
+ * 
+ * If there was still data available in any file, the adaptor will loop again.
+ *
+ */
+class FileTailer extends Thread {
+  private  List<FileTailingAdaptor> adaptors;
+  ChunkQueue eq; //not private -- useful for file tailing adaptor classes
+  
+  /**
+   * How often to tail each file.
+   */
+  int SAMPLE_PERIOD_MS = 1000* 2; //FIXME: should be configurable
+  
+  FileTailer() {
+     eq = DataFactory.getInstance().getEventQueue();
+     
+       //iterations are much more common than adding a new adaptor
+     adaptors = new CopyOnWriteArrayList<FileTailingAdaptor>();
+
+     this.setDaemon(true);
+     start();//start the file-tailing thread
+  }
+   
+  //called by FileTailingAdaptor, only
+   void startWatchingFile(FileTailingAdaptor f) {
+       adaptors.add(f);
+   }
+
+   //called by FileTailingAdaptor, only
+   void stopWatchingFile(FileTailingAdaptor f) {
+     adaptors.remove(f);
+   }
+   
+  public void run()  {
+    try{
+      while(true) {
+        boolean shouldISleep = true;
+        long startTime = System.currentTimeMillis();
+        for(FileTailingAdaptor f: adaptors) {
+          boolean hasMoreData = f.tailFile(eq);   
+          shouldISleep &= !hasMoreData;
+        }
+        long timeToReadFiles = System.currentTimeMillis() - startTime;
+        assert timeToReadFiles >= 0 : " time shouldn't go backwards";
+        if(timeToReadFiles < SAMPLE_PERIOD_MS && shouldISleep)
+          Thread.sleep(SAMPLE_PERIOD_MS - timeToReadFiles+1);
+      }
+    }
+    catch(InterruptedException e)
+    {}
+  }
+  
+  
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.log4j.Logger;
+
+import java.io.*;
+
+/**
+ * An adaptor that repeatedly tails a specified file, sending the new bytes.
+ * This class does not split out records, but just sends everything up to the
+ * end of the file. Subclasses can alter this behavior by overriding extractRecords().
+ * 
+ */
+public class FileTailingAdaptor implements Adaptor
+{
+
+	static Logger log;
+
+	/**
+	 * This is the maximum amount we'll read from any one file before moving on
+	 * to the next. This way, we get quick response time for other files if one
+	 * file is growing rapidly.
+	 */
+	public static final int MAX_READ_SIZE = 128 * 1024;
+
+	File toWatch;
+	/**
+	 * next PHYSICAL offset to read
+	 */
+	protected long fileReadOffset;
+	protected String type;
+	private ChunkReceiver dest;
+	
+	/**
+	 * The logical offset of the first byte of the file
+	 */
+	private long offsetOfFirstByte = 0;
+	
+	private static FileTailer tailer;
+
+	static {
+		tailer = new FileTailer();
+		log =Logger.getLogger(FileTailingAdaptor.class);
+	}
+
+	public void start(String type, String params, long bytes, ChunkReceiver dest) {
+	  //params is a file name, optionally preceded by the logical offset of the file's first byte
+		log.info("started file tailer on file " + params);
+	  this.type = type;
+	  this.dest = dest;
+			  
+	  String[] words = params.split(" ");
+	  if(words.length > 1) {
+	    offsetOfFirstByte = Long.parseLong(words[0]);
+	    toWatch = new File(params.substring(words[0].length() + 1));
+	  }
+	  else
+	    toWatch = new File(params);
+	  
+		this.fileReadOffset= bytes - offsetOfFirstByte;
+		tailer.startWatchingFile(this);
+	}
+
+	/**
+	 * Do one last tail, and then stop
+	 * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
+	 */
+	public long shutdown() throws AdaptorException {
+	  try{
+	    tailFile(tailer.eq); // get tail end of file.
+	  } catch(InterruptedException e) {
+	    Thread.currentThread().interrupt();
+	  }
+		hardStop();//need to do this afterwards, so that offset stays visible during tailFile().
+		return fileReadOffset + offsetOfFirstByte;
+	}
+	/**
+	 * Stop tailing the file, effective immediately.
+	 */
+	public void hardStop() throws AdaptorException {
+    tailer.stopWatchingFile(this);
+	}
+
+  /**
+   * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
+   */
+	public String getCurrentStatus() {
+		return type + " " + offsetOfFirstByte+ " " + toWatch.getPath();
+		// can make this more efficient using a StringBuilder
+	}
+
+	public String toString() {
+		return "Tailer on " + toWatch;
+	}
+
+	/**
+	 * Looks at the tail of the associated file and adds some of it to the event
+	 * queue. This method is not thread safe.
+	 * 
+	 * @param eq the queue to write Chunks into
+	 * @return true if there is more data left in the file
+	 */
+	public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException {
+    boolean hasMoreData = false;
+    try {
+      if(!toWatch.exists())
+        return false;  //no more data
+      
+    	RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
+    	long len = reader.length();
+    	if (len > fileReadOffset) {
+    		reader.seek(fileReadOffset);
+    
+    		long bufSize = len - fileReadOffset;
+    		if (bufSize > MAX_READ_SIZE) {
+    			bufSize = MAX_READ_SIZE;
+    			hasMoreData = true;
+    		}
+    		byte[] buf = new byte[(int) bufSize];
+    		reader.readFully(buf); //fill the whole buffer, so the offset arithmetic below stays exact
+    		assert reader.getFilePointer() == fileReadOffset + bufSize : "event size arithmetic is broken:"
+    				+ " pointer is "
+    				+ reader.getFilePointer()
+    				+ " but offset is " + (fileReadOffset + bufSize);
+    
+    		int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf);
+    		fileReadOffset = fileReadOffset + bytesUsed;
+    	}
+    	reader.close();
+    } catch (IOException e) {
+    	log.warn("failure reading " + toWatch, e);
+    }
+    return hasMoreData;
+	}
+	
+  /**
+   * Extract records from a byte sequence
+   * @param eq the queue to stick the new chunk[s] in
+   * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
+   * @param buf the byte buffer to extract records from
+   * @return the number of bytes processed
+   * @throws InterruptedException
+   */
+  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
+      throws InterruptedException
+  {
+    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(), buffOffsetInFile + buf.length,
+        buf, this);
+
+    eq.add(chunk);
+    return buf.length;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+
+}
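
For reference, start() above accepts its params string in two shapes: just a file
name, or a logical offset-of-first-byte followed by the file name. A standalone
sketch of that parsing, with made-up paths (only the split/parse logic mirrors
start(); the class name is hypothetical):

    import java.io.File;

    public class ParamsParsingDemo {
      public static void main(String[] args) {
        for (String params : new String[] { "/var/log/app.log", "1048576 /var/log/app.log" }) {
          long offsetOfFirstByte = 0;
          File toWatch;
          String[] words = params.split(" ");
          if (words.length > 1) {
            // first word is the logical offset of the file's first byte
            offsetOfFirstByte = Long.parseLong(words[0]);
            toWatch = new File(params.substring(words[0].length() + 1));
          } else {
            toWatch = new File(params);
          }
          System.out.println(params + " -> watch " + toWatch + ", first byte at " + offsetOfFirstByte);
        }
      }
    }

Note that a bare file name containing a space trips the Long.parseLong() branch;
with an explicit leading offset, everything after the first space is taken as the
path, so such names still work.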

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+
+/**
+ * Produces new unconfigured adaptors, given the class name of the adaptor type to instantiate
+ * 
+ */
+public class AdaptorFactory {
+   
+    /**
+     * Instantiate an adaptor that can be added by the {@link ChukwaAgent}
+     * @param className the name of the {@link Adaptor} class to instantiate
+     * @return an Adaptor of the specified type
+     */
+    static Adaptor createAdaptor(String className){
+    Object obj = null;
+    try{
+      //the following reflection business for type checking is probably unnecessary
+      //since it will just throw a ClassCastException on error anyway.
+      obj = Class.forName(className).newInstance();
+      if (Adaptor.class.isInstance(obj)){
+        return (Adaptor) obj;
+      }
+      else return null;
+    } catch (Exception e){
+      System.out.println("Error instantiating new adaptor of class " + className + ": " + e);
+      return null;
+    }
+    
+  }
+  
+}
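
createAdaptor() is a small reflective factory: load the named class, instantiate it
with its no-argument constructor, and return it only if it actually implements
Adaptor. A self-contained sketch of the same pattern, using made-up interface and
class names so it runs on its own:

    public class ReflectiveFactoryDemo {
      public interface Widget { }                          // stand-in for Adaptor
      public static class BlueWidget implements Widget { } // stand-in for a concrete adaptor

      static Widget createWidget(String className) {
        try {
          Object obj = Class.forName(className).newInstance();
          // wrong type yields null rather than a ClassCastException, as in createAdaptor()
          return (obj instanceof Widget) ? (Widget) obj : null;
        } catch (Exception e) {
          System.err.println("Error instantiating " + className + ": " + e);
          return null;
        }
      }

      public static void main(String[] args) {
        System.out.println(createWidget(ReflectiveFactoryDemo.class.getName() + "$BlueWidget"));
        System.out.println(createWidget("java.lang.String")); // instantiates, but null: not a Widget
        System.out.println(createWidget("no.such.Class"));    // null: ClassNotFoundException caught
      }
    }

One consequence worth noting: the named class needs an accessible no-argument
constructor, which is why the factory hands back "unconfigured" adaptors that are
then set up through start().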

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+import java.net.*;
+import java.io.*;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.log4j.Logger;
+import java.util.Map;
+
+/**
+ * Class to handle the agent control protocol.
+ * This is a simple line-oriented ASCII protocol designed to be easy to work
+ * with both programmatically and via telnet.
+ *
+ * The port to bind to can be specified by setting the option
+ *     chukwaAgent.agent.control.port
+ */
+public class AgentControlSocketListener extends Thread {
+
+
+  static Logger log = Logger.getLogger(AgentControlSocketListener.class);
+  
+  ChukwaAgent agent;
+  int portno;
+  ServerSocket s = null;
+  boolean closing = false;
+  
+  private class ListenThread extends Thread
+  {
+    Socket connection;
+    ListenThread(Socket conn)  {
+      connection = conn;
+      this.setName("listen thread for " + connection.getRemoteSocketAddress());
+    }
+    
+    public void run()  {
+      try {
+        InputStream in = connection.getInputStream();
+        BufferedReader br = new BufferedReader(new InputStreamReader(in));
+        PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream()));
+        String cmd = null;
+        while((cmd = br.readLine()) != null)  {
+          processCommand(cmd, out);
+        }
+        log.info("control connection closed");
+      }
+      catch(SocketException e) {
+        if("Socket Closed".equals(e.getMessage()))
+          log.info("control socket closed");
+        else
+          log.warn("socket error on control connection", e);
+      } catch(IOException e)  {
+        log.warn("a control connection broke", e);
+      }
+    }
+    
+    /**
+     * process a protocol command
+     * @param cmd the command given by the user
+     * @param out  a PrintStream writing to the socket
+     * @throws IOException
+     */
+    public void processCommand(String cmd, PrintStream out) throws IOException  {
+      String[] words = cmd.split(" ");
+      log.info("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);
+      
+      if(words[0].equalsIgnoreCase("help"))  {
+        out.println("you're talking to the Chukwa agent.  Commands available: ");
+        out.println("add [adaptorname] [args] [offset] -- start an adaptor");
+        out.println("shutdown [adaptornumber]  -- graceful stop");
+        out.println("stop [adaptornumber]  -- abrupt stop");
+        out.println("list -- list running adaptors");
+        out.println("close -- close this connection");
+        out.println("stopagent -- stop the whole agent process");
+        out.println("help -- print this message");
+        out.println("\t Command names are case-blind.");
+      }
+      else if(words[0].equalsIgnoreCase("close"))  {
+        connection.close();
+      }
+      else if(words[0].equalsIgnoreCase("add"))   {
+        long newID = agent.processCommand(cmd);
+        if(newID != -1)
+          out.println("OK add completed; new ID is " +newID);
+        else
+          out.println("failed to start adaptor...check logs for details");
+      }
+      else if(words[0].equalsIgnoreCase("shutdown"))  {
+        if(words.length < 2) {
+          out.println("need to specify an adaptor to shut down, by number");
+        }
+        else {
+          long num = Long.parseLong(words[1]);
+          long offset = agent.stopAdaptor(num, true);
+          if(offset != -1)
+            out.println("OK adaptor "+ num+ " stopping gracefully at " + offset);
+          else
+            out.println("FAIL: perhaps adaptor " + num + " does not exist");
+        }
+      }     
+      else if(words[0].equalsIgnoreCase("stop"))  {
+        if(words.length < 2) {
+          out.println("need to specify an adaptor to stop, by number");
+        } else {
+          long num = Long.parseLong(words[1]);
+          agent.stopAdaptor(num, false);
+          out.println("OK adaptor "+ num+ " stopped");
+        }
+      }
+      else if(words[0].equalsIgnoreCase("list") )  {
+        Map<Long, Adaptor> adaptorsByNumber = agent.getAdaptorList();
+        out.println("number of adaptors: " + adaptorsByNumber.size()); //report to the connected client
+        synchronized(adaptorsByNumber)   {
+          for(Map.Entry<Long, Adaptor> a: adaptorsByNumber.entrySet())  {
+            try{
+              out.print(a.getKey());
+              out.print(") ");
+              out.print(" ");
+              out.println(formatAdaptorStatus(a.getValue()));
+            }  catch(AdaptorException e)  {
+              log.error(e);
+            }
+          }
+          out.println("");
+        }
+      } else if(words[0].equalsIgnoreCase("stopagent")) {
+        out.println("stopping agent process.");
+        connection.close();
+        agent.shutdown(true);
+      }
+      else  {
+        log.warn("unknown command " + words[0]);
+        out.println("unknown command " + words[0]);
+        out.println("say 'help' for a list of legal commands");
+      }
+      out.flush();
+    }
+    
+  }
+  /**
+   * Initializes listener, but does not bind to socket.
+   * @param a the agent to control
+   */
+  public AgentControlSocketListener(ChukwaAgent a)
+  {
+    ChukwaConfiguration conf = new ChukwaConfiguration();
+    this.setDaemon(false); //to keep the local agent alive
+    agent = a;
+    portno = conf.getInt("chukwaAgent.agent.control.port", 9093);
+    log.info("AgentControlSocketListener port set to " + portno);
+    this.setName("control socket listener");
+  }
+  
+  public String formatAdaptorStatus(Adaptor a)  throws AdaptorException  {
+    return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() + " " + agent.getOffset(a);
+  }
+
+  /**
+   * Binds to socket, starts looping listening for commands
+   */
+  public void run()  {
+    try {
+      if(!isBound()) 
+        tryToBind();
+    } catch(IOException e) {
+      log.error("failed to bind to control port " + portno + "; control listener exiting", e);
+      return;
+    }
+    
+    while(!closing)
+    {
+      try {
+        Socket connection = s.accept();
+        log.info("new connection from " + connection.getInetAddress());
+        ListenThread l = new ListenThread(connection);
+        l.setDaemon(true);
+        l.start();
+      } catch(IOException e)  {
+        if(!closing)
+          log.warn("control socket error: ",e );
+        else {
+          log.info("shutting down listen thread due to shutdown() call");
+          break;
+        }
+      }
+    }//end while
+  }
+  /**
+   * Close the control socket, and exit. Triggers graceful thread shutdown.
+   */
+  public void shutdown()  {
+    closing = true;
+    try{
+      if(s != null)
+        s.close();
+      s = null;
+    }
+    catch(IOException e)
+    {}  //ignore exception on close
+  }
+
+  public boolean isBound() {
+    return s != null && s.isBound();
+  }
+
+  public void tryToBind() throws IOException
+  {
+    s = new ServerSocket(portno);
+    if(s.isBound())
+      log.debug("socket bound to " + portno);
+    else
+      log.debug("socket isn't bound");
+     
+  }
+  
+}
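
Since the listener above is meant to be driven both by telnet and programmatically,
here is a hedged sketch of a minimal programmatic client: it connects to the default
control port (9093, per chukwaAgent.agent.control.port above), asks for the adaptor
list, and reads the reply up to the blank line that ends the listing. The host, class
name, and choice of command are assumptions for the demo:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.PrintStream;
    import java.net.Socket;

    public class ControlClientSketch {
      public static void main(String[] args) throws Exception {
        Socket sock = new Socket("localhost", 9093);
        try {
          PrintStream out = new PrintStream(sock.getOutputStream(), true); // autoflush on println
          BufferedReader in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
          out.println("list");                    // ask the agent for its running adaptors
          String line;
          while ((line = in.readLine()) != null && line.length() > 0)
            System.out.println(line);             // one "<id>) <class> <status> <offset>" line per adaptor
          out.println("close");                   // ask the listener to drop this connection
        } finally {
          sock.close();
        }
      }
    }

The same exchange works interactively: telnet to port 9093, type "help" for the
command list, and "close" to hang up.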